This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Added support for projections in reading IPC streams #1097

Merged
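
This PR adds an optional `projection` argument to the IPC `StreamReader`: a list of column indices to deserialize. When set, each emitted chunk contains only those columns, returned in the requested order, and the new `schema()` method reports the projected schema instead of the full stream schema.

A hypothetical usage sketch based on this diff (the setup around `StreamReader::new` is illustrative only):

```rust
use arrow2::error::Result;
use arrow2::io::ipc::read;

fn read_projected_stream(mut input: impl std::io::Read) -> Result<()> {
    let metadata = read::read_stream_metadata(&mut input)?;
    // request columns 2 and 1; chunks come back in exactly that order
    let reader = read::StreamReader::new(input, metadata, Some(vec![2, 1]));

    // `schema()` (added by this PR) reflects the projection
    println!("{:?}", reader.schema().fields);

    for state in reader {
        match state? {
            read::StreamState::Some(chunk) => {
                println!("chunk with {} columns", chunk.arrays().len());
            }
            // no new data buffered yet; a live consumer would wait and retry
            read::StreamState::Waiting => break,
        }
    }
    Ok(())
}
```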
Changes from 3 commits
2 changes: 1 addition & 1 deletion integration-testing/src/bin/arrow-stream-to-file.rs
@@ -24,7 +24,7 @@ use arrow2::io::ipc::write;
 fn main() -> Result<()> {
     let mut reader = io::stdin();
     let metadata = read::read_stream_metadata(&mut reader)?;
-    let mut arrow_stream_reader = read::StreamReader::new(reader, metadata.clone());
+    let mut arrow_stream_reader = read::StreamReader::new(reader, metadata.clone(), None);

     let writer = io::stdout();

49 changes: 43 additions & 6 deletions src/io/ipc/read/stream.rs
@@ -1,3 +1,4 @@
+use std::collections::HashMap;
 use std::io::Read;

 use arrow_format;
@@ -91,6 +92,7 @@ fn read_next<R: Read>(
     dictionaries: &mut Dictionaries,
     message_buffer: &mut Vec<u8>,
     data_buffer: &mut Vec<u8>,
+    projection: &Option<(Vec<usize>, HashMap<usize, usize>, Schema)>,
 ) -> Result<Option<StreamState>> {
     // determine metadata length
     let mut meta_length: [u8; 4] = [0; 4];
@@ -155,18 +157,26 @@

             let mut reader = std::io::Cursor::new(data_buffer);

-            read_record_batch(
+            let chunk = read_record_batch(
                 batch,
                 &metadata.schema.fields,
                 &metadata.ipc_schema,
-                None,
+                projection.as_ref().map(|x| x.0.as_ref()),
                 dictionaries,
                 metadata.version,
                 &mut reader,
                 0,
                 file_size,
-            )
-            .map(|x| Some(StreamState::Some(x)))
+            );
+
+            if let Some((_, map, _)) = projection {
+                // re-order according to projection
+                chunk
+                    .map(|chunk| apply_projection(chunk, &map))
+                    .map(|x| Some(StreamState::Some(x)))
+            } else {
+                chunk.map(|x| Some(StreamState::Some(x)))
+            }
         }
         arrow_format::ipc::MessageHeaderRef::DictionaryBatch(batch) => {
             let mut buf = vec![0; block_length];
@@ -185,7 +195,14 @@ fn read_next<R: Read>(
             )?;

             // read the next message until we encounter a RecordBatch message
-            read_next(reader, metadata, dictionaries, message_buffer, data_buffer)
+            read_next(
+                reader,
+                metadata,
+                dictionaries,
+                message_buffer,
+                data_buffer,
+                projection,
+            )
         }
         _ => Err(Error::from(OutOfSpecKind::UnexpectedMessageType)),
     }
@@ -204,6 +221,7 @@ pub struct StreamReader<R: Read> {
     finished: bool,
     data_buffer: Vec<u8>,
     message_buffer: Vec<u8>,
+    projection: Option<(Vec<usize>, HashMap<usize, usize>, Schema)>,
 }

 impl<R: Read> StreamReader<R> {
@@ -212,14 +230,24 @@ impl<R: Read> StreamReader<R> {
     /// The first message in the stream is the schema, the reader will fail if it does not
     /// encounter a schema.
     /// To check if the reader is done, use `is_finished(self)`
-    pub fn new(reader: R, metadata: StreamMetadata) -> Self {
+    pub fn new(reader: R, metadata: StreamMetadata, projection: Option<Vec<usize>>) -> Self {
+        let projection = projection.map(|projection| {
+            let (p, h, fields) = prepare_projection(&metadata.schema.fields, projection);
+            let schema = Schema {
+                fields,
+                metadata: metadata.schema.metadata.clone(),
+            };
+            (p, h, schema)
+        });
+
         Self {
             reader,
             metadata,
             dictionaries: Default::default(),
             finished: false,
             data_buffer: vec![],
             message_buffer: vec![],
+            projection,
         }
     }

@@ -228,6 +256,14 @@ impl<R: Read> StreamReader<R> {
         &self.metadata
     }

+    /// Return the schema of the stream (the projected schema, if a projection was set)
+    pub fn schema(&self) -> &Schema {
+        self.projection
+            .as_ref()
+            .map(|x| &x.2)
+            .unwrap_or(&self.metadata.schema)
+    }
+
     /// Check if the stream is finished
     pub fn is_finished(&self) -> bool {
         self.finished
@@ -243,6 +279,7 @@
             &mut self.dictionaries,
             &mut self.message_buffer,
             &mut self.data_buffer,
+            &self.projection,
         )?;
         if batch.is_none() {
             self.finished = true;
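
The reordering above leans on two helpers shared with arrow2's file reader, `prepare_projection` and `apply_projection`. IPC record batches are deserialized with column indices in ascending order, so the requested projection is first sorted for reading, and a map is kept to restore the caller's order afterwards. A minimal, self-contained sketch of that bookkeeping (illustrative only, assuming unique column indices; not the library's actual implementation):

```rust
use std::collections::HashMap;

// Sort the requested projection for reading, and record, for each position
// in the sorted (read) order, where that column belongs in the output.
fn prepare(projection: &[usize]) -> (Vec<usize>, HashMap<usize, usize>) {
    let mut sorted = projection.to_vec();
    sorted.sort_unstable();
    let map = projection
        .iter()
        .enumerate()
        .map(|(out_pos, col)| {
            let read_pos = sorted.iter().position(|c| c == col).unwrap();
            (read_pos, out_pos)
        })
        .collect();
    (sorted, map)
}

// Move each column from its read position to the caller's requested position.
fn apply<T: Clone>(read_columns: Vec<T>, map: &HashMap<usize, usize>) -> Vec<T> {
    let mut out = read_columns.clone();
    for (read_pos, out_pos) in map {
        out[*out_pos] = read_columns[*read_pos].clone();
    }
    out
}

fn main() {
    // request columns [2, 1]: they are read in order [1, 2] and swapped back
    let (sorted, map) = prepare(&[2, 1]);
    assert_eq!(sorted, vec![1, 2]);
    assert_eq!(apply(vec!["col1", "col2"], &map), vec!["col2", "col1"]);
}
```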
8 changes: 6 additions & 2 deletions tests/it/io/ipc/common.rs
@@ -46,7 +46,11 @@ pub fn read_gzip_json(version: &str, file_name: &str) -> Result<IpcRead> {
     Ok((schema, ipc_fields, batches))
 }

-pub fn read_arrow_stream(version: &str, file_name: &str) -> IpcRead {
+pub fn read_arrow_stream(
+    version: &str,
+    file_name: &str,
+    projection: Option<Vec<usize>>,
+) -> IpcRead {
     let testdata = crate::test_util::arrow_test_data();
     let mut file = File::open(format!(
         "{}/arrow-ipc-stream/integration/{}/{}.stream",
@@ -55,7 +59,7 @@ pub fn read_arrow_stream(version: &str, file_name: &str) -> IpcRead {
     .unwrap();

     let metadata = read_stream_metadata(&mut file).unwrap();
-    let reader = StreamReader::new(file, metadata);
+    let reader = StreamReader::new(file, metadata, projection);

     let schema = reader.metadata().schema.clone();
     let ipc_fields = reader.metadata().ipc_schema.fields.clone();
54 changes: 53 additions & 1 deletion tests/it/io/ipc/read/stream.rs
@@ -1,3 +1,4 @@
+use arrow2::chunk::Chunk;
 use std::fs::File;

 use arrow2::error::Result;
@@ -13,7 +14,7 @@ fn test_file(version: &str, file_name: &str) -> Result<()> {
     ))?;

     let metadata = read_stream_metadata(&mut file)?;
-    let reader = StreamReader::new(file, metadata);
+    let reader = StreamReader::new(file, metadata, None);

     // read expected JSON output
     let (schema, ipc_fields, batches) = read_gzip_json(version, file_name)?;
@@ -100,3 +101,54 @@ fn read_generated_200_compression_lz4() -> Result<()> {
 fn read_generated_200_compression_zstd() -> Result<()> {
     test_file("2.0.0-compression", "generated_zstd")
 }
+
+fn test_projection(version: &str, file_name: &str, columns: Vec<usize>) -> Result<()> {
+    let testdata = crate::test_util::arrow_test_data();
+    let mut file = File::open(format!(
+        "{}/arrow-ipc-stream/integration/{}/{}.stream",
+        testdata, version, file_name
+    ))?;
+
+    let metadata = read_stream_metadata(&mut file)?;
+
+    let (_, _, chunks) = read_gzip_json(version, file_name)?;
+
+    let expected_fields = columns
+        .iter()
+        .copied()
+        .map(|x| metadata.schema.fields[x].clone())
+        .collect::<Vec<_>>();
+
+    let expected_chunks = chunks
+        .into_iter()
+        .map(|chunk| {
+            let columns = columns
+                .iter()
+                .copied()
+                .map(|x| chunk.arrays()[x].clone())
+                .collect::<Vec<_>>();
+            Chunk::new(columns)
+        })
+        .collect::<Vec<_>>();
+
+    let reader = StreamReader::new(&mut file, metadata, Some(columns.clone()));
+
+    assert_eq!(reader.schema().fields, expected_fields);
+
+    expected_chunks
+        .iter()
+        .zip(reader.map(|x| x.unwrap().unwrap()))
+        .for_each(|(lhs, rhs)| {
+            assert_eq!(lhs, &rhs);
+        });
+    Ok(())
+}
+
+#[test]
+fn read_projected() -> Result<()> {
+    test_projection("1.0.0-littleendian", "generated_primitive", vec![1])?;
+    test_projection("1.0.0-littleendian", "generated_dictionary", vec![2])?;
+    test_projection("1.0.0-littleendian", "generated_nested", vec![0])?;
+    test_projection("1.0.0-littleendian", "generated_primitive", vec![2, 1])?;
+    test_projection("1.0.0-littleendian", "generated_primitive", vec![0, 2, 1])
+}
4 changes: 2 additions & 2 deletions tests/it/io/ipc/write/stream.rs
@@ -30,13 +30,13 @@ fn write_(
 }

 fn test_file(version: &str, file_name: &str) {
-    let (schema, ipc_fields, batches) = read_arrow_stream(version, file_name);
+    let (schema, ipc_fields, batches) = read_arrow_stream(version, file_name, None);

     let result = write_(&schema, Some(ipc_fields), &batches);

     let mut reader = Cursor::new(result);
     let metadata = read_stream_metadata(&mut reader).unwrap();
-    let reader = StreamReader::new(reader, metadata);
+    let reader = StreamReader::new(reader, metadata, None);

     let schema = reader.metadata().schema.clone();
     let ipc_fields = reader.metadata().ipc_schema.fields.clone();
2 changes: 1 addition & 1 deletion tests/it/io/ipc/write_file_async.rs
@@ -32,7 +32,7 @@ async fn write_(
 }

 async fn test_file(version: &str, file_name: &str) -> Result<()> {
-    let (schema, ipc_fields, batches) = read_arrow_stream(version, file_name);
+    let (schema, ipc_fields, batches) = read_arrow_stream(version, file_name, None);

     let result = write_(&schema, &ipc_fields, &batches).await?;

4 changes: 2 additions & 2 deletions tests/it/io/ipc/write_stream_async.rs
@@ -32,13 +32,13 @@ async fn write_(
 }

 async fn test_file(version: &str, file_name: &str) -> Result<()> {
-    let (schema, ipc_fields, batches) = read_arrow_stream(version, file_name);
+    let (schema, ipc_fields, batches) = read_arrow_stream(version, file_name, None);

     let result = write_(&schema, &ipc_fields, &batches).await?;

     let mut reader = Cursor::new(result);
     let metadata = read::read_stream_metadata(&mut reader)?;
-    let reader = read::StreamReader::new(reader, metadata);
+    let reader = read::StreamReader::new(reader, metadata, None);

     let schema = &reader.metadata().schema;
     let ipc_fields = reader.metadata().ipc_schema.fields.clone();