Skip to content

Commit

Permalink
fix(rust, python): fix freeze/stall when writing more than 2^31 string values to parquet (#5366)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Nov 6, 2022
1 parent 1f45cb4 commit 609ac98
Show file tree
Hide file tree
Showing 9 changed files with 92 additions and 46 deletions.
6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ bitflags = "1.3"
[workspace.dependencies.arrow]
package = "arrow2"
# git = "https://github.com/jorgecarleitao/arrow2"
# git = "https://github.com/ritchie46/arrow2"
# rev = "6c102a0c3e2dbeb185360dd3d5c3637b5e2028fd"
git = "https://github.com/ritchie46/arrow2"
# rev = "e106cff24dc0c8942603712d7332a97871dce44e"
# path = "../../../arrow2"
# branch = "comparison_and_validity"
branch = "2022_11_06"
version = "0.14.1"
default-features = false
features = [
Expand Down
4 changes: 1 addition & 3 deletions polars/polars-arrow/src/kernels/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,7 @@ mod test {

let out = sublist_get_indexes(&arr, 1);
assert_eq!(
out.into_iter()
.map(|opt_v| opt_v.cloned())
.collect::<Vec<_>>(),
out.into_iter().collect::<Vec<_>>(),
&[None, None, None, Some(4), Some(7), Some(10)]
);
}
Expand Down
4 changes: 2 additions & 2 deletions polars/polars-arrow/src/kernels/rolling/nulls/min_max.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use arrow::bitmap::utils::{count_zeros, zip_validity};
use arrow::bitmap::utils::{count_zeros, ZipValidityIter};
use nulls;
use nulls::{rolling_apply_agg_window, RollingAggWindowNulls};

Expand All @@ -9,7 +9,7 @@ pub fn is_reverse_sorted_max_nulls<T: NativeType + PartialOrd + IsFloat>(
validity: &Bitmap,
) -> bool {
let mut current_max = None;
for opt_v in zip_validity(values.iter(), Some(validity.iter())) {
for opt_v in ZipValidityIter::new(values.iter(), validity.iter()) {
match (current_max, opt_v) {
// do nothing
(None, None) => {}
Expand Down
11 changes: 9 additions & 2 deletions polars/polars-arrow/src/trusted_len/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ mod rev;
use std::iter::Scan;
use std::slice::Iter;

use arrow::bitmap::utils::{BitmapIter, ZipValidity};
use arrow::bitmap::utils::{BitmapIter, ZipValidity, ZipValidityIter};
pub use push_unchecked::*;
pub use rev::FromIteratorReversed;

Expand Down Expand Up @@ -66,7 +66,14 @@ unsafe impl<I: Iterator<Item = J>, J> TrustedLen for TrustMyLength<I, J> {}
unsafe impl<T> TrustedLen for std::ops::Range<T> where std::ops::Range<T>: Iterator {}
unsafe impl TrustedLen for arrow::array::Utf8ValuesIter<'_, i64> {}
unsafe impl TrustedLen for arrow::array::BinaryValueIter<'_, i64> {}
unsafe impl<T, I: TrustedLen + Iterator<Item = T>> TrustedLen for ZipValidity<'_, T, I> {}
unsafe impl<T, I: TrustedLen + Iterator<Item = T>, V: TrustedLen + Iterator<Item = bool>> TrustedLen
for ZipValidityIter<T, I, V>
{
}
unsafe impl<T, I: TrustedLen + Iterator<Item = T>, V: TrustedLen + Iterator<Item = bool>> TrustedLen
for ZipValidity<T, I, V>
{
}
unsafe impl TrustedLen for BitmapIter<'_> {}
unsafe impl<A: TrustedLen> TrustedLen for std::iter::StepBy<A> {}

Expand Down
42 changes: 30 additions & 12 deletions polars/polars-core/src/chunked_array/iterator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,18 +306,36 @@ impl<'a> IntoIterator for &'a ListChunked {
fn into_iter(self) -> Self::IntoIter {
let dtype = self.inner_dtype();

// we know that we only iterate over length == self.len()
unsafe {
Box::new(
self.downcast_iter()
.flat_map(|arr| arr.iter())
.trust_my_length(self.len())
.map(move |arr| {
arr.map(|arr| {
Series::from_chunks_and_dtype_unchecked("", vec![arr], &dtype)
})
}),
)
if self.null_count() == 0 {
// we know that we only iterate over length == self.len()
unsafe {
Box::new(
self.downcast_iter()
.flat_map(|arr| arr.iter().unwrap_required())
.trust_my_length(self.len())
.map(move |arr| {
Some(Series::from_chunks_and_dtype_unchecked(
"",
vec![arr],
&dtype,
))
}),
)
}
} else {
// we know that we only iterate over length == self.len()
unsafe {
Box::new(
self.downcast_iter()
.flat_map(|arr| arr.iter())
.trust_my_length(self.len())
.map(move |arr| {
arr.map(|arr| {
Series::from_chunks_and_dtype_unchecked("", vec![arr], &dtype)
})
}),
)
}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-core/src/chunked_array/ops/apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -652,7 +652,7 @@ impl<'a> ChunkApply<'a, Series, Series> for ListChunked {
});
f(x)
});
let len = values.len();
let len = array.len();

// we know the iterators len
unsafe {
Expand Down
31 changes: 22 additions & 9 deletions polars/polars-core/src/series/iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,21 +105,34 @@ impl Series {
})
}
} else {
// TODO! null_count paths, but first exactsize iters must be implemented upstream
match dtype {
DataType::Utf8 => {
let arr = arr.as_any().downcast_ref::<Utf8Array<i64>>().unwrap();
Box::new(arr.iter().map(|value| match value {
Some(value) => AnyValue::Utf8(value),
None => AnyValue::Null,
})) as Box<dyn ExactSizeIterator<Item = AnyValue<'_>> + '_>
if arr.null_count() == 0 {
Box::new(arr.values_iter().map(AnyValue::Utf8))
as Box<dyn ExactSizeIterator<Item = AnyValue<'_>> + '_>
} else {
let zipvalid = arr.iter();
Box::new(zipvalid.unwrap_optional().map(|v| match v {
Some(value) => AnyValue::Utf8(value),
None => AnyValue::Null,
}))
as Box<dyn ExactSizeIterator<Item = AnyValue<'_>> + '_>
}
}
DataType::Boolean => {
let arr = arr.as_any().downcast_ref::<BooleanArray>().unwrap();
Box::new(arr.iter().map(|value| match value {
Some(value) => AnyValue::Boolean(value),
None => AnyValue::Null,
})) as Box<dyn ExactSizeIterator<Item = AnyValue<'_>> + '_>
if arr.null_count() == 0 {
Box::new(arr.values_iter().map(AnyValue::Boolean))
as Box<dyn ExactSizeIterator<Item = AnyValue<'_>> + '_>
} else {
let zipvalid = arr.iter();
Box::new(zipvalid.unwrap_optional().map(|v| match v {
Some(value) => AnyValue::Boolean(value),
None => AnyValue::Null,
}))
as Box<dyn ExactSizeIterator<Item = AnyValue<'_>> + '_>
}
}
_ => Box::new(self.iter()),
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ where
arr.values().iter().copied().fold_first_(min_or_max_fn)
} else {
arr.iter()
.unwrap_optional()
.map(|opt| opt.copied())
.fold_first_(|a, b| match (a, b) {
(Some(a), Some(b)) => Some(min_or_max_fn(a, b)),
Expand Down
37 changes: 23 additions & 14 deletions py-polars/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 609ac98

Please sign in to comment.