Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Log cleanup pass #2285

Merged
merged 3 commits into from
Dec 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ require (
github.com/jxskiss/base62 v1.1.0
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
github.com/livekit/mediatransportutil v0.0.0-20231128042044-05525c8278cb
github.com/livekit/protocol v1.9.3-0.20231130173607-ec88d89da1d3
github.com/livekit/protocol v1.9.4-0.20231202181655-afa0350bcd0f
github.com/livekit/psrpc v0.5.2
github.com/mackerelio/go-osstat v0.2.4
github.com/magefile/mage v1.15.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20231128042044-05525c8278cb h1:KiGg4k+kYQD9NjKixaSDMMeYOO2//XBM4IROTI1Itjo=
github.com/livekit/mediatransportutil v0.0.0-20231128042044-05525c8278cb/go.mod h1:GBzn9xL+mivI1pW+tyExcKgbc0VOc29I9yJsNcAVaAc=
github.com/livekit/protocol v1.9.3-0.20231130173607-ec88d89da1d3 h1:am72beYtXZM71MRr+12lkG3IyqKxzrCa6slsbKrwMe8=
github.com/livekit/protocol v1.9.3-0.20231130173607-ec88d89da1d3/go.mod h1:8f342d5nvfNp9YAEfJokSR+zbNFpaivgU0h6vwaYhes=
github.com/livekit/protocol v1.9.4-0.20231202181655-afa0350bcd0f h1:6XPC53t/XEcfIe8BUwKkeFmgTLKPfF77JmQ7nydqoOs=
github.com/livekit/protocol v1.9.4-0.20231202181655-afa0350bcd0f/go.mod h1:8f342d5nvfNp9YAEfJokSR+zbNFpaivgU0h6vwaYhes=
github.com/livekit/psrpc v0.5.2 h1:+MvG8Otm/J6MTg2MP/uuMbrkxOWsrj2hDhu/I1VIU1U=
github.com/livekit/psrpc v0.5.2/go.mod h1:cQjxg1oCxYHhxxv6KJH1gSvdtCHQoRZCHgPdm5N8v2g=
github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs=
Expand Down
9 changes: 8 additions & 1 deletion pkg/routing/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@

package routing

import "errors"
import (
"errors"
)

var (
ErrNotFound = errors.New("could not find object")
Expand All @@ -26,4 +28,9 @@ var (
ErrInvalidRouterMessage = errors.New("invalid router message")
ErrChannelClosed = errors.New("channel closed")
ErrChannelFull = errors.New("channel is full")

// errors when starting signal connection
ErrRequestChannelClosed = errors.New("request channel closed")
ErrCouldNotMigrateParticipant = errors.New("could not migrate participant")
ErrClientInfoNotSet = errors.New("client info not set")
)
9 changes: 8 additions & 1 deletion pkg/routing/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,16 @@ type Router interface {
OnRTCMessage(callback RTCMessageCallback)
}

type StartParticipantSignalResults struct {
ConnectionID livekit.ConnectionID
RequestSink MessageSink
ResponseSource MessageSource
NodeID livekit.NodeID
}

type MessageRouter interface {
// StartParticipantSignal participant signal connection is ready to start
StartParticipantSignal(ctx context.Context, roomName livekit.RoomName, pi ParticipantInit) (connectionID livekit.ConnectionID, reqSink MessageSink, resSource MessageSource, err error)
StartParticipantSignal(ctx context.Context, roomName livekit.RoomName, pi ParticipantInit) (res StartParticipantSignalResults, err error)

// Write a message to a participant or room
WriteParticipantRTC(ctx context.Context, roomName livekit.RoomName, identity livekit.ParticipantIdentity, msg *livekit.RTCNodeMessage) error
Expand Down
13 changes: 10 additions & 3 deletions pkg/routing/localrouter.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,18 +97,25 @@ func (r *LocalRouter) ListNodes() ([]*livekit.Node, error) {
}, nil
}

func (r *LocalRouter) StartParticipantSignal(ctx context.Context, roomName livekit.RoomName, pi ParticipantInit) (connectionID livekit.ConnectionID, reqSink MessageSink, resSource MessageSource, err error) {
func (r *LocalRouter) StartParticipantSignal(ctx context.Context, roomName livekit.RoomName, pi ParticipantInit) (res StartParticipantSignalResults, err error) {
return r.StartParticipantSignalWithNodeID(ctx, roomName, pi, livekit.NodeID(r.currentNode.Id))
}

func (r *LocalRouter) StartParticipantSignalWithNodeID(ctx context.Context, roomName livekit.RoomName, pi ParticipantInit, nodeID livekit.NodeID) (connectionID livekit.ConnectionID, reqSink MessageSink, resSource MessageSource, err error) {
connectionID, reqSink, resSource, err = r.signalClient.StartParticipantSignal(ctx, roomName, pi, nodeID)
func (r *LocalRouter) StartParticipantSignalWithNodeID(ctx context.Context, roomName livekit.RoomName, pi ParticipantInit, nodeID livekit.NodeID) (res StartParticipantSignalResults, err error) {
connectionID, reqSink, resSource, err := r.signalClient.StartParticipantSignal(ctx, roomName, pi, nodeID)
if err != nil {
logger.Errorw("could not handle new participant", err,
"room", roomName,
"participant", pi.Identity,
"connID", connectionID,
)
} else {
return StartParticipantSignalResults{
ConnectionID: connectionID,
RequestSink: reqSink,
ResponseSource: resSource,
NodeID: nodeID,
}, nil
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it now returns nodeID, so RTCService would have that information for consolidated logging

}
return
}
Expand Down
20 changes: 11 additions & 9 deletions pkg/routing/redisrouter.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,41 +156,41 @@ func (r *RedisRouter) ListNodes() ([]*livekit.Node, error) {
}

// StartParticipantSignal signal connection sets up paths to the RTC node, and starts to route messages to that message queue
func (r *RedisRouter) StartParticipantSignal(ctx context.Context, roomName livekit.RoomName, pi ParticipantInit) (connectionID livekit.ConnectionID, reqSink MessageSink, resSource MessageSource, err error) {
func (r *RedisRouter) StartParticipantSignal(ctx context.Context, roomName livekit.RoomName, pi ParticipantInit) (res StartParticipantSignalResults, err error) {
// find the node where the room is hosted at
rtcNode, err := r.GetNodeForRoom(ctx, roomName)
if err != nil {
return
}

if r.usePSRPCSignal {
connectionID, reqSink, resSource, err = r.StartParticipantSignalWithNodeID(ctx, roomName, pi, livekit.NodeID(rtcNode.Id))
res, err = r.StartParticipantSignalWithNodeID(ctx, roomName, pi, livekit.NodeID(rtcNode.Id))
if err != nil {
return
}

// map signal & rtc nodes
err = r.setParticipantSignalNode(connectionID, r.currentNode.Id)
err = r.setParticipantSignalNode(res.ConnectionID, r.currentNode.Id)
return
}

connectionID = livekit.ConnectionID(utils.NewGuid("CO_"))
res.ConnectionID = livekit.ConnectionID(utils.NewGuid("CO_"))
pKey := ParticipantKeyLegacy(roomName, pi.Identity)
pKeyB62 := ParticipantKey(roomName, pi.Identity)

// map signal & rtc nodes
if err = r.setParticipantSignalNode(connectionID, r.currentNode.Id); err != nil {
if err = r.setParticipantSignalNode(res.ConnectionID, r.currentNode.Id); err != nil {
return
}

// index by connectionID, since there may be multiple connections for the participant
// set up response channel before sending StartSession and be ready to receive responses.
resChan := r.getOrCreateMessageChannel(r.responseChannels, string(connectionID))
resChan := r.getOrCreateMessageChannel(r.responseChannels, string(res.ConnectionID))

sink := NewRTCNodeSink(r.rc, livekit.NodeID(rtcNode.Id), connectionID, pKey, pKeyB62)
sink := NewRTCNodeSink(r.rc, livekit.NodeID(rtcNode.Id), res.ConnectionID, pKey, pKeyB62)

// serialize claims
ss, err := pi.ToStartSession(roomName, connectionID)
ss, err := pi.ToStartSession(roomName, res.ConnectionID)
if err != nil {
return
}
Expand All @@ -201,7 +201,9 @@ func (r *RedisRouter) StartParticipantSignal(ctx context.Context, roomName livek
return
}

return connectionID, sink, resChan, nil
res.RequestSink = sink
res.ResponseSource = resChan
return res, nil
}

func (r *RedisRouter) WriteParticipantRTC(_ context.Context, roomName livekit.RoomName, identity livekit.ParticipantIdentity, msg *livekit.RTCNodeMessage) error {
Expand Down
48 changes: 19 additions & 29 deletions pkg/routing/routingfakes/fake_router.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 2 additions & 4 deletions pkg/rtc/participant.go
Original file line number Diff line number Diff line change
Expand Up @@ -584,7 +584,6 @@ func (p *ParticipantImpl) HandleSignalSourceClose() {

if !p.TransportManager.HasPublisherEverConnected() && !p.TransportManager.HasSubscriberEverConnected() {
reason := types.ParticipantCloseReasonJoinFailed
p.params.Logger.Infow("closing disconnected participant", "reason", reason)
_ = p.Close(false, reason, false)
}
}
Expand Down Expand Up @@ -858,7 +857,7 @@ func (p *ParticipantImpl) MaybeStartMigration(force bool, onStart func()) bool {
if p.IsClosed() || p.IsDisconnected() {
return
}
p.subLogger.Infow("closing subscriber peer connection to aid migration")
p.subLogger.Debugw("closing subscriber peer connection to aid migration")

//
// Close all down tracks before closing subscriber peer connection.
Expand Down Expand Up @@ -1444,7 +1443,6 @@ func (p *ParticipantImpl) setupDisconnectTimer() {
return
}
reason := types.ParticipantCloseReasonPeerConnectionDisconnected
p.params.Logger.Infow("closing disconnected participant", "reason", reason)
_ = p.Close(true, reason, false)
})
p.lock.Unlock()
Expand Down Expand Up @@ -1693,7 +1691,7 @@ func (p *ParticipantImpl) addPendingTrackLocked(req *livekit.AddTrackRequest) *l
}

p.pendingTracks[req.Cid] = &pendingTrackInfo{trackInfos: []*livekit.TrackInfo{ti}}
p.pubLogger.Infow("pending track added", "trackID", ti.Sid, "track", logger.Proto(ti), "request", logger.Proto(req))
p.pubLogger.Debugw("pending track added", "trackID", ti.Sid, "track", logger.Proto(ti), "request", logger.Proto(req))
return ti
}

Expand Down
4 changes: 3 additions & 1 deletion pkg/rtc/participant_signal.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,9 @@ func (p *ParticipantImpl) writeMessage(msg *livekit.SignalResponse) error {
func (p *ParticipantImpl) CloseSignalConnection(reason types.SignallingCloseReason) {
sink := p.getResponseSink()
if sink != nil {
p.params.Logger.Infow("closing signal connection", "reason", reason, "connID", sink.ConnectionID())
if reason != types.SignallingCloseReasonParticipantClose {
p.params.Logger.Infow("closing signal connection", "reason", reason, "connID", sink.ConnectionID())
}
sink.Close()
p.SetResponseSink(nil)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/rtc/subscriptionmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -924,7 +924,7 @@ func (s *trackSubscription) handleSourceTrackRemoved() {
}

// source track removed, we would unsubscribe
s.logger.Infow("unsubscribing from track since source track was removed")
s.logger.Debugw("unsubscribing from track since source track was removed")
s.desired = false

s.setChangedNotifierLocked(nil)
Expand Down
Loading