forked from quic-go/quic-go
/
window_update_manager.go
74 lines (59 loc) · 1.68 KB
/
window_update_manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
package quic
import (
"sync"
"github.com/lucas-clemente/quic-go/frames"
"github.com/lucas-clemente/quic-go/protocol"
)
type windowUpdateItem struct {
Offset protocol.ByteCount
Counter uint8
}
// windowUpdateManager manages window update frames for receiving data
type windowUpdateManager struct {
streamOffsets map[protocol.StreamID]*windowUpdateItem
mutex sync.RWMutex
}
// newWindowUpdateManager returns a new windowUpdateManager
func newWindowUpdateManager() *windowUpdateManager {
return &windowUpdateManager{
streamOffsets: make(map[protocol.StreamID]*windowUpdateItem),
}
}
// SetStreamOffset sets an offset for a stream
func (m *windowUpdateManager) SetStreamOffset(streamID protocol.StreamID, n protocol.ByteCount) {
m.mutex.Lock()
defer m.mutex.Unlock()
entry, ok := m.streamOffsets[streamID]
if !ok {
m.streamOffsets[streamID] = &windowUpdateItem{Offset: n}
return
}
if n > entry.Offset {
entry.Offset = n
entry.Counter = 0
}
}
// GetWindowUpdateFrames gets all the WindowUpdate frames that need to be sent
func (m *windowUpdateManager) GetWindowUpdateFrames() []*frames.WindowUpdateFrame {
m.mutex.RLock()
defer m.mutex.RUnlock()
var wuf []*frames.WindowUpdateFrame
for key, value := range m.streamOffsets {
if value.Counter >= protocol.WindowUpdateNumRepetitions {
continue
}
frame := frames.WindowUpdateFrame{
StreamID: key,
ByteOffset: value.Offset,
}
value.Counter++
wuf = append(wuf, &frame)
}
return wuf
}
// RemoveStream should be called when a stream is closed for receiving
func (m *windowUpdateManager) RemoveStream(streamID protocol.StreamID) {
m.mutex.Lock()
defer m.mutex.Unlock()
delete(m.streamOffsets, streamID)
}