Skip to content

Commit

Permalink
fix rayon SO in partition_by (#3391)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed May 13, 2022
1 parent fad45bb commit e27a964
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 16 deletions.
9 changes: 5 additions & 4 deletions polars/polars-core/src/frame/hash_join/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ impl DataFrame {
if left_join && join_tuples.len() == self.height() {
self.clone()
} else {
self.take_unchecked_slice(join_tuples)
self.take_unchecked_slice(join_tuples, true)
}
}

Expand Down Expand Up @@ -497,7 +497,8 @@ impl DataFrame {
|| unsafe { self.create_left_df_from_slice(join_idx_left, false) },
|| unsafe {
// remove join columns
remove_selected(other, &selected_right).take_unchecked_slice(join_idx_right)
remove_selected(other, &selected_right)
.take_unchecked_slice(join_idx_right, true)
},
);
self.finish_join(df_left, df_right, suffix)
Expand Down Expand Up @@ -682,7 +683,7 @@ impl DataFrame {
other
.drop(s_right.name())
.unwrap()
.take_unchecked_slice(join_tuples_right)
.take_unchecked_slice(join_tuples_right, true)
},
);
self.finish_join(df_left, df_right, suffix)
Expand Down Expand Up @@ -840,7 +841,7 @@ impl DataFrame {
if let Some((offset, len)) = slice {
idx = slice_slice(idx, offset, len);
}
self.take_unchecked_slice(idx)
self.take_unchecked_slice(idx, true)
}

#[cfg(feature = "semi_anti_join")]
Expand Down
39 changes: 27 additions & 12 deletions polars/polars-core/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1616,12 +1616,23 @@ impl DataFrame {
}

pub(crate) unsafe fn take_unchecked(&self, idx: &IdxCa) -> Self {
let cols = POOL.install(|| {
self.apply_columns_par(&|s| match s.dtype() {
DataType::Utf8 => s.take_unchecked_threaded(idx, true).unwrap(),
_ => s.take_unchecked(idx).unwrap(),
self.take_unchecked_impl(idx, true)
}

unsafe fn take_unchecked_impl(&self, idx: &IdxCa, allow_threads: bool) -> Self {
let cols = if allow_threads {
POOL.install(|| {
self.apply_columns_par(&|s| match s.dtype() {
DataType::Utf8 => s.take_unchecked_threaded(idx, true).unwrap(),
_ => s.take_unchecked(idx).unwrap(),
})
})
});
} else {
self.columns
.iter()
.map(|s| s.take_unchecked(idx).unwrap())
.collect()
};
DataFrame::new_no_checks(cols)
}

Expand Down Expand Up @@ -2903,13 +2914,15 @@ impl DataFrame {
DataFrame::new_no_checks(cols)
}

pub(crate) unsafe fn take_unchecked_slice(&self, idx: &[IdxSize]) -> Self {
/// Be careful with allowing threads when calling this in a large hot loop:
/// every thread split may end up on the rayon stack and lead to a stack overflow (SO).
pub(crate) unsafe fn take_unchecked_slice(&self, idx: &[IdxSize], allow_threads: bool) -> Self {
let ptr = idx.as_ptr() as *mut IdxSize;
let len = idx.len();

// create a temporary vec. we will not drop it.
let mut ca = IdxCa::from_vec("", Vec::from_raw_parts(ptr, len, len));
let out = self.take_unchecked(&ca);
let out = self.take_unchecked_impl(&ca, allow_threads);

// ref count of buffers should be one because we dropped all allocations
let arr = {
Expand Down Expand Up @@ -2938,19 +2951,21 @@ impl DataFrame {
self.groupby(cols)?.groups
};

Ok(POOL.install(move || {
// don't parallelize this
// there is a lot of parallelization in take and this may easily cause a stack overflow (SO)
POOL.install(|| {
match groups {
GroupsProxy::Idx(idx) => {
idx.into_par_iter().map(|(_, group)| {
Ok(idx.into_par_iter().map(|(_, group)| {
// groups are in bounds
unsafe { self.take_unchecked_slice(&group) }
})
unsafe { self.take_unchecked_slice(&group, false) }
}))
}
_ => {
unimplemented!()
}
}
}))
})
}

/// Split into multiple DataFrames partitioned by groups
Expand Down

0 comments on commit e27a964

Please sign in to comment.