moq-ffi: streaming media import + cross-language interop smoke test #1529
We have unit tests but nothing that exercises the wire protocol across language implementations, which is the whole point of the polyglot wrappers. A catalog/container regression or a binding that fails to move frames would pass CI today. Add `just smoke`: it stands up a real moq-relay, then for each publisher language publishes an H.264 broadcast and fans out every subscriber in parallel, asserting each one sees non-zero frame data before a timeout. We check that bytes move end to end, not that H.264 decodes, so platforms without a decoder just count bytes. ffmpeg encodes a short deterministic clip once per run (moq-cli/moq-ffi only frame and forward); a manifest of NAL units lets every language replay the same media without codec logic. Phase 1 covers Rust and Python publishers/subscribers (the languages with working native clients) plus a negative control that proves the harness can fail. Swift/Kotlin/Go and a headless-browser JS client come in later phases. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Walkthrough
This pull request adds a Rust UniFFI streaming producer (`MoqMediaStreamProducer`) that accepts unframed byte writes and buffers/decodes access units, exposes it to Python with a `MediaStreamProducer` wrapper and `BroadcastProducer.publish_media_stream`, provides a Python publish/subscribe example that reads H.264 Annex‑B from stdin, introduces a bash smoke-test harness and relay config to exercise cross-language publisher/subscriber combinations, refactors the test justfiles, adjusts the rs CI wiring, and adds a GitHub Actions workflow to run the smoke test.
🚥 Pre-merge checks | ✅ 4 | ❌ 1
❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
Actionable comments posted: 4
🧹 Nitpick comments (4)
py/moq-rs/examples/interop.py (3)
56-56: ⚡ Quick win. Document or name the `1_000` parameter.
The literal `1_000` passed to `subscribe_media` lacks context. What unit is this (milliseconds? buffer size? max frames?)? As per coding guidelines, use named constants instead of magic numbers, or at minimum add an inline comment explaining the parameter.
🔍 Suggested clarification
If this is a buffer size or timeout, add a constant or comment:
    +    # Subscribe with 1000ms lookahead buffer
         media = consumer.subscribe_media(track_name, video.container, 1_000)
Or define as a constant if used elsewhere:
    +SUBSCRIBE_LOOKAHEAD_MS = 1_000  # milliseconds of lookahead buffer
    +
     async def subscribe(url: str, broadcast: str, timeout: float) -> None:
         # ...
    -    media = consumer.subscribe_media(track_name, video.container, 1_000)
    +    media = consumer.subscribe_media(track_name, video.container, SUBSCRIBE_LOOKAHEAD_MS)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@py/moq-rs/examples/interop.py` at line 56, The literal 1_000 passed to consumer.subscribe_media(track_name, video.container, 1_000) is a magic number—document its unit or replace it with a named constant to clarify meaning (e.g., TIMEOUT_MS, BUFFER_SIZE, MAX_FRAMES). Update the call to use that constant (or add an inline comment) and define the constant near the top of the module or next to related constants so callers of subscribe_media clearly understand the parameter's unit/semantics.
40-40: ⚡ Quick win. Add `strict=True` to `zip()` for safety.
The `zip()` call lacks an explicit `strict` parameter. While `frames` and `timestamps` are currently derived from the same source, adding `strict=True` would catch potential asset format inconsistencies. As per coding guidelines, prefer defensive coding practices.
🛡️ Proposed fix
    -    for payload, ts in zip(frames, timestamps):
    +    for payload, ts in zip(frames, timestamps, strict=True):
Note: This requires Python 3.10+. If targeting earlier versions, verify compatibility or skip this change.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@py/moq-rs/examples/interop.py` at line 40, The for-loop iteration using zip(frames, timestamps) should be made strict to detect length mismatches: update the zip call in the loop inside interop.py (the line with "for payload, ts in zip(frames, timestamps)") to pass strict=True (i.e., zip(frames, timestamps, strict=True)) so a ValueError is raised if the iterables differ; ensure the project Python version supports Python 3.10+ or guard/conditionalize the change if older Python must be supported.
21-43: ⚡ Quick win. Extract magic numbers to named constants.
Lines 27 and 29 contain magic numbers (`30` for default FPS, `1_000_000` for microseconds per second) that would be clearer as named module constants. As per coding guidelines, avoid magic numbers and use named constants instead.
♻️ Proposed refactor
Add constants after the imports (around line 19):
     import moq
    +# Time and media constants
    +DEFAULT_FPS = 30
    +MICROSECONDS_PER_SECOND = 1_000_000
    +
     async def publish(url: str, broadcast: str, asset_dir: str) -> None:
Then use them:
    -    fps = int(meta.get("fps", 30))
    +    fps = int(meta.get("fps", DEFAULT_FPS))
         frame_dt = 1.0 / fps
    -    loop_us = timestamps[-1] + 1_000_000 // fps
    +    loop_us = timestamps[-1] + MICROSECONDS_PER_SECOND // fps
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@py/moq-rs/examples/interop.py` around lines 21 - 43, Introduce named module-level constants (e.g., DEFAULT_FPS and MICROS_PER_SECOND) right after the imports and replace the magic numbers in publish: use DEFAULT_FPS instead of the literal 30 when computing fps (int(meta.get("fps", DEFAULT_FPS))) and use MICROS_PER_SECOND instead of 1_000_000 when computing loop_us (loop_us = timestamps[-1] + MICROS_PER_SECOND // fps); update any comments or docstring to reflect the constants.
demo/smoke/extract_asset.py (1)
39-40: ⚡ Quick win. Add docstring to document the Annex-B format.
The `annexb` helper is part of the public API but lacks documentation explaining its purpose. As per coding guidelines, public APIs should have clear docstrings.
📚 Proposed addition
     def annexb(nal: bytes) -> bytes:
    +    """Wrap NAL unit with 4-byte Annex-B start code (0x00000001)."""
         return b"\x00\x00\x00\x01" + nal
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@demo/smoke/extract_asset.py` around lines 39 - 40, The public helper function annexb lacks a docstring describing its purpose and the Annex‑B format; add a concise docstring to the annexb function that documents the parameter (nal: bytes), describes that it prepends the 4‑byte Annex‑B start code (0x00 0x00 0x00 0x01) to a NAL unit, states the return type (bytes), any expectations about input (e.g., a raw NAL unit without start code), and an example usage/outcome; place this docstring immediately above the annexb definition so it becomes part of the public API documentation.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@demo/smoke/extract_asset.py`:
- Around line 19-20: The docstring for function nal_units is misleading: it says
"Yield" but nal_units returns a list; update the docstring to use an appropriate
verb (e.g., "Return") and briefly describe the return value (list of NAL units
without start codes and with trailing zeros stripped) so it accurately matches
the implementation in nal_units.
- Around line 62-86: Introduce named H.264 constants and use them instead of
literals: define NAL_UNIT_TYPE_MASK = 0x1F and NAL_SPS = 7, NAL_PPS = 8,
NAL_SLICE = 1, NAL_IDR = 5, NAL_SEI = 6, NAL_AUD = 9 (or similar names) near the
top of the module, change nal_type(nal) to return nal[0] & NAL_UNIT_TYPE_MASK,
and replace all numeric comparisons in the loop (t == 7, t == 8, t in (1, 5), t
in (6, 9)) with the corresponding named constants (e.g., t == NAL_SPS, t ==
NAL_PPS, t in (NAL_SLICE, NAL_IDR), t in (NAL_SEI, NAL_AUD)) referencing the
nal_type function and the loop variables so behavior is unchanged.
In `@demo/smoke/run.sh`:
- Around line 205-208: The pub publisher PID is left set after reaping, risking
signaling a recycled PID later in cleanup(); after the wait that reaps the child
in the block that calls kill_tree "$pub_pid" and wait "$pub_pid", clear the PID
variable (unset or set pub_pid="" and also clear the global PUB_PID if used
elsewhere) so subsequent cleanup() cannot reuse a stale PID.
- Around line 21-44: The case block handling --publishers, --subscribers, and
--timeout uses $2 without checking it; modify the while/case logic (the branch
handling these flags in run.sh) to first verify that a non-empty next argument
exists and does not start with '-' before assigning to PUBLISHERS, SUBSCRIBERS
or TIMEOUT, and if the value is missing emit a clear CLI error and exit with
non-zero status instead of dereferencing $2 (leave the --negative branch
unchanged).
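For the `extract_asset.py` constants finding above, the suggested change might look roughly like the sketch below. The constant names follow the review comment; `is_picture` is a hypothetical helper added only to show the constants in use, and the real module layout may differ:

```python
# H.264 nal_unit_type values (from the H.264 specification).
NAL_UNIT_TYPE_MASK = 0x1F
NAL_SLICE = 1  # coded slice of a non-IDR picture
NAL_IDR = 5    # coded slice of an IDR picture
NAL_SEI = 6    # supplemental enhancement information
NAL_SPS = 7    # sequence parameter set
NAL_PPS = 8    # picture parameter set
NAL_AUD = 9    # access unit delimiter


def nal_type(nal: bytes) -> int:
    """Return nal_unit_type: the low 5 bits of the first header byte."""
    return nal[0] & NAL_UNIT_TYPE_MASK


def is_picture(nal: bytes) -> bool:
    """Hypothetical helper: True for NAL units that carry coded picture data."""
    return nal_type(nal) in (NAL_SLICE, NAL_IDR)
```

Behavior is identical to comparing against the literals; only the readability of the classification loop changes.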
📒 Files selected for processing (6)
- demo/smoke/extract_asset.py
- demo/smoke/gen-asset.sh
- demo/smoke/run.sh
- demo/smoke/smoke.toml
- justfile
- py/moq-rs/examples/interop.py
- run.sh: validate option values before reading $2 (clean error under set -u
instead of an abort), and clear PUB_PID after reaping so cleanup() can't
signal a recycled PID.
- extract_asset.py: name the H.264 NAL-type constants instead of magic numbers,
and fix the nal_units docstring ("Return", not "Yield").
- interop.py: name DEFAULT_FPS / MICROSECONDS_PER_SECOND / MAX_LATENCY_MS and
pass strict=True to zip().
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Actionable comments posted: 0
moq-ffi previously only exposed import::Framed (publish_media + per-frame write_frame with explicit timestamps), so a publisher had to pre-split the bitstream into access units. The smoke test did this with a separate extract_asset.py step that parsed H.264 into a NAL manifest. Expose moq-mux's import::Stream instead: publish_media_stream(format) returns a producer whose write(bytes) feeds the streaming importer, which infers frame boundaries from a raw byte stream (Annex-B H.264, fMP4, …). It buffers any partial trailing frame between calls. With that, every native publisher just consumes the same `ffmpeg -re … -f h264 -` Annex-B stream on stdin (moq-cli already did this; the Python client now does too via publish_media_stream). Drops extract_asset.py and gen-asset.sh and the asset-manifest plumbing entirely. The uniffi bindings for Swift/Kotlin/Go/Python regenerate automatically from the proc-macro exports; libmoq (hand-written C) and doc/lib/* streaming variants are left as follow-up since no C client consumes the API yet. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
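To illustrate the idea of inferring boundaries from unframed writes, here is a simplified sketch in Python. It is not the moq-mux implementation: `AnnexBSplitter` and its `pending` property are hypothetical names, and it splits individual NAL units rather than grouping them into access units as a real importer would. It does show the two properties described above: a unit is only emitted once the next start code arrives, and any partial trailing data stays buffered between calls.

```python
START_CODE = b"\x00\x00\x01"  # also matches the tail of the 4-byte form


class AnnexBSplitter:
    """Hypothetical sketch: split an Annex-B byte stream into NAL units."""

    def __init__(self) -> None:
        self._buf = bytearray()

    @property
    def pending(self) -> bytes:
        """Bytes held back because no following start code has arrived yet."""
        return bytes(self._buf)

    def write(self, data: bytes) -> list[bytes]:
        """Append unframed bytes; return NAL units completed by this write."""
        self._buf += data
        complete: list[bytes] = []
        start = self._buf.find(START_CODE)
        while start >= 0:
            nxt = self._buf.find(START_CODE, start + len(START_CODE))
            if nxt < 0:
                break  # current unit is not terminated yet; keep buffering
            # Trailing zeros belong to the next (4-byte) start code or padding.
            nal = bytes(self._buf[start + len(START_CODE):nxt]).rstrip(b"\x00")
            if nal:
                complete.append(nal)
            start = nxt
        if start > 0:
            del self._buf[:start]  # drop everything before the open unit
        return complete


# Feed a tiny made-up stream in 3-byte chunks, as a pipe might deliver it.
sc = b"\x00\x00\x00\x01"
stream = sc + b"\x67\x01" + sc + b"\x68\x02" + sc + b"\x65\x03\x04"
splitter = AnnexBSplitter()
emitted: list[bytes] = []
for i in range(0, len(stream), 3):
    emitted.extend(splitter.write(stream[i:i + 3]))
```

Note that the last unit in the stream is never emitted here, because nothing follows it; it just remains in `pending`. That mirrors the EOF behavior described later in this PR for the streaming importer.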
The cross-language smoke test is too slow and integration-flaky to gate every PR via just ci: it cold-builds moq-relay/moq-cli + moq-ffi (maturin) and runs the matrix in real time. Give it its own workflow triggered by workflow_dispatch and a nightly schedule instead, so dependency/toolchain drift gets caught without sitting on the PR critical path. Delegates to the existing `just smoke` recipe. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
…e.py The orchestrator is cross-cutting glue, not a demo, so move demo/smoke -> smoke/. The per-language client stays in its language folder; rename the Python one from examples/interop.py to examples/smoke.py to match. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@py/moq-rs/examples/interop.py`:
- Around line 32-38: The loop is using stdin.read(READ_CHUNK) which batches
until READ_CHUNK bytes are available; change the blocking call in the asyncio
executor to use stdin.read1(READ_CHUNK) (or sys.stdin.buffer.read1 if stdin is
the text wrapper) so bytes are returned as soon as available and media.write
gets streamed in real time; update the loop.run_in_executor call that invokes
stdin.read to call read1, keep the existing media.write(media_chunk) and
media.finish() behavior, and optionally add a small fallback to read if read1 is
not present on the stdin object.
In `@rs/moq-ffi/src/producer.rs`:
- Around line 380-391: The finish() implementation currently calls
decoder.finish() after taking media but never hands the leftover media.buffer to
the decoder, so the final Annex‑B access unit can be dropped; fix by, after
guard.take() and before calling decoder.finish(), checking media.buffer for any
remaining bytes and feeding them into the decoder (e.g. via the existing
decode_stream API or the decoder's streaming/decode method) so the last access
unit is emitted, then call decoder.finish(); ensure you operate on media.buffer
and decoder (symbols: finish(), media.buffer, decoder, decode_stream) while
holding the lock or otherwise synchronizing, and preserve expected
ownership/cleanup semantics after flushing.
📒 Files selected for processing (4)
- demo/smoke/run.sh
- py/moq-rs/examples/interop.py
- py/moq-rs/moq/publish.py
- rs/moq-ffi/src/producer.rs
Fold the cross-language unit-test aggregator and the smoke harness into a top-level test/ module: `just test` still runs unit tests across js/rs/py (its default recipe), and `just test smoke` runs the smoke test. `set fallback` lets the module reach the root's per-language modules. Moves smoke/ -> test/ and run.sh -> test/smoke.sh; the smoke workflow now calls `just test smoke`. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
- smoke.py: use stdin.read1 instead of read so bytes flow as ffmpeg produces them, rather than batching a full chunk before the first write_frame.
- moq-ffi: the streaming importer emits each access unit when the next start code arrives, so the final undelimited frame at EOF is not emitted (same as moq-cli's stdin path). Correct the finish() doc which over-promised a flush.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
♻️ Duplicate comments (1)
py/moq-rs/examples/smoke.py (1)
32-35: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win
Use `read1()` here so the publisher actually streams in real time.
`stdin.read(READ_CHUNK)` can buffer until a large chunk is available, which defeats the `ffmpeg -re` pacing and can turn this into a burst-at-EOF publisher. That makes the smoke test flaky because subscribers may time out before any bytes are written.
Proposed fix
     while True:
         # Blocking read off the event loop so the client keeps flushing.
    -    chunk = await loop.run_in_executor(None, stdin.read, READ_CHUNK)
    +    read_chunk = getattr(stdin, "read1", stdin.read)
    +    chunk = await loop.run_in_executor(None, read_chunk, READ_CHUNK)
         if not chunk:
             break
         media.write(chunk)
Does Python's `io.BufferedReader.read(n)` on `sys.stdin.buffer` potentially wait for a full buffered chunk, and does `read1(n)` return as soon as bytes are available for pipe-like streams?
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@py/moq-rs/examples/smoke.py` around lines 32 - 35, The loop is using stdin.read via loop.run_in_executor which can block until a full buffer and cause burst-at-EOF; replace the call to stdin.read (the call passed into loop.run_in_executor in the while loop surrounding READ_CHUNK) with stdin.read1 so the publisher streams bytes as they arrive; update the call site where chunk = await loop.run_in_executor(None, stdin.read, READ_CHUNK) to use stdin.read1 (preserving READ_CHUNK) so reads return immediately when data is available for pipe-like stdin.
📒 Files selected for processing (6)
- .github/workflows/smoke.yml
- justfile
- py/moq-rs/examples/smoke.py
- test/justfile
- test/smoke.sh
- test/smoke.toml
💤 Files with no reviewable changes (1)
- test/smoke.toml
rs/justfile runs with working-directory '..', so `just test --all-features` in its `ci` recipe resolved to the root `test` recipe. That recipe became the `test` module (`just test smoke`), so the bare call broke with "Justfile does not contain recipe `test --all-features`". Point it at the rs module's own test recipe; this also drops the accidental redundancy of rs CI running js/py tests (they run in their own ci lanes). Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
sys.stdin.buffer is typed BinaryIO, which doesn't declare read1 (it lives on BufferedReader), so pyright failed `just py check`. Use getattr(stdin, "read1", stdin.read): keeps the real-time read1 path, types as Any, and falls back to read if a stream lacks read1. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
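A small standalone sketch of the `getattr` fallback, run against an OS pipe instead of stdin. `pick_reader` and `ReadOnlyStream` are hypothetical names used only for this illustration:

```python
import io
import os


def pick_reader(stream):
    """Prefer read1 (at most one underlying read); fall back to read."""
    return getattr(stream, "read1", stream.read)


class ReadOnlyStream:
    """Hypothetical stream that only offers read(), to exercise the fallback."""

    def __init__(self, data: bytes) -> None:
        self._inner = io.BytesIO(data)

    def read(self, size: int = -1) -> bytes:
        return self._inner.read(size)


# Fallback path: no read1 attribute, so plain read is used.
fallback_reader = pick_reader(ReadOnlyStream(b"abcdef"))
fallback_chunk = fallback_reader(4)

# Real-time path: the writer has sent only 3 bytes and keeps the pipe open.
# read(4096) would block here waiting for more data or EOF; read1 returns
# whatever a single read on the pipe yields.
read_fd, write_fd = os.pipe()
os.write(write_fd, b"abc")
with os.fdopen(read_fd, "rb") as pipe_reader:
    pipe_chunk = pick_reader(pipe_reader)(4096)
os.close(write_fd)
```

Because the result of `getattr` is untyped, the call site no longer depends on `BinaryIO` declaring `read1`, which is what the type checker objected to.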
Summary
We have solid unit tests and one in-process Rust e2e test, but nothing exercises the wire protocol across language implementations, which is the whole reason the repo is polyglot (Rust core + `moq-ffi` wrappers, plus the independent JS/WebCodecs stack). A catalog/container regression, or a binding that fails to move frames, would sail through CI today.
This adds `just test smoke`: a cross-language media interop smoke test, plus a small `moq-ffi` addition that makes it clean.
moq-ffi: streaming media import
`moq-ffi` previously only exposed `import::Framed` (`publish_media` + per-frame `write_frame` with explicit timestamps), so a publisher had to pre-split the bitstream into access units. This exposes `moq-mux`'s `import::Stream` instead: `BroadcastProducer.publish_media_stream(format)` → a producer whose `write(bytes)` feeds the streaming importer, which infers frame boundaries from a raw byte stream (Annex-B H.264, fMP4, …) and buffers any partial trailing frame between calls.
So a publisher can just pipe an encoder straight in: no manifest, no NAL splitting in the client.
The smoke test
- Stands up a real `moq-relay`, then for each publisher language publishes an H.264 broadcast and fans out every subscriber in parallel, asserting each sees non-zero frame data before a timeout.
- Every native publisher consumes the same `ffmpeg -re … -f h264 -` Annex-B stream on stdin (`moq-cli` already did this; the Python client now does too via `publish_media_stream`).
- `ffmpeg` is the only encoder; the clients only frame-and-forward.
- A `--negative` control (no publisher) proves the harness can actually fail.
Phase 1 (this PR)
Rust and Python publishers/subscribers — the languages with working native clients today.
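The real orchestrator is a bash script (`test/smoke.sh`), so the following is only a Python sketch of the pass/fail logic it applies per publisher: run every subscriber in parallel and pass a cell only if it reports non-zero bytes before the timeout. `fake_subscriber`, `check`, and `run_matrix` are hypothetical stand-ins, not part of the harness:

```python
import asyncio


async def fake_subscriber(delay: float, nbytes: int) -> int:
    """Hypothetical stand-in for a subscriber client; returns bytes received."""
    await asyncio.sleep(delay)
    return nbytes


async def check(name: str, subscriber, timeout: float) -> tuple[str, bool]:
    """A cell passes only if data arrives (non-zero bytes) before the timeout."""
    try:
        received = await asyncio.wait_for(subscriber, timeout)
    except asyncio.TimeoutError:
        return name, False
    return name, received > 0


async def run_matrix(timeout: float) -> dict[str, bool]:
    subscribers = {
        "rust": fake_subscriber(0.01, 4096),     # receives data in time
        "python": fake_subscriber(0.01, 0),      # connects but sees no bytes
        "stalled": fake_subscriber(5.0, 4096),   # never finishes in time
    }
    # Fan out: all subscribers run concurrently against the same broadcast.
    pairs = await asyncio.gather(
        *(check(name, sub, timeout) for name, sub in subscribers.items())
    )
    return dict(pairs)


results = asyncio.run(run_matrix(timeout=0.2))
```

A negative control corresponds to the case where every subscriber reports no data: the harness is only trustworthy if that situation is actually detected as such.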
Later phases (not in this PR)
- Headless-browser JS client (`@moq/publish` fake-device publisher, `@moq/watch` subscriber). The `@moq/net` node subscriber was intentionally dropped: it can't negotiate a version on the ws fallback (the JS client never offers versioned subprotocols, even after "moq-relay: stop downgrading WebSocket clients to moq-lite-02" #1523 fixed the relay + Rust client), and the WebTransport polyfill segfaults Bun at teardown. JS coverage moves to the browser.
- Swift/Kotlin/Go via `moq-ffi`.
- `.github/workflows/smoke.yml` (Linux, then macOS).
Files
- `rs/moq-ffi/src/producer.rs`: `publish_media_stream` + `MoqMediaStreamProducer` (`write`/`finish`)
- `py/moq-rs/moq/publish.py`: `BroadcastProducer.publish_media_stream` + `MediaStreamProducer` wrapper
- `py/moq-rs/examples/smoke.py`: Python interop client (publisher streams Annex-B from stdin)
- `test/smoke.sh`: orchestrator (`--publishers`/`--subscribers`/`--timeout`/`--negative`)
- `test/smoke.toml`: relay config
- `test` justfile module (`just test smoke`)
Cross-package sync
The `moq-ffi` change is additive. The uniffi bindings for Swift/Kotlin/Go/Python regenerate automatically from the proc-macro exports, so no manual wrapper edits are needed. `libmoq` (hand-written C) and `doc/lib/*` streaming variants are left as follow-up since no C client consumes the API yet; flagging here per the sync table rather than silently skipping.
Notes for reviewers
- `set -e` abort in the negative-control path, unreliable teardown (`moq-cli` ignores SIGTERM → `kill_tree` uses SIGKILL + `wait`), `cleanup` reaping the publisher, publisher-death diagnostics, a tool preflight, and CLI arg-value guards.
- `just test smoke` (needs the nix devShell for ffmpeg/uv/cargo). `just test smoke --negative` for the negative control.
Test plan
- `just test smoke --publishers rust,python --subscribers rust,python` → all 4 cells PASS
- `just test smoke --negative` → both subscribers correctly report no data, exit 0
- `cargo clippy -p moq-ffi -- -D warnings`, `cargo fmt -p moq-ffi --check` clean
- `nix develop`
🤖 Generated with Claude Code
(Written by Claude)