Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

IPC sink types and IPC file stream #878

Merged
merged 36 commits into from Mar 5, 2022

Conversation

dexterduck
Copy link
Contributor

IPC component of this PR: #876

Adds the following new types:

  • io::ipc::read::file_async::FileStream - implements futures::Stream for IPC files.
  • io::ipc::write::file_async::FileSink - implements futures::Sink for IPC files.
  • io::ipc::write::file_async::StreamSink - implements futures::Sink for IPC streams.

dexterduck and others added 29 commits February 17, 2022 16:34
@codecov
Copy link

codecov bot commented Mar 2, 2022

Codecov Report

Merging #878 (3afd996) into main (66616dd) will decrease coverage by 0.03%.
The diff coverage is 66.78%.

Impacted file tree graph

@@            Coverage Diff             @@
##             main     #878      +/-   ##
==========================================
- Coverage   71.68%   71.64%   -0.04%     
==========================================
  Files         335      337       +2     
  Lines       18205    18452     +247     
==========================================
+ Hits        13050    13220     +170     
- Misses       5155     5232      +77     
Impacted Files Coverage Δ
src/io/ipc/write/mod.rs 92.30% <ø> (ø)
src/io/ipc/write/common.rs 86.40% <58.33%> (-3.71%) ⬇️
src/io/ipc/read/file_async.rs 58.59% <58.59%> (ø)
src/io/ipc/write/stream_async.rs 72.41% <73.21%> (+16.85%) ⬆️
src/io/ipc/write/file_async.rs 75.29% <75.29%> (ø)
src/io/ipc/read/reader.rs 72.85% <100.00%> (+1.42%) ⬆️
src/io/ipc/write/common_async.rs 96.66% <100.00%> (ø)

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 66616dd...3afd996. Read the comment docs.

@dexterduck
Copy link
Contributor Author

@jorgecarleitao along the same lines as the Parquet integration, the Sink implementations could replace the file/stream writers for IPC as well, although it would require a wrapper type in order to preserve the ability to do per-chunk projections.

Potentially something to the effect of:

// Wrapper type
pub struct Projection {
  pub chunk: Chunk<Arc<dyn Array>>,
  pub fields: Option<Vec<IpcField>>,
}

impl From<Chunk<Arc<dyn Array>>> for Projection {
  fn from(chunk: Chunk<Arc<dyn Array>>) -> Self {
    Self { chunk, fields: None }
  }
}

// Usage
let mut sink = FileSink::new(..);
let chunks: Vec<Chunk<Arc<dyn Array>>> = vec![];
for chunk in chunks {
  sink.feed(chunk.into()).await?;
}

@jorgecarleitao
Copy link
Owner

jorgecarleitao commented Mar 3, 2022

  • The IPCField is required to re-use dictionaries across IPC record batches. So, we do need it in the API.
  • I am not very confortable with code duplication - could you either change the existing code (e.g. implement Sink for the structs we have) or remove them? Having 2 implementations of the same thing is not ideal imo

@dexterduck
Copy link
Contributor Author

dexterduck commented Mar 3, 2022

@jorgecarleitao I've added support for passing &[IpcField] to the stream and file sinks. I've removed StreamWriter and updated the integration tests to use the new implementations.

Per the example above, I've added a compatibility struct which allows calling the sinks as either sink.feed(chunk.into()) or sink.feed((chunk, Some(fields)).into()). The struct is currently io::ipc::write::Record. I presume there's a better name I could be using for it so just let me know if I should change it.

Copy link
Owner

@jorgecarleitao jorgecarleitao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks great! Thank you so much, @dexterduck - learned a lot in this PR!

/// An array [`Chunk`] with optional accompanying IPC fields.
#[derive(Debug, Clone, PartialEq)]
pub struct Record<'a> {
columns: Cow<'a, Chunk<Arc<dyn Array>>>,
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious: what is the purpose of Cow?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea is to allow passing the arguments either by value or by reference. Obviously, better to avoid unnecessary clones if possible, but also wanted to avoid requiring a reference since then if you want to pass owned values you have to use the somewhat non-ergonomic sink.feed((&chunk).into()) or sink.feed((&chunk, &fields[..]).into()). In contrast, with the chosen implementation you can use sink.feed(chunk.into()) or sink.feed((chunk, fields).into()) in all cases.

I originally used a generic implementation that parameterized Record to accept a type implementing Borrow<_>, but it turns out that having a single type that implements sink for multiple other types (e.g. FileSink implementing both Sink<Record<Chunk<_>>> and Sink<Record&<Chunk<_>>>) doesn't work because the compiler can't infer the correct generic type when you call sink.flush() or sink.close().

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the explanation. Cool tip!

@jorgecarleitao jorgecarleitao merged commit dda052f into jorgecarleitao:main Mar 5, 2022
@jorgecarleitao jorgecarleitao added the feature A new feature label Mar 6, 2022
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
feature A new feature
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

5 participants