moq-lite: rewrite Origin as a poll-driven, conducer-based model #1358

Open
kixelated wants to merge 1 commit into main from origin-rewrite

Conversation

Collaborator

@kixelated kixelated commented Apr 28, 2026

Summary

Replaces the OriginNode/NotifyNode tree, per-publish web_async::spawn cleanup, and per-consumer mpsc fan-out with a flat HashMap<PathOwned, Entry> behind a Mutex plus per-consumer queues. Consumers register a single conducer::Waiter on both the shared state and each tracked broadcast's poll_closed, so a broadcast closing wakes its consumers directly — no spawned cleanup tasks, no tokio::time::sleep(1ms) in tests, no 127-message tokio-mpsc bug.
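
As a rough illustration of the shape described above, here is a minimal sketch using only the standard library. It is not the code in this PR: `Origin`, `State`, `Consumer`, and `Update` are hypothetical stand-ins, paths and broadcasts are plain strings, and the waker registration through `conducer::Waiter` is left out entirely.

```rust
use std::collections::{HashMap, VecDeque};
use std::sync::{Arc, Mutex};

/// Simplified stand-in for `OriginUpdate`; the broadcast is just a label here.
#[derive(Debug, Clone, PartialEq)]
enum Update {
    Active(String, String), // (path, broadcast label)
    Ended(String),          // path
}

type Queue = Arc<Mutex<VecDeque<Update>>>;

#[derive(Default)]
struct State {
    entries: HashMap<String, String>, // flat map: path -> active broadcast
    queues: Vec<Queue>,               // one queue per consumer (never pruned in this sketch)
}

#[derive(Clone, Default)]
struct Origin {
    state: Arc<Mutex<State>>,
}

struct Consumer {
    queue: Queue,
}

impl Origin {
    /// Record the broadcast and enqueue `Active` for every consumer.
    fn publish(&self, path: &str, broadcast: &str) {
        let mut state = self.state.lock().unwrap();
        state.entries.insert(path.to_string(), broadcast.to_string());
        let update = Update::Active(path.to_string(), broadcast.to_string());
        for queue in &state.queues {
            queue.lock().unwrap().push_back(update.clone());
        }
    }

    /// Remove the entry and enqueue `Ended` inline, with no cleanup task.
    fn close(&self, path: &str) {
        let mut state = self.state.lock().unwrap();
        if state.entries.remove(path).is_some() {
            for queue in &state.queues {
                queue.lock().unwrap().push_back(Update::Ended(path.to_string()));
            }
        }
    }

    /// New consumers start with a replay of everything currently active.
    fn consume(&self) -> Consumer {
        let mut state = self.state.lock().unwrap();
        let replay: VecDeque<Update> = state
            .entries
            .iter()
            .map(|(p, b)| Update::Active(p.clone(), b.clone()))
            .collect();
        let queue = Arc::new(Mutex::new(replay));
        state.queues.push(queue.clone());
        Consumer { queue }
    }
}

impl Consumer {
    /// Non-blocking read of the next queued update, if any.
    fn try_next(&mut self) -> Option<Update> {
        self.queue.lock().unwrap().pop_front()
    }
}
```

The point of the sketch is only that fan-out happens under one lock and each consumer drains its own queue; the real model additionally wakes consumers when a tracked broadcast closes.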

Active/Ended semantics, shortest-hop preference, is_clone dedup, and "newer wins ties" are all preserved. Active selection stays in the producer so relay forwarders don't have to reimplement the bookkeeping.
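
To make the selection rule concrete, a small sketch follows. It assumes "shortest-hop" means the lowest hop count and that "newer" can be modeled with a monotonically increasing sequence number; `Candidate`, `hops`, and `seq` are illustrative names, not fields from this PR, and `is_clone` dedup is not modeled.

```rust
use std::cmp::Reverse;

/// Hypothetical candidate for a path; the fields are assumptions for illustration.
#[derive(Debug, Clone, PartialEq)]
struct Candidate {
    name: &'static str,
    hops: usize, // assumed: number of relay hops to reach this broadcast
    seq: u64,    // assumed: publish order, larger means newer
}

/// Pick the candidate with the fewest hops; among equals, the newest wins.
fn select_active(candidates: &[Candidate]) -> Option<&Candidate> {
    candidates
        .iter()
        .min_by_key(|c| (c.hops, Reverse(c.seq)))
}
```

Keeping this in one place (the producer, per the paragraph above) means every forwarder observes the same winner for a path.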

API renames (one-liner migrations applied across every workspace caller):

| Before | After |
| --- | --- |
| publish_broadcast | publish |
| create_broadcast | create |
| consume_broadcast | dropped (use wait_for_broadcast / try_next) |
| publish_only | scope |
| consume_only | scope (on OriginConsumer) |
| announced | next (returns OriginUpdate enum) |
| try_announced | try_next |
| announced_broadcast | wait_for_broadcast |

Also exposes BroadcastConsumer::poll_closed and is_closed so callers that need to compose close-detection without spawning have a primitive.
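
For readers unfamiliar with composing a poll-style primitive, here is a hedged sketch of the idea using only `std::task`. The `Broadcast` type below is a toy with a single waker slot, not `BroadcastConsumer`, and the real `poll_closed` signature may differ (it goes through `conducer::Waiter` in this PR); `poll_any_closed` and `CountingWaker` are hypothetical helpers.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

/// Toy broadcast handle with a closed flag and one registered waker.
#[derive(Default)]
struct Broadcast {
    closed: AtomicBool,
    waker: Mutex<Option<Waker>>,
}

impl Broadcast {
    fn is_closed(&self) -> bool {
        self.closed.load(Ordering::Acquire)
    }

    /// Ready once closed; otherwise store the caller's waker and stay pending.
    fn poll_closed(&self, cx: &mut Context<'_>) -> Poll<()> {
        if self.is_closed() {
            return Poll::Ready(());
        }
        *self.waker.lock().unwrap() = Some(cx.waker().clone());
        // Re-check after registering so a concurrent close is not missed.
        if self.is_closed() { Poll::Ready(()) } else { Poll::Pending }
    }

    /// Mark closed and wake whoever registered last.
    fn close(&self) {
        self.closed.store(true, Ordering::Release);
        if let Some(waker) = self.waker.lock().unwrap().take() {
            waker.wake();
        }
    }
}

/// Compose several close checks in one poll, with no task spawned per broadcast.
fn poll_any_closed(broadcasts: &[Arc<Broadcast>], cx: &mut Context<'_>) -> Poll<usize> {
    for (index, broadcast) in broadcasts.iter().enumerate() {
        if broadcast.poll_closed(cx).is_ready() {
            return Poll::Ready(index);
        }
    }
    Poll::Pending
}

/// Waker that counts wake-ups, so the sketch can be driven without a runtime.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}
```

A caller would typically wrap `poll_any_closed` in `std::future::poll_fn` to await it alongside other work.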

Test plan

  • cargo test -p moq-lite --lib model::origin — 34/34 origin tests pass
  • cargo test --workspace — all unit + integration tests pass
  • cargo build --workspace
  • just check
  • Spot-check relay + cli end-to-end (manual)

🤖 Generated with Claude Code

Replace the OriginNode/NotifyNode tree, per-publish web_async::spawn
cleanup, and per-consumer mpsc fan-out with a flat HashMap behind a
parking-lot-style Mutex plus per-consumer queues. Wakers register on
both the global state and each tracked broadcast's `poll_closed`, so
broadcast closures wake consumers directly — no spawned cleanup tasks
and no more `tokio::time::sleep(1ms)` in tests.

Renames (with one-line migrations across the workspace):
  publish_broadcast    -> publish
  create_broadcast     -> create
  consume_broadcast    -> dropped (use wait_for_broadcast / try_next)
  publish_only         -> scope
  consume_only         -> scope (on OriginConsumer)
  announced            -> next  (returns OriginUpdate enum)
  try_announced        -> try_next
  announced_broadcast  -> wait_for_broadcast

Active/Ended semantics, shortest-hop preference, is_clone dedup, and
"newer wins ties" all preserved. Active selection still lives in the
producer so relay forwarders don't have to reimplement it.

Also exposes BroadcastConsumer::poll_closed and is_closed so callers
that need to compose close-detection without spawning have a primitive.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@kixelated kixelated marked this pull request as ready for review April 30, 2026 17:50
Contributor

coderabbitai Bot commented Apr 30, 2026

Walkthrough

A comprehensive refactoring of the MoQ-Lite origin model replaces the tuple-based broadcast publishing and announcement mechanisms with a stream-based OriginUpdate enum that carries Active(path, broadcast) and Ended(path) variants. The OriginProducer API consolidates publish_broadcast, create_broadcast, and permission-filtering methods into publish, create, and scope operations. The OriginConsumer API replaces announced() with next()/try_next() iteration, and adds wait_for_broadcast, scope, and broadcast closure tracking (is_closed, poll_closed). All call sites across examples, servers, and utilities are updated to use the new stream-based patterns and scoped access control.
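
A small sketch of what the enum shape looks like at a call site. `OriginUpdate` here is a local stand-in that is generic over the broadcast handle, and the old `(path, Option<broadcast>)` tuple shape is an assumption inferred from the review suggestions further down, not something confirmed by the diff.

```rust
/// Local stand-in for `moq_lite::OriginUpdate`, generic over the broadcast handle.
#[derive(Debug, PartialEq)]
enum OriginUpdate<B> {
    Active(String, B),
    Ended(String),
}

/// Map the assumed old tuple shape onto the enum.
fn from_tuple<B>(path: String, broadcast: Option<B>) -> OriginUpdate<B> {
    match broadcast {
        Some(b) => OriginUpdate::Active(path, b),
        None => OriginUpdate::Ended(path),
    }
}

/// Callers now match on variants instead of inspecting an `Option`.
fn describe<B>(update: &OriginUpdate<B>) -> String {
    match update {
        OriginUpdate::Active(path, _) => format!("active: {path}"),
        OriginUpdate::Ended(path) => format!("ended: {path}"),
    }
}
```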

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
| Check name | Status | Explanation |
| --- | --- | --- |
| Title check | ✅ Passed | The title accurately reflects the main change: rewriting Origin using a poll-driven, conducer-based model instead of the previous tree-based architecture. |
| Description check | ✅ Passed | The description comprehensively explains the architectural changes, API renames, and test plan, directly relating to the changeset scope and objectives. |
| Docstring Coverage | ✅ Passed | Docstring coverage is 100.00%, which is sufficient. The required threshold is 80.00%. |
| Linked Issues check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |
| Out of Scope Changes check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |


Contributor

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (4)
rs/moq-ffi/src/origin.rs (1)

118-123: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

This no longer waits for the exact broadcast path.

with_root(path) subscribes to the whole subtree under path, and Announced::available() returns the first Active update it sees. That means announced_broadcast("foo") can resolve with foo/bar if the nested broadcast arrives first. Please drive this through OriginConsumer::wait_for_broadcast(path) or store the requested path and filter for an exact match.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-ffi/src/origin.rs` around lines 118 - 123, The function
announced_broadcast currently uses self.inner.clone().with_root(path) which
subscribes to the whole subtree and returns the first Active update (via
Announced::available), allowing nested paths (e.g., "foo/bar") to satisfy a
request for "foo". Change announced_broadcast to obtain the exact broadcast
origin for the requested path by calling the consumer method that waits for an
exact match (OriginConsumer::wait_for_broadcast(path)) or, if you prefer keeping
with_root, capture and store the requested path and filter Announced::available
events until the update.path == requested_path; then construct the
MoqAnnouncedBroadcast using Task::new(Announced { inner: exact_origin }) and
return it as before.
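
The difference between a subtree match and an exact match can be sketched as follows. This assumes paths are `/`-separated strings and that scoping matches on segment boundaries; `in_subtree` and `first_exact` are hypothetical helpers, not moq-lite functions.

```rust
/// True when `path` is `root` itself or a descendant on a '/' boundary.
fn in_subtree(root: &str, path: &str) -> bool {
    match path.strip_prefix(root) {
        Some(rest) => rest.is_empty() || rest.starts_with('/'),
        None => false,
    }
}

/// Skip descendants and return only an announcement equal to the requested path.
fn first_exact<'a>(
    requested: &str,
    announced: impl IntoIterator<Item = &'a str>,
) -> Option<&'a str> {
    announced.into_iter().find(|p| *p == requested)
}
```

With announcements arriving as `foo/bar` then `foo`, taking the first subtree match yields `foo/bar`, while the exact filter yields `foo`.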
rs/moq-lite/src/lite/subscriber.rs (1)

223-232: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Handle rejected origin publishes before registering the announce locally.

OriginProducer::publish() can return false here, but this path still keeps the entry in producers and spawns run_broadcast. If the publish was rejected, the announce becomes locally “active” even though no origin consumer can ever see it, and a later real announce on the same path will trip the duplicate-path check. Roll back the inserted producer and skip spawning when publish() returns false.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-lite/src/lite/subscriber.rs` around lines 223 - 232, The code
currently inserts the producer and then calls OriginProducer::publish(...); if
publish returns false you must roll back that insertion and avoid spawning
run_broadcast. Change the flow so after creating `dynamic = broadcast.dynamic()`
you call `let published = self.origin.as_mut().unwrap().publish(path.clone(),
broadcast.consume());` and if `published` is false remove the just-inserted
producer from `self.producers` (or undo whatever created the local announce) and
do not call `web_async::spawn(self.clone().run_broadcast(path, dynamic));` —
only spawn run_broadcast when publish returns true. Ensure the rollback targets
the same producer entry created earlier so duplicate-path checks remain correct.
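
A minimal sketch of the insert-then-roll-back flow described above. Everything here is a stand-in: `Origin::publish` rejects by a made-up prefix rule, `producers` maps paths to a dummy id, and `started` records where the real code would call `web_async::spawn(run_broadcast(..))`.

```rust
use std::collections::HashMap;

#[derive(Debug, PartialEq)]
enum AnnounceError {
    Duplicate,
    Rejected,
}

/// Hypothetical origin that only accepts paths under `allowed_prefix`.
struct Origin {
    allowed_prefix: String,
}

impl Origin {
    fn publish(&mut self, path: &str) -> bool {
        path.starts_with(self.allowed_prefix.as_str())
    }
}

struct Subscriber {
    origin: Origin,
    producers: HashMap<String, u32>, // path -> producer id (stand-in)
    started: Vec<String>,            // stands in for spawned run_broadcast tasks
}

impl Subscriber {
    fn start_announce(&mut self, path: &str, producer: u32) -> Result<(), AnnounceError> {
        if self.producers.contains_key(path) {
            return Err(AnnounceError::Duplicate);
        }
        self.producers.insert(path.to_string(), producer);

        if !self.origin.publish(path) {
            // Roll back so a later announce on this path is not seen as a duplicate.
            self.producers.remove(path);
            return Err(AnnounceError::Rejected);
        }

        // Only start the per-broadcast work once the publish was accepted.
        self.started.push(path.to_string());
        Ok(())
    }
}
```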
rs/moq-lite/src/ietf/subscriber.rs (1)

415-423: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Don't treat a rejected publish() as a successful announce.

This migration now calls OriginProducer::publish(...), but the return value is ignored. If the origin rejects the publish, state.broadcasts still records the path and run_broadcast still starts, so later announces on that path can be rejected as duplicates even though nothing was actually published. Please fail or roll back the entry when publish() returns false.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-lite/src/ietf/subscriber.rs` around lines 415 - 423, The Vacant-branch
currently ignores OriginProducer::publish(...)’s boolean result so a rejected
publish still inserts a BroadcastState and returns a Broadcast; update the
Entry::Vacant handling (the block that creates Broadcast::new().produce(), calls
origin.publish(path.clone(), ...), inserts BroadcastState and returns broadcast)
to check the publish return value and abort/roll back on false: if publish(...)
returns false, do not insert into the map (do not create or record
BroadcastState in state.broadcasts), do not start run_broadcast for that path,
and return an error or a None/appropriate failure value to the caller instead of
the broadcast; ensure you reference OriginProducer::publish, BroadcastState,
Entry::Vacant and run_broadcast when making the change so the insert and
broadcast-start are skipped/undone on rejection.
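
One way to avoid a rollback altogether is to call publish while still holding the vacant entry and insert only on success. The sketch below assumes the map value can be any state type (a `u64` here) and passes the publish step as a closure; `announce` and `AnnounceError` are hypothetical names.

```rust
use std::collections::hash_map::Entry;
use std::collections::HashMap;

#[derive(Debug, PartialEq)]
enum AnnounceError {
    Duplicate,
    Rejected,
}

/// Insert `state` for `path` only if the origin accepts the publish.
fn announce(
    broadcasts: &mut HashMap<String, u64>,
    path: &str,
    state: u64,
    publish: impl FnOnce(&str) -> bool,
) -> Result<u64, AnnounceError> {
    match broadcasts.entry(path.to_string()) {
        Entry::Occupied(_) => Err(AnnounceError::Duplicate),
        Entry::Vacant(slot) => {
            if !publish(path) {
                // Nothing was inserted yet, so there is nothing to undo.
                return Err(AnnounceError::Rejected);
            }
            Ok(*slot.insert(state))
        }
    }
}
```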
rs/moq-boy/src/input.rs (1)

72-97: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Avoid emitting ViewerLeft directly on OriginUpdate::Ended.

This can produce duplicate/spurious offline events during path handoff (Ended may be immediately followed by Active), and the spawned command task already emits ViewerLeft when the stream actually ends.

Suggested adjustment
-		let (path, broadcast) = match update {
-			moq_lite::OriginUpdate::Active(p, b) => (p, Some(b)),
-			moq_lite::OriginUpdate::Ended(p) => (p, None),
-		};
+		let (path, broadcast) = match update {
+			moq_lite::OriginUpdate::Active(p, b) => (p, b),
+			moq_lite::OriginUpdate::Ended(p) => {
+				tracing::debug!(viewer_id = %p, "viewer broadcast ended");
+				continue;
+			}
+		};

 		let viewer_id = path.to_string();

-		if let Some(broadcast) = broadcast {
+		{
 			tracing::info!(%viewer_id, "viewer connected");
 			let cmd_tx = cmd_tx.clone();
 			let vid = viewer_id.clone();
 			tokio::spawn(async move {
 				if let Err(e) = handle_viewer_commands(&vid, broadcast, &cmd_tx).await {
 					tracing::warn!(viewer_id = %vid, error = %e, "viewer command error");
 				}
 				tracing::info!(viewer_id = %vid, "viewer disconnected");
 				let _ = cmd_tx.send(Command::ViewerLeft { viewer_id: vid }).await;
 			});
-		} else {
-			tracing::info!(%viewer_id, "viewer went offline");
-			let _ = cmd_tx
-				.send(Command::ViewerLeft {
-					viewer_id: viewer_id.clone(),
-				})
-				.await;
 		}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-boy/src/input.rs` around lines 72 - 97, Don't send Command::ViewerLeft
when matching moq_lite::OriginUpdate::Ended; that causes duplicate/offline
events because the spawned task from the Active case already sends ViewerLeft
when the stream closes. In the match/if block around OriginUpdate::Active/Ended,
keep the existing tokio::spawn + handle_viewer_commands flow for Active
(including the send in the spawned task), but for the Ended branch remove the
cmd_tx.send(Command::ViewerLeft { .. }).await and only emit a log
(tracing::info! or tracing::debug!) for viewer offline; this ensures ViewerLeft
is only emitted by the spawned task handling the broadcast and prevents spurious
duplicate events.
ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 3a30bf84-46cf-4da8-a44d-256ee6e5983f

📥 Commits

Reviewing files that changed from the base of the PR and between 68b0795 and 773c0cc.

📒 Files selected for processing (23)
  • rs/hang/examples/subscribe.rs
  • rs/hang/examples/video.rs
  • rs/libmoq/src/origin.rs
  • rs/moq-boy/src/input.rs
  • rs/moq-boy/src/main.rs
  • rs/moq-cli/src/client.rs
  • rs/moq-cli/src/server.rs
  • rs/moq-clock/src/main.rs
  • rs/moq-ffi/src/origin.rs
  • rs/moq-gst/src/sink/imp.rs
  • rs/moq-gst/src/source/imp.rs
  • rs/moq-lite/src/ietf/publisher.rs
  • rs/moq-lite/src/ietf/subscriber.rs
  • rs/moq-lite/src/lite/publisher.rs
  • rs/moq-lite/src/lite/subscriber.rs
  • rs/moq-lite/src/model/broadcast.rs
  • rs/moq-lite/src/model/origin.rs
  • rs/moq-native/examples/chat.rs
  • rs/moq-native/tests/backend.rs
  • rs/moq-native/tests/broadcast.rs
  • rs/moq-relay/src/cluster.rs
  • rs/moq-relay/src/connection.rs
  • rs/moq-relay/src/web.rs

Comment thread rs/libmoq/src/origin.rs
Comment on lines +113 to +126
+use moq_lite::AsPath;
 let origin = self.active.get_mut(origin).ok_or(Error::OriginNotFound)?;
-// TODO: expose an async variant backed by `announced_broadcast` so FFI callers can wait
-// for gossip instead of racing it.
-#[allow(deprecated)]
-origin.consume().consume_broadcast(path).ok_or(Error::BroadcastNotFound)
+// TODO: expose an async variant so FFI callers can wait for gossip instead of racing it.
+// Scope a fresh consumer to the requested path; the constructor's replay puts the
+// currently-active broadcast (if any) at the head of the queue.
+let path = path.as_path();
+let mut consumer = origin
+	.consume()
+	.scope(&[path.as_path()])
+	.ok_or(Error::BroadcastNotFound)?;
+match consumer.try_next() {
+	Some(moq_lite::OriginUpdate::Active(p, b)) if p.as_path() == path => Ok(b),
+	_ => Err(Error::BroadcastNotFound),
+}
⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Scoped replay is not an exact-path lookup.

scope(&[path]) still includes descendants, and the replay queue is filled in HashMap iteration order. If both foo and foo/bar are active, try_next() can yield foo/bar first and this returns BroadcastNotFound even though foo exists. Drain the scoped queue until you either find Active(p == path) or exhaust it.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/libmoq/src/origin.rs` around lines 113 - 126, The current scoped-consumer
usage can return a descendant (e.g., "foo/bar") first and incorrectly fail;
after creating the consumer via origin.consume().scope(&[path.as_path()]) you
must drain the consumer (loop over consumer.try_next()) until you find an
OriginUpdate::Active where p.as_path() == path and then return Ok(b), or until
try_next() returns None in which case return Err(Error::BroadcastNotFound); keep
the same origin.consume(), scope(&[path.as_path()]), and matching on
moq_lite::OriginUpdate::Active(p, b) but iterate instead of a single try_next()
call.
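
The drain loop suggested here might look roughly like this. `Consumer` is a toy backed by a `VecDeque`, and the broadcast is a `u32`; only the loop shape is meant to carry over.

```rust
use std::collections::VecDeque;

/// Stand-in for `moq_lite::OriginUpdate` with a numeric broadcast handle.
#[derive(Debug, PartialEq)]
enum OriginUpdate {
    Active(String, u32),
    Ended(String),
}

/// Toy scoped consumer whose replay queue may hold descendants in any order.
struct Consumer {
    queue: VecDeque<OriginUpdate>,
}

impl Consumer {
    fn try_next(&mut self) -> Option<OriginUpdate> {
        self.queue.pop_front()
    }
}

/// Drain until an `Active` for exactly `path` appears, or the queue is empty.
fn find_exact(consumer: &mut Consumer, path: &str) -> Option<u32> {
    while let Some(update) = consumer.try_next() {
        if let OriginUpdate::Active(p, broadcast) = update {
            if p == path {
                return Some(broadcast);
            }
        }
    }
    None
}
```

In the libmoq wrapper the `None` case would map to `Error::BroadcastNotFound`.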

Comment thread rs/libmoq/src/origin.rs
Comment on lines 135 to 137
 let origin = self.active.get_mut(origin).ok_or(Error::OriginNotFound)?;
-origin.publish_broadcast(path, broadcast);
+origin.publish(path, broadcast);
 Ok(())
⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Propagate failed publishes instead of always returning success.

OriginProducer::publish() now signals rejection with false, but this wrapper always returns Ok(()). That turns authorization/loop-detection failures into false success for the libmoq API.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/libmoq/src/origin.rs` around lines 135 - 137, The current wrapper in
origin.rs calls self.active.get_mut(origin) and then origin.publish(path,
broadcast) but always returns Ok(()), which masks rejections signaled by
OriginProducer::publish() returning false; update the code to check the boolean
result of OriginProducer::publish(path, broadcast) and propagate failure by
returning an Err variant (e.g., Err(Error::PublishRejected) or an appropriate
existing Error) when publish returns false, otherwise return Ok(()). Ensure you
reference the same origin variable from self.active.get_mut(origin) and preserve
existing error handling for get_mut (ok_or(Error::OriginNotFound)?).
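
A hedged sketch of propagating the boolean as an error. `Error::PublishRejected` is the hypothetical variant named in the suggestion above and may not exist in libmoq; `Origin` and `State` are minimal stand-ins for the real types.

```rust
use std::collections::HashMap;

#[derive(Debug, PartialEq)]
enum Error {
    OriginNotFound,
    PublishRejected, // hypothetical variant; an existing error may fit better
}

/// Stand-in origin whose publish result is fixed by `accept`.
struct Origin {
    accept: bool,
}

impl Origin {
    fn publish(&mut self, _path: &str) -> bool {
        self.accept
    }
}

struct State {
    active: HashMap<u32, Origin>,
}

impl State {
    /// Turn a `false` from `publish` into an error instead of returning `Ok(())`.
    fn publish(&mut self, origin: u32, path: &str) -> Result<(), Error> {
        let origin = self.active.get_mut(&origin).ok_or(Error::OriginNotFound)?;
        if origin.publish(path) {
            Ok(())
        } else {
            Err(Error::PublishRejected)
        }
    }
}
```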

kixelated added a commit that referenced this pull request May 5, 2026
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>