Skip to content

Commit

Permalink
Fairly extensive rewrite of the decoding stream. Decoding now divides…
Browse files Browse the repository at this point in the history
… waiting for scheduled decoders and waiting for IO into two different phases. See PR for more details.
  • Loading branch information
westonpace committed May 15, 2024
1 parent 31e37c6 commit f7ed387
Show file tree
Hide file tree
Showing 9 changed files with 570 additions and 266 deletions.
303 changes: 268 additions & 35 deletions rust/lance-encoding/src/decoder.rs

Large diffs are not rendered by default.

41 changes: 20 additions & 21 deletions rust/lance-encoding/src/encodings/logical/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@ use arrow_array::{

use arrow_buffer::ScalarBuffer;
use arrow_schema::{DataType, Field};
use futures::{future::BoxFuture, FutureExt};
use futures::future::BoxFuture;
use lance_core::Result;
use log::trace;

use crate::{
decoder::{DecodeArrayTask, LogicalPageDecoder, LogicalPageScheduler, NextDecodeTask},
decoder::{
DecodeArrayTask, LogicalPageDecoder, LogicalPageScheduler, NextDecodeTask, SchedulerContext,
},
encoder::{EncodeTask, FieldEncoder},
};

Expand Down Expand Up @@ -45,21 +47,20 @@ impl LogicalPageScheduler for BinaryPageScheduler {
fn schedule_ranges(
&self,
ranges: &[std::ops::Range<u32>],
scheduler: &Arc<dyn crate::EncodingsIo>,
sink: &tokio::sync::mpsc::UnboundedSender<Box<dyn crate::decoder::LogicalPageDecoder>>,
context: &mut SchedulerContext,
top_level_row: u64,
) -> Result<()> {
trace!("Scheduling binary for {} ranges", ranges.len());
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
let mut temp_context = context.temporary();
self.varbin_scheduler
.schedule_ranges(ranges, scheduler, &tx, top_level_row)?;
.schedule_ranges(ranges, &mut temp_context, top_level_row)?;

while let Some(decoder) = rx.recv().now_or_never() {
let wrapped = BinaryPageDecoder {
inner: decoder.unwrap(),
for decoder in temp_context.into_decoders() {
let decoder = Box::new(BinaryPageDecoder {
inner: decoder,
data_type: self.data_type.clone(),
};
sink.send(Box::new(wrapped)).unwrap();
});
context.emit(decoder);
}

Ok(())
Expand All @@ -68,8 +69,7 @@ impl LogicalPageScheduler for BinaryPageScheduler {
fn schedule_take(
&self,
indices: &[u32],
scheduler: &Arc<dyn crate::EncodingsIo>,
sink: &tokio::sync::mpsc::UnboundedSender<Box<dyn crate::decoder::LogicalPageDecoder>>,
context: &mut SchedulerContext,
top_level_row: u64,
) -> Result<()> {
trace!("Scheduling binary for {} indices", indices.len());
Expand All @@ -78,8 +78,7 @@ impl LogicalPageScheduler for BinaryPageScheduler {
.iter()
.map(|&idx| idx..(idx + 1))
.collect::<Vec<_>>(),
scheduler,
sink,
context,
top_level_row,
)
}
Expand All @@ -96,12 +95,8 @@ pub struct BinaryPageDecoder {
}

impl LogicalPageDecoder for BinaryPageDecoder {
fn wait<'a>(
&'a mut self,
num_rows: u32,
source: &'a mut tokio::sync::mpsc::UnboundedReceiver<Box<dyn LogicalPageDecoder>>,
) -> BoxFuture<'a, Result<()>> {
self.inner.wait(num_rows, source)
fn wait<'a>(&'a mut self, num_rows: u32) -> BoxFuture<'a, Result<()>> {
self.inner.wait(num_rows)
}

fn drain(&mut self, num_rows: u32) -> Result<NextDecodeTask> {
Expand All @@ -123,6 +118,10 @@ impl LogicalPageDecoder for BinaryPageDecoder {
fn avail(&self) -> u32 {
self.inner.avail()
}

fn data_type(&self) -> &DataType {
&self.data_type
}
}

pub struct BinaryArrayDecoder {
Expand Down
51 changes: 26 additions & 25 deletions rust/lance-encoding/src/encodings/logical/fixed_size_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,12 @@
use std::{ops::Range, sync::Arc};

use arrow_array::{ArrayRef, FixedSizeListArray};
use arrow_schema::Field;
use arrow_schema::{DataType, Field};
use futures::future::BoxFuture;
use log::trace;
use tokio::sync::mpsc;

use crate::{
decoder::{DecodeArrayTask, LogicalPageDecoder, LogicalPageScheduler, NextDecodeTask},
EncodingsIo,
use crate::decoder::{
DecodeArrayTask, LogicalPageDecoder, LogicalPageScheduler, NextDecodeTask, SchedulerContext,
};
use lance_core::Result;

Expand Down Expand Up @@ -51,8 +49,7 @@ impl LogicalPageScheduler for FslPageScheduler {
fn schedule_ranges(
&self,
ranges: &[Range<u32>],
scheduler: &Arc<dyn EncodingsIo>,
sink: &mpsc::UnboundedSender<Box<dyn LogicalPageDecoder>>,
context: &mut SchedulerContext,
top_level_row: u64,
) -> Result<()> {
let expanded_ranges = ranges
Expand All @@ -63,32 +60,35 @@ impl LogicalPageScheduler for FslPageScheduler {
"Scheduling expanded ranges {:?} from items scheduler",
expanded_ranges
);
let (tx, mut rx) = mpsc::unbounded_channel();
let mut temp_context = context.temporary();
self.items_scheduler
.schedule_ranges(&expanded_ranges, scheduler, &tx, top_level_row)?;
let inner_page_decoder = rx.blocking_recv().unwrap();
sink.send(Box::new(FslPageDecoder {
inner: inner_page_decoder,
dimension: self.dimension,
}))
.unwrap();
.schedule_ranges(&expanded_ranges, &mut temp_context, top_level_row)?;
for decoder in temp_context.into_decoders() {
let data_type = DataType::FixedSizeList(
Arc::new(Field::new("item", decoder.data_type().clone(), true)),
self.dimension as i32,
);
context.emit(Box::new(FslPageDecoder {
inner: decoder,
dimension: self.dimension,
data_type,
}));
}
Ok(())
}

fn schedule_take(
&self,
indices: &[u32],
scheduler: &Arc<dyn EncodingsIo>,
sink: &mpsc::UnboundedSender<Box<dyn LogicalPageDecoder>>,
context: &mut SchedulerContext,
top_level_row: u64,
) -> Result<()> {
self.schedule_ranges(
&indices
.iter()
.map(|&idx| idx..(idx + 1))
.collect::<Vec<_>>(),
scheduler,
sink,
context,
top_level_row,
)
}
Expand All @@ -102,15 +102,12 @@ impl LogicalPageScheduler for FslPageScheduler {
struct FslPageDecoder {
inner: Box<dyn LogicalPageDecoder>,
dimension: u32,
data_type: DataType,
}

impl LogicalPageDecoder for FslPageDecoder {
fn wait<'a>(
&'a mut self,
num_rows: u32,
source: &'a mut mpsc::UnboundedReceiver<Box<dyn LogicalPageDecoder>>,
) -> BoxFuture<'a, Result<()>> {
self.inner.wait(num_rows * self.dimension, source)
fn wait<'a>(&'a mut self, num_rows: u32) -> BoxFuture<'a, Result<()>> {
self.inner.wait(num_rows * self.dimension)
}

fn unawaited(&self) -> u32 {
Expand All @@ -136,6 +133,10 @@ impl LogicalPageDecoder for FslPageDecoder {
fn avail(&self) -> u32 {
self.inner.avail() / self.dimension
}

fn data_type(&self) -> &DataType {
&self.data_type
}
}

struct FslDecodeTask {
Expand Down
Loading

0 comments on commit f7ed387

Please sign in to comment.