libmoq: auto-reconnect sessions; conducer-based Reconnect notifications #1544
moq_session_connect established a single session that died permanently when the connection dropped. Wrap the connect in moq_native's Reconnect helper so the session reconnects with exponential backoff. Origins outlive the connection, so published broadcasts are re-announced and consumers re-subscribed automatically on each reconnect. The on_status callback now fires a negative code only when reconnection permanently gives up (backoff timeout exceeded); it no longer fires 0 on connect, since pub/sub is driven by the origins rather than the callback. close() still aborts the reconnect loop cleanly via drop.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
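The retry schedule described above can be sketched as follows. This is a minimal illustration of exponential backoff with a give-up timeout, not moq-native's actual implementation: the `Backoff` struct, its field names, and the values used in `main` are all assumptions made for the example.

```rust
use std::time::Duration;

/// Hypothetical backoff policy. The names and values are assumptions for
/// illustration, not moq-native's real configuration.
struct Backoff {
    initial: Duration,   // delay before the first retry
    max_delay: Duration, // cap applied to every individual delay
    timeout: Duration,   // give up once the cumulative wait would exceed this
}

impl Backoff {
    /// Returns the delay before each retry: doubling every time, capped at
    /// `max_delay`, and stopping once the total wait would exceed `timeout`.
    fn delays(&self) -> Vec<Duration> {
        // A zero delay would never consume the timeout budget.
        assert!(!self.initial.is_zero() && !self.max_delay.is_zero());

        let mut out = Vec::new();
        let mut delay = self.initial.min(self.max_delay);
        let mut total = Duration::ZERO;
        while total + delay <= self.timeout {
            out.push(delay);
            total += delay;
            delay = (delay * 2).min(self.max_delay);
        }
        out
    }
}

fn main() {
    let backoff = Backoff {
        initial: Duration::from_secs(1),
        max_delay: Duration::from_secs(8),
        timeout: Duration::from_secs(30),
    };
    for (attempt, delay) in backoff.delays().iter().enumerate() {
        println!("retry {} after {:?}", attempt + 1, delay);
    }
    println!("budget exhausted: report a negative status code");
}
```

Once the schedule is exhausted, the caller would surface the terminal failure, which is the point at which the commit message says on_status fires a negative code.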
Walkthrough
This PR updates moq_session_connect docs for automatic exponential-backoff reconnect and re-announcement/resubscription semantics; refactors Session::connect_run to drive a reconnect(url) loop that reports positive epoch-based Connected codes and surfaces terminal failures as negative codes; moves reconnect lifecycle signaling to a conducer-shared State with a new public Status and Reconnect::status polling API; ensures session on_status callbacks are invoked outside the global lock; and re-exports conducer from moq-net.
🚥 Pre-merge checks: ✅ 5 passed
Restore libmoq's status-0-on-connect callback without forking the reconnect loop. Reconnect now exposes an epoch-based connected() that resolves on each successful connect; libmoq selects it against closed() and re-fires the callback. The epoch lets callers avoid missing rapid reconnects by passing the last value they observed.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Address PR review feedback:
- Rebuild Reconnect on conducer instead of tokio::sync::watch, matching the rest of moq-net. State (connect/disconnect counters + terminal error) lives in a conducer channel; closing the channel is the only terminal signal. Expose poll_connected/poll_disconnected/poll_closed with async wrappers, following the poll_* convention.
- Re-export conducer from moq-net. Its public API already leaks conducer::Waiter (poll_* methods), so callers need to name the type; moq-native now uses moq_net::conducer rather than a direct dep.
- Add Reconnect::disconnected, analogous to connected. Disconnects are not fatal; the loop keeps retrying.
- libmoq: chain reconnect() onto the client builder, and report state through the status callback via the code sign: positive = connection epoch (1 first, 2+ reconnect), 0 = transiently disconnected, negative = terminal give-up. Copy the callback out before releasing the lock so the C callback never runs while the global state is held.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
rs/libmoq/src/session.rs (1)
43-45: ⚠️ Potential issue | 🔴 Critical | ⚡ Quick win

Snapshot the callback so `State::lock()` is dropped before invoking user code

`State::lock()` in the `if let` scrutinee stays alive through the `if` body, so `entry.callback.call(res)` can run while holding the global lock (contrary to the “lock is dropped before the callback is invoked” comment), risking deadlock if the callback re-enters libmoq.

Suggested fix

```diff
- if let Some(entry) = State::lock().session.task.remove(id).flatten() {
-     entry.callback.call(res);
- }
+ let callback = State::lock()
+     .session
+     .task
+     .remove(id)
+     .flatten()
+     .map(|entry| entry.callback);
+
+ if let Some(callback) = callback {
+     callback.call(res);
+ }
```

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@rs/libmoq/src/session.rs` around lines 43 - 45, The code currently holds the global lock across the `if` body so `entry.callback.call(res)` can run while the lock is still held; change the logic to snapshot (move) the callback out while the lock is held and then drop the lock before invoking user code: use `State::lock().session.task.remove(id).flatten()` only to extract the entry, immediately move `entry.callback` into a local variable (e.g., `callback`) while still under the lock, let the lock scope end, then call `callback.call(res)` outside that lock scope so `State::lock()` is not held during the callback invocation.
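The scoping issue in this comment can be reproduced with the standard library alone. The sketch below is a stand-in, not libmoq code: `STATE`, `callback_can_lock`, and both `notify_*` functions are hypothetical names, and the "callback" only probes whether the global lock is free by calling `try_lock`.

```rust
use std::sync::Mutex;

// Stand-in for libmoq's global state (assumption: the real `State` type is
// not shown here, so a Mutex around one optional entry is used instead).
static STATE: Mutex<Option<&'static str>> = Mutex::new(None);

/// Pretends to be a user callback that re-enters the library: it reports
/// whether the global lock could be taken at the moment it runs.
fn callback_can_lock() -> bool {
    STATE.try_lock().is_ok()
}

/// The guard is a temporary in the `if let` scrutinee, so it stays alive for
/// the whole body and the callback runs while the lock is held.
fn notify_while_locked() -> Option<bool> {
    if let Some(_entry) = STATE.lock().unwrap().take() {
        return Some(callback_can_lock());
    }
    None
}

/// Snapshot first: the guard temporary is dropped at the end of the `let`
/// statement, so the callback runs with the lock released.
fn notify_after_unlock() -> Option<bool> {
    let entry = STATE.lock().unwrap().take();
    entry.map(|_entry| callback_can_lock())
}

fn main() {
    *STATE.lock().unwrap() = Some("entry");
    println!("scrutinee guard, lock free in callback: {:?}", notify_while_locked());

    *STATE.lock().unwrap() = Some("entry");
    println!("snapshot first, lock free in callback: {:?}", notify_after_unlock());
}
```

With a blocking `lock()` in place of `try_lock()`, the first variant is the re-entrancy hazard the review describes.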
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@rs/libmoq/src/session.rs`:
- Around line 73-87: The current tokio::select! on reconnect.connected(connects)
and reconnect.disconnected(disconnects) can yield disconnected(0) before the
matching connected epoch because the two counters are independent; replace the
dual-branch await with a single ordered event stream so events are serialized.
Concretely, add or use a single producer method on the reconnect state (e.g.
reconnect.next_event() or reconnect.events() returning an async Stream/async fn
that yields an enum like ReconnectEvent::Connected(epoch) |
ReconnectEvent::Disconnected(epoch)), then await that single source inside the
loop and call Self::notify(task_id, epoch_or_0) based on the event; this ensures
ordering (use State.connects / State.disconnects internally in reconnect to
produce ordered events) and removes the separate reconnect.connected(...) and
reconnect.disconnected(...) branches from the tokio::select!.
In `@rs/moq-native/src/reconnect.rs`:
- Around line 153-161: poll_connected (and similarly poll_disconnected)
currently maps Poll::Ready(Err(_)) to Pending, which drops the last recorded
epoch if the reconnect task terminated; instead, on Poll::Ready(Err(_)) read the
last stored epoch from the shared state and return Poll::Ready(epoch) when that
epoch > since (otherwise return Pending). Concretely, inside
poll_connected/poll_disconnected change the Poll::Ready(Err(_)) arm to fetch the
final state (via the same shared state accessor used in the closure—e.g.,
inspect self.state's stored connects/disconnects) and compare to since,
returning Poll::Ready(last_epoch) when newer, otherwise Poll::Pending; do the
same mirrored change for poll_disconnected.
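A rough model of the `poll_connected` suggestion, using only `std::task::Poll`. Everything here is hypothetical: `Shared`, `Closed`, and `poll_channel` stand in for the conducer channel, and the sketch assumes the channel reports closure before it evaluates the "newer epoch" predicate, which is the situation the comment describes.

```rust
use std::task::Poll;

/// Marker for "the reconnect task terminated and closed the channel".
#[derive(Debug, PartialEq)]
struct Closed;

/// Hypothetical snapshot of the shared reconnect state.
struct Shared {
    connects: u64, // last recorded connect epoch
    closed: bool,  // true once the background task has terminated
}

/// Stand-in for polling the state channel. Assumption: a closed channel
/// reports `Err(Closed)` without looking at the stored epoch.
fn poll_channel(shared: &Shared, since: u64) -> Poll<Result<u64, Closed>> {
    if shared.closed {
        Poll::Ready(Err(Closed))
    } else if shared.connects > since {
        Poll::Ready(Ok(shared.connects))
    } else {
        Poll::Pending
    }
}

/// On close, fall back to the last stored epoch instead of mapping the error
/// straight to `Pending`, so a connect recorded just before termination is
/// still delivered.
fn poll_connected(shared: &Shared, since: u64) -> Poll<u64> {
    match poll_channel(shared, since) {
        Poll::Ready(Ok(epoch)) => Poll::Ready(epoch),
        Poll::Ready(Err(Closed)) if shared.connects > since => Poll::Ready(shared.connects),
        Poll::Ready(Err(Closed)) | Poll::Pending => Poll::Pending,
    }
}

fn main() {
    let closed = Shared { connects: 2, closed: true };
    println!("caller saw epoch 1: {:?}", poll_connected(&closed, 1));
    println!("caller saw epoch 2: {:?}", poll_connected(&closed, 2));
}
```

In this model termination itself is left to a separate `closed()`-style path, which is why the "nothing newer" case stays `Pending`.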
📒 Files selected for processing (4)
- rs/libmoq/src/api.rs
- rs/libmoq/src/session.rs
- rs/moq-native/src/reconnect.rs
- rs/moq-net/src/lib.rs
✅ Files skipped from review due to trivial changes (1)
- rs/libmoq/src/api.rs
Address PR review feedback:
- connects/disconnects strictly alternate, so await only the transition that can come next (select! `if` guards on a `connected` flag). This keeps the status notifications in true order even when the background reconnect loop has already raced several cycles ahead, instead of letting select! pick a ready branch arbitrarily.
- Snapshot the status callback out of the slab before the State lock guard drops, so the C callback never runs while the global lock is held (matches the connect_run notify path).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Replace the separate connected()/disconnected() epoch methods with a single Reconnect::status() -> Result<Status> that returns transitions in order from an internal cursor. Connects and disconnects strictly alternate, so a single transition counter plus parity gives the kind, and the caller can't reorder or miss events (fixes the select! race where disconnected(0) could fire before the matching connected epoch). Err is the terminal signal (give-up error, or generic on close/drop). closed()/poll_closed remain for callers that only await termination. libmoq drives status() in a simple loop; the terminal Err propagates via `?`. On the (unreachable) i32 epoch overflow it now returns an error instead of clamping to i32::MAX.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
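The "transition counter plus parity" idea from this commit can be illustrated in a few lines. This is a sketch under assumptions: the `Status` enum below and the choice to carry the epoch inside `Connected` are hypothetical, and the first transition is assumed to always be a connect.

```rust
/// Hypothetical status type; the real `Status` in moq-native may differ.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Status {
    Connected(u64), // connection epoch: 1 = first connect, 2+ = reconnect
    Disconnected,
}

/// Maps the n-th transition (1-based) to its kind. Because connects and
/// disconnects strictly alternate, odd transitions are connects and even
/// transitions are disconnects.
fn transition_kind(n: u64) -> Option<Status> {
    match n {
        0 => None, // nothing has happened yet
        n if n % 2 == 1 => Some(Status::Connected(n / 2 + 1)),
        _ => Some(Status::Disconnected),
    }
}

fn main() {
    // A caller keeps a cursor and walks the transitions in order, so it can
    // neither reorder nor skip an event.
    let latest = 5;
    for cursor in 1..=latest {
        println!("transition {}: {:?}", cursor, transition_kind(cursor));
    }
}
```

A single counter makes ordering a property of the data rather than of which `select!` branch happens to be ready first.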
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@rs/moq-native/src/reconnect.rs`:
- Around line 178-196: The code returns the terminal error as soon as
consumer.poll returns Err(_) and thus may drop pending transitions; modify the
polling closure used in conducer::wait (the closure around consumer.poll /
state.epoch / since) so that on Poll::Ready(Err(_)) you still inspect the latest
state. If state.epoch > since treat it as a successful advance (return
Poll::Ready(true) / equivalent) so pending transitions are surfaced, otherwise
return the terminal error as before (the code path that currently calls
self.outcome().err().unwrap_or_else(...)). Update the logic in reconnect.rs
around the consumer.poll match and the advanced handling so channel-close errors
only short-circuit when no new epoch > since exists.
📒 Files selected for processing (2)
- rs/libmoq/src/session.rs
- rs/moq-native/src/reconnect.rs
🚧 Files skipped from review as they are similar to previous changes (1)
- rs/libmoq/src/session.rs
…isconnect

Address PR review feedback:
- Reconnect state is now the current status (Option<Status>) rather than a transition counter. status() compares it against the handle's last reported status and returns when they differ, so it tracks the current state instead of replaying every edge. A flip-and-flip-back between polls collapses to a single report.
- Make Reconnect cloneable for multiple handles. The abort handle is wrapped in Arc<AbortOnDrop>, so the background loop stops once the last handle drops (replacing the explicit close()).
- libmoq no longer notifies on disconnect: a separate change reserves status 0 for "closed", so only positive (connected epoch) and negative (terminal) codes are reported.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
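The level-triggered behaviour described in the first bullet can be modelled without any async machinery. The sketch below is illustrative only: `Handle` and its `poll_status` signature are hypothetical, the shared state is passed in as a plain argument, and `Connected` is assumed to carry an epoch so that a reconnect is distinguishable from the previous connection.

```rust
/// Hypothetical status type (assumption: `Connected` carries an epoch).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Status {
    Connected(u64),
    Disconnected,
}

/// Hypothetical per-caller handle: it only remembers what it last reported.
struct Handle {
    last: Option<Status>,
}

impl Handle {
    /// Returns the current status only when it differs from the last one
    /// this handle reported. Intermediate states between two polls are
    /// never replayed.
    fn poll_status(&mut self, current: Option<Status>) -> Option<Status> {
        match current {
            Some(status) if self.last != Some(status) => {
                self.last = Some(status);
                Some(status)
            }
            _ => None,
        }
    }
}

fn main() {
    let mut handle = Handle { last: None };
    println!("{:?}", handle.poll_status(Some(Status::Connected(1))));

    // Between polls the session dropped and came back. The handle never
    // observes the Disconnected edge, only the new current state.
    println!("{:?}", handle.poll_status(Some(Status::Connected(2))));

    // Nothing changed since the last report.
    println!("{:?}", handle.poll_status(Some(Status::Connected(2))));
}
```

The trade-off against the earlier cursor design is that a caller may miss a short disconnect entirely, but it can never fall behind the real state.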
close() was removed when Reconnect became cloneable (the loop now stops on last-handle drop), but Client::reconnect's doc still linked to it, breaking `cargo doc -D warnings` in CI.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Merge main (conducer was renamed to kio) and address review feedback:
- Defer Clone/AbortOnDrop to a later PR: Reconnect is a single handle again with a plain Drop that aborts the task.
- Add poll_status (the sync counterpart) using ready!/?, with status() as a thin kio::wait wrapper. Simplify poll_closed the same way.
- Drop the needless Arc around the terminal error (anyhow::Error isn't Clone, but we only ever read it behind a shared ref and format it).
- libmoq: move the status loop into report(), which returns anyhow::Result<()> so `?` flows directly; connect_run maps the terminal error to Connect once. Use .context() for the epoch overflow.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Integrates #1544 (auto-reconnect sessions) with the terminal-callback lifetime contract. session.rs now combines both:
- connect_run/report drive moq_native::Reconnect and report a positive connection epoch (1 = first connect, >1 = reconnect) per (re)connect.
- The contract's terminal rules apply: close() signals shutdown (Option sender), the task delivers exactly one final callback (0 = clean close, < 0 = reconnect gave up), then removes its own entry. user_data stays valid until that terminal callback.
- report() takes the callback by value and calls it without holding the lock, replacing the task_id slab-lookup + notify() revoke dance (close no longer revokes, so the lookup was redundant).

The moq_session_connect docs merge the reconnect/epoch wording with the terminal-0-on-close contract.
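From the caller's side, the combined contract above amounts to classifying the callback code by sign. The following sketch shows one way to model that, plus the "error instead of clamping" handling of an epoch that does not fit in an `i32`. `SessionEvent`, `decode`, and `epoch_to_code` are hypothetical names, and the `u64` width of the internal epoch is an assumption.

```rust
use std::convert::TryFrom;

/// Hypothetical decoded form of the on_status code.
#[derive(Debug, PartialEq, Eq)]
enum SessionEvent {
    Connected { epoch: i32 }, // 1 = first connect, >1 = reconnect
    Closed,                   // terminal: clean close
    GaveUp { code: i32 },     // terminal: reconnect gave up
}

/// Classifies a status code by its sign.
fn decode(code: i32) -> SessionEvent {
    match code {
        c if c > 0 => SessionEvent::Connected { epoch: c },
        0 => SessionEvent::Closed,
        c => SessionEvent::GaveUp { code: c },
    }
}

/// Converts an internal epoch (assumed to be u64 here) into a callback code,
/// returning an error on overflow rather than clamping to i32::MAX.
fn epoch_to_code(epoch: u64) -> Result<i32, String> {
    i32::try_from(epoch).map_err(|_| format!("connection epoch {} overflows i32", epoch))
}

fn main() {
    for code in [1, 2, 0, -7] {
        let event = decode(code);
        // Closed and GaveUp are terminal: no further callbacks follow, so
        // this is where a caller could release its user_data.
        let terminal = !matches!(event, SessionEvent::Connected { .. });
        println!("code {:>2} -> {:?} (terminal: {})", code, event, terminal);
    }
    println!("{:?}", epoch_to_code(u64::MAX));
}
```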
Reconcile main into dev. Key conflict resolutions:
- conducer crate renamed to kio (main #1547): applied across all of dev's newer code; dropped the stale conducer path-dep, kept dev's new flate2 dep.
- moq-mux: kept dev's thiserror Result (#1495); dropped main's CatalogSource as dead code since dev's catalog::Consumer already unifies Hang/MSF.
- moq-net: kept dev's OriginConsumer/AnnounceConsumer split (#1434) and the TrackConsumer end_at cap; kept dev's non-optional auto-created origins on the lite session/publisher (#e770).
- stats: combined main's StatsConfig + liveness retention (#1537, #1548) with dev's AnnounceConsumer usage.
- libmoq + moq-native: kept main's auto-reconnect (#1544), terminal-callback contract (#1546), and consume_announced (#1552), adapted to dev's AnnounceConsumer and OriginProducer connect API.

Restored the InitFailed error variant and made moq-rtc handle the now-fallible Log::init. cargo check/clippy/test all pass on the merged workspace.
Summary
`moq_session_connect` established a single session that died permanently when the connection dropped. It now drives `moq_native`'s `Reconnect` helper (the same one used by `moq-cli`, `moq-boy`, and the `hang`/`moq-native` examples) so libmoq sessions reconnect with exponential backoff. Origins outlive the connection, so published broadcasts are re-announced and consumers re-subscribed automatically on each reconnect.

`Reconnect` rebuilt on `kio`
`Reconnect` stores its state (current status + terminal error) in a `kio` channel instead of `tokio::sync::watch`, matching the rest of moq-net. It exposes:
- `status(&mut self) -> Result<Status>` / `poll_status`: returns the current `Status` (`Connected`/`Disconnected`) whenever it differs from what this handle last reported. Level-triggered: it tracks the current state, so a flip-and-flip-back between polls collapses to a single report. `Err` is the terminal signal (give-up error, or generic once the handle is dropped).
- `closed() -> Result<()>` / `poll_closed`: for callers that only want to await termination (unchanged for existing consumers).

`kio` is now re-exported from moq-net (`pub use kio`). Its public poll_* API already exposes `kio::Waiter`, so downstream callers need to name the type; moq-native uses `moq_net::kio` rather than a direct dep. (Making `Reconnect` cloneable for multiple handles is deferred to a follow-up PR.)

libmoq callback contract
`moq_session_connect`'s `on_status` reports connection state via the sign of the code:
- positive: connection epoch (1 = first connect, 2+ = reconnect)
- negative: terminal (reconnection gave up)

Transient disconnects are not reported (a separate change reserves 0 for "closed"). The status loop lives in `report()`, which returns `anyhow::Result<()>` so `?` flows; `connect_run` maps the terminal error to `Connect` once. The callback is copied out before the global lock is released, so the C callback never runs while the lock is held. The function signature is unchanged (no wire/ABI break), so this targets `main`.

Follow-up (not in this PR)
`rs/moq-ffi/src/session.rs` has the identical single-shot `.connect()` gap for the Python/Swift/Kotlin/Go bindings. The same `.connect()` -> `.reconnect()` swap applies there.

Test plan
- `cargo clippy --all-targets --workspace -- -D warnings` clean
- `cargo test -p libmoq -p moq-native -p moq-net` pass
- `RUSTDOCFLAGS="-D warnings" cargo doc --no-deps --workspace` clean
- `moq-cli` / `moq-boy` / `hang` / `moq-native` consumers + examples still build

🤖 Generated with Claude Code
(Written by Claude)