refactor(executors): improve function execution chain #84

Merged
eywalker merged 8 commits into dev from
eywalker/plt-920-clean-up-the-logic-around-function-node-function-pod-packet
Mar 14, 2026

Conversation


@eywalker eywalker commented Mar 14, 2026

Summary

Implements the function execution chain improvements outlined in PLT-920. This cleans up the logic around the function node, function pod, packet function, and executor: the executor now receives raw callables rather than knowing about packet function internals, and result caching is separated from pipeline provenance tracking.

Executor immutability & cleanup

  • with_options() always returns a new instance (executors are immutable value objects)
  • Pipeline always calls with_options() per node — executor decides what to copy vs share
  • Removed execution_engine_opts from FunctionNode — pipeline executor-assignment logic is the sole owner
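The immutability contract above can be illustrated with a minimal sketch. `SketchExecutor` and its fields are hypothetical stand-ins, not the orcapod classes; the only behavior taken from the PR is that `with_options()` always returns a new instance and leaves the original untouched.

```python
from dataclasses import dataclass, field, replace
from typing import Any


@dataclass(frozen=True)
class SketchExecutor:
    """Hypothetical immutable executor (illustration only)."""

    name: str = "local"
    options: dict[str, Any] = field(default_factory=dict)

    def with_options(self, **opts: Any) -> "SketchExecutor":
        # Always build a new instance, even when no options are passed,
        # so callers never share mutable per-node state by accident.
        return replace(self, options={**self.options, **opts})


base = SketchExecutor()
node_a = base.with_options(num_cpus=2)  # per-node copy with overrides
node_b = base.with_options()            # still a distinct object
```

Because every node receives its own instance, a pipeline can call `with_options()` unconditionally and leave the decision about what to copy versus share to the executor.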

Type-safe executor dispatch (Generic[E] + __init_subclass__)

  • PacketFunctionBase is now Generic[E] with _executor: E | None — resolves executor protocol once at class definition time
  • PythonFunctionExecutorProtocol extends PacketFunctionExecutorProtocol, adding execute_callable(fn, kwargs, executor_options) / async_execute_callable
  • PythonPacketFunction routes call()/async_call() through execute_callable, keeping packet construction in the function
  • PacketFunctionWrapper stays generic (PacketFunctionBase[E]), preserving executor type constraints

Shared ResultCache

  • New ResultCache class extracts shared lookup/store/conflict-resolution logic
  • ResultCache.lookup accepts additional_constraints dict — extensibility hook for future match tier support (DESIGN_ISSUES P6)
  • Both CachedPacketFunction and CachedFunctionPod delegate to a ResultCache instance — no duplicated caching logic

CachedFunctionPod + FunctionNode refactor

  • New CachedFunctionPod — pod-level caching wrapper that intercepts process_packet(), caches by input packet hash only (function output depends solely on packet, not tag)
  • FunctionNode uses CachedFunctionPod for result caching, with separate pipeline provenance records via add_pipeline_record
  • Pipeline entry_id = hash(tag + system_tags + input_packet_hash) — ensures different source entries with same user tags are tracked separately
  • iter_packets Phase 2 skip check uses pipeline entry_ids (not just packet hash)
  • add_pipeline_record explicitly saves source columns of input packets (no data columns)
  • compute_pipeline_entry_id() extracted as reusable method
  • CachedFunctionPod.async_process_packet() does sync DB caching + async computation
  • LocalExecutor.execute_callable handles nested event loops safely
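The split between result caching and pipeline provenance can be sketched as follows. The hashing scheme (SHA-256 over sorted JSON) and the two plain dictionaries are assumptions made for illustration; the PR only states that results are keyed by input packet hash and that the pipeline entry_id is a hash of tag + system_tags + input_packet_hash.

```python
import hashlib
import json
from typing import Any, Callable


def content_hash(obj: Any) -> str:
    # Hypothetical hashing scheme: SHA-256 over canonical (sorted-key) JSON.
    return hashlib.sha256(json.dumps(obj, sort_keys=True).encode()).hexdigest()


def pipeline_entry_id(tag: dict, system_tags: dict, input_packet_hash: str) -> str:
    return content_hash(
        {"tag": tag, "system_tags": system_tags, "input_packet_hash": input_packet_hash}
    )


result_db: dict[str, Any] = {}     # keyed by input packet hash only
pipeline_db: dict[str, dict] = {}  # keyed by pipeline entry_id
calls: list[dict] = []


def add(a: int, b: int) -> int:
    calls.append({"a": a, "b": b})
    return a + b


def process(tag: dict, system_tags: dict, packet: dict, fn: Callable[..., Any]) -> Any:
    packet_hash = content_hash(packet)
    if packet_hash not in result_db:  # result cache: tags play no role
        result_db[packet_hash] = fn(**packet)
    entry_id = pipeline_entry_id(tag, system_tags, packet_hash)
    pipeline_db[entry_id] = {"tag": tag, "result_key": packet_hash}
    return result_db[packet_hash]


packet = {"a": 1, "b": 2}
process({"id": "x"}, {"source": "s1"}, packet, add)
process({"id": "y"}, {"source": "s1"}, packet, add)
process({"id": "x"}, {"source": "s2"}, packet, add)  # same user tag, new system tag
```

After these three calls the function has run once, the result store holds one record, and the pipeline store holds three, each pointing at the same result key.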

Test plan

  • 2116 tests passing (56 net new)
  • ResultCache: lookup miss/hit, conflict resolution, additional_constraints filtering, store columns, auto-flush, get_all_records
  • Executor immutability: with_options returns new instance, preserves state
  • PythonFunctionExecutorProtocol conformance, execute_callable with sync/async functions
  • Generic[E] dispatch: protocol resolution, rejection of non-conforming executors, inactive function + executor
  • CachedFunctionPod: cache miss/hit, same packet different tags = cache hit (packet-only key), inactive function, dual caching, output_schema delegation
  • Pipeline + result DB interaction: same packet different tags -> 1 result record, N pipeline records
  • Phase 1/2 with pipeline entry_ids: existing yields from DB, novel entry_ids computed, same packet + new tag triggers Phase 2
  • System tag awareness: different system tags -> different pipeline entry_ids
  • Pipeline records include source columns, exclude data columns of input
  • Pipeline records for same packet reference same result UUID
  • Per-node executor copy (each node gets own instance via with_options)

Closes PLT-920

eywalker and others added 5 commits March 14, 2026 05:25
- Make executors immutable: with_options() always returns a new instance
  (copy.copy for base, new LocalExecutor() for local, RayExecutor already
  did this)
- Remove execution_engine_opts from FunctionNode — pipeline executor
  assignment logic is now the sole owner of per-node configuration
- Add type-safe executor dispatch via Generic[E] + __init_subclass__ on
  PacketFunctionBase — resolves executor protocol once at class definition
  time, validates at set_executor() instead of in the hot path
- Add PythonFunctionExecutorProtocol with execute_callable/
  async_execute_callable — executors receive raw callables + kwargs
  instead of packet_function + packet objects
- PythonPacketFunction now routes call()/async_call() through
  execute_callable, keeping packet construction in the function
- Add CachedFunctionPod — pod-level caching wrapper that intercepts
  process_packet() with tag+packet content hash as cache key
- Add pod_cache_database parameter to function_pod decorator

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Generic[E] dispatch: wrapper doesn't resolve, protocol rejection for
  non-conforming executor, inactive function + executor returns None,
  async_call routes through async_execute_callable
- LocalExecutor callable: async fn via execute_callable, sync/async fn
  via async_execute_callable
- CachedFunctionPod: same tag+different packet cached separately, same
  tag+same packet is cache hit, inactive function doesn't store,
  output_schema delegation, dual caching (result_database +
  pod_cache_database)
- Pipeline: no opts uses engine directly (no with_options call)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- CachedFunctionPod now aligns with CachedPacketFunction storage: stores
  function variation data, execution data, cache entry hash, and timestamp
- Cache entry hash computed from tag + system tags + input packet hash
  (matching pipeline record entry_id pattern), ensuring two rows with
  identical user tags but different system tags are cached separately
- FunctionNode.attach_databases() creates CachedFunctionPod wrapping the
  function pod instead of CachedPacketFunction wrapping the packet function
- FunctionNode.process_packet() delegates to CachedFunctionPod for result
  caching and separately records pipeline provenance entries
- CachedFunctionPod.async_process_packet() does sync DB caching + async
  computation via inner pod's async_process_packet
- add_pipeline_record() now explicitly extracts source columns using
  select() instead of rename-then-drop pattern
- iter_packets Phase 2 skip-check uses single cache entry hash
- Added DESIGN_ISSUES.md note (CFP1) about potential optimization of
  reusing entry_hash between CachedFunctionPod and add_pipeline_record

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
CachedFunctionPod now caches by input packet hash only — the function
output depends solely on the packet, not the tag. Tag-level uniqueness
(tag + system tags + input packet hash) is handled by FunctionNode's
pipeline record (add_pipeline_record / compute_pipeline_entry_id).

iter_packets Phase 2 skip check now uses pipeline entry_ids (which
include tag + system tags + packet hash) retrieved from the pipeline
database, ensuring correct deduplication when the same packet appears
with different tags/system_tags.

Also:
- Extracted compute_pipeline_entry_id() as a reusable method
- Updated DESIGN_ISSUES CFP1: shared ResultCache refactor suggestion
- Added TODO notes for match tier support (aligned with P6)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
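The two-phase iteration described in this commit can be approximated by the sketch below. `iter_two_phase` and its arguments are hypothetical; the real `iter_packets` works with streams and databases, so this only shows the skip check on pipeline entry_ids.

```python
from typing import Callable, Iterable, Iterator


def iter_two_phase(
    stored: dict[str, str],
    incoming: Iterable[tuple[str, str]],
    compute: Callable[[str], str],
) -> Iterator[tuple[str, str]]:
    """Hypothetical two-phase iterator keyed by pipeline entry_id."""
    seen = set(stored)
    # Phase 1: yield everything already recorded in the pipeline database.
    for entry_id, result in stored.items():
        yield entry_id, result
    # Phase 2: compute only entries whose entry_id is novel.
    for entry_id, packet in incoming:
        if entry_id in seen:
            continue
        seen.add(entry_id)
        yield entry_id, compute(packet)


stored = {"e1": "r1"}
incoming = [("e1", "p1"), ("e2", "p1")]  # same packet, but "e2" is a new tag combination
out = list(iter_two_phase(stored, incoming, str.upper))
```

Entry `e2` is processed in Phase 2 even though its packet matches an existing one, because the skip check compares entry_ids rather than packet hashes.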
New test file covering the dual-database caching architecture:

- compute_pipeline_entry_id: determinism, tag/packet sensitivity
- System tag awareness: identical user tags with different system tags
  produce different pipeline entry_ids
- Result DB vs pipeline DB record counts: same packet/different tags →
  1 result record, N pipeline records
- Phase 1/2 with pipeline entry_ids: Phase 1 yields existing, Phase 2
  skips matching entry_ids, processes only novel combinations
- Same packet + new tag triggers Phase 2 (novel entry_id) even though
  CachedFunctionPod has a result cache hit
- Pipeline records reference same result UUID for identical packets
- Pipeline records include source columns but not data columns of input

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Copilot AI review requested due to automatic review settings March 14, 2026 06:57

Copilot AI left a comment


Pull request overview

This PR refactors the function execution and caching stack to make executors effectively immutable, move execution configuration ownership into the pipeline, and introduce pod-level result caching/provenance improvements (PLT-920).

Changes:

  • Adds PythonFunctionExecutorProtocol + Generic[E]-based executor dispatch and shifts PythonPacketFunction execution to callable/kwargs routing.
  • Introduces CachedFunctionPod and refactors FunctionNode persistence to cache by input packet hash while tracking pipeline provenance via (tag + system_tags + packet_hash) entry IDs.
  • Removes per-node execution_engine_opts and applies pipeline-level execution_engine_opts via with_options().

Reviewed changes

Copilot reviewed 20 out of 20 changed files in this pull request and generated 9 comments.

| File | Description |
| --- | --- |
| tests/test_pipeline/test_pipeline.py | Updates pipeline DB/result path assertions; adds callable execution hooks and new pipeline engine opts tests. |
| tests/test_pipeline/test_node_descriptors.py | Updates serialized descriptor expectations to use CachedFunctionPod record paths. |
| tests/test_core/test_regression_fixes.py | Extends test executor to support execute_callable. |
| tests/test_core/packet_function/test_executor.py | Adds executor immutability tests; adds protocol conformance + generic dispatch tests; updates call-routing expectations. |
| tests/test_core/function_pod/test_function_pod_node.py | Updates result record path assertion to CachedFunctionPod. |
| tests/test_core/function_pod/test_function_node_caching.py | New integration tests for pipeline entry IDs, phase1/phase2 behavior, and record-count semantics. |
| tests/test_core/function_pod/test_function_node_attach_db.py | Updates attach-db behavior tests to assert CachedFunctionPod wrapping. |
| tests/test_core/function_pod/test_cached_function_pod.py | New unit tests for CachedFunctionPod caching semantics and decorator integration. |
| src/orcapod/types.py | Updates PipelineConfig docs for new execution_engine_opts semantics. |
| src/orcapod/protocols/core_protocols/executor.py | Tightens with_options() contract; introduces PythonFunctionExecutorProtocol. |
| src/orcapod/protocols/core_protocols/__init__.py | Exports PythonFunctionExecutorProtocol. |
| src/orcapod/pipeline/graph.py | Removes per-node engine opt merging; applies pipeline options via with_options(). |
| src/orcapod/core/packet_function.py | Adds Generic[E] resolution; adds set_executor() validation; routes Python execution via execute_callable. |
| src/orcapod/core/nodes/function_node.py | Replaces CachedPacketFunction persistence with CachedFunctionPod; adds entry-id-based two-phase iteration and pipeline record cleanup. |
| src/orcapod/core/function_pod.py | Adds decorator support for pod-level caching via pod_cache_database. |
| src/orcapod/core/executors/ray.py | Adds execute_callable / async_execute_callable implementations. |
| src/orcapod/core/executors/local.py | Adds callable execution implementations; updates with_options() to return new instance. |
| src/orcapod/core/executors/base.py | Makes with_options() return a new instance by default; adds default callable execution methods. |
| src/orcapod/core/cached_function_pod.py | New pod-level caching wrapper aligning stored columns with CachedPacketFunction. |
| DESIGN_ISSUES.md | Documents follow-up refactor opportunity to deduplicate cache logic. |


Comment thread src/orcapod/protocols/core_protocols/executor.py
Comment thread src/orcapod/core/packet_function.py
Comment thread src/orcapod/core/executors/local.py
Comment thread src/orcapod/pipeline/graph.py
Comment thread tests/test_core/function_pod/test_function_node_caching.py Outdated
Comment thread tests/test_core/function_pod/test_function_node_caching.py
Comment thread src/orcapod/core/function_pod.py Outdated
Comment thread src/orcapod/core/packet_function.py
Comment thread tests/test_pipeline/test_pipeline.py
…on and CachedFunctionPod

New ResultCache class owns lookup, store, conflict resolution, and
auto-flush logic. Both CachedPacketFunction and CachedFunctionPod
delegate to a ResultCache instance.

ResultCache.lookup accepts additional_constraints dict — the hook for
future match tier support (P6). Default lookup matches on
INPUT_PACKET_HASH_COL only; additional constraints can narrow the
match (e.g. by function variation hash).

Resolves DESIGN_ISSUES CFP1.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

codecov-commenter commented Mar 14, 2026


Codecov Report

❌ Patch coverage is 91.39785% with 24 lines in your changes missing coverage. Please review.

| Files with missing lines | Patch % | Lines |
| --- | --- | --- |
| src/orcapod/core/executors/ray.py | 23.07% | 10 Missing ⚠️ |
| src/orcapod/core/cached_function_pod.py | 92.15% | 4 Missing ⚠️ |
| src/orcapod/core/executors/local.py | 87.50% | 3 Missing ⚠️ |
| src/orcapod/core/executors/base.py | 71.42% | 2 Missing ⚠️ |
| src/orcapod/core/packet_function.py | 96.29% | 2 Missing ⚠️ |
| src/orcapod/protocols/core_protocols/executor.py | 71.42% | 2 Missing ⚠️ |
| src/orcapod/core/result_cache.py | 98.41% | 1 Missing ⚠️ |


eywalker and others added 2 commits March 14, 2026 07:11
- Lookup: miss on empty DB, miss on different packet, hit returns
  correct result with RESULT_COMPUTED_FLAG=False, different record
  paths are isolated
- Conflict resolution: most recent timestamp wins
- Additional constraints: non-matching constraint filters out,
  matching constraint (e.g. function_name) returns result
- Store: input_packet_hash, variation, execution, timestamp, output
  data columns all present
- Auto flush: default true, set_auto_flush, constructor param
- get_all_records: empty returns None, includes/excludes system columns

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- PythonFunctionExecutorProtocol now extends PacketFunctionExecutorProtocol
  (fixes type bound mismatch and ensures supports()/supported_function_type_ids
  are always available)
- PacketFunctionBase._executor typed as E | None (type-safe access to
  execute_callable on PythonPacketFunction without casts)
- LocalExecutor.execute_callable handles nested event loops (mirrors
  PythonPacketFunction._call_async_function_sync pattern)
- Pipeline._apply_execution_engine always calls with_options() per node
  — executor decides what to copy vs share
- Fixed stale docstring (pod_cache_database: "input packet content hash"
  not "tag+packet hash")
- Fixed type annotations: list[str] | None in test helpers,
  list[Any] for mock executor call lists
- Updated pipeline tests to check node's executor (not original mock)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
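The nested event loop handling mentioned in this commit might look roughly like the sketch below. This is an assumption about the general pattern, not the code in `LocalExecutor`: when a loop is already running in the current thread, `asyncio.run()` would raise, so the coroutine is run on a fresh loop in a worker thread instead.

```python
import asyncio
import concurrent.futures
import inspect
from typing import Any, Callable


def execute_callable(fn: Callable[..., Any], kwargs: dict[str, Any]) -> Any:
    """Hypothetical sync entry point that accepts sync or async callables."""
    if not inspect.iscoroutinefunction(fn):
        return fn(**kwargs)
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread, so asyncio.run is safe.
        return asyncio.run(fn(**kwargs))
    # A loop is already running here; run the coroutine on its own loop
    # in a worker thread and block until it finishes.
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(asyncio.run, fn(**kwargs)).result()


async def add(a: int, b: int) -> int:
    await asyncio.sleep(0)
    return a + b


async def outer() -> int:
    # Called from inside a running loop: takes the worker-thread path.
    return execute_callable(add, {"a": 1, "b": 2})


plain = execute_callable(add, {"a": 1, "b": 2})
nested = asyncio.run(outer())
```

One trade-off of this approach is that the calling loop is blocked while the worker thread runs, which is acceptable for a synchronous API but worth keeping in mind.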
@eywalker eywalker merged commit adf3f82 into dev Mar 14, 2026
4 checks passed
@eywalker eywalker deleted the eywalker/plt-920-clean-up-the-logic-around-function-node-function-pod-packet branch March 14, 2026 07:40
3 participants