Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use participant and room specific loggers #252

Merged
merged 2 commits into from
Dec 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 5 additions & 5 deletions pkg/rtc/mediatrack.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,8 +265,8 @@ func (t *MediaTrack) AddSubscriber(sub types.Participant) error {
}
t.params.Logger.Debugw("removing peerconnection track",
"track", t.ID(),
"pIDs", []string{t.params.ParticipantID, sub.ID()},
"participant", sub.Identity(),
"subscriber", sub.Identity(),
"subscriberID", sub.ID(),
"kind", t.Kind(),
)
if err := sub.SubscriberPC().RemoveTrack(sender); err != nil {
Expand All @@ -279,7 +279,9 @@ func (t *MediaTrack) AddSubscriber(sub types.Participant) error {
// been set to Inactive
t.params.Logger.Debugw("could not remove remoteTrack from forwarder",
"error", err,
"participant", sub.Identity(), "pID", sub.ID())
"subscriber", sub.Identity(),
"subscriberID", sub.ID(),
)
}
}

Expand Down Expand Up @@ -327,8 +329,6 @@ func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.Tra
buff, rtcpReader := t.params.BufferFactory.GetBufferPair(uint32(track.SSRC()))
if buff == nil || rtcpReader == nil {
logger.Errorw("could not retrieve buffer pair", nil,
"participant", t.params.ParticipantIdentity,
"participantID", t.params.ParticipantID,
"track", t.ID())
return
}
Expand Down
75 changes: 26 additions & 49 deletions pkg/rtc/participant.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ const (

type ParticipantParams struct {
Identity string
SID string
Config *WebRTCConfig
Sink routing.MessageSink
AudioConfig config.AudioConfig
Expand All @@ -50,7 +51,6 @@ type ParticipantParams struct {

type ParticipantImpl struct {
params ParticipantParams
id string
publisher *PCTransport
subscriber *PCTransport
isClosed utils.AtomicFlag
Expand Down Expand Up @@ -102,7 +102,6 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) {

p := &ParticipantImpl{
params: params,
id: utils.NewGuid(utils.ParticipantPrefix),
rtcpCh: make(chan []rtcp.Packet, 50),
pliThrottle: newPLIThrottle(params.ThrottleConfig),
subscribedTracks: make(map[string]types.SubscribedTrack),
Expand All @@ -118,7 +117,7 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) {
return nil, err
}
p.publisher, err = NewPCTransport(TransportParams{
ParticipantID: p.id,
ParticipantID: p.params.SID,
ParticipantIdentity: p.params.Identity,
Target: livekit.SignalTarget_PUBLISHER,
Config: params.Config,
Expand All @@ -130,7 +129,7 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) {
return nil, err
}
p.subscriber, err = NewPCTransport(TransportParams{
ParticipantID: p.id,
ParticipantID: p.params.SID,
ParticipantIdentity: p.params.Identity,
Target: livekit.SignalTarget_SUBSCRIBER,
Config: params.Config,
Expand Down Expand Up @@ -188,7 +187,7 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) {
}

func (p *ParticipantImpl) ID() string {
return p.id
return p.params.SID
}

func (p *ParticipantImpl) Identity() string {
Expand Down Expand Up @@ -231,7 +230,7 @@ func (p *ParticipantImpl) RTCPChan() chan []rtcp.Packet {

func (p *ParticipantImpl) ToProto() *livekit.ParticipantInfo {
info := &livekit.ParticipantInfo{
Sid: p.id,
Sid: p.params.SID,
Identity: p.params.Identity,
Metadata: p.metadata,
State: p.State(),
Expand Down Expand Up @@ -288,8 +287,8 @@ func (p *ParticipantImpl) OnClose(callback func(types.Participant)) {

// HandleOffer an offer from remote participant, used when clients make the initial connection
func (p *ParticipantImpl) HandleOffer(sdp webrtc.SessionDescription) (answer webrtc.SessionDescription, err error) {
p.params.Logger.Debugw("answering pub offer", "state", p.State().String(),
"participant", p.Identity(), "pID", p.ID(),
p.params.Logger.Debugw("answering pub offer",
"state", p.State().String(),
//"sdp", sdp.SDP,
)

Expand All @@ -313,10 +312,8 @@ func (p *ParticipantImpl) HandleOffer(sdp webrtc.SessionDescription) (answer web
return
}

p.params.Logger.Debugw("sending answer to client",
"participant", p.Identity(), "pID", p.ID(),
//"answer sdp", answer.SDP,
)
p.params.Logger.Debugw("sending answer to client")

err = p.writeMessage(&livekit.SignalResponse{
Message: &livekit.SignalResponse_Answer{
Answer: ToProtoSessionDescription(answer),
Expand Down Expand Up @@ -351,8 +348,7 @@ func (p *ParticipantImpl) AddTrack(req *livekit.AddTrackRequest) {
}

if !p.CanPublish() {
p.params.Logger.Warnw("no permission to publish track", nil,
"participant", p.Identity(), "pID", p.ID())
p.params.Logger.Warnw("no permission to publish track", nil)
return
}

Expand Down Expand Up @@ -385,10 +381,7 @@ func (p *ParticipantImpl) HandleAnswer(sdp webrtc.SessionDescription) error {
if sdp.Type != webrtc.SDPTypeAnswer {
return ErrUnexpectedOffer
}
p.params.Logger.Debugw("setting subPC answer",
"participant", p.Identity(), "pID", p.ID(),
//"sdp", sdp.SDP,
)
p.params.Logger.Debugw("setting subPC answer")

if err := p.subscriber.SetRemoteDescription(sdp); err != nil {
return errors.Wrap(err, "could not set remote description")
Expand Down Expand Up @@ -485,8 +478,8 @@ func (p *ParticipantImpl) AddSubscriber(op types.Participant) (int, error) {
}

p.params.Logger.Debugw("subscribing new participant to tracks",
"participants", []string{p.Identity(), op.Identity()},
"pIDs", []string{p.ID(), op.ID()},
"subscriber", op.Identity(),
"subscriberID", op.ID(),
"numTracks", len(tracks))

n := 0
Expand Down Expand Up @@ -646,8 +639,6 @@ func (p *ParticipantImpl) SetTrackMuted(trackId string, muted bool, fromAdmin bo

if currentMuted != track.IsMuted() && p.onTrackUpdated != nil {
p.params.Logger.Debugw("mute status changed",
"participant", p.Identity(),
"pID", p.ID(),
"track", trackId,
"muted", track.IsMuted())
p.onTrackUpdated(p, track)
Expand Down Expand Up @@ -799,8 +790,9 @@ func (p *ParticipantImpl) GetSubscribedTracks() []types.SubscribedTrack {

// AddSubscribedTrack adds a track to the participant's subscribed list
func (p *ParticipantImpl) AddSubscribedTrack(subTrack types.SubscribedTrack) {
p.params.Logger.Debugw("added subscribedTrack", "publisher", subTrack.PublisherIdentity(),
"participant", p.Identity(), "track", subTrack.ID())
p.params.Logger.Debugw("added subscribedTrack",
"publisher", subTrack.PublisherIdentity(),
"track", subTrack.ID())
p.lock.Lock()
p.subscribedTracks[subTrack.ID()] = subTrack
p.lock.Unlock()
Expand All @@ -812,7 +804,7 @@ func (p *ParticipantImpl) AddSubscribedTrack(subTrack types.SubscribedTrack) {
// RemoveSubscribedTrack removes a track to the participant's subscribed list
func (p *ParticipantImpl) RemoveSubscribedTrack(subTrack types.SubscribedTrack) {
p.params.Logger.Debugw("removed subscribedTrack", "publisher", subTrack.PublisherIdentity(),
"participant", p.Identity(), "track", subTrack.ID(), "kind", subTrack.DownTrack().Kind())
"track", subTrack.ID(), "kind", subTrack.DownTrack().Kind())

p.subscriber.RemoveTrack(subTrack)

Expand All @@ -836,8 +828,6 @@ func (p *ParticipantImpl) sendIceCandidate(c *webrtc.ICECandidate, target liveki

// write candidate
p.params.Logger.Debugw("sending ice candidates",
"participant", p.Identity(),
"pID", p.ID(),
"candidate", c.String())
trickle := ToProtoTrickle(ci)
trickle.Target = target
Expand All @@ -854,7 +844,7 @@ func (p *ParticipantImpl) updateState(state livekit.ParticipantInfo_State) {
return
}
p.state.Store(state)
p.params.Logger.Debugw("updating participant state", "state", state.String(), "participant", p.Identity(), "pID", p.ID())
p.params.Logger.Debugw("updating participant state", "state", state.String())
p.lock.RLock()
onStateChange := p.onStateChange
p.lock.RUnlock()
Expand All @@ -877,8 +867,6 @@ func (p *ParticipantImpl) writeMessage(msg *livekit.SignalResponse) error {
err := sink.WriteMessage(msg)
if err != nil {
p.params.Logger.Warnw("could not send message to participant", err,
"pID", p.ID(),
"participant", p.Identity(),
"message", fmt.Sprintf("%T", msg.Message))
return err
}
Expand All @@ -888,15 +876,11 @@ func (p *ParticipantImpl) writeMessage(msg *livekit.SignalResponse) error {
// when the server has an offer for participant
func (p *ParticipantImpl) onOffer(offer webrtc.SessionDescription) {
if p.State() == livekit.ParticipantInfo_DISCONNECTED {
p.params.Logger.Debugw("skipping server offer", "participant", p.Identity(), "pID", p.ID())
// skip when disconnected
return
}

p.params.Logger.Debugw("sending server offer to participant",
"participant", p.Identity(), "pID", p.ID(),
//"sdp", offer.SDP,
)
p.params.Logger.Debugw("sending server offer to participant")

err := p.writeMessage(&livekit.SignalResponse{
Message: &livekit.SignalResponse_Offer{
Expand All @@ -918,15 +902,12 @@ func (p *ParticipantImpl) onMediaTrack(track *webrtc.TrackRemote, rtpReceiver *w

p.params.Logger.Debugw("mediaTrack added",
"kind", track.Kind().String(),
"participant", p.Identity(),
"pID", p.ID(),
"track", track.ID(),
"rid", track.RID(),
"SSRC", track.SSRC())

if !p.CanPublish() {
p.params.Logger.Warnw("no permission to publish mediaTrack", nil,
"participant", p.Identity(), "pID", p.ID())
p.params.Logger.Warnw("no permission to publish mediaTrack", nil)
return
}

Expand All @@ -946,7 +927,7 @@ func (p *ParticipantImpl) onMediaTrack(track *webrtc.TrackRemote, rtpReceiver *w
TrackInfo: ti,
SignalCid: signalCid,
SdpCid: track.ID(),
ParticipantID: p.id,
ParticipantID: p.params.SID,
ParticipantIdentity: p.Identity(),
RTCPChan: p.rtcpCh,
BufferFactory: p.params.Config.BufferFactory,
Expand Down Expand Up @@ -996,7 +977,7 @@ func (p *ParticipantImpl) onDataChannel(dc *webrtc.DataChannel) {
p.handleDataMessage(livekit.DataPacket_LOSSY, msg.Data)
})
default:
p.params.Logger.Warnw("unsupported datachannel added", nil, "participant", p.Identity(), "pID", p.ID(), "label", dc.Label())
p.params.Logger.Warnw("unsupported datachannel added", nil, "label", dc.Label())
}
}

Expand Down Expand Up @@ -1060,7 +1041,7 @@ func (p *ParticipantImpl) handleDataMessage(kind livekit.DataPacket_Kind, data [
switch payload := dp.Value.(type) {
case *livekit.DataPacket_User:
if p.onDataPacket != nil {
payload.User.ParticipantSid = p.id
payload.User.ParticipantSid = p.params.SID
p.onDataPacket(p, &dp)
}
default:
Expand Down Expand Up @@ -1092,8 +1073,6 @@ func (p *ParticipantImpl) handleTrackPublished(track types.PublishedTrack) {
}

func (p *ParticipantImpl) handlePrimaryICEStateChange(state webrtc.ICEConnectionState) {
// p.params.Logger.Debugw("ICE connection state changed", "state", state.String(),
// "participant", p.identity, "pID", p.ID())
if state == webrtc.ICEConnectionStateConnected {
prometheus.ServiceOperationCounter.WithLabelValues("ice_connection", "success", "").Add(1)
p.updateState(livekit.ParticipantInfo_ACTIVE)
Expand Down Expand Up @@ -1160,8 +1139,7 @@ func (p *ParticipantImpl) downTracksRTCPWorker() {
if err == io.EOF || err == io.ErrClosedPipe {
return
}
logger.Errorw("could not send downtrack reports", err,
"participant", p.Identity(), "pID", p.ID())
logger.Errorw("could not send downtrack reports", err)
}
}

Expand Down Expand Up @@ -1200,8 +1178,7 @@ func (p *ParticipantImpl) rtcpSendWorker() {

if len(fwdPkts) > 0 {
if err := p.publisher.pc.WriteRTCP(fwdPkts); err != nil {
p.params.Logger.Errorw("could not write RTCP to participant", err,
"participant", p.Identity(), "pID", p.ID())
p.params.Logger.Errorw("could not write RTCP to participant", err)
}
}
}
Expand Down Expand Up @@ -1336,7 +1313,7 @@ func (p *ParticipantImpl) onStreamStateChange(update *sfu.StreamStateUpdate) err

func (p *ParticipantImpl) DebugInfo() map[string]interface{} {
info := map[string]interface{}{
"ID": p.id,
"ID": p.params.SID,
"State": p.State().String(),
}

Expand Down
7 changes: 5 additions & 2 deletions pkg/rtc/room.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type ParticipantOptions struct {
func NewRoom(room *livekit.Room, config WebRTCConfig, audioConfig *config.AudioConfig, telemetry telemetry.TelemetryService) *Room {
r := &Room{
Room: proto.Clone(room).(*livekit.Room),
Logger: logger.Logger(logger.GetLogger().WithValues("room", room.Name)),
Logger: LoggerWithRoom(logger.Logger(logger.GetLogger()), room.Name),
config: config,
audioConfig: audioConfig,
telemetry: telemetry,
Expand Down Expand Up @@ -172,7 +172,10 @@ func (r *Room) Join(participant types.Participant, opts *ParticipantOptions, ice
// it's important to set this before connection, we don't want to miss out on any publishedTracks
participant.OnTrackPublished(r.onTrackPublished)
participant.OnStateChange(func(p types.Participant, oldState livekit.ParticipantInfo_State) {
r.Logger.Debugw("participant state changed", "state", p.State(), "participant", p.Identity(), "pID", p.ID(),
r.Logger.Debugw("participant state changed",
"state", p.State(),
"participant", p.Identity(),
"pID", p.ID(),
"oldState", oldState)
if r.onParticipantChanged != nil {
r.onParticipantChanged(participant)
Expand Down