Skip to content

Commit

Permalink
add sort by expressions to lazy
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed May 6, 2021
1 parent 021544f commit b6b6d90
Show file tree
Hide file tree
Showing 10 changed files with 105 additions and 50 deletions.
2 changes: 1 addition & 1 deletion polars/polars-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ docs = []
temporal = ["chrono", "regex"]
random = ["rand", "rand_distr"]
default = ["docs", "temporal", "performant"]
lazy = []
lazy = ["sort_multiple"]

# commented out until UB is fixed
#parallel = []
Expand Down
10 changes: 9 additions & 1 deletion polars/polars-core/src/frame/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ pub trait Selection<'a, S> {
fn to_selection_vec(self) -> Vec<&'a str>;

// a single column is selected
fn single(&self) -> Option<&'a str> {
fn single(&self) -> Option<&str> {
None
}
}
Expand Down Expand Up @@ -36,6 +36,14 @@ where
fn to_selection_vec(self) -> Vec<&'a str> {
self.as_ref().iter().map(|s| s.as_ref()).collect()
}

fn single(&self) -> Option<&str> {
let a = self.as_ref();
match a.len() {
1 => Some(a[0].as_ref()),
_ => None,
}
}
}

impl<'a> Selection<'a, &str> for (&'a str, &'a str) {
Expand Down
11 changes: 10 additions & 1 deletion polars/polars-lazy/src/datafusion/conversion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,16 @@ pub fn to_datafusion_lp(lp: LogicalPlan) -> Result<DLogicalPlan> {
reverse,
} => DLogicalPlan::Sort {
input: Arc::new(to_datafusion_lp(*input)?),
expr: vec![col(&by_column).sort(!reverse, true)],
expr: by_column
.into_iter()
.map(|e| {
if reverse {
to_datafusion_expr(e.reverse())
} else {
to_datafusion_expr(e)
}
})
.collect::<Result<Vec<_>>>()?,
},
Join {
input_left,
Expand Down
22 changes: 21 additions & 1 deletion polars/polars-lazy/src/frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -332,11 +332,31 @@ impl LazyFrame {
let opt_state = self.get_opt_state();
let lp = self
.get_plan_builder()
.sort(by_column.into(), reverse)
.sort(vec![col(by_column)], reverse)
.build();
Self::from_logical_plan(lp, opt_state)
}

/// Add a sort operation to the logical plan.
///
/// # Example
///
/// ```rust
/// use polars_core::prelude::*;
/// use polars_lazy::prelude::*;
///
/// /// Sort DataFrame by 'sepal.width' column
/// fn example(df: DataFrame) -> LazyFrame {
/// df.lazy()
/// .sort_by_exprs(vec![col("sepal.width")], false)
/// }
/// ```
pub fn sort_by_exprs(self, by_exprs: Vec<Expr>, reverse: bool) -> Self {
let opt_state = self.get_opt_state();
let lp = self.get_plan_builder().sort(by_exprs, reverse).build();
Self::from_logical_plan(lp, opt_state)
}

/// Reverse the DataFrame
///
/// # Example
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-lazy/src/logical_plan/alp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ pub enum ALogicalPlan {
},
Sort {
input: Node,
by_column: String,
by_column: Vec<Node>,
reverse: bool,
},
Explode {
Expand Down
51 changes: 21 additions & 30 deletions polars/polars-lazy/src/logical_plan/conversion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,10 @@ pub(crate) fn to_alp(
reverse,
} => {
let input = to_alp(*input, expr_arena, lp_arena);
let by_column = by_column
.into_iter()
.map(|x| to_aexpr(x, expr_arena))
.collect();
ALogicalPlan::Sort {
input,
by_column,
Expand Down Expand Up @@ -578,6 +582,13 @@ pub(crate) fn node_to_exp(node: Node, expr_arena: &Arena<AExpr>) -> Expr {
}
}

fn nodes_to_exprs(nodes: &[Node], expr_arena: &Arena<AExpr>) -> Vec<Expr> {
nodes
.into_iter()
.map(|n| node_to_exp(*n, expr_arena))
.collect()
}

pub(crate) fn node_to_lp(
node: Node,
expr_arena: &mut Arena<AExpr>,
Expand Down Expand Up @@ -625,10 +636,7 @@ pub(crate) fn node_to_lp(
stop_after_n_rows,
with_columns,
predicate: predicate.map(|n| node_to_exp(n, expr_arena)),
aggregate: aggregate
.into_iter()
.map(|n| node_to_exp(n, expr_arena))
.collect(),
aggregate: nodes_to_exprs(&aggregate, expr_arena),
cache,
},
#[cfg(feature = "parquet")]
Expand All @@ -645,10 +653,7 @@ pub(crate) fn node_to_lp(
schema,
with_columns,
predicate: predicate.map(|n| node_to_exp(n, expr_arena)),
aggregate: aggregate
.into_iter()
.map(|n| node_to_exp(n, expr_arena))
.collect(),
aggregate: nodes_to_exprs(&aggregate, expr_arena),
stop_after_n_rows,
cache,
},
Expand All @@ -670,11 +675,10 @@ pub(crate) fn node_to_lp(
input,
schema,
} => {
let exprs = expr.iter().map(|x| node_to_exp(*x, expr_arena)).collect();
let i = node_to_lp(input, expr_arena, lp_arena);

LogicalPlan::Projection {
expr: exprs,
expr: nodes_to_exprs(&expr, expr_arena),
input: Box::new(i),
schema,
}
Expand All @@ -684,11 +688,10 @@ pub(crate) fn node_to_lp(
input,
schema,
} => {
let exprs = expr.iter().map(|x| node_to_exp(*x, expr_arena)).collect();
let i = node_to_lp(input, expr_arena, lp_arena);

LogicalPlan::LocalProjection {
expr: exprs,
expr: nodes_to_exprs(&expr, expr_arena),
input: Box::new(i),
schema,
}
Expand All @@ -701,7 +704,7 @@ pub(crate) fn node_to_lp(
let input = Box::new(node_to_lp(input, expr_arena, lp_arena));
LogicalPlan::Sort {
input,
by_column,
by_column: nodes_to_exprs(&by_column, expr_arena),
reverse,
}
}
Expand All @@ -721,13 +724,11 @@ pub(crate) fn node_to_lp(
apply,
} => {
let i = node_to_lp(input, expr_arena, lp_arena);
let a = aggs.iter().map(|x| node_to_exp(*x, expr_arena)).collect();
let keys = Arc::new(keys.iter().map(|x| node_to_exp(*x, expr_arena)).collect());

LogicalPlan::Aggregate {
input: Box::new(i),
keys,
aggs: a,
keys: Arc::new(nodes_to_exprs(&keys, expr_arena)),
aggs: nodes_to_exprs(&aggs, expr_arena),
schema,
apply,
}
Expand All @@ -745,22 +746,13 @@ pub(crate) fn node_to_lp(
let i_l = node_to_lp(input_left, expr_arena, lp_arena);
let i_r = node_to_lp(input_right, expr_arena, lp_arena);

let l_on = left_on
.into_iter()
.map(|n| node_to_exp(n, expr_arena))
.collect();
let r_on = right_on
.into_iter()
.map(|n| node_to_exp(n, expr_arena))
.collect();

LogicalPlan::Join {
input_left: Box::new(i_l),
input_right: Box::new(i_r),
schema,
how,
left_on: l_on,
right_on: r_on,
left_on: nodes_to_exprs(&left_on, expr_arena),
right_on: nodes_to_exprs(&right_on, expr_arena),
allow_par,
force_par,
}
Expand All @@ -771,11 +763,10 @@ pub(crate) fn node_to_lp(
schema,
} => {
let i = node_to_lp(input, expr_arena, lp_arena);
let e = exprs.iter().map(|x| node_to_exp(*x, expr_arena)).collect();

LogicalPlan::HStack {
input: Box::new(i),
exprs: e,
exprs: nodes_to_exprs(&exprs, expr_arena),
schema,
}
}
Expand Down
8 changes: 4 additions & 4 deletions polars/polars-lazy/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ pub enum LogicalPlan {
/// Sort the table
Sort {
input: Box<LogicalPlan>,
by_column: String,
by_column: Vec<Expr>,
reverse: bool,
},
/// An explode operation
Expand Down Expand Up @@ -370,7 +370,7 @@ impl fmt::Debug for LogicalPlan {
}
Sort {
input, by_column, ..
} => write!(f, "SORT {:?} BY COLUMN {}", input, by_column),
} => write!(f, "SORT {:?} BY {:?}", input, by_column),
Explode { input, columns, .. } => {
write!(f, "EXPLODE COLUMN(S) {:?} OF {:?}", columns, input)
}
Expand Down Expand Up @@ -526,7 +526,7 @@ impl LogicalPlan {
Sort {
input, by_column, ..
} => {
let current_node = format!("SORT by {} [{}]", by_column, id);
let current_node = format!("SORT BY {:?} [{}]", by_column, id);
self.write_dot(acc_str, prev_node, &current_node, id)?;
input.dot(acc_str, (branch, id + 1), &current_node)
}
Expand Down Expand Up @@ -1126,7 +1126,7 @@ impl LogicalPlanBuilder {
.into()
}

pub fn sort(self, by_column: String, reverse: bool) -> Self {
pub fn sort(self, by_column: Vec<Expr>, reverse: bool) -> Self {
LogicalPlan::Sort {
input: Box::new(self.0),
by_column,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,14 +354,19 @@ impl ProjectionPushDown {
reverse,
} => {
if !acc_projections.is_empty() {
// Make sure that the column used for the sort is projected
let node = expr_arena.add(AExpr::Column(Arc::new(by_column.clone())));
add_expr_to_accumulated(
node,
&mut acc_projections,
&mut projected_names,
expr_arena,
);
// Make sure that the column(s) used for the sort is projected
by_column.iter().for_each(|node| {
aexpr_to_root_nodes(*node, expr_arena)
.iter()
.for_each(|root| {
add_expr_to_accumulated(
*root,
&mut acc_projections,
&mut projected_names,
expr_arena,
);
})
});
}

self.pushdown_and_assign(
Expand Down
26 changes: 23 additions & 3 deletions polars/polars-lazy/src/physical_plan/executors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -410,14 +410,34 @@ impl Executor for ExplodeExec {

pub(crate) struct SortExec {
pub(crate) input: Box<dyn Executor>,
pub(crate) by_column: String,
pub(crate) by_column: Vec<Arc<dyn PhysicalExpr>>,
pub(crate) reverse: bool,
}

impl Executor for SortExec {
fn execute(&mut self, state: &ExecutionState) -> Result<DataFrame> {
let df = self.input.execute(state)?;
df.sort(self.by_column.as_str(), self.reverse)
let mut df = self.input.execute(state)?;

let by_columns = self
.by_column
.iter()
.map(|e| e.evaluate(&df, state))
.collect::<Result<Vec<_>>>()?;
let mut column_names = Vec::with_capacity(by_columns.len());
// replace the columns in the DataFrame with the expressions
// for col("foo") this is redundant
// for col("foo").reverse() this is not
for column in by_columns {
let name = column.name();
column_names.push(name.to_string());
// if error, expression create a new named column and we must add it to the DataFrame
// if ok, we have replaced the column with the expression eval
if let Err(_) = df.apply(name, |_| column.clone()) {
df.hstack(&[column])?;
}
}

df.sort(&column_names, self.reverse)
}
}

Expand Down
2 changes: 2 additions & 0 deletions polars/polars-lazy/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,8 @@ impl DefaultPlanner {
reverse,
} => {
let input = self.create_initial_physical_plan(input, lp_arena, expr_arena)?;
let by_column =
self.create_physical_expressions(by_column, Context::Default, expr_arena)?;
Ok(Box::new(SortExec {
input,
by_column,
Expand Down

0 comments on commit b6b6d90

Please sign in to comment.