native IPC memory map! 🎉 (#4250)
ritchie46 committed Aug 4, 2022
1 parent 2df62d2 commit 912b6d5
Showing 21 changed files with 279 additions and 115 deletions.
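In short: when handed a `File`, `IpcReader` now memory maps uncompressed Arrow IPC files instead of buffering them, and does so by default. A minimal usage sketch of the resulting API (hedged: the file path is hypothetical and the import paths are assumed from the crate layout shown in this diff):

```rust
use polars_io::ipc::IpcReader;
use polars_io::SerReader;
use std::fs::File;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Uncompressed, file-backed IPC data is memory mapped by default
    // after this commit; the new builder toggle can opt out.
    let file = File::open("df.ipc")?; // hypothetical path
    let df = IpcReader::new(file)
        .memory_mapped(true) // `true` is already the default
        .finish()?;
    println!("{:?}", df.shape());
    Ok(())
}
```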
6 changes: 3 additions & 3 deletions polars/polars-arrow/Cargo.toml
@@ -9,10 +9,10 @@ description = "Arrow interfaces for Polars DataFrame library"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
# arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "8604cb760b8ac475d7968b714d47e4ff714c61a1", features = ["compute_concatenate"], default-features = false }
# arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "5711cfcbb785450697eb38e221b5dc1825550ee6", features = ["compute_concatenate"], default-features = false }
# arrow = { package = "arrow2", path = "../../../arrow2", features = ["compute_concatenate"], default-features = false }
# arrow = { package = "arrow2", git = "https://github.com/ritchie46/arrow2", branch = "polars", features = ["compute_concatenate"], default-features = false }
arrow = { package = "arrow2", version = "0.13", default-features = false, features = ["compute_concatenate"] }
arrow = { package = "arrow2", git = "https://github.com/ritchie46/arrow2", branch = "ipc_meta", features = ["compute_concatenate"], default-features = false }
# arrow = { package = "arrow2", version = "0.12", default-features = false, features = ["compute_concatenate"] }
hashbrown = "0.12"
num = "^0.4"
serde = { version = "1", features = ["derive"], optional = true }
9 changes: 4 additions & 5 deletions polars/polars-core/Cargo.toml
@@ -173,12 +173,11 @@ thiserror = "^1.0"

[dependencies.arrow]
package = "arrow2"
# git = "https://github.com/jorgecarleitao/arrow2"
# git = "https://github.com/ritchie46/arrow2"
# rev = "8604cb760b8ac475d7968b714d47e4ff714c61a1"
git = "https://github.com/ritchie46/arrow2"
# rev = "5711cfcbb785450697eb38e221b5dc1825550ee6"
# path = "../../../arrow2"
# branch = "polars"
version = "0.13"
branch = "ipc_meta"
# version = "0.12"
default-features = false
features = [
"compute_aggregate",
2 changes: 1 addition & 1 deletion polars/polars-core/src/datatypes/mod.rs
@@ -1031,7 +1031,7 @@ impl From<&ArrowDataType> for DataType {
panic!("activate the 'object' feature to be able to load POLARS_EXTENSION_TYPE")
}
}
dt => panic!("Arrow datatype {:?} not supported by Polars", dt),
dt => panic!("Arrow datatype {:?} not supported by Polars. You probably need to activate that data-type feature.", dt),
}
}
}
11 changes: 9 additions & 2 deletions polars/polars-core/src/series/from.rs
@@ -192,8 +192,13 @@ impl Series {
#[cfg(feature = "dtype-categorical")]
ArrowDataType::Dictionary(key_type, value_type, _) => {
use arrow::datatypes::IntegerType;
let chunks = chunks.iter().map(|arr| &**arr).collect::<Vec<_>>();
let arr = arrow::compute::concatenate::concatenate(&chunks)?;
// don't call this spuriously: concatenation triggers a read of mmapped data
let arr = if chunks.len() > 1 {
let chunks = chunks.iter().map(|arr| &**arr).collect::<Vec<_>>();
arrow::compute::concatenate::concatenate(&chunks)?
} else {
chunks[0].clone()
};

if !matches!(
value_type.as_ref(),
@@ -301,6 +306,7 @@ impl Series {
#[cfg(feature = "dtype-struct")]
ArrowDataType::Map(_field, _sorted) => {
let arr = if chunks.len() > 1 {
// don't call this spuriously: concatenation triggers a read of mmapped data
concatenate_owned_unchecked(&chunks).unwrap() as ArrayRef
} else {
chunks[0].clone()
@@ -328,6 +334,7 @@ impl Series {
#[cfg(feature = "dtype-struct")]
ArrowDataType::Struct(_) => {
let arr = if chunks.len() > 1 {
// don't call this spuriously: concatenation triggers a read of mmapped data
concatenate_owned_unchecked(&chunks).unwrap() as ArrayRef
} else {
chunks[0].clone()
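The `chunks.len() > 1` guard repeated above is the invariant the mmap path relies on: concatenation materializes every page backing the chunks, while cloning a single `ArrayRef` (`Arc<dyn Array>` at this point in polars) is only a refcount bump. A hedged generic sketch of that fast path, where `arrow` is the renamed `arrow2` crate as in the polars workspace and `unify` is a hypothetical helper:

```rust
use arrow::array::Array; // `arrow` = arrow2, as in the polars workspace
use arrow::compute::concatenate::concatenate;
use std::sync::Arc;

type ArrayRef = Arc<dyn Array>;

// Hypothetical helper mirroring the fast path above: only concatenate
// when there is more than one chunk, because concatenation forces a
// read of the mmapped pages. Assumes `chunks` is non-empty.
fn unify(chunks: &[ArrayRef]) -> arrow::error::Result<ArrayRef> {
    if chunks.len() > 1 {
        let refs: Vec<&dyn Array> = chunks.iter().map(|a| &**a).collect();
        Ok(concatenate(&refs)?.into()) // Box<dyn Array> -> Arc<dyn Array>
    } else {
        Ok(chunks[0].clone()) // zero-copy: just a refcount bump
    }
}
```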
8 changes: 4 additions & 4 deletions polars/polars-io/Cargo.toml
@@ -13,7 +13,7 @@ description = "IO related logic for the Polars DataFrame library"
# support for arrows json parsing
json = ["arrow/io_json", "serde_json"]
# support for arrows ipc file parsing
ipc = ["arrow/io_ipc", "arrow/io_ipc_compression"]
ipc = ["arrow/io_ipc", "arrow/io_ipc_compression", "memmap"]
# support for arrows streaming ipc file parsing
ipc_streaming = ["arrow/io_ipc", "arrow/io_ipc_compression"]
# support for arrow avro parsing
@@ -37,9 +37,9 @@ private = ["polars-time/private"]
[dependencies]
ahash = "0.7"
anyhow = "1.0"
# arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "8604cb760b8ac475d7968b714d47e4ff714c61a1", default-features = false }
# arrow = { package = "arrow2", git = "https://github.com/ritchie46/arrow2", branch = "polars", default-features = false }
arrow = { package = "arrow2", version = "0.13", default-features = false }
# arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "5711cfcbb785450697eb38e221b5dc1825550ee6", default-features = false }
arrow = { package = "arrow2", git = "https://github.com/ritchie46/arrow2", branch = "ipc_meta", default-features = false }
# arrow = { package = "arrow2", version = "0.12", default-features = false }
# arrow = { package = "arrow2", path = "../../../arrow2", default-features = false }
csv-core = { version = "0.1.10", optional = true }
dirs = "4.0"
2 changes: 1 addition & 1 deletion polars/polars-io/src/csv/utils.rs
@@ -63,7 +63,7 @@ pub fn get_reader_bytes<R: Read + MmapBytesReader>(reader: &mut R) -> Result<Rea
// we can get the bytes for free
if reader.to_bytes().is_some() {
// duplicate .to_bytes() is necessary to satisfy the borrow checker
Ok(ReaderBytes::Borrowed(reader.to_bytes().unwrap()))
Ok(ReaderBytes::Borrowed((*reader).to_bytes().unwrap()))
} else {
// we have to read to an owned buffer to get the bytes.
let mut bytes = Vec::with_capacity(1024 * 128);
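A note on the `(*reader)` deref above: with the blanket `impl MmapBytesReader for &mut T` added later in this diff, a bare `reader.to_bytes()` (where `reader: &'a mut R`) would resolve on the `&mut R` binding via autoref, tying the returned slice to a temporary borrow of the local instead of to `'a`. Dereferencing first pins the call to `R`. A hedged, self-contained reduction of that resolution issue (the trait and names are illustrative, not the real API):

```rust
trait Bytes {
    fn bytes(&self) -> Option<&[u8]>;
}

// Forwarding impl analogous to the new `impl MmapBytesReader for &mut T`.
impl<T: Bytes> Bytes for &mut T {
    fn bytes(&self) -> Option<&[u8]> {
        T::bytes(self)
    }
}

struct Buf(Vec<u8>);
impl Bytes for Buf {
    fn bytes(&self) -> Option<&[u8]> {
        Some(&self.0)
    }
}

// `(*reader).bytes()` resolves on `R`, reborrowing `*reader` for `'a`.
// A bare `reader.bytes()` would autoref to `&(&mut R)`, pick the
// forwarding impl, and tie the result to a borrow of the local binding,
// which cannot be returned with lifetime `'a`.
fn get<'a, R: Bytes>(reader: &'a mut R) -> Option<&'a [u8]> {
    (*reader).bytes()
}

fn main() {
    let mut b = Buf(vec![1, 2, 3]);
    assert_eq!(get(&mut b), Some(&[1u8, 2, 3][..]));
}
```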
76 changes: 57 additions & 19 deletions polars/polars-io/src/ipc/ipc_file.rs
@@ -61,18 +61,19 @@ use std::sync::Arc;
/// }
/// ```
#[must_use]
pub struct IpcReader<R> {
pub struct IpcReader<R: MmapBytesReader> {
/// File or Stream object
reader: R,
pub(super) reader: R,
/// Aggregates chunks afterwards to a single chunk.
rechunk: bool,
n_rows: Option<usize>,
projection: Option<Vec<usize>>,
pub(super) n_rows: Option<usize>,
pub(super) projection: Option<Vec<usize>>,
columns: Option<Vec<String>>,
row_count: Option<RowCount>,
pub(super) row_count: Option<RowCount>,
memmap: bool,
}

impl<R: Read + Seek> IpcReader<R> {
impl<R: MmapBytesReader> IpcReader<R> {
/// Get schema of the Ipc File
pub fn schema(&mut self) -> Result<Schema> {
let metadata = read::read_file_metadata(&mut self.reader)?;
@@ -109,13 +110,40 @@ impl<R: Read + Seek> IpcReader<R> {
self
}

/// Set whether the file will be memory mapped. Memory mapping only
/// works for uncompressed, file-backed IPC sources; compressed files
/// fall back to a buffered read.
pub fn memory_mapped(mut self, toggle: bool) -> Self {
self.memmap = toggle;
self
}

// todo! hoist to lazy crate
#[cfg(feature = "lazy")]
pub fn finish_with_scan_ops(
mut self,
predicate: Option<Arc<dyn PhysicalIoExpr>>,
aggregate: Option<&[ScanAggregation]>,
verbose: bool,
) -> Result<DataFrame> {
if self.memmap && self.reader.to_file().is_some() {
if verbose {
eprintln!("memory map ipc file")
}
match self.finish_memmapped(predicate.clone(), aggregate) {
Ok(df) => return Ok(df),
Err(err) => match err {
PolarsError::ArrowError(e) => match e.as_ref() {
arrow::error::Error::NotYetImplemented(s)
if s == "mmap can only be done on uncompressed IPC files" =>
{
eprint!("could not mmap compressed IPC file, defaulting to normal read")
}
_ => return Err(PolarsError::ArrowError(e)),
},
err => return Err(err),
},
}
}
let rechunk = self.rechunk;
let metadata = read::read_file_metadata(&mut self.reader)?;

@@ -125,8 +153,7 @@ impl<R: Read + Seek> IpcReader<R> {
metadata.schema.clone()
};

let reader =
read::FileReader::new(&mut self.reader, metadata, self.projection, self.n_rows);
let reader = read::FileReader::new(self.reader, metadata, self.projection, self.n_rows);

finish_reader(
reader,
@@ -140,7 +167,7 @@ impl<R: Read + Seek> IpcReader<R> {
}
}

impl<R> ArrowReader for read::FileReader<R>
impl<R: MmapBytesReader> ArrowReader for read::FileReader<R>
where
R: Read + Seek,
{
@@ -149,10 +176,7 @@ where
}
}

impl<R> SerReader<R> for IpcReader<R>
where
R: Read + Seek,
{
impl<R: MmapBytesReader> SerReader<R> for IpcReader<R> {
fn new(reader: R) -> Self {
IpcReader {
reader,
@@ -161,6 +185,7 @@ where
columns: None,
projection: None,
row_count: None,
memmap: true,
}
}

@@ -170,6 +195,22 @@
}

fn finish(mut self) -> Result<DataFrame> {
if self.memmap && self.reader.to_file().is_some() {
match self.finish_memmapped(None, None) {
Ok(df) => return Ok(df),
Err(err) => match err {
PolarsError::ArrowError(e) => match e.as_ref() {
arrow::error::Error::NotYetImplemented(s)
if s == "mmap can only be done on uncompressed IPC files" =>
{
eprint!("could not mmap compressed IPC file, defaulting to normal read")
}
_ => return Err(PolarsError::ArrowError(e)),
},
err => return Err(err),
},
}
}
let rechunk = self.rechunk;
let metadata = read::read_file_metadata(&mut self.reader)?;
let schema = &metadata.schema;
@@ -185,12 +226,8 @@ where
metadata.schema.clone()
};

let ipc_reader = read::FileReader::new(
&mut self.reader,
metadata.clone(),
self.projection,
self.n_rows,
);
let ipc_reader =
read::FileReader::new(self.reader, metadata.clone(), self.projection, self.n_rows);
finish_reader(
ipc_reader,
rechunk,
@@ -228,6 +265,7 @@ pub struct IpcWriter<W> {
}

use crate::aggregations::ScanAggregation;
use crate::mmap::MmapBytesReader;
use crate::RowCount;
use polars_core::frame::ArrowChunk;
pub use write::Compression as IpcCompression;
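Note that the fallback in `finish` and `finish_with_scan_ops` keys off arrow2's "mmap can only be done on uncompressed IPC files" error, so compressed files silently degrade to a buffered read. A hedged writer-side sketch for keeping the fast path (assumes `IpcWriter::with_compression`, which polars-io exposes alongside the `IpcCompression` re-export above, behaves as shown):

```rust
use polars_core::frame::DataFrame;
use polars_io::ipc::IpcWriter;
use polars_io::SerWriter;
use std::fs::File;

// Hedged sketch: write the IPC file uncompressed so readers keep the
// zero-copy mmap path; e.g. `Some(IpcCompression::LZ4)` would force
// readers back onto the buffered path.
fn write_for_mmap(df: &mut DataFrame, path: &str) -> Result<(), Box<dyn std::error::Error>> {
    let mut file = File::create(path)?;
    IpcWriter::new(&mut file).with_compression(None).finish(df)?;
    Ok(())
}
```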
100 changes: 100 additions & 0 deletions polars/polars-io/src/ipc/mmap.rs
@@ -0,0 +1,100 @@
use super::*;
use crate::mmap::MmapBytesReader;
use crate::utils::apply_projection;
use arrow::chunk::Chunk;
use arrow::io::ipc::read;
use arrow::io::ipc::read::{Dictionaries, FileMetadata};
use arrow::mmap::{mmap_dictionaries_unchecked, mmap_unchecked};
use memmap::Mmap;

struct MMapChunkIter<'a> {
dictionaries: Dictionaries,
metadata: FileMetadata,
mmap: Arc<Mmap>,
idx: usize,
end: usize,
projection: &'a Option<Vec<usize>>,
}

impl<'a> MMapChunkIter<'a> {
fn new(mmap: Mmap, metadata: FileMetadata, projection: &'a Option<Vec<usize>>) -> Result<Self> {
let mmap = Arc::new(mmap);

let end = metadata.blocks.len();
// mmap the dictionaries
let dictionaries = unsafe { mmap_dictionaries_unchecked(&metadata, mmap.clone())? };

Ok(Self {
dictionaries,
metadata,
mmap,
idx: 0,
end,
projection,
})
}
}

impl ArrowReader for MMapChunkIter<'_> {
fn next_record_batch(&mut self) -> ArrowResult<Option<ArrowChunk>> {
if self.idx < self.end {
let chunk = unsafe {
mmap_unchecked(
&self.metadata,
&self.dictionaries,
self.mmap.clone(),
self.idx,
)
}?;
self.idx += 1;
let chunk = match &self.projection {
None => chunk,
Some(proj) => {
let cols = chunk.into_arrays();
let arrays = proj.iter().map(|i| cols[*i].clone()).collect();
Chunk::new(arrays)
}
};
Ok(Some(chunk))
} else {
Ok(None)
}
}
}

impl<R: MmapBytesReader> IpcReader<R> {
pub(super) fn finish_memmapped(
&self,
predicate: Option<Arc<dyn PhysicalIoExpr>>,
aggregate: Option<&[ScanAggregation]>,
) -> Result<DataFrame> {
match self.reader.to_file() {
Some(file) => {
let mmap = unsafe { memmap::Mmap::map(file).unwrap() };
let metadata = read::read_file_metadata(&mut std::io::Cursor::new(mmap.as_ref()))?;

let schema = if let Some(projection) = &self.projection {
apply_projection(&metadata.schema, projection)
} else {
metadata.schema.clone()
};

let reader = MMapChunkIter::new(mmap, metadata, &self.projection)?;

finish_reader(
reader,
// don't rechunk, that would trigger a read.
false,
self.n_rows,
predicate,
aggregate,
&schema,
self.row_count.clone(),
)
}
None => Err(PolarsError::ComputeError(
"Cannot memory map, you must provide a file".into(),
)),
}
}
}
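Projection composes with the mmap path: `MMapChunkIter` maps each block zero-copy and only then selects the projected columns, so columns that are projected away should never fault their pages in. A hedged sketch using the existing `with_projection` builder (path hypothetical):

```rust
use polars_core::frame::DataFrame;
use polars_io::ipc::IpcReader;
use polars_io::SerReader;
use std::fs::File;

// Hedged sketch: select the 1st and 5th columns; on the mmap path the
// other columns are never read from disk.
fn read_projected(path: &str) -> Result<DataFrame, Box<dyn std::error::Error>> {
    let file = File::open(path)?;
    let df = IpcReader::new(file)
        .with_projection(Some(vec![0, 4])) // counts from 0
        .finish()?;
    Ok(df)
}
```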
1 change: 1 addition & 0 deletions polars/polars-io/src/ipc/mod.rs
@@ -5,6 +5,7 @@ mod ipc_file;

#[cfg(feature = "ipc_streaming")]
mod ipc_stream;
mod mmap;

#[cfg(feature = "ipc")]
pub use ipc_file::{IpcCompression, IpcReader, IpcWriter, IpcWriterOption};
2 changes: 1 addition & 1 deletion polars/polars-io/src/lib.rs
@@ -24,7 +24,7 @@ pub mod json;
#[cfg_attr(docsrs, doc(cfg(feature = "json")))]
pub mod ndjson_core;

#[cfg(any(feature = "csv-file", feature = "parquet"))]
#[cfg(any(feature = "csv-file", feature = "parquet", feature = "ipc"))]
pub mod mmap;
mod options;
#[cfg(feature = "parquet")]
10 changes: 10 additions & 0 deletions polars/polars-io/src/mmap.rs
@@ -44,6 +44,16 @@ impl<T: MmapBytesReader + ?Sized> MmapBytesReader for Box<T> {
}
}

impl<T: MmapBytesReader> MmapBytesReader for &mut T {
fn to_file(&self) -> Option<&File> {
T::to_file(self)
}

fn to_bytes(&self) -> Option<&[u8]> {
T::to_bytes(self)
}
}

// Handle various forms of input bytes
pub enum ReaderBytes<'a> {
Borrowed(&'a [u8]),
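This forwarding impl matters because `finish` now moves `self.reader` into arrow2's `FileReader` (see `ipc_file.rs` above): a caller holding only `&mut R` can still construct an `IpcReader`, since `&mut R` is itself a `MmapBytesReader`. A hedged sketch:

```rust
use polars_core::frame::DataFrame;
use polars_io::ipc::IpcReader;
use polars_io::SerReader;
use std::fs::File;

// Hedged sketch: lend the file to `IpcReader` (which consumes its `R`)
// as `&mut File`, and keep ownership for further use afterwards.
fn read_then_keep(file: &mut File) -> Result<DataFrame, Box<dyn std::error::Error>> {
    let df = IpcReader::new(&mut *file).finish()?; // `&mut File: MmapBytesReader`
    // `file` is still owned here, e.g. to rewind and read again.
    Ok(df)
}
```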
