improve agg_list performance of chunked numerical data (#3351)
ritchie46 committed May 10, 2022
1 parent 2d4defa commit 96cc3d9
Showing 4 changed files with 121 additions and 117 deletions.
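
The core of the change is in agg_list: rather than falling back to a per-group ListPrimitiveChunkedBuilder whenever cont_slice() fails (chunked or nullable data), the input is rechunked once and the flat gather path is used for every numeric column, with nulls carried by a separate validity bitmap. Below is a minimal, std-only sketch of that gather shape under simplified, assumed types (f64 values, index groups as Vec<u32>); the real code writes into Arrow buffers and pushes offsets with push_unchecked.

// Hypothetical, simplified sketch of the gather fast path: flatten each group's
// values into one buffer and record Arrow-style offsets, instead of building a
// list Series group by group.
fn agg_list_fast_path(values: &[f64], groups: &[Vec<u32>]) -> (Vec<f64>, Vec<i64>) {
    let mut offsets = Vec::with_capacity(groups.len() + 1);
    let mut length_so_far = 0i64;
    offsets.push(length_so_far);

    let mut list_values = Vec::with_capacity(values.len());
    for idx in groups {
        length_so_far += idx.len() as i64;
        list_values.extend(idx.iter().map(|&i| values[i as usize]));
        offsets.push(length_so_far);
    }
    (list_values, offsets)
}

fn main() {
    let values = [1.0, 2.0, 3.0, 4.0, 5.0];
    let groups = vec![vec![0u32, 2], vec![1, 3, 4]];
    let (flat, offsets) = agg_list_fast_path(&values, &groups);
    assert_eq!(flat, [1.0, 3.0, 2.0, 4.0, 5.0]);
    assert_eq!(offsets, [0i64, 2, 5]);
}

Rechunking up front is what makes the single downcast_iter().next().unwrap() in the diff below valid: after rechunk there is exactly one chunk, hence one contiguous values slice.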
1 change: 0 additions & 1 deletion Makefile
@@ -18,7 +18,6 @@ coverage:
$(MAKE) -C py-polars venv; \
source py-polars/venv/bin/activate; \
$(MAKE) -C polars test; \
$(MAKE) -C polars integration-tests; \
$(MAKE) -C py-polars test-with-cov; \
cargo llvm-cov --no-run --lcov --output-path coverage.lcov; \
"
6 changes: 0 additions & 6 deletions polars/polars-arrow/src/bitmap/mutable.rs
@@ -1,12 +1,6 @@
use arrow::bitmap::MutableBitmap;

pub trait MutableBitmapExtension {
/// Initializes a [`MutableBitmap`] with all values set to valid/ true.
fn from_len_set(length: usize) -> MutableBitmap {
let values = vec![u8::MAX; length.saturating_add(7) / 8];
MutableBitmap::from_vec(values, length)
}

fn as_slice_mut(&mut self) -> &mut [u8];

/// # Safety
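
The from_len_set helper removed above built an "all bits valid" bitmap; the same construction is available on arrow's MutableBitmap, which aggregations.rs below now imports and uses directly. A std-only sketch of the buffer it produces (one bit per row, LSB-first, as in Arrow validity bitmaps):

// Hypothetical stand-in for MutableBitmap::from_len_set: ceil(length / 8) bytes
// with every bit set, i.e. every row marked valid.
fn all_set_bitmap(length: usize) -> Vec<u8> {
    vec![u8::MAX; length.saturating_add(7) / 8]
}

// LSB-first bit lookup, matching Arrow's bitmap ordering.
fn get_bit(bytes: &[u8], i: usize) -> bool {
    (bytes[i / 8] >> (i % 8)) & 1 == 1
}

fn main() {
    let bm = all_set_bitmap(10);
    assert_eq!(bm.len(), 2);
    assert!((0..10).all(|i| get_bit(&bm, i)));
}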
190 changes: 97 additions & 93 deletions polars/polars-core/src/frame/groupby/aggregations.rs
@@ -1,4 +1,5 @@
use crate::POOL;
use arrow::bitmap::MutableBitmap;
use num::{Bounded, Num, NumCast, ToPrimitive, Zero};
use rayon::prelude::*;

@@ -757,63 +758,67 @@ where
ChunkedArray<T>: IntoSeries,
{
fn agg_list(&self, groups: &GroupsProxy) -> Series {
let ca = self.rechunk();

match groups {
GroupsProxy::Idx(groups) => {
let mut can_fast_explode = true;
let arr = match self.cont_slice() {
Ok(values) => {
let mut offsets = Vec::<i64>::with_capacity(groups.len() + 1);
let mut length_so_far = 0i64;
offsets.push(length_so_far);

let mut list_values = Vec::<T::Native>::with_capacity(self.len());
groups.iter().for_each(|(_, idx)| {
let idx_len = idx.len();
if idx_len == 0 {
can_fast_explode = false;
}
let arr = ca.downcast_iter().next().unwrap();
let values = arr.values();

length_so_far += idx_len as i64;
// Safety:
// group tuples are in bounds
unsafe {
list_values.extend(idx.iter().map(|idx| {
debug_assert!((*idx as usize) < values.len());
*values.get_unchecked(*idx as usize)
}));
// Safety:
// we know that offsets has allocated enough slots
offsets.push_unchecked(length_so_far);
}
});
let array = PrimitiveArray::from_data(
T::get_dtype().to_arrow(),
list_values.into(),
None,
);
let data_type =
ListArray::<i64>::default_datatype(T::get_dtype().to_arrow());
ListArray::<i64>::from_data(
data_type,
offsets.into(),
Arc::new(array),
None,
)
let mut offsets = Vec::<i64>::with_capacity(groups.len() + 1);
let mut length_so_far = 0i64;
offsets.push(length_so_far);

let mut list_values = Vec::<T::Native>::with_capacity(self.len());
groups.iter().for_each(|(_, idx)| {
let idx_len = idx.len();
if idx_len == 0 {
can_fast_explode = false;
}
_ => {
let mut builder = ListPrimitiveChunkedBuilder::<T::Native>::new(
self.name(),
groups.len(),
self.len(),
self.dtype().clone(),
);
for idx in groups.all().iter() {
let s = unsafe { self.take_unchecked(idx.into()).into_series() };
builder.append_series(&s);
}
return builder.finish().into_series();

length_so_far += idx_len as i64;
// Safety:
// group tuples are in bounds
unsafe {
list_values.extend(idx.iter().map(|idx| {
debug_assert!((*idx as usize) < values.len());
*values.get_unchecked(*idx as usize)
}));
// Safety:
// we know that offsets has allocated enough slots
offsets.push_unchecked(length_so_far);
}
});

let validity = if arr.null_count() > 0 {
let old_validity = arr.validity().unwrap();
let mut validity = MutableBitmap::from_len_set(list_values.len());

let mut count = 0;
groups.iter().for_each(|(_, idx)| unsafe {
for i in idx {
if !old_validity.get_bit_unchecked(*i as usize) {
validity.set_bit_unchecked(count, false)
}
count += 1;
}
});
Some(validity.into())
} else {
None
};

let array = PrimitiveArray::from_data(
T::get_dtype().to_arrow(),
list_values.into(),
validity,
);
let data_type = ListArray::<i64>::default_datatype(T::get_dtype().to_arrow());
let arr =
ListArray::<i64>::from_data(data_type, offsets.into(), Arc::new(array), None);

let mut ca = ListChunked::from_chunks(self.name(), vec![Arc::new(arr)]);
if can_fast_explode {
ca.set_fast_explode()
@@ -822,55 +827,54 @@ where
}
GroupsProxy::Slice(groups) => {
let mut can_fast_explode = true;
let arr = match self.cont_slice() {
Ok(values) => {
let mut offsets = Vec::<i64>::with_capacity(groups.len() + 1);
let mut length_so_far = 0i64;
offsets.push(length_so_far);
let arr = ca.downcast_iter().next().unwrap();
let values = arr.values();

let mut list_values = Vec::<T::Native>::with_capacity(self.len());
groups.iter().for_each(|&[first, len]| {
if len == 0 {
can_fast_explode = false;
}
let mut offsets = Vec::<i64>::with_capacity(groups.len() + 1);
let mut length_so_far = 0i64;
offsets.push(length_so_far);

length_so_far += len as i64;
list_values
.extend_from_slice(&values[first as usize..(first + len) as usize]);
unsafe {
// Safety:
// we know that offsets has allocated enough slots
offsets.push_unchecked(length_so_far);
}
});
let array = PrimitiveArray::from_data(
T::get_dtype().to_arrow(),
list_values.into(),
None,
);
let data_type =
ListArray::<i64>::default_datatype(T::get_dtype().to_arrow());
ListArray::<i64>::from_data(
data_type,
offsets.into(),
Arc::new(array),
None,
)
let mut list_values = Vec::<T::Native>::with_capacity(self.len());
groups.iter().for_each(|&[first, len]| {
if len == 0 {
can_fast_explode = false;
}
_ => {
let mut builder = ListPrimitiveChunkedBuilder::<T::Native>::new(
self.name(),
groups.len(),
self.len(),
self.dtype().clone(),
);
for &[first, len] in groups {
let s = self.slice(first as i64, len as usize).into_series();
builder.append_series(&s);
}
return builder.finish().into_series();

length_so_far += len as i64;
list_values.extend_from_slice(&values[first as usize..(first + len) as usize]);
unsafe {
// Safety:
// we know that offsets has allocated enough slots
offsets.push_unchecked(length_so_far);
}
});

let validity = if arr.null_count() > 0 {
let old_validity = arr.validity().unwrap();
let mut validity = MutableBitmap::from_len_set(list_values.len());

let mut count = 0;
groups.iter().for_each(|[first, len]| unsafe {
for i in *first..(*first + *len) {
if !old_validity.get_bit_unchecked(i as usize) {
validity.set_bit_unchecked(count, false)
}
count += 1;
}
});
Some(validity.into())
} else {
None
};

let array = PrimitiveArray::from_data(
T::get_dtype().to_arrow(),
list_values.into(),
validity,
);
let data_type = ListArray::<i64>::default_datatype(T::get_dtype().to_arrow());
let arr =
ListArray::<i64>::from_data(data_type, offsets.into(), Arc::new(array), None);
let mut ca = ListChunked::from_chunks(self.name(), vec![Arc::new(arr)]);
if can_fast_explode {
ca.set_fast_explode()
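
Both branches above share the same null handling: when the rechunked array contains nulls, start from an all-valid bitmap sized to the gathered values and clear one bit for every element whose source slot is null. A std-only sketch with a bool-per-row stand-in for the real bitmaps (hypothetical helper name; the real code uses MutableBitmap with unchecked bit access):

// Hypothetical sketch: propagate the source validity through the gather.
// `count` walks the output positions in the same order the values were gathered.
fn gathered_validity(old_validity: &[bool], groups: &[Vec<u32>]) -> Vec<bool> {
    let total: usize = groups.iter().map(|g| g.len()).sum();
    let mut validity = vec![true; total]; // analogous to MutableBitmap::from_len_set
    let mut count = 0;
    for idx in groups {
        for &i in idx {
            if !old_validity[i as usize] {
                validity[count] = false;
            }
            count += 1;
        }
    }
    validity
}

fn main() {
    let old = [true, false, true, true];
    let groups = vec![vec![0u32, 1], vec![2, 3]];
    assert_eq!(gathered_validity(&old, &groups), [true, false, true, true]);
}

Building the bitmap only when null_count() > 0 keeps the common all-valid case allocation-free.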
41 changes: 24 additions & 17 deletions polars/polars-core/src/frame/groupby/mod.rs
@@ -306,27 +306,34 @@ impl<'df> GroupBy<'df> {
}

pub fn keys_sliced(&self, slice: Option<(i64, usize)>) -> Vec<Series> {
#[allow(unused_assignments)]
// needed to keep the lifetimes valid for this scope
let mut groups_owned = None;

let groups = if let Some((offset, len)) = slice {
groups_owned = Some(self.groups.slice(offset, len));
groups_owned.as_deref().unwrap()
} else {
&self.groups
};

POOL.install(|| {
self.selected_keys
.par_iter()
.map(|s| {
#[allow(unused_assignments)]
// needed to keep the lifetimes valid for this scope
let mut groups_owned = None;

let groups = if let Some((offset, len)) = slice {
groups_owned = Some(self.groups.slice(offset, len));
groups_owned.as_deref().unwrap()
} else {
&self.groups
};

// Safety
// groupby indexes are in bound.
unsafe {
s.take_iter_unchecked(
&mut groups.idx_ref().iter().map(|(idx, _)| idx as usize),
)
match groups {
GroupsProxy::Idx(groups) => {
let mut iter = groups.iter().map(|(first, _idx)| first as usize);
// Safety:
// groups are always in bounds
unsafe { s.take_iter_unchecked(&mut iter) }
}
GroupsProxy::Slice(groups) => {
let mut iter = groups.iter().map(|&[first, _len]| first as usize);
// Safety:
// groups are always in bounds
unsafe { s.take_iter_unchecked(&mut iter) }
}
}
})
.collect()
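
keys_sliced resolves the (optionally sliced) groups before the parallel map over the key columns and now handles both GroupsProxy variants by gathering each group's first row index, instead of assuming the Idx representation via idx_ref(). A std-only sketch of that dispatch, with a hypothetical Groups enum standing in for GroupsProxy:

// Hypothetical simplification of GroupsProxy: either explicit index lists per
// group or contiguous [first, len] slices.
enum Groups {
    Idx(Vec<(u32, Vec<u32>)>),
    Slice(Vec<[u32; 2]>),
}

// Collect the first row index of every group; the real code feeds such an
// iterator to take_iter_unchecked to gather the key values, here it is just
// collected for illustration.
fn group_firsts(groups: &Groups) -> Vec<usize> {
    match groups {
        Groups::Idx(g) => g.iter().map(|(first, _idx)| *first as usize).collect(),
        Groups::Slice(g) => g.iter().map(|&[first, _len]| first as usize).collect(),
    }
}

fn main() {
    let idx = Groups::Idx(vec![(0, vec![0, 2]), (1, vec![1, 3])]);
    let slice = Groups::Slice(vec![[0, 2], [2, 2]]);
    assert_eq!(group_firsts(&idx), [0usize, 1]);
    assert_eq!(group_firsts(&slice), [0usize, 2]);
}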
