Skip to content

Commit

Permalink
Filter packets and reports by SSRC
Browse files Browse the repository at this point in the history
Otherwise we would record packets and reports which do not belong to the
stream that is monitored by this recorder.
  • Loading branch information
mengelbart authored and Sean-Der committed Apr 26, 2023
1 parent 71346d6 commit 3f299e1
Show file tree
Hide file tree
Showing 3 changed files with 542 additions and 24 deletions.
2 changes: 1 addition & 1 deletion AUTHORS.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ David Zhao <david@davidzhao.com>
Jonathan Müller <jonathan@fotokite.com>
Kevin Caffrey <kcaffrey@gmail.com>
Maksim Nesterov <msnesterov@avito.ru>
Mathis <mathis.engelbart@gmail.com>
Mathis Engelbart <mathis.engelbart@gmail.com>
Quentin Renard <contact@asticode.com>
Rayleigh Li <rayleigh.li@zoom.us>
Sean <sean@siobud.com>
Sean DuBois <sean@siobud.com>
Steffen Vogel <post@steffenvogel.de>
XLPolar <guangjin_pan@163.com>
Expand Down
80 changes: 58 additions & 22 deletions pkg/stats/stats_recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ func (r *recorder) GetStats() Stats {
}

func (r *recorder) recordIncomingRTP(latestStats internalStats, v *incomingRTP) internalStats {
if v.header.SSRC != r.ssrc {
return latestStats
}
sequenceNumber := latestStats.inboundSequencerNumber.Unwrap(v.header.SequenceNumber)
if !latestStats.inboundSequenceNumberInitialized {
latestStats.inboundFirstSequenceNumber = sequenceNumber
Expand Down Expand Up @@ -149,14 +152,33 @@ func (r *recorder) recordIncomingRTP(latestStats internalStats, v *incomingRTP)

func (r *recorder) recordOutgoingRTCP(latestStats internalStats, v *outgoingRTCP) internalStats {
for _, pkt := range v.pkts {
// The SSRC check is performed for most of the cases but not all. The
// reason is that ReceiverReferenceTimeReportBlocks don't have
// destination SSRCs but must still be recorded.
switch rtcpPkt := pkt.(type) {
case *rtcp.FullIntraRequest:
if !contains(pkt.DestinationSSRC(), r.ssrc) {
r.logger.Debugf("skipping outgoing RTCP pkt: %v\n", pkt)
continue
}
latestStats.InboundRTPStreamStats.FIRCount++
case *rtcp.PictureLossIndication:
if !contains(pkt.DestinationSSRC(), r.ssrc) {
r.logger.Debugf("skipping outgoing RTCP pkt: %v\n", pkt)
continue
}
latestStats.InboundRTPStreamStats.PLICount++
case *rtcp.TransportLayerNack:
if !contains(pkt.DestinationSSRC(), r.ssrc) {
r.logger.Debugf("skipping outgoing RTCP pkt: %v\n", pkt)
continue
}
latestStats.InboundRTPStreamStats.NACKCount++
case *rtcp.SenderReport:
if !contains(pkt.DestinationSSRC(), r.ssrc) {
r.logger.Debugf("skipping outgoing RTCP pkt: %v\n", pkt)
continue
}
latestStats.lastSenderReports = append(latestStats.lastSenderReports, rtcpPkt.NTPTime)
if len(latestStats.lastSenderReports) > r.maxLastSenderReports {
latestStats.lastSenderReports = latestStats.lastSenderReports[len(latestStats.lastSenderReports)-r.maxLastSenderReports:]
Expand All @@ -176,6 +198,9 @@ func (r *recorder) recordOutgoingRTCP(latestStats internalStats, v *outgoingRTCP
}

func (r *recorder) recordOutgoingRTP(latestStats internalStats, v *outgoingRTP) internalStats {
if v.header.SSRC != r.ssrc {
return latestStats
}
headerSize := v.header.MarshalSize()
latestStats.OutboundRTPStreamStats.PacketsSent++
latestStats.OutboundRTPStreamStats.BytesSent += uint64(headerSize + v.payloadLen)
Expand All @@ -189,30 +214,28 @@ func (r *recorder) recordOutgoingRTP(latestStats internalStats, v *outgoingRTP)

func (r *recorder) recordIncomingRR(latestStats internalStats, pkt *rtcp.ReceiverReport, ts time.Time) internalStats {
for _, report := range pkt.Reports {
if report.SSRC == r.ssrc {
if latestStats.remoteInboundFirstSequenceNumberInitialized {
cycles := uint64(report.LastSequenceNumber & 0xFFFF0000)
nr := uint64(report.LastSequenceNumber & 0x0000FFFF)
highest := cycles*0xFFFF + nr
latestStats.RemoteInboundRTPStreamStats.PacketsReceived = highest - uint64(report.TotalLost) - uint64(latestStats.remoteInboundFirstSequenceNumber) + 1
}
latestStats.RemoteInboundRTPStreamStats.PacketsLost = int64(report.TotalLost)
latestStats.RemoteInboundRTPStreamStats.Jitter = float64(report.Jitter) / r.clockRate

if report.Delay != 0 && report.LastSenderReport != 0 {
for i := min(r.maxLastSenderReports, len(latestStats.lastSenderReports)) - 1; i >= 0; i-- {
lastReport := latestStats.lastSenderReports[i]
if (lastReport&0x0000FFFFFFFF0000)>>16 == uint64(report.LastSenderReport) {
dlsr := time.Duration(float64(report.Delay) / 65536.0 * float64(time.Second))
latestStats.RemoteInboundRTPStreamStats.RoundTripTime = (ts.Add(-dlsr)).Sub(ntp.ToTime(lastReport))
latestStats.RemoteInboundRTPStreamStats.TotalRoundTripTime += latestStats.RemoteInboundRTPStreamStats.RoundTripTime
latestStats.RemoteInboundRTPStreamStats.RoundTripTimeMeasurements++
break
}
if latestStats.remoteInboundFirstSequenceNumberInitialized {
cycles := uint64(report.LastSequenceNumber & 0xFFFF0000)
nr := uint64(report.LastSequenceNumber & 0x0000FFFF)
highest := cycles*0xFFFF + nr
latestStats.RemoteInboundRTPStreamStats.PacketsReceived = highest - uint64(report.TotalLost) - uint64(latestStats.remoteInboundFirstSequenceNumber) + 1
}
latestStats.RemoteInboundRTPStreamStats.PacketsLost = int64(report.TotalLost)
latestStats.RemoteInboundRTPStreamStats.Jitter = float64(report.Jitter) / r.clockRate

if report.Delay != 0 && report.LastSenderReport != 0 {
for i := min(r.maxLastSenderReports, len(latestStats.lastSenderReports)) - 1; i >= 0; i-- {
lastReport := latestStats.lastSenderReports[i]
if (lastReport&0x0000FFFFFFFF0000)>>16 == uint64(report.LastSenderReport) {
dlsr := time.Duration(float64(report.Delay) / 65536.0 * float64(time.Second))
latestStats.RemoteInboundRTPStreamStats.RoundTripTime = (ts.Add(-dlsr)).Sub(ntp.ToTime(lastReport))
latestStats.RemoteInboundRTPStreamStats.TotalRoundTripTime += latestStats.RemoteInboundRTPStreamStats.RoundTripTime
latestStats.RemoteInboundRTPStreamStats.RoundTripTimeMeasurements++
break
}
}
latestStats.FractionLost = float64(report.FractionLost) / 256.0
}
latestStats.FractionLost = float64(report.FractionLost) / 256.0
}
return latestStats
}
Expand All @@ -238,8 +261,21 @@ func (r *recorder) recordIncomingXR(latestStats internalStats, pkt *rtcp.Extende
return latestStats
}

func contains(ls []uint32, e uint32) bool {
for _, x := range ls {
if x == e {
return true
}
}
return false
}

func (r *recorder) recordIncomingRTCP(latestStats internalStats, v *incomingRTCP) internalStats {
for _, pkt := range v.pkts {
if !contains(pkt.DestinationSSRC(), r.ssrc) {
r.logger.Debugf("skipping incoming RTCP pkt: %v\n", pkt)
continue
}
switch pkt := pkt.(type) {
case *rtcp.TransportLayerNack:
latestStats.OutboundRTPStreamStats.NACKCount++
Expand All @@ -248,7 +284,7 @@ func (r *recorder) recordIncomingRTCP(latestStats internalStats, v *incomingRTCP
case *rtcp.PictureLossIndication:
latestStats.OutboundRTPStreamStats.PLICount++
case *rtcp.ReceiverReport:
return r.recordIncomingRR(latestStats, pkt, v.ts)
latestStats = r.recordIncomingRR(latestStats, pkt, v.ts)
case *rtcp.SenderReport:
latestStats.RemoteOutboundRTPStreamStats.PacketsSent = uint64(pkt.PacketCount)
latestStats.RemoteOutboundRTPStreamStats.BytesSent = uint64(pkt.OctetCount)
Expand Down

0 comments on commit 3f299e1

Please sign in to comment.