Skip to content

Commit

Permalink
feat(rust, python): lazy diagonal concat. (#5647)
Browse files Browse the repository at this point in the history
Co-authored-by: Ritchie Vink <ritchie46@gmail.com>
  • Loading branch information
AnatolyBuga and ritchie46 committed Dec 5, 2022
1 parent 776945e commit beb3725
Show file tree
Hide file tree
Showing 8 changed files with 140 additions and 15 deletions.
2 changes: 1 addition & 1 deletion polars/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ pct_change = ["polars-core/pct_change", "polars-lazy/pct_change"]
moment = ["polars-core/moment", "polars-lazy/moment", "polars-ops/moment"]
arange = ["polars-lazy/arange"]
true_div = ["polars-lazy/true_div"]
diagonal_concat = ["polars-core/diagonal_concat"]
diagonal_concat = ["polars-core/diagonal_concat", "polars-lazy/diagonal_concat"]
horizontal_concat = ["polars-core/horizontal_concat"]
abs = ["polars-core/abs", "polars-lazy/abs"]
dynamic_groupby = ["polars-core/dynamic_groupby", "polars-lazy/dynamic_groupby"]
Expand Down
1 change: 1 addition & 0 deletions polars/polars-core/src/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ pub fn hor_concat_df(dfs: &[DataFrame]) -> PolarsResult<DataFrame> {
#[cfg_attr(docsrs, doc(cfg(feature = "diagonal_concat")))]
/// Concat diagonally thereby combining different schemas.
pub fn diag_concat_df(dfs: &[DataFrame]) -> PolarsResult<DataFrame> {
// TODO! replace with lazy only?
let upper_bound_width = dfs.iter().map(|df| df.width()).sum();
let mut column_names = AHashSet::with_capacity(upper_bound_width);
let mut schema = Vec::with_capacity(upper_bound_width);
Expand Down
1 change: 1 addition & 0 deletions polars/polars-lazy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ random = ["polars-plan/random"]
dynamic_groupby = ["polars-plan/dynamic_groupby", "polars-time", "temporal"]
ewma = ["polars-plan/ewma"]
dot_diagram = ["polars-plan/dot_diagram"]
diagonal_concat = []
unique_counts = ["polars-plan/unique_counts"]
log = ["polars-plan/log"]
list_eval = []
Expand Down
95 changes: 95 additions & 0 deletions polars/polars-lazy/src/dsl/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,63 @@ pub(crate) fn concat_impl<L: AsRef<[LazyFrame]>>(
}
}

#[cfg(feature = "diagonal_concat")]
#[cfg_attr(docsrs, doc(cfg(feature = "diagonal_concat")))]
/// Concat [LazyFrame]s diagonally.
/// Calls [concat] internally.
pub fn diag_concat_lf<L: AsRef<[LazyFrame]>>(
lfs: L,
rechunk: bool,
parallel: bool,
) -> PolarsResult<LazyFrame> {
let lfs = lfs.as_ref().to_vec();
let schemas = lfs
.iter()
.map(|lf| lf.schema())
.collect::<PolarsResult<Vec<_>>>()?;

let upper_bound_width = schemas.iter().map(|sch| sch.len()).sum();

// Use Vec to preserve order
let mut column_names = Vec::with_capacity(upper_bound_width);
let mut total_schema = Vec::with_capacity(upper_bound_width);

for sch in schemas.iter() {
sch.iter().for_each(|(name, dtype)| {
if !column_names.contains(name) {
column_names.push(name.clone());
total_schema.push((name.clone(), dtype.clone()));
}
});
}

let lfs_with_all_columns = lfs
.into_iter()
// Zip Frames with their Schemas
.zip(schemas.into_iter())
.map(|(mut lf, lf_schema)| {
for (name, dtype) in total_schema.iter() {
// If a name from Total Schema is not present - append
if lf_schema.get_field(name).is_none() {
lf = lf.with_column(NULL.lit().cast(dtype.clone()).alias(name))
}
}

// Now, reorder to match schema
let reordered_lf = lf.select(
column_names
.iter()
.map(|col_name| col(col_name))
.collect::<Vec<Expr>>(),
);

Ok(reordered_lf)
})
.collect::<PolarsResult<Vec<_>>>()?;

concat(lfs_with_all_columns, rechunk, parallel)
}

/// Concat multiple
pub fn concat<L: AsRef<[LazyFrame]>>(
inputs: L,
Expand All @@ -68,3 +125,41 @@ where

polars_core::POOL.install(|| iter.map(|lf| lf.collect()).collect())
}

#[cfg(test)]
mod test {
use super::*;

#[test]
#[cfg(feature = "diagonal_concat")]
fn test_diag_concat_lf() -> PolarsResult<()> {
let a = df![
"a" => [1, 2],
"b" => ["a", "b"]
]?;

let b = df![
"b" => ["a", "b"],
"c" => [1, 2]
]?;

let c = df![
"a" => [5, 7],
"c" => [1, 2],
"d" => [1, 2]
]?;

let out = diag_concat_lf(&[a.lazy(), b.lazy(), c.lazy()], false, false)?.collect()?;

let expected = df![
"a" => [Some(1), Some(2), None, None, Some(5), Some(7)],
"b" => [Some("a"), Some("b"), Some("a"), Some("b"), None, None],
"c" => [None, None, Some(1), Some(2), Some(1), Some(2)],
"d" => [None, None, None, None, Some(1), Some(2)]
]?;

assert!(out.frame_equal_missing(&expected));

Ok(())
}
}
10 changes: 8 additions & 2 deletions py-polars/polars/internals/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from polars.polars import py_date_range as _py_date_range
from polars.polars import py_date_range_lazy as _py_date_range_lazy
from polars.polars import py_diag_concat_df as _diag_concat_df
from polars.polars import py_diag_concat_lf as _diag_concat_lf
from polars.polars import py_hor_concat_df as _hor_concat_df

_DOCUMENTING = False
Expand Down Expand Up @@ -235,13 +236,18 @@ def concat(
out = pli.wrap_df(_hor_concat_df(items))
else:
raise ValueError(
f"how must be one of {{'vertical', 'diagonal'}}, got {how}"
f"how must be one of {{'vertical', 'diagonal', 'horizontal'}}, "
f"got {how}"
)
elif isinstance(first, pli.LazyFrame):
if how == "vertical":
return pli.wrap_ldf(_concat_lf(items, rechunk, parallel))
if how == "diagonal":
return pli.wrap_ldf(_diag_concat_lf(items, rechunk, parallel))
else:
raise ValueError("Lazy only allows 'vertical' concat strategy.")
raise ValueError(
"Lazy only allows {{'vertical', 'diagonal'}} concat strategy."
)
elif isinstance(first, pli.Series):
out = pli.wrap_s(_concat_series(items))
elif isinstance(first, pli.Expr):
Expand Down
18 changes: 18 additions & 0 deletions py-polars/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,23 @@ fn py_diag_concat_df(dfs: &PyAny) -> PyResult<PyDataFrame> {
Ok(df.into())
}

#[pyfunction]
fn py_diag_concat_lf(lfs: &PyAny, rechunk: bool, parallel: bool) -> PyResult<PyLazyFrame> {
let (seq, _len) = get_pyseq(lfs)?;
let iter = seq.iter()?;

let lfs = iter
.map(|item| {
let item = item?;
get_lf(item)
})
.collect::<PyResult<Vec<_>>>()?;

let lf = polars::lazy::dsl::functions::diag_concat_lf(lfs, rechunk, parallel)
.map_err(PyPolarsErr::from)?;
Ok(lf.into())
}

#[pyfunction]
fn py_hor_concat_df(dfs: &PyAny) -> PyResult<PyDataFrame> {
let (seq, _len) = get_pyseq(dfs)?;
Expand Down Expand Up @@ -629,6 +646,7 @@ fn polars(py: Python, m: &PyModule) -> PyResult<()> {
m.add_wrapped(wrap_pyfunction!(spearman_rank_corr)).unwrap();
m.add_wrapped(wrap_pyfunction!(map_mul)).unwrap();
m.add_wrapped(wrap_pyfunction!(py_diag_concat_df)).unwrap();
m.add_wrapped(wrap_pyfunction!(py_diag_concat_lf)).unwrap();
m.add_wrapped(wrap_pyfunction!(py_hor_concat_df)).unwrap();
m.add_wrapped(wrap_pyfunction!(py_datetime)).unwrap();
m.add_wrapped(wrap_pyfunction!(py_duration)).unwrap();
Expand Down
5 changes: 3 additions & 2 deletions py-polars/tests/unit/test_errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,9 +271,10 @@ def test_lazy_concat_err() -> None:
}
)

for how in ["horizontal", "diagonal"]:
for how in ["horizontal"]:
with pytest.raises(
ValueError, match="Lazy only allows 'vertical' concat strategy."
ValueError,
match="Lazy only allows {{'vertical', 'diagonal'}} concat strategy.",
):
pl.concat([df1.lazy(), df2.lazy()], how=how).collect()

Expand Down
23 changes: 13 additions & 10 deletions py-polars/tests/unit/test_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,20 @@ def test_diag_concat() -> None:
b = pl.DataFrame({"b": ["a", "b"], "c": [1, 2]})
c = pl.DataFrame({"a": [5, 7], "c": [1, 2], "d": [1, 2]})

out = pl.concat([a, b, c], how="diagonal")
expected = pl.DataFrame(
{
"a": [1, 2, None, None, 5, 7],
"b": [None, None, "a", "b", None, None],
"c": [None, None, 1, 2, 1, 2],
"d": [None, None, None, None, 1, 2],
}
)
for out in [
pl.concat([a, b, c], how="diagonal"),
pl.concat([a.lazy(), b.lazy(), c.lazy()], how="diagonal").collect(),
]:
expected = pl.DataFrame(
{
"a": [1, 2, None, None, 5, 7],
"b": [None, None, "a", "b", None, None],
"c": [None, None, 1, 2, 1, 2],
"d": [None, None, None, None, 1, 2],
}
)

assert out.frame_equal(expected, null_equal=True)
assert out.frame_equal(expected, null_equal=True)


def test_concat_horizontal() -> None:
Expand Down

0 comments on commit beb3725

Please sign in to comment.