Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extended type for RTP timestamp. #2001

Merged
merged 2 commits into from
Aug 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/rtc/wrappedreceiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,9 +317,9 @@ func (d *DummyReceiver) GetCalculatedClockRate(layer int32) uint32 {
return 0
}

func (d *DummyReceiver) GetReferenceLayerRTPTimestamp(ts uint32, layer int32, referenceLayer int32) (uint32, error) {
func (d *DummyReceiver) GetReferenceLayerRTPTimestamp(ets uint64, layer int32, referenceLayer int32) (uint64, error) {
if r, ok := d.receiver.Load().(sfu.TrackReceiver); ok {
return r.GetReferenceLayerRTPTimestamp(ts, layer, referenceLayer)
return r.GetReferenceLayerRTPTimestamp(ets, layer, referenceLayer)
}
return 0, errors.New("receiver not available")
}
20 changes: 11 additions & 9 deletions pkg/sfu/buffer/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type ExtPacket struct {
VideoLayer
Arrival time.Time
ExtSequenceNumber uint32
ExtTimestamp uint64
Packet *rtp.Packet
Payload interface{}
KeyFrame bool
Expand Down Expand Up @@ -414,21 +415,21 @@ func (b *Buffer) calc(pkt []byte, arrivalTime time.Time) {
return
}

extSeqNumber, isOutOfOrder := b.updateStreamState(&rtpPacket, arrivalTime)
flowState := b.updateStreamState(&rtpPacket, arrivalTime)
b.processHeaderExtensions(&rtpPacket, arrivalTime)
if !isOutOfOrder && len(rtpPacket.Payload) == 0 {
if !flowState.IsOutOfOrder && len(rtpPacket.Payload) == 0 {
// drop padding only in-order packet
b.snRangeMap.IncValue(1)
return
}

// add to RTX buffer using sequence number after accounting for dropped padding only packets
snAdjustment, err := b.snRangeMap.GetValue(extSeqNumber)
snAdjustment, err := b.snRangeMap.GetValue(flowState.ExtSequenceNumber)
if err != nil {
b.logger.Errorw("could not get sequence number adjustment", err)
return
}
rtpPacket.Header.SequenceNumber = uint16(extSeqNumber - snAdjustment)
rtpPacket.Header.SequenceNumber = uint16(flowState.ExtSequenceNumber - snAdjustment)
_, err = b.bucket.AddPacketWithSequenceNumber(pkt, rtpPacket.Header.SequenceNumber)
if err != nil {
if err != bucket.ErrRTXPacket {
Expand All @@ -441,7 +442,7 @@ func (b *Buffer) calc(pkt []byte, arrivalTime time.Time) {

b.doReports(arrivalTime)

ep := b.getExtPacket(&rtpPacket, extSeqNumber, arrivalTime)
ep := b.getExtPacket(&rtpPacket, arrivalTime, flowState)
if ep == nil {
return
}
Expand Down Expand Up @@ -499,7 +500,7 @@ func (b *Buffer) doFpsCalc(ep *ExtPacket) {
}
}

func (b *Buffer) updateStreamState(p *rtp.Packet, arrivalTime time.Time) (uint32, bool) {
func (b *Buffer) updateStreamState(p *rtp.Packet, arrivalTime time.Time) RTPFlowState {
flowState := b.rtpStats.Update(&p.Header, len(p.Payload), int(p.PaddingSize), arrivalTime)

if b.nacker != nil {
Expand All @@ -513,7 +514,7 @@ func (b *Buffer) updateStreamState(p *rtp.Packet, arrivalTime time.Time) (uint32
}
}

return flowState.ExtSeqNumber, flowState.IsOutOfOrder
return flowState
}

func (b *Buffer) processHeaderExtensions(p *rtp.Packet, arrivalTime time.Time) {
Expand Down Expand Up @@ -546,10 +547,11 @@ func (b *Buffer) processHeaderExtensions(p *rtp.Packet, arrivalTime time.Time) {
}
}

func (b *Buffer) getExtPacket(rtpPacket *rtp.Packet, extSeqNumber uint32, arrivalTime time.Time) *ExtPacket {
func (b *Buffer) getExtPacket(rtpPacket *rtp.Packet, arrivalTime time.Time, flowState RTPFlowState) *ExtPacket {
ep := &ExtPacket{
Arrival: arrivalTime,
ExtSequenceNumber: extSeqNumber,
ExtSequenceNumber: flowState.ExtSequenceNumber,
ExtTimestamp: flowState.ExtTimestamp,
Packet: rtpPacket,
VideoLayer: VideoLayer{
Spatial: InvalidLayerSpatial,
Expand Down
57 changes: 29 additions & 28 deletions pkg/sfu/buffer/rtpstats.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ type RTPFlowState struct {

IsOutOfOrder bool

ExtSeqNumber uint32
ExtSequenceNumber uint32
ExtTimestamp uint64
}

type IntervalStats struct {
Expand Down Expand Up @@ -152,8 +153,7 @@ type RTPStats struct {

lock sync.RWMutex

initialized bool
resyncOnNextPacket bool
initialized bool

startTime time.Time
endTime time.Time
Expand Down Expand Up @@ -245,7 +245,6 @@ func (r *RTPStats) Seed(from *RTPStats) {
}

r.initialized = from.initialized
r.resyncOnNextPacket = from.resyncOnNextPacket

r.startTime = from.startTime
// do not clone endTime as a non-zero endTime indicates an ended object
Expand Down Expand Up @@ -375,16 +374,6 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa
return
}

if r.resyncOnNextPacket {
r.resyncOnNextPacket = false

if r.initialized {
r.sequenceNumber.ResetHighest(rtph.SequenceNumber - 1)
r.timestamp.ResetHighest(rtph.Timestamp)
r.highestTime = packetTime
}
}

var resSN utils.WrapAroundUpdateResult[uint32]
var resTS utils.WrapAroundUpdateResult[uint64]
if !r.initialized {
Expand Down Expand Up @@ -417,8 +406,8 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa
"rtp stream start",
"startTime", r.startTime.String(),
"firstTime", r.firstTime.String(),
"startSN", r.sequenceNumber.GetExtendedHighest(),
"startTS", r.timestamp.GetExtendedHighest(),
"startSN", r.sequenceNumber.GetExtendedStart(),
"startTS", r.timestamp.GetExtendedStart(),
)
} else {
resSN = r.sequenceNumber.Update(rtph.SequenceNumber)
Expand Down Expand Up @@ -483,7 +472,8 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa
}

flowState.IsOutOfOrder = true
flowState.ExtSeqNumber = resSN.ExtendedVal
flowState.ExtSequenceNumber = resSN.ExtendedVal
flowState.ExtTimestamp = resTS.ExtendedVal
} else { // in-order
// update gap histogram
r.updateGapHistogram(int(gapSN))
Expand All @@ -505,7 +495,8 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa
flowState.LossStartInclusive = resSN.PreExtendedHighest + 1
flowState.LossEndExclusive = resSN.ExtendedVal
}
flowState.ExtSeqNumber = resSN.ExtendedVal
flowState.ExtSequenceNumber = resSN.ExtendedVal
flowState.ExtTimestamp = resTS.ExtendedVal
}

if !isDuplicate {
Expand All @@ -527,11 +518,16 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa
return
}

func (r *RTPStats) ResyncOnNextPacket() {
func (r *RTPStats) Resync(esn uint32, ets uint64, at time.Time) {
r.lock.Lock()
defer r.lock.Unlock()

r.resyncOnNextPacket = true
if !r.initialized {
return
}
r.sequenceNumber.ResetHighest(esn - 1)
r.timestamp.ResetHighest(ets)
r.highestTime = at
}

func (r *RTPStats) getPacketsExpected() uint32 {
Expand Down Expand Up @@ -788,11 +784,11 @@ func (r *RTPStats) MaybeAdjustFirstPacketTime(srData *RTCPSenderReportData) {
defer r.lock.Unlock()

if srData != nil {
r.maybeAdjustFirstPacketTime(srData.RTPTimestamp)
r.maybeAdjustFirstPacketTime(srData.RTPTimestampExt)
}
}

func (r *RTPStats) maybeAdjustFirstPacketTime(ts uint32) {
func (r *RTPStats) maybeAdjustFirstPacketTime(ets uint64) {
if time.Since(r.startTime) > firstPacketTimeAdjustWindow {
return
}
Expand All @@ -803,7 +799,7 @@ func (r *RTPStats) maybeAdjustFirstPacketTime(ts uint32) {
// abnormal delay (maybe due to pacing or maybe due to queuing
// in some network element along the way), push back first time
// to an earlier instance.
samplesDiff := int32(ts - uint32(r.timestamp.GetExtendedStart()))
samplesDiff := int64(ets - r.timestamp.GetExtendedStart())
if samplesDiff < 0 {
// out-of-order, skip
return
Expand All @@ -819,7 +815,7 @@ func (r *RTPStats) maybeAdjustFirstPacketTime(ts uint32) {
"before", r.firstTime.String(),
"after", firstTime.String(),
"adjustment", r.firstTime.Sub(firstTime),
"nowTS", ts,
"extNowTS", ets,
"extStartTS", r.timestamp.GetExtendedStart(),
)
if r.firstTime.Sub(firstTime) > firstPacketTimeAdjustThreshold {
Expand All @@ -829,7 +825,7 @@ func (r *RTPStats) maybeAdjustFirstPacketTime(ts uint32) {
"before", r.firstTime.String(),
"after", firstTime.String(),
"adjustment", r.firstTime.Sub(firstTime),
"nowTS", ts,
"extNowTS", ets,
"extStartTS", r.timestamp.GetExtendedStart(),
)
} else {
Expand Down Expand Up @@ -864,12 +860,17 @@ func (r *RTPStats) SetRtcpSenderReportData(srData *RTCPSenderReportData) {
if (srData.RTPTimestamp-r.srNewest.RTPTimestamp) < (1<<31) && srData.RTPTimestamp < r.srNewest.RTPTimestamp {
cycles += (1 << 32)
}

ntpDiffSinceLast := srData.NTPTimestamp.Time().Sub(r.srNewest.NTPTimestamp.Time())
rtpDiff := uint64(ntpDiffSinceLast.Seconds() * float64(r.params.ClockRate))
goArounds := rtpDiff / (1 << 32)
cycles += goArounds * (1 << 32)
}

srDataCopy := *srData
srDataCopy.RTPTimestampExt = uint64(srDataCopy.RTPTimestamp) + cycles

r.maybeAdjustFirstPacketTime(srDataCopy.RTPTimestamp)
r.maybeAdjustFirstPacketTime(srDataCopy.RTPTimestampExt)

// monitor and log RTP timestamp anomalies
var ntpDiffSinceLast time.Duration
Expand Down Expand Up @@ -1018,13 +1019,13 @@ func (r *RTPStats) GetRtcpSenderReport(ssrc uint32, calculatedClockRate uint32)

// monitor and log RTP timestamp anomalies
var ntpDiffSinceLast time.Duration
var rtpDiffSinceLast uint32
var rtpDiffSinceLast uint64
var departureDiffSinceLast time.Duration
var expectedTimeDiffSinceLast float64
var isWarped bool
if r.srNewest != nil {
ntpDiffSinceLast = nowNTP.Time().Sub(r.srNewest.NTPTimestamp.Time())
rtpDiffSinceLast = nowRTP - r.srNewest.RTPTimestamp
rtpDiffSinceLast = nowRTPExt - r.srNewest.RTPTimestampExt
departureDiffSinceLast = now.Sub(r.srNewest.At)

expectedTimeDiffSinceLast = float64(rtpDiffSinceLast) / float64(r.params.ClockRate)
Expand Down
Loading