Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion encodings/runend/src/compute/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,13 +158,14 @@ mod tests {
/// Filter unwrap one layer at a time so RunEnd's FilterKernel can fire.
#[test]
fn filter_sliced_run_end_preserves_encoding() -> VortexResult<()> {
let mut ctx = LEGACY_SESSION.create_execution_ctx();

// 4 runs of 32 each = 128 rows. Large enough that FilterKernel takes
// the run-preserving path (true_count >= 25).
let values: Vec<i32> = [10, 20, 30, 40]
.iter()
.flat_map(|&v| std::iter::repeat_n(v, 32))
.collect();
let mut ctx = LEGACY_SESSION.create_execution_ctx();
let arr = RunEnd::encode(PrimitiveArray::from_iter(values).into_array(), &mut ctx)?;

// Slice off the first 16 rows. Slice(RunEnd), 112 rows, 4 runs.
Expand Down
6 changes: 6 additions & 0 deletions vortex-array/public-api.lock
Original file line number Diff line number Diff line change
Expand Up @@ -21466,6 +21466,8 @@ pub vortex_array::ColumnarView::Constant(vortex_array::ArrayView<'a, vortex_arra

pub enum vortex_array::ExecutionStep

pub vortex_array::ExecutionStep::AppendChild(usize)

pub vortex_array::ExecutionStep::Done

pub vortex_array::ExecutionStep::ExecuteSlot(usize, vortex_array::DonePredicate)
Expand Down Expand Up @@ -22586,6 +22588,8 @@ pub struct vortex_array::ExecutionResult

impl vortex_array::ExecutionResult

pub fn vortex_array::ExecutionResult::append_child(array: impl vortex_array::IntoArray, slot_idx: usize) -> Self

pub fn vortex_array::ExecutionResult::array(&self) -> &vortex_array::ArrayRef

pub fn vortex_array::ExecutionResult::done(result: impl vortex_array::IntoArray) -> Self
Expand Down Expand Up @@ -25176,6 +25180,8 @@ pub fn vortex_session::VortexSession::create_execution_ctx(&self) -> vortex_arra

pub fn vortex_array::child_to_validity(child: &core::option::Option<vortex_array::ArrayRef>, nullability: vortex_array::dtype::Nullability) -> vortex_array::validity::Validity

pub fn vortex_array::execute_into_builder(array: vortex_array::ArrayRef, builder: alloc::boxed::Box<dyn vortex_array::builders::ArrayBuilder>, ctx: &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult<alloc::boxed::Box<dyn vortex_array::builders::ArrayBuilder>>

pub fn vortex_array::patches_child(patches: &vortex_array::patches::Patches, idx: usize) -> vortex_array::ArrayRef

pub fn vortex_array::patches_child_name(idx: usize) -> &'static str
Expand Down
71 changes: 53 additions & 18 deletions vortex-array/src/array/erased.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use crate::ArrayHash;
use crate::ArrayView;
use crate::Canonical;
use crate::ExecutionCtx;
use crate::ExecutionResult;
use crate::IntoArray;
use crate::LEGACY_SESSION;
use crate::VTable;
Expand Down Expand Up @@ -93,6 +94,12 @@ impl ArrayRef {
&self.0
}

/// Returns a reference to the inner Arc.
#[inline(always)]
pub(crate) fn inner_mut(&mut self) -> &mut Arc<dyn DynArray> {
&mut self.0
}

/// Consumes the array reference, returning the owned backing allocation.
#[inline(always)]
pub(crate) fn into_inner(self) -> Arc<dyn DynArray> {
Expand Down Expand Up @@ -431,10 +438,13 @@ impl ArrayRef {
self.with_slots(slots)
}

/// Take a slot for executor-owned physical rewrites. This has the result that the array may
/// either be taken or cloned from the parent.
/// Take a slot for executor-owned physical rewrites.
///
/// The array can be put back with [`put_slot_unchecked`].
/// On return the produced parent has the taken slot set to `None`
/// callers must put the slot back (typically via [`put_slot_unchecked`]) before the parent is
/// returned from the execution loop.
///
/// When the `Arc` was shared this allocates a fresh parent.
///
/// # Safety
/// The caller must put back a slot with the same logical dtype and length before exposing the
Expand All @@ -443,18 +453,27 @@ impl ArrayRef {
mut self,
slot_idx: usize,
) -> VortexResult<(ArrayRef, ArrayRef)> {
let child = if let Some(inner) = Arc::get_mut(&mut self.0) {
// # Safety: ensured by the caller.
unsafe { inner.slots_mut()[slot_idx].take() }
.vortex_expect("take_slot_unchecked cannot take an absent slot")
} else {
self.slots()[slot_idx]
.as_ref()
.vortex_expect("take_slot_unchecked cannot take an absent slot")
.clone()
};
if let Some(inner) = Arc::get_mut(&mut self.0) {
// SAFETY: ensured by the caller.
let child = unsafe { inner.slots_mut()[slot_idx].take() }
.vortex_expect("take_slot_unchecked cannot take an absent slot");
return Ok((self, child));
}

// Arc is shared: clone the child out and build a fresh parent with slot_idx = None,
// bypassing encoding-level validation so the absent slot does not panic `V::validate`.
let child = self.slots()[slot_idx]
.as_ref()
.vortex_expect("take_slot_unchecked cannot take an absent slot")
.clone();

let mut new_slots = self.slots().to_vec();
new_slots[slot_idx] = None;

Ok((self, child))
// SAFETY: ensured by the caller — the None slot is either put back or driven to completion
// via the builder path before the parent escapes the executor.
let new_parent = unsafe { self.0.with_slots_unchecked(&self, new_slots) };
Ok((new_parent, child))
}

/// Puts an array into `slot_idx` by either, cloning the inner array if the Arc is not exclusive
Expand Down Expand Up @@ -532,12 +551,28 @@ impl ArrayRef {
self.0.reduce_parent(self, parent, child_idx)
}

pub(crate) fn execute_encoding(
pub(crate) fn execute_encoding(self, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionResult> {
let inner = Arc::as_ptr(&self.0);
// Safety the Arc outline the DynArray function call
unsafe { (&*inner).execute(self, ctx) }
}

/// Execute a single encoding step without applying `Done`-result postconditions.
///
/// This is for the iterative executor only. It may operate on suspended executor-private
/// arrays whose slots temporarily contain `None`, so the executor itself must interpret
/// `Done`, enforce any `len`/`dtype` invariants, and transfer statistics.
pub(crate) fn execute_encoding_unchecked(
self,
ctx: &mut ExecutionCtx,
) -> VortexResult<crate::ExecutionResult> {
let inner = Arc::clone(&self.0);
inner.execute(self, ctx)
) -> VortexResult<ExecutionResult> {
let inner = Arc::as_ptr(&self.0);
// Safety the Arc outline the DynArray function call
let inner = unsafe { &*inner };
// SAFETY: `inner` points at the allocation owned by `self.0`. `self` stays alive for the
// duration of the call, so the pointee remains valid. Avoiding an extra `Arc` clone here
// preserves uniqueness so execute-time metadata cursors can use `Arc::get_mut`.
unsafe { inner.execute_unchecked(self, ctx) }
}

pub fn execute_parent(
Expand Down
91 changes: 84 additions & 7 deletions vortex-array/src/array/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ pub(crate) trait DynArray: 'static + private::Sealed + Send + Sync + Debug {
/// Returns the array as a reference to a generic [`Any`] trait object.
fn as_any(&self) -> &dyn Any;

/// Returns the array as a mutable reference to a generic [`Any`] trait object.
fn as_any_mut(&mut self) -> &mut dyn Any;

/// Converts an owned array allocation into an owned [`Any`] allocation for downcasting.
fn into_any_arc(self: std::sync::Arc<Self>) -> std::sync::Arc<dyn Any + Send + Sync>;

Expand Down Expand Up @@ -143,6 +146,24 @@ pub(crate) trait DynArray: 'static + private::Sealed + Send + Sync + Debug {
/// Returns a new array with the given slots.
fn with_slots(&self, this: ArrayRef, slots: Vec<Option<ArrayRef>>) -> VortexResult<ArrayRef>;

/// Returns a new array with the given slots, bypassing encoding-level validation.
///
/// Used by the executor to temporarily carry an array that has had one of its child slots
/// taken out (leaving `None`) without panicking `V::validate`. The caller must ensure the
/// missing slot is filled back in (via `put_slot_unchecked`) or driven to completion by the
/// builder path before the array becomes externally observable.
///
/// # Safety
///
/// The array returned may have slots whose content does not match the encoding's normal
/// invariants. Callers must re-establish those invariants before handing the array to
/// anything outside the executor.
unsafe fn with_slots_unchecked(
&self,
this: &ArrayRef,
slots: Vec<Option<ArrayRef>>,
) -> ArrayRef;

/// Attempt to reduce the array to a simpler representation.
fn reduce(&self, this: &ArrayRef) -> VortexResult<Option<ArrayRef>>;

Expand All @@ -155,8 +176,30 @@ pub(crate) trait DynArray: 'static + private::Sealed + Send + Sync + Debug {
) -> VortexResult<Option<ArrayRef>>;

/// Execute the array by taking a single encoding-specific execution step.
///
/// This is the checked entry point. If the encoding reports
/// [`ExecutionStep::Done`](crate::ExecutionStep::Done), implementations must validate that the
/// returned array preserves this array's logical `len` and `dtype`, and must transfer this
/// array's statistics to the returned array.
fn execute(&self, this: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionResult>;

/// Execute the array by taking a single encoding-specific execution step without applying
/// `Done`-result postconditions.
///
/// This exists for the iterative executor, which may call into `execute` on suspended
/// executor-private arrays whose slots temporarily contain `None`. In that mode the executor
/// itself is responsible for deciding when a `Done` result represents a real logical array,
/// enforcing any `len`/`dtype` invariants, and transferring statistics.
///
/// # Safety
/// The `array` returned should have it's `DType` and len checked
/// (optionally it should have its stats propagated from `this`).
unsafe fn execute_unchecked(
&self,
this: ArrayRef,
ctx: &mut ExecutionCtx,
) -> VortexResult<ExecutionResult>;

/// Attempt to execute the parent of this array.
fn execute_parent(
&self,
Expand Down Expand Up @@ -203,6 +246,10 @@ impl<V: VTable> DynArray for ArrayInner<V> {
self
}

fn as_any_mut(&mut self) -> &mut dyn Any {
self
}

fn into_any_arc(self: std::sync::Arc<Self>) -> std::sync::Arc<dyn Any + Send + Sync> {
self
}
Expand Down Expand Up @@ -387,6 +434,26 @@ impl<V: VTable> DynArray for ArrayInner<V> {
.into_array())
}

unsafe fn with_slots_unchecked(
&self,
this: &ArrayRef,
slots: Vec<Option<ArrayRef>>,
) -> ArrayRef {
// SAFETY: we intentionally skip `V::validate` here. Caller guarantees that the resulting
// array is either repaired or not externally observed.
let inner = unsafe {
ArrayInner::<V>::from_data_unchecked(
self.vtable.clone(),
this.dtype().clone(),
self.len,
self.data.clone(),
slots,
self.stats.clone(),
)
};
ArrayRef::from_inner(std::sync::Arc::new(inner))
}

fn reduce(&self, this: &ArrayRef) -> VortexResult<Option<ArrayRef>> {
let view = unsafe { ArrayView::new_unchecked(this, &self.data) };
let Some(reduced) = V::reduce(view)? else {
Expand Down Expand Up @@ -437,12 +504,8 @@ impl<V: VTable> DynArray for ArrayInner<V> {
fn execute(&self, this: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionResult> {
let len = this.len();
let dtype = this.dtype().clone();
let stats = this.statistics().to_owned();

let typed = Array::<V>::try_from_array_ref(this)
.map_err(|_| vortex_err!("Failed to downcast array for execute"))
.vortex_expect("Failed to downcast array for execute");
let result = V::execute(typed, ctx)?;
let stats = this.statistics().to_array_stats();
let result = unsafe { self.execute_unchecked(this, ctx)? };

if matches!(result.step(), ExecutionStep::Done) {
if cfg!(debug_assertions) {
Expand All @@ -458,12 +521,26 @@ impl<V: VTable> DynArray for ArrayInner<V> {
);
}

result.array().statistics().set_iter(stats.into_iter());
result
.array()
.statistics()
.set_iter(crate::stats::StatsSet::from(stats).into_iter());
}

Ok(result)
}

unsafe fn execute_unchecked(
&self,
this: ArrayRef,
ctx: &mut ExecutionCtx,
) -> VortexResult<ExecutionResult> {
let typed = Array::<V>::try_from_array_ref(this)
.map_err(|_| vortex_err!("Failed to downcast array for execute"))
.vortex_expect("Failed to downcast array for execute");
V::execute(typed, ctx)
}

fn execute_parent(
&self,
this: &ArrayRef,
Expand Down
8 changes: 8 additions & 0 deletions vortex-array/src/array/typed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,14 @@ impl<V: VTable> Array<V> {
&self.downcast_inner().data
}

/// Try to fetch a mut ref to the inner ArrayData.
pub fn data_mut(&mut self) -> Option<&mut V::ArrayData> {
let m = self.inner.inner_mut();
let inner = Arc::get_mut(m)?;
let array_inner = inner.as_any_mut().downcast_mut::<ArrayInner<V>>();
Some(&mut array_inner?.data)
}

/// Returns the full typed array construction parts if this handle owns the allocation.
pub fn try_into_parts(self) -> Result<ArrayParts<V>, Self> {
let Self { inner, _phantom } = self;
Expand Down
Loading
Loading