Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RSDK-7403 - add remote rtp_passthrough support #3957

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
228 changes: 176 additions & 52 deletions components/camera/client.go

Large diffs are not rendered by default.

509 changes: 509 additions & 0 deletions components/camera/client_test.go

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions components/camera/fake/camera.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/bluenviron/gortsplib/v4/pkg/format/rtph264"
"github.com/bluenviron/gortsplib/v4/pkg/rtptime"
"github.com/bluenviron/mediacommon/pkg/codecs/h264"
"github.com/edaniels/golog"
"github.com/golang/geo/r3"
"github.com/pkg/errors"
"go.viam.com/utils"
Expand Down Expand Up @@ -294,6 +295,8 @@ func (c *Camera) SubscribeRTP(
bufferSize int,
packetsCB rtppassthrough.PacketCallback,
) (rtppassthrough.Subscription, error) {
golog.Global().Warnf("SubscribeRTP FAKE START %s", c.Name().String())
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove

defer golog.Global().Warnf("SubscribeRTP FAKE END %s", c.Name().String())
if !c.RTPPassthrough {
return rtppassthrough.NilSubscription, ErrRTPPassthroughNotEnabled
}
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -401,3 +401,5 @@ require (
github.com/ziutek/mymysql v1.5.4 // indirect
golang.org/x/exp v0.0.0-20230725012225-302865e7556b
)

replace go.viam.com/utils => github.com/nicksanford/goutils v0.0.0-20240621203708-64f2ab5ce1c5
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1030,6 +1030,8 @@ github.com/nbutton23/zxcvbn-go v0.0.0-20210217022336-fa2cb2858354 h1:4kuARK6Y6Fx
github.com/nbutton23/zxcvbn-go v0.0.0-20210217022336-fa2cb2858354/go.mod h1:KSVJerMDfblTH7p5MZaTt+8zaT2iEk3AkVb9PQdZuE8=
github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646 h1:zYyBkD/k9seD2A7fsi6Oo2LfFZAehjjQMERAvZLEDnQ=
github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646/go.mod h1:jpp1/29i3P1S/RLdc7JQKbRpFeM1dOBd8T9ki5s+AY8=
github.com/nicksanford/goutils v0.0.0-20240621203708-64f2ab5ce1c5 h1:H5hkz1B66vck5csOHfe4CN8G2DaBCkNOt/aMwNjeMYE=
github.com/nicksanford/goutils v0.0.0-20240621203708-64f2ab5ce1c5/go.mod h1:G1biDWOtjs8gbQDfJpc9NJuz+gxfqM07gSIqekeZE7M=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/nishanths/exhaustive v0.1.0/go.mod h1:S1j9110vxV1ECdCudXRkeMnFQ/DQk9ajLT0Uf2MYZQQ=
github.com/nishanths/exhaustive v0.11.0 h1:T3I8nUGhl/Cwu5Z2hfc92l0e04D2GEW6e0l8pzda2l0=
Expand Down Expand Up @@ -1544,8 +1546,6 @@ go.viam.com/api v0.1.311 h1:SFSlK2ol2mGD82XKofDrGIP4g1cxBRhQrrbMfMYAxk8=
go.viam.com/api v0.1.311/go.mod h1:msa4TPrMVeRDcG4YzKA/S6wLEUC7GyHQE973JklrQ10=
go.viam.com/test v1.1.1-0.20220913152726-5da9916c08a2 h1:oBiK580EnEIzgFLU4lHOXmGAE3MxnVbeR7s1wp/F3Ps=
go.viam.com/test v1.1.1-0.20220913152726-5da9916c08a2/go.mod h1:XM0tej6riszsiNLT16uoyq1YjuYPWlRBweTPRDanIts=
go.viam.com/utils v0.1.82 h1:4X687qm+Qvl8aV6BKttqELnRJaigvk7JPUAwHDvAf2g=
go.viam.com/utils v0.1.82/go.mod h1:G1biDWOtjs8gbQDfJpc9NJuz+gxfqM07gSIqekeZE7M=
go4.org/unsafe/assume-no-moving-gc v0.0.0-20230525183740-e7c30c78aeb2 h1:WJhcL4p+YeDxmZWg141nRm7XC8IDmhz7lk5GpadO1Sg=
go4.org/unsafe/assume-no-moving-gc v0.0.0-20230525183740-e7c30c78aeb2/go.mod h1:FftLjUGFEDu5k8lt0ddY+HcrH/qU/0qk+H8j9/nTl3E=
gocv.io/x/gocv v0.25.0/go.mod h1:Rar2PS6DV+T4FL+PM535EImD/h13hGVaHhnCu1xarBs=
Expand Down
8 changes: 8 additions & 0 deletions gostream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type Stream interface {
// Start starts processing frames.
Start()
WriteRTP(pkt *rtp.Packet) error
VideoStreamSourceChanged()

// Ready signals that there is at least one client connected and that
// streams are ready for input. The returned context should be used for
Expand Down Expand Up @@ -165,6 +166,13 @@ func (bs *basicStream) Start() {
utils.ManagedGo(bs.processOutputAudioChunks, bs.activeBackgroundWorkers.Done)
}

func (bs *basicStream) VideoStreamSourceChanged() {
bs.videoTrackLocal.rtpTrack.StreamSourceChanged()
}

// NOTE: (Nick S) This only writes video RTP packets
// if we also need to support writing audio RTP packets, we should split
// this method into WriteVideoRTP and WriteAudioRTP
func (bs *basicStream) WriteRTP(pkt *rtp.Packet) error {
return bs.videoTrackLocal.rtpTrack.WriteRTP(pkt)
}
Expand Down
71 changes: 67 additions & 4 deletions gostream/webrtc_track.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import (
"math"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/edaniels/golog"
"github.com/pion/rtp"
"github.com/pion/rtp/codecs"
"github.com/pion/webrtc/v3"
Expand Down Expand Up @@ -34,15 +36,21 @@ type trackLocalStaticRTP struct {
bindings []trackBinding
codec webrtc.RTPCodecCapability
id, rid, streamID string

// testing
sequenceNumberOffset func(uint16) uint16
streamSourceChanged atomic.Bool
highestSequenceNumber uint16
}

// newtrackLocalStaticRTP returns a trackLocalStaticRTP.
func newtrackLocalStaticRTP(c webrtc.RTPCodecCapability, id, streamID string) *trackLocalStaticRTP {
return &trackLocalStaticRTP{
codec: c,
bindings: []trackBinding{},
id: id,
streamID: streamID,
sequenceNumberOffset: func(u uint16) uint16 { return u },
codec: c,
bindings: []trackBinding{},
id: id,
streamID: streamID,
}
}

Expand Down Expand Up @@ -112,13 +120,68 @@ func (s *trackLocalStaticRTP) Codec() webrtc.RTPCodecCapability {
return s.codec
}

func sequenceNumberOffset(lastMaxPacketSequenceNumber uint16, firstNewPacketSequenceNumber uint16) func(uint16) uint16 {
if lastMaxPacketSequenceNumber > firstNewPacketSequenceNumber {
golog.Global().Warnf("new sequenceNumberGenerator which is not identity lastMaxPacketSequenceNumber(%d), firstNewPacketSequenceNumber(%d)", lastMaxPacketSequenceNumber, firstNewPacketSequenceNumber)
// continue with prev
return func(sequenceNumber uint16) uint16 {
return lastMaxPacketSequenceNumber + 1 + sequenceNumber - firstNewPacketSequenceNumber
}

}
return func(u uint16) uint16 { return u }
}

func (s *trackLocalStaticRTP) StreamSourceChanged() {
s.streamSourceChanged.Store(true)
}

func wrapped(currentSequenceNumber uint16, hightestSequenceNumber uint16) bool {
// if the current sequence number is smaller than the currentHighestSequenceNumber by more
// than half the u16, assume that the sequence number has wrapped
if currentSequenceNumber > hightestSequenceNumber {
return false
}
return hightestSequenceNumber-currentSequenceNumber > math.MaxUint16/2
}

// WriteRTP writes a RTP Packet to the trackLocalStaticRTP
// If one PeerConnection fails the packets will still be sent to
// all PeerConnections. The error message will contain the ID of the failed
// PeerConnections so you can remove them.
func (s *trackLocalStaticRTP) WriteRTP(p *rtp.Packet) error {
s.mu.RLock()
defer s.mu.RUnlock()
originalSequenceNumber := p.Header.SequenceNumber

// create new generator if the stream has changed needed
if s.streamSourceChanged.CompareAndSwap(true, false) {
// TODO: Rollover
s.sequenceNumberOffset = sequenceNumberOffset(s.highestSequenceNumber, p.Header.SequenceNumber)
}

// Update the header's sequence number to
p.Header.SequenceNumber = s.sequenceNumberOffset(p.Header.SequenceNumber)

// set the currentHighestSequenceNumber to the current packet's sequence number
// if the packet's sequence number is greater than the current highest sequence number
// or if the sequence number wrapped
setHighest := false
if p.Header.SequenceNumber > s.highestSequenceNumber {
setHighest = true
}

if wrapped(p.Header.SequenceNumber, s.highestSequenceNumber) {
golog.Global().Warnf("updating highestSequenceNumber to %d due to wrapping, prevhighestSequenceNumber: %d, originalSN: %d", p.Header.SequenceNumber, s.highestSequenceNumber, originalSequenceNumber)
setHighest = true
}

if setHighest {
s.highestSequenceNumber = p.Header.SequenceNumber
} else {
golog.Global().Warnf("publishing out of order message p.Header.SequenceNumber(%d), originalSN: %d, highestSequenceNumber: %d", p.Header.SequenceNumber, originalSequenceNumber, s.highestSequenceNumber)
}
golog.Global().Debugf("SN: %d", p.Header.SequenceNumber)

writeErrs := []error{}
outboundPacket := *p
Expand Down
42 changes: 42 additions & 0 deletions grpc/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,20 @@ import (
"errors"
"sync"

"github.com/edaniels/golog"
"github.com/pion/webrtc/v3"
"go.viam.com/utils/rpc"
"golang.org/x/exp/maps"
googlegrpc "google.golang.org/grpc"
)

// ReconfigurableClientConn allows for the underlying client connections to be swapped under the hood.
type ReconfigurableClientConn struct {
connMu sync.RWMutex
conn rpc.ClientConn

onTrackCBByTrackNameMu sync.Mutex
onTrackCBByTrackName map[string]OnTrackCB
}

// Return this constant such that backoff error logging can compare consecutive errors and reliably
Expand Down Expand Up @@ -57,8 +62,31 @@ func (c *ReconfigurableClientConn) NewStream(
// ReplaceConn replaces the underlying client connection with the connection passed in. This does not close the
// old connection, the caller is expected to close it if needed.
func (c *ReconfigurableClientConn) ReplaceConn(conn rpc.ClientConn) {
golog.Global().Info("ReplaceConn START")
defer golog.Global().Info("ReplaceConn END")
c.connMu.Lock()
c.conn = conn
// It is safe to access this without a mutex as it is only ever nil once at the beginning of the
// ReconfigurableClientConn's lifetime
if c.onTrackCBByTrackName == nil {
c.onTrackCBByTrackName = make(map[string]OnTrackCB)
}

if pc := conn.PeerConn(); pc != nil {
pc.OnTrack(func(trackRemote *webrtc.TrackRemote, rtpReceiver *webrtc.RTPReceiver) {
golog.Global().Warnf("OnTrack START %s pc: %p", trackRemote.StreamID(), pc)
defer golog.Global().Warnf("OnTrack END %s pc: %p", trackRemote.StreamID(), pc)
Comment on lines +77 to +78
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove

c.onTrackCBByTrackNameMu.Lock()
onTrackCB, ok := c.onTrackCBByTrackName[trackRemote.StreamID()]
c.onTrackCBByTrackNameMu.Unlock()
if !ok {
msg := "Callback not found for StreamID (trackName): %s, keys(resOnTrackCBs): %#v"
golog.Global().Errorf(msg, trackRemote.StreamID(), maps.Keys(c.onTrackCBByTrackName))
return
}
onTrackCB(trackRemote, rtpReceiver)
})
}
c.connMu.Unlock()
}

Expand All @@ -84,3 +112,17 @@ func (c *ReconfigurableClientConn) Close() error {
c.conn = nil
return conn.Close()
}

// AddOnTrackSub adds an OnTrack subscription for the track.
func (c *ReconfigurableClientConn) AddOnTrackSub(trackName string, onTrackCB OnTrackCB) {
c.onTrackCBByTrackNameMu.Lock()
defer c.onTrackCBByTrackNameMu.Unlock()
c.onTrackCBByTrackName[trackName] = onTrackCB
}

// RemoveOnTrackSub removes an OnTrack subscription for the track.
func (c *ReconfigurableClientConn) RemoveOnTrackSub(trackName string) {
c.onTrackCBByTrackNameMu.Lock()
defer c.onTrackCBByTrackNameMu.Unlock()
delete(c.onTrackCBByTrackName, trackName)
}
47 changes: 22 additions & 25 deletions grpc/shared_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package grpc
import (
"context"
"errors"
"fmt"
"net"
"sync"
"time"
Expand All @@ -16,10 +15,10 @@ import (
"go.uber.org/zap"
"go.viam.com/utils"
"go.viam.com/utils/rpc"
"golang.org/x/exp/maps"
googlegrpc "google.golang.org/grpc"

"go.viam.com/rdk/logging"
"go.viam.com/rdk/resource"
rutils "go.viam.com/rdk/utils"
)

Expand Down Expand Up @@ -81,8 +80,8 @@ type SharedConn struct {
// set to nil before this channel is closed.
peerConnFailed chan struct{}

resOnTrackMu sync.Mutex
resOnTrackCBs map[resource.Name]OnTrackCB
onTrackCBByTrackNameMu sync.Mutex
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

renameed variables as we are no longer storing resource names, but rather strings of the resource name

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pull out the name changes into a separate PR

onTrackCBByTrackName map[string]OnTrackCB

logger logging.Logger
}
Expand All @@ -107,18 +106,18 @@ func (sc *SharedConn) NewStream(
return sc.grpcConn.NewStream(ctx, desc, method, opts...)
}

// AddOnTrackSub adds an OnTrack subscription for the resource.
func (sc *SharedConn) AddOnTrackSub(name resource.Name, onTrackCB OnTrackCB) {
sc.resOnTrackMu.Lock()
defer sc.resOnTrackMu.Unlock()
sc.resOnTrackCBs[name] = onTrackCB
// AddOnTrackSub adds an OnTrack subscription for the track.
func (sc *SharedConn) AddOnTrackSub(trackName string, onTrackCB OnTrackCB) {
sc.onTrackCBByTrackNameMu.Lock()
defer sc.onTrackCBByTrackNameMu.Unlock()
sc.onTrackCBByTrackName[trackName] = onTrackCB
}

// RemoveOnTrackSub removes an OnTrack subscription for the resource.
func (sc *SharedConn) RemoveOnTrackSub(name resource.Name) {
sc.resOnTrackMu.Lock()
defer sc.resOnTrackMu.Unlock()
delete(sc.resOnTrackCBs, name)
// RemoveOnTrackSub removes an OnTrack subscription for the track.
func (sc *SharedConn) RemoveOnTrackSub(trackName string) {
sc.onTrackCBByTrackNameMu.Lock()
defer sc.onTrackCBByTrackNameMu.Unlock()
delete(sc.onTrackCBByTrackName, trackName)
}

// GrpcConn returns a gRPC capable client connection.
Expand Down Expand Up @@ -160,9 +159,11 @@ func (sc *SharedConn) ResetConn(conn rpc.ClientConn, moduleLogger logging.Logger
sc.logger = moduleLogger.Sublogger("conn")
}

if sc.resOnTrackCBs == nil {
// It is safe to access this without a mutex as it is only ever nil once at the beginning of the
// SharedConn's lifetime
if sc.onTrackCBByTrackName == nil {
// Same initilization argument as above with the logger.
sc.resOnTrackCBs = make(map[resource.Name]OnTrackCB)
sc.onTrackCBByTrackName = make(map[string]OnTrackCB)
}

sc.peerConnMu.Lock()
Expand Down Expand Up @@ -200,16 +201,12 @@ func (sc *SharedConn) ResetConn(conn rpc.ClientConn, moduleLogger logging.Logger
}

sc.peerConn.OnTrack(func(trackRemote *webrtc.TrackRemote, rtpReceiver *webrtc.RTPReceiver) {
name, err := resource.NewFromString(trackRemote.StreamID())
if err != nil {
sc.logger.Errorw("StreamID did not parse as a ResourceName", "sharedConn", fmt.Sprintf("%p", sc), "streamID", trackRemote.StreamID())
return
}
sc.resOnTrackMu.Lock()
onTrackCB, ok := sc.resOnTrackCBs[name]
sc.resOnTrackMu.Unlock()
sc.onTrackCBByTrackNameMu.Lock()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed the requirement that the streamID (aka track name) parse as a resource name as we are now just storing strings in the callback lookup map

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it worth it to have the onTrackByTrackName as a separate struct with a separate mutex within the client struct? I always worry with multiple mutexes on the same struct that we're making the code harder to handle.

Totally okay if the answer is no - too much redesign/I'm actually making it more complex.

onTrackCB, ok := sc.onTrackCBByTrackName[trackRemote.StreamID()]
sc.onTrackCBByTrackNameMu.Unlock()
if !ok {
sc.logger.Errorw("Callback not found for StreamID", "sharedConn", fmt.Sprintf("%p", sc), "streamID", trackRemote.StreamID())
msg := "Callback not found for StreamID: %s, keys(resOnTrackCBs): %#v"
sc.logger.Errorf(msg, trackRemote.StreamID(), maps.Keys(sc.onTrackCBByTrackName))
return
}
onTrackCB(trackRemote, rtpReceiver)
Expand Down
9 changes: 9 additions & 0 deletions grpc/tracker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package grpc

// Tracker allows callback functions to a WebRTC peer connection's OnTrack callback
// function by track name.
// Both grpc.SharedConn and grpc.ReconfigurableClientConn implement tracker.
type Tracker interface {
AddOnTrackSub(trackName string, onTrackCB OnTrackCB)
RemoveOnTrackSub(trackName string)
}
20 changes: 20 additions & 0 deletions grpc/tracker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package grpc

import (
"reflect"
"testing"

"go.viam.com/test"
)

func TestTrackerImplementations(t *testing.T) {
tracker := reflect.TypeOf((*Tracker)(nil)).Elem()

t.Run("*ReconfigurableClientConn should implement Tracker", func(t *testing.T) {
test.That(t, reflect.TypeOf(&ReconfigurableClientConn{}).Implements(tracker), test.ShouldBeTrue)
})

t.Run("*SharedConn should implement Tracker", func(t *testing.T) {
test.That(t, reflect.TypeOf(&SharedConn{}).Implements(tracker), test.ShouldBeTrue)
})
}
5 changes: 5 additions & 0 deletions robot/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,11 +356,15 @@ func (rc *RobotClient) connectWithLock(ctx context.Context) error {
rc.conn.ReplaceConn(conn)
rc.client = client
rc.refClient = refClient
rc.logger.Warn("connected")
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove

rc.connected.Store(true)
if len(rc.resourceClients) != 0 {
rc.logger.Warn("updateResources START")
if err := rc.updateResources(ctx); err != nil {
rc.logger.Errorf("updateResources END, err: %s", err.Error())
return err
}
rc.logger.Warn("updateResources END")
}

if rc.changeChan != nil {
Expand Down Expand Up @@ -450,6 +454,7 @@ func (rc *RobotClient) checkConnection(ctx context.Context, checkEvery, reconnec
"reconnect_interval", reconnectEvery.Seconds(),
)
rc.mu.Lock()
rc.logger.Warn("NOT connected")
rc.connected.Store(false)
if rc.changeChan != nil {
rc.changeChan <- true
Expand Down
Loading
Loading