forked from livekit/livekit
/
channelobserver.go
152 lines (122 loc) · 3.97 KB
/
channelobserver.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
package streamallocator
import (
"fmt"
"time"
"github.com/whoyao/protocol/logger"
)
// ------------------------------------------------
// ChannelTrend is the trend classification returned by
// ChannelObserver.GetTrend.
type ChannelTrend int

const (
	// ChannelTrendNeutral is the zero value; GetTrend returns it when none of
	// its congestion or clearing conditions match.
	ChannelTrendNeutral ChannelTrend = iota
	// ChannelTrendClearing is returned by GetTrend when the estimate trend
	// direction is upward.
	ChannelTrendClearing
	// ChannelTrendCongesting is returned by GetTrend when the estimate trend
	// direction is downward or the NACK tracker is triggered.
	ChannelTrendCongesting
)

// String returns the upper-case name of the trend. Values that do not match
// one of the named constants are rendered as their decimal integer value.
func (c ChannelTrend) String() string {
	names := [...]string{
		ChannelTrendNeutral:    "NEUTRAL",
		ChannelTrendClearing:   "CLEARING",
		ChannelTrendCongesting: "CONGESTING",
	}
	if c >= 0 && int(c) < len(names) {
		return names[c]
	}
	// Unknown value: fall back to the raw number.
	return fmt.Sprintf("%d", int(c))
}
// ------------------------------------------------
// ChannelCongestionReason is the second value returned by
// ChannelObserver.GetTrend; it identifies which signal produced a congesting
// verdict.
type ChannelCongestionReason int

const (
	// ChannelCongestionReasonNone is the zero value; GetTrend returns it
	// together with the neutral and clearing trends.
	ChannelCongestionReasonNone ChannelCongestionReason = iota
	// ChannelCongestionReasonEstimate is returned by GetTrend when the
	// estimate trend direction is downward.
	ChannelCongestionReasonEstimate
	// ChannelCongestionReasonLoss is returned by GetTrend when the NACK
	// tracker is triggered.
	ChannelCongestionReasonLoss
)

// String returns the upper-case name of the reason. Values that do not match
// one of the named constants are rendered as their decimal integer value.
func (c ChannelCongestionReason) String() string {
	names := [...]string{
		ChannelCongestionReasonNone:     "NONE",
		ChannelCongestionReasonEstimate: "ESTIMATE",
		ChannelCongestionReasonLoss:     "LOSS",
	}
	if c >= 0 && int(c) < len(names) {
		return names[c]
	}
	// Unknown value: fall back to the raw number.
	return fmt.Sprintf("%d", int(c))
}
// ------------------------------------------------
// ChannelObserverParams carries the configuration consumed by
// NewChannelObserver. The Estimate* fields are forwarded to the estimate
// TrendDetector and the Nack* fields to the NackTracker.
type ChannelObserverParams struct {
	// Name identifies the observer; NewChannelObserver derives the
	// sub-component names from it by appending "-estimate" and "-nack".
	Name string

	// Forwarded to TrendDetectorParams as RequiredSamples,
	// DownwardTrendThreshold and CollapseThreshold respectively.
	EstimateRequiredSamples        int
	EstimateDownwardTrendThreshold float64
	EstimateCollapseThreshold      time.Duration

	// Forwarded to NackTrackerParams as WindowMinDuration, WindowMaxDuration
	// and RatioThreshold respectively.
	NackWindowMinDuration time.Duration
	NackWindowMaxDuration time.Duration
	NackRatioThreshold    float64
}
// ChannelObserver combines an estimate TrendDetector and a NackTracker and
// turns their state into a single ChannelTrend verdict (see GetTrend).
type ChannelObserver struct {
	// params is the configuration the observer was constructed with.
	params ChannelObserverParams
	// logger receives the debug messages emitted by GetTrend.
	logger logger.Logger

	// estimateTrend is fed by SeedEstimate and AddEstimate.
	estimateTrend *TrendDetector
	// nackTracker is fed by AddNack.
	nackTracker *NackTracker

	// NOTE(review): the three fields below are not referenced by any method
	// visible in this file; they may be leftovers. Confirm against the rest
	// of the package before removing.
	nackWindowStartTime time.Time
	packets             uint32
	repeatedNacks       uint32
}
// NewChannelObserver builds a ChannelObserver from params. It creates one
// TrendDetector named "<Name>-estimate" and one NackTracker named
// "<Name>-nack"; both share the supplied logger.
func NewChannelObserver(params ChannelObserverParams, logger logger.Logger) *ChannelObserver {
	estimateParams := TrendDetectorParams{
		Name:                   params.Name + "-estimate",
		Logger:                 logger,
		RequiredSamples:        params.EstimateRequiredSamples,
		DownwardTrendThreshold: params.EstimateDownwardTrendThreshold,
		CollapseThreshold:      params.EstimateCollapseThreshold,
	}
	nackParams := NackTrackerParams{
		Name:              params.Name + "-nack",
		Logger:            logger,
		WindowMinDuration: params.NackWindowMinDuration,
		WindowMaxDuration: params.NackWindowMaxDuration,
		RatioThreshold:    params.NackRatioThreshold,
	}

	observer := &ChannelObserver{
		params: params,
		logger: logger,
	}
	// The trend detector is constructed before the NACK tracker.
	observer.estimateTrend = NewTrendDetector(estimateParams)
	observer.nackTracker = NewNackTracker(nackParams)
	return observer
}
// SeedEstimate passes estimate to the Seed method of the estimate trend
// detector.
func (c *ChannelObserver) SeedEstimate(estimate int64) {
	detector := c.estimateTrend
	detector.Seed(estimate)
}
// AddEstimate passes estimate to the AddValue method of the estimate trend
// detector.
func (c *ChannelObserver) AddEstimate(estimate int64) {
	detector := c.estimateTrend
	detector.AddValue(estimate)
}
// AddNack passes the packets and repeatedNacks counts to the Add method of the
// NACK tracker.
func (c *ChannelObserver) AddNack(packets uint32, repeatedNacks uint32) {
	tracker := c.nackTracker
	tracker.Add(packets, repeatedNacks)
}
// GetLowestEstimate returns the value reported by the estimate trend
// detector's GetLowest method.
func (c *ChannelObserver) GetLowestEstimate() int64 {
	lowest := c.estimateTrend.GetLowest()
	return lowest
}
// GetHighestEstimate returns the value reported by the estimate trend
// detector's GetHighest method.
func (c *ChannelObserver) GetHighestEstimate() int64 {
	highest := c.estimateTrend.GetHighest()
	return highest
}
// GetNackRatio returns the value reported by the NACK tracker's GetRatio
// method.
func (c *ChannelObserver) GetNackRatio() float64 {
	ratio := c.nackTracker.GetRatio()
	return ratio
}
// GetNackHistory returns the slice reported by the NACK tracker's GetHistory
// method, without copying it.
func (c *ChannelObserver) GetNackHistory() []string {
	history := c.nackTracker.GetHistory()
	return history
}
// GetTrend reports the current channel trend and, when congesting, the signal
// responsible. The checks are applied in priority order:
//
//  1. estimate direction is downward -> congesting, reason estimate
//  2. NACK tracker is triggered      -> congesting, reason loss
//  3. estimate direction is upward   -> clearing, no reason
//  4. otherwise                      -> neutral, no reason
//
// Both congesting outcomes emit a debug log containing c.ToString().
func (c *ChannelObserver) GetTrend() (ChannelTrend, ChannelCongestionReason) {
	direction := c.estimateTrend.GetDirection()

	if direction == TrendDirectionDownward {
		c.logger.Debugw("stream allocator: channel observer: estimate is trending downward", "channel", c.ToString())
		return ChannelTrendCongesting, ChannelCongestionReasonEstimate
	}

	// The NACK tracker is consulted only when the estimate is not trending
	// downward, so a triggered tracker takes precedence over an upward
	// estimate.
	if c.nackTracker.IsTriggered() {
		c.logger.Debugw("stream allocator: channel observer: high rate of repeated NACKs", "channel", c.ToString())
		return ChannelTrendCongesting, ChannelCongestionReasonLoss
	}

	if direction == TrendDirectionUpward {
		return ChannelTrendClearing, ChannelCongestionReasonNone
	}

	return ChannelTrendNeutral, ChannelCongestionReasonNone
}
// ToString renders the observer's name followed by the ToString output of the
// estimate trend detector and of the NACK tracker, for use in log messages.
func (c *ChannelObserver) ToString() string {
	name := c.params.Name
	estimate := c.estimateTrend.ToString()
	nack := c.nackTracker.ToString()
	return fmt.Sprintf("name: %s, estimate: {%s}, nack {%s}", name, estimate, nack)
}
// ------------------------------------------------