Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions rust/lance-encoding/src/repdef.rs
Original file line number Diff line number Diff line change
Expand Up @@ -817,6 +817,10 @@ impl RepDefBuilder {
/// Registers a nullable validity bitmap
pub fn add_validity_bitmap(&mut self, validity: NullBuffer) {
self.check_validity_len(validity.len());
if validity.null_count() == 0 {
self.add_no_null(validity.len());
return;
}
self.repdefs.push(RawRepDef::Validity(ValidityDesc {
num_values: validity.len(),
validity: Some(validity.into_inner()),
Expand Down Expand Up @@ -2833,6 +2837,26 @@ mod tests {
assert_eq!([0, 0, 0, 3, 1, 1, 2, 1, 0, 0, 1], *def);
}

/// A validity bitmap that contains no nulls must serialize the same way as a
/// layer registered through `add_no_null`: no repetition levels, no definition
/// levels, and matching `def_meaning` / `max_visible_level`.
#[test]
fn test_all_valid_validity_bitmap_serializes_as_no_null() {
    // Layer registered from an explicit bitmap in which every slot is valid.
    let bitmap_result = {
        let mut builder = RepDefBuilder::default();
        builder.add_validity_bitmap(validity(&[true, true, true, true]));
        RepDefBuilder::serialize(vec![builder])
    };

    // Reference layer of the same length registered directly as "no nulls".
    let no_null_result = {
        let mut builder = RepDefBuilder::default();
        builder.add_no_null(4);
        RepDefBuilder::serialize(vec![builder])
    };

    // The bitmap path must not emit any level buffers...
    assert!(bitmap_result.repetition_levels.is_none());
    assert!(bitmap_result.definition_levels.is_none());

    // ...and its metadata must match the reference exactly.
    assert_eq!(bitmap_result.def_meaning, no_null_result.def_meaning);
    assert_eq!(
        bitmap_result.max_visible_level,
        no_null_result.max_visible_level
    );
}

#[test]
fn test_slicer() {
let mut builder = RepDefBuilder::default();
Expand Down
129 changes: 129 additions & 0 deletions rust/lance/src/dataset/tests/dataset_merge_update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2072,3 +2072,132 @@ async fn test_fts_index_incremental_reindex_after_in_place_update() {
results.num_rows()
);
}

/// Regression test for https://github.com/lance-format/lance/issues/6338
/// Sub-schema merge_insert with binary columns on v2.2 causes data corruption
/// when the binary values are >= 256 bytes.
#[tokio::test]
async fn test_sub_schema_merge_insert_binary_v2_2() {
use crate::dataset::write::merge_insert::WhenMatched;
use arrow_array::BinaryArray;

let schema = Arc::new(ArrowSchema::new(vec![
ArrowField::new("id", DataType::Int64, false),
ArrowField::new("a", DataType::Binary, true),
ArrowField::new("b", DataType::Utf8, true),
]));

let test_uri = TempStrDir::default();

// Initial write: 2 rows with null binary values
let initial_batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(arrow_array::Int64Array::from(vec![0, 1])),
Arc::new(BinaryArray::from(vec![None::<&[u8]>, None])),
Arc::new(StringArray::from(vec![None::<&str>, None])),
],
)
.unwrap();

let write_params = WriteParams {
data_storage_version: Some(LanceFileVersion::V2_2),
..Default::default()
};
let batches = RecordBatchIterator::new(vec![initial_batch].into_iter().map(Ok), schema.clone());
Dataset::write(batches, &test_uri, Some(write_params))
.await
.unwrap();

let sub_schema = Arc::new(ArrowSchema::new(vec![
ArrowField::new("id", DataType::Int64, false),
ArrowField::new("a", DataType::Binary, true),
]));

// Sub-schema merge_insert for row 0 (binary value >= 256 bytes)
let data_a: Vec<u8> = (0..256).map(|i| (i % 251) as u8).collect();
{
let update_batch = RecordBatch::try_new(
sub_schema.clone(),
vec![
Arc::new(arrow_array::Int64Array::from(vec![0])),
Arc::new(BinaryArray::from(vec![Some(data_a.as_slice())])),
],
)
.unwrap();
let dataset = Dataset::open(&test_uri).await.unwrap();
let source = Box::new(RecordBatchIterator::new(
vec![update_batch].into_iter().map(Ok),
sub_schema.clone(),
));
MergeInsertBuilder::try_new(dataset.into(), vec!["id".into()])
.unwrap()
.when_matched(WhenMatched::UpdateAll)
.try_build()
.unwrap()
.execute_reader(source)
.await
.unwrap();
}

// Read back and verify first merge worked
let dataset = Dataset::open(&test_uri).await.unwrap();
let table = dataset
.scan()
.project(&["id", "a"])
.unwrap()
.try_into_stream()
.await
.unwrap()
.try_collect::<Vec<_>>()
.await
.unwrap();
let table = concat_batches(&table[0].schema(), &table).unwrap();
assert_eq!(table.num_rows(), 2);

// Sub-schema merge_insert for row 1 (binary value >= 256 bytes)
let data_b: Vec<u8> = (0..256).map(|i| ((i + 100) % 251) as u8).collect();
{
let update_batch = RecordBatch::try_new(
sub_schema.clone(),
vec![
Arc::new(arrow_array::Int64Array::from(vec![1])),
Arc::new(BinaryArray::from(vec![Some(data_b.as_slice())])),
],
)
.unwrap();
let dataset = Dataset::open(&test_uri).await.unwrap();
let source = Box::new(RecordBatchIterator::new(
vec![update_batch].into_iter().map(Ok),
sub_schema.clone(),
));
MergeInsertBuilder::try_new(dataset.into(), vec!["id".into()])
.unwrap()
.when_matched(WhenMatched::UpdateAll)
.try_build()
.unwrap()
.execute_reader(source)
.await
.unwrap();
}

// Read back and verify - this is where the bug manifests
let dataset = Dataset::open(&test_uri).await.unwrap();
let table = dataset
.scan()
.project(&["id", "a"])
.unwrap()
.try_into_stream()
.await
.unwrap()
.try_collect::<Vec<_>>()
.await
.unwrap();
let table = concat_batches(&table[0].schema(), &table).unwrap();
assert_eq!(table.num_rows(), 2);

let a_col = table.column_by_name("a").unwrap();
let binary_arr = a_col.as_any().downcast_ref::<BinaryArray>().unwrap();
assert_eq!(binary_arr.value(0), data_a.as_slice());
assert_eq!(binary_arr.value(1), data_b.as_slice());
}
Loading