Skip to content

Commit

Permalink
perf(rust, python): fix quadratic time complexity of groupby in strea…
Browse files Browse the repository at this point in the history
…m… (#5614)
  • Loading branch information
ritchie46 committed Nov 24, 2022
1 parent b7be15a commit 931ec8e
Show file tree
Hide file tree
Showing 15 changed files with 99 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ impl Operator for FilterOperator {
) -> PolarsResult<OperatorResult> {
let s = self
.predicate
.evaluate(chunk, context.execution_state.as_ref())?;
.evaluate(chunk, context.execution_state.as_any())?;
let mask = s.bool().map_err(|e| {
PolarsError::ComputeError(
format!("Filter predicate must be of type Boolean, got: {:?}", e).into(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ impl Operator for ProjectionOperator {
let projected = self
.exprs
.iter()
.map(|e| e.evaluate(chunk, context.execution_state.as_ref()))
.map(|e| e.evaluate(chunk, context.execution_state.as_any()))
.collect::<PolarsResult<Vec<_>>>()?;

let chunk = chunk.with_data(DataFrame::new_no_checks(projected));
Expand All @@ -66,7 +66,7 @@ impl Operator for HstackOperator {
let projected = self
.exprs
.iter()
.map(|e| e.evaluate(chunk, context.execution_state.as_ref()))
.map(|e| e.evaluate(chunk, context.execution_state.as_any()))
.collect::<PolarsResult<Vec<_>>>()?;

let mut df = chunk.data.clone();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ pub struct GenericGroupbySink {
// this vec will have two functions. We will use these functions
// to populate the buffer where the hashmap points to
agg_fns: Vec<AggregateFunction>,
input_schema: SchemaRef,
output_schema: SchemaRef,
// amortize allocations
aggregation_series: Vec<Series>,
Expand All @@ -83,6 +84,7 @@ impl GenericGroupbySink {
key_columns: Arc<Vec<Arc<dyn PhysicalPipedExpr>>>,
aggregation_columns: Arc<Vec<Arc<dyn PhysicalPipedExpr>>>,
agg_fns: Vec<AggregateFunction>,
input_schema: SchemaRef,
output_schema: SchemaRef,
slice: Option<(i64, usize)>,
) -> Self {
Expand All @@ -106,6 +108,7 @@ impl GenericGroupbySink {
aggregation_columns,
hb,
agg_fns,
input_schema,
output_schema,
aggregation_series: vec![],
keys_series: vec![],
Expand Down Expand Up @@ -199,16 +202,20 @@ impl GenericGroupbySink {

impl Sink for GenericGroupbySink {
fn sink(&mut self, context: &PExecutionContext, chunk: DataChunk) -> PolarsResult<SinkResult> {
let state = context.execution_state.as_ref();
if !state.input_schema_is_set() {
state.set_input_schema(self.input_schema.clone())
}
let num_aggs = self.number_of_aggs();

// todo! amortize allocation
for phys_e in self.aggregation_columns.iter() {
let s = phys_e.evaluate(&chunk, context.execution_state.as_ref())?;
let s = phys_e.evaluate(&chunk, context.execution_state.as_any())?;
let s = s.to_physical_repr();
self.aggregation_series.push(s.rechunk());
}
for phys_e in self.key_columns.iter() {
let s = phys_e.evaluate(&chunk, context.execution_state.as_ref())?;
let s = phys_e.evaluate(&chunk, context.execution_state.as_any())?;
let s = s.to_physical_repr();
self.keys_series.push(s.rechunk());
}
Expand Down Expand Up @@ -400,6 +407,7 @@ impl Sink for GenericGroupbySink {
self.key_columns.clone(),
self.aggregation_columns.clone(),
self.agg_fns.iter().map(|func| func.split2()).collect(),
self.input_schema.clone(),
self.output_schema.clone(),
self.slice,
);
Expand All @@ -408,7 +416,8 @@ impl Sink for GenericGroupbySink {
Box::new(new)
}

fn finalize(&mut self, _context: &PExecutionContext) -> PolarsResult<FinalizedSink> {
fn finalize(&mut self, context: &PExecutionContext) -> PolarsResult<FinalizedSink> {
context.execution_state.clear_input_schema();
let dfs = self.pre_finalize()?;
if dfs.is_empty() {
return Ok(FinalizedSink::Finished(DataFrame::from(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ pub struct PrimitiveGroupbySink<K: PolarsNumericType> {
// this vec will have two functions. We will use these functions
// to populate the buffer where the hashmap points to
agg_fns: Vec<AggregateFunction>,
input_schema: SchemaRef,
output_schema: SchemaRef,
// amortize allocations
aggregation_series: Vec<Series>,
Expand All @@ -76,6 +77,7 @@ where
key: Arc<dyn PhysicalPipedExpr>,
aggregation_columns: Arc<Vec<Arc<dyn PhysicalPipedExpr>>>,
agg_fns: Vec<AggregateFunction>,
input_schema: SchemaRef,
output_schema: SchemaRef,
slice: Option<(i64, usize)>,
) -> Self {
Expand All @@ -94,6 +96,7 @@ where
aggregation_columns,
hb,
agg_fns,
input_schema,
output_schema,
aggregation_series: vec![],
hashes: vec![],
Expand Down Expand Up @@ -179,11 +182,15 @@ where
ChunkedArray<K>: IntoSeries,
{
fn sink(&mut self, context: &PExecutionContext, chunk: DataChunk) -> PolarsResult<SinkResult> {
let state = context.execution_state.as_ref();
if !state.input_schema_is_set() {
state.set_input_schema(self.input_schema.clone())
}
let num_aggs = self.number_of_aggs();

let s = self
.key
.evaluate(&chunk, context.execution_state.as_ref())?;
.evaluate(&chunk, context.execution_state.as_any())?;
let s = s.to_physical_repr();
let s = s.rechunk();

Expand All @@ -202,7 +209,7 @@ where

// todo! amortize allocation
for phys_e in self.aggregation_columns.iter() {
let s = phys_e.evaluate(&chunk, context.execution_state.as_ref())?;
let s = phys_e.evaluate(&chunk, context.execution_state.as_any())?;
let s = s.to_physical_repr();
self.aggregation_series.push(s.rechunk());
}
Expand Down Expand Up @@ -300,7 +307,8 @@ where
});
}

fn finalize(&mut self, _context: &PExecutionContext) -> PolarsResult<FinalizedSink> {
fn finalize(&mut self, context: &PExecutionContext) -> PolarsResult<FinalizedSink> {
context.execution_state.clear_input_schema();
let dfs = self.pre_finalize()?;
if dfs.is_empty() {
return Ok(FinalizedSink::Finished(DataFrame::from(
Expand All @@ -316,6 +324,7 @@ where
self.key.clone(),
self.aggregation_columns.clone(),
self.agg_fns.iter().map(|func| func.split2()).collect(),
self.input_schema.clone(),
self.output_schema.clone(),
self.slice,
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ pub struct Utf8GroupbySink {
// this vec will have two functions. We will use these functions
// to populate the buffer where the hashmap points to
agg_fns: Vec<AggregateFunction>,
input_schema: SchemaRef,
output_schema: SchemaRef,
// amortize allocations
aggregation_series: Vec<Series>,
Expand All @@ -60,6 +61,7 @@ impl Utf8GroupbySink {
key_column: Arc<dyn PhysicalPipedExpr>,
aggregation_columns: Arc<Vec<Arc<dyn PhysicalPipedExpr>>>,
agg_fns: Vec<AggregateFunction>,
input_schema: SchemaRef,
output_schema: SchemaRef,
slice: Option<(i64, usize)>,
) -> Self {
Expand All @@ -80,6 +82,7 @@ impl Utf8GroupbySink {
aggregation_columns,
hb,
agg_fns,
input_schema,
output_schema,
aggregation_series: vec![],
hashes: vec![],
Expand Down Expand Up @@ -166,18 +169,22 @@ impl Utf8GroupbySink {

impl Sink for Utf8GroupbySink {
fn sink(&mut self, context: &PExecutionContext, chunk: DataChunk) -> PolarsResult<SinkResult> {
let state = context.execution_state.as_ref();
if !state.input_schema_is_set() {
state.set_input_schema(self.input_schema.clone())
}
let num_aggs = self.number_of_aggs();
self.hashes.reserve(chunk.data.height());

// todo! amortize allocation
for phys_e in self.aggregation_columns.iter() {
let s = phys_e.evaluate(&chunk, context.execution_state.as_ref())?;
let s = phys_e.evaluate(&chunk, context.execution_state.as_any())?;
let s = s.to_physical_repr();
self.aggregation_series.push(s.rechunk());
}
let s = self
.key_column
.evaluate(&chunk, context.execution_state.as_ref())?;
.evaluate(&chunk, context.execution_state.as_any())?;
let s = s.rechunk();
// write the hashes to self.hashes buffer
// s.vec_hash(self.hb.clone(), &mut self.hashes).unwrap();
Expand Down Expand Up @@ -334,6 +341,7 @@ impl Sink for Utf8GroupbySink {
self.key_column.clone(),
self.aggregation_columns.clone(),
self.agg_fns.iter().map(|func| func.split2()).collect(),
self.input_schema.clone(),
self.output_schema.clone(),
self.slice,
);
Expand All @@ -342,7 +350,8 @@ impl Sink for Utf8GroupbySink {
Box::new(new)
}

fn finalize(&mut self, _context: &PExecutionContext) -> PolarsResult<FinalizedSink> {
fn finalize(&mut self, context: &PExecutionContext) -> PolarsResult<FinalizedSink> {
context.execution_state.clear_input_schema();
let dfs = self.pre_finalize()?;
if dfs.is_empty() {
return Ok(FinalizedSink::Finished(DataFrame::from(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ impl GenericBuild {
) -> PolarsResult<&[Series]> {
self.join_series.clear();
for phys_e in self.join_columns_left.iter() {
let s = phys_e.evaluate(chunk, context.execution_state.as_ref())?;
let s = phys_e.evaluate(chunk, context.execution_state.as_any())?;
let s = s.to_physical_repr();
let s = s.rechunk();
self.materialized_join_cols.push(s.array_ref(0).clone());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ impl GenericJoinProbe {
.iter()
.flat_map(|phys_e| {
phys_e
.evaluate(&tmp, context.execution_state.as_ref())
.evaluate(&tmp, context.execution_state.as_any())
.ok()
.map(|s| s.name().to_string())
})
Expand Down Expand Up @@ -119,7 +119,7 @@ impl GenericJoinProbe {
) -> PolarsResult<&[Series]> {
self.join_series.clear();
for phys_e in self.join_columns_right.iter() {
let s = phys_e.evaluate(chunk, context.execution_state.as_ref())?;
let s = phys_e.evaluate(chunk, context.execution_state.as_any())?;
let s = s.to_physical_repr();
self.join_series.push(s.rechunk());
}
Expand Down
2 changes: 2 additions & 0 deletions polars/polars-lazy/polars-pipe/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,5 @@ pub mod pipeline;
// ideal chunk size we strive for
#[cfg(feature = "compile")]
pub(crate) const CHUNK_SIZE: usize = 50_000;
#[cfg(feature = "compile")]
pub use operators::SExecutionContext;
16 changes: 14 additions & 2 deletions polars/polars-lazy/polars-pipe/src/operators/context.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,24 @@
use std::any::Any;

use polars_core::schema::SchemaRef;

/// Execution state handed to the pipeline and stored, boxed, in `PExecutionContext`.
///
/// Implementors must be `Send + Sync` (supertrait bounds). Besides tracking an
/// input schema, the state can be exposed as `&dyn Any`, which is the form the
/// pipeline passes to the expressions' `evaluate` calls.
pub trait SExecutionContext: Send + Sync {
/// Reports whether an input schema is currently set on this state.
fn input_schema_is_set(&self) -> bool;

/// Stores `schema` as the input schema of this state.
///
/// NOTE(review): takes `&self`, so implementors presumably rely on interior
/// mutability — confirm against the concrete implementation.
fn set_input_schema(&self, schema: SchemaRef);

/// Removes the input schema from this state.
fn clear_input_schema(&self);

/// Returns this state as `&dyn Any`.
fn as_any(&self) -> &dyn Any;
}

pub struct PExecutionContext {
// injected upstream in polars-lazy
pub(crate) execution_state: Box<dyn Any + Send + Sync>,
pub(crate) execution_state: Box<dyn SExecutionContext>,
}

impl PExecutionContext {
pub(crate) fn new(state: Box<dyn Any + Send + Sync>) -> Self {
pub(crate) fn new(state: Box<dyn SExecutionContext>) -> Self {
PExecutionContext {
execution_state: state,
}
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-lazy/polars-pipe/src/operators/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ mod sink;
mod source;

pub(crate) use chunks::*;
pub(crate) use context::*;
pub use context::*;
pub(crate) use operator::*;
pub(crate) use polars_core::prelude::*;
pub use sink::*;
Expand Down
5 changes: 4 additions & 1 deletion polars/polars-lazy/polars-pipe/src/pipeline/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ where
let mut aggregation_columns = Vec::with_capacity(aggs.len());
let mut agg_fns = Vec::with_capacity(aggs.len());

let input_schema = lp_arena.get(*input).schema(lp_arena);
let input_schema = lp_arena.get(*input).schema(lp_arena).into_owned();

for node in aggs {
let (index, agg_fn) =
Expand All @@ -178,6 +178,7 @@ where
key_columns[0].clone(),
aggregation_columns,
agg_fns,
input_schema,
output_schema.clone(),
options.slice
)) as Box<dyn Sink>
Expand All @@ -187,13 +188,15 @@ where
key_columns[0].clone(),
aggregation_columns,
agg_fns,
input_schema,
output_schema.clone(),
options.slice,
)) as Box<dyn Sink>,
_ => Box::new(groupby::GenericGroupbySink::new(
key_columns,
aggregation_columns,
agg_fns,
input_schema,
output_schema.clone(),
options.slice,
)) as Box<dyn Sink>,
Expand Down
8 changes: 3 additions & 5 deletions polars/polars-lazy/polars-pipe/src/pipeline/dispatcher.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::any::Any;

use polars_core::error::PolarsResult;
use polars_core::frame::DataFrame;
use polars_core::utils::concat_df_unchecked;
Expand All @@ -9,8 +7,8 @@ use rayon::prelude::*;

use crate::executors::sources::DataFrameSource;
use crate::operators::{
DataChunk, FinalizedSink, Operator, OperatorResult, PExecutionContext, Sink, SinkResult,
Source, SourceResult,
DataChunk, FinalizedSink, Operator, OperatorResult, PExecutionContext, SExecutionContext, Sink,
SinkResult, Source, SourceResult,
};

pub struct PipeLine {
Expand Down Expand Up @@ -228,7 +226,7 @@ impl PipeLine {
Ok(out.unwrap())
}

pub fn execute(&mut self, state: Box<dyn Any + Send + Sync>) -> PolarsResult<DataFrame> {
pub fn execute(&mut self, state: Box<dyn SExecutionContext>) -> PolarsResult<DataFrame> {
let ec = PExecutionContext::new(state);
let mut sink_out = self.run_pipeline(&ec)?;
let mut pipelines = self.rh_sides.iter_mut();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,8 @@ impl PartitionGroupByExec {
// of groups.
let keys = self.keys(&original_df, state)?;

// set it here, because `self.input.execute` will clear the schema cache.
state.set_schema(self.input_schema.clone());
if !can_run_partitioned(&keys, &original_df, state, self.from_partitioned_ds)? {
return groupby_helper(
original_df,
Expand All @@ -249,8 +251,6 @@ impl PartitionGroupByExec {
// Run the partitioned aggregations
let n_threads = POOL.current_num_threads();

// set it here, because `self.input.execute` will clear the schema cache.
state.set_schema(self.input_schema.clone());
run_partitions(
&mut original_df,
self,
Expand Down

0 comments on commit 931ec8e

Please sign in to comment.