add ipc scan
ritchie46 committed Nov 12, 2021
1 parent ad111d3 commit 391ebd1
Showing 17 changed files with 549 additions and 62 deletions.
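For orientation, a minimal sketch of what this commit enables from the user side, assuming the `lazy` and `ipc` features and a hypothetical `data.ipc` file with a column `a` (`scan_ipc` returns a `Result<LazyFrame>`, as added below):

    use polars::prelude::*;

    fn main() -> Result<()> {
        // Lazily scan the IPC file: read at most 100 rows and cache the scan.
        let df = LazyFrame::scan_ipc("data.ipc".into(), Some(100), true)?
            .select(&[col("a")])
            .collect()?;
        println!("{:?}", df);
        Ok(())
    }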
2 changes: 1 addition & 1 deletion polars/Cargo.toml
@@ -42,7 +42,7 @@ object = ["polars-core/object"]
json = ["polars-io", "polars-io/json"]

# support for Arrow's IPC file parsing
- ipc = ["polars-io", "polars-io/ipc"]
+ ipc = ["polars-io", "polars-io/ipc", "polars-lazy/ipc"]

# support for Arrow's CSV file parsing
csv-file = ["polars-io", "polars-io/csv-file", "polars-lazy/csv-file"]
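For downstream users, enabling `ipc` on the umbrella `polars` crate now pulls in the lazy IPC scan as well; a sketch of a consumer's `Cargo.toml` (the version number is illustrative):

    [dependencies]
    polars = { version = "0.17", features = ["lazy", "ipc"] }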
45 changes: 44 additions & 1 deletion polars/polars-io/src/ipc.rs
@@ -34,6 +34,7 @@
//! ```
use super::{finish_reader, ArrowReader, ArrowResult, RecordBatch};
use crate::prelude::*;
use crate::{PhysicalIoExpr, ScanAggregation};
use arrow::io::ipc::write::WriteOptions;
use arrow::io::ipc::{read, write};
use polars_core::prelude::*;
@@ -61,6 +62,47 @@ pub struct IpcReader<R> {
reader: R,
/// Aggregate all chunks to a single chunk after reading.
rechunk: bool,
stop_after_n_rows: Option<usize>,
}

impl<R: Read + Seek> IpcReader<R> {
/// Get the schema of the IPC file.
pub fn schema(&mut self) -> Result<Schema> {
let metadata = read::read_file_metadata(&mut self.reader)?;
Ok((&**metadata.schema()).into())
}

/// Get the Arrow schema of the IPC file; this is faster than building a Polars schema.
pub fn arrow_schema(&mut self) -> Result<Arc<ArrowSchema>> {
let metadata = read::read_file_metadata(&mut self.reader)?;
Ok(metadata.schema().clone())
}
/// Stop reading when `n` rows are read.
pub fn with_stop_after_n_rows(mut self, num_rows: Option<usize>) -> Self {
self.stop_after_n_rows = num_rows;
self
}
#[cfg(feature = "lazy")]
// todo! hoist to lazy crate
pub fn finish_with_scan_ops(
mut self,
predicate: Option<Arc<dyn PhysicalIoExpr>>,
aggregate: Option<&[ScanAggregation]>,
projection: Option<&[usize]>,
) -> Result<DataFrame> {
let rechunk = self.rechunk;
let metadata = read::read_file_metadata(&mut self.reader)?;
let reader =
read::FileReader::new(&mut self.reader, metadata, projection.map(|x| x.to_vec()));

finish_reader(
reader,
rechunk,
self.stop_after_n_rows,
predicate,
aggregate,
)
}
}
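A sketch of driving the scan-ops entry point above directly, assuming the `lazy` feature and a hypothetical `data.ipc` file, with a column projection but no predicate or aggregation:

    use polars_core::prelude::*;
    use polars_io::prelude::*;
    use std::fs::File;

    fn projected_read() -> Result<DataFrame> {
        let file = File::open("data.ipc").expect("hypothetical example file");
        // Materialize only columns 0 and 2; pass no predicate and no aggregation.
        IpcReader::new(file).finish_with_scan_ops(None, None, Some(&[0, 2]))
    }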

impl<R> ArrowReader for read::FileReader<R>
@@ -84,6 +126,7 @@ where
IpcReader {
reader,
rechunk: true,
stop_after_n_rows: None,
}
}
fn set_rechunk(mut self, rechunk: bool) -> Self {
@@ -95,7 +138,7 @@ where
let rechunk = self.rechunk;
let metadata = read::read_file_metadata(&mut self.reader)?;
let ipc_reader = read::FileReader::new(&mut self.reader, metadata, None);
- finish_reader(ipc_reader, rechunk, None, None, None)
+ finish_reader(ipc_reader, rechunk, self.stop_after_n_rows, None, None)
}
}
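A minimal sketch of the eager reader with the new row limit (file name hypothetical; `SerReader` from the polars-io prelude supplies `new` and `finish`):

    use polars_core::prelude::*;
    use polars_io::prelude::*;
    use std::fs::File;

    fn read_head() -> Result<DataFrame> {
        let file = File::open("data.ipc").expect("hypothetical example file");
        // Stop after 100 rows; `finish` now forwards the limit to `finish_reader`.
        IpcReader::new(file).with_stop_after_n_rows(Some(100)).finish()
    }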

1 change: 1 addition & 0 deletions polars/polars-lazy/Cargo.toml
@@ -15,6 +15,7 @@ description = "Lazy query engine for the Polars DataFrame library"
compile = []
default = ["compile"]
parquet = ["polars-core/parquet", "polars-io/parquet"]
ipc = ["polars-io/ipc"]
csv-file = ["polars-io/csv-file"]
temporal = ["polars-core/temporal"]
# debugging purposes
29 changes: 24 additions & 5 deletions polars/polars-lazy/src/frame.rs
@@ -1,5 +1,5 @@
//! Lazy variant of a [DataFrame](polars_core::frame::DataFrame).
- #[cfg(any(feature = "parquet", feature = "csv-file"))]
+ #[cfg(any(feature = "parquet", feature = "csv-file", feature = "ipc"))]
use polars_core::datatypes::PlHashMap;
use polars_core::frame::hash_join::JoinType;
use polars_core::prelude::*;
@@ -8,13 +8,15 @@ use polars_core::toggle_string_cache;
use std::sync::Arc;

use crate::logical_plan::optimizer::aggregate_pushdown::AggregatePushdown;
- #[cfg(any(feature = "parquet", feature = "csv-file"))]
+ #[cfg(any(feature = "parquet", feature = "csv-file", feature = "ipc"))]
use crate::logical_plan::optimizer::aggregate_scan_projections::AggScanProjection;
use crate::logical_plan::optimizer::simplify_expr::SimplifyExprRule;
use crate::logical_plan::optimizer::stack_opt::{OptimizationRule, StackOptimizer};
use crate::logical_plan::optimizer::{
predicate_pushdown::PredicatePushDown, projection_pushdown::ProjectionPushDown,
};
#[cfg(feature = "ipc")]
use crate::logical_plan::IpcOptions;
use crate::physical_plan::state::ExecutionState;
#[cfg(any(feature = "parquet", feature = "csv-file"))]
use crate::prelude::aggregate_scan_projections::agg_projection;
@@ -265,12 +267,29 @@ impl LazyFrame {

/// Create a LazyFrame directly from a Parquet scan.
#[cfg(feature = "parquet")]
- pub fn new_from_parquet(path: String, stop_after_n_rows: Option<usize>, cache: bool) -> Self {
- let mut lf: LazyFrame = LogicalPlanBuilder::scan_parquet(path, stop_after_n_rows, cache)
+ pub fn scan_parquet(
+ path: String,
+ stop_after_n_rows: Option<usize>,
+ cache: bool,
+ ) -> Result<Self> {
+ let mut lf: LazyFrame = LogicalPlanBuilder::scan_parquet(path, stop_after_n_rows, cache)?
.build()
.into();
lf.opt_state.agg_scan_projection = true;
- lf
+ Ok(lf)
}

/// Create a LazyFrame directly from an IPC scan.
#[cfg(feature = "ipc")]
pub fn scan_ipc(path: String, stop_after_n_rows: Option<usize>, cache: bool) -> Result<Self> {
let options = IpcOptions {
stop_after_n_rows,
cache,
with_columns: None,
};
let mut lf: LazyFrame = LogicalPlanBuilder::scan_ipc(path, options)?.build().into();
lf.opt_state.agg_scan_projection = true;
Ok(lf)
}
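`IpcOptions` itself is introduced in the logical_plan module, which this excerpt does not show; from its construction here and the `options.clone()` in alp.rs below, its shape is presumably close to this sketch (field types inferred, derives assumed):

    #[derive(Clone, Debug)]
    pub struct IpcOptions {
        pub stop_after_n_rows: Option<usize>,
        pub cache: bool,
        pub with_columns: Option<Vec<String>>,
    }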

/// Get a dot language representation of the LogicalPlan.
59 changes: 59 additions & 0 deletions polars/polars-lazy/src/logical_plan/alp.rs
@@ -1,3 +1,5 @@
#[cfg(feature = "ipc")]
use crate::logical_plan::IpcOptions;
use crate::logical_plan::{det_melt_schema, Context, CsvParserOptions};
use crate::prelude::*;
use crate::utils::{aexprs_to_schema, PushNode};
@@ -38,6 +40,16 @@ pub enum ALogicalPlan {
predicate: Option<Node>,
aggregate: Vec<Node>,
},
#[cfg(feature = "ipc")]
IpcScan {
path: PathBuf,
schema: SchemaRef,
// schema of the projected file
output_schema: Option<SchemaRef>,
options: IpcOptions,
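// predicate and scan-level aggregations pushed down into this scan, stored as expression-arena `Node`s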
predicate: Option<Node>,
aggregate: Vec<Node>,
},
#[cfg(feature = "parquet")]
ParquetScan {
path: PathBuf,
@@ -144,6 +156,12 @@ impl ALogicalPlan {
output_schema,
..
} => output_schema.as_ref().unwrap_or(schema),
#[cfg(feature = "ipc")]
IpcScan {
schema,
output_schema,
..
} => output_schema.as_ref().unwrap_or(schema),
DataFrameScan { schema, .. } => schema,
Selection { input, .. } => arena.get(*input).schema(arena),
#[cfg(feature = "csv-file")]
@@ -192,6 +210,10 @@ impl ALogicalPlan {
(ParquetScan { path: path_a, .. }, ParquetScan { path: path_b, .. }) => {
canonicalize(path_a).unwrap() == canonicalize(path_b).unwrap()
}
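// As with ParquetScan above: two IPC scans compare equal when their paths canonicalize to the same file.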
#[cfg(feature = "ipc")]
(IpcScan { path: path_a, .. }, IpcScan { path: path_b, .. }) => {
canonicalize(path_a).unwrap() == canonicalize(path_b).unwrap()
}
(DataFrameScan { df: df_a, .. }, DataFrameScan { df: df_b, .. }) => {
df_a.ptr_equal(df_b)
}
@@ -329,6 +351,30 @@ impl ALogicalPlan {
exprs,
schema: schema.clone(),
},
#[cfg(feature = "ipc")]
IpcScan {
path,
schema,
output_schema,
options,
predicate,
..
} => {
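// `exprs` lists the aggregate nodes first and the predicate (when present) last,
// mirroring the collection order later in this file, so popping recovers the predicate.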
let mut new_predicate = None;
if predicate.is_some() {
new_predicate = exprs.pop()
}

IpcScan {
path: path.clone(),
schema: schema.clone(),
output_schema: output_schema.clone(),
predicate: new_predicate,
aggregate: exprs,
options: options.clone(),
}
}

#[cfg(feature = "parquet")]
ParquetScan {
path,
@@ -453,6 +499,17 @@ impl ALogicalPlan {
container.push(*node)
}
}
#[cfg(feature = "ipc")]
IpcScan {
predicate,
aggregate,
..
} => {
container.extend_from_slice(aggregate);
if let Some(node) = predicate {
container.push(*node)
}
}
#[cfg(feature = "csv-file")]
CsvScan {
predicate,
@@ -524,6 +581,8 @@ impl ALogicalPlan {
Udf { input, .. } => *input,
#[cfg(feature = "parquet")]
ParquetScan { .. } => return,
#[cfg(feature = "ipc")]
IpcScan { .. } => return,
#[cfg(feature = "csv-file")]
CsvScan { .. } => return,
DataFrameScan { .. } => return,
33 changes: 33 additions & 0 deletions polars/polars-lazy/src/logical_plan/conversion.rs
@@ -203,6 +203,24 @@ pub(crate) fn to_alp(
.map(|expr| to_aexpr(expr, expr_arena))
.collect(),
},
#[cfg(feature = "ipc")]
LogicalPlan::IpcScan {
path,
schema,
predicate,
aggregate,
options,
} => ALogicalPlan::IpcScan {
path,
schema,
output_schema: None,
predicate: predicate.map(|expr| to_aexpr(expr, expr_arena)),
aggregate: aggregate
.into_iter()
.map(|expr| to_aexpr(expr, expr_arena))
.collect(),
options,
},
#[cfg(feature = "parquet")]
LogicalPlan::ParquetScan {
path,
@@ -651,6 +669,21 @@ pub(crate) fn node_to_lp(
predicate: predicate.map(|n| node_to_exp(n, expr_arena)),
aggregate: nodes_to_exprs(&aggregate, expr_arena),
},
#[cfg(feature = "ipc")]
ALogicalPlan::IpcScan {
path,
schema,
output_schema: _,
predicate,
aggregate,
options,
} => LogicalPlan::IpcScan {
path,
schema,
predicate: predicate.map(|n| node_to_exp(n, expr_arena)),
aggregate: nodes_to_exprs(&aggregate, expr_arena),
options,
},
#[cfg(feature = "parquet")]
ALogicalPlan::ParquetScan {
path,
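The eager-plan counterpart, `LogicalPlan::IpcScan`, is added in a file not shown in this excerpt; judging from the two conversion arms above, it presumably carries `Expr` trees where the arena plan stores `Node` ids (a sketch, not the committed definition):

    pub enum LogicalPlan {
        // ...existing variants elided...
        #[cfg(feature = "ipc")]
        IpcScan {
            path: PathBuf,
            schema: SchemaRef,
            predicate: Option<Expr>,
            aggregate: Vec<Expr>,
            options: IpcOptions,
        },
    }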
