Skip to content

Commit

Permalink
feat[rust, python]: Add profiling to lazy execution (#4764)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Sep 7, 2022
1 parent da7d469 commit 7cf9960
Show file tree
Hide file tree
Showing 29 changed files with 731 additions and 279 deletions.
86 changes: 39 additions & 47 deletions polars/polars-lazy/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ use polars_core::datatypes::PlHashMap;
use polars_core::frame::explode::MeltArgs;
use polars_core::frame::hash_join::JoinType;
use polars_core::prelude::*;
#[cfg(feature = "dtype-categorical")]
use polars_core::toggle_string_cache;
use polars_io::RowCount;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -127,7 +125,6 @@ pub struct OptState {
pub simplify_expr: bool,
pub file_caching: bool,
pub aggregate_pushdown: bool,
pub global_string_cache: bool,
pub slice_pushdown: bool,
}

Expand All @@ -138,7 +135,6 @@ impl Default for OptState {
predicate_pushdown: true,
type_coercion: true,
simplify_expr: true,
global_string_cache: false,
slice_pushdown: true,
// will be toggled by a scan operation such as csv scan or parquet scan
file_caching: false,
Expand Down Expand Up @@ -185,7 +181,6 @@ impl LazyFrame {
predicate_pushdown: false,
type_coercion: true,
simplify_expr: false,
global_string_cache: false,
slice_pushdown: false,
// will be toggled by a scan operation such as csv scan or parquet scan
file_caching: false,
Expand Down Expand Up @@ -223,12 +218,6 @@ impl LazyFrame {
self
}

/// Toggle global string cache.
pub fn with_string_cache(mut self, toggle: bool) -> Self {
self.opt_state.global_string_cache = toggle;
self
}

/// Toggle slice pushdown optimization
pub fn with_slice_pushdown(mut self, toggle: bool) -> Self {
self.opt_state.slice_pushdown = toggle;
Expand Down Expand Up @@ -680,40 +669,12 @@ impl LazyFrame {
Ok(lp_top)
}

/// Execute all the lazy operations and collect them into a [DataFrame](polars_core::frame::DataFrame).
/// Before execution the query is being optimized.
///
/// # Example
///
/// ```rust
/// use polars_core::prelude::*;
/// use polars_lazy::prelude::*;
///
/// fn example(df: DataFrame) -> Result<DataFrame> {
/// df.lazy()
/// .groupby([col("foo")])
/// .agg([col("bar").sum(), col("ham").mean().alias("avg_ham")])
/// .collect()
/// }
/// ```
pub fn collect(self) -> Result<DataFrame> {
fn prepare_collect(self) -> Result<(ExecutionState, Box<dyn Executor>)> {
let file_caching = self.opt_state.file_caching;
#[cfg(feature = "dtype-categorical")]
let using_string_cache = self.opt_state.global_string_cache;
#[cfg(feature = "dtype-categorical")]
if using_string_cache {
eprint!("global string cache in combination with LazyFrames is deprecated; please set the global string cache globally.")
}
let mut expr_arena = Arena::with_capacity(256);
let mut lp_arena = Arena::with_capacity(128);
let lp_top = self.optimize(&mut lp_arena, &mut expr_arena)?;

// if string cache was already set, we skip this and global settings are respected
#[cfg(feature = "dtype-categorical")]
if using_string_cache {
toggle_string_cache(using_string_cache);
}

let finger_prints = if file_caching {
#[cfg(any(feature = "ipc", feature = "parquet", feature = "csv-file"))]
{
Expand All @@ -730,23 +691,54 @@ impl LazyFrame {
};

let planner = PhysicalPlanner::default();
let mut physical_plan =
planner.create_physical_plan(lp_top, &mut lp_arena, &mut expr_arena)?;
let physical_plan = planner.create_physical_plan(lp_top, &mut lp_arena, &mut expr_arena)?;

let mut state = ExecutionState::with_finger_prints(finger_prints);
let state = ExecutionState::with_finger_prints(finger_prints);
Ok((state, physical_plan))
}

/// Execute all the lazy operations and collect them into a [DataFrame](polars_core::frame::DataFrame).
/// Before execution the query is being optimized.
///
/// # Example
///
/// ```rust
/// use polars_core::prelude::*;
/// use polars_lazy::prelude::*;
///
/// fn example(df: DataFrame) -> Result<DataFrame> {
/// df.lazy()
/// .groupby([col("foo")])
/// .agg([col("bar").sum(), col("ham").mean().alias("avg_ham")])
/// .collect()
/// }
/// ```
pub fn collect(self) -> Result<DataFrame> {
let (mut state, mut physical_plan) = self.prepare_collect()?;
let out = physical_plan.execute(&mut state);
#[cfg(debug_assertions)]
{
#[cfg(any(feature = "ipc", feature = "parquet", feature = "csv-file"))]
state.file_cache.assert_empty();
}
#[cfg(feature = "dtype-categorical")]
if using_string_cache {
toggle_string_cache(!using_string_cache);
}
out
}

//// Profile a LazyFrame.
////
//// This will run the query and return a tuple
//// containing the materialized DataFrame and a DataFrame that contains profiling information
//// of each node that is executed.
////
//// The units of the timings are microseconds.
pub fn profile(self) -> Result<(DataFrame, DataFrame)> {
let (mut state, mut physical_plan) = self.prepare_collect()?;
state.time_nodes();
let out = physical_plan.execute(&mut state)?;
let timer_df = state.finish_timer()?;
Ok((out, timer_df))
}

/// Filter by some predicate expression.
///
/// # Example
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::borrow::Cow;

use polars_core::prelude::*;

use crate::physical_plan::state::ExecutionState;
Expand All @@ -20,9 +22,12 @@ impl Executor for DropDuplicatesExec {
let subset = self.options.subset.as_ref().map(|v| &***v);
let keep = self.options.keep_strategy;

match self.options.maintain_order {
true => df.unique_stable(subset, keep),
false => df.unique(subset, keep),
}
state.record(
|| match self.options.maintain_order {
true => df.unique_stable(subset, keep),
false => df.unique(subset, keep),
},
Cow::Borrowed("unique()"),
)
}
}
4 changes: 3 additions & 1 deletion polars/polars-lazy/src/physical_plan/executors/explode.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::borrow::Cow;

use polars_core::prelude::*;

use crate::physical_plan::state::ExecutionState;
Expand All @@ -17,6 +19,6 @@ impl Executor for ExplodeExec {
}
}
let df = self.input.execute(state)?;
df.explode(&self.columns)
state.record(|| df.explode(&self.columns), Cow::Borrowed("explode()"))
}
}
24 changes: 19 additions & 5 deletions polars/polars-lazy/src/physical_plan/executors/filter.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::borrow::Cow;

use polars_core::prelude::*;

use crate::physical_plan::state::ExecutionState;
Expand Down Expand Up @@ -25,10 +27,22 @@ impl Executor for FilterExec {
let df = self.input.execute(state)?;
let s = self.predicate.evaluate(&df, state)?;
let mask = s.bool().expect("filter predicate wasn't of type boolean");
let df = df.filter(mask)?;
if state.verbose() {
eprintln!("dataframe filtered");
}
Ok(df)

let profile_name = if state.has_node_timer() {
Cow::Owned(format!(".filter({})", &self.predicate.as_ref()))
} else {
Cow::Borrowed("")
};

state.record(
|| {
let df = df.filter(mask)?;
if state.verbose() {
eprintln!("dataframe filtered");
}
Ok(df)
},
profile_name,
)
}
}
48 changes: 36 additions & 12 deletions polars/polars-lazy/src/physical_plan/executors/groupby.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,18 +91,8 @@ pub(super) fn groupby_helper(
DataFrame::new(columns)
}

impl Executor for GroupByExec {
fn execute(&mut self, state: &mut ExecutionState) -> Result<DataFrame> {
#[cfg(debug_assertions)]
{
if state.verbose() {
println!("run GroupbyExec")
}
}
if state.verbose() {
eprintln!("keys/aggregates are not partitionable: running default HASH AGGREGATION")
}
let df = self.input.execute(state)?;
impl GroupByExec {
fn execute_impl(&mut self, state: &mut ExecutionState, df: DataFrame) -> Result<DataFrame> {
state.set_schema(self.input_schema.clone());
let keys = self
.keys
Expand All @@ -120,3 +110,37 @@ impl Executor for GroupByExec {
)
}
}

impl Executor for GroupByExec {
fn execute(&mut self, state: &mut ExecutionState) -> Result<DataFrame> {
#[cfg(debug_assertions)]
{
if state.verbose() {
println!("run GroupbyExec")
}
}
if state.verbose() {
eprintln!("keys/aggregates are not partitionable: running default HASH AGGREGATION")
}
let df = self.input.execute(state)?;

let profile_name = if state.has_node_timer() {
let by = self
.keys
.iter()
.map(|s| Ok(s.to_field(&self.input_schema)?.name))
.collect::<Result<Vec<_>>>()?;
let name = column_delimited("groupby".to_string(), &by);
Cow::Owned(name)
} else {
Cow::Borrowed("")
};

if state.has_node_timer() {
let new_state = state.clone();
new_state.record(|| self.execute_impl(state, df), profile_name)
} else {
self.execute_impl(state, df)
}
}
}
83 changes: 54 additions & 29 deletions polars/polars-lazy/src/physical_plan/executors/groupby_dynamic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,9 @@ pub(crate) struct GroupByDynamicExec {
pub(crate) slice: Option<(i64, usize)>,
}

impl Executor for GroupByDynamicExec {
#[cfg(not(feature = "dynamic_groupby"))]
fn execute(&mut self, _state: &mut ExecutionState) -> Result<DataFrame> {
panic!("activate feature dynamic_groupby")
}

impl GroupByDynamicExec {
#[cfg(feature = "dynamic_groupby")]
fn execute(&mut self, state: &mut ExecutionState) -> Result<DataFrame> {
#[cfg(debug_assertions)]
{
if state.verbose() {
println!("run GroupbyDynamicExec")
}
}
let mut df = self.input.execute(state)?;
fn execute_impl(&mut self, state: &mut ExecutionState, mut df: DataFrame) -> Result<DataFrame> {
df.as_single_chunk_par();
state.set_schema(self.input_schema.clone());

Expand Down Expand Up @@ -63,21 +51,21 @@ impl Executor for GroupByDynamicExec {
}

let agg_columns = POOL.install(|| {
self.aggs
.par_iter()
.map(|expr| {
let agg = expr.evaluate_on_groups(&df, groups, state)?.finalize();
if agg.len() != groups.len() {
return Err(PolarsError::ComputeError(
format!("returned aggregation is a different length: {} than the group lengths: {}",
agg.len(),
groups.len()).into()
))
}
Ok(agg)
})
.collect::<Result<Vec<_>>>()
})?;
self.aggs
.par_iter()
.map(|expr| {
let agg = expr.evaluate_on_groups(&df, groups, state)?.finalize();
if agg.len() != groups.len() {
return Err(PolarsError::ComputeError(
format!("returned aggregation is a different length: {} than the group lengths: {}",
agg.len(),
groups.len()).into()
))
}
Ok(agg)
})
.collect::<Result<Vec<_>>>()
})?;

state.clear_schema_cache();
let mut columns = Vec::with_capacity(agg_columns.len() + 1 + keys.len());
Expand All @@ -88,3 +76,40 @@ impl Executor for GroupByDynamicExec {
DataFrame::new(columns)
}
}

impl Executor for GroupByDynamicExec {
#[cfg(not(feature = "dynamic_groupby"))]
fn execute(&mut self, _state: &mut ExecutionState) -> Result<DataFrame> {
panic!("activate feature dynamic_groupby")
}

#[cfg(feature = "dynamic_groupby")]
fn execute(&mut self, state: &mut ExecutionState) -> Result<DataFrame> {
#[cfg(debug_assertions)]
{
if state.verbose() {
println!("run GroupbyDynamicExec")
}
}
let df = self.input.execute(state)?;

let profile_name = if state.has_node_timer() {
let by = self
.keys
.iter()
.map(|s| Ok(s.to_field(&self.input_schema)?.name))
.collect::<Result<Vec<_>>>()?;
let name = column_delimited("groupby_dynamic".to_string(), &by);
Cow::Owned(name)
} else {
Cow::Borrowed("")
};

if state.has_node_timer() {
let new_state = state.clone();
new_state.record(|| self.execute_impl(state, df), profile_name)
} else {
self.execute_impl(state, df)
}
}
}

0 comments on commit 7cf9960

Please sign in to comment.