diff --git a/pkg/rtc/mediatrack.go b/pkg/rtc/mediatrack.go index 42d7d0f8c4..578d505f5d 100644 --- a/pkg/rtc/mediatrack.go +++ b/pkg/rtc/mediatrack.go @@ -265,8 +265,8 @@ func (t *MediaTrack) AddSubscriber(sub types.Participant) error { } t.params.Logger.Debugw("removing peerconnection track", "track", t.ID(), - "pIDs", []string{t.params.ParticipantID, sub.ID()}, - "participant", sub.Identity(), + "subscriber", sub.Identity(), + "subscriberID", sub.ID(), "kind", t.Kind(), ) if err := sub.SubscriberPC().RemoveTrack(sender); err != nil { @@ -279,7 +279,9 @@ func (t *MediaTrack) AddSubscriber(sub types.Participant) error { // been set to Inactive t.params.Logger.Debugw("could not remove remoteTrack from forwarder", "error", err, - "participant", sub.Identity(), "pID", sub.ID()) + "subscriber", sub.Identity(), + "subscriberID", sub.ID(), + ) } } @@ -327,8 +329,6 @@ func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.Tra buff, rtcpReader := t.params.BufferFactory.GetBufferPair(uint32(track.SSRC())) if buff == nil || rtcpReader == nil { logger.Errorw("could not retrieve buffer pair", nil, - "participant", t.params.ParticipantIdentity, - "participantID", t.params.ParticipantID, "track", t.ID()) return } diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index a7b62fc5b0..20bd9d6c32 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -36,6 +36,7 @@ const ( type ParticipantParams struct { Identity string + SID string Config *WebRTCConfig Sink routing.MessageSink AudioConfig config.AudioConfig @@ -50,7 +51,6 @@ type ParticipantParams struct { type ParticipantImpl struct { params ParticipantParams - id string publisher *PCTransport subscriber *PCTransport isClosed utils.AtomicFlag @@ -102,7 +102,6 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) { p := &ParticipantImpl{ params: params, - id: utils.NewGuid(utils.ParticipantPrefix), rtcpCh: make(chan []rtcp.Packet, 50), pliThrottle: newPLIThrottle(params.ThrottleConfig), subscribedTracks: make(map[string]types.SubscribedTrack), @@ -118,7 +117,7 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) { return nil, err } p.publisher, err = NewPCTransport(TransportParams{ - ParticipantID: p.id, + ParticipantID: p.params.SID, ParticipantIdentity: p.params.Identity, Target: livekit.SignalTarget_PUBLISHER, Config: params.Config, @@ -130,7 +129,7 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) { return nil, err } p.subscriber, err = NewPCTransport(TransportParams{ - ParticipantID: p.id, + ParticipantID: p.params.SID, ParticipantIdentity: p.params.Identity, Target: livekit.SignalTarget_SUBSCRIBER, Config: params.Config, @@ -188,7 +187,7 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) { } func (p *ParticipantImpl) ID() string { - return p.id + return p.params.SID } func (p *ParticipantImpl) Identity() string { @@ -231,7 +230,7 @@ func (p *ParticipantImpl) RTCPChan() chan []rtcp.Packet { func (p *ParticipantImpl) ToProto() *livekit.ParticipantInfo { info := &livekit.ParticipantInfo{ - Sid: p.id, + Sid: p.params.SID, Identity: p.params.Identity, Metadata: p.metadata, State: p.State(), @@ -288,8 +287,8 @@ func (p *ParticipantImpl) OnClose(callback func(types.Participant)) { // HandleOffer an offer from remote participant, used when clients make the initial connection func (p *ParticipantImpl) HandleOffer(sdp webrtc.SessionDescription) (answer webrtc.SessionDescription, err error) { - p.params.Logger.Debugw("answering pub offer", "state", p.State().String(), - "participant", p.Identity(), "pID", p.ID(), + p.params.Logger.Debugw("answering pub offer", + "state", p.State().String(), //"sdp", sdp.SDP, ) @@ -313,10 +312,8 @@ func (p *ParticipantImpl) HandleOffer(sdp webrtc.SessionDescription) (answer web return } - p.params.Logger.Debugw("sending answer to client", - "participant", p.Identity(), "pID", p.ID(), - //"answer sdp", answer.SDP, - ) + p.params.Logger.Debugw("sending answer to client") + err = p.writeMessage(&livekit.SignalResponse{ Message: &livekit.SignalResponse_Answer{ Answer: ToProtoSessionDescription(answer), @@ -351,8 +348,7 @@ func (p *ParticipantImpl) AddTrack(req *livekit.AddTrackRequest) { } if !p.CanPublish() { - p.params.Logger.Warnw("no permission to publish track", nil, - "participant", p.Identity(), "pID", p.ID()) + p.params.Logger.Warnw("no permission to publish track", nil) return } @@ -385,10 +381,7 @@ func (p *ParticipantImpl) HandleAnswer(sdp webrtc.SessionDescription) error { if sdp.Type != webrtc.SDPTypeAnswer { return ErrUnexpectedOffer } - p.params.Logger.Debugw("setting subPC answer", - "participant", p.Identity(), "pID", p.ID(), - //"sdp", sdp.SDP, - ) + p.params.Logger.Debugw("setting subPC answer") if err := p.subscriber.SetRemoteDescription(sdp); err != nil { return errors.Wrap(err, "could not set remote description") @@ -485,8 +478,8 @@ func (p *ParticipantImpl) AddSubscriber(op types.Participant) (int, error) { } p.params.Logger.Debugw("subscribing new participant to tracks", - "participants", []string{p.Identity(), op.Identity()}, - "pIDs", []string{p.ID(), op.ID()}, + "subscriber", op.Identity(), + "subscriberID", op.ID(), "numTracks", len(tracks)) n := 0 @@ -646,8 +639,6 @@ func (p *ParticipantImpl) SetTrackMuted(trackId string, muted bool, fromAdmin bo if currentMuted != track.IsMuted() && p.onTrackUpdated != nil { p.params.Logger.Debugw("mute status changed", - "participant", p.Identity(), - "pID", p.ID(), "track", trackId, "muted", track.IsMuted()) p.onTrackUpdated(p, track) @@ -799,8 +790,9 @@ func (p *ParticipantImpl) GetSubscribedTracks() []types.SubscribedTrack { // AddSubscribedTrack adds a track to the participant's subscribed list func (p *ParticipantImpl) AddSubscribedTrack(subTrack types.SubscribedTrack) { - p.params.Logger.Debugw("added subscribedTrack", "publisher", subTrack.PublisherIdentity(), - "participant", p.Identity(), "track", subTrack.ID()) + p.params.Logger.Debugw("added subscribedTrack", + "publisher", subTrack.PublisherIdentity(), + "track", subTrack.ID()) p.lock.Lock() p.subscribedTracks[subTrack.ID()] = subTrack p.lock.Unlock() @@ -812,7 +804,7 @@ func (p *ParticipantImpl) AddSubscribedTrack(subTrack types.SubscribedTrack) { // RemoveSubscribedTrack removes a track to the participant's subscribed list func (p *ParticipantImpl) RemoveSubscribedTrack(subTrack types.SubscribedTrack) { p.params.Logger.Debugw("removed subscribedTrack", "publisher", subTrack.PublisherIdentity(), - "participant", p.Identity(), "track", subTrack.ID(), "kind", subTrack.DownTrack().Kind()) + "track", subTrack.ID(), "kind", subTrack.DownTrack().Kind()) p.subscriber.RemoveTrack(subTrack) @@ -836,8 +828,6 @@ func (p *ParticipantImpl) sendIceCandidate(c *webrtc.ICECandidate, target liveki // write candidate p.params.Logger.Debugw("sending ice candidates", - "participant", p.Identity(), - "pID", p.ID(), "candidate", c.String()) trickle := ToProtoTrickle(ci) trickle.Target = target @@ -854,7 +844,7 @@ func (p *ParticipantImpl) updateState(state livekit.ParticipantInfo_State) { return } p.state.Store(state) - p.params.Logger.Debugw("updating participant state", "state", state.String(), "participant", p.Identity(), "pID", p.ID()) + p.params.Logger.Debugw("updating participant state", "state", state.String()) p.lock.RLock() onStateChange := p.onStateChange p.lock.RUnlock() @@ -877,8 +867,6 @@ func (p *ParticipantImpl) writeMessage(msg *livekit.SignalResponse) error { err := sink.WriteMessage(msg) if err != nil { p.params.Logger.Warnw("could not send message to participant", err, - "pID", p.ID(), - "participant", p.Identity(), "message", fmt.Sprintf("%T", msg.Message)) return err } @@ -888,15 +876,11 @@ func (p *ParticipantImpl) writeMessage(msg *livekit.SignalResponse) error { // when the server has an offer for participant func (p *ParticipantImpl) onOffer(offer webrtc.SessionDescription) { if p.State() == livekit.ParticipantInfo_DISCONNECTED { - p.params.Logger.Debugw("skipping server offer", "participant", p.Identity(), "pID", p.ID()) // skip when disconnected return } - p.params.Logger.Debugw("sending server offer to participant", - "participant", p.Identity(), "pID", p.ID(), - //"sdp", offer.SDP, - ) + p.params.Logger.Debugw("sending server offer to participant") err := p.writeMessage(&livekit.SignalResponse{ Message: &livekit.SignalResponse_Offer{ @@ -918,15 +902,12 @@ func (p *ParticipantImpl) onMediaTrack(track *webrtc.TrackRemote, rtpReceiver *w p.params.Logger.Debugw("mediaTrack added", "kind", track.Kind().String(), - "participant", p.Identity(), - "pID", p.ID(), "track", track.ID(), "rid", track.RID(), "SSRC", track.SSRC()) if !p.CanPublish() { - p.params.Logger.Warnw("no permission to publish mediaTrack", nil, - "participant", p.Identity(), "pID", p.ID()) + p.params.Logger.Warnw("no permission to publish mediaTrack", nil) return } @@ -946,7 +927,7 @@ func (p *ParticipantImpl) onMediaTrack(track *webrtc.TrackRemote, rtpReceiver *w TrackInfo: ti, SignalCid: signalCid, SdpCid: track.ID(), - ParticipantID: p.id, + ParticipantID: p.params.SID, ParticipantIdentity: p.Identity(), RTCPChan: p.rtcpCh, BufferFactory: p.params.Config.BufferFactory, @@ -996,7 +977,7 @@ func (p *ParticipantImpl) onDataChannel(dc *webrtc.DataChannel) { p.handleDataMessage(livekit.DataPacket_LOSSY, msg.Data) }) default: - p.params.Logger.Warnw("unsupported datachannel added", nil, "participant", p.Identity(), "pID", p.ID(), "label", dc.Label()) + p.params.Logger.Warnw("unsupported datachannel added", nil, "label", dc.Label()) } } @@ -1060,7 +1041,7 @@ func (p *ParticipantImpl) handleDataMessage(kind livekit.DataPacket_Kind, data [ switch payload := dp.Value.(type) { case *livekit.DataPacket_User: if p.onDataPacket != nil { - payload.User.ParticipantSid = p.id + payload.User.ParticipantSid = p.params.SID p.onDataPacket(p, &dp) } default: @@ -1092,8 +1073,6 @@ func (p *ParticipantImpl) handleTrackPublished(track types.PublishedTrack) { } func (p *ParticipantImpl) handlePrimaryICEStateChange(state webrtc.ICEConnectionState) { - // p.params.Logger.Debugw("ICE connection state changed", "state", state.String(), - // "participant", p.identity, "pID", p.ID()) if state == webrtc.ICEConnectionStateConnected { prometheus.ServiceOperationCounter.WithLabelValues("ice_connection", "success", "").Add(1) p.updateState(livekit.ParticipantInfo_ACTIVE) @@ -1160,8 +1139,7 @@ func (p *ParticipantImpl) downTracksRTCPWorker() { if err == io.EOF || err == io.ErrClosedPipe { return } - logger.Errorw("could not send downtrack reports", err, - "participant", p.Identity(), "pID", p.ID()) + logger.Errorw("could not send downtrack reports", err) } } @@ -1200,8 +1178,7 @@ func (p *ParticipantImpl) rtcpSendWorker() { if len(fwdPkts) > 0 { if err := p.publisher.pc.WriteRTCP(fwdPkts); err != nil { - p.params.Logger.Errorw("could not write RTCP to participant", err, - "participant", p.Identity(), "pID", p.ID()) + p.params.Logger.Errorw("could not write RTCP to participant", err) } } } @@ -1336,7 +1313,7 @@ func (p *ParticipantImpl) onStreamStateChange(update *sfu.StreamStateUpdate) err func (p *ParticipantImpl) DebugInfo() map[string]interface{} { info := map[string]interface{}{ - "ID": p.id, + "ID": p.params.SID, "State": p.State().String(), } diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index 5fc4ca5d6e..da77cca159 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -60,7 +60,7 @@ type ParticipantOptions struct { func NewRoom(room *livekit.Room, config WebRTCConfig, audioConfig *config.AudioConfig, telemetry telemetry.TelemetryService) *Room { r := &Room{ Room: proto.Clone(room).(*livekit.Room), - Logger: logger.Logger(logger.GetLogger().WithValues("room", room.Name)), + Logger: LoggerWithRoom(logger.Logger(logger.GetLogger()), room.Name), config: config, audioConfig: audioConfig, telemetry: telemetry, @@ -172,7 +172,10 @@ func (r *Room) Join(participant types.Participant, opts *ParticipantOptions, ice // it's important to set this before connection, we don't want to miss out on any publishedTracks participant.OnTrackPublished(r.onTrackPublished) participant.OnStateChange(func(p types.Participant, oldState livekit.ParticipantInfo_State) { - r.Logger.Debugw("participant state changed", "state", p.State(), "participant", p.Identity(), "pID", p.ID(), + r.Logger.Debugw("participant state changed", + "state", p.State(), + "participant", p.Identity(), + "pID", p.ID(), "oldState", oldState) if r.onParticipantChanged != nil { r.onParticipantChanged(participant) diff --git a/pkg/rtc/signalhandler.go b/pkg/rtc/signalhandler.go index e41902b3f8..a3bcc937f3 100644 --- a/pkg/rtc/signalhandler.go +++ b/pkg/rtc/signalhandler.go @@ -6,53 +6,32 @@ import ( "github.com/livekit/protocol/logger" ) -func HandleParticipantSignal(room types.Room, participant types.Participant, req *livekit.SignalRequest) error { +func HandleParticipantSignal(room types.Room, participant types.Participant, req *livekit.SignalRequest, pLogger logger.Logger) error { switch msg := req.Message.(type) { case *livekit.SignalRequest_Offer: _, err := participant.HandleOffer(FromProtoSessionDescription(msg.Offer)) if err != nil { - logger.Errorw("could not handle offer", err, - "room", room.Name(), - "participant", participant.Identity(), - "pID", participant.ID(), - ) + pLogger.Errorw("could not handle offer", err) return err } case *livekit.SignalRequest_AddTrack: - logger.Debugw("add track request", - "room", room.Name(), - "participant", participant.Identity(), - "pID", participant.ID(), - "track", msg.AddTrack.Cid) + pLogger.Debugw("add track request", "track", msg.AddTrack.Cid) participant.AddTrack(msg.AddTrack) case *livekit.SignalRequest_Answer: sd := FromProtoSessionDescription(msg.Answer) if err := participant.HandleAnswer(sd); err != nil { - logger.Errorw("could not handle answer", err, - "room", room.Name(), - "participant", participant.Identity(), - "pID", participant.ID(), - ) + pLogger.Errorw("could not handle answer", err) // connection cannot be successful if we can't answer return err } case *livekit.SignalRequest_Trickle: candidateInit, err := FromProtoTrickle(msg.Trickle) if err != nil { - logger.Warnw("could not decode trickle", err, - "room", room.Name(), - "participant", participant.Identity(), - "pID", participant.ID(), - ) + pLogger.Warnw("could not decode trickle", err) return nil } - // logger.Debugw("adding peer candidate", "participant", participant.Identity()) if err := participant.AddICECandidate(candidateInit, msg.Trickle.Target); err != nil { - logger.Warnw("could not handle trickle", err, - "room", room.Name(), - "participant", participant.Identity(), - "pID", participant.ID(), - ) + pLogger.Warnw("could not handle trickle", err) } case *livekit.SignalRequest_Mute: participant.SetTrackMuted(msg.Mute.Sid, msg.Mute.Muted, false) @@ -67,10 +46,7 @@ func HandleParticipantSignal(room types.Room, participant types.Participant, req err = ErrCannotSubscribe } if err != nil { - logger.Warnw("could not update subscription", err, - "room", room.Name(), - "participant", participant.Identity(), - "pID", participant.ID(), + pLogger.Warnw("could not update subscription", err, "tracks", msg.Subscription.TrackSids, "subscribe", msg.Subscription.Subscribe) } @@ -78,26 +54,20 @@ func HandleParticipantSignal(room types.Room, participant types.Participant, req for _, sid := range msg.TrackSetting.TrackSids { subTrack := participant.GetSubscribedTrack(sid) if subTrack == nil { - logger.Warnw("unable to find SubscribedTrack", nil, - "room", room.Name(), - "participant", participant.Identity(), - "pID", participant.ID(), + pLogger.Warnw("unable to find SubscribedTrack", nil, "track", sid) continue } // find quality for published track - logger.Debugw("updating track settings", - "room", room.Name(), - "participant", participant.Identity(), - "pID", participant.ID(), + pLogger.Debugw("updating track settings", "settings", msg.TrackSetting) subTrack.UpdateSubscriberSettings(msg.TrackSetting) } case *livekit.SignalRequest_UpdateLayers: track := participant.GetPublishedTrack(msg.UpdateLayers.TrackSid) if track == nil { - logger.Warnw("could not find published track", nil, + pLogger.Warnw("could not find published track", nil, "track", msg.UpdateLayers.TrackSid) return nil } diff --git a/pkg/rtc/transport.go b/pkg/rtc/transport.go index f5b8df7f15..439cd75032 100644 --- a/pkg/rtc/transport.go +++ b/pkg/rtc/transport.go @@ -5,8 +5,8 @@ import ( "time" "github.com/bep/debounce" + "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" - livekit "github.com/livekit/protocol/livekit" "github.com/pion/interceptor" "github.com/pion/webrtc/v3" @@ -100,8 +100,7 @@ func NewPCTransport(params TransportParams) (*PCTransport, error) { } if params.Target == livekit.SignalTarget_SUBSCRIBER { t.streamAllocator = sfu.NewStreamAllocator(sfu.StreamAllocatorParams{ - ParticipantID: params.ParticipantID, - Logger: params.Logger, + Logger: params.Logger, }) t.streamAllocator.Start() } diff --git a/pkg/rtc/utils.go b/pkg/rtc/utils.go index 9b9d1c6f1c..1d94eb056d 100644 --- a/pkg/rtc/utils.go +++ b/pkg/rtc/utils.go @@ -6,8 +6,9 @@ import ( "io" "strings" + "github.com/go-logr/logr" + "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" - livekit "github.com/livekit/protocol/livekit" "github.com/pion/webrtc/v3" "github.com/livekit/livekit-server/pkg/rtc/types" @@ -130,3 +131,24 @@ func Recover() { logger.GetLogger().Error(err, "recovered panic", "panic", r) } } + +// logger helpers +func LoggerWithParticipant(l logger.Logger, identity, sid string) logger.Logger { + lr := logr.Logger(l) + if identity != "" { + lr = lr.WithValues("participant", identity) + } + if sid != "" { + lr = lr.WithValues("pID", sid) + } + return logger.Logger(lr) +} + +func LoggerWithRoom(l logger.Logger, name string) logger.Logger { + lr := logr.Logger(l) + return logger.Logger( + lr.WithValues( + "room", name, + ), + ) +} diff --git a/pkg/service/roommanager.go b/pkg/service/roommanager.go index 49ce5c1685..7a4b5ad93b 100644 --- a/pkg/service/roommanager.go +++ b/pkg/service/roommanager.go @@ -8,6 +8,7 @@ import ( "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" + "github.com/livekit/protocol/utils" "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/livekit-server/pkg/routing" @@ -225,8 +226,11 @@ func (r *RoomManager) StartSession(ctx context.Context, roomName string, pi rout pv := types.ProtocolVersion(pi.Client.Protocol) rtcConf := *r.rtcConfig rtcConf.SetBufferFactory(room.GetBufferFactor()) + sid := utils.NewGuid(utils.ParticipantPrefix) + pLogger := rtc.LoggerWithParticipant(room.Logger, pi.Identity, sid) participant, err = rtc.NewParticipant(rtc.ParticipantParams{ Identity: pi.Identity, + SID: sid, Config: &rtcConf, Sink: responseSink, AudioConfig: r.config.Audio, @@ -235,7 +239,7 @@ func (r *RoomManager) StartSession(ctx context.Context, roomName string, pi rout ThrottleConfig: r.config.RTC.PLIThrottle, EnabledCodecs: room.Room.EnabledCodecs, Hidden: pi.Hidden, - Logger: room.Logger, + Logger: pLogger, }) if err != nil { logger.Errorw("could not create participant", err) @@ -254,11 +258,11 @@ func (r *RoomManager) StartSession(ctx context.Context, roomName string, pi rout AutoSubscribe: pi.AutoSubscribe, } if err = room.Join(participant, &opts, r.iceServersForRoom(room.Room)); err != nil { - logger.Errorw("could not join room", err) + pLogger.Errorw("could not join room", err) return } if err = r.roomStore.StoreParticipant(ctx, roomName, participant.ToProto()); err != nil { - logger.Errorw("could not store participant", err) + pLogger.Errorw("could not store participant", err) } // update roomstore with new numParticipants if !participant.Hidden() { @@ -271,7 +275,7 @@ func (r *RoomManager) StartSession(ctx context.Context, roomName string, pi rout r.telemetry.ParticipantJoined(ctx, room.Room, participant.ToProto()) participant.OnClose(func(p types.Participant) { if err := r.roomStore.DeleteParticipant(ctx, roomName, p.Identity()); err != nil { - logger.Errorw("could not delete participant", err) + pLogger.Errorw("could not delete participant", err) } // update roomstore with new numParticipants if !participant.Hidden() { @@ -346,6 +350,11 @@ func (r *RoomManager) rtcSessionWorker(room *rtc.Room, participant types.Partici }() defer rtc.Recover() + pLogger := rtc.LoggerWithParticipant( + rtc.LoggerWithRoom(logger.Logger(logger.GetLogger()), room.Name()), + participant.Identity(), participant.ID(), + ) + for { select { case <-time.After(time.Millisecond * 50): @@ -361,7 +370,7 @@ func (r *RoomManager) rtcSessionWorker(room *rtc.Room, participant types.Partici } req := obj.(*livekit.SignalRequest) - if err := rtc.HandleParticipantSignal(room, participant, req); err != nil { + if err := rtc.HandleParticipantSignal(room, participant, req, pLogger); err != nil { // more specific errors are already logged // treat errors returned as fatal return @@ -382,22 +391,27 @@ func (r *RoomManager) handleRTCMessage(ctx context.Context, roomName, identity s } participant := room.GetParticipant(identity) + pLogger := rtc.LoggerWithParticipant( + rtc.LoggerWithRoom(logger.Logger(logger.GetLogger()), roomName), + identity, + "", + ) switch rm := msg.Message.(type) { case *livekit.RTCNodeMessage_RemoveParticipant: if participant == nil { return } - logger.Infow("removing participant", "room", roomName, "participant", identity) + pLogger.Infow("removing participant") room.RemoveParticipant(identity) case *livekit.RTCNodeMessage_MuteTrack: if participant == nil { return } - logger.Debugw("setting track muted", "room", roomName, "participant", identity, + pLogger.Debugw("setting track muted", "track", rm.MuteTrack.TrackSid, "muted", rm.MuteTrack.Muted) if !rm.MuteTrack.Muted && !r.config.Room.EnableRemoteUnmute { - logger.Errorw("cannot unmute track, remote unmute is disabled", nil) + pLogger.Errorw("cannot unmute track, remote unmute is disabled", nil) return } participant.SetTrackMuted(rm.MuteTrack.TrackSid, rm.MuteTrack.Muted, true) @@ -405,7 +419,7 @@ func (r *RoomManager) handleRTCMessage(ctx context.Context, roomName, identity s if participant == nil { return } - logger.Debugw("updating participant", "room", roomName, "participant", identity) + pLogger.Debugw("updating participant") if rm.UpdateParticipant.Metadata != "" { participant.SetMetadata(rm.UpdateParticipant.Metadata) } @@ -421,23 +435,21 @@ func (r *RoomManager) handleRTCMessage(ctx context.Context, roomName, identity s if participant == nil { return } - logger.Debugw("updating participant subscriptions", "room", roomName, "participant", identity) + pLogger.Debugw("updating participant subscriptions") if err := room.UpdateSubscriptions(participant, rm.UpdateSubscriptions.TrackSids, rm.UpdateSubscriptions.Subscribe); err != nil { - logger.Warnw("could not update subscription", err, - "participant", participant.Identity(), - "pID", participant.ID(), + pLogger.Warnw("could not update subscription", err, "tracks", rm.UpdateSubscriptions.TrackSids, "subscribe", rm.UpdateSubscriptions.Subscribe) } case *livekit.RTCNodeMessage_SendData: - logger.Debugw("SendData", "message", rm) + pLogger.Debugw("SendData", "size", len(rm.SendData.Data)) up := &livekit.UserPacket{ Payload: rm.SendData.Data, DestinationSids: rm.SendData.DestinationSids, } room.SendDataPacket(up, rm.SendData.Kind) case *livekit.RTCNodeMessage_UpdateRoomMetadata: - logger.Debugw("updating room", "room", roomName) + pLogger.Debugw("updating room") room.SetMetadata(rm.UpdateRoomMetadata.Metadata) } } diff --git a/pkg/service/rtcservice.go b/pkg/service/rtcservice.go index 6218e4ef4f..7a436a83bb 100644 --- a/pkg/service/rtcservice.go +++ b/pkg/service/rtcservice.go @@ -9,8 +9,8 @@ import ( "strings" "github.com/gorilla/websocket" + "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" - livekit "github.com/livekit/protocol/livekit" "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/livekit-server/pkg/routing" @@ -144,10 +144,14 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } + pLogger := rtc.LoggerWithParticipant( + rtc.LoggerWithRoom(logger.Logger(logger.GetLogger()), roomName), + pi.Identity, "", + ) done := make(chan struct{}) // function exits when websocket terminates, it'll close the event reading off of response sink as well defer func() { - logger.Infow("server closing WS connection", "participant", pi.Identity, "connID", connId) + pLogger.Infow("server closing WS connection", "connID", connId) reqSink.Close() close(done) }() @@ -156,7 +160,7 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) { conn, err := s.upgrader.Upgrade(w, r, nil) if err != nil { prometheus.ServiceOperationCounter.WithLabelValues("signal_ws", "error", "upgrade").Add(1) - logger.Warnw("could not upgrade to WS", err) + pLogger.Warnw("could not upgrade to WS", err) handleError(w, http.StatusInternalServerError, err.Error()) return } @@ -166,11 +170,9 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) { } prometheus.ServiceOperationCounter.WithLabelValues("signal_ws", "success", "").Add(1) - logger.Infow("new client WS connected", + pLogger.Infow("new client WS connected", "connID", connId, "roomID", rm.Sid, - "room", rm.Name, - "participant", pi.Identity, ) // handle responses @@ -187,22 +189,20 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) { return case msg := <-resSource.ReadChan(): if msg == nil { - logger.Infow("source closed connection", - "participant", pi.Identity, + pLogger.Infow("source closed connection", "connID", connId) return } res, ok := msg.(*livekit.SignalResponse) if !ok { - logger.Errorw("unexpected message type", nil, + pLogger.Errorw("unexpected message type", nil, "type", fmt.Sprintf("%T", msg), - "participant", pi.Identity, "connID", connId) continue } if err = sigConn.WriteResponse(res); err != nil { - logger.Warnw("error writing to websocket", err) + pLogger.Warnw("error writing to websocket", err) return } } @@ -218,13 +218,12 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) { websocket.IsCloseError(err, websocket.CloseAbnormalClosure, websocket.CloseGoingAway, websocket.CloseNormalClosure, websocket.CloseNoStatusReceived) { return } else { - logger.Errorw("error reading from websocket", err) + pLogger.Errorw("error reading from websocket", err) return } } if err := reqSink.WriteMessage(req); err != nil { - logger.Warnw("error writing to request sink", err, - "participant", pi.Identity, + pLogger.Warnw("error writing to request sink", err, "connID", connId) } } diff --git a/pkg/sfu/prober.go b/pkg/sfu/prober.go index b1ef9c8757..0b9d40a082 100644 --- a/pkg/sfu/prober.go +++ b/pkg/sfu/prober.go @@ -117,13 +117,11 @@ import ( ) type ProberParams struct { - ParticipantID string - Logger logger.Logger + Logger logger.Logger } type Prober struct { - participantID string - logger logger.Logger + logger logger.Logger clustersMu sync.RWMutex clusters deque.Deque @@ -134,8 +132,7 @@ type Prober struct { func NewProber(params ProberParams) *Prober { p := &Prober{ - participantID: params.ParticipantID, - logger: params.Logger, + logger: params.Logger, } p.clusters.SetMinCapacity(2) return p @@ -153,7 +150,7 @@ func (p *Prober) Reset() { defer p.clustersMu.Unlock() if p.activeCluster != nil { - p.logger.Debugw("resetting active cluster", "participant", p.participantID, "cluster", p.activeCluster.String()) + p.logger.Debugw("resetting active cluster", "cluster", p.activeCluster.String()) } p.clusters.Clear() @@ -170,7 +167,7 @@ func (p *Prober) AddCluster(desiredRateBps int, expectedRateBps int, minDuration } cluster := NewCluster(desiredRateBps, expectedRateBps, minDuration, maxDuration) - p.logger.Debugw("cluster added", "participant", p.participantID, "cluster", cluster.String()) + p.logger.Debugw("cluster added", "cluster", cluster.String()) p.pushBackClusterAndMaybeStart(cluster) } @@ -258,7 +255,7 @@ func (p *Prober) run() { cluster.Process(p) if cluster.IsFinished() { - p.logger.Debugw("cluster finished", "participant", p.participantID, "cluster", cluster.String()) + p.logger.Debugw("cluster finished", "cluster", cluster.String()) p.popFrontCluster(cluster) continue } diff --git a/pkg/sfu/streamallocator.go b/pkg/sfu/streamallocator.go index 9949ccfd10..22ff623c82 100644 --- a/pkg/sfu/streamallocator.go +++ b/pkg/sfu/streamallocator.go @@ -192,13 +192,11 @@ func (s Signal) String() string { } type StreamAllocatorParams struct { - ParticipantID string - Logger logger.Logger + Logger logger.Logger } type StreamAllocator struct { - participantID string - logger logger.Logger + logger logger.Logger onStreamStateChange func(update *StreamStateUpdate) error @@ -234,13 +232,11 @@ type Event struct { func NewStreamAllocator(params StreamAllocatorParams) *StreamAllocator { s := &StreamAllocator{ - participantID: params.ParticipantID, - logger: params.Logger, - audioTracks: make(map[string]*Track), - videoTracks: make(map[string]*Track), + logger: params.Logger, + audioTracks: make(map[string]*Track), + videoTracks: make(map[string]*Track), prober: NewProber(ProberParams{ - ParticipantID: params.ParticipantID, - Logger: params.Logger, + Logger: params.Logger, }), eventCh: make(chan Event, 20), runningCh: make(chan struct{}), @@ -517,7 +513,7 @@ func (s *StreamAllocator) handleSignalEstimate(event *Event) { } if !found { if len(remb.SSRCs) == 0 { - s.logger.Warnw("no SSRC to track REMB", nil, "participant", s.participantID) + s.logger.Warnw("no SSRC to track REMB", nil) return } @@ -542,7 +538,10 @@ func (s *StreamAllocator) handleSignalEstimate(event *Event) { s.prevReceivedEstimate = s.receivedEstimate s.receivedEstimate = int64(remb.Bitrate) if s.prevReceivedEstimate != s.receivedEstimate { - s.logger.Debugw("received new estimate", "participant", s.participantID, "old(bps)", s.prevReceivedEstimate, "new(bps)", s.receivedEstimate) + s.logger.Debugw("received new estimate", + "old(bps)", s.prevReceivedEstimate, + "new(bps)", s.receivedEstimate, + ) } if s.maybeCommitEstimate() { @@ -647,7 +646,7 @@ func (s *StreamAllocator) handleSignalSendProbe(event *Event) { func (s *StreamAllocator) setState(state State) { if s.state != state { - s.logger.Infow("state change", "participant", s.participantID, "from", s.state, "to", state) + s.logger.Infow("state change", "from", s.state, "to", state) } s.state = state @@ -700,7 +699,7 @@ func (s *StreamAllocator) maybeCommitEstimate() (isDecreasing bool) { s.committedChannelCapacity = s.receivedEstimate s.lastCommitTime = time.Now() - s.logger.Debugw("committing channel capacity", "participant", s.participantID, "capacity(bps)", s.committedChannelCapacity) + s.logger.Debugw("committing channel capacity", "capacity(bps)", s.committedChannelCapacity) return } @@ -874,11 +873,11 @@ func (s *StreamAllocator) maybeSendUpdate(update *StreamStateUpdate) { return } - s.logger.Debugw("streamed tracks changed", "participant", s.participantID, "update", update) + s.logger.Debugw("streamed tracks changed", "update", update) if s.onStreamStateChange != nil { err := s.onStreamStateChange(update) if err != nil { - s.logger.Errorw("could not send streamed tracks update", err, "participant", s.participantID) + s.logger.Errorw("could not send streamed tracks update", err) } } }