Skip to content

Commit

Permalink
external context (#4276)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Aug 5, 2022
1 parent 5ef37dd commit 1e3f703
Show file tree
Hide file tree
Showing 38 changed files with 407 additions and 33 deletions.
4 changes: 2 additions & 2 deletions polars/polars-core/src/datatypes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -887,8 +887,8 @@ impl PartialEq<ArrowDataType> for DataType {
derive(Serialize, Deserialize)
)]
pub struct Field {
name: String,
dtype: DataType,
pub name: String,
pub dtype: DataType,
}

impl Field {
Expand Down
5 changes: 5 additions & 0 deletions polars/polars-lazy/src/dot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,11 @@ impl LogicalPlan {
self.write_dot(acc_str, prev_node, &current_node, id)?;
input.dot(acc_str, (branch, id + 1), &current_node)
}
ExtContext { input, .. } => {
let current_node = format!("EXTERNAL CONTEXT [{:?}]", (branch, id));
self.write_dot(acc_str, prev_node, &current_node, id)?;
input.dot(acc_str, (branch, id + 1), &current_node)
}
Error { err, .. } => {
let current_node = format!("{:?}", &**err);
self.write_dot(acc_str, prev_node, &current_node, id)
Expand Down
16 changes: 11 additions & 5 deletions polars/polars-lazy/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -571,11 +571,6 @@ impl LazyFrame {

let mut lp_top = to_alp(logical_plan, expr_arena, lp_arena)?;

// simplify expression is valuable for projection and predicate pushdown optimizers, so we
// run that first
// this optimization will run twice because optimizer may create dumb expressions
lp_top = opt.optimize_loop(&mut rules, expr_arena, lp_arena, lp_top);

// we do simplification
if simplify_expr {
rules.push(Box::new(SimplifyExprRule {}));
Expand Down Expand Up @@ -1020,6 +1015,17 @@ impl LazyFrame {
Self::from_logical_plan(lp, opt_state)
}

pub fn with_context<C: AsRef<[LazyFrame]>>(self, contexts: C) -> LazyFrame {
let contexts = contexts
.as_ref()
.iter()
.map(|lf| lf.logical_plan.clone())
.collect();
let opt_state = self.get_opt_state();
let lp = self.get_plan_builder().with_context(contexts).build();
Self::from_logical_plan(lp, opt_state)
}

/// Aggregate all the columns as their maximum values.
pub fn max(self) -> LazyFrame {
self.select_local(vec![col("*").max()])
Expand Down
26 changes: 25 additions & 1 deletion polars/polars-lazy/src/logical_plan/alp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,11 @@ pub enum ALogicalPlan {
inputs: Vec<Node>,
options: UnionOptions,
},
ExtContext {
input: Node,
contexts: Vec<Node>,
schema: SchemaRef,
},
}

impl Default for ALogicalPlan {
Expand Down Expand Up @@ -221,14 +226,19 @@ impl ALogicalPlan {
None => input_schema,
};
}
ExtContext { schema, .. } => schema,
};
Cow::Borrowed(schema)
}
}

impl ALogicalPlan {
/// Takes the expressions of an LP node and the inputs of that node and reconstruct
pub fn with_exprs_and_input(&self, mut exprs: Vec<Node>, inputs: Vec<Node>) -> ALogicalPlan {
pub fn with_exprs_and_input(
&self,
mut exprs: Vec<Node>,
mut inputs: Vec<Node>,
) -> ALogicalPlan {
use ALogicalPlan::*;

match self {
Expand Down Expand Up @@ -441,6 +451,11 @@ impl ALogicalPlan {
options: *options,
schema: schema.clone(),
},
ExtContext { schema, .. } => ExtContext {
input: inputs.pop().unwrap(),
contexts: inputs,
schema: schema.clone(),
},
}
}

Expand Down Expand Up @@ -527,6 +542,7 @@ impl ALogicalPlan {
container.push(*node)
}
}
ExtContext { .. } => {}
}
}

Expand Down Expand Up @@ -573,6 +589,14 @@ impl ALogicalPlan {
HStack { input, .. } => *input,
Distinct { input, .. } => *input,
Udf { input, .. } => *input,
ExtContext {
input, contexts, ..
} => {
for n in contexts {
container.push_node(*n)
}
*input
}
#[cfg(feature = "parquet")]
ParquetScan { .. } => return,
#[cfg(feature = "ipc")]
Expand Down
23 changes: 23 additions & 0 deletions polars/polars-lazy/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,29 @@ impl LogicalPlanBuilder {
.into()
}

pub fn with_context(self, contexts: Vec<LogicalPlan>) -> Self {
let mut schema = try_delayed!(self.0.schema(), &self.0, into)
.as_ref()
.as_ref()
.clone();

for lp in &contexts {
let other_schema = try_delayed!(lp.schema(), lp, into);

for fld in other_schema.iter_fields() {
if schema.get(fld.name()).is_none() {
schema.with_column(fld.name, fld.dtype)
}
}
}
LogicalPlan::ExtContext {
input: Box::new(self.0),
contexts,
schema: Arc::new(schema),
}
.into()
}

/// Apply a filter
pub fn filter(self, predicate: Expr) -> Self {
let predicate = if has_expr(&predicate, |e| match e {
Expand Down
32 changes: 32 additions & 0 deletions polars/polars-lazy/src/logical_plan/conversion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,22 @@ pub(crate) fn to_alp(
let mut err = err.lock();
return Err(err.take().unwrap());
}
LogicalPlan::ExtContext {
input,
contexts,
schema,
} => {
let input = to_alp(*input, expr_arena, lp_arena)?;
let contexts = contexts
.into_iter()
.map(|lp| to_alp(lp, expr_arena, lp_arena))
.collect::<Result<_>>()?;
ALogicalPlan::ExtContext {
input,
contexts,
schema,
}
}
};
Ok(lp_arena.add(v))
}
Expand Down Expand Up @@ -906,5 +922,21 @@ pub(crate) fn node_to_lp(
schema,
}
}
ALogicalPlan::ExtContext {
input,
contexts,
schema,
} => {
let input = Box::new(node_to_lp(input, expr_arena, lp_arena));
let contexts = contexts
.into_iter()
.map(|node| node_to_lp(node, expr_arena, lp_arena))
.collect();
LogicalPlan::ExtContext {
input,
contexts,
schema,
}
}
}
}
3 changes: 3 additions & 0 deletions polars/polars-lazy/src/logical_plan/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,9 @@ FROM
}
Udf { input, options, .. } => write!(f, "{} \n{:?}", options.fmt_str, input),
Error { input, err } => write!(f, "{:?}\n{:?}", err, input),
ExtContext { input, .. } => {
write!(f, "{:?}\nExtContext", input)
}
}
}
}
Expand Down
7 changes: 7 additions & 0 deletions polars/polars-lazy/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,12 @@ pub enum LogicalPlan {
input: Box<LogicalPlan>,
err: Arc<Mutex<Option<PolarsError>>>,
},
/// This allows expressions to access other tables
ExtContext {
input: Box<LogicalPlan>,
contexts: Vec<LogicalPlan>,
schema: SchemaRef,
},
}

impl Default for LogicalPlan {
Expand Down Expand Up @@ -249,6 +255,7 @@ impl LogicalPlan {
}
}
Error { input, .. } => input.schema(),
ExtContext { schema, .. } => Ok(Cow::Borrowed(schema)),
}
}
pub fn describe(&self) -> String {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,12 @@ impl OptimizationRule for FastProjection {
schema,
..
} => {
let schema = Some(schema.clone());
impl_fast_projection(*input, expr, schema, expr_arena)
if !matches!(lp_arena.get(*input), ALogicalPlan::ExtContext { .. }) {
let schema = Some(schema.clone());
impl_fast_projection(*input, expr, schema, expr_arena)
} else {
None
}
}
ALogicalPlan::LocalProjection {
input,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,11 @@ impl PredicatePushDown {
}

// projections should only have a single input.
assert_eq!(inputs.len(), 1);
let input = inputs[0];
if inputs.len() > 1 {
// except for ExtContext
assert!(matches!(lp, ALogicalPlan::ExtContext { .. }));
}
let input = inputs[inputs.len() - 1];
let (local_predicates, projections) =
rewrite_projection_node(expr_arena, lp_arena, &mut acc_predicates, exprs, input);

Expand Down Expand Up @@ -506,7 +509,7 @@ impl PredicatePushDown {
lp @ Cache { .. } | lp @ Sort { .. } => {
self.pushdown_and_continue(lp, acc_predicates, lp_arena, expr_arena, false)
}
lp @ HStack {..} | lp @ Projection {..} => {
lp @ HStack {..} | lp @ Projection {..} | lp @ ExtContext {..} => {
self.pushdown_and_continue(lp, acc_predicates, lp_arena, expr_arena, true)
}
// NOT Pushed down passed these nodes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ fn update_scan_schema(
for name in aexpr_to_root_names(*node, expr_arena) {
let item = schema.get_full(&name).ok_or_else(|| {
PolarsError::ComputeError(
format!("column {} not available in schema {:?}", name, schema).into(),
format!("column '{}' not available in schema {:?}", name, schema).into(),
)
})?;
new_cols.push(item);
Expand Down Expand Up @@ -196,6 +196,8 @@ impl ProjectionPushDown {
/// This pushes down the projection that are validated
/// that they can be done successful at the schema above
/// The result is assigned to this node.
///
/// The local projections are return and still have to be applied
fn pushdown_and_assign_check_schema(
&self,
input: Node,
Expand Down Expand Up @@ -940,6 +942,41 @@ impl ProjectionPushDown {
.build();
Ok(lp)
}
ExtContext {
input, contexts, ..
} => {
// local projections are ignored. These are just root nodes
// complex expression will still be done later
let _local_projections = self.pushdown_and_assign_check_schema(
input,
acc_projections,
projections_seen,
lp_arena,
expr_arena,
)?;

let mut new_schema = lp_arena
.get(input)
.schema(lp_arena)
.as_ref()
.as_ref()
.clone();

for node in &contexts {
let other_schema = lp_arena.get(*node).schema(lp_arena);
for fld in other_schema.iter_fields() {
if new_schema.get(fld.name()).is_none() {
new_schema.with_column(fld.name, fld.dtype)
}
}
}

Ok(ExtContext {
input,
contexts,
schema: Arc::new(new_schema),
})
}
Udf {
input,
function,
Expand Down
6 changes: 6 additions & 0 deletions polars/polars-lazy/src/physical_plan/executors/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,12 @@ pub struct CacheExec {

impl Executor for CacheExec {
fn execute(&mut self, state: &mut ExecutionState) -> Result<DataFrame> {
#[cfg(debug_assertions)]
{
if state.verbose() {
println!("run Cache")
}
}
if let Some(df) = state.cache_hit(&self.key) {
return Ok(df);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,12 @@ pub(crate) struct DropDuplicatesExec {

impl Executor for DropDuplicatesExec {
fn execute(&mut self, state: &mut ExecutionState) -> Result<DataFrame> {
#[cfg(debug_assertions)]
{
if state.verbose() {
println!("run DropDuplicatesExec")
}
}
let df = self.input.execute(state)?;
let subset = self.options.subset.as_ref().map(|v| &***v);
let keep = self.options.keep_strategy;
Expand Down
6 changes: 6 additions & 0 deletions polars/polars-lazy/src/physical_plan/executors/explode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,12 @@ pub(crate) struct ExplodeExec {

impl Executor for ExplodeExec {
fn execute(&mut self, state: &mut ExecutionState) -> Result<DataFrame> {
#[cfg(debug_assertions)]
{
if state.verbose() {
println!("run ExplodeExec")
}
}
let df = self.input.execute(state)?;
df.explode(&self.columns)
}
Expand Down
28 changes: 28 additions & 0 deletions polars/polars-lazy/src/physical_plan/executors/ext_context.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
use crate::physical_plan::state::ExecutionState;
use crate::prelude::*;
use polars_core::prelude::*;

pub struct ExternalContext {
pub input: Box<dyn Executor>,
pub contexts: Vec<Box<dyn Executor>>,
}

impl Executor for ExternalContext {
fn execute(&mut self, state: &mut ExecutionState) -> Result<DataFrame> {
#[cfg(debug_assertions)]
{
if state.verbose() {
println!("run ExternalContext")
}
}
let df = self.input.execute(state)?;
let contexts = self
.contexts
.iter_mut()
.map(|e| e.execute(state))
.collect::<Result<Vec<_>>>()?;

state.ext_contexts = Arc::new(contexts);
Ok(df)
}
}

0 comments on commit 1e3f703

Please sign in to comment.