Skip to content

feat!: Python callback streaming, std::sync::mpsc removed (refs #482)#492

Merged
userFRM merged 1 commit into
mainfrom
feat/482c-python-callback
May 6, 2026
Merged

feat!: Python callback streaming, std::sync::mpsc removed (refs #482)#492
userFRM merged 1 commit into
mainfrom
feat/482c-python-callback

Conversation

@userFRM
Copy link
Copy Markdown
Owner

@userFRM userFRM commented May 6, 2026

Summary

Stage 3 of issue #482: migrate the PyO3 binding to the SSOT StreamingDispatcher. The C ABI rewrite landed in PR #490; this PR brings Python in line. TypeScript follows in PR D, C++ in PR E.

Removed Python methods

  • tdx.next_event(timeout_ms) -- poll-style entry point.
  • tdx.next_event_typed(timeout_ms) -- alias for the above.

New method signature

tdx.start_streaming(callback=handler)
# `handler(event)` runs on the dispatcher's drain thread under the GIL

The PyO3 method:

fn start_streaming(&self, callback: Py<PyAny>) -> PyResult<()>

The wrapper struct stores ONLY the user's callable:

callback: Mutex<Option<Py<PyAny>>>,

EventRx, the rx field, the dropped_events: Arc<AtomicU64>, and every std::sync::mpsc reference are gone:

$ rg "std::sync::mpsc" sdks/python/ --type rust
$ # zero matches

Drop counter is now exposed via tdx.dropped_event_count(), forwarded to thetadatadx::ThetaDataDx::dropped_event_count. The value matches every other binding and survives reconnect because it lives on the SSOT dispatcher.

Migration

# Before
tdx.start_streaming()
while True:
    event = tdx.next_event(100)
    if event:
        process(event)

# After
def handler(event):
    process(event)
tdx.start_streaming(callback=handler)

GIL-acquire-in-dispatcher contract

Per-event flow:

  1. FPSS reader thread decodes the wire frame and calls the dispatcher's send-side closure (cheap clone onto the bounded crossbeam_channel(8192) queue). Reader never blocks on user code.
  2. Dispatcher drain thread pops the event, runs Python::attach(|py| { ... }), builds the typed pyclass via buffered_event_to_typed, and calls dispatch_cb.call1(py, (typed,)).
  3. Exceptions raised inside callback are routed through PyErr::write_unraisable(py, None) so a buggy callback cannot kill the streaming thread.
  4. Queue overflow drops the event and ticks the SSOT counter; consumers observe it via dropped_event_count().

Why no Python start_streaming_inline

GIL acquisition can block. A slow Python callable on the FPSS reader thread would:

  1. Block the reader on Python::attach.
  2. Fill the kernel TCP receive buffer.
  3. Trigger TCP backpressure on the vendor side.
  4. Cause the FPSS server to disconnect the session.

The Python binding deliberately does NOT expose the inline opt-in path. C ABI consumers can; Python cannot. Documented in the start_streaming docstring and in this PR's changelog entry.

Test plan

  • cargo fmt --all -- --check
  • cargo clippy --workspace --all-targets -- -D warnings
  • cargo test --workspace
  • cargo deny check
  • cargo run -p thetadatadx --bin generate_sdk_surfaces --features config-file -- --check
  • cargo check --manifest-path tools/mcp/Cargo.toml --locked
  • cargo clippy --manifest-path tools/mcp/Cargo.toml --all-targets -- -D warnings
  • cargo test --manifest-path tools/mcp/Cargo.toml --no-run
  • cargo check --manifest-path tools/server/Cargo.toml --locked
  • cargo check --manifest-path sdks/python/Cargo.toml --locked
  • cargo check --manifest-path sdks/typescript/Cargo.toml --locked
  • Final scrub: rg "std::sync::mpsc" sdks/python/ --type rust returns zero matches.
  • Live FPSS smoke (pytest sdks/python/tests/test_dropped_events.py with THETADX_TEST_CREDS set) -- requires market access; runs in the surfaces CI job.

Out of scope

  • Version bump. Cargo.tomls stay at 8.0.29. The single coherent breaking bump happens at PR E (C++) once every binding has migrated.
  • TypeScript and C++ wrapper migrations -- PRs D and E.
  • Wider docs-site streaming guide overhaul. The Python-specific accessor reference (tools/server.md) is updated; cross-language tables stay until TS / C++ migrate.

Refs #482.

Migrate the PyO3 binding to the SSOT `StreamingDispatcher`. The
new public API is:

    def handler(event):
        process(event)
    tdx.start_streaming(callback=handler)

The dispatcher's drain thread acquires the GIL via `Python::attach`
to call the user callable for every typed FPSS event. The reader
thread itself never touches Python.

Removed Python methods:
- `tdx.next_event(timeout_ms)` -- the poll-style entry point.
- `tdx.next_event_typed(timeout_ms)` -- alias for the above.

Removed wrapper internals:
- `EventRx` type alias and the `rx: Mutex<Option<Arc<...>>>` field.
- The closure-local `dropped_events: Arc<AtomicU64>` counter.
- The `(tx, rx) = std::sync::mpsc::channel::<BufferedEvent>()`
  shim in `start_streaming` and `reconnect`, and the matching
  `recv_timeout` poll loop with `RecvTimeoutError` arms.

`std::sync::mpsc` is gone from `sdks/python/`:
    rg "std::sync::mpsc" sdks/python/ --type rust  -> 0 matches.

Drop counter is now exposed via `tdx.dropped_event_count()`,
forwarded to `thetadatadx::ThetaDataDx::dropped_event_count` so the
value matches every other binding and survives reconnect.

Why no Python `start_streaming_inline`: GIL acquisition can block,
and a slow Python callable on the FPSS reader thread would fill
the kernel TCP receive buffer and trigger a vendor-side disconnect.
The Python binding deliberately does NOT expose the inline opt-in
path. C ABI / TS / future C++ migrations can; Python cannot.

Tests rewritten to register a callback and assert
`dropped_event_count()` is monotonically non-decreasing across
`start_streaming -> reconnect -> stop_streaming`. Two new
guard-rail tests cover non-callable inputs and `reconnect()`
without a prior callback.

Generator template rewrites cover both `start_streaming` and
`reconnect`. The `MethodKind::NextEvent` arm in
`build_support/sdk_surface/python.rs` is now a panic stub --
`python_unified` was dropped from the kind's allowed target list
in `spec.rs`, so the panic only fires if someone reintroduces
poll-style Python streaming without re-implementing the body.

TypeScript still exposes poll-style `next_event` via its internal
mpsc shim until PR D. C++ migrates in PR E.

No version bump: `Cargo.toml`s stay at 8.0.29. The single coherent
breaking version bump rides PR E once every binding has migrated.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@userFRM userFRM merged commit 76b1ea1 into main May 6, 2026
31 checks passed
@userFRM userFRM deleted the feat/482c-python-callback branch May 6, 2026 13:09
userFRM added a commit that referenced this pull request May 6, 2026
Migrate the C++ wrapper to the callback C ABI shipped in PR #490.
`tdx::FpssClient` gains two header-only methods that wrap
`tdx_fpss_set_callback` / `tdx_fpss_set_inline_callback`:

    void set_callback(std::function<void(const FpssEvent&)> fn);
    void set_inline_callback(std::function<void(const FpssEvent&)> fn);

The Client owns a `unique_ptr<std::function<...>>` -- a stable
address survives moves of the owning client and is handed to the C
ABI as the `void* ctx`. A free `extern "C"` shim recovers the
function from `ctx` and invokes it with `const FpssEvent&`,
swallowing any propagating exception so unwinding cannot cross the
Rust/C boundary. The destructor's call to `tdx_fpss_shutdown` runs
before the function storage is freed, so the dispatcher / reader
threads cannot dereference stale state.

`fpss_smoke.cpp` is restored on the callback path -- `#error`
directive deleted, example rewritten to subscribe, register a
queued callback, print events for five seconds, and exit cleanly.
The CMake target builds clean.

`sdks/cpp/README.md` streaming section now documents callback
registration as the only entry point with a dedicated note on the
inline opt-in's microsecond-budget contract.

Version bump 8.0.29 -> 8.0.30 via `scripts/bump_version.py`. The
[Unreleased] CHANGELOG block becomes `## [8.0.30] - 2026-05-06` --
the single coherent #482 release that bundles the dispatcher core
(#489), C ABI callback (#490), and C++ wrapper migration. Python
(#492) and TypeScript (#493) entries land when those PRs merge.

Closes #482.
userFRM added a commit that referenced this pull request May 6, 2026
* feat!: C++ callback wrapper + v8.0.30 release (closes #482)

Migrate the C++ wrapper to the callback C ABI shipped in PR #490.
`tdx::FpssClient` gains two header-only methods that wrap
`tdx_fpss_set_callback` / `tdx_fpss_set_inline_callback`:

    void set_callback(std::function<void(const FpssEvent&)> fn);
    void set_inline_callback(std::function<void(const FpssEvent&)> fn);

The Client owns a `unique_ptr<std::function<...>>` -- a stable
address survives moves of the owning client and is handed to the C
ABI as the `void* ctx`. A free `extern "C"` shim recovers the
function from `ctx` and invokes it with `const FpssEvent&`,
swallowing any propagating exception so unwinding cannot cross the
Rust/C boundary. The destructor's call to `tdx_fpss_shutdown` runs
before the function storage is freed, so the dispatcher / reader
threads cannot dereference stale state.

`fpss_smoke.cpp` is restored on the callback path -- `#error`
directive deleted, example rewritten to subscribe, register a
queued callback, print events for five seconds, and exit cleanly.
The CMake target builds clean.

`sdks/cpp/README.md` streaming section now documents callback
registration as the only entry point with a dedicated note on the
inline opt-in's microsecond-budget contract.

Version bump 8.0.29 -> 8.0.30 via `scripts/bump_version.py`. The
[Unreleased] CHANGELOG block becomes `## [8.0.30] - 2026-05-06` --
the single coherent #482 release that bundles the dispatcher core
(#489), C ABI callback (#490), and C++ wrapper migration. Python
(#492) and TypeScript (#493) entries land when those PRs merge.

Closes #482.

* fix(cpp): address review on PR #494 -- memory safety on set_callback / move-assign

set_callback / set_inline_callback now stage the new std::function into a
local unique_ptr and only adopt it into callback_ after tdx_fpss_set_callback
returns 0. The C ABI rejects subsequent registrations with -1 while keeping
the previously installed (callback, ctx) live, so overwriting callback_
before checking the return code dangled the Rust-side ctx into freed
storage.

operator=(FpssClient&&) now drains the existing handle via tdx_fpss_shutdown
before dropping the old callback_. tdx_fpss_shutdown stops the FPSS reader
and joins the dispatcher drain thread before returning, so once it
completes no thread can still observe the old ctx pointer.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant