Skip to content

Commit

Permalink
Log cleanup pass (#2285)
Browse files Browse the repository at this point in the history
* Log cleanup pass

Demoted a bunch of logs to DEBUG, consolidated logs.

* use context logger and fix context var usage

* moved common error types, fixed tests
  • Loading branch information
davidzhao committed Dec 2, 2023
1 parent beecfe3 commit 3fe124c
Show file tree
Hide file tree
Showing 24 changed files with 1,171 additions and 149 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ require (
github.com/jxskiss/base62 v1.1.0
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
github.com/livekit/mediatransportutil v0.0.0-20231128042044-05525c8278cb
github.com/livekit/protocol v1.9.3-0.20231130173607-ec88d89da1d3
github.com/livekit/protocol v1.9.4-0.20231202181655-afa0350bcd0f
github.com/livekit/psrpc v0.5.2
github.com/mackerelio/go-osstat v0.2.4
github.com/magefile/mage v1.15.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20231128042044-05525c8278cb h1:KiGg4k+kYQD9NjKixaSDMMeYOO2//XBM4IROTI1Itjo=
github.com/livekit/mediatransportutil v0.0.0-20231128042044-05525c8278cb/go.mod h1:GBzn9xL+mivI1pW+tyExcKgbc0VOc29I9yJsNcAVaAc=
github.com/livekit/protocol v1.9.3-0.20231130173607-ec88d89da1d3 h1:am72beYtXZM71MRr+12lkG3IyqKxzrCa6slsbKrwMe8=
github.com/livekit/protocol v1.9.3-0.20231130173607-ec88d89da1d3/go.mod h1:8f342d5nvfNp9YAEfJokSR+zbNFpaivgU0h6vwaYhes=
github.com/livekit/protocol v1.9.4-0.20231202181655-afa0350bcd0f h1:6XPC53t/XEcfIe8BUwKkeFmgTLKPfF77JmQ7nydqoOs=
github.com/livekit/protocol v1.9.4-0.20231202181655-afa0350bcd0f/go.mod h1:8f342d5nvfNp9YAEfJokSR+zbNFpaivgU0h6vwaYhes=
github.com/livekit/psrpc v0.5.2 h1:+MvG8Otm/J6MTg2MP/uuMbrkxOWsrj2hDhu/I1VIU1U=
github.com/livekit/psrpc v0.5.2/go.mod h1:cQjxg1oCxYHhxxv6KJH1gSvdtCHQoRZCHgPdm5N8v2g=
github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs=
Expand Down
9 changes: 8 additions & 1 deletion pkg/routing/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@

package routing

import "errors"
import (
"errors"
)

var (
ErrNotFound = errors.New("could not find object")
Expand All @@ -26,4 +28,9 @@ var (
ErrInvalidRouterMessage = errors.New("invalid router message")
ErrChannelClosed = errors.New("channel closed")
ErrChannelFull = errors.New("channel is full")

// errors when starting signal connection
ErrRequestChannelClosed = errors.New("request channel closed")
ErrCouldNotMigrateParticipant = errors.New("could not migrate participant")
ErrClientInfoNotSet = errors.New("client info not set")
)
9 changes: 8 additions & 1 deletion pkg/routing/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,16 @@ type Router interface {
OnRTCMessage(callback RTCMessageCallback)
}

type StartParticipantSignalResults struct {
ConnectionID livekit.ConnectionID
RequestSink MessageSink
ResponseSource MessageSource
NodeID livekit.NodeID
}

type MessageRouter interface {
// StartParticipantSignal participant signal connection is ready to start
StartParticipantSignal(ctx context.Context, roomName livekit.RoomName, pi ParticipantInit) (connectionID livekit.ConnectionID, reqSink MessageSink, resSource MessageSource, err error)
StartParticipantSignal(ctx context.Context, roomName livekit.RoomName, pi ParticipantInit) (res StartParticipantSignalResults, err error)

// Write a message to a participant or room
WriteParticipantRTC(ctx context.Context, roomName livekit.RoomName, identity livekit.ParticipantIdentity, msg *livekit.RTCNodeMessage) error
Expand Down
13 changes: 10 additions & 3 deletions pkg/routing/localrouter.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,18 +97,25 @@ func (r *LocalRouter) ListNodes() ([]*livekit.Node, error) {
}, nil
}

func (r *LocalRouter) StartParticipantSignal(ctx context.Context, roomName livekit.RoomName, pi ParticipantInit) (connectionID livekit.ConnectionID, reqSink MessageSink, resSource MessageSource, err error) {
func (r *LocalRouter) StartParticipantSignal(ctx context.Context, roomName livekit.RoomName, pi ParticipantInit) (res StartParticipantSignalResults, err error) {
return r.StartParticipantSignalWithNodeID(ctx, roomName, pi, livekit.NodeID(r.currentNode.Id))
}

func (r *LocalRouter) StartParticipantSignalWithNodeID(ctx context.Context, roomName livekit.RoomName, pi ParticipantInit, nodeID livekit.NodeID) (connectionID livekit.ConnectionID, reqSink MessageSink, resSource MessageSource, err error) {
connectionID, reqSink, resSource, err = r.signalClient.StartParticipantSignal(ctx, roomName, pi, nodeID)
func (r *LocalRouter) StartParticipantSignalWithNodeID(ctx context.Context, roomName livekit.RoomName, pi ParticipantInit, nodeID livekit.NodeID) (res StartParticipantSignalResults, err error) {
connectionID, reqSink, resSource, err := r.signalClient.StartParticipantSignal(ctx, roomName, pi, nodeID)
if err != nil {
logger.Errorw("could not handle new participant", err,
"room", roomName,
"participant", pi.Identity,
"connID", connectionID,
)
} else {
return StartParticipantSignalResults{
ConnectionID: connectionID,
RequestSink: reqSink,
ResponseSource: resSource,
NodeID: nodeID,
}, nil
}
return
}
Expand Down
20 changes: 11 additions & 9 deletions pkg/routing/redisrouter.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,41 +156,41 @@ func (r *RedisRouter) ListNodes() ([]*livekit.Node, error) {
}

// StartParticipantSignal signal connection sets up paths to the RTC node, and starts to route messages to that message queue
func (r *RedisRouter) StartParticipantSignal(ctx context.Context, roomName livekit.RoomName, pi ParticipantInit) (connectionID livekit.ConnectionID, reqSink MessageSink, resSource MessageSource, err error) {
func (r *RedisRouter) StartParticipantSignal(ctx context.Context, roomName livekit.RoomName, pi ParticipantInit) (res StartParticipantSignalResults, err error) {
// find the node where the room is hosted at
rtcNode, err := r.GetNodeForRoom(ctx, roomName)
if err != nil {
return
}

if r.usePSRPCSignal {
connectionID, reqSink, resSource, err = r.StartParticipantSignalWithNodeID(ctx, roomName, pi, livekit.NodeID(rtcNode.Id))
res, err = r.StartParticipantSignalWithNodeID(ctx, roomName, pi, livekit.NodeID(rtcNode.Id))
if err != nil {
return
}

// map signal & rtc nodes
err = r.setParticipantSignalNode(connectionID, r.currentNode.Id)
err = r.setParticipantSignalNode(res.ConnectionID, r.currentNode.Id)
return
}

connectionID = livekit.ConnectionID(utils.NewGuid("CO_"))
res.ConnectionID = livekit.ConnectionID(utils.NewGuid("CO_"))
pKey := ParticipantKeyLegacy(roomName, pi.Identity)
pKeyB62 := ParticipantKey(roomName, pi.Identity)

// map signal & rtc nodes
if err = r.setParticipantSignalNode(connectionID, r.currentNode.Id); err != nil {
if err = r.setParticipantSignalNode(res.ConnectionID, r.currentNode.Id); err != nil {
return
}

// index by connectionID, since there may be multiple connections for the participant
// set up response channel before sending StartSession and be ready to receive responses.
resChan := r.getOrCreateMessageChannel(r.responseChannels, string(connectionID))
resChan := r.getOrCreateMessageChannel(r.responseChannels, string(res.ConnectionID))

sink := NewRTCNodeSink(r.rc, livekit.NodeID(rtcNode.Id), connectionID, pKey, pKeyB62)
sink := NewRTCNodeSink(r.rc, livekit.NodeID(rtcNode.Id), res.ConnectionID, pKey, pKeyB62)

// serialize claims
ss, err := pi.ToStartSession(roomName, connectionID)
ss, err := pi.ToStartSession(roomName, res.ConnectionID)
if err != nil {
return
}
Expand All @@ -201,7 +201,9 @@ func (r *RedisRouter) StartParticipantSignal(ctx context.Context, roomName livek
return
}

return connectionID, sink, resChan, nil
res.RequestSink = sink
res.ResponseSource = resChan
return res, nil
}

func (r *RedisRouter) WriteParticipantRTC(_ context.Context, roomName livekit.RoomName, identity livekit.ParticipantIdentity, msg *livekit.RTCNodeMessage) error {
Expand Down
48 changes: 19 additions & 29 deletions pkg/routing/routingfakes/fake_router.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 2 additions & 4 deletions pkg/rtc/participant.go
Original file line number Diff line number Diff line change
Expand Up @@ -584,7 +584,6 @@ func (p *ParticipantImpl) HandleSignalSourceClose() {

if !p.TransportManager.HasPublisherEverConnected() && !p.TransportManager.HasSubscriberEverConnected() {
reason := types.ParticipantCloseReasonJoinFailed
p.params.Logger.Infow("closing disconnected participant", "reason", reason)
_ = p.Close(false, reason, false)
}
}
Expand Down Expand Up @@ -858,7 +857,7 @@ func (p *ParticipantImpl) MaybeStartMigration(force bool, onStart func()) bool {
if p.IsClosed() || p.IsDisconnected() {
return
}
p.subLogger.Infow("closing subscriber peer connection to aid migration")
p.subLogger.Debugw("closing subscriber peer connection to aid migration")

//
// Close all down tracks before closing subscriber peer connection.
Expand Down Expand Up @@ -1444,7 +1443,6 @@ func (p *ParticipantImpl) setupDisconnectTimer() {
return
}
reason := types.ParticipantCloseReasonPeerConnectionDisconnected
p.params.Logger.Infow("closing disconnected participant", "reason", reason)
_ = p.Close(true, reason, false)
})
p.lock.Unlock()
Expand Down Expand Up @@ -1693,7 +1691,7 @@ func (p *ParticipantImpl) addPendingTrackLocked(req *livekit.AddTrackRequest) *l
}

p.pendingTracks[req.Cid] = &pendingTrackInfo{trackInfos: []*livekit.TrackInfo{ti}}
p.pubLogger.Infow("pending track added", "trackID", ti.Sid, "track", logger.Proto(ti), "request", logger.Proto(req))
p.pubLogger.Debugw("pending track added", "trackID", ti.Sid, "track", logger.Proto(ti), "request", logger.Proto(req))
return ti
}

Expand Down
4 changes: 3 additions & 1 deletion pkg/rtc/participant_signal.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,9 @@ func (p *ParticipantImpl) writeMessage(msg *livekit.SignalResponse) error {
func (p *ParticipantImpl) CloseSignalConnection(reason types.SignallingCloseReason) {
sink := p.getResponseSink()
if sink != nil {
p.params.Logger.Infow("closing signal connection", "reason", reason, "connID", sink.ConnectionID())
if reason != types.SignallingCloseReasonParticipantClose {
p.params.Logger.Infow("closing signal connection", "reason", reason, "connID", sink.ConnectionID())
}
sink.Close()
p.SetResponseSink(nil)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/rtc/subscriptionmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -924,7 +924,7 @@ func (s *trackSubscription) handleSourceTrackRemoved() {
}

// source track removed, we would unsubscribe
s.logger.Infow("unsubscribing from track since source track was removed")
s.logger.Debugw("unsubscribing from track since source track was removed")
s.desired = false

s.setChangedNotifierLocked(nil)
Expand Down
Loading

0 comments on commit 3fe124c

Please sign in to comment.