Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
322 changes: 313 additions & 9 deletions rust/lance-encoding/src/encodings/logical/primitive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1656,6 +1656,8 @@ pub struct FullZipScheduler {
rows_in_page: u64,
bits_per_offset: u8,
details: Arc<FullZipDecodeDetails>,
/// Cached state containing the decoded repetition index
cached_state: Option<Arc<FullZipCacheableState>>,
}

impl FullZipScheduler {
Expand Down Expand Up @@ -1742,6 +1744,7 @@ impl FullZipScheduler {
priority,
rows_in_page,
bits_per_offset,
cached_state: None,
})
}

Expand Down Expand Up @@ -1790,11 +1793,19 @@ impl FullZipScheduler {
match &details.value_decompressor {
PerValueDecompressor::Fixed(decompressor) => {
let bits_per_value = decompressor.bits_per_value();
assert!(bits_per_value > 0);
if bits_per_value == 0 {
return Err(lance_core::Error::Internal {
message: "Invalid encoding: bits_per_value must be greater than 0".into(),
location: location!(),
});
}
if bits_per_value % 8 != 0 {
// Unlikely we will ever want this since full-zip values are so large the few bits we shave off don't
// make much difference.
unimplemented!("Bit-packed full-zip");
return Err(lance_core::Error::NotSupported {
source: "Bit-packed full-zip encoding (non-byte-aligned values) is not yet implemented".into(),
location: location!(),
});
}
let bytes_per_value = bits_per_value / 8;
let total_bytes_per_value =
Expand Down Expand Up @@ -1822,13 +1833,114 @@ impl FullZipScheduler {
}
}

/// Schedules ranges directly using cached repetition index data
///
/// For every requested row range the repetition-index entries at `r.start` and
/// `r.end` are read out of `cached_rep_data` (each entry is `bytes_per_value`
/// bytes wide) and offset by `self.data_buf_position` to form the byte range to
/// fetch. All byte ranges go out in one I/O request at `self.priority`.
///
/// The returned future awaits that request and then builds either a
/// `FixedFullZipDecoder` or a `VariableFullZipDecoder`, depending on
/// `details.value_decompressor`. For the fixed-width case it resolves to
/// `Error::Internal` when the decompressor reports zero bits per value and to
/// `Error::NotSupported` when the width is not a multiple of 8.
///
/// NOTE(review): the entry lookup below uses slice indexing and `unwrap()`, so a
/// range that reaches past the cached buffer would panic instead of surfacing as
/// an `Err` — confirm callers always pass ranges within the page.
fn schedule_ranges_with_cached_rep(
&self,
ranges: &[Range<u64>],
io: &Arc<dyn EncodingsIo>,
cached_rep_data: &LanceBuffer,
bytes_per_value: u64,
) -> Result<BoxFuture<'static, Result<Box<dyn StructuralPageDecoder>>>> {
use crate::utils::bytepack::ByteUnpacker;

// Extract byte ranges directly from the cached repetition index
let byte_ranges: Vec<Range<u64>> = ranges
.iter()
.map(|r| {
// Get start and end values from the cached buffer
// (entry `i` lives at byte offset `i * bytes_per_value`)
let start_offset = (r.start * bytes_per_value) as usize;
let end_offset = (r.end * bytes_per_value) as usize;

// Use ByteUnpacker to read single values
let start_slice =
&cached_rep_data[start_offset..start_offset + bytes_per_value as usize];
let start_val =
ByteUnpacker::new(start_slice.iter().copied(), bytes_per_value as usize)
.next()
.unwrap();

let end_slice = &cached_rep_data[end_offset..end_offset + bytes_per_value as usize];
let end_val =
ByteUnpacker::new(end_slice.iter().copied(), bytes_per_value as usize)
.next()
.unwrap();

// The index values are relative to the start of the data buffer
(self.data_buf_position + start_val)..(self.data_buf_position + end_val)
})
.collect();

// Submit the read now; everything the future needs is copied or cloned out of
// `self` first because the future is `'static` and cannot borrow the scheduler.
let data = io.submit_request(byte_ranges, self.priority);
let row_ranges = ranges.to_vec();
let details = self.details.clone();
let bits_per_offset = self.bits_per_offset;

Ok(async move {
let data = data.await?;
let data = data
.into_iter()
.map(|d| LanceBuffer::from_bytes(d, 1))
.collect();
// Total number of rows covered by all requested ranges
let num_rows = row_ranges.into_iter().map(|r| r.end - r.start).sum();

match &details.value_decompressor {
PerValueDecompressor::Fixed(decompressor) => {
let bits_per_value = decompressor.bits_per_value();
if bits_per_value == 0 {
return Err(lance_core::Error::Internal {
message: "Invalid encoding: bits_per_value must be greater than 0".into(),
location: location!(),
});
}
if bits_per_value % 8 != 0 {
return Err(lance_core::Error::NotSupported {
source: "Bit-packed full-zip encoding (non-byte-aligned values) is not yet implemented".into(),
location: location!(),
});
}
// This `bytes_per_value` is the width of one decoded value; it is unrelated
// to the repetition-index entry width passed in as the function parameter.
let bytes_per_value = bits_per_value / 8;
// Each zipped entry is the value bytes plus one control word
let total_bytes_per_value =
bytes_per_value as usize + details.ctrl_word_parser.bytes_per_word();
Ok(Box::new(FixedFullZipDecoder {
details,
data,
num_rows,
offset_in_current: 0,
bytes_per_value: bytes_per_value as usize,
total_bytes_per_value,
}) as Box<dyn StructuralPageDecoder>)
}
PerValueDecompressor::Variable(_decompressor) => {
// The same `bits_per_offset` is deliberately passed for both trailing arguments
Ok(Box::new(VariableFullZipDecoder::new(
details,
data,
num_rows,
bits_per_offset,
bits_per_offset,
)) as Box<dyn StructuralPageDecoder>)
}
}
}
.boxed())
Comment on lines +1877 to +1923

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any way we can consolidate this logic across the two implementations? That way we don't have duplication.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will try in the following PRs.

}

/// Schedules ranges in the presence of a repetition index
fn schedule_ranges_rep(
&self,
ranges: &[Range<u64>],
io: &Arc<dyn EncodingsIo>,
rep_index: &FullZipRepIndexDetails,
) -> Result<BoxFuture<'static, Result<Box<dyn StructuralPageDecoder>>>> {
// Check if we have cached repetition index data
if let Some(cached_state) = &self.cached_state {
return self.schedule_ranges_with_cached_rep(
ranges,
io,
&cached_state.rep_index_buffer,
rep_index.bytes_per_value,
);
}

// Fall back to loading from disk
let rep_index_ranges = ranges
.iter()
.flat_map(|r| {
Expand Down Expand Up @@ -1908,16 +2020,69 @@ impl FullZipScheduler {
}
}

/// Cacheable state for FullZip encoding, holding the repetition index bytes
/// as they were read from storage (the buffer is not decoded up front).
#[derive(Debug)]
struct FullZipCacheableState {
/// The raw repetition index buffer; individual entries are decoded from it
/// on demand when ranges are scheduled
rep_index_buffer: LanceBuffer,
}

impl DeepSizeOf for FullZipCacheableState {
    /// Reports the size of the owned data as the byte length of the cached
    /// repetition index buffer; the traversal context is not consulted.
    fn deep_size_of_children(&self, _context: &mut Context) -> usize {
        let Self { rep_index_buffer } = self;
        rep_index_buffer.len()
    }
}

impl CachedPageData for FullZipCacheableState {
    /// Hands back the same `Arc`, type-erased to `dyn Any`, so that holders of a
    /// `dyn CachedPageData` can downcast to the concrete cache type.
    fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
        let erased: Arc<dyn Any + Send + Sync + 'static> = self;
        erased
    }
}

impl StructuralPageScheduler for FullZipScheduler {
// TODO: Add opt-in caching of repetition index
/// Initializes the scheduler. If there's a repetition index, loads and caches it.
/// Otherwise returns NoCachedPageData.
///
/// When a repetition index is present, one read of
/// `(rows_in_page + 1) * bytes_per_value` bytes starting at the index's
/// `buf_position` is submitted, and the first returned buffer is wrapped in a
/// `FullZipCacheableState`. The bytes are stored as read; nothing is decoded here.
/// This method only returns the state — it does not assign `self.cached_state`.
//
// NOTE(review): this span appears to interleave pre-change and post-change lines
// of the diff: the `_io` parameter and the bare `std::future::ready(...)`
// expression directly after the signature look like the removed version.
// Confirm against the merged file before relying on this text.
fn initialize<'a>(
&'a mut self,
_io: &Arc<dyn EncodingsIo>,
io: &Arc<dyn EncodingsIo>,
) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
std::future::ready(Ok(Arc::new(NoCachedPageData) as Arc<dyn CachedPageData>)).boxed()
// Check if we have a repetition index
if let Some(rep_index) = &self.rep_index {
// Calculate the total size of the repetition index
// (one entry per row plus one trailing entry, each `bytes_per_value` wide)
let total_size = (self.rows_in_page + 1) * rep_index.bytes_per_value;
let rep_index_range = rep_index.buf_position..(rep_index.buf_position + total_size);

// Load the repetition index buffer
// The `Arc` is cloned so the `async move` block owns its own I/O handle.
let io_clone = io.clone();
let future = async move {
// The request uses a fixed priority of 0 rather than `self.priority`.
let rep_index_data = io_clone.submit_request(vec![rep_index_range], 0).await?;
// Exactly one range was requested, so only the first returned buffer is used.
let rep_index_buffer = LanceBuffer::from_bytes(rep_index_data[0].clone(), 1);

// Create and return the cacheable state
Ok(Arc::new(FullZipCacheableState { rep_index_buffer }) as Arc<dyn CachedPageData>)
};

future.boxed()
} else {
// No repetition index, skip caching
std::future::ready(Ok(Arc::new(NoCachedPageData) as Arc<dyn CachedPageData>)).boxed()
}
}

fn load(&mut self, _cache: &Arc<dyn CachedPageData>) {}
/// Adopts cached page data produced by an earlier `initialize` call.
///
/// The handle is type-erased and downcast to `FullZipCacheableState`. On a match
/// the shared state is stored in `self.cached_state`; any other cache type
/// (for example the placeholder returned when there is no repetition index)
/// leaves the scheduler untouched.
fn load(&mut self, cache: &Arc<dyn CachedPageData>) {
    // `as_arc_any` consumes its receiver, so work on a cloned handle.
    let erased = Arc::clone(cache).as_arc_any();
    if let Ok(state) = erased.downcast::<FullZipCacheableState>() {
        // Keep the shared state around for use when scheduling ranges
        self.cached_state = Some(state);
    }
}

fn schedule_ranges(
&self,
Expand Down Expand Up @@ -4074,14 +4239,16 @@ impl FieldEncoder for PrimitiveStructuralEncoder {
mod tests {
use std::{collections::VecDeque, sync::Arc};

use arrow_array::{ArrayRef, Int8Array, StringArray};

use crate::encodings::logical::primitive::{
ChunkDrainInstructions, PrimitiveStructuralEncoder,
};
use arrow_array::{ArrayRef, Int8Array, StringArray};

use super::{
ChunkInstructions, DataBlock, DecodeMiniBlockTask, PreambleAction, RepetitionIndex,
ChunkInstructions, DataBlock, DecodeMiniBlockTask, FixedPerValueDecompressor,
FixedWidthDataBlock, FullZipCacheableState, FullZipDecodeDetails, FullZipRepIndexDetails,
FullZipScheduler, PerValueDecompressor, PreambleAction, RepetitionIndex,
StructuralPageScheduler,
};

#[test]
Expand Down Expand Up @@ -4690,4 +4857,141 @@ mod tests {
assert!(!need_preamble);
assert_eq!(skip_in_chunk, 0);
}

/// Round-trips the repetition-index cache of a `FullZipScheduler`:
/// `initialize` must read the index from storage, `load` must install it,
/// scheduling must succeed from the cached copy, and a second scheduler loading
/// the same cache entry must share the same allocation.
#[tokio::test]
async fn test_fullzip_repetition_index_caching() {
    use crate::testing::SimulatedScheduler;

    // Minimal fixed-width decompressor. Only `bits_per_value` is consulted while
    // scheduling; `decompress` is never reached by this test.
    #[derive(Debug)]
    struct TestFixedDecompressor;

    impl FixedPerValueDecompressor for TestFixedDecompressor {
        fn decompress(
            &self,
            _data: FixedWidthDataBlock,
            _num_rows: u64,
        ) -> crate::Result<DataBlock> {
            unimplemented!("Test decompressor")
        }

        fn bits_per_value(&self) -> u64 {
            32
        }
    }

    // Layout of the simulated storage: 1000 filler bytes, then the repetition
    // index, then 10000 more filler bytes.
    let rep_index_position = 1000u64;
    let rows_in_page = 100u64;
    let bytes_per_value = 4u64;
    let rep_index_size = ((rows_in_page + 1) * bytes_per_value) as usize;

    // One little-endian u32 per row plus a trailing entry; row `i` starts at byte `i * 100`.
    let rep_index_data: Vec<u8> = (0..=rows_in_page)
        .flat_map(|row| ((row * 100) as u32).to_le_bytes())
        .collect();
    assert_eq!(rep_index_data.len(), rep_index_size);

    let mut full_data = vec![0u8; rep_index_position as usize];
    full_data.extend_from_slice(&rep_index_data);
    full_data.extend_from_slice(&[0u8; 10000]);

    let io_dyn: Arc<dyn crate::EncodingsIo> =
        Arc::new(SimulatedScheduler::new(bytes::Bytes::from(full_data)));

    // Create FullZipScheduler with repetition index
    let mut scheduler = FullZipScheduler {
        data_buf_position: 0,
        rep_index: Some(FullZipRepIndexDetails {
            buf_position: rep_index_position,
            bytes_per_value,
        }),
        priority: 0,
        rows_in_page,
        bits_per_offset: 32,
        details: Arc::new(FullZipDecodeDetails {
            value_decompressor: PerValueDecompressor::Fixed(Arc::new(TestFixedDecompressor)),
            def_meaning: Arc::new([crate::repdef::DefinitionInterpretation::NullableItem]),
            ctrl_word_parser: crate::repdef::ControlWordParser::new(0, 1),
            max_rep: 0,
            max_visible_def: 0,
        }),
        cached_state: None,
    };

    // Initialization should read the repetition index and hand back cacheable state
    let cached_data1 = scheduler.initialize(&io_dyn).await.unwrap();
    let is_cached = cached_data1
        .clone()
        .as_arc_any()
        .downcast::<FullZipCacheableState>()
        .is_ok();
    assert!(
        is_cached,
        "Expected FullZipCacheableState, got NoCachedPageData"
    );

    // Load the cached data into the scheduler
    scheduler.load(&cached_data1);
    assert!(
        scheduler.cached_state.is_some(),
        "cached_state should be populated after load"
    );

    // The cached buffer must be exactly the repetition index bytes written above
    let cached_state = scheduler.cached_state.as_ref().unwrap();
    assert_eq!(
        cached_state.rep_index_buffer.len(),
        rep_index_size,
        "cached repetition index has the wrong length"
    );
    assert_eq!(
        &cached_state.rep_index_buffer[0..rep_index_size],
        rep_index_data.as_slice(),
        "cached repetition index does not match the stored bytes"
    );

    // Scheduling must succeed from the cached index, and the scheduled work must
    // actually complete (the future is awaited, not just created).
    let ranges = vec![0..10, 20..30];
    let decoder_future = scheduler
        .schedule_ranges_rep(
            &ranges,
            &io_dyn,
            &FullZipRepIndexDetails {
                buf_position: rep_index_position,
                bytes_per_value,
            },
        )
        .expect("schedule_ranges_rep should succeed with cached data");
    assert!(
        decoder_future.await.is_ok(),
        "decoder creation should succeed with cached data"
    );

    // A second scheduler instance should be able to use the same cache entry
    let mut scheduler2 = FullZipScheduler {
        data_buf_position: 0,
        rep_index: Some(FullZipRepIndexDetails {
            buf_position: rep_index_position,
            bytes_per_value,
        }),
        priority: 0,
        rows_in_page,
        bits_per_offset: 32,
        details: scheduler.details.clone(),
        cached_state: None,
    };

    scheduler2.load(&cached_data1);
    assert!(
        scheduler2.cached_state.is_some(),
        "Second scheduler should have cached_state after load"
    );

    // Both schedulers must point at the very same allocation, not a copy
    let cached_state2 = scheduler2.cached_state.as_ref().unwrap();
    assert!(
        Arc::ptr_eq(cached_state, cached_state2),
        "Both schedulers should share the same cached data"
    );
}
}