Skip to content

Commit

Permalink
refactor: use generics instead of dyn
Browse files Browse the repository at this point in the history
  • Loading branch information
hiltontj authored and pauldix committed Feb 29, 2024
1 parent e2c4777 commit 42d36d5
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 40 deletions.
27 changes: 16 additions & 11 deletions influxdb3_server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::http::HttpApi;
use async_trait::async_trait;
use datafusion::execution::SendableRecordBatchStream;
use hyper::service::service_fn;
use influxdb3_write::{persister, Persister, WriteBuffer};
use influxdb3_write::{Persister, WriteBuffer};
use iox_query::QueryNamespaceProvider;
use observability_deps::tracing::{error, info};
use service::hybrid;
Expand Down Expand Up @@ -115,10 +115,12 @@ impl CommonServerState {
}
}

#[allow(dead_code)]
#[derive(Debug)]
pub struct Server<W, Q> {
pub struct Server<W, Q, P> {
common_state: CommonServerState,
http: Arc<HttpApi<W, Q>>,
persister: Arc<P>,
}

#[async_trait]
Expand All @@ -132,13 +134,14 @@ pub trait QueryExecutor: QueryNamespaceProvider + Debug + Send + Sync + 'static
) -> Result<SendableRecordBatchStream>;
}

impl<W, Q> Server<W, Q>
impl<W, Q, P> Server<W, Q, P>
where
Q: QueryExecutor,
P: Persister,
{
pub fn new(
common_state: CommonServerState,
_persister: Arc<dyn Persister<Error = persister::Error>>,
persister: Arc<P>,
write_buffer: Arc<W>,
query_executor: Arc<Q>,
max_http_request_size: usize,
Expand All @@ -150,14 +153,19 @@ where
max_http_request_size,
));

Self { common_state, http }
Self {
common_state,
http,
persister,
}
}
}

pub async fn serve<W, Q>(server: Server<W, Q>, shutdown: CancellationToken) -> Result<()>
pub async fn serve<W, Q, P>(server: Server<W, Q, P>, shutdown: CancellationToken) -> Result<()>
where
W: WriteBuffer,
Q: QueryExecutor,
P: Persister,
{
// TODO:
// 1. load the persisted catalog and segments from the persister
Expand Down Expand Up @@ -227,7 +235,6 @@ mod tests {
use datafusion::parquet::data_type::AsBytes;
use hyper::{body, Body, Client, Request, Response, StatusCode};
use influxdb3_write::persister::PersisterImpl;
use influxdb3_write::{persister, Persister};
use iox_query::exec::{Executor, ExecutorConfig};
use object_store::DynObjectStore;
use parquet_file::storage::{ParquetStorage, StorageId};
Expand Down Expand Up @@ -407,8 +414,7 @@ mod tests {
metric_registry: Arc::clone(&metrics),
mem_pool_size: usize::MAX,
}));
let persister: Arc<dyn Persister<Error = persister::Error>> =
Arc::new(PersisterImpl::new(Arc::clone(&object_store)));
let persister = Arc::new(PersisterImpl::new(Arc::clone(&object_store)));

let write_buffer = Arc::new(
influxdb3_write::write_buffer::WriteBufferImpl::new(
Expand Down Expand Up @@ -582,8 +588,7 @@ mod tests {
metric_registry: Arc::clone(&metrics),
mem_pool_size: usize::MAX,
}));
let persister: Arc<dyn Persister<Error = persister::Error>> =
Arc::new(PersisterImpl::new(Arc::clone(&object_store)));
let persister = Arc::new(PersisterImpl::new(Arc::clone(&object_store)));

let write_buffer = Arc::new(
influxdb3_write::write_buffer::WriteBufferImpl::new(
Expand Down
17 changes: 8 additions & 9 deletions influxdb3_write/src/write_buffer/buffer_segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@ use crate::paths::ParquetFilePath;
use crate::write_buffer::flusher::BufferedWriteResult;
use crate::write_buffer::{parse_validate_and_update_catalog, FieldData, Row, TableBatch};
use crate::{
persister, wal, write_buffer::Result, DatabaseTables, ParquetFile, PersistedSegment, Persister,
Precision, SegmentId, SequenceNumber, TableParquetFiles, WalOp, WalSegmentReader,
WalSegmentWriter,
wal, write_buffer::Result, DatabaseTables, ParquetFile, PersistedSegment, Persister, Precision,
SegmentId, SequenceNumber, TableParquetFiles, WalOp, WalSegmentReader, WalSegmentWriter,
};
use arrow::array::{
ArrayRef, BooleanBuilder, Float64Builder, Int64Builder, StringBuilder, StringDictionaryBuilder,
Expand Down Expand Up @@ -425,10 +424,11 @@ impl ClosedBufferSegment {
}

#[allow(dead_code)]
pub async fn persist(
&self,
persister: Arc<dyn Persister<Error = persister::Error>>,
) -> crate::Result<()> {
pub async fn persist<P>(&self, persister: Arc<P>) -> crate::Result<()>
where
P: Persister,
crate::Error: From<P::Error>,
{
if self.catalog_start_sequence_number != self.catalog_end_sequence_number {
let inner_catalog = self.catalog.clone_inner();

Expand Down Expand Up @@ -591,8 +591,7 @@ mod tests {
let catalog = Arc::new(catalog);
let closed_buffer_segment = open_segment.into_closed_segment(Arc::clone(&catalog));

let persister: Arc<dyn Persister<Error = persister::Error>> =
Arc::new(TestPersister::default());
let persister = Arc::new(TestPersister::default());
closed_buffer_segment
.persist(Arc::clone(&persister))
.await
Expand Down
27 changes: 15 additions & 12 deletions influxdb3_write/src/write_buffer/loader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ use crate::write_buffer::{
buffer_segment::{load_buffer_from_segment, ClosedBufferSegment, OpenBufferSegment},
Result,
};
use crate::{persister, Wal};
use crate::Wal;
use crate::{PersistedCatalog, PersistedSegment, Persister, SegmentId};
use std::sync::Arc;

use super::Error;

const SEGMENTS_TO_LOAD: usize = 1000;

/// The state loaded and initialized from the persister and wal.
Expand All @@ -22,10 +24,15 @@ pub struct LoadedState {
pub persisted_segments: Vec<PersistedSegment>,
}

pub async fn load_starting_state<W: Wal>(
persister: Arc<dyn Persister<Error = persister::Error>>,
pub async fn load_starting_state<W, P>(
persister: Arc<P>,
wal: Option<Arc<W>>,
) -> Result<LoadedState> {
) -> Result<LoadedState>
where
W: Wal,
P: Persister,
Error: From<P::Error>,
{
let PersistedCatalog { catalog, .. } = persister.load_catalog().await?.unwrap_or_default();
let catalog = Arc::new(Catalog::from_inner(catalog));

Expand Down Expand Up @@ -125,8 +132,7 @@ mod tests {
#[tokio::test]
async fn loads_without_wal() {
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let persister: Arc<dyn Persister<Error = persister::Error>> =
Arc::new(PersisterImpl::new(Arc::clone(&object_store)));
let persister = Arc::new(PersisterImpl::new(Arc::clone(&object_store)));

let segment_id = SegmentId::new(4);
let segment_writer = Box::new(WalSegmentWriterNoopImpl::new(segment_id));
Expand Down Expand Up @@ -192,8 +198,7 @@ mod tests {
#[tokio::test]
async fn loads_with_no_persisted_segments_and_wal() {
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let persister: Arc<dyn Persister<Error = persister::Error>> =
Arc::new(PersisterImpl::new(Arc::clone(&object_store)));
let persister = Arc::new(PersisterImpl::new(Arc::clone(&object_store)));
let dir = test_helpers::tmp_dir().unwrap().into_path();
let wal = Arc::new(WalImpl::new(dir.clone()).unwrap());
let db_name = "db1";
Expand Down Expand Up @@ -267,8 +272,7 @@ mod tests {
#[tokio::test]
async fn loads_with_persisted_segments_and_wal() {
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let persister: Arc<dyn Persister<Error = persister::Error>> =
Arc::new(PersisterImpl::new(Arc::clone(&object_store)));
let persister = Arc::new(PersisterImpl::new(Arc::clone(&object_store)));
let dir = test_helpers::tmp_dir().unwrap().into_path();
let wal = Arc::new(WalImpl::new(dir.clone()).unwrap());
let db_name = "db1";
Expand Down Expand Up @@ -418,8 +422,7 @@ mod tests {
#[tokio::test]
async fn loads_with_persisting() {
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let persister: Arc<dyn Persister<Error = persister::Error>> =
Arc::new(PersisterImpl::new(Arc::clone(&object_store)));
let persister = Arc::new(PersisterImpl::new(Arc::clone(&object_store)));
let dir = test_helpers::tmp_dir().unwrap().into_path();
let wal = Arc::new(WalImpl::new(dir.clone()).unwrap());
let db_name = "db1";
Expand Down
16 changes: 8 additions & 8 deletions influxdb3_write/src/write_buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use crate::write_buffer::buffer_segment::{ClosedBufferSegment, OpenBufferSegment
use crate::write_buffer::flusher::WriteBufferFlusher;
use crate::write_buffer::loader::load_starting_state;
use crate::{
persister, BufferSegment, BufferedWriteRequest, Bufferer, ChunkContainer, LpWriteOp, Persister,
Precision, SegmentId, Wal, WalOp, WriteBuffer, WriteLineError,
BufferSegment, BufferedWriteRequest, Bufferer, ChunkContainer, LpWriteOp, Persister, Precision,
SegmentId, Wal, WalOp, WriteBuffer, WriteLineError,
};
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
Expand Down Expand Up @@ -95,10 +95,11 @@ impl SegmentState {
}

impl<W: Wal> WriteBufferImpl<W> {
pub async fn new(
persister: Arc<dyn Persister<Error = persister::Error>>,
wal: Option<Arc<W>>,
) -> Result<Self> {
pub async fn new<P>(persister: Arc<P>, wal: Option<Arc<W>>) -> Result<Self>
where
P: Persister,
Error: From<P::Error>,
{
let loaded_state = load_starting_state(persister, wal.clone()).await?;
let segment_state = Arc::new(RwLock::new(SegmentState::new(loaded_state.open_segment)));

Expand Down Expand Up @@ -752,8 +753,7 @@ mod tests {
let dir = test_helpers::tmp_dir().unwrap().into_path();
let wal = WalImpl::new(dir.clone()).unwrap();
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let persister: Arc<dyn Persister<Error = persister::Error>> =
Arc::new(PersisterImpl::new(Arc::clone(&object_store)));
let persister = Arc::new(PersisterImpl::new(Arc::clone(&object_store)));
let write_buffer = WriteBufferImpl::new(Arc::clone(&persister), Some(Arc::new(wal)))
.await
.unwrap();
Expand Down

0 comments on commit 42d36d5

Please sign in to comment.