Skip to content

feat(transport-contract): bounded queue observability, typed events, frame lifecycle#2072

Open
pird32 wants to merge 2 commits intopedroSG94:masterfrom
pird32:fork/rootencoder-transport-contract
Open

feat(transport-contract): bounded queue observability, typed events, frame lifecycle#2072
pird32 wants to merge 2 commits intopedroSG94:masterfrom
pird32:fork/rootencoder-transport-contract

Conversation

@pird32
Copy link
Copy Markdown

@pird32 pird32 commented Apr 16, 2026

Problem

Sender queues in RootEncoder expose raw getCacheSize() / getItemsInCache() integers with no semantic meaning. Additionally:

  1. getCacheSize() in BaseSender has a bug: it always returns the initial value (400) even after resizeCache(newSize) is called — the cacheSize field is never updated on resize.
  2. There is no way to observe queue pressure as a typed event — callers must duplicate threshold math (soft 70%, hard 85%) and cannot react to overflow without string-matching onConnectionFailed.
  3. There is no lifecycle callback for frame consumption, so callers cannot implement buffer-pool strategies to reduce heap allocations from ByteBuffer.clone().

Solution

This PR adds three capabilities to BaseSender (common module) without changing any existing sender behavior:

1. Fix getCacheSize() bug

resizeCache() now updates the cacheSize field, so getCacheSize() returns the actual queue capacity after a resize.

2. QueueSnapshot + getQueueSnapshot()

New data class QueueSnapshot(capacity, items, softThresholdPercent, hardThresholdPercent) with usageRatio and summary(). All senders inherit getQueueSnapshot() from BaseSender.

3. TransportEvent sealed class + ConnectChecker.onTransportEvent()

New TransportEvent sealed class with two variants:

  • QueueOverflow(frameType, droppedTotal, queueCapacity, queueSize) — emitted (rate-limited, 1.5 s cooldown) when sendMediaFrame() drops a frame due to full queue.
  • NetworkSendError(message, cause) — emitted from sender dispatch loops in addition to the existing onConnectionFailed() call.

ConnectChecker.onTransportEvent() is a default no-op method — existing implementations are not required to override it.

4. FrameLifecycleListener interface

New fun interface FrameLifecycleListener { fun onFrameConsumed(frame: MediaFrame) }. Added as BaseSender.frameLifecycleListener: FrameLifecycleListener? (default null). Called from each sender's dispatch loop after the frame has been fully written to the network socket. Enables callers to implement buffer pool strategies.

Changes

  • common/: QueueSnapshot.kt (new), TransportEvent.kt (new), FrameLifecycleListener.kt (new), ConnectChecker.kt (+onTransportEvent default), base/BaseSender.kt (getCacheSize fix, getQueueSnapshot, lifecycle, overflow event)
  • rtmp/RtmpSender.kt, srt/SrtSender.kt, rtsp/RtspSender.kt, udp/UdpSender.kt: call notifyFrameConsumed(frame) after dispatch, emit NetworkSendError typed event alongside existing onConnectionFailed.

Backward compatibility

  • onTransportEvent is a default method: existing ConnectChecker implementors are unaffected.
  • frameLifecycleListener defaults to null: existing senders behave identically if no listener is registered.
  • getCacheSize() fix is transparent: callers that relied on the buggy value (always 400) now receive the actual capacity.

Minimal reproduction (getCacheSize bug)

val sender: BaseSender = RtmpSender(checker, manager)
sender.resizeCache(128)
assert(sender.getCacheSize() == 128) // was: 400 (bug)

Minimal reproduction (QueueSnapshot)

val snapshot = sender.getQueueSnapshot()
Log.d(TAG, "queue: ${snapshot.summary()}") // e.g. "12/128 (9%)"

pird32 added 2 commits April 16, 2026 17:51
- BaseSender: fix getCacheSize() to track actual capacity after resizeCache()
  (was always returning initial 400 even after resizeCache(128))
- BaseSender: add getQueueSnapshot() returning QueueSnapshot(capacity, items)
- BaseSender: add frameLifecycleListener for pooled-copy buffer recycling
- BaseSender: emit rate-limited ConnectChecker.onTransportEvent(QueueOverflow)
  when sendMediaFrame() drops a frame (cooldown: 1500 ms, no flooding)
- New: QueueSnapshot data class with usageRatio and summary()
- New: TransportEvent sealed class (QueueOverflow, NetworkSendError)
- New: FrameLifecycleListener fun interface for buffer pool integration
- ConnectChecker: add onTransportEvent(TransportEvent) default no-op method

Upstream-friendliness: onTransportEvent is a default method; existing
ConnectChecker implementors are not required to override it.

Made-with: Cursor
…all senders

All four protocol senders (RtmpSender, SrtSender, RtspSender, UdpSender) now:
- Call notifyFrameConsumed(mediaFrame) after the dispatch loop finishes
  processing each frame (after network write). Wired to frameLifecycleListener
  so buffer pools can release slots at the correct point in the lifecycle.
- Emit ConnectChecker.onTransportEvent(NetworkSendError) on send errors in
  addition to the existing onConnectionFailed(String) call (backward compatible).

Made-with: Cursor
@pedroSG94
Copy link
Copy Markdown
Owner

Hello,

Thank you for the PR.
The change is interested but you did like 4 PRs with the same code. Remove the others and I will start testing it.

@pird32
Copy link
Copy Markdown
Author

pird32 commented Apr 16, 2026

Per your request, I closed the other 3 PRs and kept only this consolidated PR for review/testing. Thank you!

@pedroSG94
Copy link
Copy Markdown
Owner

Hello,

I was testing it and I didn't like execute the code in the same thread that send the frames so I decided to do this change:
99c4fc1

Basically, now the callback is called in other thread to avoid block the main sender thread and since you can't know the last frame type, I did a little modification to send video and audio dropped frames instead of only send one type.

What do you think about this change?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants