Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions vortex-array/src/array/erased.rs
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,56 @@ impl ArrayRef {
self.with_slots(slots)
}

/// Removes the child stored in slot `slot_idx` so that the executor can rewrite it, returning
/// the `(parent, child)` pair.
///
/// If this `ArrayRef` holds the only reference to its inner array (`Arc::get_mut` succeeds),
/// the child is moved out and the slot is left as `None`. Otherwise the child is cloned and
/// the parent keeps its slot.
///
/// The child can be put back with [`Self::put_slot_unchecked`].
///
/// # Safety
/// The caller must put back a slot with the same logical dtype and length before exposing the
/// parent array, and must only use this for physical rewrites.
///
/// # Panics
/// Panics if `slot_idx` is out of bounds. An absent (`None`) slot is rejected through
/// `vortex_expect`, which presumably panics as well (confirm against `VortexExpect`).
///
/// # Errors
/// Every non-panicking path in this body returns `Ok`.
pub(crate) unsafe fn take_slot_unchecked(
mut self,
slot_idx: usize,
) -> VortexResult<(ArrayRef, ArrayRef)> {
let child = if let Some(inner) = Arc::get_mut(&mut self.0) {
// SAFETY: the caller guarantees that an equivalent slot is put back before the parent is
// exposed, so temporarily leaving `None` behind is not observable.
unsafe { inner.slots_mut()[slot_idx].take() }
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Safety comment?

.vortex_expect("take_slot_unchecked cannot take an absent slot")
} else {
// The inner array is shared: leave the parent untouched and hand out a clone of the child.
self.slots()[slot_idx]
.as_ref()
.vortex_expect("take_slot_unchecked cannot take an absent slot")
.clone()
};

Ok((self, child))
}

/// Stores `replacement` in slot `slot_idx`, consuming `self` and returning the updated array.
///
/// When this `ArrayRef` holds the only reference to its inner array, the slot is overwritten
/// in place. Otherwise the slots are copied, the copy is patched, and a new array is built
/// through `with_slots`.
/// This is the mirror of [`Self::take_slot_unchecked`].
///
/// # Safety
/// The replacement must have the same logical dtype and length as the taken slot, and this
/// must only be used for physical rewrites.
///
/// # Panics
/// Panics if `slot_idx` is out of bounds.
pub(crate) unsafe fn put_slot_unchecked(
    mut self,
    slot_idx: usize,
    replacement: ArrayRef,
) -> VortexResult<ArrayRef> {
    match Arc::get_mut(&mut self.0) {
        // Exclusive owner: patch the slot in place, no allocation needed.
        Some(exclusive) => {
            // SAFETY: ensured by the caller.
            unsafe { exclusive.slots_mut()[slot_idx] = Some(replacement) };
            Ok(self)
        }
        // Shared: rebuild the array from a patched copy of the slots.
        None => {
            let mut patched = self.slots().to_vec();
            patched[slot_idx] = Some(replacement);
            let shared = Arc::clone(&self.0);
            shared.with_slots(self, patched)
        }
    }
}

/// Returns a new array with the provided slots.
///
/// This is only valid for physical rewrites: slot count, presence, logical `DType`, and
Expand Down Expand Up @@ -611,6 +661,7 @@ impl<V: VTable> Matcher for V {

/// Tries to view `array` as encoding `V`.
///
/// Returns `None` when the inner array is not an `ArrayInner<V>`.
fn try_match<'a>(array: &'a ArrayRef) -> Option<ArrayView<'a, V>> {
    array
        .0
        .as_any()
        .downcast_ref::<ArrayInner<V>>()
        .map(|typed| {
            // SAFETY: the downcast succeeded, so `typed` really is an `ArrayInner<V>` and its
            // `data` belongs to `array`.
            unsafe { ArrayView::new_unchecked(array, &typed.data) }
        })
}
}
11 changes: 11 additions & 0 deletions vortex-array/src/array/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,13 @@ pub(crate) trait DynArray: 'static + private::Sealed + Send + Sync + Debug {
/// Returns the slots of the array.
fn slots(&self) -> &[Option<ArrayRef>];

/// Returns mutable slots of the array.
///
/// # Safety
///
/// Any slot (`Some(child)`) that replaces an existing slot must have a compatible `DType` and
/// length. Currently compatible means equal, but there is no reason why that must be the
/// case.
unsafe fn slots_mut(&mut self) -> &mut [Option<ArrayRef>];
Comment thread
joseph-isaacs marked this conversation as resolved.

/// Returns the encoding ID of the array.
fn encoding_id(&self) -> ArrayId;

Expand Down Expand Up @@ -212,6 +219,10 @@ impl<V: VTable> DynArray for ArrayInner<V> {
&self.slots
}

// The safety contract is the one documented on `DynArray::slots_mut`: replacement slots must
// keep a compatible dtype and length.
unsafe fn slots_mut(&mut self) -> &mut [Option<ArrayRef>] {
// Hands out the slot storage directly; nothing is validated here.
&mut self.slots
}

fn encoding_id(&self) -> ArrayId {
self.vtable.id()
}
Expand Down
68 changes: 56 additions & 12 deletions vortex-array/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use crate::AnyCanonical;
use crate::ArrayRef;
use crate::Canonical;
use crate::IntoArray;
use crate::dtype::DType;
use crate::matcher::Matcher;
use crate::memory::HostAllocatorRef;
use crate::memory::MemorySessionExt;
Expand Down Expand Up @@ -107,22 +108,21 @@ impl ArrayRef {
/// maximum (default 128, override with `VORTEX_MAX_ITERATIONS`).
pub fn execute_until<M: Matcher>(self, ctx: &mut ExecutionCtx) -> VortexResult<ArrayRef> {
let mut current = self.optimize()?;
// Stack frames: (parent, slot_idx, done_predicate_for_slot)
let mut stack: Vec<(ArrayRef, usize, DonePredicate)> = Vec::new();
let mut stack: Vec<StackFrame> = Vec::new();

for _ in 0..max_iterations() {
// Step 1: done / canonical — splice back into stacked parent or return.
let is_done = stack
.last()
.map_or(M::matches as DonePredicate, |frame| frame.2);
.map_or(M::matches as DonePredicate, |frame| frame.done);
if is_done(&current) || AnyCanonical::matches(&current) {
match stack.pop() {
None => {
ctx.log(format_args!("-> {}", current));
return Ok(current);
}
Some((parent, slot_idx, _)) => {
current = parent.with_slot(slot_idx, current)?.optimize()?;
Some(frame) => {
current = frame.put_back(current)?.optimize()?;
continue;
}
}
Expand All @@ -139,8 +139,8 @@ impl ArrayRef {
current, rewritten
));
current = rewritten.optimize()?;
if let Some((parent, slot_idx, _)) = stack.pop() {
current = parent.with_slot(slot_idx, current)?.optimize()?;
if let Some(frame) = stack.pop() {
current = frame.put_back(current)?.optimize()?;
}
continue;
}
Expand All @@ -150,14 +150,15 @@ impl ArrayRef {
let (array, step) = result.into_parts();
match step {
ExecutionStep::ExecuteSlot(i, done) => {
let child = array.slots()[i]
.clone()
.vortex_expect("ExecuteSlot index in bounds");
// SAFETY: we record the child's dtype and len, and assert they are preserved
// when the slot is put back via `put_slot_unchecked`.
let (parent, child) = unsafe { array.take_slot_unchecked(i) }?;
ctx.log(format_args!(
"ExecuteSlot({i}): pushing {}, focusing on {}",
array, child
parent, child
));
stack.push((array, i, done));
let frame = StackFrame::new(parent, i, done, &child);
stack.push(frame);
current = child.optimize()?;
}
ExecutionStep::Done => {
Expand All @@ -174,6 +175,49 @@ impl ArrayRef {
}
}

/// A stack frame for the iterative executor, tracking the parent array whose slot is being
/// executed and the original child's dtype/len for validation on put-back.
struct StackFrame {
/// The array whose slot is currently being executed.
parent: ArrayRef,
/// Index of the slot in `parent` that the child came from and will be put back into.
slot_idx: usize,
/// Predicate the executor uses to decide when the child of this frame is done.
done: DonePredicate,
/// Dtype of the child when the frame was created.
original_dtype: DType,
/// Length of the child when the frame was created.
original_len: usize,
}

impl StackFrame {
    /// Builds a frame for `parent`, remembering the dtype and length of `child` (the array
    /// taken from slot `slot_idx`) so they can be compared when the slot is put back.
    fn new(parent: ArrayRef, slot_idx: usize, done: DonePredicate, child: &ArrayRef) -> Self {
        let original_dtype = child.dtype().clone();
        let original_len = child.len();
        Self {
            parent,
            slot_idx,
            done,
            original_dtype,
            original_len,
        }
    }

    /// Consumes the frame and writes `replacement` back into the recorded slot of the parent,
    /// returning the updated parent.
    ///
    /// The dtype and length of `replacement` are compared against the recorded originals with
    /// `debug_assert_eq!`, so a mismatch panics in debug builds only.
    fn put_back(self, replacement: ArrayRef) -> VortexResult<ArrayRef> {
        let Self {
            parent,
            slot_idx,
            original_dtype,
            original_len,
            ..
        } = self;

        debug_assert_eq!(
            replacement.dtype(),
            &original_dtype,
            "slot {} dtype changed from {} to {} during execution",
            slot_idx,
            original_dtype,
            replacement.dtype()
        );
        debug_assert_eq!(
            replacement.len(),
            original_len,
            "slot {} len changed from {} to {} during execution",
            slot_idx,
            original_len,
            replacement.len()
        );

        // SAFETY: dtype and len preservation is checked by the debug assertions above. Those
        // are compiled out of release builds, where this relies on execution steps never
        // changing a child's dtype or length.
        unsafe { parent.put_slot_unchecked(slot_idx, replacement) }
    }
}

/// Execution context for batch CPU compute.
#[derive(Debug, Clone)]
pub struct ExecutionCtx {
Expand Down
Loading