Skip to content

Commit

Permalink
Extended type for RTP timestamp. (#2001)
Browse files Browse the repository at this point in the history
  • Loading branch information
boks1971 committed Aug 27, 2023
1 parent 55d5edc commit 3b30f49
Show file tree
Hide file tree
Showing 11 changed files with 177 additions and 157 deletions.
4 changes: 2 additions & 2 deletions pkg/rtc/wrappedreceiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,9 +317,9 @@ func (d *DummyReceiver) GetCalculatedClockRate(layer int32) uint32 {
return 0
}

func (d *DummyReceiver) GetReferenceLayerRTPTimestamp(ts uint32, layer int32, referenceLayer int32) (uint32, error) {
func (d *DummyReceiver) GetReferenceLayerRTPTimestamp(ets uint64, layer int32, referenceLayer int32) (uint64, error) {
if r, ok := d.receiver.Load().(sfu.TrackReceiver); ok {
return r.GetReferenceLayerRTPTimestamp(ts, layer, referenceLayer)
return r.GetReferenceLayerRTPTimestamp(ets, layer, referenceLayer)
}
return 0, errors.New("receiver not available")
}
20 changes: 11 additions & 9 deletions pkg/sfu/buffer/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type ExtPacket struct {
VideoLayer
Arrival time.Time
ExtSequenceNumber uint32
ExtTimestamp uint64
Packet *rtp.Packet
Payload interface{}
KeyFrame bool
Expand Down Expand Up @@ -414,21 +415,21 @@ func (b *Buffer) calc(pkt []byte, arrivalTime time.Time) {
return
}

extSeqNumber, isOutOfOrder := b.updateStreamState(&rtpPacket, arrivalTime)
flowState := b.updateStreamState(&rtpPacket, arrivalTime)
b.processHeaderExtensions(&rtpPacket, arrivalTime)
if !isOutOfOrder && len(rtpPacket.Payload) == 0 {
if !flowState.IsOutOfOrder && len(rtpPacket.Payload) == 0 {
// drop padding only in-order packet
b.snRangeMap.IncValue(1)
return
}

// add to RTX buffer using sequence number after accounting for dropped padding only packets
snAdjustment, err := b.snRangeMap.GetValue(extSeqNumber)
snAdjustment, err := b.snRangeMap.GetValue(flowState.ExtSequenceNumber)
if err != nil {
b.logger.Errorw("could not get sequence number adjustment", err)
return
}
rtpPacket.Header.SequenceNumber = uint16(extSeqNumber - snAdjustment)
rtpPacket.Header.SequenceNumber = uint16(flowState.ExtSequenceNumber - snAdjustment)
_, err = b.bucket.AddPacketWithSequenceNumber(pkt, rtpPacket.Header.SequenceNumber)
if err != nil {
if err != bucket.ErrRTXPacket {
Expand All @@ -441,7 +442,7 @@ func (b *Buffer) calc(pkt []byte, arrivalTime time.Time) {

b.doReports(arrivalTime)

ep := b.getExtPacket(&rtpPacket, extSeqNumber, arrivalTime)
ep := b.getExtPacket(&rtpPacket, arrivalTime, flowState)
if ep == nil {
return
}
Expand Down Expand Up @@ -499,7 +500,7 @@ func (b *Buffer) doFpsCalc(ep *ExtPacket) {
}
}

func (b *Buffer) updateStreamState(p *rtp.Packet, arrivalTime time.Time) (uint32, bool) {
func (b *Buffer) updateStreamState(p *rtp.Packet, arrivalTime time.Time) RTPFlowState {
flowState := b.rtpStats.Update(&p.Header, len(p.Payload), int(p.PaddingSize), arrivalTime)

if b.nacker != nil {
Expand All @@ -513,7 +514,7 @@ func (b *Buffer) updateStreamState(p *rtp.Packet, arrivalTime time.Time) (uint32
}
}

return flowState.ExtSeqNumber, flowState.IsOutOfOrder
return flowState
}

func (b *Buffer) processHeaderExtensions(p *rtp.Packet, arrivalTime time.Time) {
Expand Down Expand Up @@ -546,10 +547,11 @@ func (b *Buffer) processHeaderExtensions(p *rtp.Packet, arrivalTime time.Time) {
}
}

func (b *Buffer) getExtPacket(rtpPacket *rtp.Packet, extSeqNumber uint32, arrivalTime time.Time) *ExtPacket {
func (b *Buffer) getExtPacket(rtpPacket *rtp.Packet, arrivalTime time.Time, flowState RTPFlowState) *ExtPacket {
ep := &ExtPacket{
Arrival: arrivalTime,
ExtSequenceNumber: extSeqNumber,
ExtSequenceNumber: flowState.ExtSequenceNumber,
ExtTimestamp: flowState.ExtTimestamp,
Packet: rtpPacket,
VideoLayer: VideoLayer{
Spatial: InvalidLayerSpatial,
Expand Down
57 changes: 29 additions & 28 deletions pkg/sfu/buffer/rtpstats.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ type RTPFlowState struct {

IsOutOfOrder bool

ExtSeqNumber uint32
ExtSequenceNumber uint32
ExtTimestamp uint64
}

type IntervalStats struct {
Expand Down Expand Up @@ -152,8 +153,7 @@ type RTPStats struct {

lock sync.RWMutex

initialized bool
resyncOnNextPacket bool
initialized bool

startTime time.Time
endTime time.Time
Expand Down Expand Up @@ -245,7 +245,6 @@ func (r *RTPStats) Seed(from *RTPStats) {
}

r.initialized = from.initialized
r.resyncOnNextPacket = from.resyncOnNextPacket

r.startTime = from.startTime
// do not clone endTime as a non-zero endTime indicates an ended object
Expand Down Expand Up @@ -375,16 +374,6 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa
return
}

if r.resyncOnNextPacket {
r.resyncOnNextPacket = false

if r.initialized {
r.sequenceNumber.ResetHighest(rtph.SequenceNumber - 1)
r.timestamp.ResetHighest(rtph.Timestamp)
r.highestTime = packetTime
}
}

var resSN utils.WrapAroundUpdateResult[uint32]
var resTS utils.WrapAroundUpdateResult[uint64]
if !r.initialized {
Expand Down Expand Up @@ -417,8 +406,8 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa
"rtp stream start",
"startTime", r.startTime.String(),
"firstTime", r.firstTime.String(),
"startSN", r.sequenceNumber.GetExtendedHighest(),
"startTS", r.timestamp.GetExtendedHighest(),
"startSN", r.sequenceNumber.GetExtendedStart(),
"startTS", r.timestamp.GetExtendedStart(),
)
} else {
resSN = r.sequenceNumber.Update(rtph.SequenceNumber)
Expand Down Expand Up @@ -483,7 +472,8 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa
}

flowState.IsOutOfOrder = true
flowState.ExtSeqNumber = resSN.ExtendedVal
flowState.ExtSequenceNumber = resSN.ExtendedVal
flowState.ExtTimestamp = resTS.ExtendedVal
} else { // in-order
// update gap histogram
r.updateGapHistogram(int(gapSN))
Expand All @@ -505,7 +495,8 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa
flowState.LossStartInclusive = resSN.PreExtendedHighest + 1
flowState.LossEndExclusive = resSN.ExtendedVal
}
flowState.ExtSeqNumber = resSN.ExtendedVal
flowState.ExtSequenceNumber = resSN.ExtendedVal
flowState.ExtTimestamp = resTS.ExtendedVal
}

if !isDuplicate {
Expand All @@ -527,11 +518,16 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa
return
}

func (r *RTPStats) ResyncOnNextPacket() {
func (r *RTPStats) Resync(esn uint32, ets uint64, at time.Time) {
r.lock.Lock()
defer r.lock.Unlock()

r.resyncOnNextPacket = true
if !r.initialized {
return
}
r.sequenceNumber.ResetHighest(esn - 1)
r.timestamp.ResetHighest(ets)
r.highestTime = at
}

func (r *RTPStats) getPacketsExpected() uint32 {
Expand Down Expand Up @@ -788,11 +784,11 @@ func (r *RTPStats) MaybeAdjustFirstPacketTime(srData *RTCPSenderReportData) {
defer r.lock.Unlock()

if srData != nil {
r.maybeAdjustFirstPacketTime(srData.RTPTimestamp)
r.maybeAdjustFirstPacketTime(srData.RTPTimestampExt)
}
}

func (r *RTPStats) maybeAdjustFirstPacketTime(ts uint32) {
func (r *RTPStats) maybeAdjustFirstPacketTime(ets uint64) {
if time.Since(r.startTime) > firstPacketTimeAdjustWindow {
return
}
Expand All @@ -803,7 +799,7 @@ func (r *RTPStats) maybeAdjustFirstPacketTime(ts uint32) {
// abnormal delay (maybe due to pacing or maybe due to queuing
// in some network element along the way), push back first time
// to an earlier instance.
samplesDiff := int32(ts - uint32(r.timestamp.GetExtendedStart()))
samplesDiff := int64(ets - r.timestamp.GetExtendedStart())
if samplesDiff < 0 {
// out-of-order, skip
return
Expand All @@ -819,7 +815,7 @@ func (r *RTPStats) maybeAdjustFirstPacketTime(ts uint32) {
"before", r.firstTime.String(),
"after", firstTime.String(),
"adjustment", r.firstTime.Sub(firstTime),
"nowTS", ts,
"extNowTS", ets,
"extStartTS", r.timestamp.GetExtendedStart(),
)
if r.firstTime.Sub(firstTime) > firstPacketTimeAdjustThreshold {
Expand All @@ -829,7 +825,7 @@ func (r *RTPStats) maybeAdjustFirstPacketTime(ts uint32) {
"before", r.firstTime.String(),
"after", firstTime.String(),
"adjustment", r.firstTime.Sub(firstTime),
"nowTS", ts,
"extNowTS", ets,
"extStartTS", r.timestamp.GetExtendedStart(),
)
} else {
Expand Down Expand Up @@ -864,12 +860,17 @@ func (r *RTPStats) SetRtcpSenderReportData(srData *RTCPSenderReportData) {
if (srData.RTPTimestamp-r.srNewest.RTPTimestamp) < (1<<31) && srData.RTPTimestamp < r.srNewest.RTPTimestamp {
cycles += (1 << 32)
}

ntpDiffSinceLast := srData.NTPTimestamp.Time().Sub(r.srNewest.NTPTimestamp.Time())
rtpDiff := uint64(ntpDiffSinceLast.Seconds() * float64(r.params.ClockRate))
goArounds := rtpDiff / (1 << 32)
cycles += goArounds * (1 << 32)
}

srDataCopy := *srData
srDataCopy.RTPTimestampExt = uint64(srDataCopy.RTPTimestamp) + cycles

r.maybeAdjustFirstPacketTime(srDataCopy.RTPTimestamp)
r.maybeAdjustFirstPacketTime(srDataCopy.RTPTimestampExt)

// monitor and log RTP timestamp anomalies
var ntpDiffSinceLast time.Duration
Expand Down Expand Up @@ -1018,13 +1019,13 @@ func (r *RTPStats) GetRtcpSenderReport(ssrc uint32, calculatedClockRate uint32)

// monitor and log RTP timestamp anomalies
var ntpDiffSinceLast time.Duration
var rtpDiffSinceLast uint32
var rtpDiffSinceLast uint64
var departureDiffSinceLast time.Duration
var expectedTimeDiffSinceLast float64
var isWarped bool
if r.srNewest != nil {
ntpDiffSinceLast = nowNTP.Time().Sub(r.srNewest.NTPTimestamp.Time())
rtpDiffSinceLast = nowRTP - r.srNewest.RTPTimestamp
rtpDiffSinceLast = nowRTPExt - r.srNewest.RTPTimestampExt
departureDiffSinceLast = now.Sub(r.srNewest.At)

expectedTimeDiffSinceLast = float64(rtpDiffSinceLast) / float64(r.params.ClockRate)
Expand Down
Loading

0 comments on commit 3b30f49

Please sign in to comment.