Skip to content

Commit

Permalink
Python: allow_threads in collect_all
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Oct 11, 2021
1 parent 6c876bc commit 4d5facf
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 14 deletions.
10 changes: 5 additions & 5 deletions py-polars/src/lazy/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,13 +185,13 @@ impl PyLazyFrame {
ldf.cache().into()
}

pub fn collect(&self) -> PyResult<PyDataFrame> {
let ldf = self.ldf.clone();
let gil = Python::acquire_gil();
let py = gil.python();
pub fn collect(&self, py: Python) -> PyResult<PyDataFrame> {
// if we don't allow threads and we have udfs trying to acquire the gil from different
// threads we deadlock.
let df = py.allow_threads(|| ldf.collect().map_err(PyPolarsEr::from))?;
let df = py.allow_threads(|| {
let ldf = self.ldf.clone();
ldf.collect().map_err(PyPolarsEr::from)
})?;
Ok(df.into())
}

Expand Down
21 changes: 12 additions & 9 deletions py-polars/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,16 +178,19 @@ fn ipc_schema(py: Python, py_f: PyObject) -> PyResult<PyObject> {
}

#[pyfunction]
fn collect_all(lfs: Vec<PyLazyFrame>) -> PyResult<Vec<PyDataFrame>> {
fn collect_all(lfs: Vec<PyLazyFrame>, py: Python) -> PyResult<Vec<PyDataFrame>> {
use polars_core::utils::rayon::prelude::*;
let out = polars_core::POOL.install(|| {
lfs.par_iter()
.map(|lf| {
let df = lf.ldf.clone().collect()?;
Ok(PyDataFrame::new(df))
})
.collect::<polars_core::error::Result<Vec<_>>>()
.map_err(PyPolarsEr::from)

let out = py.allow_threads(|| {
polars_core::POOL.install(|| {
lfs.par_iter()
.map(|lf| {
let df = lf.ldf.clone().collect()?;
Ok(PyDataFrame::new(df))
})
.collect::<polars_core::error::Result<Vec<_>>>()
.map_err(PyPolarsEr::from)
})
});

Ok(out?)
Expand Down

0 comments on commit 4d5facf

Please sign in to comment.