Skip to content

Commit

Permalink
Fix receiveForRtx usage for simulcast
Browse files Browse the repository at this point in the history
  • Loading branch information
aalekseevx committed Apr 18, 2024
1 parent fc3ef75 commit 6f718a5
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 10 deletions.
2 changes: 1 addition & 1 deletion peerconnection.go
Original file line number Diff line number Diff line change
Expand Up @@ -1622,7 +1622,7 @@ func (pc *PeerConnection) handleIncomingSSRC(rtpStream io.Reader, ssrc SSRC) err
if rsid != "" {
receiver.mu.Lock()
defer receiver.mu.Unlock()
return receiver.receiveForRtx(SSRC(0), rsid, streamInfo, readStream, interceptor, rtcpReadStream, rtcpInterceptor)
return receiver.receiveForSimulcastRtx(rsid, streamInfo, readStream, interceptor, rtcpReadStream, rtcpInterceptor)

Check warning on line 1625 in peerconnection.go

View check run for this annotation

Codecov / codecov/patch

peerconnection.go#L1625

Added line #L1625 was not covered by tests
}

track, err := receiver.receiveForRid(rid, params, streamInfo, readStream, interceptor, rtcpReadStream, rtcpInterceptor)
Expand Down
41 changes: 32 additions & 9 deletions rtpreceiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ func (r *RTPReceiver) startReceive(parameters RTPReceiveParameters) error {
return err
}

if err = r.receiveForRtx(rtxSsrc, "", streamInfo, rtpReadStream, rtpInterceptor, rtcpReadStream, rtcpInterceptor); err != nil {
if err = r.receiveForRtx(rtxSsrc, streamInfo, rtpReadStream, rtpInterceptor, rtcpReadStream, rtcpInterceptor); err != nil {

Check warning on line 228 in rtpreceiver.go

View check run for this annotation

Codecov / codecov/patch

rtpreceiver.go#L228

Added line #L228 was not covered by tests
return err
}
}
Expand Down Expand Up @@ -409,21 +409,44 @@ func (r *RTPReceiver) receiveForRid(rid string, params RTPParameters, streamInfo
return nil, fmt.Errorf("%w: %s", errRTPReceiverForRIDTrackStreamNotFound, rid)
}

func (r *RTPReceiver) receiveForSimulcastRtx(rsid string, streamInfo *interceptor.StreamInfo, rtpReadStream *srtp.ReadStreamSRTP, rtpInterceptor interceptor.RTPReader, rtcpReadStream *srtp.ReadStreamSRTCP, rtcpInterceptor interceptor.RTCPReader) error {
var track *trackStreams
for i := range r.tracks {
if r.tracks[i].track.RID() == rsid {
track = &r.tracks[i]

Check warning on line 416 in rtpreceiver.go

View check run for this annotation

Codecov / codecov/patch

rtpreceiver.go#L412-L416

Added lines #L412 - L416 were not covered by tests
}
}

if track == nil {
return fmt.Errorf("%w: rsid(%s)", errRTPReceiverForRIDTrackStreamNotFound, rsid)

Check warning on line 421 in rtpreceiver.go

View check run for this annotation

Codecov / codecov/patch

rtpreceiver.go#L420-L421

Added lines #L420 - L421 were not covered by tests
}

track.repairStreamInfo = streamInfo
track.repairReadStream = rtpReadStream
track.repairInterceptor = rtpInterceptor
track.repairRtcpReadStream = rtcpReadStream
track.repairRtcpInterceptor = rtcpInterceptor

Check warning on line 428 in rtpreceiver.go

View check run for this annotation

Codecov / codecov/patch

rtpreceiver.go#L424-L428

Added lines #L424 - L428 were not covered by tests

go func() {
b := make([]byte, r.api.settingEngine.getReceiveMTU())
for {
if _, _, readErr := track.repairInterceptor.Read(b, nil); readErr != nil {
return

Check warning on line 434 in rtpreceiver.go

View check run for this annotation

Codecov / codecov/patch

rtpreceiver.go#L430-L434

Added lines #L430 - L434 were not covered by tests
}
}
}()
return nil

Check warning on line 438 in rtpreceiver.go

View check run for this annotation

Codecov / codecov/patch

rtpreceiver.go#L438

Added line #L438 was not covered by tests
}

// receiveForRtx starts a routine that processes the repair stream
func (r *RTPReceiver) receiveForRtx(ssrc SSRC, rsid string, streamInfo *interceptor.StreamInfo, rtpReadStream *srtp.ReadStreamSRTP, rtpInterceptor interceptor.RTPReader, rtcpReadStream *srtp.ReadStreamSRTCP, rtcpInterceptor interceptor.RTCPReader) error {
func (r *RTPReceiver) receiveForRtx(ssrc SSRC, streamInfo *interceptor.StreamInfo, rtpReadStream *srtp.ReadStreamSRTP, rtpInterceptor interceptor.RTPReader, rtcpReadStream *srtp.ReadStreamSRTCP, rtcpInterceptor interceptor.RTCPReader) error {

Check warning on line 442 in rtpreceiver.go

View check run for this annotation

Codecov / codecov/patch

rtpreceiver.go#L442

Added line #L442 was not covered by tests
var track *trackStreams
if ssrc != 0 && len(r.tracks) == 1 {
track = &r.tracks[0]
} else {
for i := range r.tracks {
if r.tracks[i].track.RID() == rsid {
track = &r.tracks[i]
}
}
}

if track == nil {
return fmt.Errorf("%w: ssrc(%d) rsid(%s)", errRTPReceiverForRIDTrackStreamNotFound, ssrc, rsid)
return fmt.Errorf("%w: ssrc(%d)", errRTPReceiverForRIDTrackStreamNotFound, ssrc)

Check warning on line 449 in rtpreceiver.go

View check run for this annotation

Codecov / codecov/patch

rtpreceiver.go#L449

Added line #L449 was not covered by tests
}

track.repairStreamInfo = streamInfo
Expand Down

0 comments on commit 6f718a5

Please sign in to comment.