Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ version = "0.1.0"
aho-corasick = "1.1.3"
anyhow = "1.0.97"
arbitrary = "1.3.2"
arc-swap = "1.8"
arc-swap = "1.9"
arcref = "0.2.0"
arrow-arith = "58"
arrow-array = "58"
Expand Down
44 changes: 41 additions & 3 deletions vortex-array/public-api.lock
Original file line number Diff line number Diff line change
Expand Up @@ -13222,6 +13222,38 @@ pub vortex_array::normalize::NormalizeOptions::operation: vortex_array::normaliz

pub mod vortex_array::optimizer

pub mod vortex_array::optimizer::kernels

pub struct vortex_array::optimizer::kernels::ArrayKernels

impl vortex_array::optimizer::kernels::ArrayKernels

pub fn vortex_array::optimizer::kernels::ArrayKernels::contains_reduce_parent(&self, outer: vortex_session::registry::Id, child: vortex_session::registry::Id) -> bool

pub fn vortex_array::optimizer::kernels::ArrayKernels::empty() -> Self

pub fn vortex_array::optimizer::kernels::ArrayKernels::find_reduce_parent(&self, outer: vortex_session::registry::Id, child: vortex_session::registry::Id) -> core::option::Option<alloc::sync::Arc<vortex_array::optimizer::kernels::ReduceParentFn>>

pub fn vortex_array::optimizer::kernels::ArrayKernels::register_reduce_parent(&self, outer: vortex_session::registry::Id, child: vortex_session::registry::Id, f: vortex_array::optimizer::kernels::ReduceParentFn)

impl core::default::Default for vortex_array::optimizer::kernels::ArrayKernels

pub fn vortex_array::optimizer::kernels::ArrayKernels::default() -> vortex_array::optimizer::kernels::ArrayKernels

impl core::fmt::Debug for vortex_array::optimizer::kernels::ArrayKernels

pub fn vortex_array::optimizer::kernels::ArrayKernels::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result

pub trait vortex_array::optimizer::kernels::ArrayKernelsExt: vortex_session::SessionExt

pub fn vortex_array::optimizer::kernels::ArrayKernelsExt::kernels(&self) -> vortex_session::Ref<'_, vortex_array::optimizer::kernels::ArrayKernels>

impl<S: vortex_session::SessionExt> vortex_array::optimizer::kernels::ArrayKernelsExt for S

pub fn S::kernels(&self) -> vortex_session::Ref<'_, vortex_array::optimizer::kernels::ArrayKernels>

pub type vortex_array::optimizer::kernels::ReduceParentFn = fn(child: &vortex_array::ArrayRef, parent: &vortex_array::ArrayRef, child_idx: usize) -> vortex_error::VortexResult<core::option::Option<vortex_array::ArrayRef>>

pub mod vortex_array::optimizer::rules

pub struct vortex_array::optimizer::rules::ParentReduceRuleAdapter<V, R>
Expand Down Expand Up @@ -13364,13 +13396,17 @@ pub trait vortex_array::optimizer::ArrayOptimizer

pub fn vortex_array::optimizer::ArrayOptimizer::optimize(&self) -> vortex_error::VortexResult<vortex_array::ArrayRef>

pub fn vortex_array::optimizer::ArrayOptimizer::optimize_recursive(&self) -> vortex_error::VortexResult<vortex_array::ArrayRef>
pub fn vortex_array::optimizer::ArrayOptimizer::optimize_ctx(&self, session: &vortex_session::VortexSession) -> vortex_error::VortexResult<vortex_array::ArrayRef>

pub fn vortex_array::optimizer::ArrayOptimizer::optimize_recursive(&self, session: &vortex_session::VortexSession) -> vortex_error::VortexResult<vortex_array::ArrayRef>

impl vortex_array::optimizer::ArrayOptimizer for vortex_array::ArrayRef

pub fn vortex_array::ArrayRef::optimize(&self) -> vortex_error::VortexResult<vortex_array::ArrayRef>

pub fn vortex_array::ArrayRef::optimize_recursive(&self) -> vortex_error::VortexResult<vortex_array::ArrayRef>
pub fn vortex_array::ArrayRef::optimize_ctx(&self, session: &vortex_session::VortexSession) -> vortex_error::VortexResult<vortex_array::ArrayRef>

pub fn vortex_array::ArrayRef::optimize_recursive(&self, session: &vortex_session::VortexSession) -> vortex_error::VortexResult<vortex_array::ArrayRef>

pub mod vortex_array::patches

Expand Down Expand Up @@ -22328,7 +22364,9 @@ impl vortex_array::optimizer::ArrayOptimizer for vortex_array::ArrayRef

pub fn vortex_array::ArrayRef::optimize(&self) -> vortex_error::VortexResult<vortex_array::ArrayRef>

pub fn vortex_array::ArrayRef::optimize_recursive(&self) -> vortex_error::VortexResult<vortex_array::ArrayRef>
pub fn vortex_array::ArrayRef::optimize_ctx(&self, session: &vortex_session::VortexSession) -> vortex_error::VortexResult<vortex_array::ArrayRef>

pub fn vortex_array::ArrayRef::optimize_recursive(&self, session: &vortex_session::VortexSession) -> vortex_error::VortexResult<vortex_array::ArrayRef>

impl vortex_array::scalar_fn::ReduceNode for vortex_array::ArrayRef

Expand Down
9 changes: 8 additions & 1 deletion vortex-array/src/arrays/extension/compute/rules.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,12 @@ impl ArrayParentReduceRule<Extension> for ExtensionFilterPushDownRule {

#[cfg(test)]
mod tests {
use std::sync::LazyLock;

use vortex_buffer::buffer;
use vortex_error::VortexResult;
use vortex_mask::Mask;
use vortex_session::VortexSession;

use crate::IntoArray;
#[expect(deprecated)]
Expand Down Expand Up @@ -108,6 +111,10 @@ mod tests {
use crate::scalar::ScalarValue;
use crate::scalar_fn::fns::binary::Binary;
use crate::scalar_fn::fns::operators::Operator;
use crate::session::ArraySession;

/// Session shared by the tests in this module.
///
/// Built lazily on first use from an empty [`VortexSession`] with [`ArraySession`] installed,
/// so tests that need a session (for example to call `optimize_recursive`) can borrow it.
static SESSION: LazyLock<VortexSession> =
LazyLock::new(|| VortexSession::empty().with::<ArraySession>());

#[derive(Clone, Debug, Default, PartialEq, Eq, Hash)]
struct TestExt;
Expand Down Expand Up @@ -220,7 +227,7 @@ mod tests {
.try_new_array(3, Operator::Lt, [constant_ext, ext_array])
.unwrap();

let optimized = scalar_fn_array.optimize_recursive().unwrap();
let optimized = scalar_fn_array.optimize_recursive(&SESSION).unwrap();
let scalar_fn = optimized.as_opt::<ScalarFn>().unwrap();
let children = scalar_fn.children();
let constant = children[0]
Expand Down
27 changes: 16 additions & 11 deletions vortex-array/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@
//! 3. **`execute_parent`** -- child-driven fused execution (may read buffers).
//! 4. **`execute`** -- the encoding's own decode step (most expensive).
//!
//! The main entry point is [`DynArray::execute_until`], which uses an explicit work stack
//! The main entry point is [`ArrayRef::execute_until`], which uses an explicit work stack
//! to drive execution iteratively without recursion. Between steps, the optimizer runs
//! reduce/reduce_parent rules to fixpoint.
//! reduce/reduce_parent rules to fixpoint using the active [`ExecutionCtx`] session, so
//! session-registered optimizer kernels participate during execution.
//!
//! See <https://docs.vortex.dev/developer-guide/internals/execution> for a full description
//! of the model.
Expand Down Expand Up @@ -88,17 +89,21 @@ impl ArrayRef {
///
/// Each iteration proceeds through three steps in order:
///
/// 1. **Done / canonical check** — if `current` satisfies the active done predicate or is
/// 1. **Done / canonical check** - if `current` satisfies the active done predicate or is
/// canonical, splice it back into the stacked parent (if any) and continue, or return.
/// 2. **`execute_parent` on children** — try each child's `execute_parent` against `current`
/// 2. **`execute_parent` on children** - try each child's `execute_parent` against `current`
/// as the parent (e.g. `Filter(RunEnd)` → `FilterExecuteAdaptor` fires from RunEnd).
/// If there is a stacked parent frame, the rewritten child is spliced back into it so
/// that optimize and further `execute_parent` can fire on the reconstructed parent
/// (e.g. `Slice(RunEnd)` → `RunEnd` spliced into stacked `Filter` → `Filter(RunEnd)`
/// whose `FilterExecuteAdaptor` fires on the next iteration).
/// 3. **`execute`** — call the encoding's own execute step, which either returns `Done` or
/// 3. **`execute`** - call the encoding's own execute step, which either returns `Done` or
/// `ExecuteSlot(i)` to push a child onto the stack for focused execution.
///
/// Optimizer calls in this loop use [`ExecutionCtx::session`], so kernels registered on the
/// session's [`ArrayKernels`](crate::optimizer::kernels::ArrayKernels) are visible between
/// execution steps.
///
/// Note: the returned array may not match `M`. If execution converges to a canonical form
/// that does not match `M`, the canonical array is returned since no further execution
/// progress is possible.
Expand All @@ -110,7 +115,7 @@ impl ArrayRef {
let mut stack: Vec<StackFrame> = Vec::new();

for _ in 0..max_iterations() {
// Step 1: done / canonical — splice back into stacked parent or return.
// Step 1: done / canonical - splice back into stacked parent or return.
let is_done = stack
.last()
.map_or(M::matches as DonePredicate, |frame| frame.done);
Expand All @@ -121,7 +126,7 @@ impl ArrayRef {
return Ok(current);
}
Some(frame) => {
current = frame.put_back(current)?.optimize()?;
current = frame.put_back(current)?.optimize_ctx(ctx.session())?;
continue;
}
}
Expand All @@ -137,9 +142,9 @@ impl ArrayRef {
"execute_parent rewrote {} -> {}",
current, rewritten
));
current = rewritten.optimize()?;
current = rewritten.optimize_ctx(ctx.session())?;
if let Some(frame) = stack.pop() {
current = frame.put_back(current)?.optimize()?;
current = frame.put_back(current)?.optimize_ctx(ctx.session())?;
}
continue;
}
Expand All @@ -158,7 +163,7 @@ impl ArrayRef {
));
let frame = StackFrame::new(parent, i, done, &child);
stack.push(frame);
current = child.optimize()?;
current = child.optimize_ctx(ctx.session())?;
}
ExecutionStep::Done => {
ctx.log(format_args!("Done: {}", array));
Expand Down Expand Up @@ -523,7 +528,7 @@ macro_rules! require_child {
/// execution of child `$idx`.
///
/// Unlike `require_child!`, this is a statement macro (no value produced) and does not clone
/// `$parent` — it is moved into the early-return path.
/// `$parent` - it is moved into the early-return path.
///
/// ```ignore
/// require_opt_child!(array, array.patches().map(|p| p.indices()), 1 => Primitive);
Expand Down
117 changes: 117 additions & 0 deletions vortex-array/src/optimizer/kernels.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

//! Session-scoped registry for optimizer kernels.
//!
//! [`ArrayKernels`] stores function pointers that participate in array optimization without
//! adding rules to an encoding vtable. The optimizer currently consults it for parent-reduce
//! rewrites before the child encoding's static `PARENT_RULES`. A registered function can
//! therefore add a rule for an extension encoding or take precedence over a built-in rule.
//!
//! Kernel entries are addressed by `(outer_id, child_id, kind)`. For parent-reduce kernels,
//! `outer_id` is the id returned by the parent array's `encoding_id()` and `child_id` is the
//! child array's `encoding_id()`. For [`ScalarFn`](crate::arrays::ScalarFn) parents, the parent
//! id is the scalar function id.
//!
//! Sessions created by the top-level `vortex` crate install an empty registry by default. Other
//! sessions can add it with [`VortexSession::with`](vortex_session::VortexSession::with) or rely
//! on [`ArrayKernelsExt::kernels`] to insert the default value.

use std::hash::BuildHasher;
use std::sync::Arc;
use std::sync::LazyLock;

use vortex_error::VortexResult;
use vortex_session::Ref;
use vortex_session::SessionExt;
use vortex_session::registry::FnRegistry;
use vortex_session::registry::Id;
use vortex_utils::aliases::DefaultHashBuilder;

use crate::ArrayRef;

/// Shared hasher used to combine `(outer, child, FnKind)` tuples into [`FnRegistry`] keys.
///
/// A single lazily-initialised instance is used by every typed helper on [`ArrayKernels`], so
/// a given tuple maps to the same `u64` key for both registration and lookup.
// NOTE(review): if `DefaultHashBuilder::default()` is randomly seeded, the resulting keys are
// only meaningful within one process and must not be persisted — confirm against `vortex_utils`.
static FN_HASHER: LazyLock<DefaultHashBuilder> = LazyLock::new(DefaultHashBuilder::default);

/// Function pointer for a plugin-provided parent-reduce rewrite.
///
/// The optimizer calls this with the matched `child`, its `parent`, and the slot index where the
/// child appears. Return `Ok(Some(new_parent))` to replace the parent, or `Ok(None)` when the
/// rewrite does not apply.
///
/// Implementations must preserve the parent's logical length and dtype, matching the invariant
/// required of static parent-reduce rules.
///
/// This is a plain `fn` pointer rather than a boxed closure, so only free functions and
/// non-capturing closures can be registered; a kernel cannot carry captured state.
pub type ReduceParentFn =
fn(child: &ArrayRef, parent: &ArrayRef, child_idx: usize) -> VortexResult<Option<ArrayRef>>;

/// Disambiguates kernel kinds that share the same `(outer, child)` id pair.
///
/// The kind is hashed together with the two ids, so the same pair can hold one entry per kind
/// in a single [`FnRegistry`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
// Only `ReduceParent` is constructed by the helpers in this module; the remaining variants are
// never built here, which is what the lint expectation below covers.
#[expect(unused)]
enum FnKind {
// Not referenced by any helper in this module yet.
Reduce,
// Kind used for [`ReduceParentFn`] entries (`register_reduce_parent` and friends).
ReduceParent,
// Not referenced by any helper in this module yet.
ExecuteParent,
// Not referenced by any helper in this module yet.
Execute,
}

/// Session-scoped registry of optimizer kernel functions.
///
/// Use the typed `register_*`, `find_*`, and `contains_*` methods rather than depending on the
/// internal hash format.
///
/// Registration takes `&self`, so any mutation happens inside the wrapped [`FnRegistry`].
#[derive(Debug, Default)]
pub struct ArrayKernels {
// Backing store. Keys are the `u64` values produced by `hash_fn_ids` from
// `(outer, child, FnKind)`.
registry: FnRegistry,
}

impl ArrayKernels {
/// Create an empty [`ArrayKernels`] with no kernels registered.
pub fn empty() -> Self {
Self::default()
}

/// Install `f` as the [`ReduceParentFn`] for the `(outer, child)` pair.
///
/// During a `reduce_parent` step the optimizer invokes `f` whenever a parent whose encoding id
/// is `outer` holds a child whose encoding id is `child`; this happens before the child
/// encoding's static `PARENT_RULES` are tried. `outer` is usually the parent array's encoding
/// id. For `ScalarFnArray`, it is the scalar function id, for example `Cast.id()`.
///
/// Any function previously registered for the same pair is replaced.
pub fn register_reduce_parent(&self, outer: Id, child: Id, f: ReduceParentFn) {
    // Registration and lookup must agree on the key, so it is always derived via `hash_fn_ids`.
    let key = self.hash_fn_ids(outer, child, FnKind::ReduceParent);
    self.registry.register(key, f);
}

/// Fetch the [`ReduceParentFn`] registered for `(outer, child)`, if there is one.
///
/// The function is handed back as an owned [`Arc`], which lets the caller drop the
/// session-variable borrow before invoking it.
pub fn find_reduce_parent(&self, outer: Id, child: Id) -> Option<Arc<ReduceParentFn>> {
    // Same key derivation as `register_reduce_parent`.
    let key = self.hash_fn_ids(outer, child, FnKind::ReduceParent);
    self.registry.find(key)
}
Comment on lines +89 to +92
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There can be more than one?


/// Report whether a [`ReduceParentFn`] has been registered for `(outer, child)`.
pub fn contains_reduce_parent(&self, outer: Id, child: Id) -> bool {
    // Same key derivation as `register_reduce_parent`.
    let key = self.hash_fn_ids(outer, child, FnKind::ReduceParent);
    self.registry.contains(key)
}
Comment on lines +95 to +98
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we want one registry per find?


/// Derive the `u64` [`FnRegistry`] key for a typed `(outer, child, fn_kind)` kernel id.
///
/// Every typed helper on this type routes through here, which keeps registration and lookup
/// keys in agreement.
// NOTE(review): the key is a 64-bit hash of the tuple, so two distinct tuples could in
// principle collide; how `FnRegistry` behaves in that case is not visible here — confirm.
fn hash_fn_ids(&self, outer: Id, child: Id, fn_kind: FnKind) -> u64 {
    let id_tuple = (outer, child, fn_kind);
    FN_HASHER.hash_one(id_tuple)
}
}

/// Extension trait for accessing optimizer kernels from a
/// [`VortexSession`](vortex_session::VortexSession).
///
/// The trait has a single provided method and is implemented for every [`SessionExt`] type, so
/// bringing it into scope is enough to call `.kernels()` on a session.
pub trait ArrayKernelsExt: SessionExt {
/// Returns the [`ArrayKernels`] session variable, inserting a default-constructed one if
/// none has been registered on the session yet.
// NOTE(review): the insert-on-miss behaviour comes from the session's `get`, which is not
// visible in this file — confirm against `vortex_session`.
fn kernels(&self) -> Ref<'_, ArrayKernels> {
self.get::<ArrayKernels>()
}
}

// Blanket implementation: every `SessionExt` type gets `kernels()` via the provided method.
impl<S: SessionExt> ArrayKernelsExt for S {}
Loading
Loading