-
Notifications
You must be signed in to change notification settings - Fork 51
/
delay_based_bwe.go
106 lines (83 loc) · 2.57 KB
/
delay_based_bwe.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
// SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT
package gcc
import (
"sync"
"time"
"github.com/pion/interceptor/internal/cc"
"github.com/pion/logging"
)
// DelayStats contains some internal statistics of the delay based congestion
// controller.
//
// A DelayStats value is what newDelayController logs and hands to the callback
// registered through onUpdate.
type DelayStats struct {
// Measurement, Estimate and Threshold are durations filled in by the delay
// estimation stages.
// NOTE(review): presumably the raw delay measurement, its filtered estimate
// and the current overuse threshold — confirm against the producers.
Measurement time.Duration
Estimate time.Duration
Threshold time.Duration
// LastReceiveDelta is a duration; presumably the most recent inter-arrival
// delta between packet groups — TODO confirm.
LastReceiveDelta time.Duration
// Usage and State use package-local types declared elsewhere in the package.
Usage usage
State state
// TargetBitrate is the bitrate reported alongside the statistics.
// NOTE(review): unit is presumably bits per second — confirm.
TargetBitrate int
}
// now is the signature of a clock function returning a time.Time. It is the
// type of delayControllerConfig.nowFn, which newDelayController forwards to
// newRateController.
// NOTE(review): presumably exists so a fake clock can be injected — confirm.
type now func() time.Time
// delayController owns the two acknowledgment channels and the goroutines
// started by newDelayController that consume them.
type delayController struct {
// ackPipe is the send side of the channel read by the arrival group
// accumulator goroutine.
ackPipe chan<- []cc.Acknowledgment
// ackRatePipe is the send side of the channel read by the rate calculator
// goroutine.
ackRatePipe chan<- []cc.Acknowledgment

// NOTE(review): newDelayController leaves this embedded pointer nil; the
// accumulator it actually runs is only held in a local variable.
*arrivalGroupAccumulator
// Embedded rate controller; assigned in newDelayController.
*rateController

// onUpdateCallback, when non-nil, is invoked with every DelayStats passed to
// the callback that newDelayController registers on the rate controller.
onUpdateCallback func(DelayStats)

// wg tracks the two goroutines started by newDelayController; Close waits on it.
wg sync.WaitGroup

log logging.LeveledLogger
}
// delayControllerConfig bundles the parameters of newDelayController. All four
// fields are forwarded unchanged to newRateController.
type delayControllerConfig struct {
// nowFn is the clock function handed to the rate controller.
nowFn now
// NOTE(review): bitrates are presumably in bits per second — confirm.
initialBitrate int
minBitrate int
maxBitrate int
}
// newDelayController builds a delayController and starts its two worker
// goroutines.
//
// Each processing stage is constructed with the callback of the stage that
// follows it:
//
//	arrival group accumulator -> slope estimator -> overuse detector -> rate controller
//
// A separate rate calculator reads the second acknowledgment channel and
// reports to rateController.onReceivedRate. Every DelayStats passed to the
// callback registered on the rate controller is logged and then forwarded to
// the callback set with onUpdate, if any.
//
// Both goroutines are tracked by the controller's WaitGroup; Close closes the
// channels and waits on that WaitGroup.
func newDelayController(c delayControllerConfig) *delayController {
	acks := make(chan []cc.Acknowledgment)
	rateAcks := make(chan []cc.Acknowledgment)

	// Fields not listed here keep their zero value.
	// NOTE(review): that includes the embedded *arrivalGroupAccumulator, which
	// is never assigned; only the local accumulator below is used.
	ctrl := &delayController{
		ackPipe:     acks,
		ackRatePipe: rateAcks,
		log:         logging.NewDefaultLoggerFactory().NewLogger("gcc_delay_controller"),
	}

	rate := newRateController(
		c.nowFn,
		c.initialBitrate,
		c.minBitrate,
		c.maxBitrate,
		func(stats DelayStats) {
			ctrl.log.Infof("delaystats: %v", stats)
			if ctrl.onUpdateCallback == nil {
				return
			}
			ctrl.onUpdateCallback(stats)
		},
	)
	ctrl.rateController = rate

	// Stages are created back to front so that each one can be given the
	// callback of its successor.
	detector := newOveruseDetector(newAdaptiveThreshold(), 10*time.Millisecond, rate.onDelayStats)
	estimator := newSlopeEstimator(newKalman(), detector.onDelayStats)
	accumulator := newArrivalGroupAccumulator()

	calculator := newRateCalculator(500 * time.Millisecond)

	ctrl.wg.Add(2)

	// Consumes acks until the channel is closed by Close.
	go func() {
		defer ctrl.wg.Done()
		accumulator.run(acks, estimator.onArrivalGroup)
	}()

	// Consumes rateAcks until the channel is closed by Close.
	go func() {
		defer ctrl.wg.Done()
		calculator.run(rateAcks, rate.onReceivedRate)
	}()

	return ctrl
}
// onUpdate registers f as the callback that receives each DelayStats after it
// has been logged. It replaces any previously registered callback; passing nil
// disables forwarding.
// NOTE(review): the field is written without synchronization; presumably this
// must be called before acknowledgments are fed in — confirm with callers.
func (d *delayController) onUpdate(f func(DelayStats)) {
d.onUpdateCallback = f
}
// updateDelayEstimate hands the same batch of acknowledgments to both worker
// goroutines, first the arrival group accumulator and then the rate calculator.
// Both channels are unbuffered, so each send blocks until the corresponding
// goroutine has received the batch. It must not be called after Close, because
// sending on a closed channel panics.
func (d *delayController) updateDelayEstimate(acks []cc.Acknowledgment) {
d.ackPipe <- acks
d.ackRatePipe <- acks
}
// Close shuts the controller down: it closes both acknowledgment channels and
// then blocks until the two goroutines started by newDelayController have
// returned. It always returns nil.
//
// Close must be called at most once; closing an already closed channel panics.
func (d *delayController) Close() error {
	close(d.ackPipe)
	close(d.ackRatePipe)

	// Wait for both workers to finish.
	d.wg.Wait()

	return nil
}