Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Use UnionArgs for DSL side #16017

Merged
merged 1 commit into from
May 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 4 additions & 39 deletions crates/polars-lazy/src/dsl/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,14 @@
//!
use polars_core::prelude::*;
pub use polars_plan::dsl::functions::*;
use polars_plan::prelude::UnionArgs;
use rayon::prelude::*;

use crate::prelude::*;

pub(crate) fn concat_impl<L: AsRef<[LazyFrame]>>(
inputs: L,
rechunk: bool,
parallel: bool,
from_partitioned_ds: bool,
convert_supertypes: bool,
args: UnionArgs,
) -> PolarsResult<LazyFrame> {
let mut inputs = inputs.as_ref().to_vec();

Expand All @@ -24,12 +22,6 @@ pub(crate) fn concat_impl<L: AsRef<[LazyFrame]>>(
);

let mut opt_state = lf.opt_state;
let options = UnionOptions {
parallel,
from_partitioned_ds,
rechunk,
..Default::default()
};

let mut lps = Vec::with_capacity(inputs.len());
lps.push(lf.logical_plan);
Expand All @@ -41,11 +33,7 @@ pub(crate) fn concat_impl<L: AsRef<[LazyFrame]>>(
lps.push(lp)
}

let lp = DslPlan::Union {
inputs: lps,
options,
convert_supertypes,
};
let lp = DslPlan::Union { inputs: lps, args };
let mut lf = LazyFrame::from(lp);
lf.opt_state = opt_state;
Ok(lf)
Expand Down Expand Up @@ -152,32 +140,9 @@ pub fn concat_lf_horizontal<L: AsRef<[LazyFrame]>>(
Ok(lf)
}

#[derive(Clone, Copy)]
pub struct UnionArgs {
pub parallel: bool,
pub rechunk: bool,
pub to_supertypes: bool,
}

impl Default for UnionArgs {
fn default() -> Self {
Self {
parallel: true,
rechunk: true,
to_supertypes: false,
}
}
}

/// Concat multiple [`LazyFrame`]s vertically.
pub fn concat<L: AsRef<[LazyFrame]>>(inputs: L, args: UnionArgs) -> PolarsResult<LazyFrame> {
concat_impl(
inputs,
args.rechunk,
args.parallel,
false,
args.to_supertypes,
)
concat_impl(inputs, args)
}

/// Collect all [`LazyFrame`] computations.
Expand Down
1 change: 1 addition & 0 deletions crates/polars-lazy/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ pub use polars_plan::logical_plan::{
AnonymousScan, AnonymousScanArgs, AnonymousScanOptions, DslPlan, Literal, LiteralValue, Null,
NULL,
};
pub use polars_plan::prelude::UnionArgs;
pub(crate) use polars_plan::prelude::*;
#[cfg(feature = "rolling_window")]
pub use polars_time::{prelude::RollingOptions, Duration};
Expand Down
9 changes: 8 additions & 1 deletion crates/polars-lazy/src/scan/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,13 @@ impl LazyFileListReader for LazyCsvReader {

fn concat_impl(&self, lfs: Vec<LazyFrame>) -> PolarsResult<LazyFrame> {
// set to false, as the csv parser has full thread utilization
concat_impl(&lfs, self.rechunk(), false, true, false)
let args = UnionArgs {
rechunk: self.rechunk(),
parallel: false,
to_supertypes: false,
from_partitioned_ds: true,
..Default::default()
};
concat_impl(&lfs, args)
}
}
10 changes: 9 additions & 1 deletion crates/polars-lazy/src/scan/file_list_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use polars_core::prelude::*;
use polars_io::cloud::CloudOptions;
use polars_io::utils::is_cloud_url;
use polars_io::RowIndex;
use polars_plan::prelude::UnionArgs;

use crate::prelude::*;

Expand Down Expand Up @@ -83,7 +84,14 @@ pub trait LazyFileListReader: Clone {
/// This method should not take into consideration [LazyFileListReader::n_rows]
/// nor [LazyFileListReader::row_index].
fn concat_impl(&self, lfs: Vec<LazyFrame>) -> PolarsResult<LazyFrame> {
concat_impl(&lfs, self.rechunk(), true, true, false)
let args = UnionArgs {
rechunk: self.rechunk(),
parallel: true,
to_supertypes: false,
from_partitioned_ds: true,
..Default::default()
};
concat_impl(&lfs, args)
}

/// Get the final [LazyFrame].
Expand Down
9 changes: 3 additions & 6 deletions crates/polars-plan/src/logical_plan/conversion/dsl_to_ir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,21 +146,18 @@ pub fn to_alp_impl(
options,
predicate: None,
},
DslPlan::Union {
inputs,
options,
convert_supertypes,
} => {
DslPlan::Union { inputs, args } => {
let mut inputs = inputs
.into_iter()
.map(|lp| to_alp_impl(lp, expr_arena, lp_arena, convert))
.collect::<PolarsResult<Vec<_>>>()
.map_err(|e| e.context(failed_input!(vertical concat)))?;

if convert_supertypes {
if args.to_supertypes {
convert_utils::convert_st_union(&mut inputs, lp_arena, expr_arena)
.map_err(|e| e.context(failed_input!(vertical concat)))?;
}
let options = args.into();
IR::Union { inputs, options }
},
DslPlan::HConcat {
Expand Down
5 changes: 2 additions & 3 deletions crates/polars-plan/src/logical_plan/conversion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,14 @@ impl IR {
},
#[cfg(feature = "python")]
IR::PythonScan { options, .. } => DslPlan::PythonScan { options },
IR::Union { inputs, options } => {
IR::Union { inputs, .. } => {
let inputs = inputs
.into_iter()
.map(|node| convert_to_lp(node, lp_arena))
.collect();
DslPlan::Union {
inputs,
options,
convert_supertypes: false,
args: Default::default(),
}
},
IR::HConcat {
Expand Down
22 changes: 11 additions & 11 deletions crates/polars-plan/src/logical_plan/format.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::borrow::Cow;
use std::fmt;
use std::fmt::{Debug, Display, Formatter, Write};
use std::fmt::{Debug, Display, Formatter};
use std::path::PathBuf;

use polars_core::prelude::AnyValue;
Expand Down Expand Up @@ -81,16 +81,16 @@ impl DslPlan {
options.n_rows,
)
},
Union {
inputs, options, ..
} => {
let mut name = String::new();
let name = if let Some(slice) = options.slice {
write!(name, "SLICED UNION: {slice:?}")?;
name.as_str()
} else {
"UNION"
};
Union { inputs, .. } => {
// let mut name = String::new();
// THIS is commented out, but must be restored once we format IR's
// let name = if let Some(slice) = options.slice {
// write!(name, "SLICED UNION: {slice:?}")?;
// name.as_str()
// } else {
// "UNION"
// };
let name = "UNION";
// 3 levels of indentation
// - 0 => UNION ... END UNION
// - 1 => PLAN 0, PLAN 1, ... PLAN N
Expand Down
6 changes: 3 additions & 3 deletions crates/polars-plan/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,10 +153,10 @@ pub enum DslPlan {
input: Arc<DslPlan>,
function: DslFunction,
},
/// Vertical concatenation
Union {
inputs: Vec<DslPlan>,
options: UnionOptions,
convert_supertypes: bool,
args: UnionArgs,
},
/// Horizontal concatenation of multiple plans
HConcat {
Expand Down Expand Up @@ -197,7 +197,7 @@ impl Clone for DslPlan {
Self::Sort {input,by_column, slice, sort_options } => Self::Sort { input: input.clone(), by_column: by_column.clone(), slice: slice.clone(), sort_options: sort_options.clone() },
Self::Slice { input, offset, len } => Self::Slice { input: input.clone(), offset: offset.clone(), len: len.clone() },
Self::MapFunction { input, function } => Self::MapFunction { input: input.clone(), function: function.clone() },
Self::Union { inputs, options, convert_supertypes } => Self::Union { inputs: inputs.clone(), options: options.clone(), convert_supertypes: *convert_supertypes },
Self::Union { inputs, args} => Self::Union { inputs: inputs.clone(), args: args.clone() },
Self::HConcat { inputs, schema, options } => Self::HConcat { inputs: inputs.clone(), schema: schema.clone(), options: options.clone() },
Self::ExtContext { input, contexts, } => Self::ExtContext { input: input.clone(), contexts: contexts.clone() },
Self::Sink { input, payload } => Self::Sink { input: input.clone(), payload: payload.clone() },
Expand Down
37 changes: 37 additions & 0 deletions crates/polars-plan/src/logical_plan/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,3 +274,40 @@ impl Default for ProjectionOptions {
}
}
}

// Arguments given to `concat`. Differs from `UnionOptions` as the latter is IR state.
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct UnionArgs {
pub parallel: bool,
pub rechunk: bool,
pub to_supertypes: bool,
pub diagonal: bool,
// If it is a union from a scan over multiple files.
pub from_partitioned_ds: bool,
}

impl Default for UnionArgs {
fn default() -> Self {
Self {
parallel: true,
rechunk: true,
to_supertypes: false,
diagonal: false,
from_partitioned_ds: false,
}
}
}

impl From<UnionArgs> for UnionOptions {
fn from(args: UnionArgs) -> Self {
UnionOptions {
slice: None,
parallel: args.parallel,
rows: (None, 0),
from_partitioned_ds: args.from_partitioned_ds,
flattened_by_opt: false,
rechunk: args.rechunk,
}
}
}
19 changes: 8 additions & 11 deletions crates/polars-plan/src/logical_plan/tree_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,19 +163,16 @@ impl<'a> TreeFmtNode<'a> {
vec![]
},
),
NL(
h,
Union {
inputs, options, ..
},
) => ND(
NL(h, Union { inputs, .. }) => ND(
wh(
h,
&(if let Some(slice) = options.slice {
format!("SLICED UNION: {slice:?}")
} else {
"UNION".to_string()
}),
// THis is commented out, but must be restored when we convert to IR's.
// &(if let Some(slice) = options.slice {
// format!("SLICED UNION: {slice:?}")
// } else {
// "UNION".to_string()
// }),
"UNION",
),
inputs
.iter()
Expand Down
4 changes: 4 additions & 0 deletions py-polars/src/functions/lazy.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use polars::lazy::dsl;
use polars::prelude::*;
use polars_plan::prelude::UnionArgs;
use pyo3::exceptions::PyTypeError;
use pyo3::prelude::*;
use pyo3::types::{PyBool, PyBytes, PyFloat, PyInt, PyString};
Expand Down Expand Up @@ -172,6 +173,7 @@ pub fn concat_lf(
rechunk,
parallel,
to_supertypes,
..Default::default()
},
)
.map_err(PyPolarsErr::from)?;
Expand Down Expand Up @@ -288,6 +290,7 @@ pub fn concat_lf_diagonal(
rechunk,
parallel,
to_supertypes,
..Default::default()
},
)
.map_err(PyPolarsErr::from)?;
Expand All @@ -309,6 +312,7 @@ pub fn concat_lf_horizontal(lfs: &PyAny, parallel: bool) -> PyResult<PyLazyFrame
rechunk: false, // No need to rechunk with horizontal concatenation
parallel,
to_supertypes: false,
..Default::default()
};
let lf = dsl::functions::concat_lf_horizontal(lfs, args).map_err(PyPolarsErr::from)?;
Ok(lf.into())
Expand Down
Loading