Add support for IPC Streaming Read/Write #3783

Merged
merged 24 commits into master from feature/ipc-streaming on Jun 25, 2022

Commits (24)
695f34e
Add support for IPC Streaming Read/Write
joshuataylor Jun 23, 2022
668dd69
python fix arr.contains type (#3782)
ritchie46 Jun 23, 2022
50ab569
improve predicate combination and schema state (#3788)
ritchie46 Jun 23, 2022
6f2b006
Move ipc to its own folder, add ipc_streaming
joshuataylor Jun 24, 2022
c9084a9
Merge remote-tracking branch 'upstream/master' into feature/ipc-strea…
joshuataylor Jun 24, 2022
485e970
remove ipc changes
joshuataylor Jun 24, 2022
6d80b9b
ipc rs
joshuataylor Jun 24, 2022
61814dd
change comment
joshuataylor Jun 24, 2022
76bcc09
fix feature
joshuataylor Jun 24, 2022
fa560f3
Use proper ipc_streaming in cargo
joshuataylor Jun 24, 2022
3f6daf8
fix duration computation (#3790)
ritchie46 Jun 24, 2022
5ef796a
Update arrow2 to support IPC Stream Reading with projections (#3793)
joshuataylor Jun 24, 2022
6e8ea33
Some API alignment (missing funcs) between `DataFrame`, `LazyFrame`, …
alexander-beedie Jun 24, 2022
a75733c
Merge branch 'master' into feature/ipc-streaming
joshuataylor Jun 24, 2022
e80d5c4
formatting fixes
joshuataylor Jun 24, 2022
7e28262
Merge branch 'master' into feature/ipc-streaming
joshuataylor Jun 24, 2022
69fe351
change file names
joshuataylor Jun 24, 2022
bf2529f
remove crate word
joshuataylor Jun 24, 2022
d579874
code review comments
joshuataylor Jun 24, 2022
186f0dc
fix issue with reading batch files, return Ok for waiting
joshuataylor Jun 24, 2022
e3b7fdc
Merge remote-tracking branch 'upstream/master' into feature/ipc-strea…
joshuataylor Jun 24, 2022
d206421
Store metadata so it can be fetched, as otherwise if read IPC will error
joshuataylor Jun 25, 2022
967b4b5
ci fix
joshuataylor Jun 25, 2022
f9666d1
Merge branch 'master' into feature/ipc-streaming
joshuataylor Jun 25, 2022
5 changes: 5 additions & 0 deletions polars/Cargo.toml
@@ -45,6 +45,9 @@ json = ["polars-io", "polars-io/json"]
# support for arrow's ipc file parsing
ipc = ["polars-io", "polars-io/ipc", "polars-lazy/ipc"]

# support for arrow's streaming ipc file parsing
ipc_streaming = ["polars-io", "polars-io/ipc_streaming", "polars-lazy/ipc"]

# support for apache avro file parsing
avro = ["polars-io", "polars-io/avro"]

@@ -130,6 +133,7 @@ test = [
"abs",
"parquet",
"ipc",
"ipc_streaming",
]

# don't use this
@@ -195,6 +199,7 @@ docs-selection = [
"json",
"parquet",
"ipc",
"ipc_streaming",
"dtype-full",
"is_in",
"sort_multiple",
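For downstream users the new flag is opt-in; a hypothetical consumer entry (the crate version is a placeholder, not taken from this PR):

```toml
[dependencies]
# pulls in IpcStreamReader / IpcStreamWriter via polars-io;
# pin a real version instead of "*"
polars = { version = "*", features = ["ipc_streaming"] }
```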
2 changes: 2 additions & 0 deletions polars/polars-io/Cargo.toml
@@ -14,6 +14,8 @@ description = "IO related logic for the Polars DataFrame library"
json = ["arrow/io_json", "serde_json"]
# support for arrow's ipc file parsing
ipc = ["arrow/io_ipc", "arrow/io_ipc_compression"]
# support for arrow's streaming ipc file parsing
ipc_streaming = ["arrow/io_ipc", "arrow/io_ipc_compression"]
# support for arrow avro parsing
avro = ["arrow/io_avro", "arrow/io_avro_compression"]
# ipc = []
2 changes: 2 additions & 0 deletions polars/polars-io/src/aggregations.rs
@@ -27,6 +27,7 @@ impl ScanAggregation {
/// Evaluate the aggregations per batch.
#[cfg(any(
feature = "ipc",
feature = "ipc_streaming",
feature = "parquet",
feature = "json",
feature = "avro"
@@ -88,6 +89,7 @@

#[cfg(any(
feature = "ipc",
feature = "ipc_streaming",
feature = "parquet",
feature = "json",
feature = "avro"
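The gate mirrors the feature list in the Cargo.toml above; a standalone sketch of the pattern (the function name is hypothetical):

```rust
// Compiled only when at least one of the listed IO features is enabled;
// with none of them, the item simply does not exist.
#[cfg(any(
    feature = "ipc",
    feature = "ipc_streaming",
    feature = "parquet",
    feature = "json",
    feature = "avro"
))]
pub fn evaluate_aggregations_per_batch() {
    // the real `ScanAggregation` logic lives behind this gate
}
```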
File renamed without changes: polars/polars-io/src/ipc.rs → polars/polars-io/src/ipc/ipc_file.rs
326 changes: 326 additions & 0 deletions polars/polars-io/src/ipc/ipc_stream.rs
@@ -0,0 +1,326 @@
//! # (De)serializing Arrow's Streaming IPC format.
//!
//! Arrow's Streaming IPC is a [binary format](https://arrow.apache.org/docs/python/ipc.html)
//! used for sending an arbitrary-length sequence of record batches.
//! The format must be processed from start to end, and does not support random access.
//! It differs from the IPC File format: if a file cannot be deserialized with `IpcReader::new`,
//! it is probably an IPC Stream file.
//!
//! ## Example
//!
//! ```rust
//! use polars_core::prelude::*;
//! use polars_io::prelude::*;
//! use std::io::Cursor;
//!
//!
//! let s0 = Series::new("days", &[0, 1, 2, 3, 4]);
//! let s1 = Series::new("temp", &[22.1, 19.9, 7., 2., 3.]);
//! let mut df = DataFrame::new(vec![s0, s1]).unwrap();
//!
//! // Create an in memory file handler.
//! // Vec<u8>: Read + Write
//! // Cursor<T>: Seek
//!
//! let mut buf: Cursor<Vec<u8>> = Cursor::new(Vec::new());
//!
//! // write to the in memory buffer
//! IpcStreamWriter::new(&mut buf).finish(&mut df).expect("ipc writer");
//!
//! // after writing, rewind the buffer's cursor to the beginning
//! buf.set_position(0);
//!
//! // read the buffer into a DataFrame
//! let df_read = IpcStreamReader::new(buf).finish().unwrap();
//! assert!(df.frame_equal(&df_read));
//! ```
use crate::{finish_reader, ArrowReader, ArrowResult};
use crate::{prelude::*, WriterFactory};
use arrow::io::ipc::read::{StreamMetadata, StreamState};
use arrow::io::ipc::write::WriteOptions;
use arrow::io::ipc::{read, write};
use polars_core::prelude::*;
use std::io::{Read, Seek, Write};
use std::path::PathBuf;

/// Read Arrow's Streaming IPC format into a DataFrame
///
/// # Example
/// ```
/// use polars_core::prelude::*;
/// use std::fs::File;
/// use polars_io::ipc::IpcStreamReader;
/// use polars_io::SerReader;
///
/// fn example() -> Result<DataFrame> {
/// let file = File::open("file.ipc").expect("file not found");
///
/// IpcStreamReader::new(file)
/// .finish()
/// }
/// ```
#[must_use]
pub struct IpcStreamReader<R> {
/// File or Stream object
reader: R,
/// Aggregate all chunks into a single chunk after reading.
rechunk: bool,
n_rows: Option<usize>,
projection: Option<Vec<usize>>,
columns: Option<Vec<String>>,
row_count: Option<RowCount>,
metadata: Option<StreamMetadata>,
}

impl<R: Read + Seek> IpcStreamReader<R> {
/// Get the schema of the IPC Stream file
pub fn schema(&mut self) -> Result<Schema> {
Ok((&self.metadata()?.schema.fields).into())
}

/// Get the Arrow schema of the IPC Stream file; this is faster than creating a polars schema.
pub fn arrow_schema(&mut self) -> Result<ArrowSchema> {
Ok(self.metadata()?.schema)
}
/// Stop reading when `n` rows are read.
pub fn with_n_rows(mut self, num_rows: Option<usize>) -> Self {
self.n_rows = num_rows;
self
}

/// Columns to select/project
pub fn with_columns(mut self, columns: Option<Vec<String>>) -> Self {
self.columns = columns;
self
}

/// Add a `row_count` column.
pub fn with_row_count(mut self, row_count: Option<RowCount>) -> Self {
self.row_count = row_count;
self
}

/// Set the reader's column projection. This counts from 0, meaning that
/// `vec![0, 4]` would select the 1st and 5th column.
pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
self.projection = projection;
self
}

fn metadata(&mut self) -> Result<StreamMetadata> {
match &self.metadata {
None => {
let metadata = read::read_stream_metadata(&mut self.reader)?;
self.metadata = Some(metadata.clone());
Ok(metadata)
}
Some(md) => Ok(md.clone()),
}
}
}
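The builder methods compose; a hedged sketch combining them (the file name and column names are placeholders, and the `RowCount` field names are assumed from its use elsewhere in polars-io):

```rust
use polars_core::prelude::*;
use polars_io::ipc::IpcStreamReader;
use polars_io::{RowCount, SerReader};
use std::fs::File;

fn example() -> Result<DataFrame> {
    let file = File::open("file.ipc").expect("file not found");

    IpcStreamReader::new(file)
        // stop after 1000 rows have been read
        .with_n_rows(Some(1000))
        // select columns by name; `finish` resolves them to a projection
        .with_columns(Some(vec!["days".to_string(), "temp".to_string()]))
        // prepend a row-count column starting at 0
        .with_row_count(Some(RowCount {
            name: "row_nr".to_string(),
            offset: 0,
        }))
        .finish()
}
```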

impl<R> ArrowReader for read::StreamReader<R>
where
R: Read + Seek,
{
fn next_record_batch(&mut self) -> ArrowResult<Option<ArrowChunk>> {
self.next().map_or(Ok(None), |v| match v {
Ok(stream_state) => match stream_state {
// a non-blocking stream can report that no message is available yet;
// returning `None` ends the scan instead of raising an error
StreamState::Waiting => Ok(None),
StreamState::Some(chunk) => Ok(Some(chunk)),
},
Err(err) => Err(err),
})
}
}

impl<R> SerReader<R> for IpcStreamReader<R>
where
R: Read + Seek,
{
fn new(reader: R) -> Self {
IpcStreamReader {
reader,
rechunk: true,
n_rows: None,
columns: None,
projection: None,
row_count: None,
metadata: None,
}
}

fn set_rechunk(mut self, rechunk: bool) -> Self {
self.rechunk = rechunk;
self
}

fn finish(mut self) -> Result<DataFrame> {
let rechunk = self.rechunk;
let metadata = self.metadata()?;
let schema = &metadata.schema;

if let Some(columns) = self.columns {
let prj = columns_to_projection(columns, schema)?;
self.projection = Some(prj);
}

let sorted_projection = self.projection.clone().map(|mut proj| {
proj.sort_unstable();
proj
});

let schema = if let Some(projection) = &sorted_projection {
apply_projection(&metadata.schema, projection)
} else {
metadata.schema.clone()
};

let include_row_count = self.row_count.is_some();
let ipc_reader =
read::StreamReader::new(&mut self.reader, metadata.clone(), sorted_projection);
finish_reader(
ipc_reader,
rechunk,
self.n_rows,
None,
None,
&schema,
self.row_count,
)
.map(|df| fix_column_order(df, self.projection, include_row_count))
}
}

fn fix_column_order(df: DataFrame, projection: Option<Vec<usize>>, row_count: bool) -> DataFrame {
if let Some(proj) = projection {
let offset = if row_count { 1 } else { 0 };
let mut args = (0..proj.len()).zip(proj).collect::<Vec<_>>();
// first el of tuple is argument index
// second el is the projection index
args.sort_unstable_by_key(|tpl| tpl.1);
let cols = df.get_columns();

let iter = args.iter().map(|tpl| cols[tpl.0 + offset].clone());
let cols = if row_count {
let mut new_cols = vec![df.get_columns()[0].clone()];
new_cols.extend(iter);
new_cols
} else {
iter.collect()
};

DataFrame::new_no_checks(cols)
} else {
df
}
}
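The effect of `fix_column_order`: a projection given out of order comes back in the requested order, even though the scan itself runs on the sorted projection. A minimal round-trip sketch:

```rust
use polars_core::prelude::*;
use polars_io::ipc::{IpcStreamReader, IpcStreamWriter};
use polars_io::{SerReader, SerWriter};
use std::io::Cursor;

fn projection_order() -> Result<()> {
    let mut df = DataFrame::new(vec![
        Series::new("a", &[1, 2]),
        Series::new("b", &[3, 4]),
        Series::new("c", &[5, 6]),
    ])?;

    let mut buf = Cursor::new(Vec::new());
    IpcStreamWriter::new(&mut buf).finish(&mut df)?;
    buf.set_position(0);

    // request columns 2 and 0; read internally as [0, 2], then reordered
    let df_read = IpcStreamReader::new(buf)
        .with_projection(Some(vec![2, 0]))
        .finish()?;
    assert_eq!(df_read.get_column_names(), &["c", "a"]);
    Ok(())
}
```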

/// Write a DataFrame to Arrow's Streaming IPC format
///
/// # Example
///
/// ```
/// use polars_core::prelude::*;
/// use polars_io::ipc::IpcStreamWriter;
/// use std::fs::File;
/// use polars_io::SerWriter;
///
/// fn example(df: &mut DataFrame) -> Result<()> {
/// let mut file = File::create("file.ipc").expect("could not create file");
///
/// IpcStreamWriter::new(&mut file)
/// .finish(df)
/// }
///
/// ```
#[must_use]
pub struct IpcStreamWriter<W> {
writer: W,
compression: Option<write::Compression>,
}

use crate::RowCount;
use polars_core::frame::ArrowChunk;
pub use write::Compression as IpcCompression;

impl<W> IpcStreamWriter<W> {
/// Set the compression used. Defaults to None.
pub fn with_compression(mut self, compression: Option<write::Compression>) -> Self {
self.compression = compression;
self
}
}

impl<W> SerWriter<W> for IpcStreamWriter<W>
where
W: Write,
{
fn new(writer: W) -> Self {
IpcStreamWriter {
writer,
compression: None,
}
}

fn finish(&mut self, df: &mut DataFrame) -> Result<()> {
let mut ipc_stream_writer = write::StreamWriter::new(
&mut self.writer,
WriteOptions {
compression: self.compression,
},
);

ipc_stream_writer.start(&df.schema().to_arrow(), None)?;

df.rechunk();
let iter = df.iter_chunks();

for batch in iter {
ipc_stream_writer.write(&batch, None)?
}
ipc_stream_writer.finish()?;
Ok(())
}
}
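Compression slots into the same flow via `with_compression`; `IpcCompression` is arrow2's `write::Compression`, re-exported above. A sketch:

```rust
use polars_core::prelude::*;
use polars_io::ipc::{IpcCompression, IpcStreamWriter};
use polars_io::SerWriter;
use std::io::Cursor;

fn write_compressed(df: &mut DataFrame) -> Result<()> {
    let mut buf = Cursor::new(Vec::new());
    IpcStreamWriter::new(&mut buf)
        // LZ4-compressed buffers; ZSTD is the other arrow2 variant
        .with_compression(Some(IpcCompression::LZ4))
        .finish(df)
}
```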

pub struct IpcStreamWriterOption {
compression: Option<write::Compression>,
extension: PathBuf,
}

impl IpcStreamWriterOption {
pub fn new() -> Self {
Self {
compression: None,
extension: PathBuf::from(".ipc"),
}
}

/// Set the compression used. Defaults to None.
pub fn with_compression(mut self, compression: Option<write::Compression>) -> Self {
self.compression = compression;
self
}

/// Set the extension. Defaults to ".ipc".
pub fn with_extension(mut self, extension: PathBuf) -> Self {
self.extension = extension;
self
}
}

impl Default for IpcStreamWriterOption {
fn default() -> Self {
Self::new()
}
}

impl WriterFactory for IpcStreamWriterOption {
fn create_writer<W: Write + 'static>(&self, writer: W) -> Box<dyn SerWriter<W>> {
Box::new(IpcStreamWriter::new(writer).with_compression(self.compression))
}

fn extension(&self) -> PathBuf {
self.extension.to_owned()
}
}
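A hedged sketch of the factory in use, e.g. when writing partitions (the file name is a placeholder; assumes `WriterFactory` is exported at the crate root, as the import above suggests):

```rust
use polars_core::prelude::*;
use polars_io::ipc::{IpcCompression, IpcStreamWriterOption};
use polars_io::WriterFactory;
use std::fs::File;
use std::path::PathBuf;

fn write_partition(df: &mut DataFrame) -> Result<()> {
    let factory = IpcStreamWriterOption::new()
        .with_compression(Some(IpcCompression::LZ4))
        .with_extension(PathBuf::from(".arrows"));

    // build "part-0.arrows" from the factory's extension
    let mut name = String::from("part-0");
    name.push_str(&factory.extension().to_string_lossy());

    let file = File::create(&name).expect("could not create file");
    let mut writer = factory.create_writer(file);
    writer.finish(df)
}
```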
13 changes: 13 additions & 0 deletions polars/polars-io/src/ipc/mod.rs
@@ -0,0 +1,13 @@
use super::*;

#[cfg(feature = "ipc")]
mod ipc_file;

#[cfg(feature = "ipc_streaming")]
mod ipc_stream;

#[cfg(feature = "ipc")]
pub use ipc_file::{IpcCompression, IpcReader, IpcWriter, IpcWriterOption};

#[cfg(feature = "ipc_streaming")]
pub use ipc_stream::*;
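With both features enabled, the file and streaming APIs share one import path; a minimal sketch of the resulting downstream imports:

```rust
// Requires both the `ipc` and `ipc_streaming` features on polars-io.
use polars_io::ipc::{IpcReader, IpcStreamReader, IpcStreamWriter, IpcWriter};
```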