-
-
Notifications
You must be signed in to change notification settings - Fork 316
/
sink_pool.go
121 lines (106 loc) · 3.23 KB
/
sink_pool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
package system
import (
"sync"
"time"
)
// SinkName represents one of the registered sinks for a server.
type SinkName string
const (
// LogSink handles console output for game servers, including messages being
// sent via Wings to the console instance.
LogSink SinkName = "log"
// InstallSink handles installation output for a server.
InstallSink SinkName = "install"
)
// SinkPool represents a pool with sinks.
type SinkPool struct {
mu sync.RWMutex
sinks []chan []byte
}
// NewSinkPool returns a new empty SinkPool. A sink pool generally lives with a
// server instance for its full lifetime.
func NewSinkPool() *SinkPool {
return &SinkPool{}
}
// On adds a channel to the sink pool instance.
func (p *SinkPool) On(c chan []byte) {
p.mu.Lock()
p.sinks = append(p.sinks, c)
p.mu.Unlock()
}
// Off removes a given channel from the sink pool. If no matching sink is found
// this function is a no-op. If a matching channel is found, it will be removed.
func (p *SinkPool) Off(c chan []byte) {
p.mu.Lock()
defer p.mu.Unlock()
sinks := p.sinks
for i, sink := range sinks {
if c != sink {
continue
}
// We need to maintain the order of the sinks in the slice we're tracking,
// so shift everything to the left, rather than changing the order of the
// elements.
copy(sinks[i:], sinks[i+1:])
sinks[len(sinks)-1] = nil
sinks = sinks[:len(sinks)-1]
p.sinks = sinks
// Avoid a panic if the sink channel is nil at this point.
if c != nil {
close(c)
}
return
}
}
// Destroy destroys the pool by removing and closing all sinks and destroying
// all of the channels that are present.
func (p *SinkPool) Destroy() {
p.mu.Lock()
defer p.mu.Unlock()
for _, c := range p.sinks {
if c != nil {
close(c)
}
}
p.sinks = nil
}
// Push sends a given message to each of the channels registered in the pool.
// This will use a Ring Buffer channel in order to avoid blocking the channel
// sends, and attempt to push though the most recent messages in the queue in
// favor of the oldest messages.
//
// If the channel becomes full and isn't being drained fast enough, this
// function will remove the oldest message in the channel, and then push the
// message that it got onto the end, effectively making the channel a rolling
// buffer.
//
// There is a potential for data to be lost when passing it through this
// function, but only in instances where the channel buffer is full and the
// channel is not drained fast enough, in which case dropping messages is most
// likely the best option anyways. This uses waitgroups to allow every channel
// to attempt its send concurrently thus making the total blocking time of this
// function "O(1)" instead of "O(n)".
func (p *SinkPool) Push(data []byte) {
p.mu.RLock()
defer p.mu.RUnlock()
var wg sync.WaitGroup
wg.Add(len(p.sinks))
for _, c := range p.sinks {
go func(c chan []byte) {
defer wg.Done()
select {
case c <- data:
case <-time.After(time.Millisecond * 10):
// If there is nothing in the channel to read, but we also cannot write
// to the channel, just skip over sending data. If we don't do this you'll
// end up blocking the application on the channel read below.
if len(c) == 0 {
break
}
<-c
c <- data
}
}(c)
}
wg.Wait()
}