
feat(ENG-374): schema-driven execution context with datagram-based storage#132

Merged
eywalker merged 14 commits into dev from
eywalker/eng-374-audit-and-verify-execution-context-captured-by-packet
Apr 8, 2026

Conversation

Contributor

@eywalker eywalker commented Apr 8, 2026

Summary

  • Execution context schema: PythonPacketFunction.get_execution_data() now returns structured metadata (executor_type, executor_info: dict[str, str], python_version, extra_info: dict[str, str]) with a declared schema
  • Datagram-based ResultCache: store() accepts Datagrams instead of raw dicts, using the universal converter for Arrow type handling (including dict[str, str] as Arrow map columns)
  • Executor cleanup: Renamed LocalExecutor → LocalPythonFunctionExecutor, renamed get_execution_data() → get_executor_data() on executors, added schema methods to protocols, removed unused ExecutionEngineProtocol
  • Arrow round-trip fixes: Use universal converter (not raw to_pylist()) for Arrow→Python conversion in Datagram._init_from_table, preserve precise schema types in with_meta_columns/drop_meta_columns, pass Arrow-derived schema in FunctionNode.as_table()
  • Diagnostics: Warning when arrow_type_to_python_type falls back to Any, improved error hint in python_type_to_arrow_type
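The new execution-data shape can be sketched in plain Python. The field names come from the PR summary; the `EXECUTION_DATA_SCHEMA` constant, the example values, and the free-function form are illustrative assumptions, not the project's actual implementation.

```python
import sys

# Declared schema: field name -> Python type (constant name is hypothetical).
EXECUTION_DATA_SCHEMA: dict[str, type] = {
    "executor_type": str,
    "executor_info": dict,   # dict[str, str], stored as an Arrow map column
    "python_version": str,
    "extra_info": dict,      # dict[str, str]
}

def get_execution_data() -> dict:
    """Hypothetical stand-in for PythonPacketFunction.get_execution_data()."""
    data = {
        "executor_type": "local",
        "executor_info": {"executor_class": "LocalPythonFunctionExecutor"},
        "python_version": ".".join(map(str, sys.version_info[:3])),
        "extra_info": {},
    }
    # Every declared field is present, matching the schema exactly.
    assert set(data) == set(EXECUTION_DATA_SCHEMA)
    return data
```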

Fixes ENG-374

Related follow-up issues:

  • ENG-387 — universal converter: native datetime ↔ Arrow timestamp support
  • ENG-388 — arrow_schema_to_python_schema produces nullable types for all fields
  • ENG-389 — type inference: empty containers infer as dict[Any, Any] / list[Any]

Test plan

  • All 3190 existing tests pass (0 failures, 56 skipped)
  • New TestExecutionDataSchema tests verify execution data shape and schema
  • New TestStoreDictColumns tests verify executor_info and extra_info map columns in stored results
  • Wrapper delegation tests for schema methods
  • Integration: CachedPacketFunction and CachedFunctionPod store/retrieve with dict-typed columns
  • FunctionNode.as_table() with partial DB cache (the regression that caught the round-trip issue)

🤖 Generated with Claude Code

eywalker and others added 13 commits April 7, 2026 20:52
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…, fix scope accuracy

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…ction execution data

Add get_function_variation_data_schema() and get_execution_data_schema()
as abstract methods on PacketFunctionBase, with concrete implementations
on PythonPacketFunction, PacketFunctionWrapper, and PacketFunctionProxy.

Replace the old flat get_execution_data() return (python_version +
execution_context) with a richer schema that includes executor_type,
executor_info, python_version, and extra_info. Update ResultCache to
JSON-serialize non-string execution data values.

Fix pre-existing test references to get_execution_data() on executor
objects (should be get_executor_data()).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…ction helper

Replace dict-based variation_data/execution_data parameters in
ResultCache.store() with DatagramProtocol objects. The store method now
calls .as_table() on each datagram and prefixes column names, preserving
native Arrow types (including dict columns like executor_info) instead of
stringifying everything to large_string.

Add _build_metadata_datagrams() helper to CachedPacketFunction and update
all three callers (call, async_call, record_packet).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…tion

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Arrow stores dict[str, str] as list[struct[key, value]]. When read back
via to_pylist(), the Python value is a list of dicts, not a dict. This
broke schema inference and struct conversion in several paths:

- Datagram._init_from_table: use arrow_schema_to_python_schema to
  preserve precise types; merge with inference for Any-typed fields
- Datagram.with_meta_columns: preserve existing schema types for
  unchanged fields; keep materialized meta table
- FunctionNode.as_table: derive Python schema from Arrow schema,
  excluding dict and Any-typed fields from struct conversion

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Four related fixes for dict[str, str] columns stored as Arrow maps:

1. Datagram._init_from_table: use universal converter (not raw
   to_pylist()) for Arrow→Python conversion so map columns become
   proper Python dicts. Derive meta schema from Arrow schema (not
   inference) to preserve precise types for empty containers.

2. Datagram.with_meta_columns/drop_meta_columns: preserve existing
   schema types for unchanged fields instead of re-inferring.

3. ResultCache.store: store POD_TIMESTAMP as ISO 8601 string instead
   of native pa.timestamp, since the universal converter does not yet
   support timestamp round-trip. (TODO comment marks this for revert.)

4. FunctionNode.as_table: derive Python schema from Arrow schema when
   available, avoiding re-inference of empty dicts as dict[Any, Any].
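The interim timestamp handling in fix 3 amounts to a string round-trip; a minimal stdlib sketch (helper names are hypothetical, not the ResultCache API):

```python
from datetime import datetime, timezone

def timestamp_to_cell(ts: datetime) -> str:
    # Stored as an ISO 8601 string until the universal converter
    # supports native Arrow timestamp round-trips (per the TODO).
    return ts.isoformat()

def cell_to_timestamp(cell: str) -> datetime:
    return datetime.fromisoformat(cell)

ts = datetime(2026, 4, 8, 1, 47, tzinfo=timezone.utc)
assert cell_to_timestamp(timestamp_to_cell(ts)) == ts
```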

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…) return types

remote_opts is dict[str, Any] (values can be ints, nested dicts),
not dict[str, str]. The PacketFunction layer handles stringification.
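The stringification the PacketFunction layer performs might look like the following sketch (the function name is illustrative; the commit only states that non-string values get stringified, and JSON encoding is an assumption):

```python
import json

def stringify_opts(opts: dict) -> dict[str, str]:
    # Leave strings alone; JSON-encode ints, nested dicts, and other
    # non-string values so every cell fits a dict[str, str] column.
    return {
        key: value if isinstance(value, str) else json.dumps(value)
        for key, value in opts.items()
    }
```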

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…tracts

- Rename LocalExecutor → LocalPythonFunctionExecutor for clarity
- Rename get_execution_data() → get_executor_data() on executor protocol/base
- Add get_executor_data_schema() to executor protocol and base
- Add get_function_variation_data_schema() and get_execution_data_schema()
  to PacketFunctionProtocol
- Remove unused ExecutionEngineProtocol
- Update all test references

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…with cwd fallback

- pipeline/graph.py: update LocalExecutor → LocalPythonFunctionExecutor
- git_utils: add try_cwd fallback for functions not in a git repo,
  track git_source ("function" vs "cwd") in returned metadata

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…n types

arrow_type_to_python_type silently returned typing.Any for unsupported
Arrow types, causing cryptic "Unsupported Python type: typing.Any"
errors downstream. Now logs a warning with the original Arrow type.
The downstream error also includes a hint about common causes.
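A stdlib-only sketch of the fallback-with-warning pattern described above. The real arrow_type_to_python_type operates on pyarrow DataType objects, so the string-keyed lookup table here is purely illustrative:

```python
import logging
import typing

logger = logging.getLogger(__name__)

# Illustrative subset of an Arrow-type -> Python-type mapping; the real
# converter in universal_converter covers far more types.
_KNOWN = {"int64": int, "large_string": str, "bool": bool}

def arrow_type_to_python_type(arrow_type_name: str):
    py_type = _KNOWN.get(arrow_type_name)
    if py_type is None:
        # Warn with the original Arrow type instead of failing silently,
        # so the eventual "Unsupported Python type: typing.Any" error
        # downstream can be traced back to its source.
        logger.warning(
            "No Python mapping for Arrow type %r; falling back to Any",
            arrow_type_name,
        )
        return typing.Any
    return py_type
```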

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

Copilot AI left a comment


Pull request overview

Implements ENG-374 by making execution-context metadata schema-driven and ensuring cached-result storage supports structured metadata (notably dict[str, str]) via datagram-based serialization and improved Arrow↔Python conversion.

Changes:

  • Add explicit schemas for packet function “variation” and “execution” metadata; execution data now includes executor_type, executor_info, python_version, and extra_info.
  • Update caching to pass metadata as Datagrams into ResultCache.store() so Arrow type handling is delegated to the universal converter (enabling map columns for dict-typed fields).
  • Rename/clean up executor APIs (LocalExecutor → LocalPythonFunctionExecutor, get_execution_data() → get_executor_data()), plus several Arrow round-trip fixes and diagnostics.

Reviewed changes

Copilot reviewed 26 out of 26 changed files in this pull request and generated 6 comments.

Show a summary per file
File Description
tests/test_pipeline/test_status_observer_integration.py Update tests to use LocalPythonFunctionExecutor after executor rename.
tests/test_pipeline/test_logging_observer_integration.py Update logging observer integration tests for executor rename.
tests/test_pipeline/test_composite_observer.py Update composite observer tests for executor rename.
tests/test_core/test_result_cache.py Update ResultCache.store() tests for datagram-based metadata; add coverage for dict-typed execution columns.
tests/test_core/test_regression_fixes.py Update regression tests for executor rename and get_executor_data() behavior.
tests/test_core/packet_function/test_packet_function.py Update/extend tests for the new execution data shape and execution-data schema.
tests/test_core/packet_function/test_executor.py Update executor tests for new names, defaults, and get_executor_data() API.
tests/test_core/nodes/test_function_node_iteration.py Update node-iteration tests to use LocalPythonFunctionExecutor.
superpowers/specs/2026-04-07-execution-context-schema-design.md Add design spec describing schema-driven execution context and datagram-based metadata storage.
superpowers/plans/2026-04-07-eng374-execution-context-schema.md Add implementation plan detailing step-by-step changes for ENG-374.
src/orcapod/utils/git_utils.py Enhance git metadata discovery with optional CWD fallback and additional environment fields.
src/orcapod/semantic_types/universal_converter.py Improve diagnostics: better error hints and warnings when Arrow→Python type mapping falls back to Any.
src/orcapod/protocols/core_protocols/packet_function.py Extend packet function protocol to require schema methods for variation and execution metadata.
src/orcapod/protocols/core_protocols/executor.py Rename executor metadata method to get_executor_data() and add get_executor_data_schema().
src/orcapod/protocols/core_protocols/execution_engine.py Remove unused ExecutionEngineProtocol.
src/orcapod/pipeline/graph.py Update default executor selection to LocalPythonFunctionExecutor.
src/orcapod/core/result_cache.py Change store() to accept variation/execution Datagrams; adjust timestamp storage to string for converter limitations.
src/orcapod/core/packet_function.py Add schema methods, implement schema-driven execution metadata, default to local executor, and wire datagram-based cache storage.
src/orcapod/core/packet_function_proxy.py Add schema-method delegation/empty-schema behavior when proxy is unbound.
src/orcapod/core/nodes/function_node.py Preserve precise types by deriving Python schema from Arrow schema when available during table materialization.
src/orcapod/core/executors/ray.py Rename and restructure executor metadata reporting; add schema for executor metadata.
src/orcapod/core/executors/local.py Rename LocalExecutor to LocalPythonFunctionExecutor.
src/orcapod/core/executors/base.py Rename executor metadata method to get_executor_data() and add a default schema method.
src/orcapod/core/executors/init.py Re-export renamed local executor.
src/orcapod/core/datagrams/datagram.py Fix Arrow→Python conversion to use universal converter; preserve schema types across meta mutations.
src/orcapod/core/cached_function_pod.py Update pod-level caching writes to build metadata datagrams and call the new ResultCache.store() signature.


Comment on lines 78 to +84
env_info = {}
env_info["git_commit_hash"] = git_info.get("commit_hash")
env_info["git_repo_status"] = "dirty" if git_info.get("is_dirty") else "clean"
env_info["has_untracked_files"] = (
"true" if git_info.get("has_untracked_files") else "false"
)
env_info["git_source"] = git_source

Copilot AI Apr 8, 2026


get_git_info_for_python_object sets has_untracked_files from git_info.get("has_untracked_files"), but get_git_info() never populates that key, so this will always be reported as false and the -untracked suffix logic in PythonPacketFunction will never trigger. Consider adding has_untracked_files to get_git_info() (e.g., via repo.untracked_files) or removing this field from env_info to avoid misleading metadata.

Contributor Author


Fixed — added has_untracked_files to get_git_info() via repo.untracked_files.
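A hedged sketch of how that fix might surface untracked files. The pure helper below is illustrative (its name and the string "true"/"false" convention mirror the env_info code in the diff); the GitPython usage in the comment assumes the standard `Repo` API and is not the project's verbatim code:

```python
def untracked_fields(untracked: list[str]) -> dict[str, str]:
    # Stringified flag, matching the env_info convention shown in the diff,
    # so the -untracked suffix logic in PythonPacketFunction can trigger.
    return {"has_untracked_files": "true" if untracked else "false"}

# Inside get_git_info(), assuming GitPython:
#   from git import Repo
#   repo = Repo(path, search_parent_directories=True)
#   info.update(untracked_fields(repo.untracked_files))
```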

Comment thread src/orcapod/core/executors/local.py Outdated
Comment on lines 131 to 133
"""Return a new ``LocalExecutor``.

``LocalExecutor`` carries no state, so options are ignored.

Copilot AI Apr 8, 2026


The with_options docstring still references LocalExecutor, but the class is now LocalPythonFunctionExecutor. Updating the docstring avoids confusion and keeps the public API docs consistent with the rename.

Suggested change:
- """Return a new ``LocalExecutor``.
- ``LocalExecutor`` carries no state, so options are ignored.
+ """Return a new ``LocalPythonFunctionExecutor``.
+ ``LocalPythonFunctionExecutor`` carries no state, so options are ignored.

Contributor Author


Fixed — updated docstring to reference LocalPythonFunctionExecutor.

Comment thread src/orcapod/pipeline/graph.py Outdated
@@ -312,9 +312,9 @@ def compile(self) -> None:
# Default to LocalExecutor so capture/logging works

Copilot AI Apr 8, 2026


Comment refers to LocalExecutor, but the default executor class is now LocalPythonFunctionExecutor. Please update the comment to match the renamed class to avoid confusion for future readers.

Suggested change:
- # Default to LocalExecutor so capture/logging works
+ # Default to LocalPythonFunctionExecutor so capture/logging works

Contributor Author


Fixed — comment updated to LocalPythonFunctionExecutor.

Comment on lines +148 to +149
  pf = PythonPacketFunction(sometimes_fail, output_keys="result")
- pf.executor = LocalExecutor()  # sets executor (LocalExecutor.supports_concurrent_execution is False)
+ pf.executor = LocalPythonFunctionExecutor()  # sets executor (LocalExecutor.supports_concurrent_execution is False)

Copilot AI Apr 8, 2026


Inline comment still references LocalExecutor.supports_concurrent_execution; since the executor class was renamed, update the comment to LocalPythonFunctionExecutor (or just “local executor”) to keep the test description accurate.

Contributor Author


Fixed — simplified the comment to just describe the behavior without referencing the old class name.

Comment on lines +339 to +348
class TestStoreDictColumns:
def test_executor_info_column_stored(self):
cache, db = _make_cache()
pf = _make_pf()
_compute_and_store(cache, pf, Packet({"x": 10}))

records = db.get_all_records(cache.record_path)
assert records is not None
exec_info_col = f"{constants.PF_EXECUTION_PREFIX}executor_info"
assert exec_info_col in records.column_names

Copilot AI Apr 8, 2026


These tests only assert the presence of executor_info/extra_info columns, but not that they are stored with the intended Arrow type (e.g., pa.map_(pa.large_string(), pa.large_string())). Adding an assertion on records.schema.field(exec_info_col).type would ensure the dict→Arrow map conversion is actually covered and prevent regressions back to string columns.

Contributor Author


Good catch — added pa.types.is_large_list() assertion to verify the column is stored as an Arrow map type (large_list of key-value structs), not a plain string.

Comment thread tests/test_core/test_result_cache.py Outdated
Comment on lines +350 to +358
def test_extra_info_column_stored(self):
cache, db = _make_cache()
pf = _make_pf()
_compute_and_store(cache, pf, Packet({"x": 10}))

records = db.get_all_records(cache.record_path)
assert records is not None
extra_info_col = f"{constants.PF_EXECUTION_PREFIX}extra_info"
assert extra_info_col in records.column_names

Copilot AI Apr 8, 2026


Similar to executor_info, this test currently checks only that the extra_info column exists. Consider also asserting the stored Arrow type is a map of strings so the dict-typed column behavior is truly validated.

Contributor Author


Same fix applied — added Arrow type assertion for extra_info column.

… assertions

- Add has_untracked_files to get_git_info() so the -untracked suffix
  logic in PythonPacketFunction actually triggers
- Update stale LocalExecutor references in docstrings and comments
- Strengthen TestStoreDictColumns to assert Arrow map type, not just
  column presence

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
skip_duplicates: bool = False,
) -> PacketProtocol:
"""Record the output packet against the input packet in the result store."""
var_dg, exec_dg = self._build_metadata_datagrams()
Contributor Author


what's the point of record_packet method if this is not what's used in call and async_call?

Contributor Author


record_packet() is a public API for manually recording a pre-computed result into the cache without going through the full call() flow — no cache lookup, no RESULT_COMPUTED_FLAG meta. It's useful for external callers who compute a result outside the normal path and want to insert it into the cache.

call() and async_call() inline the store logic directly because they also handle cache lookup, skip_cache_insert, and the RESULT_COMPUTED_FLAG meta column — concerns that don't apply to record_packet().

That said, it's currently only used in tests. If we want to consolidate, we could have call()/async_call() delegate to record_packet() for the store step. Worth considering but not in scope for this PR.

@eywalker eywalker merged commit 15f9593 into dev Apr 8, 2026
8 of 9 checks passed
@eywalker eywalker deleted the eywalker/eng-374-audit-and-verify-execution-context-captured-by-packet branch April 8, 2026 01:47