diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 4ec917c6ff..05a2f27b2e 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -140,7 +140,6 @@ type ParticipantParams struct { SubscriptionLimitVideo int32 PlayoutDelay *livekit.PlayoutDelay SyncStreams bool - EnableTrafficLoadTracking bool ForwardStats *sfu.ForwardStats } @@ -187,7 +186,6 @@ type ParticipantImpl struct { *TransportManager *UpTrackManager *SubscriptionManager - *ParticipantTrafficLoad // keeps track of unpublished tracks in order to reuse trackID unpublishedTracks []*livekit.TrackInfo @@ -297,7 +295,6 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) { p.setupUpTrackManager() p.setupSubscriptionManager() - p.setupParticipantTrafficLoad() return p, nil } @@ -865,9 +862,6 @@ func (p *ParticipantImpl) Close(sendLeave bool, reason types.ParticipantCloseRea go func() { p.SubscriptionManager.Close(isExpectedToResume) p.TransportManager.Close() - if p.ParticipantTrafficLoad != nil { - p.ParticipantTrafficLoad.Close() - } }() p.dataChannelStats.Stop() @@ -1373,16 +1367,6 @@ func (p *ParticipantImpl) setupSubscriptionManager() { }) } -func (p *ParticipantImpl) setupParticipantTrafficLoad() { - if p.params.EnableTrafficLoadTracking { - p.ParticipantTrafficLoad = NewParticipantTrafficLoad(ParticipantTrafficLoadParams{ - Participant: p, - DataChannelStats: p.dataChannelStats, - Logger: p.params.Logger, - }) - } -} - func (p *ParticipantImpl) updateState(state livekit.ParticipantInfo_State) { oldState := p.state.Swap(state).(livekit.ParticipantInfo_State) if oldState == state { diff --git a/pkg/rtc/participant_traffic_load.go b/pkg/rtc/participant_traffic_load.go deleted file mode 100644 index 28f6867d1c..0000000000 --- a/pkg/rtc/participant_traffic_load.go +++ /dev/null @@ -1,219 +0,0 @@ -// Copyright 2023 LiveKit, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package rtc - -import ( - "sync" - "time" - - "github.com/frostbyte73/core" - "github.com/livekit/protocol/livekit" - "github.com/livekit/protocol/logger" - - "github.com/livekit/livekit-server/pkg/rtc/types" - "github.com/livekit/livekit-server/pkg/telemetry" -) - -const ( - reportInterval = 10 * time.Second -) - -type ParticipantTrafficLoadParams struct { - Participant *ParticipantImpl - DataChannelStats *telemetry.BytesTrackStats - Logger logger.Logger -} - -type ParticipantTrafficLoad struct { - params ParticipantTrafficLoadParams - - lock sync.RWMutex - onTrafficLoad func(trafficLoad *types.TrafficLoad) - tracksStatsMedia map[livekit.TrackID]*livekit.RTPStats - dataChannelTraffic *telemetry.TrafficTotals - trafficLoad *types.TrafficLoad - - closed core.Fuse -} - -func NewParticipantTrafficLoad(params ParticipantTrafficLoadParams) *ParticipantTrafficLoad { - p := &ParticipantTrafficLoad{ - params: params, - tracksStatsMedia: make(map[livekit.TrackID]*livekit.RTPStats), - } - go p.reporter() - return p -} - -func (p *ParticipantTrafficLoad) Close() { - p.closed.Break() -} - -func (p *ParticipantTrafficLoad) OnTrafficLoad(f func(trafficLoad *types.TrafficLoad)) { - if p == nil { - return - } - - p.lock.Lock() - p.onTrafficLoad = f - p.lock.Unlock() -} - -func (p *ParticipantTrafficLoad) getOnTrafficLoad() func(trafficLoad *types.TrafficLoad) { - p.lock.RLock() - defer p.lock.RUnlock() - - return p.onTrafficLoad -} - -func (p *ParticipantTrafficLoad) GetTrafficLoad() *types.TrafficLoad { - if p == nil { - return nil - } - - p.lock.RLock() - defer p.lock.RUnlock() - - return p.trafficLoad -} - -func (p *ParticipantTrafficLoad) updateTrafficLoad() *types.TrafficLoad { - publishedTracks := p.params.Participant.GetPublishedTracks() - subscribedTracks := p.params.Participant.SubscriptionManager.GetSubscribedTracks() - - availableTracks := make(map[livekit.TrackID]bool, len(publishedTracks)+len(subscribedTracks)) - - upstreamAudioStats := make([]*types.TrafficStats, 0, len(publishedTracks)) - upstreamVideoStats := make([]*types.TrafficStats, 0, len(publishedTracks)) - - downstreamAudioStats := make([]*types.TrafficStats, 0, len(subscribedTracks)) - downstreamVideoStats := make([]*types.TrafficStats, 0, len(subscribedTracks)) - - p.lock.Lock() - defer p.lock.Unlock() - for _, pt := range publishedTracks { - lmt, ok := pt.(types.LocalMediaTrack) - if !ok { - continue - } - trackID := lmt.ID() - stats := lmt.GetTrackStats() - trafficStats := types.RTPStatsDiffToTrafficStats(p.tracksStatsMedia[trackID], stats) - if stats != nil { - p.tracksStatsMedia[trackID] = stats - availableTracks[trackID] = true - } - if trafficStats != nil { - switch lmt.Kind() { - case livekit.TrackType_AUDIO: - upstreamAudioStats = append(upstreamAudioStats, trafficStats) - case livekit.TrackType_VIDEO: - upstreamVideoStats = append(upstreamVideoStats, trafficStats) - } - } - } - - for _, st := range subscribedTracks { - trackID := st.ID() - stats := st.DownTrack().GetTrackStats() - trafficStats := types.RTPStatsDiffToTrafficStats(p.tracksStatsMedia[trackID], stats) - if stats != nil { - p.tracksStatsMedia[trackID] = stats - availableTracks[trackID] = true - } - if trafficStats != nil { - switch st.MediaTrack().Kind() { - case livekit.TrackType_AUDIO: - downstreamAudioStats = append(downstreamAudioStats, trafficStats) - case livekit.TrackType_VIDEO: - downstreamVideoStats = append(downstreamVideoStats, trafficStats) - } - } - } - - // remove unavailable tracks from track stats cache - for trackID := range p.tracksStatsMedia { - if !availableTracks[trackID] { - delete(p.tracksStatsMedia, trackID) - } - } - - trafficTypeStats := make([]*types.TrafficTypeStats, 0, 6) - addTypeStats := func(statsList []*types.TrafficStats, trackType livekit.TrackType, streamType livekit.StreamType) { - agg := types.AggregateTrafficStats(statsList...) - if agg != nil { - trafficTypeStats = append(trafficTypeStats, &types.TrafficTypeStats{ - TrackType: trackType, - StreamType: streamType, - TrafficStats: agg, - }) - } - } - addTypeStats(upstreamAudioStats, livekit.TrackType_AUDIO, livekit.StreamType_UPSTREAM) - addTypeStats(upstreamVideoStats, livekit.TrackType_VIDEO, livekit.StreamType_UPSTREAM) - addTypeStats(downstreamAudioStats, livekit.TrackType_VIDEO, livekit.StreamType_DOWNSTREAM) - addTypeStats(downstreamVideoStats, livekit.TrackType_VIDEO, livekit.StreamType_DOWNSTREAM) - - if p.params.DataChannelStats != nil { - dataChannelTraffic := p.params.DataChannelStats.GetTrafficTotals() - if p.dataChannelTraffic != nil { - trafficTypeStats = append(trafficTypeStats, &types.TrafficTypeStats{ - TrackType: livekit.TrackType_DATA, - StreamType: livekit.StreamType_UPSTREAM, - TrafficStats: &types.TrafficStats{ - StartTime: p.dataChannelTraffic.At, - EndTime: dataChannelTraffic.At, - Packets: dataChannelTraffic.RecvMessages - p.dataChannelTraffic.RecvMessages, - Bytes: dataChannelTraffic.RecvBytes - p.dataChannelTraffic.RecvBytes, - }, - }) - - trafficTypeStats = append(trafficTypeStats, &types.TrafficTypeStats{ - TrackType: livekit.TrackType_DATA, - StreamType: livekit.StreamType_DOWNSTREAM, - TrafficStats: &types.TrafficStats{ - StartTime: p.dataChannelTraffic.At, - EndTime: dataChannelTraffic.At, - Packets: dataChannelTraffic.SendMessages - p.dataChannelTraffic.SendMessages, - Bytes: dataChannelTraffic.SendBytes - p.dataChannelTraffic.SendBytes, - }, - }) - } - p.dataChannelTraffic = dataChannelTraffic - } - - p.trafficLoad = &types.TrafficLoad{ - TrafficTypeStats: trafficTypeStats, - } - return p.trafficLoad -} - -func (p *ParticipantTrafficLoad) reporter() { - ticker := time.NewTicker(reportInterval) - defer ticker.Stop() - - for { - select { - case <-p.closed.Watch(): - return - - case <-ticker.C: - trafficLoad := p.updateTrafficLoad() - if onTrafficLoad := p.getOnTrafficLoad(); onTrafficLoad != nil { - onTrafficLoad(trafficLoad) - } - } - } -} diff --git a/pkg/rtc/types/interfaces.go b/pkg/rtc/types/interfaces.go index 01b874c626..b51aaa784a 100644 --- a/pkg/rtc/types/interfaces.go +++ b/pkg/rtc/types/interfaces.go @@ -394,7 +394,6 @@ type LocalParticipant interface { OnSubscribeStatusChanged(fn func(publisherID livekit.ParticipantID, subscribed bool)) OnClose(callback func(LocalParticipant)) OnClaimsChanged(callback func(LocalParticipant)) - OnTrafficLoad(callback func(trafficLoad *TrafficLoad)) HandleReceiverReport(dt *sfu.DownTrack, report *rtcp.ReceiverReport) @@ -423,8 +422,6 @@ type LocalParticipant interface { SetSubscriberChannelCapacity(channelCapacity int64) GetPacer() pacer.Pacer - - GetTrafficLoad() *TrafficLoad } // Room is a container of participants, and can provide room-level actions diff --git a/pkg/rtc/types/trafficstats.go b/pkg/rtc/types/trafficstats.go deleted file mode 100644 index f5c0b386df..0000000000 --- a/pkg/rtc/types/trafficstats.go +++ /dev/null @@ -1,161 +0,0 @@ -// Copyright 2023 LiveKit, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package types - -import ( - "time" - - "github.com/livekit/protocol/livekit" -) - -type TrafficStats struct { - StartTime time.Time - EndTime time.Time - Packets uint32 - PacketsLost uint32 - PacketsPadding uint32 - PacketsOutOfOrder uint32 - Bytes uint64 -} - -type TrafficTypeStats struct { - TrackType livekit.TrackType - StreamType livekit.StreamType - TrafficStats *TrafficStats -} - -type TrafficLoad struct { - TrafficTypeStats []*TrafficTypeStats -} - -func RTPStatsDiffToTrafficStats(before, after *livekit.RTPStats) *TrafficStats { - if after == nil { - return nil - } - - startTime := after.StartTime - if before != nil { - startTime = before.EndTime - } - - getAfter := func() *TrafficStats { - return &TrafficStats{ - StartTime: startTime.AsTime(), - EndTime: after.EndTime.AsTime(), - Packets: after.Packets, - PacketsLost: after.PacketsLost, - PacketsPadding: after.PacketsPadding, - PacketsOutOfOrder: after.PacketsOutOfOrder, - Bytes: after.Bytes + after.BytesDuplicate + after.BytesPadding, - } - } - - if before == nil { - return getAfter() - } - - if (after.Packets - before.Packets) > (1 << 31) { - // after packets < before packets, probably got reset, just return after - return getAfter() - } - if ((after.Bytes + after.BytesDuplicate + after.BytesPadding) - (before.Bytes + before.BytesDuplicate + before.BytesPadding)) > (1 << 63) { - // after bytes < before bytes, probably got reset, just return after - return getAfter() - } - - packetsLost := uint32(0) - if after.PacketsLost >= before.PacketsLost { - packetsLost = after.PacketsLost - before.PacketsLost - } - return &TrafficStats{ - StartTime: startTime.AsTime(), - EndTime: after.EndTime.AsTime(), - Packets: after.Packets - before.Packets, - PacketsLost: packetsLost, - PacketsPadding: after.PacketsPadding - before.PacketsPadding, - PacketsOutOfOrder: after.PacketsOutOfOrder - before.PacketsOutOfOrder, - Bytes: (after.Bytes + after.BytesDuplicate + after.BytesPadding) - (before.Bytes + before.BytesDuplicate + before.BytesPadding), - } -} - -func AggregateTrafficStats(statsList ...*TrafficStats) *TrafficStats { - if len(statsList) == 0 { - return nil - } - - startTime := time.Time{} - endTime := time.Time{} - - packets := uint32(0) - packetsLost := uint32(0) - packetsPadding := uint32(0) - packetsOutOfOrder := uint32(0) - bytes := uint64(0) - - for _, stats := range statsList { - if startTime.IsZero() || startTime.After(stats.StartTime) { - startTime = stats.StartTime - } - - if endTime.IsZero() || endTime.Before(stats.EndTime) { - endTime = stats.EndTime - } - - packets += stats.Packets - packetsLost += stats.PacketsLost - packetsPadding += stats.PacketsPadding - packetsOutOfOrder += stats.PacketsOutOfOrder - bytes += stats.Bytes - } - - if endTime.IsZero() { - endTime = time.Now() - } - return &TrafficStats{ - StartTime: startTime, - EndTime: endTime, - Packets: packets, - PacketsLost: packetsLost, - PacketsPadding: packetsPadding, - PacketsOutOfOrder: packetsOutOfOrder, - Bytes: bytes, - } -} - -func TrafficLoadToTrafficRate(trafficLoad *TrafficLoad) ( - packetRateIn float64, - byteRateIn float64, - packetRateOut float64, - byteRateOut float64, -) { - if trafficLoad == nil { - return - } - - for _, trafficTypeStat := range trafficLoad.TrafficTypeStats { - elapsed := trafficTypeStat.TrafficStats.EndTime.Sub(trafficTypeStat.TrafficStats.StartTime).Seconds() - packetRate := float64(trafficTypeStat.TrafficStats.Packets) / elapsed - byteRate := float64(trafficTypeStat.TrafficStats.Bytes) / elapsed - switch trafficTypeStat.StreamType { - case livekit.StreamType_UPSTREAM: - packetRateIn += packetRate - byteRateIn += byteRate - case livekit.StreamType_DOWNSTREAM: - packetRateOut += packetRate - byteRateOut += byteRate - } - } - return -} diff --git a/pkg/rtc/types/typesfakes/fake_local_participant.go b/pkg/rtc/types/typesfakes/fake_local_participant.go index 7705cda478..8a4bf1b86a 100644 --- a/pkg/rtc/types/typesfakes/fake_local_participant.go +++ b/pkg/rtc/types/typesfakes/fake_local_participant.go @@ -345,16 +345,6 @@ type FakeLocalParticipant struct { getSubscribedTracksReturnsOnCall map[int]struct { result1 []types.SubscribedTrack } - GetTrafficLoadStub func() *types.TrafficLoad - getTrafficLoadMutex sync.RWMutex - getTrafficLoadArgsForCall []struct { - } - getTrafficLoadReturns struct { - result1 *types.TrafficLoad - } - getTrafficLoadReturnsOnCall map[int]struct { - result1 *types.TrafficLoad - } GetTrailerStub func() []byte getTrailerMutex sync.RWMutex getTrailerArgsForCall []struct { @@ -636,11 +626,6 @@ type FakeLocalParticipant struct { onTrackUpdatedArgsForCall []struct { arg1 func(types.LocalParticipant, types.MediaTrack) } - OnTrafficLoadStub func(func(trafficLoad *types.TrafficLoad)) - onTrafficLoadMutex sync.RWMutex - onTrafficLoadArgsForCall []struct { - arg1 func(trafficLoad *types.TrafficLoad) - } ProtocolVersionStub func() types.ProtocolVersion protocolVersionMutex sync.RWMutex protocolVersionArgsForCall []struct { @@ -2722,59 +2707,6 @@ func (fake *FakeLocalParticipant) GetSubscribedTracksReturnsOnCall(i int, result }{result1} } -func (fake *FakeLocalParticipant) GetTrafficLoad() *types.TrafficLoad { - fake.getTrafficLoadMutex.Lock() - ret, specificReturn := fake.getTrafficLoadReturnsOnCall[len(fake.getTrafficLoadArgsForCall)] - fake.getTrafficLoadArgsForCall = append(fake.getTrafficLoadArgsForCall, struct { - }{}) - stub := fake.GetTrafficLoadStub - fakeReturns := fake.getTrafficLoadReturns - fake.recordInvocation("GetTrafficLoad", []interface{}{}) - fake.getTrafficLoadMutex.Unlock() - if stub != nil { - return stub() - } - if specificReturn { - return ret.result1 - } - return fakeReturns.result1 -} - -func (fake *FakeLocalParticipant) GetTrafficLoadCallCount() int { - fake.getTrafficLoadMutex.RLock() - defer fake.getTrafficLoadMutex.RUnlock() - return len(fake.getTrafficLoadArgsForCall) -} - -func (fake *FakeLocalParticipant) GetTrafficLoadCalls(stub func() *types.TrafficLoad) { - fake.getTrafficLoadMutex.Lock() - defer fake.getTrafficLoadMutex.Unlock() - fake.GetTrafficLoadStub = stub -} - -func (fake *FakeLocalParticipant) GetTrafficLoadReturns(result1 *types.TrafficLoad) { - fake.getTrafficLoadMutex.Lock() - defer fake.getTrafficLoadMutex.Unlock() - fake.GetTrafficLoadStub = nil - fake.getTrafficLoadReturns = struct { - result1 *types.TrafficLoad - }{result1} -} - -func (fake *FakeLocalParticipant) GetTrafficLoadReturnsOnCall(i int, result1 *types.TrafficLoad) { - fake.getTrafficLoadMutex.Lock() - defer fake.getTrafficLoadMutex.Unlock() - fake.GetTrafficLoadStub = nil - if fake.getTrafficLoadReturnsOnCall == nil { - fake.getTrafficLoadReturnsOnCall = make(map[int]struct { - result1 *types.TrafficLoad - }) - } - fake.getTrafficLoadReturnsOnCall[i] = struct { - result1 *types.TrafficLoad - }{result1} -} - func (fake *FakeLocalParticipant) GetTrailer() []byte { fake.getTrailerMutex.Lock() ret, specificReturn := fake.getTrailerReturnsOnCall[len(fake.getTrailerArgsForCall)] @@ -4357,38 +4289,6 @@ func (fake *FakeLocalParticipant) OnTrackUpdatedArgsForCall(i int) func(types.Lo return argsForCall.arg1 } -func (fake *FakeLocalParticipant) OnTrafficLoad(arg1 func(trafficLoad *types.TrafficLoad)) { - fake.onTrafficLoadMutex.Lock() - fake.onTrafficLoadArgsForCall = append(fake.onTrafficLoadArgsForCall, struct { - arg1 func(trafficLoad *types.TrafficLoad) - }{arg1}) - stub := fake.OnTrafficLoadStub - fake.recordInvocation("OnTrafficLoad", []interface{}{arg1}) - fake.onTrafficLoadMutex.Unlock() - if stub != nil { - fake.OnTrafficLoadStub(arg1) - } -} - -func (fake *FakeLocalParticipant) OnTrafficLoadCallCount() int { - fake.onTrafficLoadMutex.RLock() - defer fake.onTrafficLoadMutex.RUnlock() - return len(fake.onTrafficLoadArgsForCall) -} - -func (fake *FakeLocalParticipant) OnTrafficLoadCalls(stub func(func(trafficLoad *types.TrafficLoad))) { - fake.onTrafficLoadMutex.Lock() - defer fake.onTrafficLoadMutex.Unlock() - fake.OnTrafficLoadStub = stub -} - -func (fake *FakeLocalParticipant) OnTrafficLoadArgsForCall(i int) func(trafficLoad *types.TrafficLoad) { - fake.onTrafficLoadMutex.RLock() - defer fake.onTrafficLoadMutex.RUnlock() - argsForCall := fake.onTrafficLoadArgsForCall[i] - return argsForCall.arg1 -} - func (fake *FakeLocalParticipant) ProtocolVersion() types.ProtocolVersion { fake.protocolVersionMutex.Lock() ret, specificReturn := fake.protocolVersionReturnsOnCall[len(fake.protocolVersionArgsForCall)] @@ -6577,8 +6477,6 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} { defer fake.getSubscribedParticipantsMutex.RUnlock() fake.getSubscribedTracksMutex.RLock() defer fake.getSubscribedTracksMutex.RUnlock() - fake.getTrafficLoadMutex.RLock() - defer fake.getTrafficLoadMutex.RUnlock() fake.getTrailerMutex.RLock() defer fake.getTrailerMutex.RUnlock() fake.handleAnswerMutex.RLock() @@ -6653,8 +6551,6 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} { defer fake.onTrackUnpublishedMutex.RUnlock() fake.onTrackUpdatedMutex.RLock() defer fake.onTrackUpdatedMutex.RUnlock() - fake.onTrafficLoadMutex.RLock() - defer fake.onTrafficLoadMutex.RUnlock() fake.protocolVersionMutex.RLock() defer fake.protocolVersionMutex.RUnlock() fake.removePublishedTrackMutex.RLock()