Skip to content

Commit

Permalink
Allocationless handling of RTX pkts
Browse files Browse the repository at this point in the history
  • Loading branch information
adriancable authored and cnderrauber committed Feb 5, 2024
1 parent 5da7278 commit f68b789
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 28 deletions.
52 changes: 30 additions & 22 deletions rtpreceiver.go
Expand Up @@ -15,7 +15,6 @@ import (

"github.com/pion/interceptor"
"github.com/pion/rtcp"
"github.com/pion/rtp"
"github.com/pion/srtp/v2"
"github.com/pion/webrtc/v3/internal/util"
)
Expand All @@ -42,7 +41,7 @@ type trackStreams struct {
}

type rtxPacketWithAttributes struct {
rtxPacket rtp.Packet
pkt []byte
attributes interceptor.Attributes
}

Expand Down Expand Up @@ -419,31 +418,40 @@ func (r *RTPReceiver) receiveForRtx(ssrc SSRC, rsid string, streamInfo *intercep
return
}

pkt := &rtp.Packet{}
if err := pkt.Unmarshal(b[:i]); err != nil {
return
// RTX packets have a different payload format. Move the OSN in the payload to the RTP header and rewrite the
// payload type and SSRC, so that we can return RTX packets to the caller 'transparently' i.e. in the same format
// as non-RTX RTP packets
hasExtension := b[0]&0b10000 > 0
hasPadding := b[0]&0b100000 > 0
csrcCount := b[0] & 0b1111
headerLength := uint16(12 + (4 * csrcCount))
paddingLength := 0
if hasExtension {
headerLength += 4 * (1 + binary.BigEndian.Uint16(b[headerLength+2:headerLength+4]))
}
if hasPadding {
paddingLength = int(b[i-1])
}

if len(pkt.Payload) < 2 {
if i-int(headerLength)-paddingLength < 2 {
// BWE probe packet, ignore
continue
}

// RTX packets have a different payload format. Move the OSN in the payload to the RTP header and rewrite the
// payload type and SSRC, so that we can return RTX packets to the caller 'transparently' i.e. in the same format
// as non-RTX RTP packets
attributes.Set(attributeRtxPayloadType, pkt.Header.PayloadType)
attributes.Set(attributeRtxSsrc, pkt.Header.SSRC)
attributes.Set(attributeRtxSequenceNumber, pkt.Header.SequenceNumber)
pkt.Header.PayloadType = uint8(track.track.PayloadType())
pkt.Header.SSRC = uint32(track.track.SSRC())
pkt.Header.SequenceNumber = binary.BigEndian.Uint16(pkt.Payload[:2])
pkt.Payload = pkt.Payload[2:]
attributes.Set(attributeRtxPayloadType, b[1]&0x7F)
attributes.Set(attributeRtxSequenceNumber, binary.BigEndian.Uint16(b[2:4]))
attributes.Set(attributeRtxSsrc, binary.BigEndian.Uint32(b[8:12]))

b[1] = (b[1] & 0x80) | uint8(track.track.PayloadType())
b[2] = b[headerLength]
b[3] = b[headerLength+1]
binary.BigEndian.PutUint32(b[8:12], uint32(track.track.SSRC()))
copy(b[headerLength:i-2], b[headerLength+2:i])

select {
case <-r.closed:
return
case track.repairStreamChannel <- rtxPacketWithAttributes{rtxPacket: *pkt, attributes: attributes}:
case track.repairStreamChannel <- rtxPacketWithAttributes{pkt: b[:i-2], attributes: attributes}:
}
}
}()
Expand Down Expand Up @@ -484,23 +492,23 @@ func (r *RTPReceiver) setRTPReadDeadline(deadline time.Time, reader *TrackRemote
}

// readRTX returns an RTX packet if one is available on the RTX track, otherwise returns nil
func (r *RTPReceiver) readRTX(reader *TrackRemote) (*rtp.Packet, interceptor.Attributes) {
func (r *RTPReceiver) readRTX(reader *TrackRemote) *rtxPacketWithAttributes {
if !reader.HasRTX() {
return nil, interceptor.Attributes{}
return nil
}

select {
case <-r.received:
default:
return nil, interceptor.Attributes{}
return nil
}

if t := r.streamsForTrack(reader); t != nil {
select {
case rtxPacketReceived := <-t.repairStreamChannel:
return &rtxPacketReceived.rtxPacket, rtxPacketReceived.attributes
return &rtxPacketReceived
default:
}
}
return nil, interceptor.Attributes{}
return nil
}
10 changes: 4 additions & 6 deletions track_remote.go
Expand Up @@ -128,12 +128,10 @@ func (t *TrackRemote) Read(b []byte) (n int, attributes interceptor.Attributes,
}

// If there's a separate RTX track and an RTX packet is available, return that
if rtxPacket, rtxAttributes := r.readRTX(t); rtxPacket != nil {
n, err = rtxPacket.MarshalTo(b)
attributes = rtxAttributes
if err != nil {
return 0, nil, err
}
if rtxPacketReceived := r.readRTX(t); rtxPacketReceived != nil {
n = copy(b, rtxPacketReceived.pkt)
attributes = rtxPacketReceived.attributes
err = nil
} else {
// If there's no separate RTX track (or there's a separate RTX track but no RTX packet waiting), wait for and return
// a packet from the main track
Expand Down

0 comments on commit f68b789

Please sign in to comment.