Several projection improvements (#2795)
* cache schema in projection and use it

* don't allow std::collections::HashMap

* set schema in groupby

* use schema when known
ritchie46 committed Feb 28, 2022
1 parent 708a641 commit 7ddc947
Showing 22 changed files with 119 additions and 72 deletions.
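The thread running through these 22 files: executors now publish the schema of their input DataFrame into the shared ExecutionState, column expressions resolve columns by index through that cached schema instead of searching by name, and the std hash collections are banned (via clippy) in favor of polars' ahash-backed PlHashMap/PlHashSet.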
1 change: 1 addition & 0 deletions polars/clippy.toml
@@ -0,0 +1 @@
disallowed-types = ["std::collections::HashMap", "std::collections::HashSet"]
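This clippy configuration makes the disallowed_types lint flag every use of the std hash collections, steering the codebase toward polars' ahash-seeded aliases. Their definitions presumably look roughly like this (a sketch, not the committed source):

use ahash::RandomState;

// hashbrown keyed with ahash: faster hashing than std's SipHash default
pub type PlHashMap<K, V> = hashbrown::HashMap<K, V, RandomState>;
pub type PlHashSet<V> = hashbrown::HashSet<V, RandomState>;

Several hunks below are the mechanical swap from HashMap/HashSet to these aliases.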
4 changes: 2 additions & 2 deletions polars/polars-core/src/frame/hash_join/mod.rs
@@ -16,7 +16,6 @@ use ahash::RandomState;
use hashbrown::hash_map::{Entry, RawEntryMut};
use hashbrown::HashMap;
use rayon::prelude::*;
use std::collections::HashSet;
use std::fmt::Debug;
use std::hash::{BuildHasher, Hash, Hasher};

@@ -1017,7 +1016,7 @@ impl DataFrame {
mut df_right: DataFrame,
suffix: Option<String>,
) -> Result<DataFrame> {
let mut left_names = HashSet::with_capacity_and_hasher(df_left.width(), RandomState::new());
let mut left_names = PlHashSet::with_capacity(df_left.width());

df_left.columns.iter().for_each(|series| {
left_names.insert(series.name());
@@ -1036,6 +1035,7 @@
df_right.rename(&name, &format!("{}{}", name, suffix))?;
}

drop(left_names);
df_left.hstack_mut(&df_right.columns)?;
Ok(df_left)
}
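The explicit drop(left_names) is load-bearing: the set holds &str references into df_left's column names, and dropping it ends that borrow before hstack_mut takes df_left mutably. The same pattern shows up in the other hunks below that add a drop before the borrowed source is mutated or moved.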
23 changes: 9 additions & 14 deletions polars/polars-core/src/frame/mod.rs
@@ -1,6 +1,5 @@
//! DataFrame module.
use std::borrow::Cow;
use std::collections::HashSet;
use std::iter::{FromIterator, Iterator};
use std::mem;
use std::ops;
@@ -33,7 +32,6 @@ use crate::vector_hasher::boost_hash_combine;
#[cfg(feature = "row_hash")]
use crate::vector_hasher::df_rows_to_hashes_threaded;
use crate::POOL;
use hashbrown::HashMap;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
use std::hash::{BuildHasher, Hash, Hasher};
@@ -155,14 +153,6 @@ impl DataFrame {
}
}

fn hash_names(&self) -> HashSet<String, RandomState> {
let mut set = HashSet::with_capacity_and_hasher(self.columns.len(), RandomState::default());
for s in &self.columns {
set.insert(s.name().to_string());
}
set
}

/// Create a DataFrame from a Vector of Series.
///
/// # Example
@@ -673,7 +663,11 @@ impl DataFrame {
/// }
/// ```
pub fn hstack_mut(&mut self, columns: &[Series]) -> Result<&mut Self> {
let mut names = self.hash_names();
let mut names = PlHashSet::with_capacity(self.columns.len());
for s in &self.columns {
names.insert(s.name());
}

let height = self.height();
// first loop check validity. We don't do this in a single pass otherwise
// this DataFrame is already modified when an error occurs.
@@ -693,8 +687,9 @@
.into(),
));
}
names.insert(name.to_string());
names.insert(name);
}
drop(names);
Ok(self.hstack_mut_no_checks(columns))
}

@@ -1280,10 +1275,10 @@ impl DataFrame {

/// A non generic implementation to reduce compiler bloat.
fn select_series_impl(&self, cols: &[String]) -> Result<Vec<Series>> {
let selected = if cols.len() > 1 && self.columns.len() > 300 {
let selected = if cols.len() > 1 && self.columns.len() > 10 {
// we hash, because there are users that have millions of columns.
// # https://github.com/pola-rs/polars/issues/1023
let name_to_idx: HashMap<&str, usize> = self
let name_to_idx: PlHashMap<&str, usize> = self
.columns
.iter()
.enumerate()
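Lowering the hashing threshold from 300 columns to 10 follows the cost model: selecting m names from n columns by repeated linear scan is O(n·m), while building a name-to-index map once is O(n + m), and with ahash the map pays for itself even at modest widths. A self-contained sketch of the two strategies (function names are hypothetical; std's HashMap stands in for PlHashMap only to keep the example portable):

use std::collections::HashMap;

// O(cols.len() * names.len()): one linear scan per requested name.
fn select_linear(cols: &[&str], names: &[&str]) -> Vec<usize> {
    names
        .iter()
        .map(|n| cols.iter().position(|c| c == n).expect("column not found"))
        .collect()
}

// O(cols.len() + names.len()): build the index once, then do hash lookups.
fn select_hashed(cols: &[&str], names: &[&str]) -> Vec<usize> {
    let idx: HashMap<&str, usize> = cols.iter().copied().zip(0..).collect();
    names.iter().map(|n| idx[n]).collect()
}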
1 change: 1 addition & 0 deletions polars/polars-lazy/Cargo.toml
@@ -86,6 +86,7 @@ test = [
[dependencies]
ahash = "0.7"
glob = "0.3"
parking_lot = "0.12"
rayon = "1.5"
regex = { version = "1.5", optional = true }

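The new parking_lot dependency backs the schema cache on ExecutionState that the executors below use via set_schema, may_set_schema, get_schema and clear_schema_cache. A minimal sketch of how such a cache could be shaped (the field name, the Mutex choice, and the exact threshold in may_set_schema are assumptions, not the committed code):

use parking_lot::Mutex;
use polars_core::prelude::*;
use std::sync::Arc;

pub struct ExecutionState {
    // schema of the current executor's input, if known; shared across clones
    schema_cache: Arc<Mutex<Option<SchemaRef>>>,
    // ... remaining fields elided ...
}

impl ExecutionState {
    pub fn set_schema(&self, schema: SchemaRef) {
        *self.schema_cache.lock() = Some(schema);
    }

    // only worth caching when enough columns are touched for hashing to pay off
    pub fn may_set_schema(&self, df: &DataFrame, n_projections: usize) {
        if n_projections > 10 {
            self.set_schema(Arc::new(df.schema()));
        }
    }

    pub fn get_schema(&self) -> Option<SchemaRef> {
        self.schema_cache.lock().clone()
    }

    pub fn clear_schema_cache(&self) {
        *self.schema_cache.lock() = None;
    }
}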
11 changes: 4 additions & 7 deletions polars/polars-lazy/src/logical_plan/alp.rs
@@ -5,10 +5,8 @@ use crate::logical_plan::ParquetOptions;
use crate::logical_plan::{det_melt_schema, Context, CsvParserOptions};
use crate::prelude::*;
use crate::utils::{aexprs_to_schema, PushNode};
use ahash::RandomState;
use polars_core::prelude::*;
use polars_utils::arena::{Arena, Node};
use std::collections::HashSet;
#[cfg(any(feature = "csv-file", feature = "parquet"))]
use std::path::PathBuf;
use std::sync::Arc;
@@ -669,18 +667,16 @@ impl<'a> ALogicalPlanBuilder<'a> {
let schema_right = self.lp_arena.get(other).schema(self.lp_arena);

// column names of left table
let mut names: HashSet<&str, RandomState> = HashSet::with_capacity_and_hasher(
schema_left.len() + schema_right.len(),
Default::default(),
);
let mut names: PlHashSet<&str> =
PlHashSet::with_capacity(schema_left.len() + schema_right.len());
let mut new_schema = Schema::with_capacity(schema_left.len() + schema_right.len());

for (name, dtype) in schema_left.iter() {
names.insert(name.as_str());
new_schema.with_column(name.to_string(), dtype.clone())
}

let right_names: HashSet<_, RandomState> = right_on
let right_names: PlHashSet<_> = right_on
.iter()
.map(|e| match self.expr_arena.get(*e) {
AExpr::Alias(_, name) => name.clone(),
Expand Down Expand Up @@ -710,6 +706,7 @@ impl<'a> ALogicalPlanBuilder<'a> {
right_on,
options,
};
drop(names);
let root = self.lp_arena.add(lp);
Self::new(root, self.expr_arena, self.lp_arena)
}
9 changes: 4 additions & 5 deletions polars/polars-lazy/src/logical_plan/builder.rs
@@ -2,7 +2,6 @@ use crate::logical_plan::projection::rewrite_projections;
use crate::prelude::*;
use crate::utils;
use crate::utils::{combine_predicates_expr, has_expr};
use ahash::RandomState;
use polars_core::prelude::*;
use polars_io::csv::CsvEncoding;
#[cfg(feature = "csv-file")]
@@ -17,7 +16,6 @@ use polars_io::{
csv::NullValues,
csv_core::utils::{get_reader_bytes, is_compressed},
};
use std::collections::HashSet;
use std::io::{Read, Seek, SeekFrom};
use std::path::PathBuf;

@@ -378,22 +376,22 @@ impl LogicalPlanBuilder {
let schema_right = other.schema();

// column names of left table
let mut names: HashSet<&String, RandomState> = HashSet::default();
let mut names: PlHashSet<&str> = PlHashSet::default();
let mut new_schema = Schema::with_capacity(schema_left.len() + schema_right.len());

for (name, dtype) in schema_left.iter() {
names.insert(name);
new_schema.with_column(name.to_string(), dtype.clone())
}

let right_names: HashSet<_, RandomState> = right_on
let right_names: PlHashSet<_> = right_on
.iter()
.map(|e| utils::expr_output_name(e).expect("could not find name"))
.collect();

for (name, dtype) in schema_right.iter() {
if !right_names.iter().any(|s| s.as_ref() == name) {
if names.contains(name) {
if names.contains(&**name) {
let new_name = format!("{}{}", name, options.suffix.as_ref());
new_schema.with_column(new_name, dtype.clone())
} else {
Expand All @@ -404,6 +402,7 @@ impl LogicalPlanBuilder {

let schema = Arc::new(new_schema);

drop(names);
LogicalPlan::Join {
input_left: Box::new(self.0),
input_right: Box::new(other),
8 changes: 8 additions & 0 deletions polars/polars-lazy/src/physical_plan/executors/groupby.rs
@@ -13,6 +13,7 @@ pub struct GroupByExec {
aggs: Vec<Arc<dyn PhysicalExpr>>,
apply: Option<Arc<dyn DataFrameUdf>>,
maintain_order: bool,
input_schema: SchemaRef,
}

impl GroupByExec {
@@ -22,13 +23,15 @@ impl GroupByExec {
aggs: Vec<Arc<dyn PhysicalExpr>>,
apply: Option<Arc<dyn DataFrameUdf>>,
maintain_order: bool,
input_schema: SchemaRef,
) -> Self {
Self {
input,
keys,
aggs,
apply,
maintain_order,
input_schema,
}
}
}
@@ -44,6 +47,7 @@ fn groupby_helper(
let gb = df.groupby_with_series(keys, true, maintain_order)?;

if let Some(f) = apply {
state.clear_schema_cache();
return gb.apply(|df| f.call_udf(df));
}

@@ -74,6 +78,7 @@ fn groupby_helper(
let agg_columns = agg_columns?;

columns.extend(agg_columns.into_iter().flatten());
state.clear_schema_cache();
DataFrame::new(columns)
}

@@ -83,6 +88,7 @@ impl Executor for GroupByExec {
eprintln!("aggregates are not partitionable: running default HASH AGGREGATION")
}
let df = self.input.execute(state)?;
state.set_schema(self.input_schema.clone());
let keys = self
.keys
.iter()
@@ -289,6 +295,7 @@ impl Executor for PartitionGroupByExec {
// MERGE phase
// merge and hash aggregate again
let df = accumulate_dataframes_vertical(dfs)?;
// the partitioned groupby has added columns so we must update the schema.
let key = self.key.evaluate(&df, state)?;

// first get mutable access and optionally sort
@@ -321,6 +328,7 @@ impl Executor for PartitionGroupByExec {
POOL.install(|| rayon::join(get_columns, get_agg));

columns.extend(agg_columns);
state.clear_schema_cache();

let df = DataFrame::new_no_checks(columns);
Ok(df)
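Note the discipline around the cache in these executors: the schema is set only after the input has executed, so it describes the frame the key and aggregation expressions actually see, and it is cleared before a user-defined function runs or a result is returned, because a groupby reshapes the frame and a stale schema would mis-resolve columns downstream.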
3 changes: 3 additions & 0 deletions polars/polars-lazy/src/physical_plan/executors/groupby_dynamic.rs
@@ -10,13 +10,15 @@ pub(crate) struct GroupByDynamicExec {
pub(crate) keys: Vec<Arc<dyn PhysicalExpr>>,
pub(crate) aggs: Vec<Arc<dyn PhysicalExpr>>,
pub(crate) options: DynamicGroupOptions,
pub(crate) input_schema: SchemaRef,
}

impl Executor for GroupByDynamicExec {
fn execute(&mut self, state: &ExecutionState) -> Result<DataFrame> {
#[cfg(feature = "dynamic_groupby")]
{
let df = self.input.execute(state)?;
state.set_schema(self.input_schema.clone());
let keys = self
.keys
.iter()
@@ -44,6 +46,7 @@ impl Executor for GroupByDynamicExec {
.collect::<Result<Vec<_>>>()
})?;

state.clear_schema_cache();
let mut columns = Vec::with_capacity(agg_columns.len() + 1 + keys.len());
columns.extend(keys);
columns.push(time_key);
3 changes: 3 additions & 0 deletions polars/polars-lazy/src/physical_plan/executors/groupby_rolling.rs
@@ -7,13 +7,15 @@ pub(crate) struct GroupByRollingExec {
pub(crate) input: Box<dyn Executor>,
pub(crate) aggs: Vec<Arc<dyn PhysicalExpr>>,
pub(crate) options: RollingGroupOptions,
pub(crate) input_schema: SchemaRef,
}

impl Executor for GroupByRollingExec {
fn execute(&mut self, state: &ExecutionState) -> Result<DataFrame> {
#[cfg(feature = "dynamic_groupby")]
{
let df = self.input.execute(state)?;
state.set_schema(self.input_schema.clone());

let (time_key, groups) = df.groupby_rolling(&self.options)?;

@@ -36,6 +38,7 @@
.collect::<Result<Vec<_>>>()
})?;

state.clear_schema_cache();
let mut columns = Vec::with_capacity(agg_columns.len() + 1);
columns.push(time_key);
columns.extend(agg_columns.into_iter().flatten());
1 change: 1 addition & 0 deletions polars/polars-lazy/src/physical_plan/executors/mod.rs
@@ -143,6 +143,7 @@ pub(crate) fn evaluate_physical_expressions(
.collect::<Result<_>>()
})?
};
state.clear_schema_cache();

check_expand_literals(selected_columns, zero_length)
}
Expand Down
2 changes: 2 additions & 0 deletions polars/polars-lazy/src/physical_plan/executors/projection.rs
@@ -9,13 +9,15 @@ pub struct ProjectionExec {
pub(crate) input: Box<dyn Executor>,
pub(crate) expr: Vec<Arc<dyn PhysicalExpr>>,
pub(crate) has_windows: bool,
pub(crate) input_schema: SchemaRef,
#[cfg(test)]
pub(crate) schema: SchemaRef,
}

impl Executor for ProjectionExec {
fn execute(&mut self, state: &ExecutionState) -> Result<DataFrame> {
let df = self.input.execute(state)?;
state.set_schema(self.input_schema.clone());

let df = evaluate_physical_expressions(&df, &self.expr, state, self.has_windows);

2 changes: 2 additions & 0 deletions polars/polars-lazy/src/physical_plan/executors/scan.rs
@@ -264,6 +264,7 @@ impl Executor for DataFrameExec {
// projection should be before selection as those are free
// TODO: this is only the case if we don't create new columns
if let Some(projection) = &self.projection {
state.may_set_schema(&df, projection.len());
df = evaluate_physical_expressions(&df, projection, state, self.has_windows)?;
}

Expand All @@ -274,6 +275,7 @@ impl Executor for DataFrameExec {
})?;
df = df.filter(mask)?;
}
state.clear_schema_cache();

if let Some(limit) = set_n_rows(None) {
Ok(df.head(Some(limit)))
5 changes: 4 additions & 1 deletion polars/polars-lazy/src/physical_plan/executors/stack.rs
@@ -8,12 +8,14 @@ pub struct StackExec {
pub(crate) input: Box<dyn Executor>,
pub(crate) has_windows: bool,
pub(crate) expr: Vec<Arc<dyn PhysicalExpr>>,
pub(crate) input_schema: SchemaRef,
}

impl Executor for StackExec {
fn execute(&mut self, state: &ExecutionState) -> Result<DataFrame> {
let mut df = self.input.execute(state)?;

state.set_schema(self.input_schema.clone());
let res = if self.has_windows {
// we have a different run here
// to ensure the window functions run sequential and share caches
Expand All @@ -26,12 +28,13 @@ impl Executor for StackExec {
.collect::<Result<Vec<_>>>()
})?
};
state.clear_schema_cache();
state.clear_expr_cache();

for s in res {
df.with_column(s)?;
}

state.clear_expr_cache();
Ok(df)
}
}
18 changes: 10 additions & 8 deletions polars/polars-lazy/src/physical_plan/expressions/column.rs
@@ -17,14 +17,16 @@ impl PhysicalExpr for ColumnExpr {
fn as_expression(&self) -> &Expr {
&self.1
}
fn evaluate(&self, df: &DataFrame, _state: &ExecutionState) -> Result<Series> {
let column = match &*self.0 {
"" => df.select_at_idx(0).ok_or_else(|| {
PolarsError::NoData("could not select a column from an empty DataFrame".into())
})?,
_ => df.column(&self.0)?,
};
Ok(column.clone())
fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> Result<Series> {
match state.get_schema() {
None => df.column(&self.0).cloned(),
Some(schema) => {
let (idx, _, _) = schema
.get_full(&self.0)
.ok_or_else(|| PolarsError::NotFound(self.0.to_string()))?;
Ok(df.get_columns()[idx].clone())
}
}
}
#[allow(clippy::ptr_arg)]
fn evaluate_on_groups<'a>(
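With a schema cached, ColumnExpr::evaluate asks Schema::get_full for the column's position (the tuple is index, name, dtype) and indexes straight into the frame's columns instead of scanning by name on every evaluation. Two behavioral details fall out of the rewrite: an unknown name now surfaces as PolarsError::NotFound, and the old special case mapping an empty column name to the first column is gone. Correctness depends on the cached schema matching the frame exactly, which is why every executor above clears the cache before the frame can change shape.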
