Fixed error in reading a non-finished IPC stream. (#302)
jorgecarleitao committed Aug 23, 2021
1 parent 824ad7e commit 4bbfb25
Showing 16 changed files with 136 additions and 27 deletions.
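
In short: `StreamReader` previously treated any EOF as the end of the stream, so reading from a producer that had not yet finished writing (e.g. over a socket) terminated early. After this commit, `read_next` returns a new `StreamState` enum whose `Waiting` variant signals that EOF was reached before the end-of-stream marker, letting consumers poll again. Below is a minimal consumer-side sketch of the new `Iterator<Item = Result<StreamState>>` API; it is not part of the commit, and `drain` and `on_batch` are hypothetical names:

```rust
use std::io::Read;
use std::thread;
use std::time::Duration;

use arrow2::error::Result;
use arrow2::io::ipc::read::{StreamReader, StreamState};
use arrow2::record_batch::RecordBatch;

/// Hypothetical helper: drain a live IPC stream, backing off whenever the
/// producer has not flushed new data yet.
fn drain<R: Read>(stream: StreamReader<R>, mut on_batch: impl FnMut(RecordBatch)) -> Result<()> {
    for item in stream {
        match item? {
            // A complete record batch was read and decoded.
            StreamState::Some(batch) => on_batch(batch),
            // EOF before the end-of-stream marker: the stream is unfinished,
            // so wait and poll again instead of terminating.
            StreamState::Waiting => thread::sleep(Duration::from_millis(100)),
        }
    }
    // The iterator returned `None`: the end-of-stream marker was read.
    Ok(())
}
```

The `examples/ipc_pyarrow` example added below runs this same loop inline against a pyarrow producer.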
1 change: 1 addition & 0 deletions examples/ipc_pyarrow/.gitignore
@@ -0,0 +1 @@
data.arrows
7 changes: 7 additions & 0 deletions examples/ipc_pyarrow/Cargo.toml
@@ -0,0 +1,7 @@
[package]
name = "ipc_stream"
version = "0.1.0"
edition = "2018"

[dependencies]
arrow2 = { path = "../../", default-features = false, features = ["io_ipc"] }
24 changes: 24 additions & 0 deletions examples/ipc_pyarrow/main.py
@@ -0,0 +1,24 @@
import pyarrow as pa
from time import sleep
import socket

# Set up the data exchange socket
sk = socket.socket()
sk.bind(("127.0.0.1", 12989))
sk.listen()

data = [
    pa.array([1, 2, 3, 4]),
    pa.array(["foo", "bar", "baz", None]),
    pa.array([True, None, False, True]),
]

batch = pa.record_batch(data, names=["f0", "f1", "f2"])

# Accept incoming connection and stream the data away
connection, address = sk.accept()
dummy_socket_file = connection.makefile("wb")
with pa.RecordBatchStreamWriter(dummy_socket_file, batch.schema) as writer:
    for i in range(50):
        writer.write_batch(batch)
        sleep(1)
7 changes: 7 additions & 0 deletions examples/ipc_pyarrow/run.sh
@@ -0,0 +1,7 @@
python main.py &
PRODUCER_PID=$!

sleep 1 # wait for metadata to be available.
cargo run

kill $PRODUCER_PID
33 changes: 33 additions & 0 deletions examples/ipc_pyarrow/src/main.rs
@@ -0,0 +1,33 @@
use std::net::TcpStream;
use std::thread;
use std::time::Duration;

use arrow2::array::{Array, Int64Array};
use arrow2::datatypes::DataType;
use arrow2::error::Result;
use arrow2::io::ipc::read;

fn main() -> Result<()> {
    const ADDRESS: &str = "127.0.0.1:12989";

    let mut reader = TcpStream::connect(ADDRESS)?;
    let metadata = read::read_stream_metadata(&mut reader)?;
    let mut stream = read::StreamReader::new(&mut reader, metadata);

    let mut idx = 0;
    loop {
        match stream.next() {
            Some(x) => match x {
                Ok(read::StreamState::Some(b)) => {
                    idx += 1;
                    println!("batch: {:?}", idx)
                }
                Ok(read::StreamState::Waiting) => thread::sleep(Duration::from_millis(2000)),
                Err(l) => println!("{:?} ({})", l, idx),
            },
            None => break,
        };
    }

    Ok(())
}
1 change: 1 addition & 0 deletions guide/src/SUMMARY.md
@@ -13,4 +13,5 @@
 - [Read Parquet](./io/parquet_read.md)
 - [Write Parquet](./io/parquet_write.md)
 - [Read Arrow](./io/ipc_read.md)
+- [Read Arrow stream](./io/ipc_stream_read.md)
 - [Write Arrow](./io/ipc_write.md)
21 changes: 21 additions & 0 deletions guide/src/io/ipc_stream_read.md
@@ -0,0 +1,21 @@
# Read Arrow streams

When compiled with feature `io_ipc`, this crate can be used to read Arrow streams.

The example below shows how to read from a stream:

```rust
{{#include ../../../examples/ipc_pyarrow/src/main.rs}}
```

e.g. written by pyarrow:

```python,ignore
{{#include ../../../examples/ipc_pyarrow/main.py}}
```

via

```bash,ignore
{{#include ../../../examples/ipc_pyarrow/run.sh}}
```
2 changes: 1 addition & 1 deletion integration-testing/src/bin/arrow-file-to-stream.rs
@@ -30,7 +30,7 @@ fn main() -> Result<()> {
     let mut reader = read::FileReader::new(&mut f, metadata, None);
     let schema = reader.schema();

-    let mut writer = StreamWriter::try_new(std::io::stdout(), &schema)?;
+    let mut writer = StreamWriter::try_new(std::io::stdout(), schema)?;

     reader.try_for_each(|batch| {
         let batch = batch?;
4 changes: 2 additions & 2 deletions integration-testing/src/bin/arrow-stream-to-file.rs
@@ -29,9 +29,9 @@ fn main() -> Result<()> {

     let mut writer = io::stdout();

-    let mut writer = FileWriter::try_new(&mut writer, &schema)?;
+    let mut writer = FileWriter::try_new(&mut writer, schema)?;

-    arrow_stream_reader.try_for_each(|batch| writer.write(&batch?))?;
+    arrow_stream_reader.try_for_each(|batch| writer.write(&batch?.unwrap()))?;
     writer.finish()?;

     Ok(())
@@ -131,7 +131,7 @@ async fn send_batch(
     options: &write::IpcWriteOptions,
 ) -> Result {
     let (dictionary_flight_data, mut batch_flight_data) =
-        arrow_flight::utils::flight_data_from_arrow_batch(batch, &options);
+        arrow_flight::utils::flight_data_from_arrow_batch(batch, options);

     upload_tx
         .send_all(&mut stream::iter(dictionary_flight_data).map(Ok))
@@ -169,7 +169,7 @@ async fn verify_data(
     consume_flight_location(
         location,
         ticket.clone(),
-        &expected_data,
+        expected_data,
         expected_schema.clone(),
     )
     .await?;
@@ -295,7 +295,7 @@ async fn record_batch_from_message(
         schema_ref,
         None,
         true,
-        &dictionaries_by_field,
+        dictionaries_by_field,
         MetadataVersion::V5,
         &mut reader,
         0,
Expand Down
2 changes: 1 addition & 1 deletion src/io/ipc/read/mod.rs
@@ -24,4 +24,4 @@ mod stream;

 pub use common::{read_dictionary, read_record_batch};
 pub use reader::{read_file_metadata, FileMetadata, FileReader};
-pub use stream::{read_stream_metadata, StreamMetadata, StreamReader};
+pub use stream::{read_stream_metadata, StreamMetadata, StreamReader, StreamState};
43 changes: 26 additions & 17 deletions src/io/ipc/read/stream.rs
@@ -23,7 +23,7 @@ use gen::Schema::MetadataVersion
 use crate::array::*;
 use crate::datatypes::Schema;
 use crate::error::{ArrowError, Result};
-use crate::record_batch::{RecordBatch, RecordBatchReader};
+use crate::record_batch::RecordBatch;

 use super::super::CONTINUATION_MARKER;
 use super::super::{convert, gen};
@@ -76,12 +76,27 @@ pub fn read_stream_metadata<R: Read>(reader: &mut R) -> Result<StreamMetadata> {
     })
 }

+pub enum StreamState {
+    Waiting,
+    Some(RecordBatch),
+}
+
+impl StreamState {
+    pub fn unwrap(self) -> RecordBatch {
+        if let StreamState::Some(batch) = self {
+            batch
+        } else {
+            panic!("The batch is not available")
+        }
+    }
+}
+
 /// Reads the next item
 pub fn read_next<R: Read>(
     reader: &mut R,
     metadata: &StreamMetadata,
     dictionaries_by_field: &mut Vec<Option<ArrayRef>>,
-) -> Result<Option<RecordBatch>> {
+) -> Result<Option<StreamState>> {
     // determine metadata length
     let mut meta_size: [u8; 4] = [0; 4];
@@ -92,7 +107,7 @@ pub fn read_next<R: Read>(
                 // Handle EOF without the "0xFFFFFFFF 0x00000000"
                 // valid according to:
                 // https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format
-                Ok(None)
+                Ok(Some(StreamState::Waiting))
             } else {
                 Err(ArrowError::from(e))
             };
@@ -144,7 +159,7 @@ pub fn read_next<R: Read>(
                 &mut reader,
                 0,
             )
-            .map(Some)
+            .map(|x| Some(StreamState::Some(x)))
         }
         gen::Message::MessageHeader::DictionaryBatch => {
             let batch = message.header_as_dictionary_batch().ok_or_else(|| {
@@ -168,7 +183,7 @@ pub fn read_next<R: Read>(
             // read the next message until we encounter a RecordBatch
             read_next(reader, metadata, dictionaries_by_field)
         }
-        gen::Message::MessageHeader::NONE => Ok(None),
+        gen::Message::MessageHeader::NONE => Ok(Some(StreamState::Waiting)),
         t => Err(ArrowError::Ipc(format!(
             "Reading types other than record batches not yet supported, unable to read {:?} ",
             t
@@ -210,32 +225,26 @@ impl<R: Read> StreamReader<R> {
         self.finished
     }

-    fn maybe_next(&mut self) -> Result<Option<RecordBatch>> {
+    fn maybe_next(&mut self) -> Result<Option<StreamState>> {
         if self.finished {
             return Ok(None);
         }
         let batch = read_next(
             &mut self.reader,
             &self.metadata,
             &mut self.dictionaries_by_field,
         )?;
         if batch.is_none() {
-            self.finished = false;
-        }
-        if self.finished {
-            return Ok(None);
+            self.finished = true;
         }
         Ok(batch)
     }
 }

 impl<R: Read> Iterator for StreamReader<R> {
-    type Item = Result<RecordBatch>;
+    type Item = Result<StreamState>;

     fn next(&mut self) -> Option<Self::Item> {
         self.maybe_next().transpose()
     }
 }

-impl<R: Read> RecordBatchReader for StreamReader<R> {
-    fn schema(&self) -> &Schema {
-        self.metadata.schema.as_ref()
-    }
-}
5 changes: 4 additions & 1 deletion tests/it/io/ipc/common.rs
@@ -62,6 +62,9 @@ pub fn read_arrow_stream(version: &str, file_name: &str) -> (Schema, Vec<RecordBatch>) {

     (
         schema.as_ref().clone(),
-        reader.collect::<Result<_>>().unwrap(),
+        reader
+            .map(|x| x.map(|x| x.unwrap()))
+            .collect::<Result<_>>()
+            .unwrap(),
     )
 }
2 changes: 1 addition & 1 deletion tests/it/io/ipc/read/stream.rs
@@ -22,7 +22,7 @@ fn test_file(version: &str, file_name: &str) -> Result<()> {

     batches
         .iter()
-        .zip(reader.map(|x| x.unwrap()))
+        .zip(reader.map(|x| x.unwrap().unwrap()))
         .for_each(|(lhs, rhs)| {
             assert_eq!(lhs, &rhs);
         });
5 changes: 4 additions & 1 deletion tests/it/io/ipc/write/stream.rs
@@ -34,7 +34,10 @@ fn test_file(version: &str, file_name: &str) {

     assert_eq!(schema.as_ref(), &expected_schema);

-    let batches = reader.collect::<Result<Vec<_>>>().unwrap();
+    let batches = reader
+        .map(|x| x.map(|x| x.unwrap()))
+        .collect::<Result<Vec<_>>>()
+        .unwrap();

     assert_eq!(batches, expected_batches);
 }
