Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implement compression and skipping for binview IPC #14789

Merged
merged 1 commit into from
Feb 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions crates/polars-arrow/src/io/ipc/read/array/binview.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,34 @@ pub fn read_binview<T: ViewType + ?Sized, R: Read + Seek>(
BinaryViewArrayGeneric::<T>::try_new(data_type, views, Arc::from(variadic_buffers), validity)
.map(|arr| arr.boxed())
}

/// Skips one binary-view array in the IPC metadata queues without reading any data.
///
/// Entries are popped from the front of the queues in this order:
/// 1. the field node from `field_nodes`,
/// 2. the validity buffer from `buffers`,
/// 3. the views buffer from `buffers`,
/// 4. the number of variadic buffers from `variadic_buffer_counts`,
/// 5. that many variadic buffers from `buffers`.
///
/// # Errors
///
/// Returns an error as soon as any queue is empty when an entry is expected.
pub fn skip_binview(
    field_nodes: &mut VecDeque<Node>,
    buffers: &mut VecDeque<IpcBuffer>,
    variadic_buffer_counts: &mut VecDeque<usize>,
) -> PolarsResult<()> {
    // The message names "binview" (not "utf8") so a corrupted file is reported
    // against the array kind that is actually being skipped.
    let _ = field_nodes.pop_front().ok_or_else(|| {
        polars_err!(
            oos = "IPC: unable to fetch the field for binview. The file or stream is corrupted."
        )
    })?;

    let _ = buffers
        .pop_front()
        .ok_or_else(|| polars_err!(oos = "IPC: missing validity buffer."))?;

    let _ = buffers
        .pop_front()
        .ok_or_else(|| polars_err!(oos = "IPC: missing views buffer."))?;

    // The count of variadic buffers is per array and lives in its own queue.
    let n_variadic = variadic_buffer_counts.pop_front().ok_or_else(
        || polars_err!(ComputeError: "IPC: unable to fetch the variadic buffers\n\nThe file or stream is corrupted.")
    )?;

    for _ in 0..n_variadic {
        let _ = buffers
            .pop_front()
            .ok_or_else(|| polars_err!(oos = "IPC: missing variadic buffer"))?;
    }
    Ok(())
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ pub fn skip_fixed_size_list(
field_nodes: &mut VecDeque<Node>,
data_type: &ArrowDataType,
buffers: &mut VecDeque<IpcBuffer>,
variadic_buffer_counts: &mut VecDeque<usize>,
) -> PolarsResult<()> {
let _ = field_nodes.pop_front().ok_or_else(|| {
polars_err!(oos =
Expand All @@ -79,5 +80,10 @@ pub fn skip_fixed_size_list(

let (field, _) = FixedSizeListArray::get_child_and_size(data_type);

skip(field_nodes, field.data_type(), buffers)
skip(
field_nodes,
field.data_type(),
buffers,
variadic_buffer_counts,
)
}
3 changes: 2 additions & 1 deletion crates/polars-arrow/src/io/ipc/read/array/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ pub fn skip_list<O: Offset>(
field_nodes: &mut VecDeque<Node>,
data_type: &ArrowDataType,
buffers: &mut VecDeque<IpcBuffer>,
variadic_buffer_counts: &mut VecDeque<usize>,
) -> PolarsResult<()> {
let _ = field_nodes.pop_front().ok_or_else(|| {
polars_err!(
Expand All @@ -101,5 +102,5 @@ pub fn skip_list<O: Offset>(

let data_type = ListArray::<O>::get_child_type(data_type);

skip(field_nodes, data_type, buffers)
skip(field_nodes, data_type, buffers, variadic_buffer_counts)
}
3 changes: 2 additions & 1 deletion crates/polars-arrow/src/io/ipc/read/array/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ pub fn skip_map(
field_nodes: &mut VecDeque<Node>,
data_type: &ArrowDataType,
buffers: &mut VecDeque<IpcBuffer>,
variadic_buffer_counts: &mut VecDeque<usize>,
) -> PolarsResult<()> {
let _ = field_nodes.pop_front().ok_or_else(|| {
polars_err!(
Expand All @@ -97,5 +98,5 @@ pub fn skip_map(

let data_type = MapArray::get_field(data_type).data_type();

skip(field_nodes, data_type, buffers)
skip(field_nodes, data_type, buffers, variadic_buffer_counts)
}
12 changes: 9 additions & 3 deletions crates/polars-arrow/src/io/ipc/read/array/struct_.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ pub fn skip_struct(
field_nodes: &mut VecDeque<Node>,
data_type: &ArrowDataType,
buffers: &mut VecDeque<IpcBuffer>,
variadic_buffer_counts: &mut VecDeque<usize>,
) -> PolarsResult<()> {
let _ = field_nodes.pop_front().ok_or_else(|| {
polars_err!(
Expand All @@ -84,7 +85,12 @@ pub fn skip_struct(

let fields = StructArray::get_fields(data_type);

fields
.iter()
.try_for_each(|field| skip(field_nodes, field.data_type(), buffers))
fields.iter().try_for_each(|field| {
skip(
field_nodes,
field.data_type(),
buffers,
variadic_buffer_counts,
)
})
}
12 changes: 9 additions & 3 deletions crates/polars-arrow/src/io/ipc/read/array/union.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ pub fn skip_union(
field_nodes: &mut VecDeque<Node>,
data_type: &ArrowDataType,
buffers: &mut VecDeque<IpcBuffer>,
variadic_buffer_counts: &mut VecDeque<usize>,
) -> PolarsResult<()> {
let _ = field_nodes.pop_front().ok_or_else(|| {
polars_err!(
Expand All @@ -117,7 +118,12 @@ pub fn skip_union(

let fields = UnionArray::get_fields(data_type);

fields
.iter()
.try_for_each(|field| skip(field_nodes, field.data_type(), buffers))
fields.iter().try_for_each(|field| {
skip(
field_nodes,
field.data_type(),
buffers,
variadic_buffer_counts,
)
})
}
7 changes: 6 additions & 1 deletion crates/polars-arrow/src/io/ipc/read/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,12 @@ pub fn read_record_batch<R: Read + Seek>(
scratch,
)?)),
ProjectionResult::NotSelected((field, _)) => {
skip(&mut field_nodes, &field.data_type, &mut buffers)?;
skip(
&mut field_nodes,
&field.data_type,
&mut buffers,
&mut variadic_buffer_counts,
)?;
Ok(None)
},
})
Expand Down
17 changes: 10 additions & 7 deletions crates/polars-arrow/src/io/ipc/read/deserialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ pub fn skip(
field_nodes: &mut VecDeque<Node>,
data_type: &ArrowDataType,
buffers: &mut VecDeque<IpcBuffer>,
variadic_buffer_counts: &mut VecDeque<usize>,
) -> PolarsResult<()> {
use PhysicalType::*;
match data_type.to_physical_type() {
Expand All @@ -272,13 +273,15 @@ pub fn skip(
LargeBinary | Binary => skip_binary(field_nodes, buffers),
LargeUtf8 | Utf8 => skip_utf8(field_nodes, buffers),
FixedSizeBinary => skip_fixed_size_binary(field_nodes, buffers),
List => skip_list::<i32>(field_nodes, data_type, buffers),
LargeList => skip_list::<i64>(field_nodes, data_type, buffers),
FixedSizeList => skip_fixed_size_list(field_nodes, data_type, buffers),
Struct => skip_struct(field_nodes, data_type, buffers),
List => skip_list::<i32>(field_nodes, data_type, buffers, variadic_buffer_counts),
LargeList => skip_list::<i64>(field_nodes, data_type, buffers, variadic_buffer_counts),
FixedSizeList => {
skip_fixed_size_list(field_nodes, data_type, buffers, variadic_buffer_counts)
},
Struct => skip_struct(field_nodes, data_type, buffers, variadic_buffer_counts),
Dictionary(_) => skip_dictionary(field_nodes, buffers),
Union => skip_union(field_nodes, data_type, buffers),
Map => skip_map(field_nodes, data_type, buffers),
BinaryView | Utf8View => todo!(),
Union => skip_union(field_nodes, data_type, buffers, variadic_buffer_counts),
Map => skip_map(field_nodes, data_type, buffers, variadic_buffer_counts),
BinaryView | Utf8View => skip_binview(field_nodes, buffers, variadic_buffer_counts),
}
}
19 changes: 11 additions & 8 deletions crates/polars-arrow/src/io/ipc/read/read_basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,12 @@ fn read_uncompressed_buffer<T: NativeType, R: Read + Seek>(
fn read_compressed_buffer<T: NativeType, R: Read + Seek>(
reader: &mut R,
buffer_length: usize,
length: usize,
output_length: Option<usize>,
is_little_endian: bool,
compression: Compression,
scratch: &mut Vec<u8>,
) -> PolarsResult<Vec<T>> {
if length == 0 {
if output_length == Some(0) {
return Ok(vec![]);
}

Expand All @@ -111,10 +111,6 @@ fn read_compressed_buffer<T: NativeType, R: Read + Seek>(
)
}

// It is undefined behavior to call read_exact on an uninitialized buffer, see https://doc.rust-lang.org/std/io/trait.Read.html#tymethod.read
// see also https://github.com/MaikKlein/ash/issues/354#issue-781730580
let mut buffer = vec![T::default(); length];

// decompress first
scratch.clear();
scratch.try_reserve(buffer_length)?;
Expand All @@ -123,6 +119,13 @@ fn read_compressed_buffer<T: NativeType, R: Read + Seek>(
.take(buffer_length as u64)
.read_to_end(scratch)?;

let length = output_length
.unwrap_or_else(|| i64::from_le_bytes(scratch[..8].try_into().unwrap()) as usize);

// It is undefined behavior to call read_exact on an uninitialized buffer, see https://doc.rust-lang.org/std/io/trait.Read.html#tymethod.read
// see also https://github.com/MaikKlein/ash/issues/354#issue-781730580
let mut buffer = vec![T::default(); length];

let out_slice = bytemuck::cast_slice_mut(&mut buffer);

let compression = compression
Expand Down Expand Up @@ -150,7 +153,7 @@ fn read_compressed_bytes<R: Read + Seek>(
read_compressed_buffer::<u8, _>(
reader,
buffer_length,
buffer_length,
None,
is_little_endian,
compression,
scratch,
Expand Down Expand Up @@ -224,7 +227,7 @@ pub fn read_buffer<T: NativeType, R: Read + Seek>(
Ok(read_compressed_buffer(
reader,
buffer_length,
length,
Some(length),
is_little_endian,
compression,
scratch,
Expand Down
23 changes: 23 additions & 0 deletions crates/polars/tests/it/io/ipc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
use std::io::{Seek, SeekFrom};

use polars::prelude::*;

/// Writes a single string column to an in-memory IPC file with LZ4 compression
/// and the polars flavor enabled, reads it back, and checks the resulting shape.
#[test]
fn test_ipc_compression_variadic_buffers() {
    // Three identical string values in one column named "foo".
    let values = vec!["Home delivery vat 24 %"; 3];
    let mut df = df!["foo" => values].unwrap();

    let mut cursor = std::io::Cursor::new(Vec::new());
    IpcWriter::new(&mut cursor)
        .with_compression(Some(IpcCompression::LZ4))
        .with_pl_flavor(true)
        .finish(&mut df)
        .unwrap();

    // Rewind so the reader starts at the beginning of what was just written.
    cursor.seek(SeekFrom::Start(0)).unwrap();
    let roundtripped = IpcReader::new(cursor).finish().unwrap();

    assert_eq!(roundtripped.shape(), (3, 1));
}
2 changes: 2 additions & 0 deletions crates/polars/tests/it/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ mod parquet;
#[cfg(feature = "avro")]
mod avro;

#[cfg(feature = "ipc")]
mod ipc;
#[cfg(feature = "ipc_streaming")]
mod ipc_stream;

Expand Down
Loading