Skip to content

Commit

Permalink
Add an API to get basic stats around stream stats.
Browse files Browse the repository at this point in the history
Relates to #610

stats around outboundrtpstreamstats

Description
Reference issue
Fixes #...
  • Loading branch information
obasajujoshua31 committed Sep 24, 2020
1 parent 7d79c76 commit e39e854
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 11 deletions.
10 changes: 10 additions & 0 deletions peerconnection.go
Expand Up @@ -1960,6 +1960,16 @@ func (pc *PeerConnection) GetStats() StatsReport {

pc.api.mediaEngine.collectStats(statsCollector)

for _, transceiver := range pc.GetTransceivers() {
if sender := transceiver.Sender(); sender != nil && sender.Track() != nil {
sender.Track().collectStats(statsCollector)
}

if receiver := transceiver.Sender(); receiver != nil && receiver.Track() != nil {
receiver.Track().collectStats(statsCollector)
}
}

return statsCollector.Ready()
}

Expand Down
15 changes: 15 additions & 0 deletions stats_go.go
Expand Up @@ -92,3 +92,18 @@ func (r StatsReport) GetCodecStats(c *RTPCodec) (CodecStats, bool) {
}
return codecStats, true
}

// Get OutboundRTPStats is a helper method to return the associated outbound stats for a given track
func (r StatsReport) GetOutboundRTPStreamStats(t *Track) (OutboundRTPStreamStats, bool) {
statsID := t.statsID
stats, ok := r[statsID]
if !ok {
return OutboundRTPStreamStats{}, false
}

outboundRTPStreamStats, ok := stats.(OutboundRTPStreamStats)
if !ok {
return OutboundRTPStreamStats{}, false
}
return outboundRTPStreamStats, true
}
21 changes: 21 additions & 0 deletions stats_go_test.go
Expand Up @@ -5,6 +5,7 @@ package webrtc
import (
"encoding/json"
"fmt"
"github.com/pion/webrtc/v3/pkg/media"
"github.com/stretchr/testify/require"
"math/rand"
"sync"
Expand Down Expand Up @@ -157,6 +158,12 @@ func findCandidatePairStats(t *testing.T, report StatsReport) []ICECandidatePair
return result
}

func getOutboundRTPStreamStats(t *testing.T, report StatsReport, track *Track) OutboundRTPStreamStats {
stats, ok := report.GetOutboundRTPStreamStats(track)
assert.True(t, ok)
return stats
}

func signalPairForStats(pcOffer *PeerConnection, pcAnswer *PeerConnection) error {
offerChan := make(chan SessionDescription)
pcOffer.OnICECandidate(func(candidate *ICECandidate) {
Expand Down Expand Up @@ -245,6 +252,10 @@ func TestPeerConnection_GetStats(t *testing.T) {
})

assert.NoError(t, signalPairForStats(offerPC, answerPC))

err = track1.WriteSample(media.Sample{Data: []byte{0x00}, Samples: 1})
assert.NoError(t, err)

waitWithTimeout(t, &dcWait)

answerDC := <-answerDCChan
Expand Down Expand Up @@ -329,6 +340,16 @@ func TestPeerConnection_GetStats(t *testing.T) {
assert.NotEmpty(t, certificateStats)
}

for _, transceiver := range offerPC.GetTransceivers() {
if sender := transceiver.Sender(); sender != nil && sender.Track() != nil {
assert.NotEmpty(t, getOutboundRTPStreamStats(t, reportPCOffer, sender.Track()))
}

if receiver := transceiver.Receiver(); receiver != nil && receiver.Track() != nil {
assert.NotEmpty(t, getOutboundRTPStreamStats(t, reportPCOffer, receiver.Track()))
}
}

assert.NoError(t, offerPC.Close())
assert.NoError(t, answerPC.Close())
}
Expand Down
87 changes: 76 additions & 11 deletions track.go
Expand Up @@ -6,6 +6,8 @@ import (
"fmt"
"io"
"sync"
"sync/atomic"
"time"

"github.com/pion/rtp"
"github.com/pion/webrtc/v3/internal/util"
Expand All @@ -20,7 +22,8 @@ const (

// Track represents a single media track
type Track struct {
mu sync.RWMutex
statsID string
mu sync.RWMutex

id string
payloadType uint8
Expand All @@ -32,9 +35,12 @@ type Track struct {

packetizer rtp.Packetizer

receiver *RTPReceiver
activeSenders []*RTPSender
totalSenderCount int // count of all senders (accounts for senders that have not been started yet)
receiver *RTPReceiver
activeSenders []*RTPSender
totalSenderCount int // count of all senders (accounts for senders that have not been started yet)
packetSent atomic.Value
bytesSent atomic.Value
lastPacketSentTimestamp atomic.Value
}

// ID gets the ID of the track
Expand Down Expand Up @@ -143,6 +149,9 @@ func (t *Track) Write(b []byte) (n int, err error) {
return 0, err
}

bytesSent := t.bytesSent.Load().(uint64)
t.bytesSent.Store(bytesSent + uint64(len(b)))

return len(b), nil
}

Expand Down Expand Up @@ -175,12 +184,17 @@ func (t *Track) WriteRTP(p *rtp.Packet) error {
}

writeErrs := []error{}
totalPacketSent := t.packetSent.Load().(uint32)
for _, s := range senders {
if _, err := s.SendRTP(&p.Header, p.Payload); err != nil {
writeErrs = append(writeErrs, err)
}
totalPacketSent += uint32(len(p.Payload))
}

t.packetSent.Store(totalPacketSent)
t.lastPacketSentTimestamp.Store(statsTimestampFrom(time.Now()))

return util.FlattenErrs(writeErrs)
}

Expand All @@ -199,14 +213,24 @@ func NewTrack(payloadType uint8, ssrc uint32, id, label string, codec *RTPCodec)
codec.ClockRate,
)

packetSent := atomic.Value{}
packetSent.Store(uint32(0))

bytesSent := atomic.Value{}
bytesSent.Store(uint64(0))

return &Track{
id: id,
payloadType: payloadType,
kind: codec.Type,
label: label,
ssrc: ssrc,
codec: codec,
packetizer: packetizer,
id: id,
payloadType: payloadType,
kind: codec.Type,
label: label,
ssrc: ssrc,
codec: codec,
packetizer: packetizer,
statsID: fmt.Sprintf("outbound-rtp-stream-%d", time.Now().UnixNano()),
packetSent: packetSent,
bytesSent: bytesSent,
lastPacketSentTimestamp: atomic.Value{},
}, nil
}

Expand All @@ -224,3 +248,44 @@ func (t *Track) determinePayloadType() error {

return nil
}

func (t *Track) collectStats(report *statsReportCollector) {
report.Collecting()

var lastPacketSentTimestamp StatsTimestamp

lastTimeStamp, ok := t.lastPacketSentTimestamp.Load().(StatsTimestamp)
if ok {
lastPacketSentTimestamp = lastTimeStamp
}

stats := OutboundRTPStreamStats{
Timestamp: statsTimestampFrom(time.Now()),
Type: StatsTypeStream,
ID: t.statsID,
SSRC: t.SSRC(),
Kind: t.Kind().String(),
TransportID: "", // Todo ...
CodecID: t.codec.statsID,
FIRCount: 0, // Todo ...
PLICount: 0, // Todo ...
NACKCount: 0, // Todo ...
SLICount: 0, // Todo ...
QPSum: 0, // Todo ...
PacketsSent: t.packetSent.Load().(uint32),
PacketsDiscardedOnSend: 0, // Todo ...
FECPacketsSent: 0, // Todo ...
BytesSent: t.bytesSent.Load().(uint64),
BytesDiscardedOnSend: 0, // Todo ...
TrackID: t.ID(),
SenderID: "", // Todo ...
RemoteID: "", // Todo ...
LastPacketSentTimestamp: lastPacketSentTimestamp,
TargetBitrate: 0, // Todo ...
FramesEncoded: 0, // Todo ...
TotalEncodeTime: 0, // Todo ...
AverageRTCPInterval: 0, // Todo ...
}

report.Collect(stats.ID, stats)
}

0 comments on commit e39e854

Please sign in to comment.