
Feat/expr pickle #3

Draft

timsaucer wants to merge 11 commits into feat/proto-codecs from feat/expr-pickle

Conversation

@timsaucer
Owner

Which issue does this PR close?

Closes #.

Rationale for this change

What changes are included in this PR?

Are there any user-facing changes?

timsaucer and others added 11 commits May 14, 2026 15:10
Builds on the codec consistency work in feat/proto-codecs. Python
scalar UDFs are cloudpickled inline into the proto `fun_definition`
field by PythonLogicalCodec / PythonPhysicalCodec, so a pickled Expr
that references a Python `udf()` reconstructs on the receiver with
no pre-registration. UDAFs, UDWFs, and FFI-imported UDFs still
resolve through the receiver's session.
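For reference, a minimal sketch of what this enables from Python. The UDF body, input types, and column name are illustrative, and the `udf()` call is assumed from the current datafusion-python API:

```python
import pickle

import pyarrow as pa
import pyarrow.compute as pc
from datafusion import col, udf

# Hypothetical scalar UDF that exists only in the sender process.
def add_one(arr: pa.Array) -> pa.Array:
    return pc.add(arr, 1)

add_one_udf = udf(add_one, [pa.int64()], pa.int64(), "immutable")
expr = add_one_udf(col("a"))

blob = pickle.dumps(expr)       # UDF body is cloudpickled into the blob
restored = pickle.loads(blob)   # reconstructs with no pre-registration
```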

Rust:
* `PythonFunctionScalarUDF` regains the `func()` / `input_fields()` /
  `return_field()` / `volatility()` accessors and the `from_parts()`
  constructor the codec needs.
* `crates/core/src/codec.rs` adds shared
  `try_encode_python_scalar_udf` / `try_decode_python_scalar_udf`
  helpers built on cloudpickle + pyarrow IPC for the input schema.
  Both `PythonLogicalCodec.try_encode_udf` and
  `PythonPhysicalCodec.try_encode_udf` consult the helper first and
  fall back to `inner` for non-Python UDFs (and the receiver's
  function registry on decode if the prefix does not match).

Python:
* `datafusion.ipc` module: thread-local `set_worker_ctx` /
  `clear_worker_ctx` / `get_worker_ctx` for installing a receiver
  `SessionContext` on a worker process. `_resolve_ctx` resolves in
  priority order: explicit argument, then worker context, then a
  fresh `SessionContext`. See the sketch after this list.
* `Expr.__reduce__` returns `(Expr._reconstruct, (self.to_bytes(),))`.
  `_reconstruct` calls `Expr.from_bytes(buf, ctx=None)` which
  consults the worker context.
* `Expr.from_bytes` signature switches to `(buf, ctx=None)` (was
  `(ctx, buf)`); there are no callers on main, only the PR 1 tests,
  which are updated.
* `datafusion.ipc` exported from the top-level package.
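A minimal sketch of that flow, assuming the `datafusion.ipc` API added here; the function names in the sketch are illustrative:

```python
import pickle

from datafusion import SessionContext
from datafusion import ipc as df_ipc

def init_worker() -> None:
    # Runs once per worker process, e.g. Pool(initializer=init_worker).
    ctx = SessionContext()
    # UDAFs, UDWFs, and FFI-imported UDFs still travel by name at this
    # point in the series, so register them on ctx before installing it.
    df_ipc.set_worker_ctx(ctx)

def handle(blob: bytes):
    # Expr.__reduce__ routed this through Expr._reconstruct ->
    # Expr.from_bytes(buf), which consults the installed worker context.
    return pickle.loads(blob)
```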

Dependencies:
* `cloudpickle>=2.0` added as a runtime dep. Lazy-imported on the
  encode / decode hot paths — users who never pickle a plan or
  expression pay only the install footprint, not import-time cost.
* ruff `S301` added to the test-suite + examples ignore lists
  (legitimate `pickle.loads` use).

Tests:
* `test_pickle_expr.py` — 11 cases covering built-in expr pickle,
  scalar UDF self-contained blobs, closure-capturing UDFs, worker
  ctx lifecycle, thread-local isolation.
* `test_pickle_multiprocessing.py` + `_pickle_multiprocessing_helpers.py`
  — parametrized over `fork`/`forkserver`/`spawn` start methods. 9
  cases. Auto-skip when the sandbox blocks semaphore creation; CI
  runs the full matrix.
* `test_expr.py` — existing `from_bytes` tests updated to new
  signature.

1088 root tests pass (up from 1077), 13 skipped (up from 4, the new
mp cases skip locally under sandboxed semaphores).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Companion to the pickle work in the previous commit. Ships the
discoverable surface a user would actually reach for when they hit
"how do I distribute these expressions":

* `docs/source/user-guide/io/distributing_expressions.rst` — end-to-end
  user guide covering the recommended `Pool(initializer=...)` pattern,
  the worker context shape, what does and does not survive the
  round-trip (scalar UDFs yes, UDAF/UDWF/FFI by name), Python 3.14
  start-method change, and the cloudpickle security note.
* `examples/ray_pickle_expr.py` — runnable Ray actor demo using
  `set_worker_ctx` from an actor `__init__`.
* `examples/README.md` — links to the Ray example.
* `docs/source/user-guide/io/index.rst` — adds the new page to the
  IO TOC.
* `.github/workflows/test.yml` — 30-minute `timeout-minutes` backstop
  on the test matrix so a hung multiprocessing worker (e.g. during a
  pickle regression) does not block CI indefinitely.
* `python/datafusion/user_defined.py` — `ScalarUDF` / `AggregateUDF` /
  `WindowUDF` get a `.name` property surfacing the registered name.
  Useful for tests asserting an expression carries a specific UDF
  reference, and for users debugging worker registrations.
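For example, something like the following; the lambda and the name are illustrative:

```python
import pyarrow as pa
import pyarrow.compute as pc
from datafusion import udf

double = udf(lambda a: pc.multiply(a, 2), [pa.int64()], pa.int64(),
             "immutable", name="double")

assert double.name == "double"  # the registered name, via the new property
```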

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
`input_fields: Vec<Field>` and `volatility: Volatility` were added to
the struct so the codec could read them on encode. Both were
redundant:

* `Signature` already carries the `Vec<DataType>` (via
  `TypeSignature::Exact`) and `Volatility` — the constructor collapses
  the incoming `Vec<Field>` to `DataType`s on its way into the
  signature, so `Field`-level metadata (nullability, attached
  metadata) is never propagated anywhere on the local side.
* On decode, `from_parts` runs that same collapse again. Sender's
  `Signature` and receiver's `Signature` end up with the same
  `DataType`s and the same `Volatility`. The reconstructed
  `PythonFunctionScalarUDF` is functionally equivalent to the
  original without preserving the input-side `Field`s.

Revert the struct to its original 4-field shape (`name`, `func`,
`signature`, `return_field`). The codec now derives the input
`DataType`s from `signature.type_signature` and reads volatility from
`signature.volatility`. Input fields are still serialized into the
cloudpickle payload (with synthesized `arg_i` names) so the wire
format is unchanged.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Previous round-trip went Rust Schema -> pyarrow Schema -> IPC bytes
-> cloudpickle tuple -> pyarrow Schema -> Rust Schema, for both the
input schema and the return Field. Two unnecessary pyarrow trips on
each side.

Replace with `StreamWriter::try_new(&mut buf, &schema)?.finish()?`
on the encoder and `StreamReader::try_new(cursor, None)?.schema()`
on the decoder. Both ends produce / consume the same Arrow IPC
stream bytes — arrow-rs writes a schema-only stream, arrow-rs reads
it back, no PyArrow involvement.
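To illustrate the wire format (not the Rust codec itself), the same schema-only IPC stream can be produced and consumed from Python with pyarrow. A sketch, with an assumed `arg_0` field name:

```python
import io

import pyarrow as pa
import pyarrow.ipc as ipc

schema = pa.schema([pa.field("arg_0", pa.int64())])

# Encode side: a schema-only stream, analogous to
# StreamWriter::try_new(&mut buf, &schema)?.finish()? on the Rust side.
sink = io.BytesIO()
with ipc.new_stream(sink, schema):
    pass  # no record batches written; the stream carries only the schema
payload = sink.getvalue()

# Decode side: read the schema back, analogous to
# StreamReader::try_new(cursor, None)?.schema().
roundtripped = ipc.open_stream(io.BytesIO(payload)).schema
assert roundtripped.equals(schema)
```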

Tuple shape changes slightly: the fourth field is now a one-field
`return_schema_bytes` IPC blob instead of a pickled pyarrow `Field`.
Keeps everything in `Vec<u8>` form before cloudpickle picks it up.

`pyarrow.ipc.read_schema` and the `ToPyArrow` / `FromPyArrow` traits
on `Schema` / `Field` are no longer needed on the codec hot path,
shaving a noticeable chunk of pyarrow function dispatch from each
encode / decode call.

Pickle tests still green.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Previous wording for `Expr.to_bytes`, `Expr.__reduce__`, and the
`datafusion.ipc` module header referenced `PythonLogicalCodec` and
`cloudpickle` to explain what survives the wire. Neither name is
importable from Python, and the mechanism is irrelevant to the end
user; only the resulting contract matters.

Reword each docstring to describe the user-facing guarantee directly:

* Python scalar UDFs travel inside the pickle / serialized blob, no
  pre-registration needed on the receiver.
* Aggregate UDFs, window UDFs, and FFI-capsule UDFs travel by name
  only and require the receiver to have them registered (typically
  via `set_worker_ctx`).

The implementation can change underneath without invalidating these
docs.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
User-facing docs throughout this PR led with "pickle support":
filename-shaped headings, function docstrings describing how things
get cloudpickled into a Rust-side codec, etc. That's the
implementation pathway, not the user's goal.

The user's goal is to build an expression in a driver process and
ship it to worker processes for distributed evaluation. Pickle is the
mechanism Python provides to make that work; we hook into it. End
users typically don't care how the bytes are produced — they care
which references survive the trip and what they have to register on
each worker.

Reframe across user-facing surfaces:

* `docs/source/user-guide/io/distributing_expressions.rst` — leads
  with the worker-pool use case, drops `PythonUDFCodec` /
  cloudpickle vocabulary, presents "what travels with the
  expression" as the user contract.
* `datafusion.ipc` module docstring + `set_worker_ctx` /
  `clear_worker_ctx` / `get_worker_ctx` — describes what the user
  installs and why, not internal lookup details.
* `Expr.to_bytes` / `from_bytes` / `__reduce__` — describes what's
  shipped vs what travels by name; cross-references the user guide
  instead of repeating the codec story.
* `examples/ray_pickle_expr.py` header + comment + README entry —
  goal-first wording.
* Pickle test module docstrings — drop the dangling reference to
  `PythonUDFCodec` (also a stale name post-PR1).

Code behavior unchanged. 1088 tests still green.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Window UDFs no longer need worker-side pre-registration. The codec
serializes the Python evaluator factory into the wire format and the
receiver reconstructs the UDF from bytes alone, same as scalar UDFs.

Refactor `MultiColumnWindowUDF` to store the Python evaluator
callable directly (`evaluator: Py<PyAny>`) instead of a
`PartitionEvaluatorFactory` closure. The factory closure was a
boxed `Fn` that captured the Python state opaquely, with nothing for
the codec to downcast back to. Now the named struct holds the
`Py<PyAny>` and builds a partition evaluator inside
`partition_evaluator()` on demand.

`PyWindowUDF::new` constructs `MultiColumnWindowUDF` directly with
the evaluator. `to_rust_partition_evaluator` is replaced by
`instantiate_partition_evaluator`, called from the trait method.

Codec wiring:
* `crates/core/src/codec.rs` adds `try_encode_python_window_udf` /
  `try_decode_python_window_udf` plus the `DFPYUDW1` magic prefix.
* `PythonLogicalCodec.try_encode_udwf` / `try_decode_udwf` and the
  matching `PythonPhysicalCodec` methods consult the helpers first
  and fall back to `inner` for non-Python window UDFs.

Test coverage in `test_pickle_expr.py::TestWindowUDFCodec` mirrors
the scalar UDF cases: self-contained blob, decode into fresh
context, decode via pickle with no worker context.
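A sketch of what those cases exercise from the Python side. The evaluator is a toy, and the `udwf()` call shape and `WindowEvaluator` import path are assumptions based on the current datafusion-python API:

```python
import pickle

import pyarrow as pa
import pyarrow.compute as pc
from datafusion import col, udwf
from datafusion.user_defined import WindowEvaluator

class CumulativeSum(WindowEvaluator):
    """Toy evaluator; only the one method the sketch needs is overridden."""

    def evaluate_all(self, values: list[pa.Array], num_rows: int) -> pa.Array:
        return pc.cumulative_sum(values[0])

running_total = udwf(CumulativeSum(), pa.int64(), pa.int64(),
                     volatility="immutable")
expr = running_total(col("a"))

# Self-contained blob: loads in a fresh process without any registration.
restored = pickle.loads(pickle.dumps(expr))
```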

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Aggregate UDFs no longer need worker-side pre-registration. The codec
serializes the Python accumulator factory + state schema into the
wire format and the receiver reconstructs the UDF from bytes alone.

New `PythonFunctionAggregateUDF` named struct (in `crates/core/src/udaf.rs`)
holds `accumulator: Py<PyAny>` plus signature, return type, and state
fields directly. Full `AggregateUDFImpl` impl mirroring upstream
`SimpleAggregateUDF`: `as_any`, `name`, `signature`, `return_type`,
`accumulator`, `state_fields`. `accumulator()` lazily instantiates a
fresh accumulator per partition via the new
`instantiate_accumulator()` helper.

`PyAggregateUDF::new` now constructs `PythonFunctionAggregateUDF`
directly via `AggregateUDF::new_from_impl(...)` instead of routing
through `create_udaf(...)` + `to_rust_accumulator(...)`. The closure-
based factory path is gone; the Python state stays addressable.

Codec wiring:
* `crates/core/src/codec.rs` adds `try_encode_python_agg_udf` /
  `try_decode_python_agg_udf` plus the `DFPYUDA1` magic prefix.
  Tuple shape: `(name, accumulator, input_schema_bytes,
  return_schema_bytes, state_schema_bytes, volatility_str)`.
* `PythonLogicalCodec.try_encode_udaf` / `try_decode_udaf` and the
  matching `PythonPhysicalCodec` methods consult the helpers first
  and fall back to `inner` for non-Python aggregate UDFs.

Test coverage in `test_pickle_expr.py::TestAggregateUDFCodec` mirrors
the scalar / window UDF cases.
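A sketch of the Python-side shape those cases exercise. The accumulator is a toy, and the `udaf()` call shape and `Accumulator` import path are assumptions based on the current datafusion-python API:

```python
import pickle

import pyarrow as pa
import pyarrow.compute as pc
from datafusion import col, udaf
from datafusion.user_defined import Accumulator

class Summarize(Accumulator):
    """Toy sum accumulator."""

    def __init__(self) -> None:
        self._sum = 0.0

    def update(self, values: pa.Array) -> None:
        self._sum += pc.sum(values).as_py()

    def merge(self, states: pa.Array) -> None:
        self._sum += pc.sum(states).as_py()

    def state(self) -> list[pa.Scalar]:
        return [pa.scalar(self._sum)]

    def evaluate(self) -> pa.Scalar:
        return pa.scalar(self._sum)

total = udaf(Summarize, pa.float64(), pa.float64(), [pa.float64()], "stable")
expr = total(col("a"))

# Accumulator factory + state schema travel inside the blob.
restored = pickle.loads(pickle.dumps(expr))
```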

1094 root tests pass (up from 1088, plus 3 new UDAF cases and 3 new
UDWF cases from the prior commit).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
With aggregate UDFs and window UDFs now reconstructable from bytes
alone, the user-facing contract simplifies to:

* Built-in functions and **all** Python UDFs (scalar, aggregate,
  window) travel inside the shipped expression. No worker-side
  pre-registration.
* Only UDFs imported via the FFI capsule protocol travel by name and
  require pre-registration via `set_worker_ctx`.

Update each user-facing surface:

* `docs/source/user-guide/io/distributing_expressions.rst` — drop the
  "aggregate/window UDFs travel by name only" caveat; rename the
  practical-considerations entry that called out the limitation.
* `python/datafusion/ipc.py` module + `clear_worker_ctx` — explicitly
  list scalar, aggregate, and window as inline-portable.
* `python/datafusion/expr.py` — `to_bytes` and `__reduce__`
  docstrings updated.
* Test module docstrings updated.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…inds

Two fixes in the intro paragraph:

* Link to the standard library pickle docs rather than relying on the
  reader's familiarity with `pickle.dumps` / `pickle.loads`.
* "Python scalar UDFs ride along" only covered scalar UDFs. With
  aggregate and window UDFs now also traveling inline, the line is
  reworded to call out all three kinds.

Also updates the inline code comment in the worker-pool example.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The Pool / Ray-actor examples called `pickle.dumps` on the sender
and `pickle.loads` on the worker explicitly. That's not what real
user code looks like — `multiprocessing.Pool.starmap`, Ray's
`@ray.remote`, and similar frameworks serialize their function
arguments automatically. Showing the manual wrapping makes the API
look more involved than it is and obscures the point: users hand a
DataFusion `Expr` to their distribution framework like any other
Python object, and it Just Works.

Rewrites:

* User guide worker-pool example switches from
  `pool.map(evaluate, [(blob, batch), ...])` (where `blob =
  pickle.dumps(expr)`) to `pool.starmap(evaluate, [(expr, batch),
  ...])`. `evaluate(expr, batch)` receives the reconstructed
  expression directly (condensed sketch after this list).
* Ray example drops the `pickle.dumps(expr)` / `pickle.loads(blob)`
  pair; `evaluate(expr, batch)` takes a typed `Expr`. Drops the
  unused `pickle` import.
* Worker-context narrative updated: "expressions reconstructed by
  pickle.loads" -> "expressions arriving from the driver".
* Security warning reworded to mention pickle as the underlying
  mechanism while still framing the contract in user terms (only
  accept expressions from trusted sources).
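A condensed sketch of the rewritten worker-pool flow, assuming the API described in this PR; column names and batches are illustrative:

```python
from multiprocessing import Pool

import pyarrow as pa
from datafusion import SessionContext, col, lit
from datafusion import ipc as df_ipc

def init_worker() -> None:
    # Anything that still travels by name (FFI-imported UDFs) would be
    # registered on this context before expressions start arriving.
    df_ipc.set_worker_ctx(SessionContext())

def evaluate(expr, batch):
    # expr arrives as a reconstructed datafusion Expr, not a pickle blob.
    ctx = df_ipc.get_worker_ctx()
    return ctx.create_dataframe([[batch]]).select(expr).collect()

if __name__ == "__main__":
    expr = (col("a") * lit(2)).alias("doubled")
    batches = [pa.RecordBatch.from_pydict({"a": [1, 2, 3]}),
               pa.RecordBatch.from_pydict({"a": [4, 5, 6]})]
    with Pool(initializer=init_worker) as pool:
        results = pool.starmap(evaluate, [(expr, b) for b in batches])
```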

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>