diff --git a/peerconnection.go b/peerconnection.go index 5e2fba24cf..90e7926baa 100644 --- a/peerconnection.go +++ b/peerconnection.go @@ -1622,7 +1622,7 @@ func (pc *PeerConnection) handleIncomingSSRC(rtpStream io.Reader, ssrc SSRC) err if rsid != "" { receiver.mu.Lock() defer receiver.mu.Unlock() - return receiver.receiveForRtx(SSRC(0), rsid, streamInfo, readStream, interceptor, rtcpReadStream, rtcpInterceptor) + return receiver.receiveForSimulcastRtx(rsid, streamInfo, readStream, interceptor, rtcpReadStream, rtcpInterceptor) } track, err := receiver.receiveForRid(rid, params, streamInfo, readStream, interceptor, rtcpReadStream, rtcpInterceptor) diff --git a/rtpreceiver.go b/rtpreceiver.go index 2457ba080a..026fd23d3d 100644 --- a/rtpreceiver.go +++ b/rtpreceiver.go @@ -225,7 +225,7 @@ func (r *RTPReceiver) startReceive(parameters RTPReceiveParameters) error { return err } - if err = r.receiveForRtx(rtxSsrc, "", streamInfo, rtpReadStream, rtpInterceptor, rtcpReadStream, rtcpInterceptor); err != nil { + if err = r.receiveForRtx(rtxSsrc, streamInfo, rtpReadStream, rtpInterceptor, rtcpReadStream, rtcpInterceptor); err != nil { return err } } @@ -409,21 +409,44 @@ func (r *RTPReceiver) receiveForRid(rid string, params RTPParameters, streamInfo return nil, fmt.Errorf("%w: %s", errRTPReceiverForRIDTrackStreamNotFound, rid) } +func (r *RTPReceiver) receiveForSimulcastRtx(rsid string, streamInfo *interceptor.StreamInfo, rtpReadStream *srtp.ReadStreamSRTP, rtpInterceptor interceptor.RTPReader, rtcpReadStream *srtp.ReadStreamSRTCP, rtcpInterceptor interceptor.RTCPReader) error { + var track *trackStreams + for i := range r.tracks { + if r.tracks[i].track.RID() == rsid { + track = &r.tracks[i] + } + } + + if track == nil { + return fmt.Errorf("%w: rsid(%s)", errRTPReceiverForRIDTrackStreamNotFound, rsid) + } + + track.repairStreamInfo = streamInfo + track.repairReadStream = rtpReadStream + track.repairInterceptor = rtpInterceptor + track.repairRtcpReadStream = rtcpReadStream + track.repairRtcpInterceptor = rtcpInterceptor + + go func() { + b := make([]byte, r.api.settingEngine.getReceiveMTU()) + for { + if _, _, readErr := track.repairInterceptor.Read(b, nil); readErr != nil { + return + } + } + }() + return nil +} + // receiveForRtx starts a routine that processes the repair stream -func (r *RTPReceiver) receiveForRtx(ssrc SSRC, rsid string, streamInfo *interceptor.StreamInfo, rtpReadStream *srtp.ReadStreamSRTP, rtpInterceptor interceptor.RTPReader, rtcpReadStream *srtp.ReadStreamSRTCP, rtcpInterceptor interceptor.RTCPReader) error { +func (r *RTPReceiver) receiveForRtx(ssrc SSRC, streamInfo *interceptor.StreamInfo, rtpReadStream *srtp.ReadStreamSRTP, rtpInterceptor interceptor.RTPReader, rtcpReadStream *srtp.ReadStreamSRTCP, rtcpInterceptor interceptor.RTCPReader) error { var track *trackStreams if ssrc != 0 && len(r.tracks) == 1 { track = &r.tracks[0] - } else { - for i := range r.tracks { - if r.tracks[i].track.RID() == rsid { - track = &r.tracks[i] - } - } } if track == nil { - return fmt.Errorf("%w: ssrc(%d) rsid(%s)", errRTPReceiverForRIDTrackStreamNotFound, ssrc, rsid) + return fmt.Errorf("%w: ssrc(%d)", errRTPReceiverForRIDTrackStreamNotFound, ssrc) } track.repairStreamInfo = streamInfo