
IPC's StreamReader may abort due to excessive memory by overflowing a usize variable #301

Closed
HagaiHargil opened this issue Aug 19, 2021 · 12 comments · Fixed by #302

Comments

@HagaiHargil
Contributor

Hi,

I've been trying to use this crate as a means to transfer real-time streaming data between pyarrow and a Rust process. However, I've been having hard-to-debug issues with the data readout on the Rust side: after a few seconds of streaming I always get a message similar to memory allocation of 18446744071884874941 bytes failed, and the process aborts without a traceback (Windows 10).

To debug it I forked and inserted a few logging statements in stream.rs, which revealed this line as the culprit. Basically, at some point during my application meta_len becomes negative, and when it's cast to a usize it wraps around and triggers the huge allocation. Now, if meta_len is negative it also means that meta_size is corrupt, but I'm still not quite sure what causes this corruption.
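For context, here is a standalone sketch of that wrap-around (assuming meta_len is read as an i32, as in the IPC framing). The concrete value is reconstructed from the allocation size reported above, and the checked conversion at the end only illustrates how the corruption could be surfaced instead of turning into a giant allocation:

use std::convert::TryFrom;

fn main() {
    // Reconstructed from the report: 2^64 - 18446744071884874941 == 1_824_676_675,
    // so a corrupt meta_len of -1_824_676_675 explains the failed allocation.
    let meta_len: i32 = -1_824_676_675;

    // `as` sign-extends and reinterprets, yielding a huge length on 64-bit targets.
    assert_eq!(meta_len as usize, 18_446_744_071_884_874_941);

    // A checked conversion reports the corruption instead of wrapping around.
    assert!(usize::try_from(meta_len).is_err());
}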

During the live streaming I get this error pretty reliably, but if I try to re-run this script without the Python side of things, i.e. only reading from the Arrow IPC file and not writing anything new to it, the file parses correctly. In other words, it appears that concurrent access to the file by the Python process (which writes the data) and the Rust process causes this issue. I was under the impression that Arrow's NativeFile format should help in these situations, but I might've interpreted the pyarrow docs incorrectly. I'm not quite sure how to lock a file between two different processes (if that's even possible), so I'm a bit lost on that front.

With all of this background, my question is a bit more straightforward: assuming that some corruption occurred at the meta_size level, how should I recover from it? Should I move the file Cursor back a few steps? Should I try to re-read the data? Or is aborting the stream all that's left?

I hope my issue and specific question are clear enough. As a side note, I tried using arrow-rs's IPC implementation, but they have more severe bugs which are much less subtle than this one.

@jorgecarleitao
Owner

jorgecarleitao commented Aug 19, 2021

Wow, thanks a lot for the detailed description! Let's work this out! :)

I agree with your analysis: something is corrupting meta_len.

Are you streaming IPC files or IPC streams? I am asking because arrow has two completely different formats, an IPC stream and an IPC file. In general, IPC files are not appendable. An idea is to write your Python stream to a data.arrows file (using the stream serializer) and try to read it from arrow2.

I will also try to reproduce your setup on my end to see if we can find the root cause of this.

@HagaiHargil
Contributor Author

Thanks for the prompt response!

This is a stream, not a file (pyarrow.ipc.new_stream()), since I don't need arbitrary access to locations in the data. I'm treating it as a FIFO list of record batches, or at least that's what I hope is happening.

I'm not sure what you mean by "data.arrow_stream" - could you please elaborate?

@jorgecarleitao
Owner

jorgecarleitao commented Aug 19, 2021

Sorry, I ended up creating more confusion. :/

i.e. only reading from the Arrow IPC file and not writing anything new to it, the file parses correctly

what is the Arrow IPC file here? Is it just writing the stream to a file, or is it using the file writer?

I'm not sure what you mean by "data.arrow_stream" - could you please elaborate?

I was just trying to understand the notation being used: whether the Arrow IPC file was a file in the fs with the arrow file format, or a file in the fs with the arrow stream format. Arrow recently made it official that .arrow stands for the file format and .arrows stands for the stream format.

@HagaiHargil
Contributor Author

Oh, I missed the news regarding the suffixes :)

The file is created in the Python process with pyarrow.ipc.new_stream(), which uses a pyarrow.NativeFile as its default sink, I believe. To write the data to disk I use the stream.write() method.
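For reference, a minimal sketch of that setup (the schema, data, and path here are placeholders). Passing an explicit pyarrow.OSFile (a NativeFile) as the sink should be equivalent to passing a plain path, which pyarrow opens as a NativeFile internally:

import pyarrow as pa

schema = pa.schema([("f0", pa.int64())])
batch = pa.record_batch([pa.array([1, 2, 3])], names=["f0"])

# Explicit NativeFile sink; pa.ipc.new_stream("data.arrows", schema) would
# open an equivalent sink behind the scenes.
with pa.OSFile("data.arrows", "wb") as sink:
    with pa.ipc.new_stream(sink, schema) as writer:
        writer.write(batch)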

@HagaiHargil
Contributor Author

@jorgecarleitao

I believe that the following two snippets running together should re-create the issue on your machine as well:

import pyarrow as pa
from time import sleep


def test_ipc_written_in_pyarrow():
    data = [
        pa.array([1, 2, 3, 4]),
        pa.array(['foo', 'bar', 'baz', None]),
        pa.array([True, None, False, True])
    ]

    batch = pa.record_batch(data, names=['f0', 'f1', 'f2'])
    writer = pa.ipc.new_stream("tests/data/data.arrow_stream", batch.schema)
    # Append batches to the stream indefinitely, pausing in between, so the
    # Rust reader sees a file that is still being written to.
    while True:
        for _ in range(10):
            writer.write(batch)

        sleep(2)

        for _ in range(10):
            writer.write(batch)

        writer.write(batch[:0])

And on the Rust side:

use std::fs::File;

// Assuming arrow2's IPC reading API is under `io::ipc::read`.
use arrow2::io::ipc::read::{read_stream_metadata, StreamReader};

#[test]
fn read_previous_data() {
    let mut reader = File::open("tests/data/data.arrow_stream").unwrap();
    let meta = read_stream_metadata(&mut reader).unwrap();
    let mut stream = StreamReader::new(&mut reader, meta);
    let mut idx = 0;
    // Poll the stream forever: count batches read successfully and print any
    // error together with the current batch index.
    loop {
        match stream.next() {
            Some(Ok(_)) => idx += 1,
            Some(Err(l)) => println!("{:?} ({})", l, idx),
            None => continue,
        };
    }
    // Never reached: the loop above does not break.
    println!("{} Done", idx);
}

It raises a few Io(Error { kind: UnexpectedEof, message: "failed to fill whole buffer" }) (908) errors and should eventually abort as well.

I haven't tried duplicating it with a Rust-only approach.

@jorgecarleitao
Owner

I ran this, but I am not sure it is a problem: the Rust side is just trying to read the stream, and when the stream has no more entries it raises an UnexpectedEof. Ignoring this error simply means that when there are no more entries we keep trying.

Admittedly, this is not a nice API: .next should block until a new entry is available instead of spitting out errors, and we should offer an async, non-blocking version.
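As an illustration of the "keep trying" behaviour, here is a minimal generic sketch of a blocking wrapper (this is not arrow2's API; the is_eof predicate and the polling interval are placeholders):

use std::thread::sleep;
use std::time::Duration;

/// Hypothetical helper: keep polling `next` until it yields something that is
/// not a "no data yet" error, sleeping between attempts.
fn next_blocking<T, E>(
    mut next: impl FnMut() -> Option<Result<T, E>>,
    is_eof: impl Fn(&E) -> bool,
) -> Option<Result<T, E>> {
    loop {
        match next() {
            Some(Err(e)) if is_eof(&e) => sleep(Duration::from_millis(100)),
            other => return other,
        }
    }
}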

I was unable to reproduce the error in which the metadata is incorrect; all entries just return EoF errors:

...
Io(Error { kind: UnexpectedEof, message: "failed to fill whole buffer" }) (98)
Io(Error { kind: UnexpectedEof, message: "failed to fill whole buffer" }) (99)
Io(Error { kind: UnexpectedEof, message: "failed to fill whole buffer" }) (100)
Io(Error { kind: UnexpectedEof, message: "failed to fill whole buffer" }) (101)
Io(Error { kind: UnexpectedEof, message: "failed to fill whole buffer" }) (102)
Io(Error { kind: UnexpectedEof, message: "failed to fill whole buffer" }) (104)

@jorgecarleitao
Owner

nvm, I got the core dump due to the failed memory allocation ^_^

@HagaiHargil
Contributor Author

I mean, this might be an Arrow IPC protocol issue rather than an implementation-level bug. Perhaps the format doesn't allow concurrent reads and writes from different processes, precisely because you can't ensure a working locking mechanism on the file?

For my real-time streaming use case I do think that a blocking .next() method is the way to go. Do the Arrow specs permit that? Or do they not go into these finer details?

@jorgecarleitao
Owner

I think that you are right. There are no guarantees about this. I tried the following:

# produce.py
import os
from time import sleep


if os.environ.get("READ", None) is None:
    i = 0
    with open("a.stream", "wb", 0) as f:
        while True:
            f.write(i.to_bytes(4, byteorder="little"))
            f.write(b"a" * i)
            sleep(1)
            i += 1
else:
    with open("a.stream", "rb") as f:
        while True:
            data = f.read(4)
            if data != b"":
                size = int.from_bytes(data, "little")
                data = f.read(size)
                print(size)
                print(data)
                assert data == b"a" * size
                print("correct")

# run.sh
python produce.py &
PRODUCER_PID=$!

sleep 1 # wait for metadata to be available.

READ=1 python produce.py
CONSUMER_PID=$!

kill $PRODUCER_PID $CONSUMER_PID

and this errors after some time, indicating that we can't expect a stream that is still being written by one process to be read reliably by another.

@jorgecarleitao
Owner

Thanks for raising this in the mailing list.

I think I found a solution for it, #302.

Basically, contrary to the file reader, where None implies that there are no more batches (because there is a footer and an EOF), in a stream we can't prove that there are no more batches: None could simply mean that we are "waiting" for them. Thus, the stream reader and the file reader must be designed differently to take these different semantics into account.
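To make the distinction concrete, here is a hypothetical consumer loop under such semantics (the StreamState enum is illustrative only, not necessarily the exact type introduced by the PR):

use std::thread::sleep;
use std::time::Duration;

/// Illustrative only: a state the stream reader could yield instead of an error.
enum StreamState<T> {
    /// No new message is available yet; the producer may still write more.
    Waiting,
    /// A new batch arrived.
    Some(T),
}

fn consume<T>(mut next: impl FnMut() -> Option<StreamState<T>>, mut handle: impl FnMut(T)) {
    loop {
        match next() {
            Some(StreamState::Some(batch)) => handle(batch),
            Some(StreamState::Waiting) => sleep(Duration::from_millis(100)),
            // `None` means the end-of-stream marker was read: no more batches.
            None => break,
        }
    }
}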

Could you check if that PR addresses this issue?

@HagaiHargil
Contributor Author

@jorgecarleitao Thanks for the quick PR! I'll only be back at my lab and my setup next week, so it will take me a few days to test it out.

@jorgecarleitao
Owner

Closed by #302
