python dict parallel dataframe creation (#3630)

ritchie46 committed Jun 8, 2022
1 parent 37f4a46 commit 06ba4f3
Showing 6 changed files with 74 additions and 22 deletions.
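Before the per-file diffs, a user-facing sketch of what the change enables. This is illustrative, not commit code: the column names and sizes are made up, and the 1000-row cutoff comes from the construction code below. Building a DataFrame from a dict of large NumPy arrays now converts the columns to Series on a thread pool instead of serially, and a new threadpool_size function exposes the pool size.

import numpy as np
import polars as pl

# All values are NumPy arrays longer than 1000 rows, so DataFrame
# construction takes the new parallel path added in this commit.
data = {f"col_{i}": np.random.rand(1_000_000) for i in range(8)}
df = pl.DataFrame(data)

# New in this commit: the size of the thread pool Polars uses.
print(pl.threadpool_size())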
1 change: 1 addition & 0 deletions py-polars/docs/source/reference/functions.rst
@@ -39,3 +39,4 @@ Parallelization
   :toctree: api/

   collect_all
   threadpool_size
2 changes: 2 additions & 0 deletions py-polars/polars/__init__.py
@@ -123,6 +123,7 @@ def version() -> str:
    scan_parquet,
)
from polars.string_cache import StringCache
from polars.utils import threadpool_size

__all__ = [
    "exceptions",
@@ -232,6 +233,7 @@ def version() -> str:
    "from_pandas",
    # testing
    "testing",
    "threadpool_size",
]

__version__ = version()
25 changes: 25 additions & 0 deletions py-polars/polars/internals/construction.py
@@ -33,6 +33,7 @@
    polars_type_to_constructor,
    py_type_to_constructor,
)
from polars.utils import threadpool_size

try:
    from polars.polars import PyDataFrame, PySeries
@@ -398,6 +399,30 @@ def dict_to_pydf(
        ]
        data_series = _handle_columns_arg(data_series, columns=columns)
        return PyDataFrame(data_series)

    all_numpy = True
    for val in data.values():
        # only start a thread pool when the arrays are reasonably large
        all_numpy = all_numpy and isinstance(val, np.ndarray) and len(val) > 1000
        if not all_numpy:
            break

    if all_numpy:
        # Multi-threading was easier on the Python side here: we cannot run
        # multiple threads that execute Python code and release the GIL in
        # pyo3, as that would deadlock.

        # multiprocessing.dummy provides the Pool API backed by threads
        import multiprocessing.dummy

        pool_size = threadpool_size()
        pool = multiprocessing.dummy.Pool(pool_size)
        data_series = pool.map(
            lambda t: pli.Series(t[0], t[1]).inner(), [(k, v) for k, v in data.items()]
        )
        return PyDataFrame(data_series)

    # fast path
    return PyDataFrame.read_dict(data)
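
The fan-out pattern above can be sketched standalone. This is illustrative, not commit code, and the data is assumed: multiprocessing.dummy.Pool exposes the multiprocessing Pool API but is backed by threads, which is enough here because the native Series constructors release the GIL (via py.allow_threads in series.rs below).

import multiprocessing.dummy

import numpy as np
import polars as pl

data = {f"col_{i}": np.random.rand(500_000) for i in range(8)}

# A thread pool, not a process pool; real parallelism is possible because
# Series construction drops the GIL while Rust copies the NumPy buffer.
with multiprocessing.dummy.Pool(pl.threadpool_size()) as pool:
    series = pool.map(lambda kv: pl.Series(kv[0], kv[1]), list(data.items()))

df = pl.DataFrame(series)
print(df.shape)  # (500000, 8)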

14 changes: 14 additions & 0 deletions py-polars/polars/utils.py
@@ -7,6 +7,13 @@

import numpy as np

try:
    from polars.polars import pool_size as _pool_size

    _DOCUMENTING = False
except ImportError:  # pragma: no cover
    _DOCUMENTING = True

from polars.datatypes import DataType, Date, Datetime

if sys.version_info >= (3, 10):
@@ -232,3 +239,10 @@ def format_path(path: Union[str, Path]) -> str:
    Returns a string path, expanding the home directory if present.
    """
    return os.path.expanduser(path)


def threadpool_size() -> int:
    """
    Get the size of the Polars thread pool.
    """
    return _pool_size()
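
A short usage sketch for the new helper. To the best of my knowledge the Rayon pool in polars-core is sized from the POLARS_MAX_THREADS environment variable; treat that as an assumption, and note it must be set before polars is first imported.

import os

# Assumption: the pool reads POLARS_MAX_THREADS when it is created, so the
# variable must be set before the first polars import.
os.environ["POLARS_MAX_THREADS"] = "4"

import polars as pl

print(pl.threadpool_size())  # expected: 4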
7 changes: 7 additions & 0 deletions py-polars/src/lib.rs
@@ -50,6 +50,7 @@ use polars::functions::{diag_concat_df, hor_concat_df};
use polars::prelude::Null;
use polars_core::datatypes::TimeUnit;
use polars_core::prelude::IntoSeries;
use polars_core::POOL;
use pyo3::panic::PanicException;
use pyo3::types::{PyBool, PyDict, PyFloat, PyInt, PyString};

@@ -446,6 +447,11 @@ fn as_struct(exprs: Vec<PyExpr>) -> PyExpr {
    polars::lazy::dsl::as_struct(&exprs).into()
}

#[pyfunction]
fn pool_size() -> usize {
    POOL.current_num_threads()
}

#[pymodule]
fn polars(py: Python, m: &PyModule) -> PyResult<()> {
m.add("NotFoundError", py.get_type::<NotFoundError>())
@@ -505,5 +511,6 @@ fn polars(py: Python, m: &PyModule) -> PyResult<()> {
    m.add_wrapped(wrap_pyfunction!(max_exprs)).unwrap();
    m.add_wrapped(wrap_pyfunction!(as_struct)).unwrap();
    m.add_wrapped(wrap_pyfunction!(repeat)).unwrap();
    m.add_wrapped(wrap_pyfunction!(pool_size)).unwrap();
    Ok(())
}
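
The pyfunction registered above lands on the compiled polars.polars extension module; threadpool_size in py-polars/polars/utils.py is the thin public wrapper over it. A sketch of the relationship, assuming the module layout as of this commit:

# The raw binding exported by lib.rs above ...
from polars.polars import pool_size

# ... and the public wrapper that utils.py builds on it in this commit.
import polars as pl

assert pl.threadpool_size() == pool_size()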
47 changes: 25 additions & 22 deletions py-polars/src/series.rs
@@ -34,12 +34,17 @@ macro_rules! init_method {
         #[pymethods]
         impl PySeries {
             #[staticmethod]
-            pub fn $name(name: &str, val: &PyArray1<$type>, _strict: bool) -> PySeries {
-                unsafe {
-                    PySeries {
-                        series: Series::new(name, val.as_slice().unwrap()),
-                    }
-                }
+            pub fn $name(
+                py: Python,
+                name: &str,
+                array: &PyArray1<$type>,
+                _strict: bool,
+            ) -> PySeries {
+                let array = array.readonly();
+                let vals = array.as_slice().unwrap();
+                py.allow_threads(|| PySeries {
+                    series: Series::new(name, vals),
+                })
             }
         }
     };
@@ -58,41 +63,39 @@ init_method!(new_u64, u64);
 #[pymethods]
 impl PySeries {
     #[staticmethod]
-    pub fn new_f32(name: &str, val: &PyArray1<f32>, nan_is_null: bool) -> PySeries {
-        // numpy array as slice is unsafe
-        unsafe {
+    pub fn new_f32(py: Python, name: &str, array: &PyArray1<f32>, nan_is_null: bool) -> PySeries {
+        let array = array.readonly();
+        let vals = array.as_slice().unwrap();
+        py.allow_threads(|| {
             if nan_is_null {
-                let mut ca: Float32Chunked = val
-                    .as_slice()
-                    .expect("contiguous array")
+                let mut ca: Float32Chunked = vals
                     .iter()
                     .map(|&val| if f32::is_nan(val) { None } else { Some(val) })
                     .collect_trusted();
                 ca.rename(name);
                 ca.into_series().into()
             } else {
-                Series::new(name, val.as_slice().unwrap()).into()
+                Series::new(name, vals).into()
             }
-        }
+        })
     }
 
     #[staticmethod]
-    pub fn new_f64(name: &str, val: &PyArray1<f64>, nan_is_null: bool) -> PySeries {
-        // numpy array as slice is unsafe
-        unsafe {
+    pub fn new_f64(py: Python, name: &str, array: &PyArray1<f64>, nan_is_null: bool) -> PySeries {
+        let array = array.readonly();
+        let vals = array.as_slice().unwrap();
+        py.allow_threads(|| {
             if nan_is_null {
-                let mut ca: Float64Chunked = val
-                    .as_slice()
-                    .expect("contiguous array")
+                let mut ca: Float64Chunked = vals
                     .iter()
                     .map(|&val| if f64::is_nan(val) { None } else { Some(val) })
                     .collect_trusted();
                 ca.rename(name);
                 ca.into_series().into()
             } else {
-                Series::new(name, val.as_slice().unwrap()).into()
+                Series::new(name, vals).into()
             }
-        }
+        })
     }
 
     pub fn struct_to_frame(&self) -> PyResult<PyDataFrame> {
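The nan_is_null flag threaded through new_f32/new_f64 controls whether NaNs in a float NumPy array become nulls. A usage sketch from the Python side; nan_to_null as the keyword name for this version of the API is an assumption:

# Illustrative sketch; `nan_to_null` as the Python-side name is an assumption.
import numpy as np
import polars as pl

arr = np.array([1.0, float("nan"), 3.0])

s_nan = pl.Series("a", arr)                     # NaN kept as a float value
s_null = pl.Series("a", arr, nan_to_null=True)  # NaN converted to null

print(s_nan.null_count(), s_null.null_count())  # 0 1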
