Skip to content

Commit

Permalink
add diagonal concat utility
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Nov 5, 2021
1 parent ee26601 commit 1f85372
Show file tree
Hide file tree
Showing 11 changed files with 202 additions and 4 deletions.
1 change: 1 addition & 0 deletions polars/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ diff = ["polars-core/diff", "polars-lazy/diff"]
moment = ["polars-core/moment", "polars-lazy/moment"]
arange = ["polars-lazy/arange"]
true_div = ["polars-lazy/true_div"]
diagonal_concat = ["polars-core/diagonal_concat"]

# don't use this
private = ["polars-lazy/private"]
Expand Down
2 changes: 2 additions & 0 deletions polars/polars-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ list = []
rank = []
diff = []
moment = []
diagonal_concat = []


# opt-in datatypes for Series
Expand Down Expand Up @@ -123,6 +124,7 @@ docs-selection = [
"dtype-categorical",
"rank",
"list",
"diagonal_concat"
]

[dependencies]
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-core/src/chunked_array/builder/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ pub fn get_list_builder(
Box::new(builder)
}};
}
match_arrow_data_type_apply_macro!(
match_dtype_to_physical_apply_macro!(
physical_type,
get_primitive_builder,
get_utf8_builder,
Expand Down
74 changes: 74 additions & 0 deletions polars/polars-core/src/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@
#[cfg(feature = "sort_multiple")]
use crate::chunked_array::ops::sort::prepare_argsort;
use crate::prelude::*;
#[cfg(feature = "diagonal_concat")]
use crate::utils::concat_df;
#[cfg(feature = "diagonal_concat")]
use ahash::AHashSet;
use arrow::compute;
use arrow::types::simd::Simd;
use num::{Float, NumCast};
Expand Down Expand Up @@ -86,6 +90,7 @@ impl<'a> IterBroadCast<'a> {
/// The concatenated strings are separated by a `delimiter`.
/// If no `delimiter` is needed, an empty &str should be passed as argument.
#[cfg(feature = "concat_str")]
#[cfg_attr(docsrs, doc(cfg(feature = "concat_str")))]
pub fn concat_str(s: &[Series], delimiter: &str) -> Result<Utf8Chunked> {
if s.is_empty() {
return Err(PolarsError::NoData(
Expand Down Expand Up @@ -155,6 +160,42 @@ pub fn concat_str(s: &[Series], delimiter: &str) -> Result<Utf8Chunked> {
Ok(builder.finish())
}

/// Concatenate [`DataFrame`]s diagonally.
///
/// The output schema is the union of the columns of all inputs, ordered by
/// first appearance. Every input is padded with all-null columns for the names
/// it lacks, and the padded frames are then stacked with `concat_df`.
///
/// The dtype of an output column is taken from the first `DataFrame` in `dfs`
/// that contains a column with that name.
///
/// # Errors
///
/// Returns `PolarsError::NoData` when `dfs` is empty and forwards any error
/// produced by `concat_df`.
#[cfg(feature = "diagonal_concat")]
#[cfg_attr(docsrs, doc(cfg(feature = "diagonal_concat")))]
pub fn diag_concat_df(dfs: &[DataFrame]) -> Result<DataFrame> {
    // Reject empty input up front, mirroring `concat_str`, instead of handing
    // an empty slice to `concat_df`.
    if dfs.is_empty() {
        return Err(PolarsError::NoData(
            "expected at least one DataFrame in diag_concat_df function".into(),
        ));
    }

    // The union schema can never be wider than the sum of all widths.
    let upper_bound_width: usize = dfs.iter().map(|df| df.width()).sum();
    let mut column_names = AHashSet::with_capacity(upper_bound_width);
    let mut schema = Vec::with_capacity(upper_bound_width);

    // Collect (name, dtype) pairs in order of first appearance.
    for df in dfs {
        for s in df.get_columns() {
            let name = s.name();
            if column_names.insert(name) {
                schema.push((name, s.dtype()));
            }
        }
    }

    // Bring every frame to the union schema; missing columns become null columns
    // of the frame's own height.
    let dfs = dfs
        .iter()
        .map(|df| {
            let height = df.height();
            let mut columns = Vec::with_capacity(schema.len());

            for (name, dtype) in &schema {
                let column = match df.column(name) {
                    Ok(s) => s.clone(),
                    Err(_) => Series::full_null(name, height, dtype),
                };
                columns.push(column);
            }
            DataFrame::new_no_checks(columns)
        })
        .collect::<Vec<_>>();

    concat_df(&dfs)
}

#[cfg(test)]
mod test {
use super::*;
Expand Down Expand Up @@ -183,4 +224,37 @@ mod test {
&[Some("foo_spam_literal"), Some("bar_ham_literal")]
);
}

#[test]
#[cfg(feature = "diagonal_concat")]
fn test_diag_concat() -> Result<()> {
    // Three frames whose column sets only partially overlap.
    let first = df![
        "a" => [1, 2],
        "b" => ["a", "b"]
    ]?;
    let second = df![
        "b" => ["a", "b"],
        "c" => [1, 2]
    ]?;
    let third = df![
        "a" => [5, 7],
        "c" => [1, 2],
        "d" => [1, 2]
    ]?;

    // Columns that a frame lacks must show up as nulls in that frame's rows.
    let expected = df![
        "a" => [Some(1), Some(2), None, None, Some(5), Some(7)],
        "b" => [Some("a"), Some("b"), Some("a"), Some("b"), None, None],
        "c" => [None, None, Some(1), Some(2), Some(1), Some(2)],
        "d" => [None, None, None, None, Some(1), Some(2)]
    ]?;

    let frames = [first, second, third];
    let result = diag_concat_df(&frames)?;

    assert!(result.frame_equal_missing(&expected));

    Ok(())
}
}
1 change: 1 addition & 0 deletions polars/polars-core/src/series/ops/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
pub mod diff;
#[cfg(feature = "moment")]
pub mod moment;
mod null;
mod to_list;

#[derive(Copy, Clone)]
Expand Down
43 changes: 43 additions & 0 deletions polars/polars-core/src/series/ops/null.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
use crate::prelude::*;

impl Series {
    /// Build a `Series` called `name` that holds `size` null values of the given `dtype`.
    ///
    /// Dtypes that equal their own physical representation are created directly
    /// through `ChunkedArray::full_null`; the remaining (logical) dtypes are built
    /// from a null array and then converted.
    ///
    /// # Panics
    ///
    /// Panics when `dtype` is not covered by any of the branches below.
    pub fn full_null(name: &str, size: usize, dtype: &DataType) -> Self {
        if dtype != &dtype.to_physical() {
            // Logical dtype: create the null array and convert it to the logical type.
            match dtype {
                #[cfg(feature = "dtype-categorical")]
                DataType::Categorical => CategoricalChunked::full_null(name, size).into_series(),
                #[cfg(feature = "dtype-date")]
                DataType::Date => Int32Chunked::full_null(name, size)
                    .into_date()
                    .into_series(),
                // NOTE(review): `Datetime` goes through `into_date()` just like `Date` —
                // confirm this is the intended conversion on `Int64Chunked`.
                #[cfg(feature = "dtype-datetime")]
                DataType::Datetime => Int64Chunked::full_null(name, size)
                    .into_date()
                    .into_series(),
                #[cfg(feature = "dtype-time")]
                DataType::Time => Int64Chunked::full_null(name, size)
                    .into_time()
                    .into_series(),
                dt => panic!("logical-type not yet implemented: {}", dt),
            }
        } else {
            // Physical dtype: dispatch to the matching `ChunkedArray` constructor.
            macro_rules! null_primitive {
                ($type:ty) => {{
                    ChunkedArray::<$type>::full_null(name, size).into_series()
                }};
            }
            macro_rules! null_utf8 {
                () => {{
                    ChunkedArray::<Utf8Type>::full_null(name, size).into_series()
                }};
            }
            macro_rules! null_bool {
                () => {{
                    ChunkedArray::<BooleanType>::full_null(name, size).into_series()
                }};
            }
            match_dtype_to_logical_apply_macro!(dtype, null_primitive, null_utf8, null_bool)
        }
    }
}
28 changes: 27 additions & 1 deletion polars/polars-core/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ impl<T: Default> Arena<T> {

/// Apply a macro on the Series
#[macro_export]
macro_rules! match_arrow_data_type_apply_macro {
macro_rules! match_dtype_to_physical_apply_macro {
($obj:expr, $macro:ident, $macro_utf8:ident, $macro_bool:ident $(, $opt_args:expr)*) => {{
match $obj {
DataType::Utf8 => $macro_utf8!($($opt_args)*),
Expand All @@ -285,6 +285,32 @@ macro_rules! match_arrow_data_type_apply_macro {
}};
}

/// Dispatch on a `DataType` and invoke the macro that belongs to the matched variant.
///
/// * `$obj` - expression that is matched against the `DataType` variants.
/// * `$macro` - invoked for the numeric variants with the corresponding type
///   (e.g. `UInt32Type`) as first argument, followed by the optional arguments.
/// * `$macro_utf8` - invoked for `DataType::Utf8` with only the optional arguments.
/// * `$macro_bool` - invoked for `DataType::Boolean` with only the optional arguments.
///
/// The `UInt8`, `UInt16`, `Int8` and `Int16` arms are only compiled when the
/// `dtype-u8`, `dtype-u16`, `dtype-i8` and `dtype-i16` features are enabled.
///
/// # Panics
///
/// Panics with "not implemented for dtype ..." for every variant that has no arm.
#[macro_export]
macro_rules! match_dtype_to_logical_apply_macro {
($obj:expr, $macro:ident, $macro_utf8:ident, $macro_bool:ident $(, $opt_args:expr)*) => {{
match $obj {
DataType::Utf8 => $macro_utf8!($($opt_args)*),
DataType::Boolean => $macro_bool!($($opt_args)*),
#[cfg(feature = "dtype-u8")]
DataType::UInt8 => $macro!(UInt8Type $(, $opt_args)*),
#[cfg(feature = "dtype-u16")]
DataType::UInt16 => $macro!(UInt16Type $(, $opt_args)*),
DataType::UInt32 => $macro!(UInt32Type $(, $opt_args)*),
DataType::UInt64 => $macro!(UInt64Type $(, $opt_args)*),
#[cfg(feature = "dtype-i8")]
DataType::Int8 => $macro!(Int8Type $(, $opt_args)*),
#[cfg(feature = "dtype-i16")]
DataType::Int16 => $macro!(Int16Type $(, $opt_args)*),
DataType::Int32 => $macro!(Int32Type $(, $opt_args)*),
DataType::Int64 => $macro!(Int64Type $(, $opt_args)*),
DataType::Float32 => $macro!(Float32Type $(, $opt_args)*),
DataType::Float64 => $macro!(Float64Type $(, $opt_args)*),
// every variant without an explicit arm above is unsupported
dt => panic!("not implemented for dtype {:?}", dt),
}
}};
}

/// Apply a macro on the Downcasted ChunkedArray's
#[macro_export]
macro_rules! match_arrow_data_type_apply_macro_ca {
Expand Down
1 change: 1 addition & 0 deletions py-polars/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ features = [
"arange",
"true_div",
"dtype-categorical",
"diagonal_concat"
]

#[patch.crates-io]
Expand Down
20 changes: 18 additions & 2 deletions py-polars/polars/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from polars.datatypes import py_type_to_polars_type
from polars.polars import concat_df as _concat_df
from polars.polars import concat_series as _concat_series
from polars.polars import py_diag_concat_df as _diag_concat_df

_DOCUMENTING = False
except ImportError:
Expand All @@ -30,7 +31,9 @@ def get_dummies(df: "pl.DataFrame") -> "pl.DataFrame":


def concat(
items: Union[Sequence["pl.DataFrame"], Sequence["pl.Series"]], rechunk: bool = True
items: Union[Sequence["pl.DataFrame"], Sequence["pl.Series"]],
rechunk: bool = True,
how: str = "vertical",
) -> Union["pl.DataFrame", "pl.Series"]:
"""
Aggregate all the Dataframes/Series in a List of DataFrames/Series to a single DataFrame/Series.
Expand All @@ -41,13 +44,26 @@ def concat(
DataFrames/Series to concatenate.
rechunk
rechunk the final DataFrame/Series.
how
Only used if the items are DataFrames.
One of {"vertical", "diagonal"}.
Vertical: Applies multiple `vstack` operations.
Diagonal: Finds a union between the column schemas and fills missing column values with null.
"""
if not len(items) > 0:
raise ValueError("cannot concat empty list")

out: Union["pl.Series", "pl.DataFrame"]
if isinstance(items[0], pl.DataFrame):
out = pl.wrap_df(_concat_df(items))
if how == "vertical":
out = pl.wrap_df(_concat_df(items))
elif how == "diagonal":
out = pl.wrap_df(_diag_concat_df(items))
else:
raise ValueError(
f"how should be one of {'vertical', 'diagonal'}, got {how}"
)
else:
out = pl.wrap_s(_concat_series(items))

Expand Down
16 changes: 16 additions & 0 deletions py-polars/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use crate::prelude::{DataType, PyDataType};
use mimalloc::MiMalloc;
use polars_core::export::arrow::io::ipc::read::read_file_metadata;
use pyo3::types::PyDict;
use polars::functions::diag_concat_df;

#[global_allocator]
static GLOBAL: MiMalloc = MiMalloc;
Expand Down Expand Up @@ -165,6 +166,20 @@ fn concat_df(dfs: &PyAny) -> PyResult<PyDataFrame> {
Ok(df.into())
}

/// Python entry point for a diagonal concat: extracts a `DataFrame` from every
/// item of the Python sequence `dfs` and passes them to `diag_concat_df`.
///
/// The first item that fails to iterate or to convert aborts the call with its error.
#[pyfunction]
fn py_diag_concat_df(dfs: &PyAny) -> PyResult<PyDataFrame> {
    let (sequence, _len) = get_pyseq(dfs)?;

    // Convert the items one by one; `?` stops at the first failure.
    let mut frames = Vec::new();
    for entry in sequence.iter()? {
        frames.push(get_df(entry?)?);
    }

    let stacked = diag_concat_df(&frames).map_err(PyPolarsEr::from)?;
    Ok(stacked.into())
}

#[pyfunction]
fn concat_series(series: &PyAny) -> PyResult<PySeries> {
let (seq, _len) = get_pyseq(series)?;
Expand Down Expand Up @@ -258,5 +273,6 @@ fn polars(_py: Python, m: &PyModule) -> PyResult<()> {
m.add_wrapped(wrap_pyfunction!(collect_all)).unwrap();
m.add_wrapped(wrap_pyfunction!(spearman_rank_corr)).unwrap();
m.add_wrapped(wrap_pyfunction!(map_mul)).unwrap();
m.add_wrapped(wrap_pyfunction!(py_diag_concat_df)).unwrap();
Ok(())
}
18 changes: 18 additions & 0 deletions py-polars/tests/test_df.py
Original file line number Diff line number Diff line change
Expand Up @@ -1213,3 +1213,21 @@ def test_diff_datetime():
)["timestamp"]

assert out[0] == out[1]


def test_diag_concat():
    """Diagonal concat unions the columns and fills the gaps with nulls."""
    frames = [
        pl.DataFrame({"a": [1, 2]}),
        pl.DataFrame({"b": ["a", "b"], "c": [1, 2]}),
        pl.DataFrame({"a": [5, 7], "c": [1, 2], "d": [1, 2]}),
    ]

    # Each frame contributes two rows; columns it lacks are null in those rows.
    expected_columns = {
        "a": [1, 2, None, None, 5, 7],
        "b": [None, None, "a", "b", None, None],
        "c": [None, None, 1, 2, 1, 2],
        "d": [None, None, None, None, 1, 2],
    }
    expected = pl.DataFrame(expected_columns)

    result = pl.concat(frames, how="diagonal")

    assert result.frame_equal(expected, null_equal=True)

0 comments on commit 1f85372

Please sign in to comment.