Skip to content

Commit

Permalink
python fix segfault in groupby.apply
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Nov 24, 2021
1 parent 78b8f10 commit 448f23f
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 13 deletions.
43 changes: 33 additions & 10 deletions polars/polars-core/src/frame/groupby/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1146,25 +1146,28 @@ impl<'df, 'selection_str> GroupBy<'df, 'selection_str> {
DataFrame::new(cols)
}

/// Apply a closure over the groups as a new DataFrame.
pub fn apply<F>(&self, f: F) -> Result<DataFrame>
where
F: Fn(DataFrame) -> Result<DataFrame> + Send + Sync,
{
let df = if let Some(agg) = &self.selected_agg {
fn prepare_apply(&self) -> Result<DataFrame> {
if let Some(agg) = &self.selected_agg {
if agg.is_empty() {
self.df.clone()
Ok(self.df.clone())
} else {
let mut new_cols = Vec::with_capacity(self.selected_keys.len() + agg.len());
new_cols.extend_from_slice(&self.selected_keys);
let cols = self.df.select_series(agg)?;
new_cols.extend(cols.into_iter());
DataFrame::new_no_checks(new_cols)
Ok(DataFrame::new_no_checks(new_cols))
}
} else {
self.df.clone()
};
Ok(self.df.clone())
}
}

/// Apply a closure over the groups as a new DataFrame in parallel.
pub fn par_apply<F>(&self, f: F) -> Result<DataFrame>
where
F: Fn(DataFrame) -> Result<DataFrame> + Send + Sync,
{
let df = self.prepare_apply()?;
let dfs = self
.get_groups()
.par_iter()
Expand All @@ -1178,6 +1181,26 @@ impl<'df, 'selection_str> GroupBy<'df, 'selection_str> {
df.as_single_chunk();
Ok(df)
}

/// Apply a closure over the groups as a new DataFrame.
pub fn apply<F>(&self, f: F) -> Result<DataFrame>
where
F: Fn(DataFrame) -> Result<DataFrame> + Send + Sync,
{
let df = self.prepare_apply()?;
let dfs = self
.get_groups()
.iter()
.map(|t| {
let sub_df = unsafe { df.take_iter_unchecked(t.1.iter().map(|i| *i as usize)) };
f(sub_df)
})
.collect::<Result<Vec<_>>>()?;

let mut df = accumulate_dataframes_vertical(dfs)?;
df.as_single_chunk();
Ok(df)
}
}

#[derive(Copy, Clone)]
Expand Down
8 changes: 5 additions & 3 deletions py-polars/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -751,10 +751,12 @@ impl PyDataFrame {
// Finally get the actual DataFrame
Ok(pydf.df)
};
// We don't use `py.allow_threads(|| gb.par_apply(..)` because that segfaulted
// due to code related to Pyo3 or rayon, cannot reproduce it in native polars
// so we lose parallelism, but it doesn't really matter because we are GIL bound anyways
// and this function should not be used in ideomatic polars anyway.
let df = gb.apply(function).map_err(PyPolarsEr::from)?;

let gil = Python::acquire_gil();
let py = gil.python();
let df = py.allow_threads(|| gb.apply(function).map_err(PyPolarsEr::from))?;
Ok(df.into())
}

Expand Down

0 comments on commit 448f23f

Please sign in to comment.