Skip to content

Commit

Permalink
python: parallel arrow FFI array conversion (#3155)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Apr 16, 2022
1 parent ae6bfa5 commit c3d6f9a
Showing 1 changed file with 38 additions and 4 deletions.
42 changes: 38 additions & 4 deletions py-polars/src/arrow_interop/to_rust.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use crate::error::PyPolarsErr;
use polars_core::export::rayon::prelude::*;
use polars_core::prelude::*;
use polars_core::utils::accumulate_dataframes_vertical;
use polars_core::utils::arrow::{array::ArrayRef, ffi};
use polars_core::POOL;
use pyo3::ffi::Py_uintptr_t;
use pyo3::prelude::*;

Expand Down Expand Up @@ -37,15 +39,47 @@ pub fn to_rust_df(rb: &[&PyAny]) -> PyResult<DataFrame> {
let dfs = rb
.iter()
.map(|rb| {
let mut run_parallel = false;

let columns = (0..names.len())
.map(|i| {
let array = rb.call_method1("column", (i,))?;
let arr = array_to_rust(array)?;
let s =
Series::try_from((names[i].as_str(), arr)).map_err(PyPolarsErr::from)?;
Ok(s)
run_parallel |= matches!(
arr.data_type(),
ArrowDataType::Utf8 | ArrowDataType::Dictionary(_, _, _)
);
Ok(arr)
})
.collect::<PyResult<Vec<_>>>()?;

// we parallelize this part because we can have dtypes that are not zero copy
// for instance utf8 -> large-utf8
// dict encoded to categorical
let columns = if run_parallel {
POOL.install(|| {
columns
.into_par_iter()
.enumerate()
.map(|(i, arr)| {
let s = Series::try_from((names[i].as_str(), arr))
.map_err(PyPolarsErr::from)?;
Ok(s)
})
.collect::<PyResult<Vec<_>>>()
})
.collect::<PyResult<_>>()?;
} else {
columns
.into_iter()
.enumerate()
.map(|(i, arr)| {
let s = Series::try_from((names[i].as_str(), arr))
.map_err(PyPolarsErr::from)?;
Ok(s)
})
.collect::<PyResult<Vec<_>>>()
}?;

Ok(DataFrame::new(columns).map_err(PyPolarsErr::from)?)
})
.collect::<PyResult<Vec<_>>>()?;
Expand Down

0 comments on commit c3d6f9a

Please sign in to comment.