Skip to content

Commit

Permalink
fix: rework decoding to fix bugs in nested struct decoding (#2337)
Browse files Browse the repository at this point in the history
This PR modifies both scheduling and the decode stream. These
modifications should mainly affect the logical encodings (list /
struct). The changes to the other encodings are fairly minimal.

In the scheduler we now track where we are in the field tree. This is
both for debugging purposes (so we can log in trace messages the current
path) and because we now need to send the path as part of the message we
send to the decode stream.

The decoder changes are more significant. Previously, we combined
waiting for I/O and waiting for additional encoders into the same phase.
This logic was more complex, but more importantly, it also assumed that
the decoder could recreate the order in which the scheduler scheduled
pages. For example, if we scan through the columns and encounter one
that needs more data, then we grab exactly one page, and continue the
column scan, grabbing the next page when we come by in another pass.
This works in many cases but doesn't work in others. For example, the
scheduler might schedule the many pages in a row for the same column if
that column is wide or the page size is small. The decoder would get out
of sync in these cases.

Now, the logic is simpler, and more correct. The decoder first waits for
enough scheduling to be done that a batch can be delivered. During this
pass we drain encoders from the scheduler and insert them into the
current decoders not by "current position in the decode" but by the path
that is now included with the decoder.

Once enough scheduling has been done we then wait for I/O.

Once I/O is done we then drain the decoders in the same fashion we did
before.
  • Loading branch information
westonpace committed May 17, 2024
1 parent 711ac03 commit 648adf8
Show file tree
Hide file tree
Showing 9 changed files with 577 additions and 266 deletions.
310 changes: 275 additions & 35 deletions rust/lance-encoding/src/decoder.rs

Large diffs are not rendered by default.

41 changes: 20 additions & 21 deletions rust/lance-encoding/src/encodings/logical/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@ use arrow_array::{

use arrow_buffer::ScalarBuffer;
use arrow_schema::{DataType, Field};
use futures::{future::BoxFuture, FutureExt};
use futures::future::BoxFuture;
use lance_core::Result;
use log::trace;

use crate::{
decoder::{DecodeArrayTask, LogicalPageDecoder, LogicalPageScheduler, NextDecodeTask},
decoder::{
DecodeArrayTask, LogicalPageDecoder, LogicalPageScheduler, NextDecodeTask, SchedulerContext,
},
encoder::{EncodeTask, FieldEncoder},
};

Expand Down Expand Up @@ -45,21 +47,20 @@ impl LogicalPageScheduler for BinaryPageScheduler {
fn schedule_ranges(
&self,
ranges: &[std::ops::Range<u32>],
scheduler: &Arc<dyn crate::EncodingsIo>,
sink: &tokio::sync::mpsc::UnboundedSender<Box<dyn crate::decoder::LogicalPageDecoder>>,
context: &mut SchedulerContext,
top_level_row: u64,
) -> Result<()> {
trace!("Scheduling binary for {} ranges", ranges.len());
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
let mut temp_context = context.temporary();
self.varbin_scheduler
.schedule_ranges(ranges, scheduler, &tx, top_level_row)?;
.schedule_ranges(ranges, &mut temp_context, top_level_row)?;

while let Some(decoder) = rx.recv().now_or_never() {
let wrapped = BinaryPageDecoder {
inner: decoder.unwrap(),
for decoder in temp_context.into_decoders() {
let decoder = Box::new(BinaryPageDecoder {
inner: decoder,
data_type: self.data_type.clone(),
};
sink.send(Box::new(wrapped)).unwrap();
});
context.emit(decoder);
}

Ok(())
Expand All @@ -68,8 +69,7 @@ impl LogicalPageScheduler for BinaryPageScheduler {
fn schedule_take(
&self,
indices: &[u32],
scheduler: &Arc<dyn crate::EncodingsIo>,
sink: &tokio::sync::mpsc::UnboundedSender<Box<dyn crate::decoder::LogicalPageDecoder>>,
context: &mut SchedulerContext,
top_level_row: u64,
) -> Result<()> {
trace!("Scheduling binary for {} indices", indices.len());
Expand All @@ -78,8 +78,7 @@ impl LogicalPageScheduler for BinaryPageScheduler {
.iter()
.map(|&idx| idx..(idx + 1))
.collect::<Vec<_>>(),
scheduler,
sink,
context,
top_level_row,
)
}
Expand All @@ -96,12 +95,8 @@ pub struct BinaryPageDecoder {
}

impl LogicalPageDecoder for BinaryPageDecoder {
fn wait<'a>(
&'a mut self,
num_rows: u32,
source: &'a mut tokio::sync::mpsc::UnboundedReceiver<Box<dyn LogicalPageDecoder>>,
) -> BoxFuture<'a, Result<()>> {
self.inner.wait(num_rows, source)
fn wait(&mut self, num_rows: u32) -> BoxFuture<Result<()>> {
self.inner.wait(num_rows)
}

fn drain(&mut self, num_rows: u32) -> Result<NextDecodeTask> {
Expand All @@ -123,6 +118,10 @@ impl LogicalPageDecoder for BinaryPageDecoder {
fn avail(&self) -> u32 {
self.inner.avail()
}

fn data_type(&self) -> &DataType {
&self.data_type
}
}

pub struct BinaryArrayDecoder {
Expand Down
51 changes: 26 additions & 25 deletions rust/lance-encoding/src/encodings/logical/fixed_size_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,12 @@
use std::{ops::Range, sync::Arc};

use arrow_array::{ArrayRef, FixedSizeListArray};
use arrow_schema::Field;
use arrow_schema::{DataType, Field};
use futures::future::BoxFuture;
use log::trace;
use tokio::sync::mpsc;

use crate::{
decoder::{DecodeArrayTask, LogicalPageDecoder, LogicalPageScheduler, NextDecodeTask},
EncodingsIo,
use crate::decoder::{
DecodeArrayTask, LogicalPageDecoder, LogicalPageScheduler, NextDecodeTask, SchedulerContext,
};
use lance_core::Result;

Expand Down Expand Up @@ -51,8 +49,7 @@ impl LogicalPageScheduler for FslPageScheduler {
fn schedule_ranges(
&self,
ranges: &[Range<u32>],
scheduler: &Arc<dyn EncodingsIo>,
sink: &mpsc::UnboundedSender<Box<dyn LogicalPageDecoder>>,
context: &mut SchedulerContext,
top_level_row: u64,
) -> Result<()> {
let expanded_ranges = ranges
Expand All @@ -63,32 +60,35 @@ impl LogicalPageScheduler for FslPageScheduler {
"Scheduling expanded ranges {:?} from items scheduler",
expanded_ranges
);
let (tx, mut rx) = mpsc::unbounded_channel();
let mut temp_context = context.temporary();
self.items_scheduler
.schedule_ranges(&expanded_ranges, scheduler, &tx, top_level_row)?;
let inner_page_decoder = rx.blocking_recv().unwrap();
sink.send(Box::new(FslPageDecoder {
inner: inner_page_decoder,
dimension: self.dimension,
}))
.unwrap();
.schedule_ranges(&expanded_ranges, &mut temp_context, top_level_row)?;
for decoder in temp_context.into_decoders() {
let data_type = DataType::FixedSizeList(
Arc::new(Field::new("item", decoder.data_type().clone(), true)),
self.dimension as i32,
);
context.emit(Box::new(FslPageDecoder {
inner: decoder,
dimension: self.dimension,
data_type,
}));
}
Ok(())
}

fn schedule_take(
&self,
indices: &[u32],
scheduler: &Arc<dyn EncodingsIo>,
sink: &mpsc::UnboundedSender<Box<dyn LogicalPageDecoder>>,
context: &mut SchedulerContext,
top_level_row: u64,
) -> Result<()> {
self.schedule_ranges(
&indices
.iter()
.map(|&idx| idx..(idx + 1))
.collect::<Vec<_>>(),
scheduler,
sink,
context,
top_level_row,
)
}
Expand All @@ -102,15 +102,12 @@ impl LogicalPageScheduler for FslPageScheduler {
struct FslPageDecoder {
inner: Box<dyn LogicalPageDecoder>,
dimension: u32,
data_type: DataType,
}

impl LogicalPageDecoder for FslPageDecoder {
fn wait<'a>(
&'a mut self,
num_rows: u32,
source: &'a mut mpsc::UnboundedReceiver<Box<dyn LogicalPageDecoder>>,
) -> BoxFuture<'a, Result<()>> {
self.inner.wait(num_rows * self.dimension, source)
fn wait(&mut self, num_rows: u32) -> BoxFuture<Result<()>> {
self.inner.wait(num_rows * self.dimension)
}

fn unawaited(&self) -> u32 {
Expand All @@ -136,6 +133,10 @@ impl LogicalPageDecoder for FslPageDecoder {
fn avail(&self) -> u32 {
self.inner.avail() / self.dimension
}

fn data_type(&self) -> &DataType {
&self.data_type
}
}

struct FslDecodeTask {
Expand Down
Loading

0 comments on commit 648adf8

Please sign in to comment.