From 2c24cdd358b03b9131111a9d2ae4283aef1da717 Mon Sep 17 00:00:00 2001 From: Milos Pesic Date: Mon, 17 Nov 2025 20:45:32 +0100 Subject: [PATCH 1/3] Add segment start to adjusted buffer PTS --- pkg/media/input.go | 2 +- pkg/media/input_gate.go | 8 +++++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/pkg/media/input.go b/pkg/media/input.go index 1de97059..43f62690 100644 --- a/pkg/media/input.go +++ b/pkg/media/input.go @@ -241,7 +241,7 @@ func (i *Input) onPadAdded(_ *gst.Element, pad *gst.Pad) { state := timingState i.registerGatePad(padName, state) i.addGateProbe(pad, padName, state) - i.addSegmentEventProbe(pad, padName) + i.addSegmentEventProbe(pad, padName, state) } // Gather bitrate stats & attach latency meta from the pipeline diff --git a/pkg/media/input_gate.go b/pkg/media/input_gate.go index 7864440e..4d5ed7ab 100644 --- a/pkg/media/input_gate.go +++ b/pkg/media/input_gate.go @@ -33,6 +33,8 @@ type padTimingState struct { windowStreamAccum time.Duration windowElapsedAccum time.Duration steadyWindowCount int + + segmentStart atomic.Duration } func (i *Input) addGateProbe(pad *gst.Pad, padName string, state *padTimingState) { @@ -96,7 +98,7 @@ func (i *Input) addGateProbe(pad *gst.Pad, padName string, state *padTimingState }) } -func (i *Input) addSegmentEventProbe(pad *gst.Pad, padName string) { +func (i *Input) addSegmentEventProbe(pad *gst.Pad, padName string, state *padTimingState) { logSegmentEvent := func(direction string, seg *gst.Segment) { if seg == nil { logger.Debugw("nil segment event received", "pad", padName, "direction", direction) @@ -126,6 +128,8 @@ func (i *Input) addSegmentEventProbe(pad *gst.Pad, padName string) { ) } + state.segmentStart.Store(time.Duration(seg.GetStart())) + logger.Debugw("segment event received", fields...) } @@ -292,6 +296,8 @@ func applyPadOffset(buffer *gst.Buffer, state *padTimingState, pts time.Duration return false } + adj += state.segmentStart.Load() + buffer.SetPresentationTimestamp(gst.ClockTime(adj)) return true } From 266504f364157da151e731f85700e4d6876cc02e Mon Sep 17 00:00:00 2001 From: Milos Pesic Date: Mon, 17 Nov 2025 21:39:51 +0100 Subject: [PATCH 2/3] Simplifying event probe --- pkg/media/input_gate.go | 53 ++++++++++++----------------------------- 1 file changed, 15 insertions(+), 38 deletions(-) diff --git a/pkg/media/input_gate.go b/pkg/media/input_gate.go index 4d5ed7ab..743c3dec 100644 --- a/pkg/media/input_gate.go +++ b/pkg/media/input_gate.go @@ -99,51 +99,28 @@ func (i *Input) addGateProbe(pad *gst.Pad, padName string, state *padTimingState } func (i *Input) addSegmentEventProbe(pad *gst.Pad, padName string, state *padTimingState) { - logSegmentEvent := func(direction string, seg *gst.Segment) { - if seg == nil { - logger.Debugw("nil segment event received", "pad", padName, "direction", direction) - return - } - - fields := []interface{}{ - "pad", padName, - "direction", direction, - "format", seg.GetFormat(), - "rate", seg.GetRate(), - "appliedRate", seg.GetAppliedRate(), - "base", seg.GetBase(), - "start", seg.GetStart(), - "stop", seg.GetStop(), - "time", seg.GetTime(), - "position", seg.GetPosition(), - } - - if seg.GetFormat() == gst.FormatTime { - fields = append(fields, - "baseDur", time.Duration(seg.GetBase()), - "startDur", time.Duration(seg.GetStart()), - "stopDur", time.Duration(seg.GetStop()), - "timeDur", time.Duration(seg.GetTime()), - "positionDur", time.Duration(seg.GetPosition()), - ) - } - - state.segmentStart.Store(time.Duration(seg.GetStart())) - - logger.Debugw("segment event received", fields...) - } - pad.AddProbe(gst.PadProbeTypeEventDownstream, func(_ *gst.Pad, info *gst.PadProbeInfo) gst.PadProbeReturn { event := info.GetEvent() - if event == nil { + if event == nil || event.Type() != gst.EventTypeSegment { return gst.PadProbeOK } - switch event.Type() { - case gst.EventTypeSegment: - logSegmentEvent("downstream", event.ParseSegment()) + seg := event.ParseSegment() + if seg == nil { + logger.Debugw("segment event missing data", "pad", padName) + return gst.PadProbeOK } + state.segmentStart.Store(time.Duration(seg.GetStart())) + logger.Debugw("segment event received", + "pad", padName, + "format", seg.GetFormat(), + "start", seg.GetStart(), + "startDur", time.Duration(seg.GetStart()), + "time", seg.GetTime(), + "timeDur", time.Duration(seg.GetTime()), + "rate", seg.GetRate(), + ) return gst.PadProbeOK }) } From 76059ab605f060c8df35a729096c2b407a47a534 Mon Sep 17 00:00:00 2001 From: Milos Pesic Date: Mon, 17 Nov 2025 21:45:20 +0100 Subject: [PATCH 3/3] A small comment on the flow --- pkg/media/input_gate.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/media/input_gate.go b/pkg/media/input_gate.go index 743c3dec..79484c33 100644 --- a/pkg/media/input_gate.go +++ b/pkg/media/input_gate.go @@ -273,6 +273,9 @@ func applyPadOffset(buffer *gst.Buffer, state *padTimingState, pts time.Duration return false } + // by design, every buffer must be preceded by SEGMENT (and SEGMENT is sticky), + // so downstream elements will push STREAM_START → CAPS → SEGMENT before any buffers. + // By the time buffers are processed, the segment start time will be set. adj += state.segmentStart.Load() buffer.SetPresentationTimestamp(gst.ClockTime(adj))