Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 35 additions & 5 deletions rust/src/encodings/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ impl<'a> BinaryEncoder<'a> {
let end = offsets[offsets.len() - 1].as_usize();
let b = unsafe {
std::slice::from_raw_parts(
arr.to_data().buffers()[1].as_ptr().offset(start as isize),
arr.to_data().buffers()[1].as_ptr().add(start),
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cargo clippy

end - start,
)
};
Expand All @@ -86,7 +86,7 @@ impl<'a> BinaryEncoder<'a> {
.skip(1)
.map(|b| b.as_usize() - start_offset + last_offset)
.for_each(|o| pos_builder.append_value(o as i64));
last_offset = pos_builder.values_slice()[pos_builder.len() - 1 as usize] as usize;
last_offset = pos_builder.values_slice()[pos_builder.len() - 1] as usize;
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cargo clippy

}

let positions_offset = self.writer.tell();
Expand Down Expand Up @@ -215,7 +215,7 @@ impl<'a, T: ByteArrayType> BinaryDecoder<'a, T> {
if self.nullable {
let mut null_count = 0;
let mut null_buf = MutableBuffer::new_null(self.length);
positions
position_slice
.values()
.windows(2)
.enumerate()
Expand Down Expand Up @@ -294,6 +294,7 @@ impl<'a, T: ByteArrayType> Decoder for BinaryDecoder<'a, T> {
let end = indices.value(indices.len() - 1);

// TODO: make min batch size configurable.
// TODO: read positions in chunks too.
const MIN_IO_SIZE: i64 = 64 * 1024; // 64KB
let positions = self
.get_positions(start as usize..(end + 1) as usize)
Expand Down Expand Up @@ -393,8 +394,8 @@ mod tests {
use arrow_select::concat::concat;

use arrow_array::{
new_empty_array, types::GenericStringType, GenericStringArray, LargeStringArray,
OffsetSizeTrait, StringArray,
cast::AsArray, new_empty_array, types::GenericStringType, BinaryArray, GenericStringArray,
LargeStringArray, OffsetSizeTrait, StringArray,
};
use object_store::path::Path;

Expand Down Expand Up @@ -585,4 +586,33 @@ mod tests {
assert_eq!(pos, (i * (8 * 11) /* offset array */ + (i + 1) * (10 * 10)));
}
}

#[tokio::test]
async fn test_write_binary_with_nulls() {
let data = BinaryArray::from_iter((0..60000).map(|v| {
if v % 4 != 0 {
Some::<&[u8]>(b"abcdefgh")
} else {
None
}
}));
let store = ObjectStore::memory();
let path = Path::from("/slices");

let mut object_writer = ObjectWriter::new(&store, &path).await.unwrap();
// Write some garbage to reset "tell()".
object_writer.write_all(b"1234").await.unwrap();
let mut encoder = BinaryEncoder::new(&mut object_writer);

// let arrs = arr.iter().map(|a| a as &dyn Array).collect::<Vec<_>>();
let pos = encoder.encode(&[&data]).await.unwrap();
object_writer.shutdown().await.unwrap();

let reader = store.open(&path).await.unwrap();
let decoder = BinaryDecoder::<BinaryType>::new(reader.as_ref(), pos, data.len(), true);
let idx = UInt32Array::from(vec![0_u32, 5_u32, 59996_u32]);
let actual = decoder.take(&idx).await.unwrap();
let values: Vec<Option<&[u8]>> = vec![None, Some(b"abcdefgh"), None];
assert_eq!(actual.as_binary::<i32>(), &BinaryArray::from(values));
}
}