This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Added support for projections in reading IPC streams #1097

Merged
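
In short: `StreamReader::new` gains a third argument, an optional projection (a list of column indices to read), and the reader gains a `schema()` accessor that reflects that projection. A minimal usage sketch (the file name and the column indices are illustrative, not from this PR):

```rust
use arrow2::error::Result;
use arrow2::io::ipc::read::{read_stream_metadata, StreamReader, StreamState};

fn main() -> Result<()> {
    // Hypothetical IPC stream file; any `Read` source of an Arrow IPC stream works.
    let mut file = std::fs::File::open("data.arrows")?;
    let metadata = read_stream_metadata(&mut file)?;

    // New in this PR: `Some(vec![2, 0])` reads only columns 2 and 0 of the
    // stream's schema, in that order; `None` preserves the old behavior.
    let reader = StreamReader::new(file, metadata, Some(vec![2, 0]));

    // The reported schema is the projected one.
    println!("{:?}", reader.schema());

    for state in reader {
        match state? {
            StreamState::Some(chunk) => println!("read a chunk with {} rows", chunk.len()),
            // `Waiting` signals that the stream has no new data yet (e.g. a socket).
            StreamState::Waiting => break,
        }
    }
    Ok(())
}
```
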
2 changes: 1 addition & 1 deletion integration-testing/src/bin/arrow-stream-to-file.rs
@@ -24,7 +24,7 @@ use arrow2::io::ipc::write;
fn main() -> Result<()> {
let mut reader = io::stdin();
let metadata = read::read_stream_metadata(&mut reader)?;
-    let mut arrow_stream_reader = read::StreamReader::new(reader, metadata.clone());
+    let mut arrow_stream_reader = read::StreamReader::new(reader, metadata.clone(), None);

let writer = io::stdout();

49 changes: 43 additions & 6 deletions src/io/ipc/read/stream.rs
@@ -1,3 +1,4 @@
+use std::collections::HashMap;
use std::io::Read;

use arrow_format;
@@ -91,6 +92,7 @@ fn read_next<R: Read>(
dictionaries: &mut Dictionaries,
message_buffer: &mut Vec<u8>,
data_buffer: &mut Vec<u8>,
+    projection: &Option<(Vec<usize>, HashMap<usize, usize>, Schema)>,
) -> Result<Option<StreamState>> {
// determine metadata length
let mut meta_length: [u8; 4] = [0; 4];
@@ -155,18 +157,26 @@

let mut reader = std::io::Cursor::new(data_buffer);

-            read_record_batch(
+            let chunk = read_record_batch(
batch,
&metadata.schema.fields,
&metadata.ipc_schema,
-                None,
+                projection.as_ref().map(|x| x.0.as_ref()),
dictionaries,
metadata.version,
&mut reader,
0,
file_size,
-            )
-            .map(|x| Some(StreamState::Some(x)))
+            );
+
+            if let Some((_, map, _)) = projection {
+                // re-order according to projection
+                chunk
+                    .map(|chunk| apply_projection(chunk, map))
+                    .map(|x| Some(StreamState::Some(x)))
+            } else {
+                chunk.map(|x| Some(StreamState::Some(x)))
+            }
}
arrow_format::ipc::MessageHeaderRef::DictionaryBatch(batch) => {
let mut buf = vec![0; block_length];
@@ -185,7 +195,14 @@ fn read_next<R: Read>(
)?;

// read the next message until we encounter a RecordBatch message
-            read_next(reader, metadata, dictionaries, message_buffer, data_buffer)
+            read_next(
+                reader,
+                metadata,
+                dictionaries,
+                message_buffer,
+                data_buffer,
+                projection,
+            )
}
_ => Err(Error::from(OutOfSpecKind::UnexpectedMessageType)),
}
@@ -204,6 +221,7 @@ pub struct StreamReader<R: Read> {
finished: bool,
data_buffer: Vec<u8>,
message_buffer: Vec<u8>,
+    projection: Option<(Vec<usize>, HashMap<usize, usize>, Schema)>,
}

impl<R: Read> StreamReader<R> {
@@ -212,14 +230,24 @@ impl<R: Read> StreamReader<R> {
/// The first message in the stream is the schema, the reader will fail if it does not
/// encounter a schema.
/// To check if the reader is done, use `is_finished(self)`
-    pub fn new(reader: R, metadata: StreamMetadata) -> Self {
+    pub fn new(reader: R, metadata: StreamMetadata, projection: Option<Vec<usize>>) -> Self {
+        let projection = projection.map(|projection| {
+            let (p, h, fields) = prepare_projection(&metadata.schema.fields, projection);
+            let schema = Schema {
+                fields,
+                metadata: metadata.schema.metadata.clone(),
+            };
+            (p, h, schema)
+        });
+
Self {
reader,
metadata,
dictionaries: Default::default(),
finished: false,
data_buffer: vec![],
message_buffer: vec![],
+            projection,
}
}

@@ -228,6 +256,14 @@ impl<R: Read> StreamReader<R> {
&self.metadata
}

+    /// Returns the schema of the stream, projected if a projection was set
+    pub fn schema(&self) -> &Schema {
+        self.projection
+            .as_ref()
+            .map(|x| &x.2)
+            .unwrap_or(&self.metadata.schema)
+    }
+
/// Check if the stream is finished
pub fn is_finished(&self) -> bool {
self.finished
@@ -243,6 +279,7 @@
&mut self.dictionaries,
&mut self.message_buffer,
&mut self.data_buffer,
+            &self.projection,
)?;
if batch.is_none() {
self.finished = true;
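
A note on the plumbing above: `read_record_batch` materializes the selected columns in ascending field order, so a projection such as `[2, 1]` is read back as columns `[1, 2]` and must be re-ordered. `prepare_projection` returns the sorted indices together with a re-ordering map, and `apply_projection` uses that map to restore the requested order. A simplified, self-contained sketch of that step; interpreting the map entries as `(position_as_read, requested_position)` is an assumption, not something this diff spells out:

```rust
use std::collections::HashMap;

/// Stand-in for the re-ordering performed by `apply_projection`.
fn reorder<T: Clone>(read: &[T], map: &HashMap<usize, usize>) -> Vec<T> {
    let mut out = read.to_vec();
    for (from, to) in map {
        // place each column as read into its requested slot
        out[*to] = read[*from].clone();
    }
    out
}

fn main() {
    // Requesting columns [2, 1] of schema [a, b, c]: the reader yields them
    // in schema order, i.e. ["b", "c"]; the map {0 -> 1, 1 -> 0} restores
    // the requested order ["c", "b"].
    let read = vec!["b", "c"];
    let map = HashMap::from([(0usize, 1usize), (1, 0)]);
    assert_eq!(reorder(&read, &map), vec!["c", "b"]);
}
```
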
8 changes: 6 additions & 2 deletions tests/it/io/ipc/common.rs
@@ -46,7 +46,11 @@ pub fn read_gzip_json(version: &str, file_name: &str) -> Result<IpcRead> {
Ok((schema, ipc_fields, batches))
}

-pub fn read_arrow_stream(version: &str, file_name: &str) -> IpcRead {
+pub fn read_arrow_stream(
+    version: &str,
+    file_name: &str,
+    projection: Option<Vec<usize>>,
+) -> IpcRead {
let testdata = crate::test_util::arrow_test_data();
let mut file = File::open(format!(
"{}/arrow-ipc-stream/integration/{}/{}.stream",
@@ -55,7 +59,7 @@ pub fn read_arrow_stream(version: &str, file_name: &str) -> IpcRead {
.unwrap();

let metadata = read_stream_metadata(&mut file).unwrap();
-    let reader = StreamReader::new(file, metadata);
+    let reader = StreamReader::new(file, metadata, projection);

let schema = reader.metadata().schema.clone();
let ipc_fields = reader.metadata().ipc_schema.fields.clone();
54 changes: 53 additions & 1 deletion tests/it/io/ipc/read/stream.rs
@@ -1,3 +1,4 @@
+use arrow2::chunk::Chunk;
use std::fs::File;

use arrow2::error::Result;
@@ -13,7 +14,7 @@ fn test_file(version: &str, file_name: &str) -> Result<()> {
))?;

let metadata = read_stream_metadata(&mut file)?;
-    let reader = StreamReader::new(file, metadata);
+    let reader = StreamReader::new(file, metadata, None);

// read expected JSON output
let (schema, ipc_fields, batches) = read_gzip_json(version, file_name)?;
@@ -100,3 +101,54 @@ fn read_generated_200_compression_lz4() -> Result<()> {
fn read_generated_200_compression_zstd() -> Result<()> {
test_file("2.0.0-compression", "generated_zstd")
}
+
+fn test_projection(version: &str, file_name: &str, columns: Vec<usize>) -> Result<()> {
+    let testdata = crate::test_util::arrow_test_data();
+    let mut file = File::open(format!(
+        "{}/arrow-ipc-stream/integration/{}/{}.stream",
+        testdata, version, file_name
+    ))?;
+
+    let metadata = read_stream_metadata(&mut file)?;
+
+    let (_, _, chunks) = read_gzip_json(version, file_name)?;
+
+    let expected_fields = columns
+        .iter()
+        .copied()
+        .map(|x| metadata.schema.fields[x].clone())
+        .collect::<Vec<_>>();
+
+    let expected_chunks = chunks
+        .into_iter()
+        .map(|chunk| {
+            let columns = columns
+                .iter()
+                .copied()
+                .map(|x| chunk.arrays()[x].clone())
+                .collect::<Vec<_>>();
+            Chunk::new(columns)
+        })
+        .collect::<Vec<_>>();
+
+    let reader = StreamReader::new(&mut file, metadata, Some(columns.clone()));
+
+    assert_eq!(reader.schema().fields, expected_fields);
+
+    expected_chunks
+        .iter()
+        .zip(reader.map(|x| x.unwrap().unwrap()))
+        .for_each(|(lhs, rhs)| {
+            assert_eq!(lhs, &rhs);
+        });
+    Ok(())
+}
+
+#[test]
+fn read_projected() -> Result<()> {
+    test_projection("1.0.0-littleendian", "generated_primitive", vec![1])?;
+    test_projection("1.0.0-littleendian", "generated_dictionary", vec![2])?;
+    test_projection("1.0.0-littleendian", "generated_nested", vec![0])?;
+    test_projection("1.0.0-littleendian", "generated_primitive", vec![2, 1])?;
+    test_projection("1.0.0-littleendian", "generated_primitive", vec![0, 2, 1])
+}
4 changes: 2 additions & 2 deletions tests/it/io/ipc/write/stream.rs
@@ -30,13 +30,13 @@ fn write_(
}

fn test_file(version: &str, file_name: &str) {
-    let (schema, ipc_fields, batches) = read_arrow_stream(version, file_name);
+    let (schema, ipc_fields, batches) = read_arrow_stream(version, file_name, None);

let result = write_(&schema, Some(ipc_fields), &batches);

let mut reader = Cursor::new(result);
let metadata = read_stream_metadata(&mut reader).unwrap();
-    let reader = StreamReader::new(reader, metadata);
+    let reader = StreamReader::new(reader, metadata, None);

let schema = reader.metadata().schema.clone();
let ipc_fields = reader.metadata().ipc_schema.fields.clone();
2 changes: 1 addition & 1 deletion tests/it/io/ipc/write_file_async.rs
@@ -32,7 +32,7 @@ async fn write_(
}

async fn test_file(version: &str, file_name: &str) -> Result<()> {
-    let (schema, ipc_fields, batches) = read_arrow_stream(version, file_name);
+    let (schema, ipc_fields, batches) = read_arrow_stream(version, file_name, None);

let result = write_(&schema, &ipc_fields, &batches).await?;

4 changes: 2 additions & 2 deletions tests/it/io/ipc/write_stream_async.rs
@@ -32,13 +32,13 @@ async fn write_(
}

async fn test_file(version: &str, file_name: &str) -> Result<()> {
-    let (schema, ipc_fields, batches) = read_arrow_stream(version, file_name);
+    let (schema, ipc_fields, batches) = read_arrow_stream(version, file_name, None);

let result = write_(&schema, &ipc_fields, &batches).await?;

let mut reader = Cursor::new(result);
let metadata = read::read_stream_metadata(&mut reader)?;
-    let reader = read::StreamReader::new(reader, metadata);
+    let reader = read::StreamReader::new(reader, metadata, None);

let schema = &reader.metadata().schema;
let ipc_fields = reader.metadata().ipc_schema.fields.clone();