diff --git a/Cargo.toml b/Cargo.toml
index 395dafbfd3..6396bf5011 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -103,6 +103,7 @@ http = "0.2.9"
 itertools = "0.12"
 lazy_static = "1"
 log = "0.4"
+mockall = { version = "0.12.1" }
 mock_instant = { version = "0.3.1", features = ["sync"] }
 moka = "0.11"
 num-traits = "0.2"
diff --git a/python/src/file.rs b/python/src/file.rs
index 470f5d3a1b..bb361c3368 100644
--- a/python/src/file.rs
+++ b/python/src/file.rs
@@ -23,7 +23,7 @@ use lance_file::v2::{
     reader::{BufferDescriptor, CachedFileMetadata, FileReader},
     writer::{FileWriter, FileWriterOptions},
 };
-use lance_io::{scheduler::StoreScheduler, ReadBatchParams};
+use lance_io::{scheduler::ScanScheduler, ReadBatchParams};
 use object_store::path::Path;
 use pyo3::{
     exceptions::{PyIOError, PyRuntimeError, PyValueError},
@@ -267,7 +267,7 @@ impl LanceFileReader {
         let io_parallelism = std::env::var("IO_THREADS")
             .map(|val| val.parse::<u32>().unwrap_or(8))
             .unwrap_or(8);
-        let scheduler = StoreScheduler::new(Arc::new(object_store), io_parallelism);
+        let scheduler = ScanScheduler::new(Arc::new(object_store), io_parallelism);
         let file = scheduler.open_file(&path).await.infer_error()?;
         let inner = FileReader::try_open(file, None).await.infer_error()?;
         Ok(Self {
diff --git a/rust/lance-encoding/src/decoder.rs b/rust/lance-encoding/src/decoder.rs
index 0c7e0014b3..f271158379 100644
--- a/rust/lance-encoding/src/decoder.rs
+++ b/rust/lance-encoding/src/decoder.rs
@@ -537,8 +537,12 @@ impl DecodeBatchScheduler {
 
         let range = range.start as u32..range.end as u32;
 
-        self.root_scheduler
-            .schedule_ranges(&[range.clone()], scheduler, &sink)?;
+        self.root_scheduler.schedule_ranges(
+            &[range.clone()],
+            scheduler,
+            &sink,
+            range.start as u64,
+        )?;
 
         trace!("Finished scheduling of range {:?}", range);
         Ok(())
@@ -567,8 +571,11 @@ impl DecodeBatchScheduler {
                 format!("{}, ..., {}", indices[0], indices[indices.len() - 1])
             }
         );
+        if indices.is_empty() {
+            return Ok(());
+        }
         self.root_scheduler
-            .schedule_take(indices, scheduler, &sink)?;
+            .schedule_take(indices, scheduler, &sink, indices[0] as u64)?;
         trace!("Finished scheduling take of {} rows", indices.len());
         Ok(())
     }
@@ -740,10 +747,13 @@ pub trait PhysicalPageScheduler: Send + Sync + std::fmt::Debug {
     /// * `range` - the range of row offsets (relative to start of page) requested
     ///             these must be ordered and must not overlap
     /// * `scheduler` - a scheduler to submit the I/O request to
+    /// * `top_level_row` - the row offset of the top level field currently being
+    ///   scheduled.  This can be used to assign priority to I/O requests
     fn schedule_ranges(
         &self,
         ranges: &[Range<u32>],
         scheduler: &dyn EncodingsIo,
+        top_level_row: u64,
     ) -> BoxFuture<'static, Result<Box<dyn PhysicalPageDecoder>>>;
 }
@@ -780,6 +790,7 @@ pub trait LogicalPageScheduler: Send + Sync + std::fmt::Debug {
         ranges: &[Range<u32>],
         scheduler: &Arc<dyn EncodingsIo>,
         sink: &mpsc::UnboundedSender<Box<dyn LogicalPageDecoder>>,
+        top_level_row: u64,
     ) -> Result<()>;
     /// Schedules I/O for the requested rows (identified by row offsets from start of page)
     /// TODO: implement this using schedule_ranges
@@ -788,6 +799,7 @@ pub trait LogicalPageScheduler: Send + Sync + std::fmt::Debug {
         indices: &[u32],
         scheduler: &Arc<dyn EncodingsIo>,
         sink: &mpsc::UnboundedSender<Box<dyn LogicalPageDecoder>>,
+        top_level_row: u64,
     ) -> Result<()>;
     /// The number of rows covered by this page
     fn num_rows(&self) -> u32;
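The decoder changes above are almost entirely plumbing: every scheduler forwards the `top_level_row` it was given until the value reaches the I/O layer. A condensed sketch of the pattern (toy stand-in types, not the real traits from this diff):

```rust
// Sketch only: shows how `top_level_row` travels unchanged from a logical
// scheduler down to the physical layer, where it becomes the I/O priority.
fn submit_request(byte_ranges: Vec<std::ops::Range<u64>>, priority: u64) {
    println!("I/O {:?} at priority {}", byte_ranges, priority);
}

struct ToyValueScheduler {
    bytes_per_value: u64,
}

impl ToyValueScheduler {
    fn schedule_ranges(&self, ranges: &[std::ops::Range<u32>], top_level_row: u64) {
        let byte_ranges: Vec<std::ops::Range<u64>> = ranges
            .iter()
            .map(|r| r.start as u64 * self.bytes_per_value..r.end as u64 * self.bytes_per_value)
            .collect();
        // The row-based priority is attached at the moment the IOP is created
        submit_request(byte_ranges, top_level_row);
    }
}

fn main() {
    let scheduler = ToyValueScheduler { bytes_per_value: 4 };
    scheduler.schedule_ranges(&[0..100], 0); // rows 0..100 of the top-level field
    scheduler.schedule_ranges(&[0..100], 100); // next chunk, lower urgency
}
```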
diff --git a/rust/lance-encoding/src/encodings/logical/binary.rs b/rust/lance-encoding/src/encodings/logical/binary.rs
index f4f6fb9eda..a71037780e 100644
--- a/rust/lance-encoding/src/encodings/logical/binary.rs
+++ b/rust/lance-encoding/src/encodings/logical/binary.rs
@@ -47,11 +47,12 @@ impl LogicalPageScheduler for BinaryPageScheduler {
         ranges: &[std::ops::Range<u32>],
         scheduler: &Arc<dyn EncodingsIo>,
         sink: &tokio::sync::mpsc::UnboundedSender<Box<dyn LogicalPageDecoder>>,
+        top_level_row: u64,
     ) -> Result<()> {
         trace!("Scheduling binary for {} ranges", ranges.len());
         let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
         self.varbin_scheduler
-            .schedule_ranges(ranges, scheduler, &tx)?;
+            .schedule_ranges(ranges, scheduler, &tx, top_level_row)?;
 
         while let Some(decoder) = rx.recv().now_or_never() {
             let wrapped = BinaryPageDecoder {
@@ -69,6 +70,7 @@ impl LogicalPageScheduler for BinaryPageScheduler {
         indices: &[u32],
         scheduler: &Arc<dyn EncodingsIo>,
         sink: &tokio::sync::mpsc::UnboundedSender<Box<dyn LogicalPageDecoder>>,
+        top_level_row: u64,
     ) -> Result<()> {
         trace!("Scheduling binary for {} indices", indices.len());
         self.schedule_ranges(
             &indices
                 .iter()
                 .map(|&idx| idx..(idx + 1))
                 .collect::<Vec<_>>(),
             scheduler,
             sink,
+            top_level_row,
         )
     }
diff --git a/rust/lance-encoding/src/encodings/logical/fixed_size_list.rs b/rust/lance-encoding/src/encodings/logical/fixed_size_list.rs
index 6240e9a723..44ead7fa63 100644
--- a/rust/lance-encoding/src/encodings/logical/fixed_size_list.rs
+++ b/rust/lance-encoding/src/encodings/logical/fixed_size_list.rs
@@ -53,6 +53,7 @@ impl LogicalPageScheduler for FslPageScheduler {
         ranges: &[Range<u32>],
         scheduler: &Arc<dyn EncodingsIo>,
         sink: &mpsc::UnboundedSender<Box<dyn LogicalPageDecoder>>,
+        top_level_row: u64,
     ) -> Result<()> {
         let expanded_ranges = ranges
             .iter()
             .map(|range| (range.start * self.dimension)..(range.end * self.dimension))
             .collect::<Vec<_>>();
         trace!(
             "Scheduling expanded ranges {:?} from items scheduler",
             expanded_ranges
         );
         let (tx, mut rx) = mpsc::unbounded_channel();
         self.items_scheduler
-            .schedule_ranges(&expanded_ranges, scheduler, &tx)?;
+            .schedule_ranges(&expanded_ranges, scheduler, &tx, top_level_row)?;
         let inner_page_decoder = rx.blocking_recv().unwrap();
         sink.send(Box::new(FslPageDecoder {
             inner: inner_page_decoder,
@@ -79,6 +80,7 @@ impl LogicalPageScheduler for FslPageScheduler {
         indices: &[u32],
         scheduler: &Arc<dyn EncodingsIo>,
         sink: &mpsc::UnboundedSender<Box<dyn LogicalPageDecoder>>,
+        top_level_row: u64,
     ) -> Result<()> {
         self.schedule_ranges(
             &indices
@@ -87,6 +89,7 @@ impl LogicalPageScheduler for FslPageScheduler {
                 .collect::<Vec<_>>(),
             scheduler,
             sink,
+            top_level_row,
         )
     }
diff --git a/rust/lance-encoding/src/encodings/logical/list.rs b/rust/lance-encoding/src/encodings/logical/list.rs
index ace00b9810..912be8bcb2 100644
--- a/rust/lance-encoding/src/encodings/logical/list.rs
+++ b/rust/lance-encoding/src/encodings/logical/list.rs
@@ -227,6 +227,7 @@ impl LogicalPageScheduler for ListPageScheduler {
         ranges: &[std::ops::Range<u32>],
         scheduler: &Arc<dyn EncodingsIo>,
         sink: &mpsc::UnboundedSender<Box<dyn LogicalPageDecoder>>,
+        top_level_row: u64,
     ) -> Result<()> {
         // TODO: Shortcut here if the request covers the entire range (can be determined by
         // the first_invalid_offset).  If this is the case we don't need any indirect I/O.  We
@@ -258,7 +259,7 @@ impl LogicalPageScheduler for ListPageScheduler {
         // to this page.
         let (tx, mut rx) = mpsc::unbounded_channel();
         self.offsets_scheduler
-            .schedule_ranges(&offsets_ranges, scheduler, &tx)?;
+            .schedule_ranges(&offsets_ranges, scheduler, &tx, top_level_row)?;
         let mut scheduled_offsets = rx.try_recv().unwrap();
         let items_schedulers = self.items_schedulers.clone();
         let ranges = ranges.to_vec();
@@ -319,7 +320,17 @@ impl LogicalPageScheduler for ListPageScheduler {
                     // All requested items are past this page, continue
                     row_offset += next_scheduler.num_rows() as u64;
                     if !next_item_ranges.is_empty() {
-                        next_scheduler.schedule_ranges(&next_item_ranges, &scheduler, &tx)?;
+                        // Note: we are providing the same top_level_row to ALL items pages referenced by
+                        // this offsets page.  This gives them higher priority.
+                        // TODO: Ideally we would ALSO have a guarantee from the scheduler that items with
+                        // the same top_level_row are scheduled in FCFS order but I don't think it works
+                        // that way.  Still, this is probably good enough for a while
+                        next_scheduler.schedule_ranges(
+                            &next_item_ranges,
+                            &scheduler,
+                            &tx,
+                            top_level_row,
+                        )?;
                         next_item_ranges.clear();
                     }
                     next_scheduler = item_schedulers.pop_front().unwrap();
@@ -342,14 +353,24 @@ impl LogicalPageScheduler for ListPageScheduler {
                     next_item_ranges.push(page_range);
                     row_offset += next_scheduler.num_rows() as u64;
                     if !next_item_ranges.is_empty() {
-                        next_scheduler.schedule_ranges(&next_item_ranges, &scheduler, &tx)?;
+                        next_scheduler.schedule_ranges(
+                            &next_item_ranges,
+                            &scheduler,
+                            &tx,
+                            top_level_row,
+                        )?;
                         next_item_ranges.clear();
                     }
                     next_scheduler = item_schedulers.pop_front().unwrap();
                 }
             }
             if !next_item_ranges.is_empty() {
-                next_scheduler.schedule_ranges(&next_item_ranges, &scheduler, &tx)?;
+                next_scheduler.schedule_ranges(
+                    &next_item_ranges,
+                    &scheduler,
+                    &tx,
+                    top_level_row,
+                )?;
             }
             let mut item_decoders = Vec::new();
             drop(tx);
@@ -388,6 +409,7 @@ impl LogicalPageScheduler for ListPageScheduler {
         indices: &[u32],
         scheduler: &Arc<dyn EncodingsIo>,
         sink: &mpsc::UnboundedSender<Box<dyn LogicalPageDecoder>>,
+        top_level_row: u64,
     ) -> Result<()> {
         trace!("Scheduling list offsets for {} indices", indices.len());
         self.schedule_ranges(
             &indices
@@ -397,6 +419,7 @@ impl LogicalPageScheduler for ListPageScheduler {
                 .collect::<Vec<_>>(),
             scheduler,
             sink,
+            top_level_row,
         )
     }
 }
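The indirection that motivates the whole priority scheme is easiest to see with concrete offsets. A toy version (assuming the offsets have already been decoded) of the offsets-to-items mapping that `ListPageScheduler` must perform before it can even schedule the items pages:

```rust
// Toy illustration: list rows map to item ranges via offsets, and it is the
// *second* (indirect) items request that inherits `top_level_row` as priority.
fn item_range(offsets: &[u64], list_rows: std::ops::Range<usize>) -> std::ops::Range<u64> {
    offsets[list_rows.start]..offsets[list_rows.end]
}

fn main() {
    let offsets = [0u64, 3, 3, 7, 10];
    // Lists 1..3 (the 2nd and 3rd lists) cover items 3..7; this request can only
    // be issued after the offsets I/O completes, which is why it must not queue
    // behind later, lower-priority reads.
    assert_eq!(item_range(&offsets, 1..3), 3..7);
}
```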
diff --git a/rust/lance-encoding/src/encodings/logical/primitive.rs b/rust/lance-encoding/src/encodings/logical/primitive.rs
index 5d4cc52553..c6291650d8 100644
--- a/rust/lance-encoding/src/encodings/logical/primitive.rs
+++ b/rust/lance-encoding/src/encodings/logical/primitive.rs
@@ -80,12 +80,13 @@ impl LogicalPageScheduler for PrimitivePageScheduler {
         ranges: &[std::ops::Range<u32>],
         scheduler: &Arc<dyn EncodingsIo>,
         sink: &mpsc::UnboundedSender<Box<dyn LogicalPageDecoder>>,
+        top_level_row: u64,
     ) -> Result<()> {
         let num_rows = ranges.iter().map(|r| r.end - r.start).sum();
         trace!("Scheduling ranges {:?} from physical page", ranges);
-        let physical_decoder = self
-            .physical_decoder
-            .schedule_ranges(ranges, scheduler.as_ref());
+        let physical_decoder =
+            self.physical_decoder
+                .schedule_ranges(ranges, scheduler.as_ref(), top_level_row);
 
         let logical_decoder = PrimitiveFieldDecoder {
             data_type: self.data_type.clone(),
@@ -104,6 +105,7 @@ impl LogicalPageScheduler for PrimitivePageScheduler {
         indices: &[u32],
         scheduler: &Arc<dyn EncodingsIo>,
         sink: &mpsc::UnboundedSender<Box<dyn LogicalPageDecoder>>,
+        top_level_row: u64,
     ) -> Result<()> {
         trace!(
             "Scheduling take of {} indices from physical page",
@@ -116,6 +118,7 @@ impl LogicalPageScheduler for PrimitivePageScheduler {
                 .collect::<Vec<_>>(),
             scheduler,
             sink,
+            top_level_row,
         )
     }
 }
diff --git a/rust/lance-encoding/src/encodings/logical/struct.rs b/rust/lance-encoding/src/encodings/logical/struct.rs
index 2d880bd3d9..ee1fd33bb8 100644
--- a/rust/lance-encoding/src/encodings/logical/struct.rs
+++ b/rust/lance-encoding/src/encodings/logical/struct.rs
@@ -110,6 +110,7 @@ impl LogicalPageScheduler for SimpleStructScheduler {
         ranges: &[Range<u32>],
         scheduler: &Arc<dyn EncodingsIo>,
         sink: &mpsc::UnboundedSender<Box<dyn LogicalPageDecoder>>,
+        top_level_row: u64,
     ) -> Result<()> {
         for range in ranges.iter().cloned() {
             let mut rows_to_read = range.end - range.start;
@@ -156,6 +157,8 @@ impl LogicalPageScheduler for SimpleStructScheduler {
             // The downside of the current algorithm is that many tiny I/O batches means less opportunity for in-batch coalescing.
             // Then again, if our outer batch coalescing is super good then maybe we don't bother
 
+            let mut current_top_level_row = top_level_row;
+
             while rows_to_read > 0 {
                 let mut min_rows_added = u32::MAX;
                 for (col_idx, field_scheduler) in self.children.iter().enumerate() {
@@ -183,7 +186,12 @@ impl LogicalPageScheduler for SimpleStructScheduler {
                             page_range_start,
                             next_page
                         );
-                        next_page.schedule_ranges(&[page_range], scheduler, sink)?;
+                        next_page.schedule_ranges(
+                            &[page_range],
+                            scheduler,
+                            sink,
+                            current_top_level_row,
+                        )?;
 
                         status.rows_queued += rows_to_take;
                         status.rows_to_take -= rows_to_take;
@@ -199,6 +207,7 @@ impl LogicalPageScheduler for SimpleStructScheduler {
                     panic!("Error in scheduling logic, panic to avoid infinite loop");
                 }
                 rows_to_read -= min_rows_added;
+                current_top_level_row += min_rows_added as u64;
                 for field_status in &mut field_status {
                     field_status.rows_queued -= min_rows_added;
                 }
@@ -216,6 +225,7 @@ impl LogicalPageScheduler for SimpleStructScheduler {
         indices: &[u32],
         scheduler: &Arc<dyn EncodingsIo>,
         sink: &mpsc::UnboundedSender<Box<dyn LogicalPageDecoder>>,
+        top_level_row: u64,
     ) -> Result<()> {
         trace!("Scheduling struct decode of {} indices", indices.len());
 
@@ -236,7 +246,7 @@ impl LogicalPageScheduler for SimpleStructScheduler {
         let mut rows_to_read = indices.len() as u32;
 
         // NOTE: See schedule_range for a description of the scheduling algorithm
-
+        let mut current_top_level_row = top_level_row;
         while rows_to_read > 0 {
             let mut min_rows_added = u32::MAX;
             for (col_idx, field_scheduler) in self.children.iter().enumerate() {
@@ -269,7 +279,12 @@ impl LogicalPageScheduler for SimpleStructScheduler {
 
                     // We should be guaranteed to get at least one page
                     let next_page = next_page.unwrap();
-                    next_page.schedule_take(&indices_in_page, scheduler, sink)?;
+                    next_page.schedule_take(
+                        &indices_in_page,
+                        scheduler,
+                        sink,
+                        current_top_level_row,
+                    )?;
 
                     let rows_scheduled = indices_in_page.len() as u32;
                     status.rows_queued += rows_scheduled;
@@ -281,6 +296,7 @@ impl LogicalPageScheduler for SimpleStructScheduler {
                     panic!("Error in scheduling logic, panic to avoid infinite loop");
                 }
                 rows_to_read -= min_rows_added;
+                current_top_level_row += min_rows_added as u64;
                 for field_status in &mut field_status {
                     field_status.rows_queued -= min_rows_added;
                 }
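The interleaving in `SimpleStructScheduler` is subtle enough that a numeric walkthrough helps. Below is a standalone simulation of the loop above under an assumed scenario (two children with page sizes 100/150 and a 300-row read); it is not code from this PR, just the same bookkeeping in miniature:

```rust
// Each pass schedules one page for every child that has nothing queued, then
// advances `current_top_level_row` by the smallest amount of rows that every
// child now has queued. Later pages therefore carry larger (lower-urgency)
// priorities.
fn main() {
    let pages = [vec![100u32, 100, 100], vec![150u32, 150]];
    let mut queued = [0u32; 2];
    let mut next_page = [0usize; 2];
    let mut rows_to_read = 300u32;
    let mut current_top_level_row = 0u64;
    while rows_to_read > 0 {
        let mut min_rows_added = u32::MAX;
        for (i, q) in queued.iter_mut().enumerate() {
            if *q == 0 {
                let rows = pages[i][next_page[i]];
                next_page[i] += 1;
                println!("child {i}: schedule page of {rows} rows at priority {current_top_level_row}");
                *q += rows;
            }
            min_rows_added = min_rows_added.min(*q);
        }
        rows_to_read -= min_rows_added;
        current_top_level_row += min_rows_added as u64;
        for q in &mut queued {
            *q -= min_rows_added;
        }
    }
}
```

Running this prints pages scheduled at priorities 0, 0, 100, 150, and 200: the two columns stay roughly in step even though their page boundaries never line up.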
diff --git a/rust/lance-encoding/src/encodings/physical/basic.rs b/rust/lance-encoding/src/encodings/physical/basic.rs
index 5ea0ee7790..88befa5520 100644
--- a/rust/lance-encoding/src/encodings/physical/basic.rs
+++ b/rust/lance-encoding/src/encodings/physical/basic.rs
@@ -123,18 +123,27 @@ impl PhysicalPageScheduler for BasicPageScheduler {
         &self,
         ranges: &[std::ops::Range<u32>],
         scheduler: &dyn EncodingsIo,
+        top_level_row: u64,
     ) -> BoxFuture<'static, Result<Box<dyn PhysicalPageDecoder>>> {
         let validity_future = match &self.mode {
             SchedulerNullStatus::None(_) | SchedulerNullStatus::All => None,
             SchedulerNullStatus::Some(schedulers) => {
                 trace!("Scheduling ranges {:?} from validity", ranges);
-                Some(schedulers.validity.schedule_ranges(ranges, scheduler))
+                Some(
+                    schedulers
+                        .validity
+                        .schedule_ranges(ranges, scheduler, top_level_row),
+                )
             }
         };
         let values_future = if let Some(values_scheduler) = self.mode.values_scheduler() {
             trace!("Scheduling range {:?} from values", ranges);
-            Some(values_scheduler.schedule_ranges(ranges, scheduler).boxed())
+            Some(
+                values_scheduler
+                    .schedule_ranges(ranges, scheduler, top_level_row)
+                    .boxed(),
+            )
         } else {
             trace!("No values fetch needed since values all null");
             None
diff --git a/rust/lance-encoding/src/encodings/physical/bitmap.rs b/rust/lance-encoding/src/encodings/physical/bitmap.rs
index 502ee2c066..97c1079b03 100644
--- a/rust/lance-encoding/src/encodings/physical/bitmap.rs
+++ b/rust/lance-encoding/src/encodings/physical/bitmap.rs
@@ -35,6 +35,7 @@ impl PhysicalPageScheduler for DenseBitmapScheduler {
         &self,
         ranges: &[Range<u32>],
         scheduler: &dyn EncodingsIo,
+        top_level_row: u64,
     ) -> BoxFuture<'static, Result<Box<dyn PhysicalPageDecoder>>> {
         let mut min = u64::MAX;
         let mut max = 0;
@@ -62,7 +63,7 @@ impl PhysicalPageScheduler for DenseBitmapScheduler {
             min,
             max
         );
-        let bytes = scheduler.submit_request(byte_ranges);
+        let bytes = scheduler.submit_request(byte_ranges, top_level_row);
 
         async move {
             let bytes = bytes.await?;
diff --git a/rust/lance-encoding/src/encodings/physical/fixed_size_list.rs b/rust/lance-encoding/src/encodings/physical/fixed_size_list.rs
index 908efc9975..a3a1bfc797 100644
--- a/rust/lance-encoding/src/encodings/physical/fixed_size_list.rs
+++ b/rust/lance-encoding/src/encodings/physical/fixed_size_list.rs
@@ -36,6 +36,7 @@ impl PhysicalPageScheduler for FixedListScheduler {
         &self,
         ranges: &[std::ops::Range<u32>],
         scheduler: &dyn EncodingsIo,
+        top_level_row: u64,
     ) -> BoxFuture<'static, Result<Box<dyn PhysicalPageDecoder>>> {
         let expanded_ranges = ranges
             .iter()
@@ -49,9 +50,9 @@ impl PhysicalPageScheduler for FixedListScheduler {
             expanded_ranges[0].start,
             expanded_ranges[expanded_ranges.len() - 1].end
         );
-        let inner_page_decoder = self
-            .items_scheduler
-            .schedule_ranges(&expanded_ranges, scheduler);
+        let inner_page_decoder =
+            self.items_scheduler
+                .schedule_ranges(&expanded_ranges, scheduler, top_level_row);
         let dimension = self.dimension;
         async move {
             let items_decoder = inner_page_decoder.await?;
diff --git a/rust/lance-encoding/src/encodings/physical/value.rs b/rust/lance-encoding/src/encodings/physical/value.rs
index ed79141b4f..6d4c448d6f 100644
--- a/rust/lance-encoding/src/encodings/physical/value.rs
+++ b/rust/lance-encoding/src/encodings/physical/value.rs
@@ -43,6 +43,7 @@ impl PhysicalPageScheduler for ValuePageScheduler {
         &self,
         ranges: &[std::ops::Range<u32>],
         scheduler: &dyn EncodingsIo,
+        top_level_row: u64,
     ) -> BoxFuture<'static, Result<Box<dyn PhysicalPageDecoder>>> {
         let mut min = u64::MAX;
         let mut max = 0;
@@ -63,7 +64,7 @@ impl PhysicalPageScheduler for ValuePageScheduler {
             min,
             max
         );
-        let bytes = scheduler.submit_request(byte_ranges);
+        let bytes = scheduler.submit_request(byte_ranges, top_level_row);
         let bytes_per_value = self.bytes_per_value;
 
         async move {
diff --git a/rust/lance-encoding/src/lib.rs b/rust/lance-encoding/src/lib.rs
index 76a72bbf0c..805306e7bc 100644
--- a/rust/lance-encoding/src/lib.rs
+++ b/rust/lance-encoding/src/lib.rs
@@ -31,7 +31,17 @@ pub trait EncodingsIo: Send + Sync {
     /// # Arguments
     ///
     /// * `ranges` - the byte ranges to request
-    fn submit_request(&self, range: Vec<Range<u64>>) -> BoxFuture<'static, Result<Vec<Bytes>>>;
+    /// * `priority` - the priority of the request
+    ///
+    /// Priority should be set to the lowest row number that this request is delivering data for.
+    /// This is important in cases where indirect I/O causes high priority requests to be submitted
+    /// after low priority requests.  We want to fulfill the indirect I/O more quickly so that we
+    /// can decode as quickly as possible.
+    fn submit_request(
+        &self,
+        range: Vec<Range<u64>>,
+        priority: u64,
+    ) -> BoxFuture<'static, Result<Vec<Bytes>>>;
 }
 
 /// An implementation of EncodingsIo that serves data from an in-memory buffer
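Implementors of `EncodingsIo` outside this diff only need to accept (and may ignore) the new parameter. As a purely hypothetical example, a test double that records each request's priority so ordering can be asserted (the names here are invented, not part of the PR):

```rust
use std::ops::Range;
use std::sync::{Arc, Mutex};

use bytes::Bytes;
use futures::future::BoxFuture;
use futures::FutureExt;
use lance_core::Result;

/// Hypothetical test double: remembers the priority of every request so a
/// test can assert that schedulers tag their I/O correctly.
struct RecordingIo {
    data: Bytes,
    priorities: Arc<Mutex<Vec<u64>>>,
}

impl EncodingsIo for RecordingIo {
    fn submit_request(
        &self,
        ranges: Vec<Range<u64>>,
        priority: u64,
    ) -> BoxFuture<'static, Result<Vec<Bytes>>> {
        // Record the priority, then serve the bytes synchronously from memory
        self.priorities.lock().unwrap().push(priority);
        let bytes = ranges
            .into_iter()
            .map(|range| self.data.slice(range.start as usize..range.end as usize))
            .collect::<Vec<_>>();
        std::future::ready(Ok(bytes)).boxed()
    }
}
```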
@@ -50,7 +60,11 @@ impl BufferScheduler {
 }
 
 impl EncodingsIo for BufferScheduler {
-    fn submit_request(&self, ranges: Vec<Range<u64>>) -> BoxFuture<'static, Result<Vec<Bytes>>> {
+    fn submit_request(
+        &self,
+        ranges: Vec<Range<u64>>,
+        _priority: u64,
+    ) -> BoxFuture<'static, Result<Vec<Bytes>>> {
         std::future::ready(Ok(ranges
             .into_iter()
             .map(|range| self.satisfy_request(range))
diff --git a/rust/lance-encoding/src/testing.rs b/rust/lance-encoding/src/testing.rs
index ec3ed96e66..58a2d4da47 100644
--- a/rust/lance-encoding/src/testing.rs
+++ b/rust/lance-encoding/src/testing.rs
@@ -45,7 +45,11 @@ impl SimulatedScheduler {
 }
 
 impl EncodingsIo for SimulatedScheduler {
-    fn submit_request(&self, ranges: Vec<Range<u64>>) -> BoxFuture<'static, Result<Vec<Bytes>>> {
+    fn submit_request(
+        &self,
+        ranges: Vec<Range<u64>>,
+        _priority: u64,
+    ) -> BoxFuture<'static, Result<Vec<Bytes>>> {
         std::future::ready(Ok(ranges
             .into_iter()
             .map(|range| self.satisfy_request(range))
diff --git a/rust/lance-file/benches/reader.rs b/rust/lance-file/benches/reader.rs
index 1c63a674bc..0f4b586774 100644
--- a/rust/lance-file/benches/reader.rs
+++ b/rust/lance-file/benches/reader.rs
@@ -9,7 +9,7 @@ use lance_file::v2::{
     reader::FileReader,
     writer::{FileWriter, FileWriterOptions},
 };
-use lance_io::{object_store::ObjectStore, scheduler::StoreScheduler};
+use lance_io::{object_store::ObjectStore, scheduler::ScanScheduler};
 
 fn bench_reader(c: &mut Criterion) {
     let mut group = c.benchmark_group("reader");
@@ -44,7 +44,7 @@ fn bench_reader(c: &mut Criterion) {
         let file_path = &file_path;
         let data = &data;
         rt.block_on(async move {
-            let store_scheduler = StoreScheduler::new(Arc::new(object_store.clone()), 8);
+            let store_scheduler = ScanScheduler::new(Arc::new(object_store.clone()), 8);
             let scheduler = store_scheduler.open_file(file_path).await.unwrap();
             let reader = FileReader::try_open(scheduler.clone(), None).await.unwrap();
             let mut stream = reader
diff --git a/rust/lance-file/src/v2/io.rs b/rust/lance-file/src/v2/io.rs
index e522a492aa..8b0e8f485a 100644
--- a/rust/lance-file/src/v2/io.rs
+++ b/rust/lance-file/src/v2/io.rs
@@ -12,7 +12,8 @@ impl EncodingsIo for LanceEncodingsIo {
     fn submit_request(
         &self,
         range: Vec<Range<u64>>,
+        priority: u64,
     ) -> BoxFuture<'static, lance_core::Result<Vec<Bytes>>> {
-        self.0.submit_request(range).boxed()
+        self.0.submit_request(range, priority).boxed()
     }
 }
diff --git a/rust/lance-file/src/v2/reader.rs b/rust/lance-file/src/v2/reader.rs
index 2bd4c38768..f34115b02e 100644
--- a/rust/lance-file/src/v2/reader.rs
+++ b/rust/lance-file/src/v2/reader.rs
@@ -128,7 +128,7 @@ impl FileReader {
         } else {
             file_size - scheduler.reader().block_size() as u64
         };
-        let tail_bytes = scheduler.submit_single(begin..file_size).await?;
+        let tail_bytes = scheduler.submit_single(begin..file_size, 0).await?;
         Ok((tail_bytes, file_size))
     }
 
@@ -198,7 +198,7 @@ impl FileReader {
         // We can't just grab col_meta_start..cmo_table_start because there may be padding
         // between the last column and the start of the cmo table.
         let column_metadata_range = column_metadata_start..footer.global_buff_start;
-        let column_metadata_bytes = scheduler.submit_single(column_metadata_range).await?;
+        let column_metadata_bytes = scheduler.submit_single(column_metadata_range, 0).await?;
 
         // cmo == column_metadata_offsets
         let cmo_table_size = 16 * footer.num_columns as usize;
@@ -232,6 +232,7 @@ impl FileReader {
             let missing_bytes = scheduler
                 .submit_single(
                     footer.column_meta_start..footer.column_meta_start + num_bytes_missing,
+                    0,
                 )
                 .await;
             let mut combined =
@@ -762,7 +763,7 @@ mod tests {
     use lance_core::datatypes::Schema;
    use lance_datagen::{array, gen, BatchCount, RowCount};
     use lance_io::{
-        object_store::ObjectStore, scheduler::StoreScheduler, stream::RecordBatchStream,
+        object_store::ObjectStore, scheduler::ScanScheduler, stream::RecordBatchStream,
     };
     use log::debug;
     use object_store::path::Path;
@@ -777,7 +778,7 @@ mod tests {
         _tmp_dir: TempDir,
         tmp_path: Path,
         object_store: Arc<ObjectStore>,
-        scheduler: Arc<StoreScheduler>,
+        scheduler: Arc<ScanScheduler>,
     }
 
     impl Default for FsFixture {
@@ -787,7 +788,7 @@ mod tests {
             let tmp_path = Path::parse(tmp_path).unwrap();
             let tmp_path = tmp_path.child("some_file.lance");
             let object_store = Arc::new(ObjectStore::local());
-            let scheduler = StoreScheduler::new(object_store.clone(), 8);
+            let scheduler = ScanScheduler::new(object_store.clone(), 8);
             Self {
                 _tmp_dir: tmp_dir,
                 object_store,
diff --git a/rust/lance-io/Cargo.toml b/rust/lance-io/Cargo.toml
index b738284445..3abc52d376 100644
--- a/rust/lance-io/Cargo.toml
+++ b/rust/lance-io/Cargo.toml
@@ -37,18 +37,19 @@ pin-project.workspace = true
 prost.workspace = true
 shellexpand.workspace = true
 snafu.workspace = true
-tokio-stream.workspace = true
 tokio.workspace = true
 tracing.workspace = true
 url.workspace = true
 path_abs.workspace = true
 rand.workspace = true
+async-priority-channel = "0.2.0"
 
 [dev-dependencies]
 criterion.workspace = true
 parquet.workspace = true
 pprof.workspace = true
 tempfile.workspace = true
+mockall.workspace = true
 
 [build-dependencies]
 prost-build.workspace = true
diff --git a/rust/lance-io/benches/scheduler.rs b/rust/lance-io/benches/scheduler.rs
index 9e30a83c21..996731767e 100644
--- a/rust/lance-io/benches/scheduler.rs
+++ b/rust/lance-io/benches/scheduler.rs
@@ -3,7 +3,7 @@
 
 use bytes::Bytes;
 use lance_core::Result;
-use lance_io::{object_store::ObjectStore, scheduler::StoreScheduler};
+use lance_io::{object_store::ObjectStore, scheduler::ScanScheduler};
 use object_store::path::Path;
 use rand::{seq::SliceRandom, RngCore};
 use std::{fmt::Display, process::Command, sync::Arc};
@@ -88,7 +88,7 @@ fn bench_full_read(c: &mut Criterion) {
                     .unwrap();
             }
             runtime.block_on(async {
-                let scheduler = StoreScheduler::new(obj_store, params.io_parallelism);
+                let scheduler = ScanScheduler::new(obj_store, params.io_parallelism);
                 let file_scheduler = scheduler.open_file(&tmp_file).await.unwrap();
 
                 let (tx, rx) = mpsc::channel(1024);
@@ -97,7 +97,7 @@ fn bench_full_read(c: &mut Criterion) {
                 while offset < DATA_SIZE {
                     #[allow(clippy::single_range_in_vec_init)]
                     let req = vec![offset..(offset + params.page_size)];
-                    let req = file_scheduler.submit_request(req);
+                    let req = file_scheduler.submit_request(req, 0);
                     tx.send(req).await.unwrap();
                     offset += params.page_size;
                 }
@@ -174,7 +174,7 @@ fn bench_random_read(c: &mut Criterion) {
                     .unwrap();
             }
             runtime.block_on(async {
-                let scheduler = StoreScheduler::new(obj_store, params.io_parallelism);
+                let scheduler = ScanScheduler::new(obj_store, params.io_parallelism);
                 let file_scheduler = scheduler.open_file(&tmp_file).await.unwrap();
 
                 let (tx, rx) = mpsc::channel(1024);
@@ -189,7 +189,7 @@ fn bench_random_read(c: &mut Criterion) {
                     })
                     .collect::<Vec<_>>();
                     idx += INDICES_PER_BATCH as usize;
-                    let req = file_scheduler.submit_request(iops);
+                    let req = file_scheduler.submit_request(iops, 0);
                     tx.send(req).await.unwrap();
                 }
                 drop(tx);
diff --git a/rust/lance-io/src/lib.rs b/rust/lance-io/src/lib.rs
index 467b155d4d..e2f34a42b9 100644
--- a/rust/lance-io/src/lib.rs
+++ b/rust/lance-io/src/lib.rs
@@ -16,6 +16,8 @@ pub mod object_store;
 pub mod object_writer;
 pub mod scheduler;
 pub mod stream;
+#[cfg(test)]
+pub mod testing;
 pub mod traits;
 pub mod utils;
diff --git a/rust/lance-io/src/scheduler.rs b/rust/lance-io/src/scheduler.rs
index e57e9eaafe..8a12f8a8d6 100644
--- a/rust/lance-io/src/scheduler.rs
+++ b/rust/lance-io/src/scheduler.rs
@@ -3,14 +3,15 @@
 
 use bytes::Bytes;
 use futures::channel::oneshot;
+use futures::stream::BoxStream;
 use futures::{FutureExt, StreamExt, TryFutureExt};
 use object_store::path::Path;
 use snafu::{location, Location};
+use std::cmp::Reverse;
+use std::fmt::Debug;
 use std::future::Future;
 use std::ops::Range;
 use std::sync::{Arc, Mutex};
-use tokio::sync::mpsc;
-use tokio_stream::wrappers::UnboundedReceiverStream;
 
 use lance_core::{Error, Result};
 
@@ -95,10 +96,25 @@ impl IoTask {
     }
 }
 
+fn receiver_to_stream<T: Send + 'static, P: Ord + Send + 'static>(
+    tasks: async_priority_channel::Receiver<T, P>,
+) -> BoxStream<'static, T> {
+    futures::stream::unfold(tasks, |state| async move {
+        match state.recv().await {
+            Ok(val) => Some((val.0, state)),
+            Err(async_priority_channel::RecvError) => None,
+        }
+    })
+    .boxed()
+}
+
 // Every time a scheduler starts up it launches a task to run the I/O loop.  This loop
 // repeats endlessly until the scheduler is destroyed.
-async fn run_io_loop(tasks: mpsc::UnboundedReceiver<IoTask>, io_capacity: u32) {
-    let io_stream = UnboundedReceiverStream::new(tasks);
+async fn run_io_loop(
+    tasks: async_priority_channel::Receiver<IoTask, Reverse<u128>>,
+    io_capacity: u32,
+) {
+    let io_stream = receiver_to_stream(tasks);
     let tokio_task_stream = io_stream.map(|task| tokio::spawn(task.run()));
     let mut tokio_task_stream = tokio_task_stream.buffer_unordered(io_capacity as usize);
     while tokio_task_stream.next().await.is_some() {
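`async_priority_channel` delivers the highest-priority item first, so the scheduler wraps priorities in `Reverse` to make the smallest row number win. A standalone sanity check of that ordering (not from the PR, but built on the same API the diff uses):

```rust
use std::cmp::Reverse;

#[tokio::main]
async fn main() {
    // Same channel type the scheduler uses: Reverse turns the max-heap into a
    // min-heap over the numeric priority, so lower row numbers come out first.
    let (tx, rx) = async_priority_channel::unbounded::<&str, Reverse<u128>>();
    tx.try_send("row 100", Reverse(100)).unwrap();
    tx.try_send("row 0", Reverse(0)).unwrap();
    tx.try_send("row 50", Reverse(50)).unwrap();
    assert_eq!(rx.recv().await.unwrap().0, "row 0");
    assert_eq!(rx.recv().await.unwrap().0, "row 50");
    assert_eq!(rx.recv().await.unwrap().0, "row 100");
}
```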
@@ -109,17 +125,26 @@ async fn run_io_loop(tasks: mpsc::UnboundedReceiver<IoTask>, io_capacity: u32)
     }
 }
 
-/// An I/O scheduler which wraps an ObjectStore and throttles the amount of\
+/// An I/O scheduler which wraps an ObjectStore and throttles the amount of
 /// parallel I/O that can be run.
 ///
 /// TODO: This will also add coalescing
-#[derive(Debug)]
-pub struct StoreScheduler {
+pub struct ScanScheduler {
     object_store: Arc<ObjectStore>,
-    io_submitter: mpsc::UnboundedSender<IoTask>,
+    io_submitter: async_priority_channel::Sender<IoTask, Reverse<u128>>,
+    file_counter: Mutex<u32>,
+}
+
+impl Debug for ScanScheduler {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("ScanScheduler")
+            .field("object_store", &self.object_store)
+            .field("file_counter", &self.file_counter)
+            .finish()
+    }
 }
 
-impl StoreScheduler {
+impl ScanScheduler {
     /// Create a new scheduler with the given I/O capacity
     ///
     /// # Arguments
@@ -137,10 +162,11 @@ impl ScanScheduler {
         // Once the reader is finished we should revisit.  We will probably want to convert
         // from `when_done` futures to delivering data into a queue.  That queue should fill
         // up, causing the I/O loop to pause.
-        let (reg_tx, reg_rx) = mpsc::unbounded_channel();
+        let (reg_tx, reg_rx) = async_priority_channel::unbounded();
         let scheduler = Self {
             object_store,
             io_submitter: reg_tx,
+            file_counter: Mutex::new(0),
         };
         tokio::task::spawn(async move { run_io_loop(reg_rx, io_capacity).await });
         Arc::new(scheduler)
@@ -149,9 +175,13 @@ impl ScanScheduler {
     /// Open a file for reading
     pub async fn open_file(self: &Arc<Self>, path: &Path) -> Result<FileScheduler> {
         let reader = self.object_store.open(path).await?;
+        let mut file_counter = self.file_counter.lock().unwrap();
+        let file_index = *file_counter;
+        *file_counter += 1;
         Ok(FileScheduler {
             reader: reader.into(),
             root: self.clone(),
+            file_index,
         })
     }
 
@@ -160,11 +190,12 @@ impl ScanScheduler {
         reader: Arc<dyn Reader>,
         request: Vec<Range<u64>>,
         tx: oneshot::Sender<Result<Vec<Bytes>>>,
+        priority: u128,
     ) {
         let num_iops = request.len() as u32;
 
         let when_all_io_done = move |bytes| {
-            // We don't care if the receiver has given up
+            // We don't care if the receiver has given up so discard the result
            let _ = tx.send(bytes);
         };
 
@@ -183,7 +214,7 @@ impl ScanScheduler {
                 dest.deliver_data(bytes.map(|bytes| (task_idx, bytes)));
             }),
         };
-        if self.io_submitter.send(task).is_err() {
+        if self.io_submitter.try_send(task, Reverse(priority)).is_err() {
             panic!("unable to submit I/O because the I/O thread has panic'd");
         }
     }
@@ -193,10 +224,11 @@ impl ScanScheduler {
         &self,
         reader: Arc<dyn Reader>,
         request: Vec<Range<u64>>,
+        priority: u128,
     ) -> impl Future<Output = Result<Vec<Bytes>>> + Send {
         let (tx, rx) = oneshot::channel::<Result<Vec<Bytes>>>();
 
-        self.do_submit_request(reader, request, tx);
+        self.do_submit_request(reader, request, tx, priority);
 
         // Right now, it isn't possible for I/O to be cancelled so a cancel error should
         // not occur
@@ -208,7 +240,8 @@ impl ScanScheduler {
 #[derive(Clone, Debug)]
 pub struct FileScheduler {
     reader: Arc<dyn Reader>,
-    root: Arc<StoreScheduler>,
+    root: Arc<ScanScheduler>,
+    file_index: u32,
 }
 
 impl FileScheduler {
@@ -219,16 +252,24 @@ impl FileScheduler {
     pub fn submit_request(
         &self,
         request: Vec<Range<u64>>,
+        priority: u64,
     ) -> impl Future<Output = Result<Vec<Bytes>>> + Send {
-        self.root.submit_request(self.reader.clone(), request)
+        // The final priority is a combination of the row offset and the file number
+        let priority = ((self.file_index as u128) << 64) + priority as u128;
+        self.root
+            .submit_request(self.reader.clone(), request, priority)
     }
 
     /// Submit a single IOP to the reader
     ///
     /// If you have multiple IOPS to perform then [`Self::submit_request`] is going
     /// to be more efficient.
-    pub fn submit_single(&self, range: Range<u64>) -> impl Future<Output = Result<Bytes>> + Send {
-        self.submit_request(vec![range])
+    pub fn submit_single(
+        &self,
+        range: Range<u64>,
+        priority: u64,
+    ) -> impl Future<Output = Result<Bytes>> + Send {
+        self.submit_request(vec![range], priority)
             .map_ok(|vec_bytes| vec_bytes.into_iter().next().unwrap())
     }
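The bit-packing in `submit_request` means the file index dominates and the row-based priority only breaks ties within a file: files opened earlier in a scan always drain first. A small worked example:

```rust
// Worked example of the combined priority: file_index occupies the high 64
// bits, the row-based priority the low 64 bits.
fn combined(file_index: u32, priority: u64) -> u128 {
    ((file_index as u128) << 64) + priority as u128
}

fn main() {
    // File 0 beats file 1 regardless of row offset...
    assert!(combined(0, 1_000_000) < combined(1, 0));
    // ...and within a file, lower rows are more urgent.
    assert!(combined(1, 10) < combined(1, 20));
}
```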
@@ -244,11 +285,18 @@ impl FileScheduler {
 
 #[cfg(test)]
 mod tests {
-    use std::collections::VecDeque;
+    use std::{collections::VecDeque, time::Duration};
 
+    use futures::poll;
     use rand::RngCore;
     use tempfile::tempdir;
 
+    use object_store::{memory::InMemory, ObjectStore as OSObjectStore};
+    use tokio::time::timeout;
+    use url::Url;
+
+    use crate::testing::MockObjectStore;
+
     use super::*;
 
     #[tokio::test]
@@ -266,7 +314,7 @@ mod tests {
         rand::thread_rng().fill_bytes(&mut some_data);
         obj_store.put(&tmp_file, &some_data).await.unwrap();
 
-        let scheduler = StoreScheduler::new(obj_store, 16);
+        let scheduler = ScanScheduler::new(obj_store, 16);
 
         let file_scheduler = scheduler.open_file(&tmp_file).await.unwrap();
 
@@ -278,7 +326,7 @@ mod tests {
             reqs.push_back(
                 #[allow(clippy::single_range_in_vec_init)]
                 file_scheduler
-                    .submit_request(vec![offset..offset + READ_SIZE])
+                    .submit_request(vec![offset..offset + READ_SIZE], 0)
                     .await
                     .unwrap(),
             );
@@ -295,4 +343,83 @@ mod tests {
             offset += READ_SIZE;
         }
     }
+
+    #[tokio::test]
+    async fn test_priority() {
+        let some_path = Path::parse("foo").unwrap();
+        let base_store = Arc::new(InMemory::new());
+        base_store
+            .put(&some_path, Bytes::from(vec![0; 1000]))
+            .await
+            .unwrap();
+
+        let semaphore = Arc::new(tokio::sync::Semaphore::new(0));
+        let mut obj_store = MockObjectStore::default();
+        let semaphore_copy = semaphore.clone();
+        obj_store
+            .expect_get_opts()
+            .returning(move |location, options| {
+                let semaphore = semaphore.clone();
+                let base_store = base_store.clone();
+                let location = location.clone();
+                async move {
+                    semaphore.acquire().await.unwrap().forget();
+                    base_store.get_opts(&location, options).await
+                }
+                .boxed()
+            });
+        let obj_store = Arc::new(ObjectStore::new(
+            Arc::new(obj_store),
+            Url::parse("mem://").unwrap(),
+            None,
+            None,
+        ));
+
+        let scan_scheduler = ScanScheduler::new(obj_store, 1);
+
+        let file_scheduler = scan_scheduler
+            .open_file(&Path::parse("foo").unwrap())
+            .await
+            .unwrap();
+
+        // Issue a request, priority doesn't matter, it will be submitted
+        // immediately (it will go pending)
+        // Note: the timeout is to prevent a deadlock if the test fails.
+        let first_fut = timeout(
+            Duration::from_secs(10),
+            file_scheduler.submit_single(0..10, 0),
+        )
+        .boxed();
+
+        // Issue another low priority request (it will go in queue)
+        let mut second_fut = timeout(
+            Duration::from_secs(10),
+            file_scheduler.submit_single(0..20, 100),
+        )
+        .boxed();
+
+        // Issue a high priority request (it will go in queue and should bump
+        // the other queued request down)
+        let mut third_fut = timeout(
+            Duration::from_secs(10),
+            file_scheduler.submit_single(0..30, 0),
+        )
+        .boxed();
+
+        // Finish one file, should be the in-flight first request
+        semaphore_copy.add_permits(1);
+        assert!(first_fut.await.unwrap().unwrap().len() == 10);
+        // Other requests should not be finished
+        assert!(poll!(&mut second_fut).is_pending());
+        assert!(poll!(&mut third_fut).is_pending());
+
+        // Next should be high priority request
+        semaphore_copy.add_permits(1);
+        assert!(third_fut.await.unwrap().unwrap().len() == 30);
+        assert!(poll!(&mut second_fut).is_pending());
+
+        // Finally, the low priority request
+        semaphore_copy.add_permits(1);
+        assert!(second_fut.await.unwrap().unwrap().len() == 20);
+    }
 }
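One subtlety in the new `testing` module below: `get_opts` is declared in its hand-desugared form inside `mock!` so that an expectation can return the future itself, which is what lets `test_priority` above park the mocked I/O on a semaphore. A minimal sketch of that pattern with an invented trait (nothing here is from the PR):

```rust
use futures::future::BoxFuture;
use futures::FutureExt;
use mockall::mock;

// Invented trait: the method returns its future explicitly instead of being
// an `async fn`, so the mock's `returning` closure hands back the future.
pub trait Fetch {
    fn fetch(&self, key: String) -> BoxFuture<'static, String>;
}

mock! {
    pub Fetcher {}
    impl Fetch for Fetcher {
        fn fetch(&self, key: String) -> BoxFuture<'static, String>;
    }
}

#[tokio::main]
async fn main() {
    let gate = std::sync::Arc::new(tokio::sync::Semaphore::new(0));
    let gate_clone = gate.clone();
    let mut mock = MockFetcher::default();
    mock.expect_fetch().returning(move |key| {
        let gate = gate_clone.clone();
        async move {
            // The returned future blocks until the test grants a permit
            gate.acquire().await.unwrap().forget();
            key
        }
        .boxed()
    });
    gate.add_permits(1);
    assert_eq!(mock.fetch("hello".to_string()).await, "hello");
}
```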
diff --git a/rust/lance-io/src/testing.rs b/rust/lance-io/src/testing.rs
new file mode 100644
index 0000000000..a0f6bd2e0c
--- /dev/null
+++ b/rust/lance-io/src/testing.rs
@@ -0,0 +1,53 @@
+// SPDX-License-Identifier: Apache-2.0
+// SPDX-FileCopyrightText: Copyright The Lance Authors
+
+use std::fmt::{self, Display, Formatter};
+
+use async_trait::async_trait;
+use bytes::Bytes;
+use futures::stream::BoxStream;
+use mockall::mock;
+use object_store::{
+    path::Path, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta,
+    ObjectStore as OSObjectStore, PutOptions, PutResult, Result as OSResult,
+};
+use std::future::Future;
+use tokio::io::AsyncWrite;
+
+mock! {
+    pub ObjectStore {}
+
+    #[async_trait]
+    impl OSObjectStore for ObjectStore {
+        async fn put_opts(&self, location: &Path, bytes: Bytes, opts: PutOptions) -> OSResult<PutResult>;
+        async fn put_multipart(
+            &self,
+            location: &Path,
+        ) -> OSResult<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)>;
+        async fn abort_multipart(&self, location: &Path, multipart_id: &MultipartId) -> OSResult<()>;
+        fn get_opts<'life0, 'life1, 'async_trait>(
+            &'life0 self,
+            location: &'life1 Path,
+            options: GetOptions,
+        ) -> std::pin::Pin<Box<dyn Future<Output = OSResult<GetResult>> + Send + 'async_trait>>
+        where
+            Self: 'async_trait,
+            'life0: 'async_trait,
+            'life1: 'async_trait;
+        async fn delete(&self, location: &Path) -> OSResult<()>;
+        fn list<'a>(&'a self, prefix: Option<&'a Path>) -> BoxStream<'_, OSResult<ObjectMeta>>;
+        async fn list_with_delimiter<'a, 'b>(&'a self, prefix: Option<&'b Path>) -> OSResult<ListResult>;
+        async fn copy(&self, from: &Path, to: &Path) -> OSResult<()>;
+        async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> OSResult<()>;
+    }
+}
+
+impl std::fmt::Debug for MockObjectStore {
+    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+        write!(f, "MockObjectStore")
+    }
+}
+
+impl Display for MockObjectStore {
+    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+        write!(f, "MockObjectStore")
+    }
+}
diff --git a/rust/lance/src/dataset/fragment.rs b/rust/lance/src/dataset/fragment.rs
index e2b2b5a131..6ac2566fba 100644
--- a/rust/lance/src/dataset/fragment.rs
+++ b/rust/lance/src/dataset/fragment.rs
@@ -26,7 +26,7 @@ use lance_file::reader::{read_batch, FileReader};
 use lance_file::v2;
 use lance_file::v2::reader::ReaderProjection;
 use lance_io::object_store::ObjectStore;
-use lance_io::scheduler::StoreScheduler;
+use lance_io::scheduler::ScanScheduler;
 use lance_io::ReadBatchParams;
 use lance_table::format::{DataFile, DeletionFile, Fragment};
 use lance_table::io::deletion::{deletion_file_path, read_deletion_file, write_deletion_file};
@@ -457,7 +457,7 @@ impl FileFragment {
             Ok(None)
         } else {
             let path = self.dataset.data_dir().child(data_file.path.as_str());
-            let store_scheduler = StoreScheduler::new(self.dataset.object_store.clone(), 16);
+            let store_scheduler = ScanScheduler::new(self.dataset.object_store.clone(), 16);
             let file_scheduler = store_scheduler.open_file(&path).await?;
             let reader = Arc::new(v2::reader::FileReader::try_open(file_scheduler, None).await?);
             let field_id_to_column_idx = Arc::new(BTreeMap::from_iter(