forked from cloudflare/cloudflared
/
readylist.go
151 lines (134 loc) · 3.38 KB
/
readylist.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
package h2mux
import "sync"
// ReadyList multiplexes several event signals onto a single channel.
type ReadyList struct {
// signalC is used to signal that a stream can be enqueued
signalC chan uint32
// waitC is used to signal the ID of the first ready descriptor
waitC chan uint32
// doneC is used to signal that run should terminate
doneC chan struct{}
closeOnce sync.Once
}
func NewReadyList() *ReadyList {
rl := &ReadyList{
signalC: make(chan uint32),
waitC: make(chan uint32),
doneC: make(chan struct{}),
}
go rl.run()
return rl
}
// ID is the stream ID
func (r *ReadyList) Signal(ID uint32) {
select {
case r.signalC <- ID:
// ReadyList already closed
case <-r.doneC:
}
}
func (r *ReadyList) ReadyChannel() <-chan uint32 {
return r.waitC
}
func (r *ReadyList) Close() {
r.closeOnce.Do(func() {
close(r.doneC)
})
}
func (r *ReadyList) run() {
defer close(r.waitC)
var queue readyDescriptorQueue
var firstReady *readyDescriptor
activeDescriptors := newReadyDescriptorMap()
for {
if firstReady == nil {
select {
case i := <-r.signalC:
firstReady = activeDescriptors.SetIfMissing(i)
case <-r.doneC:
return
}
}
select {
case r.waitC <- firstReady.ID:
activeDescriptors.Delete(firstReady.ID)
firstReady = queue.Dequeue()
case i := <-r.signalC:
newReady := activeDescriptors.SetIfMissing(i)
if newReady != nil {
// key doesn't exist
queue.Enqueue(newReady)
}
case <-r.doneC:
return
}
}
}
type readyDescriptor struct {
ID uint32
Next *readyDescriptor
}
// readyDescriptorQueue is a queue of readyDescriptors in the form of a singly-linked list.
// The nil readyDescriptorQueue is an empty queue ready for use.
type readyDescriptorQueue struct {
Head *readyDescriptor
Tail *readyDescriptor
}
func (q *readyDescriptorQueue) Empty() bool {
return q.Head == nil
}
func (q *readyDescriptorQueue) Enqueue(x *readyDescriptor) {
if x.Next != nil {
panic("enqueued already queued item")
}
if q.Empty() {
q.Head = x
q.Tail = x
} else {
q.Tail.Next = x
q.Tail = x
}
}
// Dequeue returns the first readyDescriptor in the queue, or nil if empty.
func (q *readyDescriptorQueue) Dequeue() *readyDescriptor {
if q.Empty() {
return nil
}
x := q.Head
q.Head = x.Next
x.Next = nil
return x
}
// readyDescriptorQueue is a map of readyDescriptors keyed by ID.
// It maintains a free list of deleted ready descriptors.
type readyDescriptorMap struct {
descriptors map[uint32]*readyDescriptor
free []*readyDescriptor
}
func newReadyDescriptorMap() *readyDescriptorMap {
return &readyDescriptorMap{descriptors: make(map[uint32]*readyDescriptor)}
}
// create or reuse a readyDescriptor if the stream is not in the queue.
// This avoid stream starvation caused by a single high-bandwidth stream monopolising the writer goroutine
func (m *readyDescriptorMap) SetIfMissing(key uint32) *readyDescriptor {
if _, ok := m.descriptors[key]; ok {
return nil
}
var newDescriptor *readyDescriptor
if len(m.free) > 0 {
// reuse deleted ready descriptors
newDescriptor = m.free[len(m.free)-1]
m.free = m.free[:len(m.free)-1]
} else {
newDescriptor = &readyDescriptor{}
}
newDescriptor.ID = key
m.descriptors[key] = newDescriptor
return newDescriptor
}
func (m *readyDescriptorMap) Delete(key uint32) {
if descriptor, ok := m.descriptors[key]; ok {
m.free = append(m.free, descriptor)
delete(m.descriptors, key)
}
}