Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: rework decoding to fix bugs in nested struct decoding #2337

Merged
merged 2 commits into from
May 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
310 changes: 275 additions & 35 deletions rust/lance-encoding/src/decoder.rs

Large diffs are not rendered by default.

41 changes: 20 additions & 21 deletions rust/lance-encoding/src/encodings/logical/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@ use arrow_array::{

use arrow_buffer::ScalarBuffer;
use arrow_schema::{DataType, Field};
use futures::{future::BoxFuture, FutureExt};
use futures::future::BoxFuture;
use lance_core::Result;
use log::trace;

use crate::{
decoder::{DecodeArrayTask, LogicalPageDecoder, LogicalPageScheduler, NextDecodeTask},
decoder::{
DecodeArrayTask, LogicalPageDecoder, LogicalPageScheduler, NextDecodeTask, SchedulerContext,
},
encoder::{EncodeTask, FieldEncoder},
};

Expand Down Expand Up @@ -45,21 +47,20 @@ impl LogicalPageScheduler for BinaryPageScheduler {
fn schedule_ranges(
&self,
ranges: &[std::ops::Range<u32>],
scheduler: &Arc<dyn crate::EncodingsIo>,
sink: &tokio::sync::mpsc::UnboundedSender<Box<dyn crate::decoder::LogicalPageDecoder>>,
context: &mut SchedulerContext,
top_level_row: u64,
) -> Result<()> {
trace!("Scheduling binary for {} ranges", ranges.len());
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
let mut temp_context = context.temporary();
self.varbin_scheduler
.schedule_ranges(ranges, scheduler, &tx, top_level_row)?;
.schedule_ranges(ranges, &mut temp_context, top_level_row)?;

while let Some(decoder) = rx.recv().now_or_never() {
let wrapped = BinaryPageDecoder {
inner: decoder.unwrap(),
for decoder in temp_context.into_decoders() {
let decoder = Box::new(BinaryPageDecoder {
inner: decoder,
data_type: self.data_type.clone(),
};
sink.send(Box::new(wrapped)).unwrap();
});
context.emit(decoder);
}

Ok(())
Expand All @@ -68,8 +69,7 @@ impl LogicalPageScheduler for BinaryPageScheduler {
fn schedule_take(
&self,
indices: &[u32],
scheduler: &Arc<dyn crate::EncodingsIo>,
sink: &tokio::sync::mpsc::UnboundedSender<Box<dyn crate::decoder::LogicalPageDecoder>>,
context: &mut SchedulerContext,
top_level_row: u64,
) -> Result<()> {
trace!("Scheduling binary for {} indices", indices.len());
Expand All @@ -78,8 +78,7 @@ impl LogicalPageScheduler for BinaryPageScheduler {
.iter()
.map(|&idx| idx..(idx + 1))
.collect::<Vec<_>>(),
scheduler,
sink,
context,
top_level_row,
)
}
Expand All @@ -96,12 +95,8 @@ pub struct BinaryPageDecoder {
}

impl LogicalPageDecoder for BinaryPageDecoder {
fn wait<'a>(
&'a mut self,
num_rows: u32,
source: &'a mut tokio::sync::mpsc::UnboundedReceiver<Box<dyn LogicalPageDecoder>>,
) -> BoxFuture<'a, Result<()>> {
self.inner.wait(num_rows, source)
fn wait(&mut self, num_rows: u32) -> BoxFuture<Result<()>> {
self.inner.wait(num_rows)
}

fn drain(&mut self, num_rows: u32) -> Result<NextDecodeTask> {
Expand All @@ -123,6 +118,10 @@ impl LogicalPageDecoder for BinaryPageDecoder {
/// Returns the wrapped inner decoder's `avail()` count unchanged.
///
/// NOTE(review): presumably this is the number of rows that can currently be
/// drained; confirm against the `LogicalPageDecoder` trait documentation.
fn avail(&self) -> u32 {
self.inner.avail()
}

/// Returns a reference to the data type stored on this decoder.
///
/// The value is a clone of the scheduler's `data_type`, taken when the
/// decoder is created in `schedule_ranges`.
fn data_type(&self) -> &DataType {
&self.data_type
}
}

pub struct BinaryArrayDecoder {
Expand Down
51 changes: 26 additions & 25 deletions rust/lance-encoding/src/encodings/logical/fixed_size_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,12 @@
use std::{ops::Range, sync::Arc};

use arrow_array::{ArrayRef, FixedSizeListArray};
use arrow_schema::Field;
use arrow_schema::{DataType, Field};
use futures::future::BoxFuture;
use log::trace;
use tokio::sync::mpsc;

use crate::{
decoder::{DecodeArrayTask, LogicalPageDecoder, LogicalPageScheduler, NextDecodeTask},
EncodingsIo,
use crate::decoder::{
DecodeArrayTask, LogicalPageDecoder, LogicalPageScheduler, NextDecodeTask, SchedulerContext,
};
use lance_core::Result;

Expand Down Expand Up @@ -51,8 +49,7 @@ impl LogicalPageScheduler for FslPageScheduler {
fn schedule_ranges(
&self,
ranges: &[Range<u32>],
scheduler: &Arc<dyn EncodingsIo>,
sink: &mpsc::UnboundedSender<Box<dyn LogicalPageDecoder>>,
context: &mut SchedulerContext,
top_level_row: u64,
) -> Result<()> {
let expanded_ranges = ranges
Expand All @@ -63,32 +60,35 @@ impl LogicalPageScheduler for FslPageScheduler {
"Scheduling expanded ranges {:?} from items scheduler",
expanded_ranges
);
let (tx, mut rx) = mpsc::unbounded_channel();
let mut temp_context = context.temporary();
self.items_scheduler
.schedule_ranges(&expanded_ranges, scheduler, &tx, top_level_row)?;
let inner_page_decoder = rx.blocking_recv().unwrap();
sink.send(Box::new(FslPageDecoder {
inner: inner_page_decoder,
dimension: self.dimension,
}))
.unwrap();
.schedule_ranges(&expanded_ranges, &mut temp_context, top_level_row)?;
for decoder in temp_context.into_decoders() {
let data_type = DataType::FixedSizeList(
Arc::new(Field::new("item", decoder.data_type().clone(), true)),
self.dimension as i32,
);
context.emit(Box::new(FslPageDecoder {
inner: decoder,
dimension: self.dimension,
data_type,
}));
}
Ok(())
}

fn schedule_take(
&self,
indices: &[u32],
scheduler: &Arc<dyn EncodingsIo>,
sink: &mpsc::UnboundedSender<Box<dyn LogicalPageDecoder>>,
context: &mut SchedulerContext,
top_level_row: u64,
) -> Result<()> {
self.schedule_ranges(
&indices
.iter()
.map(|&idx| idx..(idx + 1))
.collect::<Vec<_>>(),
scheduler,
sink,
context,
top_level_row,
)
}
Expand All @@ -102,15 +102,12 @@ impl LogicalPageScheduler for FslPageScheduler {
/// Page decoder that wraps the decoder for a fixed-size list's items.
///
/// Row counts are translated to item counts using `dimension`: `wait`
/// multiplies the requested rows by it and `avail` divides the inner count by it.
struct FslPageDecoder {
/// Decoder emitted by the items scheduler.
inner: Box<dyn LogicalPageDecoder>,
/// Number of item values per list row (the fixed list size).
dimension: u32,
/// `DataType::FixedSizeList` built from the inner decoder's data type and
/// `dimension`; returned by `data_type()`.
data_type: DataType,
}

impl LogicalPageDecoder for FslPageDecoder {
fn wait<'a>(
&'a mut self,
num_rows: u32,
source: &'a mut mpsc::UnboundedReceiver<Box<dyn LogicalPageDecoder>>,
) -> BoxFuture<'a, Result<()>> {
self.inner.wait(num_rows * self.dimension, source)
fn wait(&mut self, num_rows: u32) -> BoxFuture<Result<()>> {
self.inner.wait(num_rows * self.dimension)
}

fn unawaited(&self) -> u32 {
Expand All @@ -136,6 +133,10 @@ impl LogicalPageDecoder for FslPageDecoder {
/// Returns the inner decoder's `avail()` count divided by `dimension`.
///
/// The inner decoder counts individual items, so integer division converts
/// that count into whole list rows; a partial trailing row is not counted.
/// NOTE(review): a `dimension` of 0 would panic on division; presumably
/// that is ruled out upstream, confirm.
fn avail(&self) -> u32 {
self.inner.avail() / self.dimension
}

/// Returns a reference to the `FixedSizeList` data type stored on this decoder.
///
/// The type is built in `schedule_ranges` from a nullable field named
/// "item" carrying the inner decoder's data type, plus the list dimension.
fn data_type(&self) -> &DataType {
&self.data_type
}
}

struct FslDecodeTask {
Expand Down
Loading
Loading