Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Add take_arrays_with_limit
Browse files Browse the repository at this point in the history
  • Loading branch information
sundy-li committed Jul 29, 2021
1 parent 84d98f2 commit fdc7bd6
Showing 1 changed file with 27 additions and 9 deletions.
36 changes: 27 additions & 9 deletions src/compute/merge_sort/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
//! let slices2 = merge_sort_slices(a2, a3);
//! let slices = merge_sort_slices(slices1, slices2);
//!
//! let array = take_arrays(&[a0, a1, a2, a3], slices, None);
//! let array = take_arrays(&[a0, a1, a2, a3], slices);
//! ```
//!
//! A common operation in query engines is to merge multiple fields based on the
Expand All @@ -49,8 +49,8 @@
//! ```rust,ignore
//! // `slices` computed before-hand
//! // in parallel
//! let array1 = take_arrays(&[a0, a1, a2, a3], slices, None);
//! let array2 = take_arrays(&[b0, b1, b2, b3], slices, None);
//! let array1 = take_arrays(&[a0, a1, a2, a3], slices);
//! let array2 = take_arrays(&[b0, b1, b2, b3], slices);
//! ```
//!
//! To serialize slices, e.g. for checkpointing or transfer via Arrow's IPC, you can store
Expand Down Expand Up @@ -89,11 +89,26 @@ type MergeSlice = (usize, usize, usize);
pub fn take_arrays<I: IntoIterator<Item = MergeSlice>>(
arrays: &[&dyn Array],
slices: I,
limit: Option<usize>,
) -> Box<dyn Array> {
let slices = slices.into_iter();
let len = arrays.iter().map(|array| array.len()).sum();
let limit = limit.unwrap_or(len);
let mut growable = make_growable(arrays, false, len);

for (index, start, len) in slices {
growable.extend(index, start, len)
}
growable.as_box()
}

/// Similar to `take_arrays`, but with a limit check.
pub fn take_arrays_with_limit<I: IntoIterator<Item = MergeSlice>>(
arrays: &[&dyn Array],
slices: I,
limit: usize,
) -> Box<dyn Array> {
let slices = slices.into_iter();
let len = arrays.iter().map(|array| array.len()).sum();
let limit = limit.min(len);
let mut growable = make_growable(arrays, false, limit);

let mut current_len = 0;
Expand Down Expand Up @@ -143,7 +158,10 @@ pub fn merge_sort(
let lhs = (0, 0, lhs.len());
let rhs = (1, 0, rhs.len());
let slices = merge_sort_slices(once(&lhs), once(&rhs), &comparator);
Ok(take_arrays(arrays, slices, limit))
Ok(match limit {
Some(limit) => take_arrays_with_limit(arrays, slices, limit),
None => take_arrays(arrays, slices),
})
}

/// Returns a vector of slices from different sorted arrays that can be used to create sorted arrays.
Expand Down Expand Up @@ -541,7 +559,7 @@ mod tests {

let slices = merge_sort_slices(once(&(0, 0, 5)), once(&(1, 0, 5)), &comparator);
// thus, they can be used to take from the arrays
let array = take_arrays(&arrays, slices, Some(5));
let array = take_arrays_with_limit(&arrays, slices, 5);

let expected = Int32Array::from_slice(&[0, 1, 2, 3, 4]);
// values are right
Expand Down Expand Up @@ -576,7 +594,7 @@ mod tests {
);

// thus, they can be used to take from the arrays
let array = take_arrays(&arrays, slices, None);
let array = take_arrays(&arrays, slices);

let expected = Int32Array::from_slice(&[0, 1, 2, 3, 4, 5, 6, 7]);

Expand Down Expand Up @@ -646,7 +664,7 @@ mod tests {
let pairs = vec![(arrays0.as_ref(), &options), (arrays1.as_ref(), &options)];
let slices = slices(&pairs)?;

let array = take_arrays(&[array0, array1], slices, None);
let array = take_arrays(&[array0, array1], slices);

assert_eq!(expected, array.as_ref());
Ok(())
Expand Down

0 comments on commit fdc7bd6

Please sign in to comment.