Skip to content

Commit

Permalink
Add u64 as index type (#2682)
Browse files Browse the repository at this point in the history
This allows polars to compile with a feature gate bigidx, which raises the current row-count limit from u32::MAX rows to u64::MAX rows in polars.

Now I am curious how long it will take before someone needs to compile polars this way. ;)

Note that the default index is faster and consumes less memory, so don't activate it unless you really need it.
  • Loading branch information
ritchie46 committed Feb 17, 2022
1 parent 6d16f64 commit 65fca73
Show file tree
Hide file tree
Showing 62 changed files with 567 additions and 483 deletions.
1 change: 1 addition & 0 deletions polars/polars-arrow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,4 @@ thiserror = "^1.0"
[features]
strings = []
compute = ["arrow/compute_cast"]
bigidx = []
16 changes: 16 additions & 0 deletions polars/polars-arrow/src/index.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
#[cfg(not(feature = "bigidx"))]
use arrow::array::UInt32Array;
#[cfg(feature = "bigidx")]
use arrow::array::UInt64Array;

pub trait IndexToUsize {
/// Translate the negative index to an offset.
fn negative_to_usize(self, index: usize) -> Option<usize>;
Expand All @@ -17,3 +22,14 @@ impl IndexToUsize for i64 {
}
}
}

/// The type used by polars to index data.
#[cfg(not(feature = "bigidx"))]
pub type IdxSize = u32;
#[cfg(feature = "bigidx")]
pub type IdxSize = u64;

#[cfg(not(feature = "bigidx"))]
pub type IdxArr = UInt32Array;
#[cfg(feature = "bigidx")]
pub type IdxArr = UInt64Array;
18 changes: 9 additions & 9 deletions polars/polars-arrow/src/kernels/list.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use crate::index::IndexToUsize;
use crate::index::*;
use crate::kernels::take::take_unchecked;
use crate::trusted_len::PushUnchecked;
use crate::utils::CustomIterTools;
use arrow::array::{ArrayRef, ListArray, PrimitiveArray};
use arrow::array::{ArrayRef, ListArray};
use arrow::buffer::Buffer;

/// Get the indices that would result in a get operation on the lists values.
Expand All @@ -29,13 +29,13 @@ use arrow::buffer::Buffer;
/// [3, 5, 6]
///
/// ```
fn sublist_get_indexes(arr: &ListArray<i64>, index: i64) -> PrimitiveArray<u32> {
fn sublist_get_indexes(arr: &ListArray<i64>, index: i64) -> IdxArr {
let mut iter = arr.offsets().iter();

let mut cum_offset = 0u32;
let mut cum_offset: IdxSize = 0;

if let Some(mut previous) = iter.next().copied() {
let a: PrimitiveArray<u32> = iter
let a: IdxArr = iter
.map(|&offset| {
let len = offset - previous;
// make sure that empty lists don't get accessed
Expand All @@ -46,15 +46,15 @@ fn sublist_get_indexes(arr: &ListArray<i64>, index: i64) -> PrimitiveArray<u32>

let out = index
.negative_to_usize(len as usize)
.map(|idx| idx as u32 + cum_offset);
cum_offset += len as u32;
.map(|idx| idx as IdxSize + cum_offset);
cum_offset += len as IdxSize;
out
})
.collect_trusted();

a
} else {
PrimitiveArray::<u32>::from_slice(&[])
IdxArr::from_slice(&[])
}
}

Expand Down Expand Up @@ -87,7 +87,7 @@ pub fn array_to_unit_list(array: ArrayRef) -> ListArray<i64> {
#[cfg(test)]
mod test {
use super::*;
use arrow::array::Int32Array;
use arrow::array::{Int32Array, PrimitiveArray};
use arrow::buffer::Buffer;
use arrow::datatypes::DataType;
use std::sync::Arc;
Expand Down
27 changes: 12 additions & 15 deletions polars/polars-arrow/src/kernels/take.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::sync::Arc;

/// # Safety
/// Does not do bounds checks
pub unsafe fn take_unchecked(arr: &dyn Array, idx: &UInt32Array) -> ArrayRef {
pub unsafe fn take_unchecked(arr: &dyn Array, idx: &IdxArr) -> ArrayRef {
use PhysicalType::*;
match arr.data_type().to_physical_type() {
Primitive(primitive) => with_match_primitive_type!(primitive, |$T| {
Expand Down Expand Up @@ -39,7 +39,7 @@ pub unsafe fn take_unchecked(arr: &dyn Array, idx: &UInt32Array) -> ArrayRef {
/// caller must ensure indices are in bounds
pub unsafe fn take_primitive_unchecked<T: NativeType>(
arr: &PrimitiveArray<T>,
indices: &UInt32Array,
indices: &IdxArr,
) -> Arc<PrimitiveArray<T>> {
let array_values = arr.values().as_slice();
let index_values = indices.values().as_slice();
Expand Down Expand Up @@ -89,7 +89,7 @@ pub unsafe fn take_primitive_unchecked<T: NativeType>(
/// caller must ensure indices are in bounds
pub unsafe fn take_no_null_primitive<T: NativeType>(
arr: &PrimitiveArray<T>,
indices: &UInt32Array,
indices: &IdxArr,
) -> Arc<PrimitiveArray<T>> {
debug_assert!(!arr.has_validity());
let array_values = arr.values().as_slice();
Expand Down Expand Up @@ -380,7 +380,7 @@ pub unsafe fn take_utf8_opt_iter_unchecked<I: IntoIterator<Item = Option<usize>>
/// caller must ensure indices are in bounds
pub unsafe fn take_utf8_unchecked(
arr: &LargeStringArray,
indices: &UInt32Array,
indices: &IdxArr,
) -> Arc<LargeStringArray> {
let data_len = indices.len();

Expand Down Expand Up @@ -502,8 +502,8 @@ pub unsafe fn take_utf8_unchecked(
/// No bounds checks
pub unsafe fn take_value_indices_from_list(
list: &ListArray<i64>,
indices: &UInt32Array,
) -> (UInt32Array, Vec<i64>) {
indices: &IdxArr,
) -> (IdxArr, Vec<i64>) {
let offsets = list.offsets().as_slice();

let mut new_offsets = Vec::with_capacity(indices.len());
Expand All @@ -528,7 +528,7 @@ pub unsafe fn take_value_indices_from_list(

// if start == end, this slot is empty
while curr < end {
values.push(curr as u32);
values.push(curr as IdxSize);
curr += 1;
}
}
Expand All @@ -547,7 +547,7 @@ pub unsafe fn take_value_indices_from_list(

// if start == end, this slot is empty
while curr < end {
values.push(curr as u32);
values.push(curr as IdxSize);
curr += 1;
}
} else {
Expand All @@ -556,10 +556,7 @@ pub unsafe fn take_value_indices_from_list(
}
}

(
PrimitiveArray::from_data(DataType::UInt32, values.into(), None),
new_offsets,
)
(IdxArr::from_data_default(values.into(), None), new_offsets)
}

#[cfg(test)]
Expand All @@ -570,13 +567,13 @@ mod test {
fn test_utf8_kernel() {
let s = LargeStringArray::from(vec![Some("foo"), None, Some("bar")]);
unsafe {
let out = take_utf8_unchecked(&s, &UInt32Array::from_slice(&[1, 2]));
let out = take_utf8_unchecked(&s, &IdxArr::from_slice(&[1, 2]));
assert!(out.is_null(0));
assert!(out.is_valid(1));
let out = take_utf8_unchecked(&s, &UInt32Array::from(vec![None, Some(2)]));
let out = take_utf8_unchecked(&s, &IdxArr::from(vec![None, Some(2)]));
assert!(out.is_null(0));
assert!(out.is_valid(1));
let out = take_utf8_unchecked(&s, &UInt32Array::from(vec![None, None]));
let out = take_utf8_unchecked(&s, &IdxArr::from(vec![None, None]));
assert!(out.is_null(0));
assert!(out.is_null(1));
}
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-arrow/src/prelude.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
pub use crate::array::default_arrays::*;
pub use crate::array::*;
pub use crate::kernels::rolling::no_nulls::QuantileInterpolOptions;
pub use crate::{array::*, index::*};
use arrow::array::{ListArray, Utf8Array};

pub type LargeStringArray = Utf8Array<i64>;
Expand Down
3 changes: 3 additions & 0 deletions polars/polars-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ dtype-categorical = []

parquet = ["arrow/io_parquet"]

# scale to terabytes?
bigidx = ["polars-arrow/bigidx"]

docs-selection = [
"ndarray",
"is_in",
Expand Down
5 changes: 2 additions & 3 deletions polars/polars-core/src/chunked_array/boolean.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@ use crate::prelude::*;
use crate::utils::NoNull;

impl BooleanChunked {
pub fn arg_true(&self) -> UInt32Chunked {
// the allocation is probably cheaper as the filter is super fast
let ca: NoNull<UInt32Chunked> = (0u32..self.len() as u32).collect_trusted();
pub fn arg_true(&self) -> IdxCa {
let ca: NoNull<IdxCa> = (0..self.len() as IdxSize).collect_trusted();
ca.into_inner().filter(self).unwrap()
}
}
4 changes: 2 additions & 2 deletions polars/polars-core/src/chunked_array/kernels/take.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,15 @@ pub(crate) fn take_primitive_opt_iter_n_chunks<
/// No bounds checks
pub(crate) unsafe fn take_list_unchecked(
values: &ListArray<i64>,
indices: &UInt32Array,
indices: &IdxArr,
) -> ListArray<i64> {
// taking the whole list or a contiguous sublist
let (list_indices, offsets) = take_value_indices_from_list(values, indices);

// tmp series so that we can take primitives from it
let s = Series::try_from(("", values.values().clone() as ArrayRef)).unwrap();
let taken = s
.take_unchecked(&UInt32Chunked::from_chunks(
.take_unchecked(&IdxCa::from_chunks(
"",
vec![Arc::new(list_indices) as ArrayRef],
))
Expand Down
5 changes: 2 additions & 3 deletions polars/polars-core/src/chunked_array/object/extension/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,7 @@ mod test {
let values = &[Some(foo1), None, Some(foo2), None];
let ca = ObjectChunked::new("", values);

let groups =
GroupsProxy::Idx(vec![(0u32, vec![0u32, 1]), (2, vec![2]), (3, vec![3])].into());
let groups = GroupsProxy::Idx(vec![(0, vec![0, 1]), (2, vec![2]), (3, vec![3])].into());
let out = ca.agg_list(&groups).unwrap();
assert!(matches!(out.dtype(), DataType::List(_)));
assert_eq!(out.len(), groups.len());
Expand All @@ -214,7 +213,7 @@ mod test {
let values = &[Some(foo1.clone()), None, Some(foo2.clone()), None];
let ca = ObjectChunked::new("", values);

let groups = vec![(0u32, vec![0u32, 1]), (2, vec![2]), (3, vec![3])].into();
let groups = vec![(0, vec![0, 1]), (2, vec![2]), (3, vec![3])].into();
let out = ca.agg_list(&GroupsProxy::Idx(groups)).unwrap();
let a = out.explode().unwrap();

Expand Down
2 changes: 1 addition & 1 deletion polars/polars-core/src/chunked_array/ops/explode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ impl ExplodeByOffsets for CategoricalChunked {
}

/// Convert Arrow array offsets to indexes of the original list
pub(crate) fn offsets_to_indexes(offsets: &[i64], capacity: usize) -> Vec<u32> {
pub(crate) fn offsets_to_indexes(offsets: &[i64], capacity: usize) -> Vec<IdxSize> {
let mut idx = Vec::with_capacity(capacity);

let mut count = 0;
Expand Down
8 changes: 4 additions & 4 deletions polars/polars-core/src/chunked_array/ops/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,7 @@ pub trait ChunkUnique<T> {

/// Get first index of the unique values in a `ChunkedArray`.
/// This Vec is sorted.
fn arg_unique(&self) -> Result<UInt32Chunked>;
fn arg_unique(&self) -> Result<IdxCa>;

/// Number of unique values in the `ChunkedArray`
fn n_unique(&self) -> Result<usize> {
Expand Down Expand Up @@ -526,10 +526,10 @@ pub trait ChunkSort<T> {
fn sort(&self, reverse: bool) -> ChunkedArray<T>;

/// Retrieve the indexes needed to sort this array.
fn argsort(&self, reverse: bool) -> UInt32Chunked;
fn argsort(&self, reverse: bool) -> IdxCa;

/// Retrieve the indexes need to sort this and the other arrays.
fn argsort_multiple(&self, _other: &[Series], _reverse: &[bool]) -> Result<UInt32Chunked> {
fn argsort_multiple(&self, _other: &[Series], _reverse: &[bool]) -> Result<IdxCa> {
Err(PolarsError::InvalidOperation(
"argsort_multiple not implemented for this dtype".into(),
))
Expand Down Expand Up @@ -752,7 +752,7 @@ pub trait ArgAgg {
#[cfg_attr(docsrs, doc(cfg(feature = "repeat_by")))]
pub trait RepeatBy {
/// Repeat the values `n` times, where `n` is determined by the values in `by`.
fn repeat_by(&self, _by: &UInt32Chunked) -> ListChunked {
fn repeat_by(&self, _by: &IdxCa) -> ListChunked {
unimplemented!()
}
}
Expand Down
8 changes: 4 additions & 4 deletions polars/polars-core/src/chunked_array/ops/repeat_by.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ impl<T> RepeatBy for ChunkedArray<T>
where
T: PolarsNumericType,
{
fn repeat_by(&self, by: &UInt32Chunked) -> ListChunked {
fn repeat_by(&self, by: &IdxCa) -> ListChunked {
let iter = self
.into_iter()
.zip(by.into_iter())
Expand All @@ -30,7 +30,7 @@ where
}
}
impl RepeatBy for BooleanChunked {
fn repeat_by(&self, by: &UInt32Chunked) -> ListChunked {
fn repeat_by(&self, by: &IdxCa) -> ListChunked {
let iter = self
.into_iter()
.zip(by.into_iter())
Expand All @@ -47,7 +47,7 @@ impl RepeatBy for BooleanChunked {
}
}
impl RepeatBy for Utf8Chunked {
fn repeat_by(&self, by: &UInt32Chunked) -> ListChunked {
fn repeat_by(&self, by: &IdxCa) -> ListChunked {
let iter = self
.into_iter()
.zip(by.into_iter())
Expand All @@ -66,7 +66,7 @@ impl RepeatBy for Utf8Chunked {

#[cfg(feature = "dtype-categorical")]
impl RepeatBy for CategoricalChunked {
fn repeat_by(&self, by: &UInt32Chunked) -> ListChunked {
fn repeat_by(&self, by: &IdxCa) -> ListChunked {
let mut ca = self.deref().repeat_by(by);
ca.categorical_map = self.categorical_map.clone();
ca
Expand Down

1 comment on commit 65fca73

@alexander-beedie
Copy link
Collaborator

@alexander-beedie alexander-beedie commented on 65fca73 Feb 18, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great that it's there for the outliers; it's a stretch, but not entirely out of the realm of possibility that we could hit this, heh... (1 billion rows down, 3 billion to go) :)

Please sign in to comment.