Skip to content

Commit

Permalink
Add Simulcast RTCP Test
Browse files Browse the repository at this point in the history
Resolves #1803
  • Loading branch information
Sean-Der committed Sep 15, 2023
1 parent 9052126 commit 6f8bcd6
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 39 deletions.
135 changes: 98 additions & 37 deletions peerconnection_media_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"io"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -1212,56 +1213,53 @@ func TestPeerConnection_Simulcast(t *testing.T) {
defer report()

rids := []string{"a", "b", "c"}
var ridMapLock sync.RWMutex
ridMap := map[string]int{}

assertRidCorrect := func(t *testing.T) {
ridMapLock.Lock()
defer ridMapLock.Unlock()

for _, rid := range rids {
assert.Equal(t, ridMap[rid], 1)
}
assert.Equal(t, len(ridMap), 3)
}

ridsFullfilled := func() bool {
ridMapLock.Lock()
defer ridMapLock.Unlock()

ridCount := len(ridMap)
return ridCount == 3
}
t.Run("E2E", func(t *testing.T) {
pcOffer, pcAnswer, err := newPair()
assert.NoError(t, err)

onTrackHandler := func(trackRemote *TrackRemote, _ *RTPReceiver) {
ridMapLock.Lock()
defer ridMapLock.Unlock()
ridMap[trackRemote.RID()] = ridMap[trackRemote.RID()] + 1
}
vp8WriterA, err := NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion2", WithRTPStreamID(rids[0]))
assert.NoError(t, err)

t.Run("RTP Extension Based", func(t *testing.T) {
pcOffer, pcAnswer, err := newPair()
vp8WriterB, err := NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion2", WithRTPStreamID(rids[1]))
assert.NoError(t, err)

vp8WriterA, err := NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion2", WithRTPStreamID("a"))
vp8WriterC, err := NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion2", WithRTPStreamID(rids[2]))
assert.NoError(t, err)

sender, err := pcOffer.AddTrack(vp8WriterA)
assert.NoError(t, err)
assert.NotNil(t, sender)

vp8WriterB, err := NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion2", WithRTPStreamID("b"))
assert.NoError(t, err)
err = sender.AddEncoding(vp8WriterB)
assert.NoError(t, err)
assert.NoError(t, sender.AddEncoding(vp8WriterB))
assert.NoError(t, sender.AddEncoding(vp8WriterC))

vp8WriterC, err := NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion2", WithRTPStreamID("c"))
assert.NoError(t, err)
err = sender.AddEncoding(vp8WriterC)
assert.NoError(t, err)
var ridMapLock sync.RWMutex
ridMap := map[string]int{}

assertRidCorrect := func(t *testing.T) {
ridMapLock.Lock()
defer ridMapLock.Unlock()

for _, rid := range rids {
assert.Equal(t, ridMap[rid], 1)
}
assert.Equal(t, len(ridMap), 3)
}

ridsFullfilled := func() bool {
ridMapLock.Lock()
defer ridMapLock.Unlock()

ridCount := len(ridMap)
return ridCount == 3
}

ridMap = map[string]int{}
pcAnswer.OnTrack(onTrackHandler)
pcAnswer.OnTrack(func(trackRemote *TrackRemote, _ *RTPReceiver) {
ridMapLock.Lock()
defer ridMapLock.Unlock()
ridMap[trackRemote.RID()] = ridMap[trackRemote.RID()] + 1
})

parameters := sender.GetParameters()
assert.Equal(t, "a", parameters.Encodings[0].RID)
Expand Down Expand Up @@ -1304,6 +1302,69 @@ func TestPeerConnection_Simulcast(t *testing.T) {
assertRidCorrect(t)
closePairNow(t, pcOffer, pcAnswer)
})

t.Run("RTCP", func(t *testing.T) {
pcOffer, pcAnswer, err := newPair()
assert.NoError(t, err)

vp8WriterA, err := NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion2", WithRTPStreamID(rids[0]))
assert.NoError(t, err)

vp8WriterB, err := NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion2", WithRTPStreamID(rids[1]))
assert.NoError(t, err)

vp8WriterC, err := NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion2", WithRTPStreamID(rids[2]))
assert.NoError(t, err)

sender, err := pcOffer.AddTrack(vp8WriterA)
assert.NoError(t, err)
assert.NotNil(t, sender)

assert.NoError(t, sender.AddEncoding(vp8WriterB))
assert.NoError(t, sender.AddEncoding(vp8WriterC))

rtcpCounter := uint64(0)
pcAnswer.OnTrack(func(trackRemote *TrackRemote, receiver *RTPReceiver) {
_, _, simulcastReadErr := receiver.ReadSimulcastRTCP(trackRemote.RID())
assert.NoError(t, simulcastReadErr)
atomic.AddUint64(&rtcpCounter, 1)
})

var midID, ridID uint8
for _, extension := range sender.GetParameters().HeaderExtensions {
switch extension.URI {
case sdp.SDESMidURI:
midID = uint8(extension.ID)
case sdp.SDESRTPStreamIDURI:
ridID = uint8(extension.ID)
}
}
assert.NotZero(t, midID)
assert.NotZero(t, ridID)

assert.NoError(t, signalPair(pcOffer, pcAnswer))

for sequenceNumber := uint16(0); atomic.LoadUint64(&rtcpCounter) < 3; sequenceNumber++ {
time.Sleep(20 * time.Millisecond)

for _, track := range []*TrackLocalStaticRTP{vp8WriterA, vp8WriterB, vp8WriterC} {
pkt := &rtp.Packet{
Header: rtp.Header{
Version: 2,
SequenceNumber: sequenceNumber,
PayloadType: 96,
},
Payload: []byte{0x00},
}
assert.NoError(t, pkt.Header.SetExtension(midID, []byte("0")))
assert.NoError(t, pkt.Header.SetExtension(ridID, []byte(track.RID())))

assert.NoError(t, track.WriteRTP(pkt))
}
}

closePairNow(t, pcOffer, pcAnswer)
})
}

// Everytime we receieve a new SSRC we probe it and try to determine the proper way to handle it.
Expand Down
13 changes: 11 additions & 2 deletions rtpreceiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,12 +232,21 @@ func (r *RTPReceiver) Read(b []byte) (n int, a interceptor.Attributes, err error
func (r *RTPReceiver) ReadSimulcast(b []byte, rid string) (n int, a interceptor.Attributes, err error) {
select {
case <-r.received:
var rtcpInterceptor interceptor.RTCPReader

r.mu.Lock()
for _, t := range r.tracks {
if t.track != nil && t.track.rid == rid {
return t.rtcpInterceptor.Read(b, a)
rtcpInterceptor = t.rtcpInterceptor
}
}
return 0, nil, fmt.Errorf("%w: %s", errRTPReceiverForRIDTrackStreamNotFound, rid)
r.mu.Unlock()

if rtcpInterceptor == nil {
return 0, nil, fmt.Errorf("%w: %s", errRTPReceiverForRIDTrackStreamNotFound, rid)
}

Check warning on line 247 in rtpreceiver.go

View check run for this annotation

Codecov / codecov/patch

rtpreceiver.go#L246-L247

Added lines #L246 - L247 were not covered by tests
return rtcpInterceptor.Read(b, a)

case <-r.closed:
return 0, nil, io.ErrClosedPipe
}
Expand Down

0 comments on commit 6f8bcd6

Please sign in to comment.