Skip to content

Commit

Permalink
ignore packets was not forwarded
Browse files Browse the repository at this point in the history
  • Loading branch information
cnderrauber committed May 24, 2024
1 parent 0230a8e commit 40013db
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 12 deletions.
8 changes: 4 additions & 4 deletions pkg/sfu/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ type WebRTCReceiver struct {

primaryReceiver atomic.Pointer[RedPrimaryReceiver]
redReceiver atomic.Pointer[RedReceiver]
redPktWriter func(pkt *buffer.ExtPacket, spatialLayer int32)
redPktWriter func(pkt *buffer.ExtPacket, spatialLayer int32) int

forwardStats *ForwardStats
}
Expand Down Expand Up @@ -715,15 +715,15 @@ func (w *WebRTCReceiver) forwardRTP(layer int32) {
}
}

w.downTrackSpreader.Broadcast(func(dt TrackSender) {
writeCount := w.downTrackSpreader.Broadcast(func(dt TrackSender) {
_ = dt.WriteRTP(pkt, spatialLayer)
})

if redPktWriter != nil {
redPktWriter(pkt, spatialLayer)
writeCount += redPktWriter(pkt, spatialLayer)
}

if w.forwardStats != nil {
if writeCount > 0 && w.forwardStats != nil {
w.forwardStats.Update(pkt.Arrival, time.Now())
}

Expand Down
10 changes: 6 additions & 4 deletions pkg/sfu/redprimaryreceiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,18 +57,19 @@ func NewRedPrimaryReceiver(receiver TrackReceiver, dsp DownTrackSpreaderParams)
}
}

func (r *RedPrimaryReceiver) ForwardRTP(pkt *buffer.ExtPacket, spatialLayer int32) {
func (r *RedPrimaryReceiver) ForwardRTP(pkt *buffer.ExtPacket, spatialLayer int32) int {
// extract primary payload from RED and forward to downtracks
if r.downTrackSpreader.DownTrackCount() == 0 {
return
return 0
}

pkts, err := r.getSendPktsFromRed(pkt.Packet)
if err != nil {
r.logger.Errorw("get encoding for red failed", err, "payloadtype", pkt.Packet.PayloadType)
return
return 0
}

var count int
for i, sendPkt := range pkts {
pPkt := *pkt
if i != len(pkts)-1 {
Expand All @@ -81,10 +82,11 @@ func (r *RedPrimaryReceiver) ForwardRTP(pkt *buffer.ExtPacket, spatialLayer int3

// not modify the ExtPacket.RawPacket here for performance since it is not used by the DownTrack,
// otherwise it should be set to the correct value (marshal the primary rtp packet)
r.downTrackSpreader.Broadcast(func(dt TrackSender) {
count += r.downTrackSpreader.Broadcast(func(dt TrackSender) {
_ = dt.WriteRTP(&pPkt, spatialLayer)
})
}
return count
}

func (r *RedPrimaryReceiver) AddDownTrack(track TrackSender) error {
Expand Down
8 changes: 4 additions & 4 deletions pkg/sfu/redreceiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,15 @@ func NewRedReceiver(receiver TrackReceiver, dsp DownTrackSpreaderParams) *RedRec
}
}

func (r *RedReceiver) ForwardRTP(pkt *buffer.ExtPacket, spatialLayer int32) {
func (r *RedReceiver) ForwardRTP(pkt *buffer.ExtPacket, spatialLayer int32) int {
// extract primary payload from RED and forward to downtracks
if r.downTrackSpreader.DownTrackCount() == 0 {
return
return 0
}
redLen, err := r.encodeRedForPrimary(pkt.Packet, r.redPayloadBuf[:])
if err != nil {
r.logger.Errorw("red encoding failed", err)
return
return 0
}

pPkt := *pkt
Expand All @@ -73,7 +73,7 @@ func (r *RedReceiver) ForwardRTP(pkt *buffer.ExtPacket, spatialLayer int32) {

// not modify the ExtPacket.RawPacket here for performance since it is not used by the DownTrack,
// otherwise it should be set to the correct value (marshal the primary rtp packet)
r.downTrackSpreader.Broadcast(func(dt TrackSender) {
return r.downTrackSpreader.Broadcast(func(dt TrackSender) {
_ = dt.WriteRTP(&pPkt, spatialLayer)
})
}
Expand Down

0 comments on commit 40013db

Please sign in to comment.