Skip to content
19 changes: 19 additions & 0 deletions DESIGN_ISSUES.md
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,25 @@ which column groups (meta, source, system_tags) are returned.

---

## `src/orcapod/core/cached_function_pod.py` / `src/orcapod/core/packet_function.py`

### CFP1 — Extract shared result caching logic from CachedPacketFunction and CachedFunctionPod
**Status:** resolved
**Severity:** medium

`CachedPacketFunction` and `CachedFunctionPod` implement nearly identical result caching
logic: DB lookup by `INPUT_PACKET_HASH_COL`, conflict resolution by most-recent timestamp,
record storage with variation/execution/timestamp columns, and a `RESULT_COMPUTED_FLAG`
meta column. The match tier / matching policy design (P6) will also need to apply to both.

**Fix:** Extracted `ResultCache` class (`src/orcapod/core/result_cache.py`) that owns the DB,
record path, lookup (with `additional_constraints` for future match tiers), store, conflict
resolution, and auto-flush logic. Both `CachedPacketFunction` and `CachedFunctionPod` now
delegate to a `ResultCache` instance. The match tier strategy (P6) can be implemented once
on `ResultCache.lookup` via `additional_constraints`.

---

## `src/orcapod/core/nodes/function_node.py`

### FN1 — `FunctionNode.async_execute` Phase 2 was fully sequential
Expand Down
154 changes: 154 additions & 0 deletions src/orcapod/core/cached_function_pod.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
"""CachedFunctionPod — pod-level caching wrapper that intercepts process_packet()."""

from __future__ import annotations

import logging
from typing import TYPE_CHECKING, Any

from orcapod.core.function_pod import WrappedFunctionPod
from orcapod.core.result_cache import ResultCache
from orcapod.protocols.core_protocols import (
FunctionPodProtocol,
PacketProtocol,
StreamProtocol,
TagProtocol,
)
from orcapod.protocols.database_protocols import ArrowDatabaseProtocol

if TYPE_CHECKING:
import pyarrow as pa

logger = logging.getLogger(__name__)


class CachedFunctionPod(WrappedFunctionPod):
    """Pod-level caching wrapper that intercepts ``process_packet()``.

    Caches at the ``process_packet(tag, packet)`` level using only the
    **input packet content hash** as the cache key — the output of a
    packet function depends solely on the packet, not the tag.

    Tag-level provenance tracking (tag + system tags + packet hash) is
    handled separately by ``FunctionNode.add_pipeline_record``.

    Uses a shared ``ResultCache`` for lookup/store/conflict-resolution
    logic (same mechanism as ``CachedPacketFunction``).
    """

    # Expose RESULT_COMPUTED_FLAG from the shared ResultCache so callers can
    # reference it on this class without importing ResultCache directly.
    RESULT_COMPUTED_FLAG = ResultCache.RESULT_COMPUTED_FLAG

    def __init__(
        self,
        function_pod: FunctionPodProtocol,
        result_database: ArrowDatabaseProtocol,
        record_path_prefix: tuple[str, ...] = (),
        auto_flush: bool = True,
        **kwargs,
    ) -> None:
        """Wrap *function_pod* with result caching backed by *result_database*.

        Args:
            function_pod: The inner pod whose ``process_packet`` is cached.
            result_database: Backing store for cached results.
            record_path_prefix: Path components prepended to this pod's URI
                to form the record path in the result store.
            auto_flush: Forwarded to ``ResultCache``.
            **kwargs: Forwarded to ``WrappedFunctionPod``.
        """
        super().__init__(function_pod, **kwargs)
        self._record_path_prefix = record_path_prefix
        self._cache = ResultCache(
            result_database=result_database,
            record_path=record_path_prefix + self.uri,
            auto_flush=auto_flush,
        )

    @property
    def _result_database(self) -> ArrowDatabaseProtocol:
        """The underlying result database (for FunctionNode access)."""
        return self._cache.result_database

    @property
    def record_path(self) -> tuple[str, ...]:
        """Return the path to the cached records in the result store."""
        return self._cache.record_path

    def _store_and_mark(
        self, packet: PacketProtocol, output: PacketProtocol
    ) -> PacketProtocol:
        """Persist *output* for *packet* and flag it as freshly computed.

        Shared by the sync and async ``process_packet`` paths so the
        store/flag logic cannot drift between the two (previously the
        identical five lines were duplicated in both methods).
        """
        pf = self._function_pod.packet_function
        self._cache.store(
            packet,
            output,
            variation_data=pf.get_function_variation_data(),
            execution_data=pf.get_execution_data(),
        )
        return output.with_meta_columns(**{self.RESULT_COMPUTED_FLAG: True})

    def process_packet(
        self, tag: TagProtocol, packet: PacketProtocol
    ) -> tuple[TagProtocol, PacketProtocol | None]:
        """Process a packet with pod-level caching.

        The cache key is the input packet content hash only — the function
        output depends solely on the packet, not the tag. Freshly computed
        outputs carry ``RESULT_COMPUTED_FLAG: True``.

        Args:
            tag: The tag associated with the packet.
            packet: The input packet to process.

        Returns:
            A ``(tag, output_packet)`` tuple; output_packet is ``None``
            if the inner function filters the packet out.
        """
        cached = self._cache.lookup(packet)
        if cached is not None:
            logger.info("Pod-level cache hit")
            # NOTE(review): a False RESULT_COMPUTED_FLAG on cache hits is
            # presumably attached by ResultCache.lookup — confirm there.
            return tag, cached

        tag, output = self._function_pod.process_packet(tag, packet)
        if output is not None:
            output = self._store_and_mark(packet, output)
        return tag, output

    async def async_process_packet(
        self, tag: TagProtocol, packet: PacketProtocol
    ) -> tuple[TagProtocol, PacketProtocol | None]:
        """Async counterpart of ``process_packet``.

        DB lookup and store are synchronous (DB protocol is sync), but the
        actual computation uses the inner pod's ``async_process_packet``
        for true async execution.
        """
        cached = self._cache.lookup(packet)
        if cached is not None:
            logger.info("Pod-level cache hit")
            # NOTE(review): see process_packet — flag on hits comes from lookup.
            return tag, cached

        tag, output = await self._function_pod.async_process_packet(tag, packet)
        if output is not None:
            output = self._store_and_mark(packet, output)
        return tag, output

    def get_all_cached_outputs(
        self, include_system_columns: bool = False
    ) -> "pa.Table | None":
        """Return all cached records from the result store for this pod."""
        return self._cache.get_all_records(
            include_system_columns=include_system_columns
        )

    def process(
        self, *streams: StreamProtocol, label: str | None = None
    ) -> StreamProtocol:
        """Invoke the inner pod but with pod-level caching on process_packet.

        The stream returned uses *this* pod's ``process_packet`` (which
        includes caching) rather than the inner pod's.
        """
        # Imported here to avoid a circular import with function_pod.
        from orcapod.core.function_pod import FunctionPodStream

        # Validate and prepare the input stream
        input_stream = self._function_pod.handle_input_streams(*streams)
        self._function_pod.validate_inputs(*streams)

        return FunctionPodStream(
            function_pod=self,
            input_stream=input_stream,
            label=label,
        )
51 changes: 46 additions & 5 deletions src/orcapod/core/executors/base.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from __future__ import annotations

import copy
from abc import ABC, abstractmethod
from collections.abc import Callable
from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
Expand Down Expand Up @@ -77,13 +79,52 @@ def supports_concurrent_execution(self) -> bool:
return False

def with_options(self, **opts: Any) -> "PacketFunctionExecutorBase":
    """Return a **new** executor carrying the given per-node options.

    The base implementation ignores *opts* and hands back a shallow copy
    of *self*.  Subclasses that carry mutable state (e.g. ``RayExecutor``)
    should override this to build a properly configured new instance.
    """
    duplicate = copy.copy(self)
    return duplicate

# ------------------------------------------------------------------
# Callable-level execution (PythonFunctionExecutorProtocol)
# ------------------------------------------------------------------

def execute_callable(
self,
fn: Callable[..., Any],
kwargs: dict[str, Any],
executor_options: dict[str, Any] | None = None,
) -> Any:
"""Synchronously execute *fn* with *kwargs*.

Default implementation calls ``fn(**kwargs)`` in-process.
Subclasses should override for remote/distributed execution.

Args:
fn: The Python callable to execute.
kwargs: Keyword arguments to pass to *fn*.
executor_options: Optional per-call options.

Returns:
The raw return value of *fn*.
"""
return fn(**kwargs)

async def async_execute_callable(
self,
fn: Callable[..., Any],
kwargs: dict[str, Any],
executor_options: dict[str, Any] | None = None,
) -> Any:
"""Asynchronously execute *fn* with *kwargs*.

Default implementation delegates to ``execute_callable``
synchronously. Subclasses should override for truly async
execution.
"""
return self.execute_callable(fn, kwargs, executor_options)

def get_execution_data(self) -> dict[str, Any]:
"""Return metadata describing the execution environment.
Expand Down
48 changes: 47 additions & 1 deletion src/orcapod/core/executors/local.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
from __future__ import annotations

from typing import TYPE_CHECKING
import asyncio
import inspect
from collections.abc import Callable
from typing import TYPE_CHECKING, Any

from orcapod.core.executors.base import PacketFunctionExecutorBase

Expand Down Expand Up @@ -35,3 +38,46 @@ async def async_execute(
packet: PacketProtocol,
) -> PacketProtocol | None:
return await packet_function.direct_async_call(packet)

# -- PythonFunctionExecutorProtocol --

def execute_callable(
self,
fn: Callable[..., Any],
kwargs: dict[str, Any],
executor_options: dict[str, Any] | None = None,
) -> Any:
if inspect.iscoroutinefunction(fn):
return self._run_async_sync(fn, kwargs)
return fn(**kwargs)
Comment thread
eywalker marked this conversation as resolved.

@staticmethod
def _run_async_sync(fn: Callable[..., Any], kwargs: dict[str, Any]) -> Any:
"""Run an async function synchronously, handling nested event loops."""
try:
asyncio.get_running_loop()
except RuntimeError:
return asyncio.run(fn(**kwargs))
else:
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(1) as pool:
return pool.submit(lambda: asyncio.run(fn(**kwargs))).result()

async def async_execute_callable(
self,
fn: Callable[..., Any],
kwargs: dict[str, Any],
executor_options: dict[str, Any] | None = None,
) -> Any:
if inspect.iscoroutinefunction(fn):
return await fn(**kwargs)
loop = asyncio.get_running_loop()
return await loop.run_in_executor(None, lambda: fn(**kwargs))

def with_options(self, **opts: Any) -> "LocalExecutor":
    """Return a fresh ``LocalExecutor``.

    The local executor is stateless, so there is nothing to configure
    and *opts* is ignored.
    """
    fresh = LocalExecutor()
    return fresh
29 changes: 29 additions & 0 deletions src/orcapod/core/executors/ray.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import asyncio
from collections.abc import Callable
from typing import TYPE_CHECKING, Any

from orcapod.core.executors.base import PacketFunctionExecutorBase
Expand Down Expand Up @@ -157,6 +158,34 @@ async def async_execute(
raw_result = await asyncio.wrap_future(ref.future())
return pf._build_output_packet(raw_result)

# -- PythonFunctionExecutorProtocol --

def execute_callable(
    self,
    fn: Callable[..., Any],
    kwargs: dict[str, Any],
    executor_options: dict[str, Any] | None = None,
) -> Any:
    """Submit *fn* to the Ray cluster and block until its result is ready.

    NOTE(review): *executor_options* is currently ignored — only the
    executor-level ``_build_remote_opts()`` result is applied to the
    remote function. Confirm whether per-call options should be merged.
    """
    import ray

    self._ensure_ray_initialized()
    remote_opts = self._build_remote_opts()
    object_ref = ray.remote(**remote_opts)(fn).remote(**kwargs)
    return ray.get(object_ref)

async def async_execute_callable(
    self,
    fn: Callable[..., Any],
    kwargs: dict[str, Any],
    executor_options: dict[str, Any] | None = None,
) -> Any:
    """Submit *fn* to the Ray cluster and await its result.

    The Ray ``ObjectRef`` is bridged into asyncio via
    ``asyncio.wrap_future(ref.future())`` so the event loop is not
    blocked while the task runs remotely.

    NOTE(review): *executor_options* is currently ignored here as well —
    confirm whether per-call options should be merged into the remote opts.
    """
    import ray

    self._ensure_ray_initialized()
    remote_opts = self._build_remote_opts()
    object_ref = ray.remote(**remote_opts)(fn).remote(**kwargs)
    return await asyncio.wrap_future(object_ref.future())

def with_options(self, **opts: Any) -> "RayExecutor":
"""Return a new ``RayExecutor`` with the given options merged in.

Expand Down
18 changes: 16 additions & 2 deletions src/orcapod/core/function_pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -652,6 +652,7 @@ def function_pod(
version: str = "v0.0",
label: str | None = None,
result_database: ArrowDatabaseProtocol | None = None,
pod_cache_database: ArrowDatabaseProtocol | None = None,
executor: PacketFunctionExecutorProtocol | None = None,
**kwargs,
) -> Callable[..., CallableWithPod]:
Expand All @@ -662,7 +663,11 @@ def function_pod(
function_name: Name of the function pod; defaults to ``func.__name__``.
version: Version string for the packet function.
label: Optional label for tracking.
result_database: Optional database for caching results.
result_database: Optional database for packet-level caching
(wraps the packet function in ``CachedPacketFunction``).
pod_cache_database: Optional database for pod-level caching
(wraps the pod in ``CachedFunctionPod``, which caches at the
``process_packet`` level using input packet content hash).
executor: Optional executor for running the packet function.
**kwargs: Forwarded to ``PythonPacketFunction``.

Expand Down Expand Up @@ -692,10 +697,19 @@ def decorator(func: Callable) -> CallableWithPod:
)

# Create a simple typed function pod
pod = FunctionPod(
pod: _FunctionPodBase = FunctionPod(
packet_function=packet_function,
)

# if pod_cache_database is provided, wrap in CachedFunctionPod
if pod_cache_database is not None:
from orcapod.core.cached_function_pod import CachedFunctionPod

pod = CachedFunctionPod(
function_pod=pod,
result_database=pod_cache_database,
)

@wraps(func)
def wrapper(*args, **kwargs):
return func(*args, **kwargs)
Expand Down
Loading
Loading