Skip to content

Commit

Permalink
Python: parallel concat df (#3671)
Browse files Browse the repository at this point in the history
  • Loading branch information
gunjunlee committed Jun 15, 2022
1 parent 26e142a commit e177a75
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ impl RevMappingBuilder {
}
}

#[derive(Debug)]
#[derive(Clone, Debug)]
pub enum RevMapping {
/// Hashmap: maps the indexes from the global cache/categorical array to indexes in the local Utf8Array
/// Utf8Array: caches the string values
Expand Down
8 changes: 7 additions & 1 deletion polars/polars-core/src/frame/row.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ impl<'a> From<&AnyValue<'a>> for DataType {
Int8(_) => DataType::Int8,
Int16(_) => DataType::Int16,
#[cfg(feature = "dtype-categorical")]
Categorical(_, _) => DataType::Categorical(None),
Categorical(_, rev_map) => DataType::Categorical(Some(Arc::new((*rev_map).clone()))),
#[cfg(feature = "object")]
Object(o) => DataType::Object(o.type_name()),
}
Expand Down Expand Up @@ -347,6 +347,8 @@ pub(crate) enum AnyValueBuffer<'a> {
Float32(PrimitiveChunkedBuilder<Float32Type>),
Float64(PrimitiveChunkedBuilder<Float64Type>),
Utf8(Utf8ChunkedBuilder),
#[cfg(feature = "dtype-categorical")]
Categorical(CategoricalChunkedBuilder),
All(Vec<AnyValue<'a>>),
}

Expand Down Expand Up @@ -413,6 +415,8 @@ impl<'a> AnyValueBuffer<'a> {
Float32(b) => b.finish().into_series(),
Float64(b) => b.finish().into_series(),
Utf8(b) => b.finish().into_series(),
#[cfg(feature = "dtype-categorical")]
Categorical(b) => b.finish().into_series(),
All(vals) => Series::new("", vals),
}
}
Expand Down Expand Up @@ -440,6 +444,8 @@ impl From<(&DataType, usize)> for AnyValueBuffer<'_> {
Float32 => AnyValueBuffer::Float32(PrimitiveChunkedBuilder::new("", len)),
Float64 => AnyValueBuffer::Float64(PrimitiveChunkedBuilder::new("", len)),
Utf8 => AnyValueBuffer::Utf8(Utf8ChunkedBuilder::new("", len, len * 5)),
#[cfg(feature = "dtype-categorical")]
Categorical(_) => AnyValueBuffer::Categorical(CategoricalChunkedBuilder::new("", len)),
// Struct and List can be recursive so use anyvalues for that
_ => AnyValueBuffer::All(Vec::with_capacity(len)),
}
Expand Down
39 changes: 33 additions & 6 deletions py-polars/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ use mimalloc::MiMalloc;
use polars::functions::{diag_concat_df, hor_concat_df};
use polars::prelude::Null;
use polars_core::datatypes::TimeUnit;
use polars_core::frame::row::Row;
use polars_core::prelude::DataFrame;
use polars_core::prelude::IntoSeries;
use polars_core::POOL;
use pyo3::panic::PanicException;
Expand Down Expand Up @@ -263,18 +265,43 @@ fn py_duration(
}

// Vertically stacks a Python sequence of DataFrames into a single PyDataFrame.
//
// NOTE(review): this listing is a commit diff rendered without +/- markers, so
// lines from before and after the change appear together (two `fn concat_df`
// signatures, two `get_df(first)` bindings, two `for` loops). It is not
// compilable as shown. Presumably the signature without `py`, the `let mut df`
// binding and the `for res in iter` loop body are the removed lines, since only
// the `py` variant can support the `py.allow_threads` call below -- confirm
// against the repository.
#[pyfunction]
fn concat_df(dfs: &PyAny) -> PyResult<PyDataFrame> {
fn concat_df(dfs: &PyAny, py: Python) -> PyResult<PyDataFrame> {
// Function-scope import of the rayon prelude re-exported by polars_core; needed
// for `into_par_iter` below.
use polars_core::utils::rayon::prelude::*;

// The length returned by `get_pyseq` is deliberately unused (`_len`).
let (seq, _len) = get_pyseq(dfs)?;
let mut iter = seq.iter()?;
// NOTE(review): `unwrap` panics when the iterator yields nothing, i.e. for an
// empty input sequence -- consider returning an error instead.
let first = iter.next().unwrap()?;

// (pre-change, presumably) accumulator that every later frame was stacked onto.
let mut df = get_df(first)?;
// The first frame's schema is reused below to build the fold/reduce seed value.
let first_rdf = get_df(first)?;
let schema = first_rdf.schema();

// (pre-change, presumably) sequential loop: stack each frame onto `df` in turn,
// converting any polars error through `PyPolarsErr`.
for res in iter {
let item = res?;
let other = get_df(item)?;
df.vstack_mut(&other).map_err(PyPolarsErr::from)?;
// Frames are wrapped in `Ok` so the parallel fold/reduce below can run entirely
// over `Result<DataFrame>` values.
let mut rdfs: Vec<polars_core::error::Result<DataFrame>> = vec![Ok(first_rdf)];

// Extract every remaining frame up front; a failing item or `get_df` call
// returns early via `?` before any stacking starts.
for item in iter {
let rdf = get_df(item?)?;
rdfs.push(Ok(rdf));
}

// Seed value for `fold` and `reduce`, built from one default `Row` and the first
// frame's schema.
// NOTE(review): presumably this yields a zero-row frame with the right columns,
// so stacking onto it adds no data -- confirm `from_rows_and_schema` semantics.
let identity = || DataFrame::from_rows_and_schema(&[Row::default()], &schema);

// NOTE(review): per PyO3's documentation `allow_threads` releases the GIL while
// the closure runs -- confirm nothing inside touches Python objects.
let df = py
.allow_threads(|| {
// Runs the parallel iterator inside polars' `POOL` (presumably its rayon
// thread pool -- verify).
polars_core::POOL.install(|| {
rdfs.into_par_iter()
// Stage 1: stack the frames handed to each fold accumulator; `?` turns the
// first error seen into that accumulator's result.
.fold(identity, |acc, df| {
let mut acc = acc?;
acc.vstack_mut(&df?)?;
Ok(acc)
})
// Stage 2: combine the per-fold partial frames with the same stacking step.
.reduce(identity, |acc, df| {
let mut acc = acc?;
acc.vstack_mut(&df?)?;
Ok(acc)
})
})
})
// Convert a polars error into `PyPolarsErr` before propagating it with `?`.
.map_err(PyPolarsErr::from)?;

Ok(df.into())
}

Expand Down

0 comments on commit e177a75

Please sign in to comment.