
Fixed error in reading a non-finished IPC stream. #302

Merged
jorgecarleitao merged 4 commits into main from stream on Aug 23, 2021

Conversation

@jorgecarleitao (Owner) commented Aug 19, 2021

When .next() is called on the stream reader and the stream has not yet finished, we should neither error nor return None (end of stream). Instead, we should return a Waiting state, so that the user can decide what to do when no new batches have arrived but the stream has not yet reported that it is finished.

This PR changes the stream reader's iterator to return Option<Result<State>>, where State is:

pub enum State {
    Waiting,
    Some(RecordBatch),
}
  • None describes the end of the stream
  • Some(Err) describes an error
  • Some(Ok(State::Waiting)) describes that the stream has not finished, but that no new data is available to read
  • Some(Ok(State::Some(_))) describes a new batch

Thanks a lot to @HagaiHargil for clarifying the limitations of the stream reader.

Backwards incompatible changes:

  • StreamReader no longer implements RecordBatchReader
  • The Iterator implementation of StreamReader no longer returns Option<Result<RecordBatch>>; it now returns Option<Result<State>>. Use State::unwrap if you are certain that the stream contains a batch (and is not waiting). See the sketch below for the new consumption pattern.
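
A minimal sketch of the new consumption pattern, added here for illustration only: it assumes the enum is exposed as read::StreamState (as in the socket example further down this thread), that StreamReader is generic over any std::io::Read source, and that the import paths match arrow2 at the time of this PR; collect_batches is a hypothetical helper name.

use arrow2::error::Result;
use arrow2::io::ipc::read;
use arrow2::record_batch::RecordBatch;

// Drain the stream, collecting batches and retrying while it reports Waiting.
fn collect_batches<R: std::io::Read>(
    mut stream: read::StreamReader<R>,
) -> Result<Vec<RecordBatch>> {
    let mut batches = vec![];
    loop {
        match stream.next() {
            // the end-of-stream marker was read
            None => break,
            // a malformed or truncated message
            Some(Err(error)) => return Err(error),
            // the stream has not finished, but no new data is available yet;
            // the caller decides whether to sleep, poll again, or give up
            Some(Ok(read::StreamState::Waiting)) => {
                std::thread::sleep(std::time::Duration::from_millis(100))
            }
            // a new batch was read
            Some(Ok(read::StreamState::Some(batch))) => batches.push(batch),
        }
    }
    Ok(batches)
}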

Closes #301

@codecov bot commented Aug 19, 2021

Codecov Report

Merging #302 (c838e40) into main (824ad7e) will increase coverage by 0.00%.
The diff coverage is 68.75%.


@@           Coverage Diff           @@
##             main     #302   +/-   ##
=======================================
  Coverage   80.55%   80.56%           
=======================================
  Files         324      324           
  Lines       21399    21404    +5     
=======================================
+ Hits        17239    17244    +5     
  Misses       4160     4160           
Impacted Files                                            Coverage Δ
...ng/src/flight_server_scenarios/integration_test.rs     0.00% <ø> (ø)
src/io/ipc/read/stream.rs                                 82.92% <54.54%> (+0.64%) ⬆️
tests/it/io/ipc/common.rs                                 100.00% <100.00%> (ø)
tests/it/io/ipc/read/stream.rs                            100.00% <100.00%> (ø)
tests/it/io/ipc/write/stream.rs                           100.00% <100.00%> (ø)

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data

@HagaiHargil (Contributor)

Hey,

I can confirm that this works as intended for me! TBH, I still had core dumps due to memory allocations, but once I switched over to sockets this resolved itself.

The nice thing about the new StreamState enum is that my app is now naturally "locked in" to the timing of the original stream - when its event rate is high my app responds faster, and when it's lower I'm not needlessly sampling the socket.

With respect to docs - it's always hard to convince the Arrow mailing list that something is unclear in their docs, for some reason. Perhaps you could add a warning here that data should not be simultaneously written and read from simple files, but only from sockets? It's kinda obvious, but the Arrow docs hint that it is possible using their (Pythonic) file format.

Thanks again!
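
A sketch of that timing-driven pattern, assuming the StreamReader and StreamState API shown in the example below; the follow_stream name and the backoff bounds are arbitrary illustrative choices, not part of the crate.

use std::io::Read;
use std::thread;
use std::time::Duration;

use arrow2::error::Result;
use arrow2::io::ipc::read;

// Poll the stream, sleeping longer after each consecutive Waiting state and
// resetting the pause as soon as a batch arrives, so a busy stream is drained
// immediately and a quiet one is not sampled needlessly.
fn follow_stream<R: Read>(mut stream: read::StreamReader<R>) -> Result<()> {
    let mut pause = Duration::from_millis(1);
    let max_pause = Duration::from_secs(1);
    while let Some(item) = stream.next() {
        match item? {
            read::StreamState::Some(batch) => {
                pause = Duration::from_millis(1); // data arrived: poll eagerly again
                println!("received a batch with {} rows", batch.num_rows());
            }
            read::StreamState::Waiting => {
                thread::sleep(pause); // no data yet: wait, then back off
                pause = (pause * 2).min(max_pause);
            }
        }
    }
    Ok(())
}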

@jorgecarleitao (Owner, Author)

Awesome! Could you share or PR the socket solution? I added an example to this PR demonstrating files, but if we change the example to use sockets like you are doing, maybe it will be more obvious what users should use.

@HagaiHargil (Contributor)

Not sure what's the best way to "merge" my proposed changes, so I'll just add them here:

examples/ipc_pyarrow/main.py:

import pyarrow as pa
from time import sleep
import socket

# Set up the data exchange socket
HOST = "127.0.0.1"
PORT = 12989
sk = socket.socket()
sk.bind((HOST, PORT))
sk.listen()

data = [
    pa.array([1, 2, 3, 4]),
    pa.array(["foo", "bar", "baz", None]),
    pa.array([True, None, False, True]),
]

batch = pa.record_batch(data, names=["f0", "f1", "f2"])

# Accept incoming connection and stream the data away
connection, address = sk.accept()
dummy_socket_file = connection.makefile('wb')
writer = pa.ipc.new_stream(dummy_socket_file, batch.schema)
while True:
    for _ in range(10):
        writer.write(batch)
    sleep(1)

examples/ipc_pyarrow/src/main.rs:

use std::net::TcpStream;
use std::thread;
use std::time::Duration;

use arrow2::error::Result;
use arrow2::io::ipc::read;

fn main() -> Result<()> {
    const ADDRESS: &str = "127.0.0.1:12989";

    let mut reader = TcpStream::connect(ADDRESS)?;
    let metadata = read::read_stream_metadata(&mut reader)?;
    let mut stream = read::StreamReader::new(&mut reader, metadata);

    let mut idx = 0;
    loop {
        match stream.next() {
            Some(x) => match x {
                // a new batch was read from the stream
                Ok(read::StreamState::Some(_batch)) => {
                    idx += 1;
                    println!("batch: {:?}", idx)
                }
                // the stream has not finished; wait before polling again
                Ok(read::StreamState::Waiting) => thread::sleep(Duration::from_millis(4000)),
                Err(error) => println!("{:?} ({})", error, idx),
            },
            // the end-of-stream marker was read
            None => break,
        };
    }

    Ok(())
}
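
To try the pair above, the Python writer presumably has to be started first so that the Rust client has a listening socket to connect to; something like running python main.py in one terminal and then the Rust binary (e.g. cargo run inside examples/ipc_pyarrow) in another, though the exact invocation depends on how the example is wired into the crate.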

@jorgecarleitao (Owner, Author)

Awesome, I incorporated it into the example; thank you for taking the time. I aim to release this in v0.4, so that you do not need to depend on the GitHub version.

jorgecarleitao merged commit 4bbfb25 into main on Aug 23, 2021
jorgecarleitao deleted the stream branch on Aug 23, 2021, 14:04
@jorgecarleitao (Owner, Author)

The example is available in examples/ and here: https://jorgecarleitao.github.io/arrow2/io/ipc_stream_read.html

Successfully merging this pull request may close these issues:

  • IPC's StreamReader may abort due to excessive memory by overflowing a usize variable (#301)