Skip to content

Commit

Permalink
Adapt samplebuilder new depacketizer in pion/rtp
Browse files Browse the repository at this point in the history
Both partition head and tail checking is now done in the
depacketizer.  For backwards compatibility, there are stubs
for PartitionHeadChecker and WithPartitionHeadChecker that do
nothing; these should be removed for 4.0.

This also fixes a bug with the depacketizer: if no head checker
was present, every packet would be considered as a potential
partition head, even if it was at the beginning of the buffer.
Since a partition head checker is now always present, the bug
cannot happen.

The tests assume the old bug, which is why the fakePacketizer
returns true if headBytes is empty.  It would be better to adapt
the tests to do the right thing, as in jech/depacketizer.
  • Loading branch information
jech authored and Sean-Der committed Aug 23, 2021
1 parent 9cc9730 commit 23a436c
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 37 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ require (
github.com/pion/logging v0.2.2
github.com/pion/randutil v0.1.0
github.com/pion/rtcp v1.2.6
github.com/pion/rtp v1.7.1
github.com/pion/rtp v1.7.2
github.com/pion/sctp v1.7.12
github.com/pion/sdp/v3 v3.0.4
github.com/pion/srtp/v2 v2.0.5
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TB
github.com/pion/rtcp v1.2.6 h1:1zvwBbyd0TeEuuWftrd/4d++m+/kZSeiguxU61LFWpo=
github.com/pion/rtcp v1.2.6/go.mod h1:52rMNPWFsjr39z9B9MhnkqhPLoeHTv1aN63o/42bWE0=
github.com/pion/rtp v1.7.0/go.mod h1:bDb5n+BFZxXx0Ea7E5qe+klMuqiBrP+w8XSjiWtCUko=
github.com/pion/rtp v1.7.1 h1:hCaxfVgPGt13eF/Tu9RhVn04c+dAcRZmhdDWqUE13oY=
github.com/pion/rtp v1.7.1/go.mod h1:bDb5n+BFZxXx0Ea7E5qe+klMuqiBrP+w8XSjiWtCUko=
github.com/pion/rtp v1.7.2 h1:HCDKDCixh7PVjkQTsqHAbk1lg+bx059EHxcnyl42dYs=
github.com/pion/rtp v1.7.2/go.mod h1:bDb5n+BFZxXx0Ea7E5qe+klMuqiBrP+w8XSjiWtCUko=
github.com/pion/sctp v1.7.10/go.mod h1:EhpTUQu1/lcK3xI+eriS6/96fWetHGCvBi9MSsnaBN0=
github.com/pion/sctp v1.7.12 h1:GsatLufywVruXbZZT1CKg+Jr8ZTkwiPnmUC/oO9+uuY=
github.com/pion/sctp v1.7.12/go.mod h1:xFe9cLMZ5Vj6eOzpyiKjT9SwGM4KpK/8Jbw5//jc+0s=
Expand Down
23 changes: 8 additions & 15 deletions pkg/media/samplebuilder/samplebuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@ type SampleBuilder struct {
// sampleRate allows us to compute duration of media.SamplecA
sampleRate uint32

// Interface that checks whether the packet is the first fragment of the frame or not
partitionHeadChecker rtp.PartitionHeadChecker

// the handler to be called when the builder is about to remove the
// reference to some packet.
packetReleaseHandler func(*rtp.Packet)
Expand Down Expand Up @@ -204,7 +201,7 @@ func (s *SampleBuilder) buildSample(purgingBuffers bool) *media.Sample {
var consume sampleSequenceLocation

for i := s.active.head; s.buffer[i] != nil && s.active.compare(i) != slCompareAfter; i++ {
if s.depacketizer.IsDetectedFinalPacketInSequence(s.buffer[i].Marker) {
if s.depacketizer.IsPartitionTail(s.buffer[i].Marker, s.buffer[i].Payload) {
consume.head = s.active.head
consume.tail = i + 1
break
Expand Down Expand Up @@ -244,13 +241,11 @@ func (s *SampleBuilder) buildSample(purgingBuffers bool) *media.Sample {

// prior to decoding all the packets, check if this packet
// would end being disposed anyway
if s.partitionHeadChecker != nil {
if !s.partitionHeadChecker.IsPartitionHead(s.buffer[consume.head].Payload) {
s.droppedPackets += consume.count()
s.purgeConsumedLocation(consume, true)
s.purgeConsumedBuffers()
return nil
}
if !s.depacketizer.IsPartitionHead(s.buffer[consume.head].Payload) {
s.droppedPackets += consume.count()
s.purgeConsumedLocation(consume, true)
s.purgeConsumedBuffers()
return nil
}

// merge all the buffers into a sample
Expand Down Expand Up @@ -329,11 +324,9 @@ func timestampDistance(x, y uint32) uint32 {
// An Option configures a SampleBuilder.
type Option func(o *SampleBuilder)

// WithPartitionHeadChecker assigns a codec-specific PartitionHeadChecker to SampleBuilder.
// Several PartitionHeadCheckers are available in package github.com/pion/rtp/codecs.
func WithPartitionHeadChecker(checker rtp.PartitionHeadChecker) Option {
// WithPartitionHeadChecker is obsolete, it does nothing.
func WithPartitionHeadChecker(checker interface{}) Option {
return func(o *SampleBuilder) {
o.partitionHeadChecker = checker
}
}

Expand Down
44 changes: 25 additions & 19 deletions pkg/media/samplebuilder/samplebuilder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,21 @@ type sampleBuilderTest struct {
maxLateTimestamp uint32
}

type fakeDepacketizer struct{}
type fakeDepacketizer struct {
headChecker bool
headBytes []byte
}

func (f *fakeDepacketizer) Unmarshal(r []byte) ([]byte, error) {
return r, nil
}

func (f *fakeDepacketizer) IsDetectedFinalPacketInSequence(rtpPacketMarketBit bool) bool {
return rtpPacketMarketBit
}

type fakePartitionHeadChecker struct {
headBytes []byte
}

func (f *fakePartitionHeadChecker) IsPartitionHead(payload []byte) bool {
func (f *fakeDepacketizer) IsPartitionHead(payload []byte) bool {
if !f.headChecker {
// simulates a bug in the 3.0 version
// the tests should be fixed to not assume the bug
return true
}
for _, b := range f.headBytes {
if payload[0] == b {
return true
Expand All @@ -43,6 +43,10 @@ func (f *fakePartitionHeadChecker) IsPartitionHead(payload []byte) bool {
return false
}

func (f *fakeDepacketizer) IsPartitionTail(marker bool, payload []byte) bool {
return marker
}

func TestSampleBuilder(t *testing.T) {
testData := []sampleBuilderTest{
{
Expand Down Expand Up @@ -226,18 +230,17 @@ func TestSampleBuilder(t *testing.T) {

for _, t := range testData {
var opts []Option
if t.withHeadChecker {
opts = append(opts, WithPartitionHeadChecker(
&fakePartitionHeadChecker{headBytes: t.headBytes},
))
}
if t.maxLateTimestamp != 0 {
opts = append(opts, WithMaxTimeDelay(
time.Millisecond*time.Duration(int64(t.maxLateTimestamp)),
))
}

s := New(t.maxLate, &fakeDepacketizer{}, 1, opts...)
d := &fakeDepacketizer{
headChecker: t.withHeadChecker,
headBytes: t.headBytes,
}
s := New(t.maxLate, d, 1, opts...)
samples := []*media.Sample{}

for _, p := range t.packets {
Expand Down Expand Up @@ -333,9 +336,12 @@ func TestSampleBuilderPushMaxZero(t *testing.T) {
pkts := []rtp.Packet{
{Header: rtp.Header{SequenceNumber: 0, Timestamp: 0, Marker: true}, Payload: []byte{0x01}},
}
s := New(0, &fakeDepacketizer{}, 1, WithPartitionHeadChecker(
&fakePartitionHeadChecker{headBytes: []byte{0x01}},
))
d := &fakeDepacketizer{
headChecker: true,
headBytes: []byte{0x01},
}

s := New(0, d, 1)
s.Push(&pkts[0])
if sample := s.Pop(); sample == nil {
t.Error("Should expect a popped sample")
Expand Down

0 comments on commit 23a436c

Please sign in to comment.