Skip to content

Commit

Permalink
Use receiver report stats for loss/rtt/jitter. (#1781)
Browse files Browse the repository at this point in the history
* Use receiver report stats for loss/rtt/jitter.

Reversing a bit of #1664.
That PR did two snapshots (one based on what SFU is sending
and one based on combination of what SFU is sending reconciled with
stats reported from client via RTCP Receiver Report). That PR
reported SFU only view to analytics. But, that view does not have
information about loss seen by client in the downstream.
Also, that does not have RTT/jitter information. The rationale behind
using SFU only view is that SFU should report what it sends irrespective
of client is receiving or not. But, that view did not have proper
loss/RTT/jitter.

So, switch back to reporting SFU + receiver report reconciled view.
The down side is that when receiver reports are not receiver,
packets sent/bytes sent will not be reported to analytics.

An option is to report SFU only view if there are no receiver reports.
But, it becomes complex because of the offset. Receiver report would
acknowledge certain range whereas SFU only view could be different
because of propagation delay. To simplify, just using the reconciled
view to report to analytics. Using the available view will require
a bunch more work to produce accurate data.
(NOTE: all this started due to a bug where RTCP was not restarted on
a track resume which killed receiver reports and we went on this path
to distinguish between publisher stopping vs RTCP receiver report not
happening)

One optimisation to here here concerns the check to see if publisher is sending data.
Using a full DeltaInfo for that is an overkill. Can do a lighter weight
for that later.

* return available streams

* fix test
  • Loading branch information
boks1971 committed Jun 9, 2023
1 parent f518f5d commit 72ed5b1
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 58 deletions.
52 changes: 29 additions & 23 deletions pkg/sfu/connectionquality/connectionstats.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ import (

const (
UpdateInterval = 5 * time.Second
processThreshold = 0.95
noStatsTooLongMultiplier = 2
noReceiverReportTooLongThreshold = 30 * time.Second
)

Expand Down Expand Up @@ -120,9 +118,9 @@ func (cs *ConnectionStats) updateScoreWithAggregate(agg *buffer.RTPDeltaInfo, at
return mos
}

func (cs *ConnectionStats) updateScoreFromReceiverReport(at time.Time) float32 {
func (cs *ConnectionStats) updateScoreFromReceiverReport(at time.Time) (float32, map[uint32]*buffer.StreamStatsWithLayers) {
if cs.params.GetDeltaStatsOverridden == nil || cs.params.GetLastReceiverReportTime == nil {
return MinMOS
return MinMOS, nil
}

cs.lock.RLock()
Expand All @@ -131,7 +129,7 @@ func (cs *ConnectionStats) updateScoreFromReceiverReport(at time.Time) float32 {
if streamingStartedAt.IsZero() {
// not streaming, just return current score
mos, _ := cs.scorer.GetMOSAndQuality()
return mos
return mos, nil
}

streams := cs.params.GetDeltaStatsOverridden()
Expand All @@ -143,12 +141,12 @@ func (cs *ConnectionStats) updateScoreFromReceiverReport(at time.Time) float32 {
}
if time.Since(marker) > noReceiverReportTooLongThreshold {
// have not received receiver report for a long time when streaming, run with nil stat
return cs.updateScoreWithAggregate(nil, at)
return cs.updateScoreWithAggregate(nil, at), nil
}

// wait for receiver report, return current score
mos, _ := cs.scorer.GetMOSAndQuality()
return mos
return mos, nil
}

// delta stat duration could be large due to not receiving receiver report for a long time (for example, due to mute),
Expand All @@ -157,17 +155,27 @@ func (cs *ConnectionStats) updateScoreFromReceiverReport(at time.Time) float32 {
if streamingStartedAt.After(cs.params.GetLastReceiverReportTime()) {
// last receiver report was before streaming started, wait for next one
mos, _ := cs.scorer.GetMOSAndQuality()
return mos
return mos, streams
}

if streamingStartedAt.After(agg.StartTime) {
agg.Duration = agg.StartTime.Add(agg.Duration).Sub(streamingStartedAt)
agg.StartTime = streamingStartedAt
}
return cs.updateScoreWithAggregate(agg, at)
return cs.updateScoreWithAggregate(agg, at), streams
}

func (cs *ConnectionStats) updateScore(streams map[uint32]*buffer.StreamStatsWithLayers, at time.Time) float32 {
func (cs *ConnectionStats) updateScore(at time.Time) (float32, map[uint32]*buffer.StreamStatsWithLayers) {
if cs.params.GetDeltaStats == nil {
return MinMOS, nil
}

streams := cs.params.GetDeltaStats()
if len(streams) == 0 {
mos, _ := cs.scorer.GetMOSAndQuality()
return mos, nil
}

deltaInfoList := make([]*buffer.RTPDeltaInfo, 0, len(streams))
for _, s := range streams {
deltaInfoList = append(deltaInfoList, s.RTPStats)
Expand All @@ -185,7 +193,7 @@ func (cs *ConnectionStats) updateScore(streams map[uint32]*buffer.StreamStatsWit
return cs.updateScoreFromReceiverReport(at)
}

return cs.updateScoreWithAggregate(agg, at)
return cs.updateScoreWithAggregate(agg, at), streams
}

func (cs *ConnectionStats) maybeSetStreamingStart(at time.Time) {
Expand All @@ -203,18 +211,9 @@ func (cs *ConnectionStats) clearStreamingStart() {
}

func (cs *ConnectionStats) getStat(at time.Time) {
if cs.params.GetDeltaStats == nil {
return
}
score, streams := cs.updateScore(at)

streams := cs.params.GetDeltaStats()
if len(streams) == 0 {
return
}

score := cs.updateScore(streams, at)

if cs.onStatsUpdate != nil {
if cs.onStatsUpdate != nil && len(streams) != 0 {
analyticsStreams := make([]*livekit.AnalyticsStream, 0, len(streams))
for ssrc, stream := range streams {
as := toAnalyticsStream(ssrc, stream.RTPStats)
Expand Down Expand Up @@ -317,6 +316,13 @@ func toAggregateDeltaInfo(streams map[uint32]*buffer.StreamStatsWithLayers) *buf
}

func toAnalyticsStream(ssrc uint32, deltaStats *buffer.RTPDeltaInfo) *livekit.AnalyticsStream {
// discount the feed side loss when reporting forwarded track stats
packetsLost := deltaStats.PacketsLost
if deltaStats.PacketsMissing > packetsLost {
packetsLost = 0
} else {
packetsLost -= deltaStats.PacketsMissing
}
return &livekit.AnalyticsStream{
Ssrc: ssrc,
PrimaryPackets: deltaStats.Packets,
Expand All @@ -325,7 +331,7 @@ func toAnalyticsStream(ssrc uint32, deltaStats *buffer.RTPDeltaInfo) *livekit.An
RetransmitBytes: deltaStats.BytesDuplicate,
PaddingPackets: deltaStats.PacketsPadding,
PaddingBytes: deltaStats.BytesPadding,
PacketsLost: deltaStats.PacketsLost,
PacketsLost: packetsLost,
Frames: deltaStats.Frames,
Rtt: deltaStats.RttMax,
Jitter: uint32(deltaStats.JitterMax),
Expand Down
Loading

0 comments on commit 72ed5b1

Please sign in to comment.