From 648adf89d47d068a947e23c16c414f796a6fd55c Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Fri, 17 May 2024 15:09:38 -0700 Subject: [PATCH] fix: rework decoding to fix bugs in nested struct decoding (#2337) This PR modifies both scheduling and the decode stream. These modifications should mainly affect the logical encodings (list / struct). The changes to the other encodings are fairly minimal. In the scheduler we now track where we are in the field tree. This is both for debugging purposes (so we can log in trace messages the current path) and because we now need to send the path as part of the message we send to the decode stream. The decoder changes are more significant. Previously, we combined waiting for I/O and waiting for additional encoders into the same phase. This logic was more complex, but more importantly, it also assumed that the decoder could recreate the order in which the scheduler scheduled pages. For example, if we scan through the columns and encounter one that needs more data, then we grab exactly one page, and continue the column scan, grabbing the next page when we come by in another pass. This works in many cases but doesn't work in others. For example, the scheduler might schedule the many pages in a row for the same column if that column is wide or the page size is small. The decoder would get out of sync in these cases. Now, the logic is simpler, and more correct. The decoder first waits for enough scheduling to be done that a batch can be delivered. During this pass we drain encoders from the scheduler and insert them into the current decoders not by "current position in the decode" but by the path that is now included with the decoder. Once enough scheduling has been done we then wait for I/O. Once I/O is done we then drain the decoders in the same fashion we did before. --- rust/lance-encoding/src/decoder.rs | 310 ++++++++++++++++-- .../src/encodings/logical/binary.rs | 41 ++- .../src/encodings/logical/fixed_size_list.rs | 51 +-- .../src/encodings/logical/list.rs | 85 ++--- .../src/encodings/logical/primitive.rs | 30 +- .../src/encodings/logical/struct.rs | 298 ++++++++++------- .../src/encodings/physical/basic.rs | 14 +- rust/lance-encoding/src/testing.rs | 10 +- rust/lance-file/src/v2/reader.rs | 4 +- 9 files changed, 577 insertions(+), 266 deletions(-) diff --git a/rust/lance-encoding/src/decoder.rs b/rust/lance-encoding/src/decoder.rs index a0098a24c7..372b339a17 100644 --- a/rust/lance-encoding/src/decoder.rs +++ b/rust/lance-encoding/src/decoder.rs @@ -196,6 +196,7 @@ //! * The "batch overhead" is very small in Lance compared to other formats because it has no //! relation to the way the data is stored. +use std::collections::VecDeque; use std::{ops::Range, sync::Arc}; use arrow_array::cast::AsArray; @@ -513,7 +514,8 @@ impl DecodeBatchScheduler { Self::create_field_scheduler(field.data_type(), &mut col_info_iter, buffers) }) .collect::>(); - let root_scheduler = SimpleStructScheduler::new(field_schedulers, schema.fields.clone()); + let root_scheduler = + SimpleStructScheduler::new_root(field_schedulers, schema.fields.clone()); Self { root_scheduler } } @@ -528,20 +530,17 @@ impl DecodeBatchScheduler { pub async fn schedule_range( &mut self, range: Range, - sink: mpsc::UnboundedSender>, - scheduler: &Arc, + sink: mpsc::UnboundedSender, + scheduler: Arc, ) -> Result<()> { let rows_to_read = range.end - range.start; trace!("Scheduling range {:?} ({} rows)", range, rows_to_read); let range = range.start as u32..range.end as u32; - self.root_scheduler.schedule_ranges( - &[range.clone()], - scheduler, - &sink, - range.start as u64, - )?; + let mut context = SchedulerContext::new(sink, scheduler); + self.root_scheduler + .schedule_ranges(&[range.clone()], &mut context, range.start as u64)?; trace!("Finished scheduling of range {:?}", range); Ok(()) @@ -557,8 +556,8 @@ impl DecodeBatchScheduler { pub async fn schedule_take( &mut self, indices: &[u64], - sink: mpsc::UnboundedSender>, - scheduler: &Arc, + sink: mpsc::UnboundedSender, + scheduler: Arc, ) -> Result<()> { debug_assert!(indices.windows(2).all(|w| w[0] < w[1])); if indices.is_empty() { @@ -578,8 +577,10 @@ impl DecodeBatchScheduler { } // TODO: Figure out how to handle u64 indices let indices = indices.iter().map(|i| *i as u32).collect::>(); + let mut context = SchedulerContext::new(sink, scheduler); + self.root_scheduler - .schedule_take(&indices, scheduler, &sink, indices[0] as u64)?; + .schedule_take(&indices, &mut context, indices[0] as u64)?; trace!("Finished scheduling take of {} rows", indices.len()); Ok(()) } @@ -592,10 +593,12 @@ pub struct ReadBatchTask { /// A stream that takes scheduled jobs and generates decode tasks from them. pub struct BatchDecodeStream { - scheduled: mpsc::UnboundedReceiver>, - current: Option>, + context: DecoderContext, + root_decoders: VecDeque>, rows_remaining: u64, rows_per_batch: u32, + rows_scheduled: u64, + rows_drained: u64, } impl BatchDecodeStream { @@ -610,23 +613,69 @@ impl BatchDecodeStream { /// * `num_rows` the total number of rows scheduled /// * `num_columns` the total number of columns in the file pub fn new( - scheduled: mpsc::UnboundedReceiver>, + scheduled: mpsc::UnboundedReceiver, rows_per_batch: u32, num_rows: u64, ) -> Self { Self { - scheduled, - current: None, + context: DecoderContext::new(scheduled), + root_decoders: VecDeque::new(), rows_remaining: num_rows, rows_per_batch, + rows_scheduled: 0, + rows_drained: 0, + } + } + + fn accept_decoder(&mut self, decoder: DecoderReady) -> Result<()> { + if decoder.path.is_empty() { + self.root_decoders.push_back(decoder.decoder); + } else { + let root = self + .root_decoders + .front_mut() + .ok_or_else(|| Error::Internal { + message: format!( + "A child decoder with path {:?} arrived before any root decoder", + decoder.path + ), + location: location!(), + })?; + root.accept_child(decoder)?; } + Ok(()) + } + + async fn wait_for_scheduled(&mut self, scheduled_need: u64) -> Result<()> { + while self.rows_scheduled < scheduled_need { + let next_message = self.context.source.recv().await; + match next_message { + Some(DecoderMessage::ScanLine(rows_scheduled)) => { + self.rows_scheduled = rows_scheduled; + } + Some(DecoderMessage::Decoder(decoder)) => { + self.accept_decoder(decoder)?; + } + None => { + return Err(Error::Internal { + message: + "The scheduler finished while the decoder was still waiting for input" + .to_string(), + location: location!(), + }); + } + } + } + Ok(()) } #[instrument(level = "debug", skip_all)] async fn next_batch_task(&mut self) -> Result> { trace!( - "Draining batch task (rows_remaining={})", - self.rows_remaining + "Draining batch task (rows_remaining={} rows_drained={} rows_scheduled={})", + self.rows_remaining, + self.rows_drained, + self.rows_scheduled, ); if self.rows_remaining == 0 { return Ok(None); @@ -635,20 +684,34 @@ impl BatchDecodeStream { let to_take = self.rows_remaining.min(self.rows_per_batch as u64) as u32; self.rows_remaining -= to_take as u64; - if self.current.is_none() { - trace!("Loading new top-level page"); - self.current = Some(self.scheduled.recv().await.unwrap()); + let scheduled_need = + (self.rows_drained + to_take as u64).saturating_sub(self.rows_scheduled); + if scheduled_need > 0 { + let desired_scheduled = scheduled_need + self.rows_scheduled; + trace!( + "Draining from scheduler (desire at least {} scheduled rows)", + desired_scheduled + ); + self.wait_for_scheduled(desired_scheduled).await?; } - let current = self.current.as_mut().unwrap(); + + let current = self + .root_decoders + .front_mut() + .ok_or_else(|| Error::Internal { + message: "the scheduler never emitted a top-level decoder".into(), + location: location!(), + })?; let avail = current.avail(); trace!("Top level page has {} rows already available", avail); if avail < to_take { - current.wait(to_take, &mut self.scheduled).await?; + current.wait(to_take).await?; } let next_task = current.drain(to_take)?; if !next_task.has_more { - self.current = None; + self.root_decoders.pop_front(); } + self.rows_drained += to_take as u64; Ok(Some(next_task)) } @@ -760,6 +823,132 @@ pub trait PhysicalPageScheduler: Send + Sync + std::fmt::Debug { ) -> BoxFuture<'static, Result>>; } +/// Contains the context for a scheduler +pub struct SchedulerContext { + /// The sink that sends decodeable tasks to the decode stage + pub(crate) sink: mpsc::UnboundedSender, + recv: Option>, + io: Arc, + name: String, + path: Vec, + path_names: Vec, +} + +pub struct ScopedSchedulerContext<'a> { + pub context: &'a mut SchedulerContext, +} + +impl<'a> ScopedSchedulerContext<'a> { + pub fn pop(self) -> &'a mut SchedulerContext { + self.context.pop(); + self.context + } +} + +impl SchedulerContext { + pub fn new(sink: mpsc::UnboundedSender, io: Arc) -> Self { + Self { + sink, + io, + recv: None, + name: "".to_string(), + path: Vec::new(), + path_names: Vec::new(), + } + } + + pub fn io(&self) -> &dyn EncodingsIo { + self.io.as_ref() + } + + pub fn push(&mut self, name: &str, index: u32) -> ScopedSchedulerContext { + self.path.push(index); + self.path_names.push(name.to_string()); + ScopedSchedulerContext { context: self } + } + + pub fn pop(&mut self) { + self.path.pop(); + self.path_names.pop(); + } + + pub fn path_name(&self) -> String { + let path = self.path_names.join("/"); + if self.recv.is_some() { + format!("TEMP({}){}", self.name, path) + } else { + format!("ROOT{}", path) + } + } + + pub fn emit(&mut self, decoder: Box) { + trace!( + "Scheduling decoder of type {:?} for {:?}", + decoder.data_type(), + self.path, + ); + self.sink + .send(DecoderMessage::Decoder(DecoderReady { + decoder, + path: VecDeque::from_iter(self.path.iter().copied()), + })) + .unwrap(); + } + + // Temporary might not be the best name for this. We create a new context that + // shared the same I/O scheduler and has a different name. This is used in two + // situations. + // + // 1. When we need to create a new indirect I/O phase. + // 2. When we need to wrap a set of decoders and so we want to intercept them + // before they are emitted. + pub fn temporary(&self) -> Self { + let (tx, rx) = unbounded_channel(); + let mut name = self.name.clone(); + name.push_str(&self.path_names.join("/")); + Self { + sink: tx, + io: self.io.clone(), + recv: Some(rx), + name, + path: Vec::new(), + path_names: Vec::new(), + } + } + + // Consumes the temporary context returning the decoder messages + // + // Used when the temporary context is used to create a new indirect I/O phase + // where all the messages need to be replayed + pub fn into_messages(self) -> Vec { + let mut recv = self + .recv + .expect("Call to `finish` on a non-temporary scheduler context"); + let mut decoders = Vec::new(); + while let Ok(decoder) = recv.try_recv() { + decoders.push(decoder); + } + decoders + } + + // Consumes the temporary context returning only the decoders + // + // Used when the temporary context is used to wrap a set of decoders + pub fn into_decoders(self) -> Vec> { + self.into_messages() + .into_iter() + .filter_map(|msg| { + match msg { + DecoderMessage::ScanLine(_) => None, + // Should we ignore path here? Currently, all "wrapping" layers should not have + // children and so there should be no path. We could maybe debug_assert this. + DecoderMessage::Decoder(decoder_ready) => Some(decoder_ready.decoder), + } + }) + .collect::>() + } +} + /// A scheduler for a field's worth of data /// /// Each page of incoming data maps to one `LogicalPageScheduler` instance. However, this @@ -791,8 +980,7 @@ pub trait LogicalPageScheduler: Send + Sync + std::fmt::Debug { fn schedule_ranges( &self, ranges: &[Range], - scheduler: &Arc, - sink: &mpsc::UnboundedSender>, + context: &mut SchedulerContext, top_level_row: u64, ) -> Result<()>; /// Schedules I/O for the requested rows (identified by row offsets from start of page) @@ -800,8 +988,7 @@ pub trait LogicalPageScheduler: Send + Sync + std::fmt::Debug { fn schedule_take( &self, indices: &[u32], - scheduler: &Arc, - sink: &mpsc::UnboundedSender>, + context: &mut SchedulerContext, top_level_row: u64, ) -> Result<()>; /// The number of rows covered by this page @@ -824,6 +1011,48 @@ pub struct NextDecodeTask { pub has_more: bool, } +pub struct DecoderReady { + // The decoder that is ready to be decoded + pub decoder: Box, + // The path to the decoder, the first value is the column index + // following values, if present, are nested child indices + // + // For example, a path of [1, 1, 0] would mean to grab the second + // column, then the second child, and then the first child. + // + // It could represent x in the following schema: + // + // score: float64 + // points: struct + // color: string + // location: struct + // x: float64 + // + // Currently, only struct decoders have "children" although other + // decoders may at some point as well. List children are only + // handled through indirect I/O at the moment and so they don't + // need to be represented (yet) + pub path: VecDeque, +} + +pub enum DecoderMessage { + // Emitted whenever the scheduler has made another pass through the columns. + // Contains the number of rows that have been scheduled so far. + ScanLine(u64), + // Emitted whenever a decoder has been emitted + Decoder(DecoderReady), +} + +pub struct DecoderContext { + source: mpsc::UnboundedReceiver, +} + +impl DecoderContext { + pub fn new(source: mpsc::UnboundedReceiver) -> Self { + Self { source } + } +} + /// A decoder for a field's worth of data /// /// The decoder is initially "unloaded" (doesn't have all its data). The [`Self::wait`] @@ -833,18 +1062,29 @@ pub struct NextDecodeTask { /// Unlike the other decoder types it is assumed that `LogicalPageDecoder` is stateful /// and only `Send`. This is why we don't need a `rows_to_skip` argument in [`Self::drain`] pub trait LogicalPageDecoder: std::fmt::Debug + Send { + /// Add a newly scheduled child decoder + /// + /// The default implementation does not expect children and returns + /// an error. + fn accept_child(&mut self, _child: DecoderReady) -> Result<()> { + Err(Error::Internal { + message: format!( + "The decoder {:?} does not expect children but received a child", + self + ), + location: location!(), + }) + } /// Waits for enough data to be loaded to decode `num_rows` of data - fn wait<'a>( - &'a mut self, - num_rows: u32, - source: &'a mut mpsc::UnboundedReceiver>, - ) -> BoxFuture<'a, Result<()>>; + fn wait(&mut self, num_rows: u32) -> BoxFuture>; /// Creates a task to decode `num_rows` of data into an array fn drain(&mut self, num_rows: u32) -> Result; /// The number of rows that are in the page but haven't yet been "waited" fn unawaited(&self) -> u32; /// The number of rows that have been "waited" but not yet decoded fn avail(&self) -> u32; + /// The data type of the decoded data + fn data_type(&self) -> &DataType; } /// Decodes a batch of data from an in-memory structure created by [`crate::encoder::encode_batch`] @@ -854,7 +1094,7 @@ pub async fn decode_batch(batch: &EncodedBatch) -> Result { let (tx, rx) = unbounded_channel(); let io_scheduler = Arc::new(BufferScheduler::new(batch.data.clone())) as Arc; decode_scheduler - .schedule_range(0..batch.num_rows, tx, &io_scheduler) + .schedule_range(0..batch.num_rows, tx, io_scheduler) .await?; let stream = BatchDecodeStream::new(rx, batch.num_rows as u32, batch.num_rows); stream.into_stream().next().await.unwrap().task.await diff --git a/rust/lance-encoding/src/encodings/logical/binary.rs b/rust/lance-encoding/src/encodings/logical/binary.rs index a71037780e..e927d2bc87 100644 --- a/rust/lance-encoding/src/encodings/logical/binary.rs +++ b/rust/lance-encoding/src/encodings/logical/binary.rs @@ -11,12 +11,14 @@ use arrow_array::{ use arrow_buffer::ScalarBuffer; use arrow_schema::{DataType, Field}; -use futures::{future::BoxFuture, FutureExt}; +use futures::future::BoxFuture; use lance_core::Result; use log::trace; use crate::{ - decoder::{DecodeArrayTask, LogicalPageDecoder, LogicalPageScheduler, NextDecodeTask}, + decoder::{ + DecodeArrayTask, LogicalPageDecoder, LogicalPageScheduler, NextDecodeTask, SchedulerContext, + }, encoder::{EncodeTask, FieldEncoder}, }; @@ -45,21 +47,20 @@ impl LogicalPageScheduler for BinaryPageScheduler { fn schedule_ranges( &self, ranges: &[std::ops::Range], - scheduler: &Arc, - sink: &tokio::sync::mpsc::UnboundedSender>, + context: &mut SchedulerContext, top_level_row: u64, ) -> Result<()> { trace!("Scheduling binary for {} ranges", ranges.len()); - let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); + let mut temp_context = context.temporary(); self.varbin_scheduler - .schedule_ranges(ranges, scheduler, &tx, top_level_row)?; + .schedule_ranges(ranges, &mut temp_context, top_level_row)?; - while let Some(decoder) = rx.recv().now_or_never() { - let wrapped = BinaryPageDecoder { - inner: decoder.unwrap(), + for decoder in temp_context.into_decoders() { + let decoder = Box::new(BinaryPageDecoder { + inner: decoder, data_type: self.data_type.clone(), - }; - sink.send(Box::new(wrapped)).unwrap(); + }); + context.emit(decoder); } Ok(()) @@ -68,8 +69,7 @@ impl LogicalPageScheduler for BinaryPageScheduler { fn schedule_take( &self, indices: &[u32], - scheduler: &Arc, - sink: &tokio::sync::mpsc::UnboundedSender>, + context: &mut SchedulerContext, top_level_row: u64, ) -> Result<()> { trace!("Scheduling binary for {} indices", indices.len()); @@ -78,8 +78,7 @@ impl LogicalPageScheduler for BinaryPageScheduler { .iter() .map(|&idx| idx..(idx + 1)) .collect::>(), - scheduler, - sink, + context, top_level_row, ) } @@ -96,12 +95,8 @@ pub struct BinaryPageDecoder { } impl LogicalPageDecoder for BinaryPageDecoder { - fn wait<'a>( - &'a mut self, - num_rows: u32, - source: &'a mut tokio::sync::mpsc::UnboundedReceiver>, - ) -> BoxFuture<'a, Result<()>> { - self.inner.wait(num_rows, source) + fn wait(&mut self, num_rows: u32) -> BoxFuture> { + self.inner.wait(num_rows) } fn drain(&mut self, num_rows: u32) -> Result { @@ -123,6 +118,10 @@ impl LogicalPageDecoder for BinaryPageDecoder { fn avail(&self) -> u32 { self.inner.avail() } + + fn data_type(&self) -> &DataType { + &self.data_type + } } pub struct BinaryArrayDecoder { diff --git a/rust/lance-encoding/src/encodings/logical/fixed_size_list.rs b/rust/lance-encoding/src/encodings/logical/fixed_size_list.rs index 44ead7fa63..caf17a2fc5 100644 --- a/rust/lance-encoding/src/encodings/logical/fixed_size_list.rs +++ b/rust/lance-encoding/src/encodings/logical/fixed_size_list.rs @@ -4,14 +4,12 @@ use std::{ops::Range, sync::Arc}; use arrow_array::{ArrayRef, FixedSizeListArray}; -use arrow_schema::Field; +use arrow_schema::{DataType, Field}; use futures::future::BoxFuture; use log::trace; -use tokio::sync::mpsc; -use crate::{ - decoder::{DecodeArrayTask, LogicalPageDecoder, LogicalPageScheduler, NextDecodeTask}, - EncodingsIo, +use crate::decoder::{ + DecodeArrayTask, LogicalPageDecoder, LogicalPageScheduler, NextDecodeTask, SchedulerContext, }; use lance_core::Result; @@ -51,8 +49,7 @@ impl LogicalPageScheduler for FslPageScheduler { fn schedule_ranges( &self, ranges: &[Range], - scheduler: &Arc, - sink: &mpsc::UnboundedSender>, + context: &mut SchedulerContext, top_level_row: u64, ) -> Result<()> { let expanded_ranges = ranges @@ -63,23 +60,27 @@ impl LogicalPageScheduler for FslPageScheduler { "Scheduling expanded ranges {:?} from items scheduler", expanded_ranges ); - let (tx, mut rx) = mpsc::unbounded_channel(); + let mut temp_context = context.temporary(); self.items_scheduler - .schedule_ranges(&expanded_ranges, scheduler, &tx, top_level_row)?; - let inner_page_decoder = rx.blocking_recv().unwrap(); - sink.send(Box::new(FslPageDecoder { - inner: inner_page_decoder, - dimension: self.dimension, - })) - .unwrap(); + .schedule_ranges(&expanded_ranges, &mut temp_context, top_level_row)?; + for decoder in temp_context.into_decoders() { + let data_type = DataType::FixedSizeList( + Arc::new(Field::new("item", decoder.data_type().clone(), true)), + self.dimension as i32, + ); + context.emit(Box::new(FslPageDecoder { + inner: decoder, + dimension: self.dimension, + data_type, + })); + } Ok(()) } fn schedule_take( &self, indices: &[u32], - scheduler: &Arc, - sink: &mpsc::UnboundedSender>, + context: &mut SchedulerContext, top_level_row: u64, ) -> Result<()> { self.schedule_ranges( @@ -87,8 +88,7 @@ impl LogicalPageScheduler for FslPageScheduler { .iter() .map(|&idx| idx..(idx + 1)) .collect::>(), - scheduler, - sink, + context, top_level_row, ) } @@ -102,15 +102,12 @@ impl LogicalPageScheduler for FslPageScheduler { struct FslPageDecoder { inner: Box, dimension: u32, + data_type: DataType, } impl LogicalPageDecoder for FslPageDecoder { - fn wait<'a>( - &'a mut self, - num_rows: u32, - source: &'a mut mpsc::UnboundedReceiver>, - ) -> BoxFuture<'a, Result<()>> { - self.inner.wait(num_rows * self.dimension, source) + fn wait(&mut self, num_rows: u32) -> BoxFuture> { + self.inner.wait(num_rows * self.dimension) } fn unawaited(&self) -> u32 { @@ -136,6 +133,10 @@ impl LogicalPageDecoder for FslPageDecoder { fn avail(&self) -> u32 { self.inner.avail() / self.dimension } + + fn data_type(&self) -> &DataType { + &self.data_type + } } struct FslDecodeTask { diff --git a/rust/lance-encoding/src/encodings/logical/list.rs b/rust/lance-encoding/src/encodings/logical/list.rs index 912be8bcb2..9fe3fdfef3 100644 --- a/rust/lance-encoding/src/encodings/logical/list.rs +++ b/rust/lance-encoding/src/encodings/logical/list.rs @@ -14,15 +14,16 @@ use arrow_schema::{DataType, Field}; use futures::{future::BoxFuture, FutureExt}; use log::trace; use snafu::{location, Location}; -use tokio::{sync::mpsc, task::JoinHandle}; +use tokio::task::JoinHandle; use lance_core::{Error, Result}; use crate::{ - decoder::{DecodeArrayTask, LogicalPageDecoder, LogicalPageScheduler, NextDecodeTask}, + decoder::{ + DecodeArrayTask, LogicalPageDecoder, LogicalPageScheduler, NextDecodeTask, SchedulerContext, + }, encoder::{ArrayEncoder, EncodeTask, EncodedArray, EncodedPage, FieldEncoder}, format::pb, - EncodingsIo, }; use super::primitive::{AccumulationQueue, PrimitiveFieldEncoder}; @@ -225,8 +226,7 @@ impl LogicalPageScheduler for ListPageScheduler { fn schedule_ranges( &self, ranges: &[std::ops::Range], - scheduler: &Arc, - sink: &mpsc::UnboundedSender>, + context: &mut SchedulerContext, top_level_row: u64, ) -> Result<()> { // TODO: Shortcut here if the request covers the entire range (can be determined by @@ -257,13 +257,23 @@ impl LogicalPageScheduler for ListPageScheduler { trace!("Scheduling list offsets ranges: {:?}", offsets_ranges); // Create a channel for the internal schedule / decode loop that is unique // to this page. - let (tx, mut rx) = mpsc::unbounded_channel(); + let mut temporary = context.temporary(); self.offsets_scheduler - .schedule_ranges(&offsets_ranges, scheduler, &tx, top_level_row)?; - let mut scheduled_offsets = rx.try_recv().unwrap(); + .schedule_ranges(&offsets_ranges, &mut temporary, top_level_row)?; + let offset_decoders = temporary.into_decoders(); + let num_offset_decoders = offset_decoders.len(); + let mut scheduled_offsets = + offset_decoders + .into_iter() + .next() + .ok_or_else(|| Error::Internal { + message: format!("scheduling offsets yielded {} pages", num_offset_decoders), + location: location!(), + })?; let items_schedulers = self.items_schedulers.clone(); let ranges = ranges.to_vec(); - let scheduler = scheduler.clone(); + + let mut indirect_context = context.temporary(); // First we schedule, as normal, the I/O for the offsets. Then we immediately spawn // a task to decode those offsets and schedule the I/O for the items AND wait for @@ -273,14 +283,12 @@ impl LogicalPageScheduler for ListPageScheduler { let indirect_fut = tokio::task::spawn(async move { // We know the offsets are a primitive array and thus will not need additional // pages. We can use a dummy receiver to match the decoder API - let (_, mut dummy_rx) = mpsc::unbounded_channel(); - scheduled_offsets.wait(num_rows, &mut dummy_rx).await?; + scheduled_offsets.wait(num_rows).await?; let decode_task = scheduled_offsets.drain(num_offsets)?; let offsets = decode_task.task.decode()?; let (mut item_ranges, offsets, validity) = Self::decode_offsets(offsets.as_ref(), &ranges, null_offset_adjustment); - let (tx, mut rx) = mpsc::unbounded_channel(); trace!( "Indirectly scheduling items ranges {:?} from {} list items pages", @@ -327,8 +335,7 @@ impl LogicalPageScheduler for ListPageScheduler { // that way. Still, this is probably good enough for a while next_scheduler.schedule_ranges( &next_item_ranges, - &scheduler, - &tx, + &mut indirect_context, top_level_row, )?; next_item_ranges.clear(); @@ -355,8 +362,7 @@ impl LogicalPageScheduler for ListPageScheduler { if !next_item_ranges.is_empty() { next_scheduler.schedule_ranges( &next_item_ranges, - &scheduler, - &tx, + &mut indirect_context, top_level_row, )?; next_item_ranges.clear(); @@ -367,16 +373,13 @@ impl LogicalPageScheduler for ListPageScheduler { if !next_item_ranges.is_empty() { next_scheduler.schedule_ranges( &next_item_ranges, - &scheduler, - &tx, + &mut indirect_context, top_level_row, )?; } - let mut item_decoders = Vec::new(); - drop(tx); - while let Some(item_decoder) = rx.recv().await { - item_decoders.push(item_decoder); - } + // TODO: Should probably use into_messages here. Can figure out once we add + // tests for List> + let item_decoders = indirect_context.into_decoders(); Ok(IndirectlyLoaded { offsets, @@ -384,7 +387,16 @@ impl LogicalPageScheduler for ListPageScheduler { item_decoders, }) }); - sink.send(Box::new(ListPageDecoder { + let data_type = match &self.offset_type { + DataType::Int32 => { + DataType::List(Arc::new(Field::new("item", self.items_type.clone(), true))) + } + DataType::Int64 => { + DataType::LargeList(Arc::new(Field::new("item", self.items_type.clone(), true))) + } + _ => panic!("Unexpected offset type {}", self.offset_type), + }; + context.emit(Box::new(ListPageDecoder { offsets: Vec::new(), validity: BooleanBuffer::new(Buffer::from_vec(Vec::::default()), 0, 0), unawaited_item_decoders: VecDeque::new(), @@ -395,8 +407,8 @@ impl LogicalPageScheduler for ListPageScheduler { unloaded: Some(indirect_fut), items_type: self.items_type.clone(), offset_type: self.offset_type.clone(), - })) - .unwrap(); + data_type, + })); Ok(()) } @@ -407,8 +419,7 @@ impl LogicalPageScheduler for ListPageScheduler { fn schedule_take( &self, indices: &[u32], - scheduler: &Arc, - sink: &mpsc::UnboundedSender>, + context: &mut SchedulerContext, top_level_row: u64, ) -> Result<()> { trace!("Scheduling list offsets for {} indices", indices.len()); @@ -417,8 +428,7 @@ impl LogicalPageScheduler for ListPageScheduler { .iter() .map(|&idx| idx..(idx + 1)) .collect::>(), - scheduler, - sink, + context, top_level_row, ) } @@ -448,6 +458,7 @@ struct ListPageDecoder { rows_drained: u32, items_type: DataType, offset_type: DataType, + data_type: DataType, } struct ListDecodeTask { @@ -517,11 +528,7 @@ impl DecodeArrayTask for ListDecodeTask { } impl LogicalPageDecoder for ListPageDecoder { - fn wait<'a>( - &'a mut self, - num_rows: u32, - source: &'a mut mpsc::UnboundedReceiver>, - ) -> BoxFuture<'a, Result<()>> { + fn wait(&mut self, num_rows: u32) -> BoxFuture> { async move { // wait for the indirect I/O to finish, then wait for enough items to arrive if self.unloaded.is_some() { @@ -574,7 +581,7 @@ impl LogicalPageDecoder for ListPageDecoder { to_await ); if to_await > 0 { - last_decoder.wait(to_await, source).await?; + last_decoder.wait(to_await).await?; items_needed = items_needed.saturating_sub(last_decoder.avail() as u64); } } @@ -588,7 +595,7 @@ impl LogicalPageDecoder for ListPageDecoder { ); let to_await = items_needed.min(unawaited) as u32; // TODO: Seems like this will fail in List case - next_item_decoder.wait(to_await, source).await?; + next_item_decoder.wait(to_await).await?; // Might end up loading more items than needed so use saturating_sub items_needed = items_needed.saturating_sub(next_item_decoder.avail() as u64); self.awaited_item_decoders.push_back(next_item_decoder); @@ -674,6 +681,10 @@ impl LogicalPageDecoder for ListPageDecoder { fn avail(&self) -> u32 { self.lists_available } + + fn data_type(&self) -> &DataType { + &self.data_type + } } struct IndirectlyLoaded { diff --git a/rust/lance-encoding/src/encodings/logical/primitive.rs b/rust/lance-encoding/src/encodings/logical/primitive.rs index c6291650d8..9d5176259c 100644 --- a/rust/lance-encoding/src/encodings/logical/primitive.rs +++ b/rust/lance-encoding/src/encodings/logical/primitive.rs @@ -25,19 +25,17 @@ use log::{debug, trace}; use snafu::{location, Location}; use lance_core::{Error, Result}; -use tokio::sync::mpsc; use crate::{ decoder::{ DecodeArrayTask, LogicalPageDecoder, LogicalPageScheduler, NextDecodeTask, PageInfo, - PhysicalPageDecoder, PhysicalPageScheduler, + PhysicalPageDecoder, PhysicalPageScheduler, SchedulerContext, }, encoder::{ArrayEncoder, EncodeTask, EncodedPage, FieldEncoder}, encodings::physical::{ basic::BasicEncoder, decoder_from_array_encoding, fixed_size_list::FslEncoder, value::ValueEncoder, ColumnBuffers, PageBuffers, }, - EncodingsIo, }; /// A page scheduler for primitive fields @@ -78,15 +76,13 @@ impl LogicalPageScheduler for PrimitivePageScheduler { fn schedule_ranges( &self, ranges: &[std::ops::Range], - scheduler: &Arc, - sink: &mpsc::UnboundedSender>, + context: &mut SchedulerContext, top_level_row: u64, ) -> Result<()> { let num_rows = ranges.iter().map(|r| r.end - r.start).sum(); - trace!("Scheduling ranges {:?} from physical page", ranges); let physical_decoder = self.physical_decoder - .schedule_ranges(ranges, scheduler.as_ref(), top_level_row); + .schedule_ranges(ranges, context.io(), top_level_row); let logical_decoder = PrimitiveFieldDecoder { data_type: self.data_type.clone(), @@ -96,15 +92,14 @@ impl LogicalPageScheduler for PrimitivePageScheduler { num_rows, }; - sink.send(Box::new(logical_decoder)).unwrap(); + context.emit(Box::new(logical_decoder)); Ok(()) } fn schedule_take( &self, indices: &[u32], - scheduler: &Arc, - sink: &mpsc::UnboundedSender>, + context: &mut SchedulerContext, top_level_row: u64, ) -> Result<()> { trace!( @@ -116,8 +111,7 @@ impl LogicalPageScheduler for PrimitivePageScheduler { .iter() .map(|&idx| idx..(idx + 1)) .collect::>(), - scheduler, - sink, + context, top_level_row, ) } @@ -392,11 +386,9 @@ impl PrimitiveFieldDecodeTask { } impl LogicalPageDecoder for PrimitiveFieldDecoder { - fn wait<'a>( - &'a mut self, - _: u32, - _: &'a mut mpsc::UnboundedReceiver>, - ) -> BoxFuture<'a, Result<()>> { + // TODO: In the future, at some point, we may consider partially waiting for primitive pages by + // breaking up large I/O into smaller I/O as a way to accelerate the "time-to-first-decode" + fn wait(&mut self, _: u32) -> BoxFuture> { async move { let physical_decoder = self.unloaded_physical_decoder.take().unwrap().await?; self.physical_decoder = Some(Arc::from(physical_decoder)); @@ -440,6 +432,10 @@ impl LogicalPageDecoder for PrimitiveFieldDecoder { self.num_rows - self.rows_drained } } + + fn data_type(&self) -> &DataType { + &self.data_type + } } #[derive(Debug)] diff --git a/rust/lance-encoding/src/encodings/logical/struct.rs b/rust/lance-encoding/src/encodings/logical/struct.rs index 1672cf619b..055544ae22 100644 --- a/rust/lance-encoding/src/encodings/logical/struct.rs +++ b/rust/lance-encoding/src/encodings/logical/struct.rs @@ -4,18 +4,20 @@ use std::{collections::VecDeque, ops::Range, sync::Arc}; use arrow_array::{cast::AsArray, ArrayRef, StructArray}; -use arrow_schema::Fields; +use arrow_schema::{DataType, Fields}; use futures::{future::BoxFuture, FutureExt}; use log::trace; -use tokio::sync::mpsc; +use snafu::{location, Location}; use crate::{ - decoder::{DecodeArrayTask, LogicalPageDecoder, LogicalPageScheduler, NextDecodeTask}, + decoder::{ + DecodeArrayTask, DecoderReady, LogicalPageDecoder, LogicalPageScheduler, NextDecodeTask, + SchedulerContext, + }, encoder::{EncodeTask, EncodedArray, EncodedPage, FieldEncoder}, format::pb, - EncodingsIo, }; -use lance_core::Result; +use lance_core::{Error, Result}; /// A scheduler for structs /// @@ -32,10 +34,16 @@ pub struct SimpleStructScheduler { children: Vec>>, child_fields: Fields, num_rows: u32, + // True if this is the top-level decoder (and we should send scan line messages) + is_root: bool, } impl SimpleStructScheduler { - pub fn new(children: Vec>>, child_fields: Fields) -> Self { + fn new_with_params( + children: Vec>>, + child_fields: Fields, + is_root: bool, + ) -> Self { debug_assert!(!children.is_empty()); let num_rows = children[0].iter().map(|page| page.num_rows()).sum(); // Ensure that all the children have the same number of rows @@ -43,8 +51,20 @@ impl SimpleStructScheduler { children, child_fields, num_rows, + is_root, } } + + pub fn new(children: Vec>>, child_fields: Fields) -> Self { + Self::new_with_params(children, child_fields, false) + } + + pub fn new_root( + children: Vec>>, + child_fields: Fields, + ) -> Self { + Self::new_with_params(children, child_fields, true) + } } // As we schedule a range we keep one of these per column so that we know @@ -108,8 +128,7 @@ impl LogicalPageScheduler for SimpleStructScheduler { fn schedule_ranges( &self, ranges: &[Range], - scheduler: &Arc, - sink: &mpsc::UnboundedSender>, + mut context: &mut SchedulerContext, top_level_row: u64, ) -> Result<()> { for range in ranges.iter().cloned() { @@ -125,11 +144,10 @@ impl LogicalPageScheduler for SimpleStructScheduler { // // This will need to get a tiny bit more complicated once structs have their own nullability and that nullability // information starts to span multiple pages. - sink.send(Box::new(SimpleStructDecoder::new( + context.emit(Box::new(SimpleStructDecoder::new( self.child_fields.clone(), rows_to_read, - ))) - .unwrap(); + ))); let mut field_status = vec![RangeFieldWalkStatus::new_from_range(range); self.children.len()]; @@ -180,18 +198,18 @@ impl LogicalPageScheduler for SimpleStructScheduler { let page_range = page_range_start..(page_range_start + rows_to_take); trace!( - "Taking {} rows from column {} starting at page offset {} from page {:?}", + "Taking {} rows from column {} starting at page offset {}", rows_to_take, col_idx, page_range_start, - next_page ); + let scope = context.push(self.child_fields[col_idx].name(), col_idx as u32); next_page.schedule_ranges( &[page_range], - scheduler, - sink, + scope.context, current_top_level_row, )?; + context = scope.pop(); status.rows_queued += rows_to_take; status.rows_to_take -= rows_to_take; @@ -200,6 +218,11 @@ impl LogicalPageScheduler for SimpleStructScheduler { min_rows_added = min_rows_added.min(rows_to_take); } else { + trace!( + "Using {} queued rows for column {}", + col_idx, + status.rows_queued + ); min_rows_added = min_rows_added.min(status.rows_queued); } } @@ -208,6 +231,18 @@ impl LogicalPageScheduler for SimpleStructScheduler { } rows_to_read -= min_rows_added; current_top_level_row += min_rows_added as u64; + if self.is_root { + trace!( + "Scheduler scan complete ({} rows now scheduled)", + current_top_level_row - top_level_row + ); + context + .sink + .send(crate::decoder::DecoderMessage::ScanLine( + current_top_level_row - top_level_row, + )) + .unwrap(); + } for field_status in &mut field_status { field_status.rows_queued -= min_rows_added; } @@ -223,22 +258,24 @@ impl LogicalPageScheduler for SimpleStructScheduler { fn schedule_take( &self, indices: &[u32], - scheduler: &Arc, - sink: &mpsc::UnboundedSender>, + mut context: &mut SchedulerContext, top_level_row: u64, ) -> Result<()> { - trace!("Scheduling struct decode of {} indices", indices.len()); + trace!( + "Scheduling struct decode of {} indices with top_level_row={}", + indices.len(), + top_level_row + ); // Before we do anything, send a struct decoder to the decode thread so it can start decoding the pages // we are about to send. // // This will need to get a tiny bit more complicated once structs have their own nullability and that nullability // information starts to span multiple pages. - sink.send(Box::new(SimpleStructDecoder::new( + context.emit(Box::new(SimpleStructDecoder::new( self.child_fields.clone(), indices.len() as u32, - ))) - .unwrap(); + ))); // Create a cursor into indices for each column let mut field_status = @@ -283,12 +320,13 @@ impl LogicalPageScheduler for SimpleStructScheduler { // We should be guaranteed to get at least one page let next_page = next_page.unwrap(); + let scope = context.push(self.child_fields[col_idx].name(), col_idx as u32); next_page.schedule_take( &indices_in_page, - scheduler, - sink, + scope.context, current_top_level_row, )?; + context = scope.pop(); let rows_scheduled = indices_in_page.len() as u32; status.rows_queued += rows_scheduled; @@ -296,6 +334,11 @@ impl LogicalPageScheduler for SimpleStructScheduler { min_rows_added = min_rows_added.min(rows_scheduled); } else { // TODO: Unit tests are not covering this path right now + trace!( + "Using {} already queued rows for column {}", + status.rows_queued, + col_idx + ); min_rows_added = min_rows_added.min(status.rows_queued); } } @@ -308,6 +351,18 @@ impl LogicalPageScheduler for SimpleStructScheduler { ); rows_to_read -= min_rows_added; current_top_level_row += min_rows_added as u64; + if self.is_root { + trace!( + "Scheduler scan complete ({} rows now scheduled)", + current_top_level_row - top_level_row + ); + context + .sink + .send(crate::decoder::DecoderMessage::ScanLine( + current_top_level_row - top_level_row, + )) + .unwrap(); + } for field_status in &mut field_status { field_status.rows_queued -= min_rows_added; } @@ -318,18 +373,17 @@ impl LogicalPageScheduler for SimpleStructScheduler { #[derive(Debug)] struct ChildState { - // As we decode a column we pull pages out of the channel source and into - // a queue for that column. Since we await as soon as we pull the page from - // the source there is no need for a separate unawaited queue. + // As child decoders are scheduled they are added to this queue + // Once the decoder is fully drained it is popped from this queue // - // Technically though, these pages are only "partially awaited" + // TODO: It may be a minor perf optimization, in some rare cases, if we have a separate + // "fully awaited but not yet drained" queue so we don't loop through fully awaited pages + // during each call to wait. // // Note: This queue may have more than one page in it if the batch size is very large // or pages are very small // TODO: Test this case - // - // Then we drain this queue pages as we decode. - awaited: VecDeque>, + scheduled: VecDeque>, // Rows that should still be coming over the channel source but haven't yet been // put into the awaited queue rows_unawaited: u32, @@ -368,21 +422,15 @@ impl CompositeDecodeTask { impl ChildState { fn new(num_rows: u32, field_index: u32) -> Self { Self { - awaited: VecDeque::new(), + scheduled: VecDeque::new(), rows_unawaited: num_rows, rows_available: 0, field_index, } } - // Wait for the next set of rows to arrive. Return true if finished. Return - // false if more rows are still needed (we can only wait one page at a time - // because we need to move in row-major fashion) - async fn wait_next( - &mut self, - num_rows: u32, - source: &mut mpsc::UnboundedReceiver>, - ) -> Result { + // Wait for the next set of rows to arrive. + async fn wait(&mut self, num_rows: u32) -> Result<()> { trace!( "Struct child {} waiting for {} rows and {} are available already", self.field_index, @@ -390,68 +438,41 @@ impl ChildState { self.rows_available ); let mut remaining = num_rows.saturating_sub(self.rows_available); - if remaining > 0 { - trace!( - "Struct must await {} rows from {} unawaited rows", - remaining, - self.rows_unawaited - ); - if let Some(back) = self.awaited.back_mut() { - if back.unawaited() > 0 { - let rows_to_wait = remaining.min(back.unawaited()); - trace!( - "Struct await an additional {} rows from the current page", - rows_to_wait - ); - // Even though we wait for X rows we might actually end up - // loading more than that - let previously_avail = back.avail(); - back.wait(rows_to_wait, source).await?; - let newly_avail = back.avail() - previously_avail; - trace!("The await loaded {} rows", newly_avail); - self.rows_available += newly_avail; - // Need to use saturating_sub here because we might have asked for range - // 0-1000 and this page we just loaded might cover 900-1100 and so newly_avail - // is 200 but rows_unawaited is only 100 - // - // TODO: Unit tests are not covering this branch right now - self.rows_unawaited = self.rows_unawaited.saturating_sub(newly_avail); - remaining -= rows_to_wait; - if remaining == 0 { - return Ok(true); - } + for next_decoder in &mut self.scheduled { + if next_decoder.unawaited() > 0 { + let rows_to_wait = remaining.min(next_decoder.unawaited()); + trace!( + "Struct await an additional {} rows from the current page", + rows_to_wait + ); + // Even though we wait for X rows we might actually end up + // loading more than that + let previously_avail = next_decoder.avail(); + // We might only await part of a page. This is important for things + // like the struct> case where we have one outer page, one + // middle page, and then a bunch of inner pages. If we await the entire + // middle page then we will have to wait for all the inner pages to arrive + // before we can start decoding. + next_decoder.wait(rows_to_wait).await?; + let newly_avail = next_decoder.avail() - previously_avail; + trace!("The await loaded {} rows", newly_avail); + self.rows_available += newly_avail; + // Need to use saturating_sub here because we might have asked for range + // 0-1000 and this page we just loaded might cover 900-1100 and so newly_avail + // is 200 but rows_unawaited is only 100 + // + // TODO: Unit tests may not be covering this branch right now + self.rows_unawaited = self.rows_unawaited.saturating_sub(newly_avail); + remaining -= rows_to_wait; + if remaining == 0 { + break; } } - - // Because we schedule in row-major fashion we know the next page - // will belong to this column. - let mut decoder = source.recv().await.unwrap(); - let could_await = decoder.unawaited(); - let rows_to_wait = remaining.min(could_await); - trace!( - "Struct received new page and awaiting {} rows out of {}", - rows_to_wait, - could_await - ); - // We might only await part of a page. This is important for things - // like the struct> case where we have one outer page, one - // middle page, and then a bunch of inner pages. If we await the entire - // middle page then we will have to wait for all the inner pages to arrive - // before we can start decoding. - // - // TODO: test this case - let previously_avail = decoder.avail(); - decoder.wait(rows_to_wait, source).await?; - // It's possible that we loaded more rows than asked for so need to calculate - // newly_avail this way (we do this above too) - let newly_avail = decoder.avail() - previously_avail; - self.awaited.push_back(decoder); - trace!("The new await loaded {} rows", newly_avail); - self.rows_available += newly_avail; - self.rows_unawaited = self.rows_unawaited.saturating_sub(newly_avail); - Ok(remaining == rows_to_wait) + } + if remaining > 0 { + Err(Error::Internal { message: format!("The struct field at index {} is still waiting for {} rows but ran out of scheduled pages", self.field_index, remaining), location: location!() }) } else { - Ok(true) + Ok(()) } } @@ -467,12 +488,12 @@ impl ChildState { has_more: true, }; while remaining > 0 { - let next = self.awaited.front_mut().unwrap(); + let next = self.scheduled.front_mut().unwrap(); let rows_to_take = remaining.min(next.avail()); let next_task = next.drain(rows_to_take)?; if next.avail() == 0 && next.unawaited() == 0 { trace!("Completely drained page"); - self.awaited.pop_front(); + self.scheduled.pop_front(); } remaining -= rows_to_take; composite.tasks.push(next_task.task); @@ -487,10 +508,12 @@ impl ChildState { struct SimpleStructDecoder { children: Vec, child_fields: Fields, + data_type: DataType, } impl SimpleStructDecoder { fn new(child_fields: Fields, num_rows: u32) -> Self { + let data_type = DataType::Struct(child_fields.clone()); Self { children: child_fields .iter() @@ -498,27 +521,32 @@ impl SimpleStructDecoder { .map(|(idx, _)| ChildState::new(num_rows, idx as u32)) .collect(), child_fields, + data_type, } } } impl LogicalPageDecoder for SimpleStructDecoder { - fn wait<'a>( - &'a mut self, - num_rows: u32, - source: &'a mut mpsc::UnboundedReceiver>, - ) -> BoxFuture<'a, Result<()>> { + fn accept_child(&mut self, mut child: DecoderReady) -> Result<()> { + // children with empty path should not be delivered to this method + let child_idx = child.path.pop_front().unwrap(); + if child.path.is_empty() { + // This decoder is intended for us + self.children[child_idx as usize] + .scheduled + .push_back(child.decoder); + } else { + // This decoder is intended for one of our children + let intended = self.children[child_idx as usize].scheduled.back_mut().ok_or_else(|| Error::Internal { message: format!("Decoder scheduled for child at index {} but we don't have any child at that index yet", child_idx), location: location!() })?; + intended.accept_child(child)?; + } + Ok(()) + } + + fn wait(&mut self, num_rows: u32) -> BoxFuture> { async move { - // This is basically the inverse of the row-major scheduling algorithm - let mut remaining = Vec::from_iter(self.children.iter_mut()); - while !remaining.is_empty() { - let mut next_remaining = Vec::new(); - for child in remaining { - if !child.wait_next(num_rows, source).await? { - next_remaining.push(child); - } - } - remaining = next_remaining; + for child in self.children.iter_mut() { + child.wait(num_rows).await?; } Ok(()) } @@ -562,6 +590,10 @@ impl LogicalPageDecoder for SimpleStructDecoder { .max() .unwrap() } + + fn data_type(&self) -> &DataType { + &self.data_type + } } struct SimpleStructDecodeTask { @@ -677,6 +709,42 @@ mod tests { check_round_trip_encoding_random(field).await; } + #[test_log::test(tokio::test)] + async fn test_struct_list() { + let data_type = DataType::Struct(Fields::from(vec![ + Field::new( + "inner_list", + DataType::List(Arc::new(Field::new("item", DataType::Int32, true))), + true, + ), + Field::new("outer_int", DataType::Int32, true), + ])); + let field = Field::new("row", data_type, false); + check_round_trip_encoding_random(field).await; + } + + #[test_log::test(tokio::test)] + async fn test_complicated_struct() { + let data_type = DataType::Struct(Fields::from(vec![ + Field::new("int", DataType::Int32, true), + Field::new( + "inner", + DataType::Struct(Fields::from(vec![ + Field::new("inner_int", DataType::Int32, true), + Field::new( + "inner_list", + DataType::List(Arc::new(Field::new("item", DataType::Int32, true))), + true, + ), + ])), + true, + ), + Field::new("outer_binary", DataType::Binary, true), + ])); + let field = Field::new("row", data_type, false); + check_round_trip_encoding_random(field).await; + } + #[test_log::test(tokio::test)] async fn test_ragged_scheduling() { // This test covers scheduling when batches straddle page boundaries diff --git a/rust/lance-encoding/src/encodings/physical/basic.rs b/rust/lance-encoding/src/encodings/physical/basic.rs index 88befa5520..afda40c201 100644 --- a/rust/lance-encoding/src/encodings/physical/basic.rs +++ b/rust/lance-encoding/src/encodings/physical/basic.rs @@ -127,18 +127,14 @@ impl PhysicalPageScheduler for BasicPageScheduler { ) -> BoxFuture<'static, Result>> { let validity_future = match &self.mode { SchedulerNullStatus::None(_) | SchedulerNullStatus::All => None, - SchedulerNullStatus::Some(schedulers) => { - trace!("Scheduling ranges {:?} from validity", ranges); - Some( - schedulers - .validity - .schedule_ranges(ranges, scheduler, top_level_row), - ) - } + SchedulerNullStatus::Some(schedulers) => Some(schedulers.validity.schedule_ranges( + ranges, + scheduler, + top_level_row, + )), }; let values_future = if let Some(values_scheduler) = self.mode.values_scheduler() { - trace!("Scheduling range {:?} from values", ranges); Some( values_scheduler .schedule_ranges(ranges, scheduler, top_level_row) diff --git a/rust/lance-encoding/src/testing.rs b/rust/lance-encoding/src/testing.rs index d270f9d9aa..d021b8fde9 100644 --- a/rust/lance-encoding/src/testing.rs +++ b/rust/lance-encoding/src/testing.rs @@ -15,7 +15,7 @@ use lance_core::Result; use lance_datagen::{array, gen, RowCount, Seed}; use crate::{ - decoder::{BatchDecodeStream, ColumnInfo, DecodeBatchScheduler, LogicalPageDecoder, PageInfo}, + decoder::{BatchDecodeStream, ColumnInfo, DecodeBatchScheduler, DecoderMessage, PageInfo}, encoder::{BatchEncoder, EncodedPage, FieldEncoder}, EncodingsIo, }; @@ -65,7 +65,7 @@ async fn test_decode( expected: Option>, schedule_fn: impl FnOnce( DecodeBatchScheduler, - UnboundedSender>, + UnboundedSender, ) -> BoxFuture<'static, Result<()>>, ) { let decode_scheduler = DecodeBatchScheduler::new(schema, column_infos, &Vec::new()); @@ -243,7 +243,7 @@ async fn check_round_trip_encoding_inner( |mut decode_scheduler, tx| { async move { decode_scheduler - .schedule_range(0..num_rows, tx, &scheduler_copy) + .schedule_range(0..num_rows, tx, scheduler_copy) .await } .boxed() @@ -266,7 +266,7 @@ async fn check_round_trip_encoding_inner( &column_infos, expected, |mut decode_scheduler, tx| { - async move { decode_scheduler.schedule_range(range, tx, &scheduler).await }.boxed() + async move { decode_scheduler.schedule_range(range, tx, scheduler).await }.boxed() }, ) .await; @@ -299,7 +299,7 @@ async fn check_round_trip_encoding_inner( |mut decode_scheduler, tx| { async move { decode_scheduler - .schedule_take(&indices, tx, &scheduler) + .schedule_take(&indices, tx, scheduler) .await } .boxed() diff --git a/rust/lance-file/src/v2/reader.rs b/rust/lance-file/src/v2/reader.rs index 0e103b7639..3ed755f563 100644 --- a/rust/lance-file/src/v2/reader.rs +++ b/rust/lance-file/src/v2/reader.rs @@ -602,7 +602,7 @@ impl FileReader { let scheduler = self.scheduler.clone() as Arc; tokio::task::spawn( - async move { decode_scheduler.schedule_range(range, tx, &scheduler).await }, + async move { decode_scheduler.schedule_range(range, tx, scheduler).await }, ); Ok(BatchDecodeStream::new(rx, batch_size, num_rows_to_read).into_stream()) @@ -636,7 +636,7 @@ impl FileReader { let scheduler = self.scheduler.clone() as Arc; tokio::task::spawn(async move { decode_scheduler - .schedule_take(&indices, tx, &scheduler) + .schedule_take(&indices, tx, scheduler) .await });