diff --git a/integration-testing/src/bin/arrow-stream-to-file.rs b/integration-testing/src/bin/arrow-stream-to-file.rs index ab0855bf677..f5d9f81fd70 100644 --- a/integration-testing/src/bin/arrow-stream-to-file.rs +++ b/integration-testing/src/bin/arrow-stream-to-file.rs @@ -24,7 +24,7 @@ use arrow2::io::ipc::write; fn main() -> Result<()> { let mut reader = io::stdin(); let metadata = read::read_stream_metadata(&mut reader)?; - let mut arrow_stream_reader = read::StreamReader::new(reader, metadata.clone()); + let mut arrow_stream_reader = read::StreamReader::new(reader, metadata.clone(), None); let writer = io::stdout(); diff --git a/src/io/ipc/read/stream.rs b/src/io/ipc/read/stream.rs index c49512faaaa..3149e5e5b1f 100644 --- a/src/io/ipc/read/stream.rs +++ b/src/io/ipc/read/stream.rs @@ -1,3 +1,4 @@ +use std::collections::HashMap; use std::io::Read; use arrow_format; @@ -91,6 +92,7 @@ fn read_next( dictionaries: &mut Dictionaries, message_buffer: &mut Vec<u8>, data_buffer: &mut Vec<u8>, + projection: &Option<(Vec<usize>, HashMap<usize, usize>, Schema)>, ) -> Result<Option<StreamState>> { // determine metadata length let mut meta_length: [u8; 4] = [0; 4]; @@ -155,18 +157,26 @@ fn read_next( let mut reader = std::io::Cursor::new(data_buffer); - read_record_batch( + let chunk = read_record_batch( batch, &metadata.schema.fields, &metadata.ipc_schema, - None, + projection.as_ref().map(|x| x.0.as_ref()), dictionaries, metadata.version, &mut reader, 0, file_size, - ) - .map(|x| Some(StreamState::Some(x))) + ); + + if let Some((_, map, _)) = projection { + // re-order according to projection + chunk + .map(|chunk| apply_projection(chunk, map)) + .map(|x| Some(StreamState::Some(x))) + } else { + chunk.map(|x| Some(StreamState::Some(x))) + } } arrow_format::ipc::MessageHeaderRef::DictionaryBatch(batch) => { let mut buf = vec![0; block_length]; @@ -185,7 +195,14 @@ fn read_next( )?; // read the next message until we encounter a RecordBatch message - read_next(reader, metadata, dictionaries, message_buffer, data_buffer) 
+ read_next( + reader, + metadata, + dictionaries, + message_buffer, + data_buffer, + projection, + ) } _ => Err(Error::from(OutOfSpecKind::UnexpectedMessageType)), } @@ -204,6 +221,7 @@ pub struct StreamReader<R: Read> { finished: bool, data_buffer: Vec<u8>, message_buffer: Vec<u8>, + projection: Option<(Vec<usize>, HashMap<usize, usize>, Schema)>, } impl<R: Read> StreamReader<R> { @@ -212,7 +230,16 @@ impl<R: Read> StreamReader<R> { /// The first message in the stream is the schema, the reader will fail if it does not /// encounter a schema. /// To check if the reader is done, use `is_finished(self)` - pub fn new(reader: R, metadata: StreamMetadata) -> Self { + pub fn new(reader: R, metadata: StreamMetadata, projection: Option<Vec<usize>>) -> Self { + let projection = projection.map(|projection| { + let (p, h, fields) = prepare_projection(&metadata.schema.fields, projection); + let schema = Schema { + fields, + metadata: metadata.schema.metadata.clone(), + }; + (p, h, schema) + }); + Self { reader, metadata, @@ -220,6 +247,7 @@ impl<R: Read> StreamReader<R> { finished: false, data_buffer: vec![], message_buffer: vec![], + projection, } } @@ -228,6 +256,14 @@ impl<R: Read> StreamReader<R> { &self.metadata } + /// Return the schema of the file + pub fn schema(&self) -> &Schema { + self.projection + .as_ref() + .map(|x| &x.2) + .unwrap_or(&self.metadata.schema) + } + /// Check if the stream is finished pub fn is_finished(&self) -> bool { self.finished @@ -243,6 +279,7 @@ &mut self.dictionaries, &mut self.message_buffer, &mut self.data_buffer, + &self.projection, )?; if batch.is_none() { self.finished = true; diff --git a/tests/it/io/ipc/common.rs b/tests/it/io/ipc/common.rs index 45df69f46af..deaf44ec6ab 100644 --- a/tests/it/io/ipc/common.rs +++ b/tests/it/io/ipc/common.rs @@ -46,7 +46,11 @@ pub fn read_gzip_json(version: &str, file_name: &str) -> Result<IpcRead> { Ok((schema, ipc_fields, batches)) } -pub fn read_arrow_stream(version: &str, file_name: &str) -> IpcRead { +pub fn read_arrow_stream( + version: &str, + file_name: &str, + projection: 
Option<Vec<usize>>, +) -> IpcRead { let testdata = crate::test_util::arrow_test_data(); let mut file = File::open(format!( "{}/arrow-ipc-stream/integration/{}/{}.stream", testdata, version, file_name )) .unwrap(); let metadata = read_stream_metadata(&mut file).unwrap(); - let reader = StreamReader::new(file, metadata); + let reader = StreamReader::new(file, metadata, projection); let schema = reader.metadata().schema.clone(); let ipc_fields = reader.metadata().ipc_schema.fields.clone(); diff --git a/tests/it/io/ipc/read/stream.rs b/tests/it/io/ipc/read/stream.rs index 18e7ecd0756..ab8ae45a0e2 100644 --- a/tests/it/io/ipc/read/stream.rs +++ b/tests/it/io/ipc/read/stream.rs @@ -1,3 +1,4 @@ +use arrow2::chunk::Chunk; use std::fs::File; use arrow2::error::Result; @@ -13,7 +14,7 @@ fn test_file(version: &str, file_name: &str) -> Result<()> { ))?; let metadata = read_stream_metadata(&mut file)?; - let reader = StreamReader::new(file, metadata); + let reader = StreamReader::new(file, metadata, None); // read expected JSON output let (schema, ipc_fields, batches) = read_gzip_json(version, file_name)?; @@ -100,3 +101,54 @@ fn read_generated_200_compression_lz4() -> Result<()> { fn read_generated_200_compression_zstd() -> Result<()> { test_file("2.0.0-compression", "generated_zstd") } + +fn test_projection(version: &str, file_name: &str, columns: Vec<usize>) -> Result<()> { + let testdata = crate::test_util::arrow_test_data(); + let mut file = File::open(format!( + "{}/arrow-ipc-stream/integration/{}/{}.stream", + testdata, version, file_name + ))?; + + let metadata = read_stream_metadata(&mut file)?; + + let (_, _, chunks) = read_gzip_json(version, file_name)?; + + let expected_fields = columns + .iter() + .copied() + .map(|x| metadata.schema.fields[x].clone()) + .collect::<Vec<_>>(); + + let expected_chunks = chunks + .into_iter() + .map(|chunk| { + let columns = columns + .iter() + .copied() + .map(|x| chunk.arrays()[x].clone()) + 
.collect::<Vec<_>>(); + Chunk::new(columns) + }) + .collect::<Vec<_>>(); + + let reader = StreamReader::new(&mut file, metadata, Some(columns.clone())); + + assert_eq!(reader.schema().fields, expected_fields); + + expected_chunks + .iter() + .zip(reader.map(|x| x.unwrap().unwrap())) + .for_each(|(lhs, rhs)| { + assert_eq!(lhs, &rhs); + }); + Ok(()) +} + +#[test] +fn read_projected() -> Result<()> { + test_projection("1.0.0-littleendian", "generated_primitive", vec![1])?; + test_projection("1.0.0-littleendian", "generated_dictionary", vec![2])?; + test_projection("1.0.0-littleendian", "generated_nested", vec![0])?; + test_projection("1.0.0-littleendian", "generated_primitive", vec![2, 1])?; + test_projection("1.0.0-littleendian", "generated_primitive", vec![0, 2, 1]) } diff --git a/tests/it/io/ipc/write/stream.rs b/tests/it/io/ipc/write/stream.rs index 46f62aa11b4..092e4402b7d 100644 --- a/tests/it/io/ipc/write/stream.rs +++ b/tests/it/io/ipc/write/stream.rs @@ -30,13 +30,13 @@ fn write_( } fn test_file(version: &str, file_name: &str) { - let (schema, ipc_fields, batches) = read_arrow_stream(version, file_name); + let (schema, ipc_fields, batches) = read_arrow_stream(version, file_name, None); let result = write_(&schema, Some(ipc_fields), &batches); let mut reader = Cursor::new(result); let metadata = read_stream_metadata(&mut reader).unwrap(); - let reader = StreamReader::new(reader, metadata); + let reader = StreamReader::new(reader, metadata, None); let schema = reader.metadata().schema.clone(); let ipc_fields = reader.metadata().ipc_schema.fields.clone(); diff --git a/tests/it/io/ipc/write_file_async.rs b/tests/it/io/ipc/write_file_async.rs index 7bd76b86be0..25b8164f058 100644 --- a/tests/it/io/ipc/write_file_async.rs +++ b/tests/it/io/ipc/write_file_async.rs @@ -32,7 +32,7 @@ async fn write_( } async fn test_file(version: &str, file_name: &str) -> Result<()> { - let (schema, ipc_fields, batches) = read_arrow_stream(version, file_name); + let (schema, ipc_fields, batches) 
= read_arrow_stream(version, file_name, None); let result = write_(&schema, &ipc_fields, &batches).await?; diff --git a/tests/it/io/ipc/write_stream_async.rs b/tests/it/io/ipc/write_stream_async.rs index e033d5e9435..12669bdd59e 100644 --- a/tests/it/io/ipc/write_stream_async.rs +++ b/tests/it/io/ipc/write_stream_async.rs @@ -32,13 +32,13 @@ async fn write_( } async fn test_file(version: &str, file_name: &str) -> Result<()> { - let (schema, ipc_fields, batches) = read_arrow_stream(version, file_name); + let (schema, ipc_fields, batches) = read_arrow_stream(version, file_name, None); let result = write_(&schema, &ipc_fields, &batches).await?; let mut reader = Cursor::new(result); let metadata = read::read_stream_metadata(&mut reader)?; - let reader = read::StreamReader::new(reader, metadata); + let reader = read::StreamReader::new(reader, metadata, None); let schema = &reader.metadata().schema; let ipc_fields = reader.metadata().ipc_schema.fields.clone();