Skip to content

Commit

Permalink
feat(rust,python): support horizontal concatenation of LazyFrames (#13139)
Browse files Browse the repository at this point in the history

Co-authored-by: Ritchie Vink <ritchie46@gmail.com>
  • Loading branch information
adamreeve and ritchie46 committed Dec 28, 2023
1 parent 9b1c550 commit f553c4c
Show file tree
Hide file tree
Showing 31 changed files with 631 additions and 53 deletions.
2 changes: 2 additions & 0 deletions crates/polars-lazy/Cargo.toml
Expand Up @@ -114,6 +114,7 @@ dynamic_group_by = ["polars-plan/dynamic_group_by", "polars-time", "temporal"]
ewma = ["polars-plan/ewma"]
dot_diagram = ["polars-plan/dot_diagram"]
diagonal_concat = []
horizontal_concat = ["polars-plan/horizontal_concat", "polars-core/horizontal_concat"]
unique_counts = ["polars-plan/unique_counts"]
log = ["polars-plan/log"]
list_eval = []
Expand Down Expand Up @@ -298,6 +299,7 @@ features = [
"cumulative_eval",
"list_eval",
"diagonal_concat",
"horizontal_concat",
"hist",
"replace",
]
Expand Down
43 changes: 43 additions & 0 deletions crates/polars-lazy/src/dsl/functions.rs
Expand Up @@ -169,6 +169,49 @@ pub fn concat_lf_diagonal<L: AsRef<[LazyFrame]>>(
concat(lfs_with_all_columns, args)
}

#[cfg(feature = "horizontal_concat")]
/// Concat [LazyFrame]s horizontally.
///
/// Builds a single lazy plan whose output schema is the merge of all input
/// schemas. Errors if no inputs are given or if the schemas cannot be merged.
pub fn concat_lf_horizontal<L: AsRef<[LazyFrame]>>(
    inputs: L,
    args: UnionArgs,
) -> PolarsResult<LazyFrame> {
    let lfs = inputs.as_ref();

    // At least one input is required; its optimization flags seed the result.
    let mut opt_state = match lfs.first() {
        Some(lf) => lf.opt_state,
        None => {
            return Err(
                polars_err!(NoData: "Require at least one LazyFrame for horizontal concatenation"),
            )
        },
    };

    // Enable file caching on the result if any input has it enabled.
    for lf in lfs.iter().skip(1) {
        opt_state.file_caching |= lf.opt_state.file_caching;
    }

    // Collect each input's resolved schema and take ownership of its plan.
    let mut plans = Vec::with_capacity(lfs.len());
    let mut schemas = Vec::with_capacity(lfs.len());
    for lf in lfs {
        let mut lf = lf.clone();
        schemas.push(lf.schema()?);
        plans.push(std::mem::take(&mut lf.logical_plan));
    }

    let combined_schema = merge_schemas(&schemas)?;

    let mut out = LazyFrame::from(LogicalPlan::HConcat {
        inputs: plans,
        schema: Arc::new(combined_schema),
        options: HConcatOptions {
            parallel: args.parallel,
        },
    });
    out.opt_state = opt_state;

    Ok(out)
}

#[derive(Clone, Copy)]
pub struct UnionArgs {
pub parallel: bool,
Expand Down
63 changes: 63 additions & 0 deletions crates/polars-lazy/src/physical_plan/executors/hconcat.rs
@@ -0,0 +1,63 @@
use polars_core::functions::concat_df_horizontal;

use super::*;

/// Physical executor that horizontally concatenates the outputs of its inputs.
pub(crate) struct HConcatExec {
    // Child executors whose resulting DataFrames are concatenated column-wise.
    pub(crate) inputs: Vec<Box<dyn Executor>>,
    // Execution options (e.g. whether to run the inputs in parallel).
    pub(crate) options: HConcatOptions,
}

impl Executor for HConcatExec {
    /// Runs every input plan and horizontally concatenates the resulting
    /// DataFrames into a single output via `concat_df_horizontal`.
    fn execute(&mut self, state: &mut ExecutionState) -> PolarsResult<DataFrame> {
        #[cfg(debug_assertions)]
        {
            if state.verbose() {
                println!("run HConcatExec")
            }
        }
        // Take ownership of the child executors; they are consumed below.
        let mut inputs = std::mem::take(&mut self.inputs);

        let dfs = if !self.options.parallel {
            if state.verbose() {
                println!("HCONCAT: `parallel=false` hconcat is run sequentially")
            }
            // Sequential path: execute each child in order, each with its own
            // split state and a distinct branch index.
            let mut dfs = Vec::with_capacity(inputs.len());
            for (idx, mut input) in inputs.into_iter().enumerate() {
                let mut state = state.split();
                state.branch_idx += idx;

                let df = input.execute(&mut state)?;

                dfs.push(df);
            }
            dfs
        } else {
            if state.verbose() {
                println!("HCONCAT: hconcat is run in parallel")
            }
            // We don't use par_iter directly because the LP may also start threads for every LP (for instance scan_csv)
            // this might then lead to a rayon SO. So we take a multitude of the threads to keep work stealing
            // within bounds
            let out = POOL.install(|| {
                inputs
                    .chunks_mut(POOL.current_num_threads() * 3)
                    .map(|chunk| {
                        chunk
                            .into_par_iter()
                            .enumerate()
                            .map(|(idx, input)| {
                                // NOTE(review): `idx` restarts at 0 for every chunk,
                                // so `branch_idx` values can repeat across chunks —
                                // confirm this is acceptable for branch-keyed state.
                                let mut input = std::mem::take(input);
                                let mut state = state.split();
                                state.branch_idx += idx;
                                input.execute(&mut state)
                            })
                            .collect::<PolarsResult<Vec<_>>>()
                    })
                    .collect::<PolarsResult<Vec<_>>>()
            });
            // Flatten the per-chunk result vectors back into one ordered Vec;
            // input order is preserved, which fixes the output column order.
            out?.into_iter().flatten().collect()
        };

        // Column-wise concatenation of all child results.
        concat_df_horizontal(&dfs)
    }
}
4 changes: 4 additions & 0 deletions crates/polars-lazy/src/physical_plan/executors/mod.rs
Expand Up @@ -6,6 +6,8 @@ mod group_by;
mod group_by_dynamic;
mod group_by_partitioned;
pub(super) mod group_by_rolling;
#[cfg(feature = "horizontal_concat")]
mod hconcat;
mod join;
mod projection;
mod projection_utils;
Expand Down Expand Up @@ -37,6 +39,8 @@ pub(super) use self::group_by_dynamic::*;
pub(super) use self::group_by_partitioned::*;
#[cfg(feature = "dynamic_group_by")]
pub(super) use self::group_by_rolling::GroupByRollingExec;
#[cfg(feature = "horizontal_concat")]
pub(super) use self::hconcat::*;
pub(super) use self::join::*;
pub(super) use self::projection::*;
#[cfg(feature = "python")]
Expand Down
4 changes: 1 addition & 3 deletions crates/polars-lazy/src/physical_plan/file_cache.rs
Expand Up @@ -50,10 +50,8 @@ impl FileCache {
where
F: FnMut() -> PolarsResult<DataFrame>,
{
debug_assert_ne!(total_read_count, 0);
if total_read_count == 1 {
if total_read_count == 0 {
eprintln!("we have hit an unexpected branch, please open an issue")
}
reader()
} else {
// should exist
Expand Down
10 changes: 10 additions & 0 deletions crates/polars-lazy/src/physical_plan/planner/lp.rs
Expand Up @@ -172,6 +172,16 @@ pub fn create_physical_plan(
.collect::<PolarsResult<Vec<_>>>()?;
Ok(Box::new(executors::UnionExec { inputs, options }))
},
#[cfg(feature = "horizontal_concat")]
HConcat {
inputs, options, ..
} => {
let inputs = inputs
.into_iter()
.map(|node| create_physical_plan(node, lp_arena, expr_arena))
.collect::<PolarsResult<Vec<_>>>()?;
Ok(Box::new(executors::HConcatExec { inputs, options }))
},
Slice { input, offset, len } => {
let input = create_physical_plan(input, lp_arena, expr_arena)?;
Ok(Box::new(executors::SliceExec { input, offset, len }))
Expand Down

0 comments on commit f553c4c

Please sign in to comment.