
Fixed error in reading a non-finished IPC stream. #302

Merged (4 commits, Aug 23, 2021)
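This change makes `StreamReader` yield `Result<StreamState>` instead of `Result<RecordBatch>`: `StreamState::Waiting` signals that the stream is still alive but the producer has not written new data yet, while the iterator only ends (`None`) once the stream is actually finished. A minimal sketch of the resulting consumption pattern (assuming a producer, such as the pyarrow script in this PR, is already streaming on the given port):

```rust
use std::net::TcpStream;

use arrow2::error::Result;
use arrow2::io::ipc::read::{read_stream_metadata, StreamReader, StreamState};

fn main() -> Result<()> {
    // connect to a producer that streams Arrow IPC data on this port
    let mut connection = TcpStream::connect("127.0.0.1:12989")?;
    let metadata = read_stream_metadata(&mut connection)?;
    let stream = StreamReader::new(&mut connection, metadata);

    for state in stream {
        match state? {
            // a new record batch is ready
            StreamState::Some(batch) => println!("{} rows", batch.num_rows()),
            // the stream is alive, but no new data has arrived yet
            StreamState::Waiting => std::thread::sleep(std::time::Duration::from_millis(100)),
        }
    }
    Ok(())
}
```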
1 change: 1 addition & 0 deletions examples/ipc_pyarrow/.gitignore
@@ -0,0 +1 @@
data.arrows
7 changes: 7 additions & 0 deletions examples/ipc_pyarrow/Cargo.toml
@@ -0,0 +1,7 @@
[package]
name = "ipc_stream"
version = "0.1.0"
edition = "2018"

[dependencies]
arrow2 = { path = "../../", default-features = false, features = ["io_ipc"] }
24 changes: 24 additions & 0 deletions examples/ipc_pyarrow/main.py
@@ -0,0 +1,24 @@
import pyarrow as pa
from time import sleep
import socket

# Set up the data exchange socket
sk = socket.socket()
sk.bind(("127.0.0.1", 12989))
sk.listen()

data = [
    pa.array([1, 2, 3, 4]),
    pa.array(["foo", "bar", "baz", None]),
    pa.array([True, None, False, True]),
]

batch = pa.record_batch(data, names=["f0", "f1", "f2"])

# Accept incoming connection and stream the data away
connection, address = sk.accept()
dummy_socket_file = connection.makefile("wb")
with pa.RecordBatchStreamWriter(dummy_socket_file, batch.schema) as writer:
    for i in range(50):
        writer.write_batch(batch)
        sleep(1)
7 changes: 7 additions & 0 deletions examples/ipc_pyarrow/run.sh
@@ -0,0 +1,7 @@
python main.py &
PRODUCER_PID=$!

sleep 1 # wait for metadata to be available.
cargo run

kill $PRODUCER_PID
33 changes: 33 additions & 0 deletions examples/ipc_pyarrow/src/main.rs
@@ -0,0 +1,33 @@
use std::net::TcpStream;
use std::thread;
use std::time::Duration;

use arrow2::error::Result;
use arrow2::io::ipc::read;

fn main() -> Result<()> {
    const ADDRESS: &str = "127.0.0.1:12989";

    let mut reader = TcpStream::connect(ADDRESS)?;
    let metadata = read::read_stream_metadata(&mut reader)?;
    let mut stream = read::StreamReader::new(&mut reader, metadata);

    let mut idx = 0;
    loop {
        match stream.next() {
            Some(x) => match x {
                Ok(read::StreamState::Some(_batch)) => {
                    // a new batch arrived from the producer
                    idx += 1;
                    println!("batch: {:?}", idx)
                }
                // the producer has not finished writing; wait and poll again
                Ok(read::StreamState::Waiting) => thread::sleep(Duration::from_millis(2000)),
                Err(error) => println!("{:?} ({})", error, idx),
            },
            None => break,
        };
    }

    Ok(())
}
1 change: 1 addition & 0 deletions guide/src/SUMMARY.md
@@ -13,4 +13,5 @@
- [Read Parquet](./io/parquet_read.md)
- [Write Parquet](./io/parquet_write.md)
- [Read Arrow](./io/ipc_read.md)
- [Read Arrow stream](./io/ipc_stream_read.md)
- [Write Arrow](./io/ipc_write.md)
21 changes: 21 additions & 0 deletions guide/src/io/ipc_stream_read.md
@@ -0,0 +1,21 @@
# Read Arrow streams

When compiled with the `io_ipc` feature, this crate can read Arrow IPC streams.

The example below shows how to read from a stream:

```rust
{{#include ../../../examples/ipc_pyarrow/src/main.rs}}
```
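The reader yields items of type `StreamState`: `Waiting` means the stream is alive but contains no new data yet, while `Some` carries a freshly read `RecordBatch`. A minimal sketch of handling both states for an arbitrary `StreamReader` (the sleep interval is an arbitrary choice):

```rust
use arrow2::error::Result;
use arrow2::io::ipc::read::{StreamReader, StreamState};

/// Consumes a stream, sleeping whenever the producer has not written new data yet.
fn consume<R: std::io::Read>(reader: StreamReader<R>) -> Result<usize> {
    let mut batches = 0;
    for state in reader {
        match state? {
            // a new batch was read from the stream
            StreamState::Some(_batch) => batches += 1,
            // nothing new yet; poll again shortly
            StreamState::Waiting => std::thread::sleep(std::time::Duration::from_millis(100)),
        }
    }
    Ok(batches)
}
```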

for example, a stream written by pyarrow:

```python,ignore
{{#include ../../../examples/ipc_pyarrow/main.py}}
```

and run via:

```bash,ignore
{{#include ../../../examples/ipc_pyarrow/run.sh}}
```
2 changes: 1 addition & 1 deletion integration-testing/src/bin/arrow-file-to-stream.rs
@@ -30,7 +30,7 @@ fn main() -> Result<()> {
let mut reader = read::FileReader::new(&mut f, metadata, None);
let schema = reader.schema();

-let mut writer = StreamWriter::try_new(std::io::stdout(), &schema)?;
+let mut writer = StreamWriter::try_new(std::io::stdout(), schema)?;

reader.try_for_each(|batch| {
let batch = batch?;
4 changes: 2 additions & 2 deletions integration-testing/src/bin/arrow-stream-to-file.rs
@@ -29,9 +29,9 @@ fn main() -> Result<()> {

let mut writer = io::stdout();

-let mut writer = FileWriter::try_new(&mut writer, &schema)?;
+let mut writer = FileWriter::try_new(&mut writer, schema)?;

-arrow_stream_reader.try_for_each(|batch| writer.write(&batch?))?;
+arrow_stream_reader.try_for_each(|batch| writer.write(&batch?.unwrap()))?;
writer.finish()?;

Ok(())
@@ -131,7 +131,7 @@ async fn send_batch(
options: &write::IpcWriteOptions,
) -> Result {
let (dictionary_flight_data, mut batch_flight_data) =
-arrow_flight::utils::flight_data_from_arrow_batch(batch, &options);
+arrow_flight::utils::flight_data_from_arrow_batch(batch, options);

upload_tx
.send_all(&mut stream::iter(dictionary_flight_data).map(Ok))
@@ -169,7 +169,7 @@ async fn verify_data(
consume_flight_location(
location,
ticket.clone(),
-&expected_data,
+expected_data,
expected_schema.clone(),
)
.await?;
@@ -295,7 +295,7 @@ async fn record_batch_from_message(
schema_ref,
None,
true,
-&dictionaries_by_field,
+dictionaries_by_field,
MetadataVersion::V5,
&mut reader,
0,
2 changes: 1 addition & 1 deletion src/io/ipc/read/mod.rs
@@ -24,4 +24,4 @@ mod stream;

pub use common::{read_dictionary, read_record_batch};
pub use reader::{read_file_metadata, FileMetadata, FileReader};
-pub use stream::{read_stream_metadata, StreamMetadata, StreamReader};
+pub use stream::{read_stream_metadata, StreamMetadata, StreamReader, StreamState};
43 changes: 26 additions & 17 deletions src/io/ipc/read/stream.rs
@@ -23,7 +23,7 @@ use gen::Schema::MetadataVersion;
use crate::array::*;
use crate::datatypes::Schema;
use crate::error::{ArrowError, Result};
-use crate::record_batch::{RecordBatch, RecordBatchReader};
+use crate::record_batch::RecordBatch;

use super::super::CONTINUATION_MARKER;
use super::super::{convert, gen};
@@ -76,12 +76,27 @@ pub fn read_stream_metadata<R: Read>(reader: &mut R) -> Result<StreamMetadata> {
})
}

+pub enum StreamState {
+    Waiting,
+    Some(RecordBatch),
+}
+
+impl StreamState {
+    pub fn unwrap(self) -> RecordBatch {
+        if let StreamState::Some(batch) = self {
+            batch
+        } else {
+            panic!("The batch is not available")
+        }
+    }
+}

/// Reads the next item
pub fn read_next<R: Read>(
    reader: &mut R,
    metadata: &StreamMetadata,
    dictionaries_by_field: &mut Vec<Option<ArrayRef>>,
-) -> Result<Option<RecordBatch>> {
+) -> Result<Option<StreamState>> {
// determine metadata length
let mut meta_size: [u8; 4] = [0; 4];

@@ -92,7 +107,7 @@ pub fn read_next<R: Read>(
// Handle EOF without the "0xFFFFFFFF 0x00000000"
// valid according to:
// https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format
-Ok(None)
+Ok(Some(StreamState::Waiting))
} else {
Err(ArrowError::from(e))
};
@@ -144,7 +159,7 @@ pub fn read_next<R: Read>(
&mut reader,
0,
)
-.map(Some)
+.map(|x| Some(StreamState::Some(x)))
}
gen::Message::MessageHeader::DictionaryBatch => {
let batch = message.header_as_dictionary_batch().ok_or_else(|| {
@@ -168,7 +183,7 @@ pub fn read_next<R: Read>(
// read the next message until we encounter a RecordBatch
read_next(reader, metadata, dictionaries_by_field)
}
-gen::Message::MessageHeader::NONE => Ok(None),
+gen::Message::MessageHeader::NONE => Ok(Some(StreamState::Waiting)),
t => Err(ArrowError::Ipc(format!(
"Reading types other than record batches not yet supported, unable to read {:?} ",
t
@@ -210,32 +225,26 @@ impl<R: Read> StreamReader<R> {
self.finished
}

-    fn maybe_next(&mut self) -> Result<Option<RecordBatch>> {
+    fn maybe_next(&mut self) -> Result<Option<StreamState>> {
        if self.finished {
            return Ok(None);
        }
        let batch = read_next(
            &mut self.reader,
            &self.metadata,
            &mut self.dictionaries_by_field,
        )?;
        if batch.is_none() {
-            self.finished = false;
-        }
-        if self.finished {
-            return Ok(None);
+            self.finished = true;
        }
        Ok(batch)
    }
}

impl<R: Read> Iterator for StreamReader<R> {
-    type Item = Result<RecordBatch>;
+    type Item = Result<StreamState>;

    fn next(&mut self) -> Option<Self::Item> {
        self.maybe_next().transpose()
    }
}

-impl<R: Read> RecordBatchReader for StreamReader<R> {
-    fn schema(&self) -> &Schema {
-        self.metadata.schema.as_ref()
-    }
-}
5 changes: 4 additions & 1 deletion tests/it/io/ipc/common.rs
@@ -62,6 +62,9 @@ pub fn read_arrow_stream(version: &str, file_name: &str) -> (Schema, Vec<RecordB

(
schema.as_ref().clone(),
-reader.collect::<Result<_>>().unwrap(),
+reader
+    .map(|x| x.map(|x| x.unwrap()))
+    .collect::<Result<_>>()
+    .unwrap(),
)
}
2 changes: 1 addition & 1 deletion tests/it/io/ipc/read/stream.rs
@@ -22,7 +22,7 @@ fn test_file(version: &str, file_name: &str) -> Result<()> {

batches
.iter()
-.zip(reader.map(|x| x.unwrap()))
+.zip(reader.map(|x| x.unwrap().unwrap()))
.for_each(|(lhs, rhs)| {
assert_eq!(lhs, &rhs);
});
5 changes: 4 additions & 1 deletion tests/it/io/ipc/write/stream.rs
@@ -34,7 +34,10 @@ fn test_file(version: &str, file_name: &str) {

assert_eq!(schema.as_ref(), &expected_schema);

-let batches = reader.collect::<Result<Vec<_>>>().unwrap();
+let batches = reader
+    .map(|x| x.map(|x| x.unwrap()))
+    .collect::<Result<Vec<_>>>()
+    .unwrap();

assert_eq!(batches, expected_batches);
}