diff --git a/rust/lance-encoding/src/decoder.rs b/rust/lance-encoding/src/decoder.rs
index a0098a24c7..372b339a17 100644
--- a/rust/lance-encoding/src/decoder.rs
+++ b/rust/lance-encoding/src/decoder.rs
@@ -196,6 +196,7 @@
 //! * The "batch overhead" is very small in Lance compared to other formats because it has no
 //!   relation to the way the data is stored.
 
+use std::collections::VecDeque;
 use std::{ops::Range, sync::Arc};
 
 use arrow_array::cast::AsArray;
@@ -513,7 +514,8 @@ impl DecodeBatchScheduler {
                 Self::create_field_scheduler(field.data_type(), &mut col_info_iter, buffers)
             })
             .collect::<Vec<_>>();
-        let root_scheduler = SimpleStructScheduler::new(field_schedulers, schema.fields.clone());
+        let root_scheduler =
+            SimpleStructScheduler::new_root(field_schedulers, schema.fields.clone());
         Self { root_scheduler }
     }
 
@@ -528,20 +530,17 @@ impl DecodeBatchScheduler {
     pub async fn schedule_range(
         &mut self,
         range: Range<u64>,
-        sink: mpsc::UnboundedSender<Box<dyn LogicalPageDecoder>>,
-        scheduler: &Arc<dyn EncodingsIo>,
+        sink: mpsc::UnboundedSender<DecoderMessage>,
+        scheduler: Arc<dyn EncodingsIo>,
     ) -> Result<()> {
         let rows_to_read = range.end - range.start;
         trace!("Scheduling range {:?} ({} rows)", range, rows_to_read);
         let range = range.start as u32..range.end as u32;
-        self.root_scheduler.schedule_ranges(
-            &[range.clone()],
-            scheduler,
-            &sink,
-            range.start as u64,
-        )?;
+        let mut context = SchedulerContext::new(sink, scheduler);
+        self.root_scheduler
+            .schedule_ranges(&[range.clone()], &mut context, range.start as u64)?;
 
         trace!("Finished scheduling of range {:?}", range);
         Ok(())
     }
 
@@ -557,8 +556,8 @@
     pub async fn schedule_take(
         &mut self,
         indices: &[u64],
-        sink: mpsc::UnboundedSender<Box<dyn LogicalPageDecoder>>,
-        scheduler: &Arc<dyn EncodingsIo>,
+        sink: mpsc::UnboundedSender<DecoderMessage>,
+        scheduler: Arc<dyn EncodingsIo>,
     ) -> Result<()> {
         debug_assert!(indices.windows(2).all(|w| w[0] < w[1]));
         if indices.is_empty() {
@@ -578,8 +577,10 @@ impl DecodeBatchScheduler {
         }
         // TODO: Figure out how to handle u64 indices
        let indices = indices.iter().map(|i| *i as u32).collect::<Vec<_>>();
+        let mut context = SchedulerContext::new(sink, scheduler);
+
         self.root_scheduler
-            .schedule_take(&indices, scheduler, &sink, indices[0] as u64)?;
+            .schedule_take(&indices, &mut context, indices[0] as u64)?;
         trace!("Finished scheduling take of {} rows", indices.len());
         Ok(())
     }
@@ -592,10 +593,12 @@ pub struct ReadBatchTask {
 
 /// A stream that takes scheduled jobs and generates decode tasks from them.
 pub struct BatchDecodeStream {
-    scheduled: mpsc::UnboundedReceiver<Box<dyn LogicalPageDecoder>>,
-    current: Option<Box<dyn LogicalPageDecoder>>,
+    context: DecoderContext,
+    root_decoders: VecDeque<Box<dyn LogicalPageDecoder>>,
     rows_remaining: u64,
     rows_per_batch: u32,
+    rows_scheduled: u64,
+    rows_drained: u64,
 }
 
 impl BatchDecodeStream {
@@ -610,23 +613,69 @@ impl BatchDecodeStream {
     /// * `num_rows` the total number of rows scheduled
     /// * `num_columns` the total number of columns in the file
     pub fn new(
-        scheduled: mpsc::UnboundedReceiver<Box<dyn LogicalPageDecoder>>,
+        scheduled: mpsc::UnboundedReceiver<DecoderMessage>,
         rows_per_batch: u32,
         num_rows: u64,
     ) -> Self {
         Self {
-            scheduled,
-            current: None,
+            context: DecoderContext::new(scheduled),
+            root_decoders: VecDeque::new(),
             rows_remaining: num_rows,
             rows_per_batch,
+            rows_scheduled: 0,
+            rows_drained: 0,
+        }
+    }
+
+    fn accept_decoder(&mut self, decoder: DecoderReady) -> Result<()> {
+        if decoder.path.is_empty() {
+            self.root_decoders.push_back(decoder.decoder);
+        } else {
+            let root = self
+                .root_decoders
+                .front_mut()
+                .ok_or_else(|| Error::Internal {
+                    message: format!(
+                        "A child decoder with path {:?} arrived before any root decoder",
+                        decoder.path
+                    ),
+                    location: location!(),
+                })?;
+            root.accept_child(decoder)?;
         }
+        Ok(())
+    }
+
+    async fn wait_for_scheduled(&mut self, scheduled_need: u64) -> Result<()> {
+        while self.rows_scheduled < scheduled_need {
+            let next_message = self.context.source.recv().await;
+            match next_message {
+                Some(DecoderMessage::ScanLine(rows_scheduled)) => {
+                    self.rows_scheduled = rows_scheduled;
+                }
+                Some(DecoderMessage::Decoder(decoder)) => {
+                    self.accept_decoder(decoder)?;
+                }
+                None => {
+                    return Err(Error::Internal {
+                        message:
+                            "The scheduler finished while the decoder was still waiting for input"
+                                .to_string(),
+                        location: location!(),
+                    });
+                }
+            }
+        }
+        Ok(())
     }
 
     #[instrument(level = "debug", skip_all)]
     async fn next_batch_task(&mut self) -> Result<Option<NextDecodeTask>> {
         trace!(
-            "Draining batch task (rows_remaining={})",
-            self.rows_remaining
+            "Draining batch task (rows_remaining={} rows_drained={} rows_scheduled={})",
+            self.rows_remaining,
+            self.rows_drained,
+            self.rows_scheduled,
         );
         if self.rows_remaining == 0 {
             return Ok(None);
@@ -635,20 +684,34 @@ impl BatchDecodeStream {
         let to_take = self.rows_remaining.min(self.rows_per_batch as u64) as u32;
         self.rows_remaining -= to_take as u64;
 
-        if self.current.is_none() {
-            trace!("Loading new top-level page");
-            self.current = Some(self.scheduled.recv().await.unwrap());
+        let scheduled_need =
+            (self.rows_drained + to_take as u64).saturating_sub(self.rows_scheduled);
+        if scheduled_need > 0 {
+            let desired_scheduled = scheduled_need + self.rows_scheduled;
+            trace!(
+                "Draining from scheduler (desire at least {} scheduled rows)",
+                desired_scheduled
+            );
+            self.wait_for_scheduled(desired_scheduled).await?;
        }
-        let current = self.current.as_mut().unwrap();
+
+        let current = self
+            .root_decoders
+            .front_mut()
+            .ok_or_else(|| Error::Internal {
+                message: "the scheduler never emitted a top-level decoder".into(),
+                location: location!(),
+            })?;
         let avail = current.avail();
         trace!("Top level page has {} rows already available", avail);
         if avail < to_take {
-            current.wait(to_take, &mut self.scheduled).await?;
+            current.wait(to_take).await?;
         }
         let next_task = current.drain(to_take)?;
         if !next_task.has_more {
-            self.current = None;
+            self.root_decoders.pop_front();
        }
+        self.rows_drained += to_take as u64;
         Ok(Some(next_task))
     }
 
@@ -760,6 +823,132 @@ pub trait PhysicalPageScheduler: Send + Sync + std::fmt::Debug {
     ) -> BoxFuture<'static, Result<Box<dyn PhysicalPageDecoder>>>;
 }
 
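The new `rows_drained` / `rows_scheduled` bookkeeping reduces to one `saturating_sub`. A minimal sketch of that arithmetic (the function name is illustrative, not part of the patch):

```rust
/// How many more rows must the scheduler report (via ScanLine) before the
/// stream can safely drain a batch of `to_take` rows?  Mirrors the
/// saturating_sub in next_batch_task above.
fn scheduled_need(rows_drained: u64, to_take: u64, rows_scheduled: u64) -> u64 {
    (rows_drained + to_take).saturating_sub(rows_scheduled)
}

fn main() {
    // 100 rows drained, 50 more requested, 120 scheduled so far:
    // wait until ScanLine reports at least 150 (30 more than now).
    assert_eq!(scheduled_need(100, 50, 120), 30);
    // Scheduling already ahead of the request: no waiting needed.
    assert_eq!(scheduled_need(100, 50, 200), 0);
}
```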
+/// Contains the context for a scheduler
+pub struct SchedulerContext {
+    /// The sink that sends decodeable tasks to the decode stage
+    pub(crate) sink: mpsc::UnboundedSender<DecoderMessage>,
+    recv: Option<mpsc::UnboundedReceiver<DecoderMessage>>,
+    io: Arc<dyn EncodingsIo>,
+    name: String,
+    path: Vec<u32>,
+    path_names: Vec<String>,
+}
+
+pub struct ScopedSchedulerContext<'a> {
+    pub context: &'a mut SchedulerContext,
+}
+
+impl<'a> ScopedSchedulerContext<'a> {
+    pub fn pop(self) -> &'a mut SchedulerContext {
+        self.context.pop();
+        self.context
+    }
+}
+
+impl SchedulerContext {
+    pub fn new(sink: mpsc::UnboundedSender<DecoderMessage>, io: Arc<dyn EncodingsIo>) -> Self {
+        Self {
+            sink,
+            io,
+            recv: None,
+            name: "".to_string(),
+            path: Vec::new(),
+            path_names: Vec::new(),
+        }
+    }
+
+    pub fn io(&self) -> &dyn EncodingsIo {
+        self.io.as_ref()
+    }
+
+    pub fn push(&mut self, name: &str, index: u32) -> ScopedSchedulerContext {
+        self.path.push(index);
+        self.path_names.push(name.to_string());
+        ScopedSchedulerContext { context: self }
+    }
+
+    pub fn pop(&mut self) {
+        self.path.pop();
+        self.path_names.pop();
+    }
+
+    pub fn path_name(&self) -> String {
+        let path = self.path_names.join("/");
+        if self.recv.is_some() {
+            format!("TEMP({}){}", self.name, path)
+        } else {
+            format!("ROOT{}", path)
+        }
+    }
+
+    pub fn emit(&mut self, decoder: Box<dyn LogicalPageDecoder>) {
+        trace!(
+            "Scheduling decoder of type {:?} for {:?}",
+            decoder.data_type(),
+            self.path,
+        );
+        self.sink
+            .send(DecoderMessage::Decoder(DecoderReady {
+                decoder,
+                path: VecDeque::from_iter(self.path.iter().copied()),
+            }))
+            .unwrap();
+    }
+
+    // "Temporary" might not be the best name for this.  We create a new context that
+    // shares the same I/O scheduler and has a different name.  This is used in two
+    // situations.
+    //
+    // 1. When we need to create a new indirect I/O phase.
+    // 2. When we need to wrap a set of decoders and so we want to intercept them
+    //    before they are emitted.
+    pub fn temporary(&self) -> Self {
+        let (tx, rx) = unbounded_channel();
+        let mut name = self.name.clone();
+        name.push_str(&self.path_names.join("/"));
+        Self {
+            sink: tx,
+            io: self.io.clone(),
+            recv: Some(rx),
+            name,
+            path: Vec::new(),
+            path_names: Vec::new(),
+        }
+    }
+
+    // Consumes the temporary context, returning the decoder messages
+    //
+    // Used when the temporary context is used to create a new indirect I/O phase
+    // where all the messages need to be replayed
+    pub fn into_messages(self) -> Vec<DecoderMessage> {
+        let mut recv = self
+            .recv
+            .expect("Call to `into_messages` on a non-temporary scheduler context");
+        let mut decoders = Vec::new();
+        while let Ok(decoder) = recv.try_recv() {
+            decoders.push(decoder);
+        }
+        decoders
+    }
+
+    // Consumes the temporary context, returning only the decoders
+    //
+    // Used when the temporary context is used to wrap a set of decoders
+    pub fn into_decoders(self) -> Vec<Box<dyn LogicalPageDecoder>> {
+        self.into_messages()
+            .into_iter()
+            .filter_map(|msg| {
+                match msg {
+                    DecoderMessage::ScanLine(_) => None,
+                    // Should we ignore path here?  Currently, all "wrapping" layers should not
+                    // have children and so there should be no path.  We could maybe
+                    // debug_assert this.
+                    DecoderMessage::Decoder(decoder_ready) => Some(decoder_ready.decoder),
+                }
+            })
+            .collect::<Vec<_>>()
+    }
+}
+
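To see how a parent scheduler is meant to drive this API, here is a sketch (not from this patch) based on the struct and binary schedulers changed below; `ctx`, `child_page`, the field name, and the ranges are all assumed for illustration:

```rust
// Route a child's decoders under a path, then intercept a second batch of
// decoders with a temporary context (the "wrapping" pattern).
fn schedule_child(
    ctx: &mut SchedulerContext,
    child_page: &dyn LogicalPageScheduler,
) -> Result<()> {
    // push/pop maintains the path that emit() attaches to each decoder
    let scope = ctx.push("values", 0);
    child_page.schedule_ranges(&[0..100], scope.context, 0)?;
    let ctx = scope.pop();

    // A wrapper intercepts decoders instead of emitting them directly
    let mut tmp = ctx.temporary();
    child_page.schedule_ranges(&[100..200], &mut tmp, 100)?;
    for decoder in tmp.into_decoders() {
        ctx.emit(decoder); // a real wrapper would wrap `decoder` first
    }
    Ok(())
}
```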
 /// A scheduler for a field's worth of data
 ///
 /// Each page of incoming data maps to one `LogicalPageScheduler` instance.  However, this
@@ -791,8 +980,7 @@ pub trait LogicalPageScheduler: Send + Sync + std::fmt::Debug {
     fn schedule_ranges(
         &self,
         ranges: &[Range<u32>],
-        scheduler: &Arc<dyn EncodingsIo>,
-        sink: &mpsc::UnboundedSender<Box<dyn LogicalPageDecoder>>,
+        context: &mut SchedulerContext,
         top_level_row: u64,
     ) -> Result<()>;
     /// Schedules I/O for the requested rows (identified by row offsets from start of page)
@@ -800,8 +988,7 @@ pub trait LogicalPageScheduler: Send + Sync + std::fmt::Debug {
     fn schedule_take(
         &self,
         indices: &[u32],
-        scheduler: &Arc<dyn EncodingsIo>,
-        sink: &mpsc::UnboundedSender<Box<dyn LogicalPageDecoder>>,
+        context: &mut SchedulerContext,
         top_level_row: u64,
     ) -> Result<()>;
     /// The number of rows covered by this page
@@ -824,6 +1011,48 @@ pub struct NextDecodeTask {
     pub has_more: bool,
 }
 
+pub struct DecoderReady {
+    // The decoder that is ready to be decoded
+    pub decoder: Box<dyn LogicalPageDecoder>,
+    // The path to the decoder.  The first value is the column index;
+    // any following values are nested child indices.
+    //
+    // For example, a path of [1, 1, 0] would mean to grab the second
+    // column, then the second child, and then the first child.
+    //
+    // It could represent x in the following schema:
+    //
+    // score: float64
+    // points: struct
+    //   color: string
+    //   location: struct
+    //     x: float64
+    //
+    // Currently, only struct decoders have "children" although other
+    // decoders may at some point as well.  List children are only
+    // handled through indirect I/O at the moment and so they don't
+    // need to be represented (yet)
+    pub path: VecDeque<u32>,
+}
+
+pub enum DecoderMessage {
+    // Emitted whenever the scheduler has made another pass through the columns.
+    // Contains the number of rows that have been scheduled so far.
+    ScanLine(u64),
+    // Emitted whenever a new decoder is ready for the decode stage.
+    Decoder(DecoderReady),
+}
+
+pub struct DecoderContext {
+    source: mpsc::UnboundedReceiver<DecoderMessage>,
+}
+
+impl DecoderContext {
+    pub fn new(source: mpsc::UnboundedReceiver<DecoderMessage>) -> Self {
+        Self { source }
+    }
+}
+
 /// A decoder for a field's worth of data
 ///
 /// The decoder is initially "unloaded" (doesn't have all its data).  The [`Self::wait`]
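The routing rule for `path` is simply "pop one index per level". A self-contained illustration (not from this patch) using the `[1, 1, 0]` example from the comment above:

```rust
use std::collections::VecDeque;

fn main() {
    // Path for `x`: column 1 ("points"), child 1 ("location"), child 0 ("x").
    let mut path = VecDeque::from(vec![1u32, 1, 0]);

    // BatchDecodeStream checks whether the path is empty to pick a root
    // decoder; each SimpleStructDecoder::accept_child then pops exactly one
    // index as the message descends through the decoder tree.
    assert_eq!(path.pop_front(), Some(1)); // root struct -> "points"
    assert_eq!(path.pop_front(), Some(1)); // "points" -> "location"
    assert_eq!(path.pop_front(), Some(0)); // "location" -> "x"
    assert!(path.is_empty()); // empty path: the decoder has arrived
}
```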
@@ -833,18 +1062,29 @@ pub struct NextDecodeTask {
 /// Unlike the other decoder types it is assumed that `LogicalPageDecoder` is stateful
 /// and only `Send`.  This is why we don't need a `rows_to_skip` argument in [`Self::drain`]
 pub trait LogicalPageDecoder: std::fmt::Debug + Send {
+    /// Add a newly scheduled child decoder
+    ///
+    /// The default implementation does not expect children and returns
+    /// an error.
+    fn accept_child(&mut self, _child: DecoderReady) -> Result<()> {
+        Err(Error::Internal {
+            message: format!(
+                "The decoder {:?} does not expect children but received a child",
+                self
+            ),
+            location: location!(),
+        })
+    }
     /// Waits for enough data to be loaded to decode `num_rows` of data
-    fn wait<'a>(
-        &'a mut self,
-        num_rows: u32,
-        source: &'a mut mpsc::UnboundedReceiver<Box<dyn LogicalPageDecoder>>,
-    ) -> BoxFuture<'a, Result<()>>;
+    fn wait(&mut self, num_rows: u32) -> BoxFuture<'_, Result<()>>;
     /// Creates a task to decode `num_rows` of data into an array
     fn drain(&mut self, num_rows: u32) -> Result<NextDecodeTask>;
     /// The number of rows that are in the page but haven't yet been "waited"
     fn unawaited(&self) -> u32;
     /// The number of rows that have been "waited" but not yet decoded
     fn avail(&self) -> u32;
+    /// The data type of the decoded data
+    fn data_type(&self) -> &DataType;
 }
 
 /// Decodes a batch of data from an in-memory structure created by [`crate::encoder::encode_batch`]
@@ -854,7 +1094,7 @@ pub async fn decode_batch(batch: &EncodedBatch) -> Result<RecordBatch> {
     let (tx, rx) = unbounded_channel();
     let io_scheduler = Arc::new(BufferScheduler::new(batch.data.clone())) as Arc<dyn EncodingsIo>;
     decode_scheduler
-        .schedule_range(0..batch.num_rows, tx, &io_scheduler)
+        .schedule_range(0..batch.num_rows, tx, io_scheduler)
         .await?;
     let stream = BatchDecodeStream::new(rx, batch.num_rows as u32, batch.num_rows);
     stream.into_stream().next().await.unwrap().task.await
diff --git a/rust/lance-encoding/src/encodings/logical/binary.rs b/rust/lance-encoding/src/encodings/logical/binary.rs
index a71037780e..e927d2bc87 100644
--- a/rust/lance-encoding/src/encodings/logical/binary.rs
+++ b/rust/lance-encoding/src/encodings/logical/binary.rs
@@ -11,12 +11,14 @@ use arrow_array::{
 use arrow_buffer::ScalarBuffer;
 use arrow_schema::{DataType, Field};
 
-use futures::{future::BoxFuture, FutureExt};
+use futures::future::BoxFuture;
 use lance_core::Result;
 use log::trace;
 
 use crate::{
-    decoder::{DecodeArrayTask, LogicalPageDecoder, LogicalPageScheduler, NextDecodeTask},
+    decoder::{
+        DecodeArrayTask, LogicalPageDecoder, LogicalPageScheduler, NextDecodeTask, SchedulerContext,
+    },
     encoder::{EncodeTask, FieldEncoder},
 };
 
@@ -45,21 +47,20 @@ impl LogicalPageScheduler for BinaryPageScheduler {
     fn schedule_ranges(
         &self,
         ranges: &[std::ops::Range<u32>],
-        scheduler: &Arc<dyn EncodingsIo>,
-        sink: &tokio::sync::mpsc::UnboundedSender<Box<dyn LogicalPageDecoder>>,
+        context: &mut SchedulerContext,
         top_level_row: u64,
     ) -> Result<()> {
         trace!("Scheduling binary for {} ranges", ranges.len());
-        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
+        let mut temp_context = context.temporary();
         self.varbin_scheduler
-            .schedule_ranges(ranges, scheduler, &tx, top_level_row)?;
+            .schedule_ranges(ranges, &mut temp_context, top_level_row)?;
 
-        while let Some(decoder) = rx.recv().now_or_never() {
-            let wrapped = BinaryPageDecoder {
-                inner: decoder.unwrap(),
+        for decoder in temp_context.into_decoders() {
+            let decoder = Box::new(BinaryPageDecoder {
+                inner: decoder,
                 data_type: self.data_type.clone(),
-            };
-            sink.send(Box::new(wrapped)).unwrap();
+            });
+            context.emit(decoder);
         }
 
         Ok(())
@@ -68,8 +69,7 @@ impl LogicalPageScheduler for BinaryPageScheduler {
     fn schedule_take(
         &self,
         indices: &[u32],
-        scheduler: &Arc<dyn EncodingsIo>,
-        sink: &tokio::sync::mpsc::UnboundedSender<Box<dyn LogicalPageDecoder>>,
+        context: &mut SchedulerContext,
         top_level_row: u64,
     ) -> Result<()> {
         trace!("Scheduling binary for {} indices", indices.len());
         self.schedule_ranges(
             &indices
                 .iter()
                 .map(|&idx| idx..(idx + 1))
                 .collect::<Vec<_>>(),
-            scheduler,
-            sink,
+            context,
             top_level_row,
         )
     }
@@ -96,12 +95,8 @@ pub struct BinaryPageDecoder {
 }
 
 impl LogicalPageDecoder for BinaryPageDecoder {
-    fn wait<'a>(
-        &'a mut self,
-        num_rows: u32,
-        source: &'a mut tokio::sync::mpsc::UnboundedReceiver<Box<dyn LogicalPageDecoder>>,
-    ) -> BoxFuture<'a, Result<()>> {
-        self.inner.wait(num_rows, source)
+    fn wait(&mut self, num_rows: u32) -> BoxFuture<'_, Result<()>> {
+        self.inner.wait(num_rows)
     }
 
     fn drain(&mut self, num_rows: u32) -> Result<NextDecodeTask> {
@@ -123,6 +118,10 @@ impl LogicalPageDecoder for BinaryPageDecoder {
     fn avail(&self) -> u32 {
         self.inner.avail()
     }
+
+    fn data_type(&self) -> &DataType {
+        &self.data_type
+    }
 }
 
 pub struct BinaryArrayDecoder {
diff --git a/rust/lance-encoding/src/encodings/logical/fixed_size_list.rs b/rust/lance-encoding/src/encodings/logical/fixed_size_list.rs
index 44ead7fa63..caf17a2fc5 100644
--- a/rust/lance-encoding/src/encodings/logical/fixed_size_list.rs
+++ b/rust/lance-encoding/src/encodings/logical/fixed_size_list.rs
@@ -4,14 +4,12 @@ use std::{ops::Range, sync::Arc};
 
 use arrow_array::{ArrayRef, FixedSizeListArray};
-use arrow_schema::Field;
+use arrow_schema::{DataType, Field};
 use futures::future::BoxFuture;
 use log::trace;
-use tokio::sync::mpsc;
 
-use crate::{
-    decoder::{DecodeArrayTask, LogicalPageDecoder, LogicalPageScheduler, NextDecodeTask},
-    EncodingsIo,
+use crate::decoder::{
+    DecodeArrayTask, LogicalPageDecoder, LogicalPageScheduler, NextDecodeTask, SchedulerContext,
 };
 use lance_core::Result;
 
@@ -51,8 +49,7 @@ impl LogicalPageScheduler for FslPageScheduler {
     fn schedule_ranges(
         &self,
         ranges: &[Range<u32>],
-        scheduler: &Arc<dyn EncodingsIo>,
-        sink: &mpsc::UnboundedSender<Box<dyn LogicalPageDecoder>>,
+        context: &mut SchedulerContext,
         top_level_row: u64,
     ) -> Result<()> {
         let expanded_ranges = ranges
@@ -63,23 +60,27 @@ impl LogicalPageScheduler for FslPageScheduler {
             "Scheduling expanded ranges {:?} from items scheduler",
             expanded_ranges
         );
-        let (tx, mut rx) = mpsc::unbounded_channel();
+        let mut temp_context = context.temporary();
         self.items_scheduler
-            .schedule_ranges(&expanded_ranges, scheduler, &tx, top_level_row)?;
-        let inner_page_decoder = rx.blocking_recv().unwrap();
-        sink.send(Box::new(FslPageDecoder {
-            inner: inner_page_decoder,
-            dimension: self.dimension,
-        }))
-        .unwrap();
+            .schedule_ranges(&expanded_ranges, &mut temp_context, top_level_row)?;
+        for decoder in temp_context.into_decoders() {
+            let data_type = DataType::FixedSizeList(
+                Arc::new(Field::new("item", decoder.data_type().clone(), true)),
+                self.dimension as i32,
+            );
+            context.emit(Box::new(FslPageDecoder {
+                inner: decoder,
+                dimension: self.dimension,
+                data_type,
+            }));
+        }
         Ok(())
     }
 
     fn schedule_take(
         &self,
         indices: &[u32],
-        scheduler: &Arc<dyn EncodingsIo>,
-        sink: &mpsc::UnboundedSender<Box<dyn LogicalPageDecoder>>,
+        context: &mut SchedulerContext,
         top_level_row: u64,
     ) -> Result<()> {
         self.schedule_ranges(
             &indices
@@ -87,8 +88,7 @@ impl LogicalPageScheduler for FslPageScheduler {
                 .iter()
                 .map(|&idx| idx..(idx + 1))
                 .collect::<Vec<_>>(),
-            scheduler,
-            sink,
+            context,
             top_level_row,
         )
     }
@@ -102,15 +102,12 @@ impl LogicalPageScheduler for FslPageScheduler {
 struct FslPageDecoder {
     inner: Box<dyn LogicalPageDecoder>,
     dimension: u32,
+    data_type: DataType,
 }
 
 impl LogicalPageDecoder for FslPageDecoder {
-    fn wait<'a>(
-        &'a mut self,
-        num_rows: u32,
-        source: &'a mut mpsc::UnboundedReceiver<Box<dyn LogicalPageDecoder>>,
-    ) -> BoxFuture<'a, Result<()>> {
-        self.inner.wait(num_rows * self.dimension, source)
+    fn wait(&mut self, num_rows: u32) -> BoxFuture<'_, Result<()>> {
+        self.inner.wait(num_rows * self.dimension)
     }
 
     fn unawaited(&self) -> u32 {
@@ -136,6 +133,10 @@ impl LogicalPageDecoder for FslPageDecoder {
     fn avail(&self) -> u32 {
         self.inner.avail() / self.dimension
     }
+
+    fn data_type(&self) -> &DataType {
+        &self.data_type
+    }
 }
 
 struct FslDecodeTask {
diff --git a/rust/lance-encoding/src/encodings/logical/list.rs b/rust/lance-encoding/src/encodings/logical/list.rs
index 912be8bcb2..9fe3fdfef3 100644
--- a/rust/lance-encoding/src/encodings/logical/list.rs
+++ b/rust/lance-encoding/src/encodings/logical/list.rs
@@ -14,15 +14,16 @@ use arrow_schema::{DataType, Field};
 use futures::{future::BoxFuture, FutureExt};
 use log::trace;
 use snafu::{location, Location};
-use tokio::{sync::mpsc, task::JoinHandle};
+use tokio::task::JoinHandle;
 
 use lance_core::{Error, Result};
 
 use crate::{
-    decoder::{DecodeArrayTask, LogicalPageDecoder, LogicalPageScheduler, NextDecodeTask},
+    decoder::{
+        DecodeArrayTask, LogicalPageDecoder, LogicalPageScheduler, NextDecodeTask, SchedulerContext,
+    },
     encoder::{ArrayEncoder, EncodeTask, EncodedArray, EncodedPage, FieldEncoder},
     format::pb,
-    EncodingsIo,
 };
 
 use super::primitive::{AccumulationQueue, PrimitiveFieldEncoder};
@@ -225,8 +226,7 @@ impl LogicalPageScheduler for ListPageScheduler {
     fn schedule_ranges(
         &self,
         ranges: &[std::ops::Range<u32>],
-        scheduler: &Arc<dyn EncodingsIo>,
-        sink: &mpsc::UnboundedSender<Box<dyn LogicalPageDecoder>>,
+        context: &mut SchedulerContext,
         top_level_row: u64,
     ) -> Result<()> {
         // TODO: Shortcut here if the request covers the entire range (can be determined by
@@ -257,13 +257,23 @@ impl LogicalPageScheduler for ListPageScheduler {
         trace!("Scheduling list offsets ranges: {:?}", offsets_ranges);
 
         // Create a channel for the internal schedule / decode loop that is unique
         // to this page.
-        let (tx, mut rx) = mpsc::unbounded_channel();
+        let mut temporary = context.temporary();
         self.offsets_scheduler
-            .schedule_ranges(&offsets_ranges, scheduler, &tx, top_level_row)?;
-        let mut scheduled_offsets = rx.try_recv().unwrap();
+            .schedule_ranges(&offsets_ranges, &mut temporary, top_level_row)?;
+        let offset_decoders = temporary.into_decoders();
+        let num_offset_decoders = offset_decoders.len();
+        let mut scheduled_offsets =
+            offset_decoders
+                .into_iter()
+                .next()
+                .ok_or_else(|| Error::Internal {
+                    message: format!("scheduling offsets yielded {} pages", num_offset_decoders),
+                    location: location!(),
+                })?;
         let items_schedulers = self.items_schedulers.clone();
         let ranges = ranges.to_vec();
-        let scheduler = scheduler.clone();
+
+        let mut indirect_context = context.temporary();
 
         // First we schedule, as normal, the I/O for the offsets.  Then we immediately spawn
         // a task to decode those offsets and schedule the I/O for the items AND wait for
@@ -273,14 +283,12 @@ impl LogicalPageScheduler for ListPageScheduler {
         let indirect_fut = tokio::task::spawn(async move {
             // We know the offsets are a primitive array and thus will not need additional
-            // pages.  We can use a dummy receiver to match the decoder API
-            let (_, mut dummy_rx) = mpsc::unbounded_channel();
-            scheduled_offsets.wait(num_rows, &mut dummy_rx).await?;
+            // pages.
+            scheduled_offsets.wait(num_rows).await?;
             let decode_task = scheduled_offsets.drain(num_offsets)?;
             let offsets = decode_task.task.decode()?;
 
             let (mut item_ranges, offsets, validity) =
                 Self::decode_offsets(offsets.as_ref(), &ranges, null_offset_adjustment);
-            let (tx, mut rx) = mpsc::unbounded_channel();
 
             trace!(
                 "Indirectly scheduling items ranges {:?} from {} list items pages",
@@ -327,8 +335,7 @@ impl LogicalPageScheduler for ListPageScheduler {
                         // that way.  Still, this is probably good enough for a while
                         next_scheduler.schedule_ranges(
                             &next_item_ranges,
-                            &scheduler,
-                            &tx,
+                            &mut indirect_context,
                             top_level_row,
                         )?;
                         next_item_ranges.clear();
@@ -355,8 +362,7 @@ impl LogicalPageScheduler for ListPageScheduler {
                     if !next_item_ranges.is_empty() {
                         next_scheduler.schedule_ranges(
                             &next_item_ranges,
-                            &scheduler,
-                            &tx,
+                            &mut indirect_context,
                             top_level_row,
                         )?;
                         next_item_ranges.clear();
@@ -367,16 +373,13 @@ impl LogicalPageScheduler for ListPageScheduler {
             if !next_item_ranges.is_empty() {
                 next_scheduler.schedule_ranges(
                     &next_item_ranges,
-                    &scheduler,
-                    &tx,
+                    &mut indirect_context,
                     top_level_row,
                 )?;
             }
-            let mut item_decoders = Vec::new();
-            drop(tx);
-            while let Some(item_decoder) = rx.recv().await {
-                item_decoders.push(item_decoder);
-            }
+            // TODO: Should probably use into_messages here.  We can figure that out once we
+            // add tests for List<List<...>>
+            let item_decoders = indirect_context.into_decoders();
 
             Ok(IndirectlyLoaded {
                 offsets,
@@ -384,7 +387,16 @@ impl LogicalPageScheduler for ListPageScheduler {
                 item_decoders,
             })
         });
-        sink.send(Box::new(ListPageDecoder {
+        let data_type = match &self.offset_type {
+            DataType::Int32 => {
+                DataType::List(Arc::new(Field::new("item", self.items_type.clone(), true)))
+            }
+            DataType::Int64 => {
+                DataType::LargeList(Arc::new(Field::new("item", self.items_type.clone(), true)))
+            }
+            _ => panic!("Unexpected offset type {}", self.offset_type),
+        };
+        context.emit(Box::new(ListPageDecoder {
             offsets: Vec::new(),
             validity: BooleanBuffer::new(Buffer::from_vec(Vec::<u8>::default()), 0, 0),
             unawaited_item_decoders: VecDeque::new(),
@@ -395,8 +407,8 @@ impl LogicalPageScheduler for ListPageScheduler {
             unloaded: Some(indirect_fut),
             items_type: self.items_type.clone(),
             offset_type: self.offset_type.clone(),
-        }))
-        .unwrap();
+            data_type,
+        }));
 
         Ok(())
     }
@@ -407,8 +419,7 @@ impl LogicalPageScheduler for ListPageScheduler {
     fn schedule_take(
         &self,
         indices: &[u32],
-        scheduler: &Arc<dyn EncodingsIo>,
-        sink: &mpsc::UnboundedSender<Box<dyn LogicalPageDecoder>>,
+        context: &mut SchedulerContext,
         top_level_row: u64,
     ) -> Result<()> {
         trace!("Scheduling list offsets for {} indices", indices.len());
         self.schedule_ranges(
             &indices
@@ -417,8 +428,7 @@ impl LogicalPageScheduler for ListPageScheduler {
                 .iter()
                 .map(|&idx| idx..(idx + 1))
                 .collect::<Vec<_>>(),
-            scheduler,
-            sink,
+            context,
             top_level_row,
         )
     }
@@ -448,6 +458,7 @@ struct ListPageDecoder {
     rows_drained: u32,
     items_type: DataType,
     offset_type: DataType,
+    data_type: DataType,
 }
 
 struct ListDecodeTask {
@@ -517,11 +528,7 @@ impl DecodeArrayTask for ListDecodeTask {
 }
 
 impl LogicalPageDecoder for ListPageDecoder {
-    fn wait<'a>(
-        &'a mut self,
-        num_rows: u32,
-        source: &'a mut mpsc::UnboundedReceiver<Box<dyn LogicalPageDecoder>>,
-    ) -> BoxFuture<'a, Result<()>> {
+    fn wait(&mut self, num_rows: u32) -> BoxFuture<'_, Result<()>> {
         async move {
             // wait for the indirect I/O to finish, then wait for enough items to arrive
             if self.unloaded.is_some() {
@@ -574,7 +581,7 @@ impl LogicalPageDecoder for ListPageDecoder {
                     to_await
                 );
                 if to_await > 0 {
-                    last_decoder.wait(to_await, source).await?;
+                    last_decoder.wait(to_await).await?;
                     items_needed = items_needed.saturating_sub(last_decoder.avail() as u64);
                 }
             }
@@ -588,7 +595,7 @@ impl LogicalPageDecoder for ListPageDecoder {
                 );
                 let to_await = items_needed.min(unawaited) as u32;
                 // TODO: Seems like this will fail in the List<List<...>> case
-                next_item_decoder.wait(to_await, source).await?;
+                next_item_decoder.wait(to_await).await?;
                 // Might end up loading more items than needed so use saturating_sub
                 items_needed = items_needed.saturating_sub(next_item_decoder.avail() as u64);
                 self.awaited_item_decoders.push_back(next_item_decoder);
@@ -674,6 +681,10 @@ impl LogicalPageDecoder for ListPageDecoder {
     fn avail(&self) -> u32 {
         self.lists_available
     }
+
+    fn data_type(&self) -> &DataType {
+        &self.data_type
+    }
 }
 
 struct IndirectlyLoaded {
diff --git a/rust/lance-encoding/src/encodings/logical/primitive.rs b/rust/lance-encoding/src/encodings/logical/primitive.rs
index c6291650d8..9d5176259c 100644
--- a/rust/lance-encoding/src/encodings/logical/primitive.rs
+++ b/rust/lance-encoding/src/encodings/logical/primitive.rs
@@ -25,19 +25,17 @@ use log::{debug, trace};
 use snafu::{location, Location};
 
 use lance_core::{Error, Result};
-use tokio::sync::mpsc;
 
 use crate::{
     decoder::{
         DecodeArrayTask, LogicalPageDecoder, LogicalPageScheduler, NextDecodeTask, PageInfo,
-        PhysicalPageDecoder, PhysicalPageScheduler,
+        PhysicalPageDecoder, PhysicalPageScheduler, SchedulerContext,
     },
     encoder::{ArrayEncoder, EncodeTask, EncodedPage, FieldEncoder},
     encodings::physical::{
         basic::BasicEncoder, decoder_from_array_encoding, fixed_size_list::FslEncoder,
         value::ValueEncoder, ColumnBuffers, PageBuffers,
     },
-    EncodingsIo,
 };
 
 /// A page scheduler for primitive fields
@@ -78,15 +76,13 @@ impl LogicalPageScheduler for PrimitivePageScheduler {
     fn schedule_ranges(
         &self,
         ranges: &[std::ops::Range<u32>],
-        scheduler: &Arc<dyn EncodingsIo>,
-        sink: &mpsc::UnboundedSender<Box<dyn LogicalPageDecoder>>,
+        context: &mut SchedulerContext,
         top_level_row: u64,
     ) -> Result<()> {
         let num_rows = ranges.iter().map(|r| r.end - r.start).sum();
-        trace!("Scheduling ranges {:?} from physical page", ranges);
         let physical_decoder = self
             .physical_decoder
-            .schedule_ranges(ranges, scheduler.as_ref(), top_level_row);
+            .schedule_ranges(ranges, context.io(), top_level_row);
 
         let logical_decoder = PrimitiveFieldDecoder {
             data_type: self.data_type.clone(),
@@ -96,15 +92,14 @@ impl LogicalPageScheduler for PrimitivePageScheduler {
             num_rows,
         };
 
-        sink.send(Box::new(logical_decoder)).unwrap();
+        context.emit(Box::new(logical_decoder));
         Ok(())
     }
 
     fn schedule_take(
         &self,
         indices: &[u32],
-        scheduler: &Arc<dyn EncodingsIo>,
-        sink: &mpsc::UnboundedSender<Box<dyn LogicalPageDecoder>>,
+        context: &mut SchedulerContext,
         top_level_row: u64,
     ) -> Result<()> {
         trace!(
@@ -116,8 +111,7 @@ impl LogicalPageScheduler for PrimitivePageScheduler {
                 .iter()
                 .map(|&idx| idx..(idx + 1))
                 .collect::<Vec<_>>(),
-            scheduler,
-            sink,
+            context,
             top_level_row,
         )
     }
@@ -392,11 +386,9 @@ impl PrimitiveFieldDecodeTask {
 }
 
 impl LogicalPageDecoder for PrimitiveFieldDecoder {
-    fn wait<'a>(
-        &'a mut self,
-        _: u32,
-        _: &'a mut mpsc::UnboundedReceiver<Box<dyn LogicalPageDecoder>>,
-    ) -> BoxFuture<'a, Result<()>> {
+    // TODO: In the future, at some point, we may consider partially waiting for primitive pages by
+    // breaking up large I/O into smaller I/O as a way to accelerate the "time-to-first-decode"
+    fn wait(&mut self, _: u32) -> BoxFuture<'_, Result<()>> {
         async move {
             let physical_decoder = self.unloaded_physical_decoder.take().unwrap().await?;
             self.physical_decoder = Some(Arc::from(physical_decoder));
@@ -440,6 +432,10 @@ impl LogicalPageDecoder for PrimitiveFieldDecoder {
             self.num_rows - self.rows_drained
         }
     }
+
+    fn data_type(&self) -> &DataType {
+        &self.data_type
+    }
 }
 
 #[derive(Debug)]
diff --git a/rust/lance-encoding/src/encodings/logical/struct.rs b/rust/lance-encoding/src/encodings/logical/struct.rs
index 1672cf619b..055544ae22 100644
--- a/rust/lance-encoding/src/encodings/logical/struct.rs
+++ b/rust/lance-encoding/src/encodings/logical/struct.rs
@@ -4,18 +4,20 @@
 use std::{collections::VecDeque, ops::Range, sync::Arc};
 
 use arrow_array::{cast::AsArray, ArrayRef, StructArray};
-use arrow_schema::Fields;
+use arrow_schema::{DataType, Fields};
 use futures::{future::BoxFuture, FutureExt};
 use log::trace;
-use tokio::sync::mpsc;
+use snafu::{location, Location};
 
 use crate::{
-    decoder::{DecodeArrayTask, LogicalPageDecoder, LogicalPageScheduler, NextDecodeTask},
+    decoder::{
+        DecodeArrayTask, DecoderReady, LogicalPageDecoder, LogicalPageScheduler, NextDecodeTask,
+        SchedulerContext,
+    },
     encoder::{EncodeTask, EncodedArray, EncodedPage, FieldEncoder},
     format::pb,
-    EncodingsIo,
 };
-use lance_core::Result;
+use lance_core::{Error, Result};
 
 /// A scheduler for structs
 ///
@@ -32,10 +34,16 @@ pub struct SimpleStructScheduler {
     children: Vec<Vec<Box<dyn LogicalPageScheduler>>>,
     child_fields: Fields,
     num_rows: u32,
+    // True if this is the top-level scheduler (and we should send scan line messages)
+    is_root: bool,
 }
 
 impl SimpleStructScheduler {
-    pub fn new(children: Vec<Vec<Box<dyn LogicalPageScheduler>>>, child_fields: Fields) -> Self {
+    fn new_with_params(
+        children: Vec<Vec<Box<dyn LogicalPageScheduler>>>,
+        child_fields: Fields,
+        is_root: bool,
+    ) -> Self {
         debug_assert!(!children.is_empty());
         let num_rows = children[0].iter().map(|page| page.num_rows()).sum();
         // Ensure that all the children have the same number of rows
@@ -43,8 +51,20 @@ impl SimpleStructScheduler {
             children,
             child_fields,
             num_rows,
+            is_root,
         }
     }
+
+    pub fn new(children: Vec<Vec<Box<dyn LogicalPageScheduler>>>, child_fields: Fields) -> Self {
+        Self::new_with_params(children, child_fields, false)
+    }
+
+    pub fn new_root(
+        children: Vec<Vec<Box<dyn LogicalPageScheduler>>>,
+        child_fields: Fields,
+    ) -> Self {
+        Self::new_with_params(children, child_fields, true)
+    }
 }
 
 // As we schedule a range we keep one of these per column so that we know
@@ -108,8 +128,7 @@ impl LogicalPageScheduler for SimpleStructScheduler {
     fn schedule_ranges(
         &self,
         ranges: &[Range<u32>],
-        scheduler: &Arc<dyn EncodingsIo>,
-        sink: &mpsc::UnboundedSender<Box<dyn LogicalPageDecoder>>,
+        mut context: &mut SchedulerContext,
         top_level_row: u64,
     ) -> Result<()> {
         for range in ranges.iter().cloned() {
@@ -125,11 +144,10 @@ impl LogicalPageScheduler for SimpleStructScheduler {
         //
         // This will need to get a tiny bit more complicated once structs have their own nullability and that nullability
         // information starts to span multiple pages.
-        sink.send(Box::new(SimpleStructDecoder::new(
+        context.emit(Box::new(SimpleStructDecoder::new(
             self.child_fields.clone(),
             rows_to_read,
-        )))
-        .unwrap();
+        )));
 
         let mut field_status =
             vec![RangeFieldWalkStatus::new_from_range(range); self.children.len()];
@@ -180,18 +198,18 @@ impl LogicalPageScheduler for SimpleStructScheduler {
                         let page_range = page_range_start..(page_range_start + rows_to_take);
 
                         trace!(
-                            "Taking {} rows from column {} starting at page offset {} from page {:?}",
+                            "Taking {} rows from column {} starting at page offset {}",
                             rows_to_take,
                             col_idx,
                             page_range_start,
-                            next_page
                         );
+                        let scope = context.push(self.child_fields[col_idx].name(), col_idx as u32);
                         next_page.schedule_ranges(
                             &[page_range],
-                            scheduler,
-                            sink,
+                            scope.context,
                             current_top_level_row,
                         )?;
+                        context = scope.pop();
 
                         status.rows_queued += rows_to_take;
                         status.rows_to_take -= rows_to_take;
@@ -200,6 +218,11 @@ impl LogicalPageScheduler for SimpleStructScheduler {
 
                         min_rows_added = min_rows_added.min(rows_to_take);
                     } else {
+                        trace!(
+                            "Using {} queued rows for column {}",
+                            status.rows_queued,
+                            col_idx
+                        );
                         min_rows_added = min_rows_added.min(status.rows_queued);
                     }
                 }
@@ -208,6 +231,18 @@ impl LogicalPageScheduler for SimpleStructScheduler {
             }
             rows_to_read -= min_rows_added;
             current_top_level_row += min_rows_added as u64;
+            if self.is_root {
+                trace!(
+                    "Scheduler scan complete ({} rows now scheduled)",
+                    current_top_level_row - top_level_row
+                );
+                context
+                    .sink
+                    .send(crate::decoder::DecoderMessage::ScanLine(
+                        current_top_level_row - top_level_row,
+                    ))
+                    .unwrap();
+            }
             for field_status in &mut field_status {
                 field_status.rows_queued -= min_rows_added;
             }
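The scan-line protocol emitted above is easier to see in a toy model. The invariant: after `ScanLine(n)` is sent, every decoder needed to decode the first `n` top-level rows has already been emitted. A self-contained sketch (made-up page sizes and types, not the crate's own) of the row-major pass and its `min_rows_added` logic:

```rust
// Toy model of SimpleStructScheduler's row-major scheduling loop.
#[derive(Debug)]
enum Message {
    Decoder { column: usize, rows: u64 },
    ScanLine(u64),
}

fn schedule(pages_per_column: &[Vec<u64>]) -> Vec<Message> {
    let mut msgs = Vec::new();
    let mut cursors = vec![0usize; pages_per_column.len()];
    let mut queued = vec![0u64; pages_per_column.len()];
    let total: u64 = pages_per_column[0].iter().sum();
    let mut scheduled = 0u64;
    while scheduled < total {
        let mut min_added = u64::MAX;
        for (col, pages) in pages_per_column.iter().enumerate() {
            if queued[col] == 0 {
                // This column has no rows in flight; schedule its next page.
                let rows = pages[cursors[col]];
                cursors[col] += 1;
                msgs.push(Message::Decoder { column: col, rows });
                queued[col] += rows;
            }
            min_added = min_added.min(queued[col]);
        }
        scheduled += min_added;
        for q in &mut queued {
            *q -= min_added;
        }
        // Every column now has coverage through `scheduled` rows.
        msgs.push(Message::ScanLine(scheduled));
    }
    msgs
}

fn main() {
    // Column 0 has pages of 4 rows; column 1 has pages of 6 rows.
    for m in schedule(&[vec![4, 4, 4], vec![6, 6]]) {
        println!("{:?}", m); // ScanLine(4), ScanLine(6), ScanLine(8), ScanLine(12)
    }
}
```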
@@ -223,22 +258,24 @@ impl LogicalPageScheduler for SimpleStructScheduler {
     fn schedule_take(
         &self,
         indices: &[u32],
-        scheduler: &Arc<dyn EncodingsIo>,
-        sink: &mpsc::UnboundedSender<Box<dyn LogicalPageDecoder>>,
+        mut context: &mut SchedulerContext,
         top_level_row: u64,
     ) -> Result<()> {
-        trace!("Scheduling struct decode of {} indices", indices.len());
+        trace!(
+            "Scheduling struct decode of {} indices with top_level_row={}",
+            indices.len(),
+            top_level_row
+        );
 
         // Before we do anything, send a struct decoder to the decode thread so it can start decoding the pages
         // we are about to send.
         //
         // This will need to get a tiny bit more complicated once structs have their own nullability and that nullability
         // information starts to span multiple pages.
-        sink.send(Box::new(SimpleStructDecoder::new(
+        context.emit(Box::new(SimpleStructDecoder::new(
             self.child_fields.clone(),
             indices.len() as u32,
-        )))
-        .unwrap();
+        )));
 
         // Create a cursor into indices for each column
         let mut field_status =
@@ -283,12 +320,13 @@ impl LogicalPageScheduler for SimpleStructScheduler {
 
                     // We should be guaranteed to get at least one page
                     let next_page = next_page.unwrap();
 
+                    let scope = context.push(self.child_fields[col_idx].name(), col_idx as u32);
                     next_page.schedule_take(
                         &indices_in_page,
-                        scheduler,
-                        sink,
+                        scope.context,
                         current_top_level_row,
                     )?;
+                    context = scope.pop();
 
                     let rows_scheduled = indices_in_page.len() as u32;
                     status.rows_queued += rows_scheduled;
@@ -296,6 +334,11 @@ impl LogicalPageScheduler for SimpleStructScheduler {
                     min_rows_added = min_rows_added.min(rows_scheduled);
                 } else {
                     // TODO: Unit tests are not covering this path right now
+                    trace!(
+                        "Using {} already queued rows for column {}",
+                        status.rows_queued,
+                        col_idx
+                    );
                     min_rows_added = min_rows_added.min(status.rows_queued);
                 }
             }
@@ -308,6 +351,18 @@ impl LogicalPageScheduler for SimpleStructScheduler {
             );
             rows_to_read -= min_rows_added;
             current_top_level_row += min_rows_added as u64;
+            if self.is_root {
+                trace!(
+                    "Scheduler scan complete ({} rows now scheduled)",
+                    current_top_level_row - top_level_row
+                );
+                context
+                    .sink
+                    .send(crate::decoder::DecoderMessage::ScanLine(
+                        current_top_level_row - top_level_row,
+                    ))
+                    .unwrap();
+            }
             for field_status in &mut field_status {
                 field_status.rows_queued -= min_rows_added;
             }
@@ -318,18 +373,17 @@ impl LogicalPageScheduler for SimpleStructScheduler {
 
 #[derive(Debug)]
 struct ChildState {
-    // As we decode a column we pull pages out of the channel source and into
-    // a queue for that column.  Since we await as soon as we pull the page from
-    // the source there is no need for a separate unawaited queue.
+    // As child decoders are scheduled they are added to this queue
+    // Once the decoder is fully drained it is popped from this queue
     //
-    // Technically though, these pages are only "partially awaited"
+    // TODO: It may be a minor perf optimization, in some rare cases, if we have a separate
+    // "fully awaited but not yet drained" queue so we don't loop through fully awaited pages
+    // during each call to wait.
     //
     // Note: This queue may have more than one page in it if the batch size is very large
     // or pages are very small
     // TODO: Test this case
-    //
-    // Then we drain this queue pages as we decode.
-    awaited: VecDeque<Box<dyn LogicalPageDecoder>>,
+    scheduled: VecDeque<Box<dyn LogicalPageDecoder>>,
     // Rows that should still be coming over the channel source but haven't yet been
     // put into the awaited queue
     rows_unawaited: u32,
@@ -368,21 +422,15 @@ impl CompositeDecodeTask {
 impl ChildState {
     fn new(num_rows: u32, field_index: u32) -> Self {
         Self {
-            awaited: VecDeque::new(),
+            scheduled: VecDeque::new(),
             rows_unawaited: num_rows,
             rows_available: 0,
             field_index,
         }
     }
 
-    // Wait for the next set of rows to arrive.  Return true if finished.  Return
-    // false if more rows are still needed (we can only wait one page at a time
-    // because we need to move in row-major fashion)
-    async fn wait_next(
-        &mut self,
-        num_rows: u32,
-        source: &mut mpsc::UnboundedReceiver<Box<dyn LogicalPageDecoder>>,
-    ) -> Result<bool> {
+    // Wait for the next set of rows to arrive.
+    async fn wait(&mut self, num_rows: u32) -> Result<()> {
         trace!(
             "Struct child {} waiting for {} rows and {} are available already",
             self.field_index,
@@ -390,68 +438,41 @@ impl ChildState {
             self.rows_available
         );
         let mut remaining = num_rows.saturating_sub(self.rows_available);
-        if remaining > 0 {
-            trace!(
-                "Struct must await {} rows from {} unawaited rows",
-                remaining,
-                self.rows_unawaited
-            );
-            if let Some(back) = self.awaited.back_mut() {
-                if back.unawaited() > 0 {
-                    let rows_to_wait = remaining.min(back.unawaited());
-                    trace!(
-                        "Struct await an additional {} rows from the current page",
-                        rows_to_wait
-                    );
-                    // Even though we wait for X rows we might actually end up
-                    // loading more than that
-                    let previously_avail = back.avail();
-                    back.wait(rows_to_wait, source).await?;
-                    let newly_avail = back.avail() - previously_avail;
-                    trace!("The await loaded {} rows", newly_avail);
-                    self.rows_available += newly_avail;
-                    // Need to use saturating_sub here because we might have asked for range
-                    // 0-1000 and this page we just loaded might cover 900-1100 and so newly_avail
-                    // is 200 but rows_unawaited is only 100
-                    //
-                    // TODO: Unit tests are not covering this branch right now
-                    self.rows_unawaited = self.rows_unawaited.saturating_sub(newly_avail);
-                    remaining -= rows_to_wait;
-                    if remaining == 0 {
-                        return Ok(true);
-                    }
-                }
-            }
-
-            // Because we schedule in row-major fashion we know the next page
-            // will belong to this column.
-            let mut decoder = source.recv().await.unwrap();
-            let could_await = decoder.unawaited();
-            let rows_to_wait = remaining.min(could_await);
-            trace!(
-                "Struct received new page and awaiting {} rows out of {}",
-                rows_to_wait,
-                could_await
-            );
-            // We might only await part of a page.  This is important for things
-            // like the struct<list<...>> case where we have one outer page, one
-            // middle page, and then a bunch of inner pages.  If we await the entire
-            // middle page then we will have to wait for all the inner pages to arrive
-            // before we can start decoding.
-            //
-            // TODO: test this case
-            let previously_avail = decoder.avail();
-            decoder.wait(rows_to_wait, source).await?;
-            // It's possible that we loaded more rows than asked for so need to calculate
-            // newly_avail this way (we do this above too)
-            let newly_avail = decoder.avail() - previously_avail;
-            self.awaited.push_back(decoder);
-            trace!("The new await loaded {} rows", newly_avail);
-            self.rows_available += newly_avail;
-            self.rows_unawaited = self.rows_unawaited.saturating_sub(newly_avail);
-            Ok(remaining == rows_to_wait)
-        } else {
-            Ok(true)
-        }
+        for next_decoder in &mut self.scheduled {
+            if next_decoder.unawaited() > 0 {
+                let rows_to_wait = remaining.min(next_decoder.unawaited());
+                trace!(
+                    "Struct await an additional {} rows from the current page",
+                    rows_to_wait
+                );
+                // Even though we wait for X rows we might actually end up
+                // loading more than that
+                let previously_avail = next_decoder.avail();
+                // We might only await part of a page.  This is important for things
+                // like the struct<list<...>> case where we have one outer page, one
+                // middle page, and then a bunch of inner pages.  If we await the entire
+                // middle page then we will have to wait for all the inner pages to arrive
+                // before we can start decoding.
+                next_decoder.wait(rows_to_wait).await?;
+                let newly_avail = next_decoder.avail() - previously_avail;
+                trace!("The await loaded {} rows", newly_avail);
+                self.rows_available += newly_avail;
+                // Need to use saturating_sub here because we might have asked for range
+                // 0-1000 and this page we just loaded might cover 900-1100 and so newly_avail
+                // is 200 but rows_unawaited is only 100
+                //
+                // TODO: Unit tests may not be covering this branch right now
+                self.rows_unawaited = self.rows_unawaited.saturating_sub(newly_avail);
+                remaining -= rows_to_wait;
+                if remaining == 0 {
+                    break;
+                }
+            }
+        }
+        if remaining > 0 {
+            Err(Error::Internal {
+                message: format!(
+                    "The struct field at index {} is still waiting for {} rows but ran out of scheduled pages",
+                    self.field_index, remaining
+                ),
+                location: location!(),
+            })
+        } else {
+            Ok(())
+        }
     }
 
@@ -467,12 +488,12 @@ impl ChildState {
             has_more: true,
         };
         while remaining > 0 {
-            let next = self.awaited.front_mut().unwrap();
+            let next = self.scheduled.front_mut().unwrap();
             let rows_to_take = remaining.min(next.avail());
             let next_task = next.drain(rows_to_take)?;
             if next.avail() == 0 && next.unawaited() == 0 {
                 trace!("Completely drained page");
-                self.awaited.pop_front();
+                self.scheduled.pop_front();
             }
             remaining -= rows_to_take;
             composite.tasks.push(next_task.task);
@@ -487,10 +508,12 @@ impl ChildState {
 struct SimpleStructDecoder {
     children: Vec<ChildState>,
     child_fields: Fields,
+    data_type: DataType,
 }
 
 impl SimpleStructDecoder {
     fn new(child_fields: Fields, num_rows: u32) -> Self {
+        let data_type = DataType::Struct(child_fields.clone());
         Self {
             children: child_fields
                 .iter()
@@ -498,27 +521,32 @@ impl SimpleStructDecoder {
                 .map(|(idx, _)| ChildState::new(num_rows, idx as u32))
                 .collect(),
             child_fields,
+            data_type,
         }
     }
 }
 
 impl LogicalPageDecoder for SimpleStructDecoder {
-    fn wait<'a>(
-        &'a mut self,
-        num_rows: u32,
-        source: &'a mut mpsc::UnboundedReceiver<Box<dyn LogicalPageDecoder>>,
-    ) -> BoxFuture<'a, Result<()>> {
+    fn accept_child(&mut self, mut child: DecoderReady) -> Result<()> {
+        // Children with an empty path should not be delivered to this method
+        let child_idx = child.path.pop_front().unwrap();
+        if child.path.is_empty() {
+            // This decoder is intended for us
+            self.children[child_idx as usize]
+                .scheduled
+                .push_back(child.decoder);
+        } else {
+            // This decoder is intended for one of our children
+            let intended = self.children[child_idx as usize]
+                .scheduled
+                .back_mut()
+                .ok_or_else(|| Error::Internal {
+                    message: format!(
+                        "Decoder scheduled for child at index {} but we don't have any child at that index yet",
+                        child_idx
+                    ),
+                    location: location!(),
+                })?;
+            intended.accept_child(child)?;
+        }
+        Ok(())
+    }
+
+    fn wait(&mut self, num_rows: u32) -> BoxFuture<'_, Result<()>> {
         async move {
-            // This is basically the inverse of the row-major scheduling algorithm
-            let mut remaining = Vec::from_iter(self.children.iter_mut());
-            while !remaining.is_empty() {
-                let mut next_remaining = Vec::new();
-                for child in remaining {
-                    if !child.wait_next(num_rows, source).await? {
-                        next_remaining.push(child);
-                    }
-                }
-                remaining = next_remaining;
+            for child in self.children.iter_mut() {
+                child.wait(num_rows).await?;
             }
             Ok(())
         }
@@ -562,6 +590,10 @@ impl LogicalPageDecoder for SimpleStructDecoder {
             .max()
             .unwrap()
     }
+
+    fn data_type(&self) -> &DataType {
+        &self.data_type
+    }
 }
 
 struct SimpleStructDecodeTask {
@@ -677,6 +709,42 @@ mod tests {
         check_round_trip_encoding_random(field).await;
     }
 
+    #[test_log::test(tokio::test)]
+    async fn test_struct_list() {
+        let data_type = DataType::Struct(Fields::from(vec![
+            Field::new(
+                "inner_list",
+                DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
+                true,
+            ),
+            Field::new("outer_int", DataType::Int32, true),
+        ]));
+        let field = Field::new("row", data_type, false);
+        check_round_trip_encoding_random(field).await;
+    }
+
+    #[test_log::test(tokio::test)]
+    async fn test_complicated_struct() {
+        let data_type = DataType::Struct(Fields::from(vec![
+            Field::new("int", DataType::Int32, true),
+            Field::new(
+                "inner",
+                DataType::Struct(Fields::from(vec![
+                    Field::new("inner_int", DataType::Int32, true),
+                    Field::new(
+                        "inner_list",
+                        DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
+                        true,
+                    ),
+                ])),
+                true,
+            ),
+            Field::new("outer_binary", DataType::Binary, true),
+        ]));
+        let field = Field::new("row", data_type, false);
+        check_round_trip_encoding_random(field).await;
+    }
+
     #[test_log::test(tokio::test)]
     async fn test_ragged_scheduling() {
         // This test covers scheduling when batches straddle page boundaries
diff --git a/rust/lance-encoding/src/encodings/physical/basic.rs b/rust/lance-encoding/src/encodings/physical/basic.rs
index 88befa5520..afda40c201 100644
--- a/rust/lance-encoding/src/encodings/physical/basic.rs
+++ b/rust/lance-encoding/src/encodings/physical/basic.rs
@@ -127,18 +127,14 @@ impl PhysicalPageScheduler for BasicPageScheduler {
     ) -> BoxFuture<'static, Result<Box<dyn PhysicalPageDecoder>>> {
         let validity_future = match &self.mode {
             SchedulerNullStatus::None(_) | SchedulerNullStatus::All => None,
-            SchedulerNullStatus::Some(schedulers) => {
-                trace!("Scheduling ranges {:?} from validity", ranges);
-                Some(
-                    schedulers
-                        .validity
-                        .schedule_ranges(ranges, scheduler, top_level_row),
-                )
-            }
+            SchedulerNullStatus::Some(schedulers) => Some(schedulers.validity.schedule_ranges(
+                ranges,
+                scheduler,
+                top_level_row,
+            )),
         };
         let values_future = if let Some(values_scheduler) = self.mode.values_scheduler() {
-            trace!("Scheduling range {:?} from values", ranges);
             Some(
                 values_scheduler
                     .schedule_ranges(ranges, scheduler, top_level_row)
diff --git a/rust/lance-encoding/src/testing.rs b/rust/lance-encoding/src/testing.rs
index d270f9d9aa..d021b8fde9 100644
--- a/rust/lance-encoding/src/testing.rs
+++ b/rust/lance-encoding/src/testing.rs
@@ -15,7 +15,7 @@ use lance_core::Result;
 use lance_datagen::{array, gen, RowCount, Seed};
 
 use crate::{
-    decoder::{BatchDecodeStream, ColumnInfo, DecodeBatchScheduler, LogicalPageDecoder, PageInfo},
+    decoder::{BatchDecodeStream, ColumnInfo, DecodeBatchScheduler, DecoderMessage, PageInfo},
     encoder::{BatchEncoder, EncodedPage, FieldEncoder},
     EncodingsIo,
 };
@@ -65,7 +65,7 @@ async fn test_decode(
     expected: Option<Arc<dyn Array>>,
     schedule_fn: impl FnOnce(
         DecodeBatchScheduler,
-        UnboundedSender<Box<dyn LogicalPageDecoder>>,
+        UnboundedSender<DecoderMessage>,
     ) -> BoxFuture<'static, Result<()>>,
 ) {
     let decode_scheduler = DecodeBatchScheduler::new(schema, column_infos, &Vec::new());
@@ -243,7 +243,7 @@ async fn check_round_trip_encoding_inner(
         |mut decode_scheduler, tx| {
             async move {
                 decode_scheduler
-                    .schedule_range(0..num_rows, tx, &scheduler_copy)
+                    .schedule_range(0..num_rows, tx, scheduler_copy)
                     .await
             }
             .boxed()
@@ -266,7 +266,7 @@ async fn check_round_trip_encoding_inner(
         &column_infos,
         expected,
         |mut decode_scheduler, tx| {
-            async move { decode_scheduler.schedule_range(range, tx, &scheduler).await }.boxed()
+            async move { decode_scheduler.schedule_range(range, tx, scheduler).await }.boxed()
         },
     )
     .await;
@@ -299,7 +299,7 @@ async fn check_round_trip_encoding_inner(
         |mut decode_scheduler, tx| {
             async move {
                 decode_scheduler
-                    .schedule_take(&indices, tx, &scheduler)
+                    .schedule_take(&indices, tx, scheduler)
                     .await
             }
             .boxed()
diff --git a/rust/lance-file/src/v2/reader.rs b/rust/lance-file/src/v2/reader.rs
index 0e103b7639..3ed755f563 100644
--- a/rust/lance-file/src/v2/reader.rs
+++ b/rust/lance-file/src/v2/reader.rs
@@ -602,7 +602,7 @@ impl FileReader {
 
         let scheduler = self.scheduler.clone() as Arc<dyn EncodingsIo>;
         tokio::task::spawn(
-            async move { decode_scheduler.schedule_range(range, tx, &scheduler).await },
+            async move { decode_scheduler.schedule_range(range, tx, scheduler).await },
         );
 
         Ok(BatchDecodeStream::new(rx, batch_size, num_rows_to_read).into_stream())
@@ -636,7 +636,7 @@ impl FileReader {
         let scheduler = self.scheduler.clone() as Arc<dyn EncodingsIo>;
         tokio::task::spawn(async move {
             decode_scheduler
-                .schedule_take(&indices, tx, &scheduler)
+                .schedule_take(&indices, tx, scheduler)
                 .await
        });
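For orientation, the calling convention after this change can be restated in one place, following the reader.rs pattern above. This is a sketch under assumed signatures (`into_stream` returning a stream of `ReadBatchTask`), not code from the patch:

```rust
use std::{ops::Range, sync::Arc};

use tokio::sync::mpsc;

// Assumed: DecodeBatchScheduler, EncodingsIo, BatchDecodeStream, and
// ReadBatchTask are the crate's own types, in scope.
fn read_range(
    mut decode_scheduler: DecodeBatchScheduler,
    io: Arc<dyn EncodingsIo>,
    range: Range<u64>,
    batch_size: u32,
) -> impl futures::Stream<Item = ReadBatchTask> {
    let num_rows = range.end - range.start;
    let (tx, rx) = mpsc::unbounded_channel();
    // The scheduler task now owns both the sink and the Arc<dyn EncodingsIo>
    // (no more borrowed &Arc), so it can run detached from the caller.
    tokio::task::spawn(async move { decode_scheduler.schedule_range(range, tx, io).await });
    // The stream drives decoding, waiting on ScanLine messages as needed.
    BatchDecodeStream::new(rx, batch_size, num_rows).into_stream()
}
```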