Gating excess preroll buffered media to reduce e2e latency #386

Merged
milos-lk merged 12 commits into main from trim-probes
Oct 27, 2025
Conversation

@milos-lk
Contributor

The preroll buffer can be filled with multiple (tens of) seconds of media data. That data is then processed as live, which increases the latency of RTMP ingress streams.

The idea behind the solution is to drop excess data after decoding but before encoding (to ensure we don't have e.g. key frame losses). To achieve this, a buffer pad probe is installed on the ghost pads after decodebin. The probe measures the rate of packet arrival and, as long as that rate stays greater (with some margin) than the estimated wall-clock rate across multiple evaluation windows, it keeps dropping packets. The same process is applied to all input streams (pads); as soon as they all stabilize, the max offset is used for all of them (to make sure the streams stay in sync). If not all pads stabilize, a fallback timer kicks in after 3s, takes the max of the current offsets, sets it for all streams, and stops the gating process, so that the ingress doesn't stall.
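A minimal sketch of the per-pad rate check described above, in Go. All names and values here are illustrative (`evalWindow`, `rateMargin`, `stableWindows`, and the `observe` helper are assumptions, not the actual implementation in this PR):

```go
package media

import "time"

// Illustrative constants; the real names and values in the PR may differ.
const (
	evalWindow    = 250 * time.Millisecond // length of one evaluation window
	rateMargin    = 1.1                    // arrival must outpace wall clock by 10% to count as backlog draining
	stableWindows = 4                      // consecutive on-pace windows before a pad counts as live
)

// padTimingState tracks how fast media PTS advances versus wall-clock time
// on one pad (hypothetical shape, loosely based on the diff context quoted
// later in this thread).
type padTimingState struct {
	windowStartWall time.Time
	windowStartPTS  time.Duration
	stableCount     int
	stabilized      bool
}

// observe is called from the buffer pad probe with each buffer's PTS.
// It returns true while the preroll backlog is still draining, i.e. while
// the probe should keep dropping buffers. The 3s fallback timer from the
// description (not shown) would force a decision so the ingress can't stall.
func (s *padTimingState) observe(now time.Time, pts time.Duration) (drop bool) {
	if s.windowStartWall.IsZero() {
		s.windowStartWall, s.windowStartPTS = now, pts
	}

	elapsed := now.Sub(s.windowStartWall)
	if elapsed < evalWindow {
		return !s.stabilized // keep the current decision until the window closes
	}

	// Compare how much media time arrived against how much wall time passed.
	mediaAdvance := pts - s.windowStartPTS
	if float64(mediaAdvance) > rateMargin*float64(elapsed) {
		s.stableCount = 0 // still arriving faster than real time: keep draining
	} else if s.stableCount++; s.stableCount >= stableWindows {
		s.stabilized = true
	}

	s.windowStartWall, s.windowStartPTS = now, pts
	return !s.stabilized
}
```

In the real pipeline this decision would run inside a GStreamer buffer pad probe on the post-decodebin ghost pads, returning a per-buffer drop decision; the PTS offset accumulated while dropping is what gets reconciled across pads once they all stabilize.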

Tested by adding an artificial delay in the preroll buffer (at the beginning of the SetWriter call) to simulate lateness. Used ffmpeg and OBS Studio for sending streams.


@boks1971 boks1971 left a comment


I don't understand it very well, but LGTM as far as I understood.

Left a note about breaking it into a separate file; at least for me, having this start-gate kind of check in a separate file makes it easier to read and also to write unit tests.

Comment thread: pkg/media/input.go (Outdated)

type OutputReadyFunc func(pad *gst.Pad, kind types.StreamKind)

type padTimingState struct {


Is it possible to move this to a new file, so that all the gate code lives there? That would make it easier to read.

Contributor Author


done

Contributor

@biglittlebigben biglittlebigben left a comment


LGTM with a caveat: I believe this will not converge when streaming VOD files or HLS streams over HTTP. Even for live HLS, this may take a while to converge since we'll always buffer at least 1 segment worth of data. I think we should disable this logic entirely for HTTP pull sources (but not SRT)

Comment thread: pkg/media/input.go (Outdated)
type padTimingState struct {
lastBufferWallClockTime time.Time
lastBufferPTS time.Duration
lastBufferDuration time.Duration
Contributor


Is lastBufferDuration used anywhere?

Contributor Author


No, it's leftover from one of my previous iterations. Damn, I need these linters added here as well 🙈

Contributor Author


removed

@milos-lk
Contributor Author

LGTM with a caveat: I believe this will not converge when streaming VOD files or HLS streams over HTTP. Even for live HLS, this may take a while to converge since we'll always buffer at least 1 segment worth of data. I think we should disable this logic entirely for HTTP pull sources (but not SRT)

Thanks for calling that out - I added the filtering you suggested (please double check it).

I will run some sanity tests with all these inputs to make sure it works as expected
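For context, a sketch of what such source filtering might look like. The `SourceKind` enum and `gateEnabledFor` helper are hypothetical, not the actual code in this PR:

```go
package media

// SourceKind is a hypothetical stand-in for however the ingress
// identifies its input type.
type SourceKind int

const (
	SourceRTMP SourceKind = iota
	SourceSRT
	SourceHTTPPull // VOD files or HLS fetched over HTTP
)

// gateEnabledFor reports whether the preroll gate should be installed for
// an input. HTTP pull sources always buffer at least one segment's worth
// of data, so the arrival rate never settles to the wall-clock rate and
// the gate would not converge; SRT stays enabled per the review note.
func gateEnabledFor(kind SourceKind) bool {
	return kind != SourceHTTPPull
}
```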

Contributor

@biglittlebigben biglittlebigben left a comment


LGTM. Maybe just add a comment explaining why the gate is not enabled for HTTP.

@milos-lk
Contributor Author

After thinking about the WHIP case more, I don't think this gate is a good option for it.
It will work nicely for RTMP, where we have data produced by the same muxer and fed into the same buffer (I assume the same will be the case for SRT; still need to test it though).
However, for WHIP we have independent RTP streams coming into the gst pipeline from 2 independent preroll buffers and app sources, so we can't really rely on the shared-offset approach for this case.
For WHIP, a better option would be something similar to what we have done in egress: exposing the pipeline running time and feeding it back to the synchronizer so that it can decide when to stop dropping RTP packets and request a PLI. I will think more about that case, but for now I would disable WHIP for the input gate approach.
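Purely for illustration, a guess at the shape of that feedback for WHIP (every name here is hypothetical; the egress mechanism referenced above is not part of this PR):

```go
package media

import "time"

// shouldStopDropping mirrors the idea above: once the pipeline's running
// time has (nearly) caught up with the incoming RTP stream's media time,
// stop dropping packets and request a PLI so decoding restarts on a fresh
// keyframe. Hypothetical sketch only.
func shouldStopDropping(pipelineRunningTime, streamTime, margin time.Duration) bool {
	return streamTime-pipelineRunningTime <= margin
}
```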

@biglittlebigben
Contributor

After thinking about the WHIP case more, I don't think this gate is a good option for it. It will work nicely for RTMP, where we have data produced by the same muxer and fed into the same buffer (I assume the same will be the case for SRT; still need to test it though). However, for WHIP we have independent RTP streams coming into the gst pipeline from 2 independent preroll buffers and app sources, so we can't really rely on the shared-offset approach for this case. For WHIP, a better option would be something similar to what we have done in egress: exposing the pipeline running time and feeding it back to the synchronizer so that it can decide when to stop dropping RTP packets and request a PLI. I will think more about that case, but for now I would disable WHIP for the input gate approach.

Transcoded WHIP is a fairly small use case today. It would be nice to improve it as well but focusing on RTMP and SRT makes sense.

@milos-lk
Contributor Author

Just finished testing SRT; it works as expected (same as for RTMP). Merging the PR now.

@milos-lk milos-lk merged commit 79abf5b into main Oct 27, 2025
6 checks passed
@milos-lk milos-lk deleted the trim-probes branch October 27, 2025 15:27
@biglittlebigben biglittlebigben mentioned this pull request Apr 22, 2026
biglittlebigben added a commit that referenced this pull request Apr 22, 2026
# Changelog

## Added

- Allow passing `projectID` for URL requests (#429)
- Introduce linter configuration shared with egress, fixing a wide range of issues including unused params, deprecated APIs, unexported return types, typos, a `TrimLeft` bug, and a mutex-copy bug in a proto message (#425)
- Add segment start to adjusted buffer PTS; simplify event probe (#398)
- Log segment events to help diagnose non-~0s segments that could push buffer PTSs outside the segment range (#397)
- Observe buffer processing latency and expose it as a Prometheus metric (#392)
- Gate excess pre-roll buffered media to reduce end-to-end latency (#386)
- Ingress metrics (#383)
- Pass WHIP through to the SFU directly, with a config option to choose between SFU and native paths; includes WHIP HTTP header handling and RTC closing notify support (#372)
- `IngressID` and `ResourceID` attributes on ingress participants (#344)
- Custom `HandlerLogger` implementation (#339)
- Media watchdog at the tail of the pipeline (input of the Go SDK) to close the ingress if no media is received on any track for 1 minute — needed for cases like SRT where GStreamer may retry connecting forever without emitting any failure event (#334)
- Backpressure-aware synchronizer: monitor a queue between the appsink and the Go SDK; if it grows past 2 buffers, reduce the synchronizer wait time in 10ms steps until the queue shortens (see the sketch after this list). Addresses a deadlock case in SRT where the reference/wait time was only ever increased, causing the input buffer to fill to its max. Disabled for HLS, where back pressure at the output is expected. (#337)
- Announce out-of-network splice events as participant attributes (derived from SCTE-35 tables in MPEG-TS streams, e.g. SRT). Implemented by monitoring SCTE-35-related GStreamer events pushed down the pipeline — message-bus-based events lack pipeline-timebase timing and the MPEG-TS demuxer doesn't expose the needed info to regenerate timestamps. Relies on a forked `go-gst` for now. (#326)
- Reject ingress if the `Enabled` flag is false (#319)
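A minimal sketch of the backpressure adjustment from #337 above; the function and parameter names are assumptions, only the 2-buffer threshold and 10ms step come from the changelog entry:

```go
package media

import "time"

const (
	maxQueuedBuffers = 2                     // queue depth that signals backpressure
	waitStep         = 10 * time.Millisecond // reduction applied per adjustment
)

// adjustSyncWait shrinks the synchronizer wait time while the queue between
// the appsink and the Go SDK is backed up, and leaves it unchanged otherwise.
// The real synchronizer API is not shown here.
func adjustSyncWait(queueDepth int, waitTime time.Duration) time.Duration {
	if queueDepth > maxQueuedBuffers && waitTime > waitStep {
		return waitTime - waitStep
	}
	return waitTime
}
```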

## Changed

- Log caps existence when connecting to caps notifier (#431)
- Replace deprecated `io/ioutil.ReadFile` with `os.ReadFile` for config loading (#424)
- Refactor ingress handler RPC server: remove PSRPC support from the ingress handler and move the full PSRPC server implementation into the server process; introduce a `StateNotifier` family of objects for injecting state-update behavior; add an (initially empty) `ProjectID` field to `StateNotifier` calls (#413)
- Only report packet loss if `trackStats` is set (#401)
- Update GStreamer to 1.26.7 (#396)
- Update Go to 1.25 (#388)
- Use `FeatureFlags` from `GetIngressInfoResponse` or `StartIngressRequest`; rename the `SFUTranscodingBypassedWHIP` config option to `WHIPProxyEnabled` (#382)
- Fix the format of the logging field in the sample config (#377)
- Delay deregistering the WHIP RPC handler to avoid SFU notify warnings (#373)
- Switch to the `livekit/gst-go` fork of `go-gst` (#367)
- Update CLI to `urfave/cli/v3` (#364)
- Set logging parameters on Pion; ignore Pion ICE candidate warning (#348)
- Throttle "too slow" logs (#340)
- Disable output queue-length monitoring for RTMP and WHIP (#338)
- Disable max-buffer limit on URL input queue; log SRT stats every minute (#335)
- Initialize URL ingress state with `BUFFERING` when created directly by the Ingress server (#333)
- Use logger utilities in ingress (#332)
- Remove `actions/cache` from `workflows/build.yaml` — the cloud-ingress build uses Docker, so caching Go modules from the host is pointless (#325)

## Fixed

- Fix `int` cast flagged by Copilot (#416)
- Ensure the logger is initialized before the output registers for EOS, preventing a race that could cause a panic if EOS arrived during creation (#415)
- Ignore all errors from `writeSample` once the output is already closed — not just EOF — so hitting the shutdown timeout no longer sets `pipelineError` and flips the pipeline to a failed state (#412)
- Do not treat `io.EOF` as a pipeline error during shutdown; `handleSample()` keeps returning `FlowOK` while samples are dropped so GStreamer can drain its queues without interpreting early `FlowEOS` as a mid-stream failure (#411)
- Make sure EOS reaches sinks: signal "EOS seen on source" out-of-band so the output can decide to wait briefly for remaining data (or cancel), preventing the pipeline from freezing when the appsink thread is stuck on a blocking push (#408)
- Safer fallback logic for latency reduction: if A/V arrival rates don't stabilize, skip applying offsets entirely instead of taking the current max of calculated offsets (#406)
- Initialize logger before the handler starts (#399)
- Fix superfluous `response.WriteHeader` call: only send a status code if data hasn't already been written (an implicit 200) (#395)
- Make sure an ingress session is terminated when sending on the app source doesn't cause the pipeline to emit an EOS event (#379)
- Propagate sink errors when the input error is (generic, consequential) `context.Canceled` (#381)
- Fix "disonnected" → "disconnected" typo in error messages (#360)
- Fix flaky/broken tests (#378, #374)

## Security

- Bump `golang.org/x/image` to v0.38.0 (#426)
- Bump `golang.org/x/net` from 0.35.0 to 0.38.0 (#358)
- Bump `golang.org/x/crypto` from 0.32.0 to 0.35.0 (#355)
- Bump `golang.org/x/net` from 0.31.0 to 0.33.0 (#321)
