Skip to content

Commit

Permalink
Add Scalar/Datum abstraction (apache#1047)
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold committed Jun 9, 2023
1 parent 9b2b4ca commit 04cc386
Show file tree
Hide file tree
Showing 6 changed files with 135 additions and 13 deletions.
11 changes: 11 additions & 0 deletions arrow-array/src/array/boolean_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,17 @@ impl BooleanArray {
Self { values, nulls }
}

/// Create a new [`BooleanArray`] with length `len` consisting only of nulls
pub fn new_null(len: usize) -> Self {
let buffer = MutableBuffer::from_len_zeroed(bit_util::ceil(len, 8));
let values = BooleanBuffer::new(buffer.into(), 0, len);
let nulls = NullBuffer::new_null(len);
Self {
values,
nulls: Some(nulls),
}
}

/// Returns the length of this array.
pub fn len(&self) -> usize {
self.values.len()
Expand Down
3 changes: 3 additions & 0 deletions arrow-array/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,9 @@ pub use arithmetic::ArrowNativeTypeOp;
mod numeric;
pub use numeric::*;

mod scalar;
pub use scalar::*;

pub mod builder;
pub mod cast;
mod delta;
Expand Down
110 changes: 110 additions & 0 deletions arrow-array/src/scalar.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::Array;

/// A possibly [`Scalar`] [`Array`]
///
/// This allows optimised binary kernels where one or more arguments are constant
///
/// ```
/// # use arrow_array::*;
/// # use arrow_buffer::{BooleanBuffer, MutableBuffer, NullBuffer};
/// # use arrow_schema::ArrowError;
/// #
/// fn eq_impl<T: ArrowPrimitiveType>(
/// a: &PrimitiveArray<T>,
/// a_scalar: bool,
/// b: &PrimitiveArray<T>,
/// b_scalar: bool,
/// ) -> BooleanArray {
/// let (array, scalar) = match (a_scalar, b_scalar) {
/// (true, true) | (false, false) => {
/// let len = a.len().min(b.len());
/// let nulls = NullBuffer::union(a.nulls(), b.nulls());
/// let buffer = BooleanBuffer::collect_bool(len, |idx| a.value(idx) == b.value(idx));
/// return BooleanArray::new(buffer, nulls);
/// }
/// (true, false) => (b, (a.null_count() == 0).then(|| a.value(0))),
/// (false, true) => (a, (b.null_count() == 0).then(|| b.value(0))),
/// };
/// match scalar {
/// Some(v) => {
/// let len = array.len();
/// let nulls = array.nulls().cloned();
/// let buffer = BooleanBuffer::collect_bool(len, |idx| array.value(idx) == v);
/// BooleanArray::new(buffer, nulls)
/// }
/// None => BooleanArray::new_null(array.len()),
/// }
/// }
///
/// pub fn eq(l: &dyn Datum, r: &dyn Datum) -> Result<BooleanArray, ArrowError> {
/// let (l_array, l_scalar) = l.get();
/// let (r_array, r_scalar) = r.get();
/// downcast_primitive_array!(
/// (l_array, r_array) => Ok(eq_impl(l_array, l_scalar, r_array, r_scalar)),
/// (a, b) => Err(ArrowError::NotYetImplemented(format!("{a} == {b}"))),
/// )
/// }
///
/// // Comparison of two arrays
/// let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
/// let b = Int32Array::from(vec![1, 2, 4, 7, 3]);
/// let r = eq(&a, &b).unwrap();
/// let values: Vec<_> = r.values().iter().collect();
/// assert_eq!(values, &[true, true, false, false, false]);
///
/// // Comparison of an array and a scalar
/// let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
/// let b = Int32Array::from(vec![1]);
/// let r = eq(&a, &Scalar::new(&b)).unwrap();
/// let values: Vec<_> = r.values().iter().collect();
/// assert_eq!(values, &[true, false, false, false, false]);
pub trait Datum {
/// Returns the value for this [`Datum`] and a boolean indicating if the value is scalar
fn get(&self) -> (&dyn Array, bool);
}

impl<T: Array> Datum for T {
fn get(&self) -> (&dyn Array, bool) {
(self, false)
}
}

/// A wrapper around a single value [`Array`] indicating kernels should treat it as a scalar value
///
/// See [`Datum`] for more information
pub struct Scalar<'a>(&'a dyn Array);

impl<'a> Scalar<'a> {
/// Create a new [`Scalar`] from an [`Array`]
///
/// # Panics
///
/// Panics if `array.len() != 1`
pub fn new(array: &'a dyn Array) -> Self {
assert_eq!(array.len(), 1);
Self(array)
}
}

impl<'a> Datum for Scalar<'a> {
fn get(&self) -> (&dyn Array, bool) {
(self.0, true)
}
}
10 changes: 10 additions & 0 deletions arrow-buffer/src/buffer/boolean.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,16 @@ impl BooleanBuffer {
}
}

/// Create a new [`BooleanBuffer`] of the given `length` where all values are `false`
pub fn new_zeroed(len: usize) -> Self {
let buffer = MutableBuffer::new_null(len).into_buffer();
Self {
buffer,
offset: 0,
len,
}
}

/// Invokes `f` with indexes `0..len` collecting the boolean results into a new `BooleanBuffer`
pub fn collect_bool<F: FnMut(usize) -> bool>(len: usize, f: F) -> Self {
let buffer = MutableBuffer::collect_bool(len, f);
Expand Down
4 changes: 1 addition & 3 deletions arrow-buffer/src/buffer/null.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,8 @@ impl NullBuffer {

/// Create a new [`NullBuffer`] of length `len` where all values are null
pub fn new_null(len: usize) -> Self {
let buffer = MutableBuffer::new_null(len).into_buffer();
let buffer = BooleanBuffer::new(buffer, 0, len);
Self {
buffer,
buffer: BooleanBuffer::new_zeroed(len),
null_count: len,
}
}
Expand Down
10 changes: 0 additions & 10 deletions arrow-select/src/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,16 +321,6 @@ fn filter_array(
// actually filter
_ => downcast_primitive_array! {
values => Ok(Arc::new(filter_primitive(values, predicate))),
DataType::Decimal128(p, s) => {
let values = values.as_any().downcast_ref::<Decimal128Array>().unwrap();
let filtered = filter_primitive(values, predicate);
Ok(Arc::new(filtered.with_precision_and_scale(*p, *s).unwrap()))
}
DataType::Decimal256(p, s) => {
let values = values.as_any().downcast_ref::<Decimal256Array>().unwrap();
let filtered = filter_primitive(values, predicate);
Ok(Arc::new(filtered.with_precision_and_scale(*p, *s).unwrap()))
}
DataType::Boolean => {
let values = values.as_any().downcast_ref::<BooleanArray>().unwrap();
Ok(Arc::new(filter_boolean(values, predicate)))
Expand Down

0 comments on commit 04cc386

Please sign in to comment.