Skip to content

feat(player): unify Consumer across container formats#1359

Merged
kixelated merged 4 commits into
moq-dev:mainfrom
ksletmoe-aws:fix/cmaf-sequential-decode
May 4, 2026
Merged

feat(player): unify Consumer across container formats#1359
kixelated merged 4 commits into
moq-dev:mainfrom
ksletmoe-aws:fix/cmaf-sequential-decode

Conversation

@ksletmoe-aws
Copy link
Copy Markdown
Contributor

@ksletmoe-aws ksletmoe-aws commented Apr 28, 2026

Summary

Replace the two separate consumer implementations (Legacy and CMAF) with a single generic Consumer class that accepts a ContainerFormat strategy for frame parsing. This mirrors the Rust moq-mux Consumer<F: Container> pattern and eliminates ~90% code duplication between the two existing consumers.

Additionally, add a sequential delivery mode flag to fix audio stuttering caused by inter-group serialization. When sequential: false, frames are delivered from any buffered group as soon as available (lowest-PTS first), avoiding the stall at group boundaries where next() would block waiting for the transport to signal group EOF. Audio codecs (AAC/Opus) are all-keyframe and don't need ordering guarantees, so this is safe. Video retains sequential: true (the default) for keyframe dependency correctness.

Changes

New files

  • container/format.tsContainerFormat interface and DecodedFrame type
  • container/consumer.ts — Unified Consumer class with sequential/non-sequential delivery
  • container/cmaf/format.tsCmafFormat implementation (delegates to decodeDataSegment)
  • container/types.ts — Shared Frame, BufferedRanges, mergeBufferedRanges
  • container/consumer.test.ts — 25 tests covering both formats and edge cases

Modified files

  • container/legacy.ts — Added LegacyFormat, removed old Consumer class (kept Producer)
  • container/cmaf/index.ts — Re-export CmafFormat
  • container/index.ts — Re-export Consumer, ConsumerProps, shared types
  • watch/audio/decoder.ts — Use Consumer with sequential: false for both legacy and CMAF
  • watch/audio/mse.ts — Use Consumer
  • watch/video/decoder.ts — Use Consumer (default sequential: true)
  • watch/video/mse.ts — Use Consumer

Deleted files

  • container/legacy.test.ts — Tests migrated to unified consumer.test.ts

Design decisions

  • sequential flag over separate classes: One consumer with a flag is better than two classes sharing 90% of their code. The flag controls whether next() gates on #active group completion or returns frames from any group.
  • Non-sequential uses lowest-PTS selection: When multiple groups have buffered frames, next() picks the frame with the lowest PTS. This preserves temporal ordering without requiring inter-group serialization.
  • Keyframe override at index 0: Protocol invariant — groups always start at a keyframe. Enforced regardless of what the format reports. Subsequent frames trust the format's keyframe detection.
  • Error handling drops entire group: On any decode error, the group's frames are cleared. Matches CMAF behavior where partial groups are unrecoverable.

Net impact

+997 / -1,157 lines across 13 files. The unified consumer is ~290 lines; the two old consumers totaled ~450 lines combined (plus ~700 lines of duplicate tests).

@ksletmoe-aws ksletmoe-aws marked this pull request as ready for review April 28, 2026 22:07
@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented Apr 28, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review

Walkthrough

Audio and video CMAF decoding were refactored to consume frames via a new Container.Cmaf.Consumer using its async next() interface; decoding loops now decode EncodedAudioChunk/frames and update receipt timestamps and bytes received, exiting when decoders close. Buffered-range merging was centralized to Container.mergeBufferedRanges and the prior local helper removed. A new Consumer class and CmafConsumerProps were added, container types (Frame, BufferedRange, BufferedRanges) and mergeBufferedRanges were introduced, CMAF consumer tests were added, and container index files were updated to re-export the new modules. Public Consumer surface: buffered getter, next(), and close().

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 66.67% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.
Description check ✅ Passed The pull request description clearly describes a refactoring that unifies two separate consumer implementations into a single generic Consumer class and adds sequential delivery mode support, which directly aligns with the changeset of adding Consumer, ContainerFormat, and related types while removing duplicate implementations.
Title check ✅ Passed The PR title 'feat(player): unify Consumer across container formats' accurately describes the main architectural change: introducing a unified Consumer pattern (CMAF Consumer) across container formats by extracting shared type definitions and refactoring both legacy and CMAF paths to use a common interface.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
✨ Simplify code
  • Create PR with simplified code

Tip

💬 Introducing Slack Agent: The best way for teams to turn conversations into code.

Slack Agent is built on CodeRabbit's deep understanding of your code, so your team can collaborate across the entire SDLC without losing context.

  • Generate code and open pull requests
  • Plan features and break down work
  • Investigate incidents and troubleshoot customer tickets together
  • Automate recurring tasks and respond to alerts with triggers
  • Summarize progress and report instantly

Built for teams:

  • Shared memory across your entire org—no repeating context
  • Per-thread sandboxes to safely plan and execute work
  • Governance built-in—scoped access, auditability, and budget controls

One agent for your entire SDLC. Right inside Slack.

👉 Get your free trial and get 200 agent minutes per Slack user (a $50 value).


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (2)
js/watch/src/audio/decoder.ts (1)

292-324: Consider extracting the sequential CMAF group walker.

This recvGroup()readFrame()finally { group.close(); } flow now exists here and in js/watch/src/video/decoder.ts. Pulling it into a shared helper would make future ordering/cancellation fixes much harder to drift out of sync.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@js/watch/src/audio/decoder.ts` around lines 292 - 324, The recvGroup() →
readFrame() → finally { group.close(); } sequential CMAF group walking logic is
duplicated in audio and video decoders; extract it into a shared helper (e.g.,
walkCmafGroups or CmafGroupWalker) that accepts the same cancellation token
(effect.cancel) and a per-segment callback, and have
js/watch/src/audio/decoder.ts and js/watch/src/video/decoder.ts call that
helper; ensure the helper preserves ordering, calls group.close() in a finally
block, exposes the segment objects for Container.Cmaf.decodeDataSegment and the
creation/decoding of EncodedAudioChunk/EncodedVideoChunk remains in the callers,
and uses the same Promise.race cancellation pattern with recvGroup() and
readFrame() to avoid changing runtime behavior.
js/watch/src/video/decoder.ts (1)

380-427: Please add a single-large-frame CMAF regression test.

This ordering bug only shows up when each group resolves immediately, so it is easy to miss in broader playback coverage. A focused test that proves group N+1 cannot reach decoder.decode() before group N’s keyframe would lock this fix in.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@js/watch/src/video/decoder.ts` around lines 380 - 427, Add a focused
regression test that constructs a CMAF stream composed of multiple MoQ groups
where each group’s segment promise resolves immediately (to reproduce the race),
then subscribes to the decoder path that uses sub.recvGroup(),
group.readFrame(), Container.Cmaf.decodeDataSegment and ultimately calls
decoder.decode; assert that decoder.decode is never invoked for any frame from
group N+1 before a keyframe from group N has been delivered (e.g., by
instrumenting or mocking decoder.decode calls with timestamps and ensuring the
first decoded timestamp per group is a keyframe and that ordering across groups
is preserved), and ensure the test covers the single-large-frame case so groups
resolve quickly to reproduce the original bug.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Nitpick comments:
In `@js/watch/src/audio/decoder.ts`:
- Around line 292-324: The recvGroup() → readFrame() → finally { group.close();
} sequential CMAF group walking logic is duplicated in audio and video decoders;
extract it into a shared helper (e.g., walkCmafGroups or CmafGroupWalker) that
accepts the same cancellation token (effect.cancel) and a per-segment callback,
and have js/watch/src/audio/decoder.ts and js/watch/src/video/decoder.ts call
that helper; ensure the helper preserves ordering, calls group.close() in a
finally block, exposes the segment objects for Container.Cmaf.decodeDataSegment
and the creation/decoding of EncodedAudioChunk/EncodedVideoChunk remains in the
callers, and uses the same Promise.race cancellation pattern with recvGroup()
and readFrame() to avoid changing runtime behavior.

In `@js/watch/src/video/decoder.ts`:
- Around line 380-427: Add a focused regression test that constructs a CMAF
stream composed of multiple MoQ groups where each group’s segment promise
resolves immediately (to reproduce the race), then subscribes to the decoder
path that uses sub.recvGroup(), group.readFrame(),
Container.Cmaf.decodeDataSegment and ultimately calls decoder.decode; assert
that decoder.decode is never invoked for any frame from group N+1 before a
keyframe from group N has been delivered (e.g., by instrumenting or mocking
decoder.decode calls with timestamps and ensuring the first decoded timestamp
per group is a keyframe and that ordering across groups is preserved), and
ensure the test covers the single-large-frame case so groups resolve quickly to
reproduce the original bug.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 5afa873e-6973-4896-af46-38461a360fa0

📥 Commits

Reviewing files that changed from the base of the PR and between 216c9f1 and 8f19866.

📒 Files selected for processing (2)
  • js/watch/src/audio/decoder.ts
  • js/watch/src/video/decoder.ts

@kixelated
Copy link
Copy Markdown
Collaborator

I think we need a generic OrderedConsumer. The problem is that recvGroup (and MoQ in general) returns groups out-of-order.

The idea behind OrderedConsumer is that we skip groups based on the target latency, which requires timestamp information unfortunately. Otherwise we would end up skipping groups immediately (latency = 0).

@ksletmoe-aws
Copy link
Copy Markdown
Contributor Author

Ah, that makes sense. I'll do that, thanks!

@kixelated
Copy link
Copy Markdown
Collaborator

Ah, that makes sense. I'll do that, thanks!

On the Rust side, I made an interface to parse the timestamp out of each frame. Then OrderedConsumer can be reusable.

ksletmoe-aws added a commit to ksletmoe-aws/moq that referenced this pull request Apr 29, 2026
…skipping

Replace the sequential recvGroup() loops in the video and audio CMAF
decoder paths with a new Container.Cmaf.Consumer that provides:
- Ordered group delivery (reorders out-of-order arrivals)
- Latency-based group skipping (drops stale groups when buffer exceeds target)
- Network jitter buffer signal (merged with decode buffer for UI)
- Decoder state guard to prevent 'decode on closed codec' errors

Also extracts shared types (Frame, BufferedRanges, mergeBufferedRanges)
into container/types.ts for reuse across container formats.

Supersedes moq-dev#1359.
@ksletmoe-aws ksletmoe-aws force-pushed the fix/cmaf-sequential-decode branch from e36e253 to 58b8c20 Compare April 29, 2026 18:50
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (1)
js/hang/src/container/cmaf/consumer.ts (1)

234-243: Consider noting that close() may double-close groups.

Both the #signals.cleanup() callback (lines 40-46) and close() iterate over #groups and call group.consumer.close(). This is safe since Moq.Group.close() is idempotent, but the redundancy could be simplified by having close() only call #signals.close() and letting the cleanup callback handle group cleanup.

♻️ Optional: Rely solely on cleanup callback
 	close(): void {
 		this.#signals.close();
-
-		for (const group of this.#groups) {
-			group.consumer.close();
-			group.frames.length = 0;
-		}
-
-		this.#groups.length = 0;
 	}

This simplification works because #signals.close() triggers the cleanup callback which already performs the same operations.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@js/hang/src/container/cmaf/consumer.ts` around lines 234 - 243, The close()
method currently both calls this.#signals.close() and then iterates this.#groups
to call group.consumer.close() and clear frames/array, duplicating work
performed by the `#signals` cleanup callback; simplify by removing the for-loop
and final this.#groups.length = 0 from close() so close() only calls
this.#signals.close(), relying on the cleanup callback to invoke
group.consumer.close() and clear groups (leave Moq.Group.close() calls to the
existing cleanup handler).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Nitpick comments:
In `@js/hang/src/container/cmaf/consumer.ts`:
- Around line 234-243: The close() method currently both calls
this.#signals.close() and then iterates this.#groups to call
group.consumer.close() and clear frames/array, duplicating work performed by the
`#signals` cleanup callback; simplify by removing the for-loop and final
this.#groups.length = 0 from close() so close() only calls
this.#signals.close(), relying on the cleanup callback to invoke
group.consumer.close() and clear groups (leave Moq.Group.close() calls to the
existing cleanup handler).

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 9539a45e-afad-4d0c-b9a3-7ca4d1653c00

📥 Commits

Reviewing files that changed from the base of the PR and between 8f19866 and 58b8c20.

📒 Files selected for processing (8)
  • js/hang/src/container/cmaf/consumer.test.ts
  • js/hang/src/container/cmaf/consumer.ts
  • js/hang/src/container/cmaf/index.ts
  • js/hang/src/container/index.ts
  • js/hang/src/container/legacy.ts
  • js/hang/src/container/types.ts
  • js/watch/src/audio/decoder.ts
  • js/watch/src/video/decoder.ts
✅ Files skipped from review due to trivial changes (1)
  • js/hang/src/container/index.ts
🚧 Files skipped from review as they are similar to previous changes (1)
  • js/watch/src/video/decoder.ts

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (2)
js/hang/src/container/cmaf/consumer.ts (2)

22-33: Add docs for the public API contract

Consumer, buffered, next(), and close() are exported/public but undocumented. next() especially has non-obvious behavior ({ frame: undefined, group } boundary marker and single-caller restriction) that should be explicit.

As per coding guidelines, "Document public APIs with clear docstrings or comments".

Also applies to: 181-220, 248-257

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@js/hang/src/container/cmaf/consumer.ts` around lines 22 - 33, Add
documentation comments for the public API: document the Consumer class and its
public members buffered, next(), and close() to explain their contract and
non-obvious behaviors — specifically: describe what the buffered Getter holds
(semantics of BufferedRanges), explain next()’s possible return shapes including
the boundary marker { frame: undefined, group }, the single-caller restriction
and how callers should handle awaiting/locking, and the lifecycle/guarantees of
close() (what it does to pending next() calls and buffered state). Place these
docstrings above the Consumer class and above the buffered, next, and close
declarations (and relevant internal signal usage if helpful) so the exported API
is clearly documented for callers.

111-113: Don’t silently swallow decode/read failures

Dropping a group with an empty catch makes production debugging very hard (corrupt segment vs parser bug vs transport issue). Log sequence + error at least once before dropping.

💡 Proposed fix
-		} catch (_err) {
-			// Drop the entire group on any error.
+		} catch (err) {
+			// Drop the entire group on any error, but keep observability.
+			console.warn(`dropping CMAF group ${group.consumer.sequence}`, err);
 		} finally {
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@js/hang/src/container/cmaf/consumer.ts` around lines 111 - 113, The empty
catch swallowing errors around "catch (_err) { ... }" should be replaced with a
one-line log that reports the error and the group's identifying info before
dropping it: inside the catch, call the module's logger (e.g., logger.error or
this.logger.error; fallback to console.error) and include both the caught error
(_err) and the group's identifier (use the available variable like sequence,
sequenceNumber, group.id or the group object) so you record "failed to
decode/read group <id|sequence>: <error>" once, then continue to dropping logic
in finally.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@js/hang/src/container/cmaf/consumer.ts`:
- Around line 134-171: The latency-skip loop in `#checkLatency`() only runs when
this.#groups.length >= 2, which can deadlock next() if there is a sequence gap
and only one newer group is buffered; change the loop to run when
this.#groups.length >= 1 so a single newer out-of-order group can be evaluated
and skipped after the latency threshold; keep the existing timestamp/now logic
and the shift/update of this.#active and ensure removed.consumer.close() and
frames clearing remain executed so next() can make progress.

---

Nitpick comments:
In `@js/hang/src/container/cmaf/consumer.ts`:
- Around line 22-33: Add documentation comments for the public API: document the
Consumer class and its public members buffered, next(), and close() to explain
their contract and non-obvious behaviors — specifically: describe what the
buffered Getter holds (semantics of BufferedRanges), explain next()’s possible
return shapes including the boundary marker { frame: undefined, group }, the
single-caller restriction and how callers should handle awaiting/locking, and
the lifecycle/guarantees of close() (what it does to pending next() calls and
buffered state). Place these docstrings above the Consumer class and above the
buffered, next, and close declarations (and relevant internal signal usage if
helpful) so the exported API is clearly documented for callers.
- Around line 111-113: The empty catch swallowing errors around "catch (_err) {
... }" should be replaced with a one-line log that reports the error and the
group's identifying info before dropping it: inside the catch, call the module's
logger (e.g., logger.error or this.logger.error; fallback to console.error) and
include both the caught error (_err) and the group's identifier (use the
available variable like sequence, sequenceNumber, group.id or the group object)
so you record "failed to decode/read group <id|sequence>: <error>" once, then
continue to dropping logic in finally.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 7a4652c0-6d8c-4485-b385-3f1f1648d540

📥 Commits

Reviewing files that changed from the base of the PR and between 58b8c20 and 43b5e0e.

📒 Files selected for processing (3)
  • js/hang/src/container/cmaf/consumer.ts
  • js/watch/src/audio/decoder.ts
  • js/watch/src/video/decoder.ts
✅ Files skipped from review due to trivial changes (1)
  • js/watch/src/video/decoder.ts
🚧 Files skipped from review as they are similar to previous changes (1)
  • js/watch/src/audio/decoder.ts

Comment thread js/hang/src/container/cmaf/consumer.ts
@ksletmoe-aws ksletmoe-aws force-pushed the fix/cmaf-sequential-decode branch from 43b5e0e to 986cab4 Compare April 29, 2026 21:27
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (1)
js/hang/src/container/cmaf/consumer.ts (1)

134-171: ⚠️ Potential issue | 🔴 Critical

Latency-skip loop can still deadlock on a single buffered out-of-order group.

At Line 134, requiring this.#groups.length >= 2 prevents skip evaluation when exactly one newer group is buffered; next() can then wait forever behind a missing sequence.

Proposed fix
-		while (this.#groups.length >= 2) {
+		while (this.#groups.length > 0) {
 			const first = this.#groups[0];
 			const firstPts = first.frames.at(0)?.timestamp;
 			if (firstPts === undefined) break;
@@
 			} else {
+				// PTS-span fallback needs at least two groups.
+				if (this.#groups.length < 2) break;
 				// Fallback: compare PTS span across buffered groups.
 				const thresholdMicro = Moq.Time.Micro.fromMilli(threshold);
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@js/hang/src/container/cmaf/consumer.ts` around lines 134 - 171, The loop
currently checks this.#groups.length >= 2 which prevents latency-driven skipping
when exactly one newer out-of-order group is buffered; change the loop to run
while (this.#groups.length > 0) so the same latency checks run even with a
single buffered group, keep the existing PTS/now logic, then after removing a
group update this.#active = this.#groups[0]?.consumer.sequence (or undefined if
none) and proceed to call removed.consumer.close(), clear removed.frames, and
set skipped = true as before so next() won't deadlock waiting for the missing
sequence.
🧹 Nitpick comments (2)
js/watch/src/audio/decoder.ts (1)

306-311: Consider extracting shared frame bookkeeping into a small helper.

The sync.received(...) and bytesReceived updates are duplicated between legacy and CMAF decode loops, which is easy to drift.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@js/watch/src/audio/decoder.ts` around lines 306 - 311, Duplicate bookkeeping
for frames (calling this.source.sync.received(...) and updating
this.#stats.bytesReceived) should be extracted into a small helper to avoid
drift between the legacy and CMAF decode loops; add a private method (e.g.,
recordReceivedFrame or updateFrameStats) that accepts the frame (or timestamp
and byteLength), calls Time.Milli.fromMicro(frame.timestamp) then
this.source.sync.received(timestamp, "audio"), and performs
this.#stats.update(...) to add frame.data.byteLength, and replace the duplicated
blocks in both decode loops with calls to that helper.
js/hang/src/container/cmaf/consumer.ts (1)

22-33: Add doc comments for the public Consumer API semantics.

next() returning { frame: undefined, group } as a boundary signal (and throwing on concurrent calls) is non-obvious for callers and should be explicitly documented.

As per coding guidelines "Document public APIs with clear docstrings or comments".

Also applies to: 181-220, 248-257

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@js/hang/src/container/cmaf/consumer.ts` around lines 22 - 33, Add clear doc
comments to the public Consumer API (class Consumer) describing semantics for
callers: document the behavior of next() including that it may return an object
with frame === undefined and a group to signal a group-boundary, that next()
throws if called concurrently, and what callers should do on those boundary
values; also document buffered (readonly buffered getter), groups and any public
signals accessed by consumers so callers know concurrency and lifecycle
expectations (refer to Consumer.next, Consumer.buffered, and Consumer.#groups in
the comments). Ensure these docstrings appear where Consumer is declared and
additionally annotate the method implementations/usage sites around lines
181-220 and 248-257 to match the guidelines.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@js/hang/src/container/cmaf/consumer.ts`:
- Around line 111-114: The catch block currently leaves previously pushed frames
in group.frames so they can still be emitted; inside the catch handler (the
catch block around the decode/processing loop where group.done is later set)
explicitly clear or discard the group's queued frames (e.g., set group.frames =
[] or mark a group.dropped flag and ensure emitters check it) before exiting,
and keep group.done = true in the finally; update any emission logic to respect
the new dropped flag if used so no frames from a failed group are emitted.

---

Duplicate comments:
In `@js/hang/src/container/cmaf/consumer.ts`:
- Around line 134-171: The loop currently checks this.#groups.length >= 2 which
prevents latency-driven skipping when exactly one newer out-of-order group is
buffered; change the loop to run while (this.#groups.length > 0) so the same
latency checks run even with a single buffered group, keep the existing PTS/now
logic, then after removing a group update this.#active =
this.#groups[0]?.consumer.sequence (or undefined if none) and proceed to call
removed.consumer.close(), clear removed.frames, and set skipped = true as before
so next() won't deadlock waiting for the missing sequence.

---

Nitpick comments:
In `@js/hang/src/container/cmaf/consumer.ts`:
- Around line 22-33: Add clear doc comments to the public Consumer API (class
Consumer) describing semantics for callers: document the behavior of next()
including that it may return an object with frame === undefined and a group to
signal a group-boundary, that next() throws if called concurrently, and what
callers should do on those boundary values; also document buffered (readonly
buffered getter), groups and any public signals accessed by consumers so callers
know concurrency and lifecycle expectations (refer to Consumer.next,
Consumer.buffered, and Consumer.#groups in the comments). Ensure these
docstrings appear where Consumer is declared and additionally annotate the
method implementations/usage sites around lines 181-220 and 248-257 to match the
guidelines.

In `@js/watch/src/audio/decoder.ts`:
- Around line 306-311: Duplicate bookkeeping for frames (calling
this.source.sync.received(...) and updating this.#stats.bytesReceived) should be
extracted into a small helper to avoid drift between the legacy and CMAF decode
loops; add a private method (e.g., recordReceivedFrame or updateFrameStats) that
accepts the frame (or timestamp and byteLength), calls
Time.Milli.fromMicro(frame.timestamp) then this.source.sync.received(timestamp,
"audio"), and performs this.#stats.update(...) to add frame.data.byteLength, and
replace the duplicated blocks in both decode loops with calls to that helper.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: d95a7d4e-cf59-47cf-bd06-50a2aad117e5

📥 Commits

Reviewing files that changed from the base of the PR and between 43b5e0e and 986cab4.

📒 Files selected for processing (8)
  • js/hang/src/container/cmaf/consumer.test.ts
  • js/hang/src/container/cmaf/consumer.ts
  • js/hang/src/container/cmaf/index.ts
  • js/hang/src/container/index.ts
  • js/hang/src/container/legacy.ts
  • js/hang/src/container/types.ts
  • js/watch/src/audio/decoder.ts
  • js/watch/src/video/decoder.ts
✅ Files skipped from review due to trivial changes (2)
  • js/hang/src/container/index.ts
  • js/hang/src/container/legacy.ts
🚧 Files skipped from review as they are similar to previous changes (3)
  • js/hang/src/container/cmaf/index.ts
  • js/hang/src/container/types.ts
  • js/watch/src/video/decoder.ts

Comment thread js/hang/src/container/consumer.ts
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (1)
js/hang/src/container/cmaf/consumer.ts (1)

252-262: Minor redundancy with registered cleanup callback.

The cleanup callback registered in the constructor (lines 44-50) already closes consumers and clears #groups when #signals.close() is called. The explicit loop here is redundant unless you need to ensure frames arrays are cleared before the cleanup callback runs (for eager memory release).

Consider consolidating cleanup logic in one place to avoid potential confusion about ordering.

♻️ Option 1: Remove redundant code if cleanup is synchronous
 close(): void {
     this.#signals.close();
-
-    for (const group of this.#groups) {
-        group.consumer.close();
-        group.frames.length = 0;
-    }
-
-    this.#groups.length = 0;
 }
♻️ Option 2: Move all cleanup to close() if explicit frame clearing is needed
 constructor(track: Moq.Track, props: CmafConsumerProps) {
     // ...
     this.#signals.spawn(this.#run.bind(this));
-    this.#signals.cleanup(() => {
-        this.#track.close();
-        for (const group of this.#groups) {
-            group.consumer.close();
-        }
-        this.#groups.length = 0;
-    });
+    this.#signals.cleanup(() => this.#cleanup());
 }

+#cleanup(): void {
+    this.#track.close();
+    for (const group of this.#groups) {
+        group.consumer.close();
+        group.frames.length = 0;
+    }
+    this.#groups.length = 0;
+}
+
 close(): void {
     this.#signals.close();
-
-    for (const group of this.#groups) {
-        group.consumer.close();
-        group.frames.length = 0;
-    }
-
-    this.#groups.length = 0;
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@js/hang/src/container/cmaf/consumer.ts` around lines 252 - 262, The close()
method currently calls this.#signals.close() and then manually iterates over
this.#groups to close each group.consumer and clear group.frames before clearing
this.#groups, which duplicates the cleanup already registered in the constructor
(the cleanup callback bound to signal close in the constructor). Decide one
approach and consolidate: either remove the explicit loop in close() and rely on
the constructor-registered cleanup (leave only this.#signals.close() and
this.#groups.length = 0 if needed), or move all cleanup logic into close()
(remove the constructor cleanup registration) so close() alone handles closing
group.consumer and zeroing group.frames and then clears this.#groups; update
only the close(), constructor cleanup registration, and any references to
`#groups` or the registered callback to keep a single authoritative cleanup path.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Nitpick comments:
In `@js/hang/src/container/cmaf/consumer.ts`:
- Around line 252-262: The close() method currently calls this.#signals.close()
and then manually iterates over this.#groups to close each group.consumer and
clear group.frames before clearing this.#groups, which duplicates the cleanup
already registered in the constructor (the cleanup callback bound to signal
close in the constructor). Decide one approach and consolidate: either remove
the explicit loop in close() and rely on the constructor-registered cleanup
(leave only this.#signals.close() and this.#groups.length = 0 if needed), or
move all cleanup logic into close() (remove the constructor cleanup
registration) so close() alone handles closing group.consumer and zeroing
group.frames and then clears this.#groups; update only the close(), constructor
cleanup registration, and any references to `#groups` or the registered callback
to keep a single authoritative cleanup path.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: dccf22cd-5586-4155-846a-e78f64ed3301

📥 Commits

Reviewing files that changed from the base of the PR and between 986cab4 and d78ef53.

📒 Files selected for processing (1)
  • js/hang/src/container/cmaf/consumer.ts

@ksletmoe-aws ksletmoe-aws marked this pull request as draft April 30, 2026 01:11
@ksletmoe-aws ksletmoe-aws force-pushed the fix/cmaf-sequential-decode branch 3 times, most recently from ec03c72 to 965549e Compare April 30, 2026 01:42
@ksletmoe-aws
Copy link
Copy Markdown
Contributor Author

Addressed the duplicate cleanup nitpick — close() now just calls this.#signals.close(), which triggers the constructor-registered cleanup callback. Single authoritative cleanup path, no redundancy.

@ksletmoe-aws ksletmoe-aws changed the title fix(watch): process CMAF groups sequentially in WebCodecs decoder feat(hang): unify OrderedConsumer across container formats Apr 30, 2026
@ksletmoe-aws ksletmoe-aws changed the title feat(hang): unify OrderedConsumer across container formats feat(hang): unify Consumer across container formats Apr 30, 2026
@ksletmoe-aws ksletmoe-aws changed the title feat(hang): unify Consumer across container formats feat(player): unify Consumer across container formats Apr 30, 2026
@ksletmoe-aws
Copy link
Copy Markdown
Contributor Author

ksletmoe-aws commented Apr 30, 2026

This PR grew a bit from the original fix — I took the opportunity to create a unified Consumer that mirrors the Rust Consumer<F: Container> pattern.

What started it: The CMAF decoder paths were processing groups via parallel effect.spawn() calls with no ordering guarantees. This worked for audio (all keyframes) but was fragile for video (keyframe dependencies). Luke's comment about needing a generic OrderedConsumer with timestamp-based group skipping was the nudge to do it properly.

What it does now:

  • Single Consumer class that accepts a ContainerFormat strategy (CMAF or Legacy), replacing ~450 lines of duplicate consumer code across two implementations
  • sequential flag (default true) controls whether frame delivery is serialized across groups. Video uses sequential delivery for keyframe correctness; audio uses non-sequential since every frame is independently decodable
  • Wall-clock latency skipping for all paths — the old CMAF audio path had none (there was a TODO)
  • Non-sequential mode picks the lowest-PTS frame across all buffered groups, which is actually better cross-group ordering than the old fire-and-forget spawns

Testing: I noticed frequent audio underflows and late frames during integration testing and initially suspected the Consumer changes. After A/B testing against main, the pattern is identical on both branches — it's a pre-existing issue with the postMessage audio buffer fallback when SharedArrayBuffer is unavailable (the dev server doesn't serve the required COOP/COEP headers). At ~20ms target latency the postMessage path can't keep up; bumping to 70-80ms eliminates it. Separate issue to fix. All 25 unit tests pass, both @moq/hang and @moq/watch type-check clean.

Validated all three consumer paths against main:

CMAF (just pub bbb) — fMP4 via ffmpeg, exercises CmafFormat through Consumer
Legacy (just pub h264 bbb) — H.264 Annex B, exercises LegacyFormat through Consumer
HLS/MSE (just pub hls bbb) — HLS fMP4 passthrough, exercises the MSE consumer paths

All three play back identically across both branches. Same audio/video underflow patterns on main vs the PR. 25 unit tests pass, @moq/hang and @moq/watch type-check clean.

Net: +997 / -1,157 lines across 13 files. Ready for review.

@ksletmoe-aws ksletmoe-aws marked this pull request as ready for review April 30, 2026 21:16
…ivery and CMSF audio fixes

Replace two separate consumer implementations (Legacy and CMAF) with a
single Consumer class that accepts a ContainerFormat strategy for frame
parsing, mirroring the Rust Consumer<F: Container> pattern.

Consumer changes:
- ContainerFormat interface with CmafFormat and LegacyFormat impls
- 'sequential' flag (default true): video uses sequential delivery for
  keyframe correctness, audio uses non-sequential since every frame is
  independently decodable
- Non-sequential mode returns lowest-PTS frame from any buffered group
- Wall-clock latency skipping for all paths (was missing for CMAF audio)
- Shared types (Frame, BufferedRanges, mergeBufferedRanges)

CMSF/MSF audio fixes:
- Extract AudioSpecificConfig from esds descriptor chain instead of
  returning the full esds payload (fixes AAC AudioDecoder.configure()
  rejection on the MSF catalog path)
- Omit description for Opus across all decoder paths (dOps is not a
  valid OGG Identification Header for WebCodecs)

+997 / -1,157 lines across 13 files. 25 unit tests.
@ksletmoe-aws ksletmoe-aws force-pushed the fix/cmaf-sequential-decode branch from a23f967 to cf38ffd Compare April 30, 2026 22:09
@ksletmoe-aws
Copy link
Copy Markdown
Contributor Author

Sorry for the churn on this one — the commit history is messier than it should be. In hindsight I should have closed the original PR and opened a fresh one once the scope expanded from a targeted fix to a full consumer unification. Lesson learned for next time.

The latest commits add two fixes found during CMSF integration testing:

esds descriptor parsing — extractDescription was returning the full esds box payload instead of just the AudioSpecificConfig. Chrome's AudioDecoder.configure() expects the raw 2-byte AAC-LC config, not the entire ES_Descriptor chain. Added proper parsing to walk the descriptor tags and extract the DecoderSpecificInfo (tag 0x05) payload.

Opus description omission — Opus in CMAF uses raw packets, not OGG-wrapped. The dOps box from the init segment is not a valid OGG Identification Header, so passing it as description to AudioDecoder.configure() causes a rejection. Now omitted for Opus across all decoder paths.

Both of these only affect the MSF/CMSF catalog path (the hang catalog path already had correct descriptions from the Rust muxer). I squashed down to one commit. This is now actually ready to review.

@kixelated
Copy link
Copy Markdown
Collaborator

No worries, I'll take a look at it soon.

Copy link
Copy Markdown
Collaborator

@kixelated kixelated left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Try to avoid changes to the Consumer logic for now. Otherwise definitely the right direction

Comment thread js/hang/src/container/consumer.ts Outdated
format: ContainerFormat;
latency?: Signal<Time.Milli> | Time.Milli;
/** Returns the PTS that should be rendering right now, or undefined if unknown. */
now?: () => Time.Milli | undefined;
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd prefer not to have this, because to use it "correctly" requires signals and sleeping. The existing consumer effectively uses latency as a max_buffer_size instead.

A separate PR would make sense if you want to change the algorithm but I'm scared. This is Consumer was really hard to write and has gone through multiple revisions. We should just make the container generic (via an interface) instead of redesigning it.

Comment thread js/hang/src/container/consumer.ts Outdated
now?: () => Time.Milli | undefined;
/** When false, frames are delivered from any group as soon as available (no inter-group serialization).
* Useful for audio where every frame is independently decodable. Default: true. */
sequential?: boolean;
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we remove this? I don't know what happens when you feed an AudioDecoder out of order and I don't want to find out.

Comment thread js/hang/src/container/format.ts Outdated
@@ -0,0 +1,12 @@
import type { Time } from "@moq/lite";

export interface DecodedFrame {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Just call it Frame or something? This could be used for encoding too.

Comment thread js/watch/src/audio/decoder.ts Outdated
// TODO include JITTER_UNDERHEAD
const consumer = new Container.Legacy.Consumer(sub, {
const consumer = new Container.Consumer(sub, {
format: new Container.Legacy.LegacyFormat(),
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Legacy.LegacyFormat should be avoided IMO.

@@ -0,0 +1,33 @@
import { Time } from "@moq/lite";

export interface Frame {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should reuse Frame and DecodedFrame

…eview

Address Luke's review on PR moq-dev#1359:
- Revert Consumer to main's logic (PTS-span latency, sequential-only delivery)
- Add ContainerFormat strategy interface (CmafFormat, Legacy.Format)
- Remove now() wall-clock prop and sequential flag
- Consolidate DecodedFrame into Frame, rename LegacyFormat to Format
- Keep: keyframe index-0 override, catch-block frame cleanup, esds/Opus fixes
…eview

Address Luke's review on PR moq-dev#1359:
- Revert Consumer to main's logic (PTS-span latency, sequential-only delivery)
- Add ContainerFormat strategy interface (CmafFormat, Legacy.Format)
- Remove now() wall-clock prop and sequential flag
- Consolidate DecodedFrame into Frame, rename LegacyFormat to Format
- Keep: keyframe index-0 override, catch-block frame cleanup, esds/Opus fixes
- Fix: relax duration validation in decodeDataSegment for single-sample CMAF fragments
@ksletmoe-aws ksletmoe-aws force-pushed the fix/cmaf-sequential-decode branch from 59f44f3 to 648a7b2 Compare May 4, 2026 20:25
@ksletmoe-aws
Copy link
Copy Markdown
Contributor Author

Made one last change specifically for CMSF playback -- PR should be ready to re-review when you have time. Thanks!

@kixelated
Copy link
Copy Markdown
Collaborator

I'll have Claude fix a few nits and then merge.

Container.ContainerFormat -> Container.Format, Container.Cmaf.CmafFormat
-> Container.Cmaf.Format, matching the existing Container.Legacy.Format
style so the namespace carries the qualifier.

Also restore legacy behavior on partial decode errors: keep the frames
that decoded successfully before the error instead of dropping the whole
group. Stream RESET or a corrupt tail frame no longer wipes the rest of
the GoP.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@kixelated kixelated enabled auto-merge (squash) May 4, 2026 21:13
@kixelated kixelated merged commit cbffd39 into moq-dev:main May 4, 2026
1 check passed
@ksletmoe-aws ksletmoe-aws deleted the fix/cmaf-sequential-decode branch May 4, 2026 21:25
kixelated added a commit that referenced this pull request May 4, 2026
Resolves the player-side conflicts from #1359 (unified Consumer across
container formats) against this PR's catalog change (Container::Cmaf
stores the base64 init segment instead of a raw timescale field).

- Container.Cmaf.Format now takes a parsed InitSegment instead of a bare
  timescale, since the catalog no longer carries timescale separately.
- audio/video decoders decode the init from config.container.init and
  pass the parsed init to Container.Cmaf.Format.
- Kept main's Opus-CMAF special case (description omitted because raw
  Opus packets aren't OGG-wrapped) and the sample-duration validation in
  decodeDataSegment.
- Updated consumer.test.ts to construct a synthetic InitSegment.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants