Skip to content

Commit

Permalink
Anonymous scan enhancements & cleanup (#3657)
Browse files Browse the repository at this point in the history
  • Loading branch information
universalmind303 committed Jun 13, 2022
1 parent 4ea5b32 commit 6db9de6
Show file tree
Hide file tree
Showing 10 changed files with 124 additions and 39 deletions.
5 changes: 3 additions & 2 deletions polars/polars-core/src/frame/row.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ pub fn infer_schema(
infer_schema_length: usize,
) -> Schema {
let mut values: Tracker = Tracker::new();
let len = iter.size_hint().1.unwrap();
let len = iter.size_hint().1.unwrap_or(infer_schema_length);

let max_infer = std::cmp::min(len, infer_schema_length);
for inner in iter.take(max_infer) {
Expand Down Expand Up @@ -223,7 +223,8 @@ fn resolve_fields(spec: Tracker) -> Vec<Field> {
.collect()
}

fn coerce_data_type<A: Borrow<DataType>>(datatypes: &[A]) -> DataType {
/// Coerces a slice of datatypes into a single supertype.
pub fn coerce_data_type<A: Borrow<DataType>>(datatypes: &[A]) -> DataType {
use DataType::*;

let are_all_equal = datatypes.windows(2).all(|w| w[0].borrow() == w[1].borrow());
Expand Down
10 changes: 3 additions & 7 deletions polars/polars-lazy/src/frame/anonymous_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,20 @@ use polars_io::RowCount;

#[derive(Clone)]
pub struct ScanArgsAnonymous {
pub skip_rows: Option<usize>,
pub n_rows: Option<usize>,
pub infer_schema_length: Option<usize>,
pub schema: Option<Schema>,
pub skip_rows: Option<usize>,
pub n_rows: Option<usize>,
pub row_count: Option<RowCount>,
pub name: &'static str,
}

impl Default for ScanArgsAnonymous {
fn default() -> Self {
Self {
infer_schema_length: None,
skip_rows: None,
n_rows: None,
infer_schema_length: None,
schema: None,
row_count: None,
name: "ANONYMOUS SCAN",
Expand All @@ -40,10 +40,6 @@ impl LazyFrame {
.build()
.into();

if let Some(n_rows) = args.n_rows {
lf = lf.slice(0, n_rows as IdxSize);
};

if let Some(rc) = args.row_count {
lf = lf.with_row_count(&rc.name, Some(rc.offset))
};
Expand Down
12 changes: 10 additions & 2 deletions polars/polars-lazy/src/logical_plan/alp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,15 @@ use polars_utils::arena::{Arena, Node};
use std::path::PathBuf;
use std::sync::Arc;

// ALogicalPlan is a representation of LogicalPlan with Nodes which are allocated in an Arena
/// ALogicalPlan is a representation of LogicalPlan with Nodes which are allocated in an Arena
#[derive(Clone, Debug)]
pub enum ALogicalPlan {
AnonymousScan {
function: Arc<dyn AnonymousScan>,
schema: SchemaRef,
output_schema: Option<SchemaRef>,
predicate: Option<Node>,
aggregate: Vec<Node>,
options: AnonymousScanOptions,
},
#[cfg(feature = "python")]
Expand Down Expand Up @@ -173,7 +174,11 @@ impl ALogicalPlan {
..
} => output_schema.as_ref().unwrap_or(schema),
DataFrameScan { schema, .. } => schema,
AnonymousScan { schema, .. } => schema,
AnonymousScan {
schema,
output_schema,
..
} => output_schema.as_ref().unwrap_or(schema),
Selection { input, .. } => arena.get(*input).schema(arena),
#[cfg(feature = "csv-file")]
CsvScan {
Expand Down Expand Up @@ -384,15 +389,18 @@ impl ALogicalPlan {
schema,
output_schema,
predicate,
aggregate: _,
options,
} => {
let mut new_predicate = None;
if predicate.is_some() {
new_predicate = exprs.pop()
}

AnonymousScan {
function: function.clone(),
schema: schema.clone(),
aggregate: exprs,
output_schema: output_schema.clone(),
predicate: new_predicate,
options: options.clone(),
Expand Down
22 changes: 22 additions & 0 deletions polars/polars-lazy/src/logical_plan/anonymous_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,34 @@ use polars_core::prelude::*;
use std::fmt::{Debug, Formatter};

pub trait AnonymousScan: Send + Sync {
/// Creates a dataframe from the supplied function & scan options.
fn scan(&self, scan_opts: AnonymousScanOptions) -> Result<DataFrame>;

/// function to supply the schema.
/// Allows for an optional infer schema argument for data sources with dynamic schemas
fn schema(&self, _infer_schema_length: Option<usize>) -> Result<Schema> {
Err(PolarsError::ComputeError(
"Must supply either a schema or a schema function".into(),
))
}
/// specify if the scan provider should allow predicate pushdowns
///
/// Defaults to `false`
fn allows_predicate_pushdown(&self) -> bool {
false
}
/// specify if the scan provider should allow projection pushdowns
///
/// Defaults to `false`
fn allows_projection_pushdown(&self) -> bool {
false
}
/// specify if the scan provider should allow slice pushdowns
///
/// Defaults to `false`
fn allows_slice_pushdown(&self) -> bool {
false
}
}

impl<F> AnonymousScan for F
Expand Down
1 change: 1 addition & 0 deletions polars/polars-lazy/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ impl LogicalPlanBuilder {
function,
schema: schema.clone(),
predicate: None,
aggregate: vec![],
options: AnonymousScanOptions {
fmt_str: name,
schema,
Expand Down
14 changes: 13 additions & 1 deletion polars/polars-lazy/src/logical_plan/conversion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ fn to_aexprs(input: Vec<Expr>, arena: &mut Arena<AExpr>) -> Vec<Node> {
input.into_iter().map(|e| to_aexpr(e, arena)).collect()
}

// converts expression to AExpr, which uses an arena (Vec) for allocation
/// converts expression to AExpr and adds it to the arena, which uses an arena (Vec) for allocation
pub(crate) fn to_aexpr(expr: Expr, arena: &mut Arena<AExpr>) -> Node {
let v = match expr {
Expr::IsUnique(expr) => AExpr::IsUnique(to_aexpr(*expr, arena)),
Expand Down Expand Up @@ -151,6 +151,9 @@ pub(crate) fn to_aexpr(expr: Expr, arena: &mut Arena<AExpr>) -> Node {
arena.add(v)
}

/// converts LogicalPlan to ALogicalPlan
/// it adds expressions & lps to the respective arenas as it traverses the plan
/// finally it returns the top node of the logical plan
pub(crate) fn to_alp(
lp: LogicalPlan,
expr_arena: &mut Arena<AExpr>,
Expand All @@ -161,12 +164,17 @@ pub(crate) fn to_alp(
function,
schema,
predicate,
aggregate,
options,
} => ALogicalPlan::AnonymousScan {
function,
schema,
output_schema: None,
predicate: predicate.map(|expr| to_aexpr(expr, expr_arena)),
aggregate: aggregate
.into_iter()
.map(|expr| to_aexpr(expr, expr_arena))
.collect(),
options,
},
#[cfg(feature = "python")]
Expand Down Expand Up @@ -422,6 +430,7 @@ pub(crate) fn to_alp(
Ok(lp_arena.add(v))
}

/// converts a node from the AExpr arena to Expr
pub(crate) fn node_to_expr(node: Node, expr_arena: &Arena<AExpr>) -> Expr {
let expr = expr_arena.get(node).clone();

Expand Down Expand Up @@ -649,6 +658,7 @@ fn nodes_to_exprs(nodes: &[Node], expr_arena: &Arena<AExpr>) -> Vec<Expr> {
nodes.iter().map(|n| node_to_expr(*n, expr_arena)).collect()
}

/// converts a node from the ALogicalPlan arena to a LogicalPlan
pub(crate) fn node_to_lp(
node: Node,
expr_arena: &mut Arena<AExpr>,
Expand All @@ -662,12 +672,14 @@ pub(crate) fn node_to_lp(
function,
schema,
output_schema: _,
aggregate,
predicate,
options,
} => LogicalPlan::AnonymousScan {
function,
schema,
predicate: predicate.map(|n| node_to_expr(n, expr_arena)),
aggregate: nodes_to_exprs(&aggregate, expr_arena),
options,
},
#[cfg(feature = "python")]
Expand Down
1 change: 1 addition & 0 deletions polars/polars-lazy/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ pub enum LogicalPlan {
function: Arc<dyn AnonymousScan>,
schema: SchemaRef,
predicate: Option<Expr>,
aggregate: Vec<Expr>,
options: AnonymousScanOptions,
},
#[cfg(feature = "python")]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,37 @@ impl PredicatePushDown {
};
Ok(lp)
}
AnonymousScan {
function,
schema,
output_schema,
options,
predicate,
aggregate
} => {
if function.allows_predicate_pushdown() {
let predicate = predicate_at_scan(acc_predicates, predicate, expr_arena);
let lp = AnonymousScan {
function,
schema,
output_schema,
options,
predicate,
aggregate
};
Ok(lp)
} else {
let lp = AnonymousScan {
function,
schema,
output_schema,
options,
predicate,
aggregate
};
self.no_pushdown_restart_opt(lp, acc_predicates, lp_arena, expr_arena)
}
}

Explode { input, columns, schema } => {
let condition = |name: Arc<str>| columns.iter().any(|s| s.as_str() == &*name);
Expand Down Expand Up @@ -466,10 +497,7 @@ impl PredicatePushDown {
lp @ PythonScan {..} => {
self.no_pushdown_restart_opt(lp, acc_predicates, lp_arena, expr_arena)
}
lp @ AnonymousScan {..} => {
// TODO: add predicate pushdowns.
self.no_pushdown_restart_opt(lp, acc_predicates, lp_arena, expr_arena)
}

}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -342,29 +342,44 @@ impl ProjectionPushDown {
schema,
predicate,
mut options,
..
output_schema,
aggregate,
} => {
options.with_columns = get_scan_columns(&mut acc_projections, expr_arena);
let output_schema = if options.with_columns.is_none() {
None
} else {
Some(Arc::new(update_scan_schema(
&acc_projections,
expr_arena,
&*schema,
true,
)))
};
options.output_schema = output_schema.clone();
if function.allows_projection_pushdown() {
options.with_columns = get_scan_columns(&mut acc_projections, expr_arena);

let lp = AnonymousScan {
function,
schema,
output_schema,
options,
predicate,
};
Ok(lp)
let output_schema = if options.with_columns.is_none() {
None
} else {
Some(Arc::new(update_scan_schema(
&acc_projections,
expr_arena,
&*schema,
true,
)))
};
options.output_schema = output_schema.clone();

let lp = AnonymousScan {
function,
schema,
output_schema,
options,
predicate,
aggregate,
};
Ok(lp)
} else {
let lp = AnonymousScan {
function,
schema,
predicate,
options,
output_schema,
aggregate,
};
Ok(lp)
}
}
DataFrameScan {
df,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ impl SlicePushDown {
output_schema,
predicate,
options,

aggregate,
},
// TODO! we currently skip slice pushdown if there is a predicate.
// we can modify the readers to only limit after predicates have been applied
Expand All @@ -112,7 +112,8 @@ impl SlicePushDown {
schema,
output_schema,
predicate,
options
options,
aggregate,
};

Ok(lp)
Expand Down

0 comments on commit 6db9de6

Please sign in to comment.