This repository has been archived by the owner on Jan 31, 2018. It is now read-only.
/
multee.go
146 lines (131 loc) · 3.47 KB
/
multee.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package writer
import (
"sync"
"github.com/twitchscience/aws_utils/logger"
)
// Multee implements the `SpadeWriter` and 'SpadeWriterManager' interface and forwards all calls
// to a map of targets.
type Multee struct {
// targets is the spadewriters we will Multee events to
targets map[string]SpadeWriter
sync.RWMutex
}
// SpadeWriterManager allows operations on a set of SpadeWriters
type SpadeWriterManager interface {
Add(key string, w SpadeWriter)
Drop(key string)
Replace(key string, newWriter SpadeWriter)
}
// Add adds a new writer to the target map
func (t *Multee) Add(key string, w SpadeWriter) {
t.Lock()
defer t.Unlock()
_, exists := t.targets[key]
if exists {
logger.WithField("key", key).Error("Could not add SpadeWriter due to key collision")
return
}
t.targets[key] = w
}
// NewMultee makes a empty multee and returns it
func NewMultee() *Multee {
return &Multee{
targets: make(map[string]SpadeWriter),
}
}
// Drop drops an existing writer from the target map
func (t *Multee) Drop(key string) {
t.Lock()
defer t.Unlock()
logger.WithField("key", key).Info("Dropping writer...")
writer, exists := t.targets[key]
if !exists {
logger.WithField("key", key).Error("Could not drop SpadeWriter due to non existent key")
return
}
logger.Go(func() {
err := writer.Close()
if err != nil {
logger.WithError(err).
WithField("writer_key", key).
Error("Failed to close SpadeWriter on drop")
}
})
delete(t.targets, key)
logger.WithField("key", key).Info("Done dropping writer")
}
// Replace adds a new writer to the target map
func (t *Multee) Replace(key string, newWriter SpadeWriter) {
t.Lock()
defer t.Unlock()
logger.WithField("key", key).Info("Replacing writer...")
oldWriter, exists := t.targets[key]
if !exists {
logger.WithField("key", key).Error("Could not replace SpadeWriter due to non existent key")
return
}
logger.Go(func() {
err := oldWriter.Close()
if err != nil {
logger.WithError(err).
WithField("writer_key", key).
Error("Failed to close SpadeWriter on replace")
}
})
delete(t.targets, key)
t.targets[key] = newWriter
logger.WithField("key", key).Info("Done replacing writer")
}
// Write forwards a writerequest to multiple targets
func (t *Multee) Write(r *WriteRequest) {
t.RLock()
defer t.RUnlock()
for _, writer := range t.targets {
writer.Write(r)
}
}
// Rotate forwards a rotation request to multiple targets
func (t *Multee) Rotate() (bool, error) {
t.RLock()
defer t.RUnlock()
allDone := true
for k, writer := range t.targets {
// losing errors here. Alternatives are to
// not rotate writers further down the
// chain, or to return an arbitrary error
// out of all possible ones that occured
done, err := writer.Rotate()
if err != nil {
logger.WithError(err).WithField("writer_key", k).Error("Failed to forward rotation request")
allDone = false
} else {
allDone = allDone && done
}
}
return allDone, nil
}
// Close closes all the target writers, it does this asynchronously
func (t *Multee) Close() error {
t.Lock()
defer t.Unlock()
var wg sync.WaitGroup
wg.Add(len(t.targets))
for key, writer := range t.targets {
k := key
w := writer
// losing errors here. Alternative is to
// return an arbitrary error out of all
// possible ones that occured
logger.Go(func() {
defer wg.Done()
err := w.Close()
if err != nil {
logger.WithError(err).
WithField("writer_key", k).
Error("Failed to forward rotation request")
}
})
}
wg.Wait()
return nil
}