-
Notifications
You must be signed in to change notification settings - Fork 177
/
eventd.go
392 lines (327 loc) · 10.3 KB
/
eventd.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
package eventd
import (
"context"
"errors"
"fmt"
"path"
"strings"
"sync"
"time"
"github.com/prometheus/client_golang/prometheus"
corev2 "github.com/sensu/sensu-go/api/core/v2"
"github.com/sensu/sensu-go/backend/liveness"
"github.com/sensu/sensu-go/backend/messaging"
"github.com/sensu/sensu-go/backend/store"
"github.com/sirupsen/logrus"
)
const (
	// ComponentName identifies Eventd as the component/daemon implemented in this
	// package; it is also used as the logging "component" field value.
	ComponentName = "eventd"
	// DefaultHandlerCount is the number of goroutines that will be allocated
	// for processing incoming events.
	DefaultHandlerCount = 1000
	// EventsProcessedCounterVec is the name of the prometheus counter vec used to count events processed.
	EventsProcessedCounterVec = "sensu_go_events_processed"
	// EventsProcessedLabelName is the name of the status label on the
	// EventsProcessed counter.
	EventsProcessedLabelName = "status"
	// EventsProcessedLabelSuccess is the label value used to count events processed successfully.
	EventsProcessedLabelSuccess = "success"
)
var (
	// logger is the package-level logger, tagged with this component's name.
	logger = logrus.WithFields(logrus.Fields{
		"component": ComponentName,
	})
	// EventsProcessed counts the number of sensu go events processed,
	// partitioned by the "status" label.
	EventsProcessed = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Name: EventsProcessedCounterVec,
			Help: "The total number of processed events",
		},
		[]string{EventsProcessedLabelName},
	)
)
// Eventd handles incoming sensu events and stores them in etcd.
type Eventd struct {
	store           store.Store       // general resource store (silenced entries, entities)
	eventStore      store.EventStore  // event persistence
	bus             messaging.MessageBus
	handlerCount    int               // number of handler goroutines started by Start
	livenessFactory liveness.Factory  // builds the check-TTL liveness switch set
	eventChan       chan interface{}  // receives raw events from the bus subscription
	subscription    messaging.Subscription
	errChan         chan error        // buffered (1); carries a single terminal error
	mu              *sync.Mutex
	shutdownChan    chan struct{}     // closed by Stop to signal handlers to drain and exit
	wg              *sync.WaitGroup   // tracks handler goroutines; Stop waits on it
	Logger          Logger            // event audit logger; defaults to RawLogger
}
// Option is a functional option applied to an Eventd by New; it may
// return an error to abort construction.
type Option func(*Eventd) error
// Config configures Eventd with its required external dependencies.
type Config struct {
	Store           store.Store
	EventStore      store.EventStore
	Bus             messaging.MessageBus
	LivenessFactory liveness.Factory
}
// New creates a new Eventd from the given Config and applies any
// functional options, returning the first option error encountered.
func New(c Config, opts ...Option) (*Eventd, error) {
	e := &Eventd{
		// Dependencies supplied by the caller.
		store:           c.Store,
		eventStore:      c.EventStore,
		bus:             c.Bus,
		livenessFactory: c.LivenessFactory,
		// Defaults; options below may override.
		handlerCount: DefaultHandlerCount,
		Logger:       &RawLogger{},
		eventChan:    make(chan interface{}, 100),
		errChan:      make(chan error, 1),
		shutdownChan: make(chan struct{}, 1),
		mu:           &sync.Mutex{},
		wg:           &sync.WaitGroup{},
	}
	for _, option := range opts {
		if err := option(e); err != nil {
			return nil, err
		}
	}
	// Registration may fail if the collector is already registered; that
	// is harmless here, so the error is deliberately discarded.
	_ = prometheus.Register(EventsProcessed)
	return e, nil
}
// Receiver returns the event receiver channel, onto which the message
// bus delivers raw events for the handler goroutines to consume.
func (e *Eventd) Receiver() chan<- interface{} {
	return e.eventChan
}
// Start subscribes eventd to the raw event topic and launches the
// handler goroutines. On subscription failure no goroutines are started
// and no state is mutated.
func (e *Eventd) Start() error {
	sub, err := e.bus.Subscribe(messaging.TopicEventRaw, "eventd", e)
	if err != nil {
		// Check the error before touching the WaitGroup: the original
		// ordering called wg.Add before this check, so a Subscribe
		// failure left the WaitGroup unbalanced and a later Stop()
		// would block forever in wg.Wait(). It also stored a partial
		// subscription that Cancel() would then be called on.
		return err
	}
	e.subscription = sub
	e.wg.Add(e.handlerCount)
	e.startHandlers()
	return nil
}
// startHandlers launches e.handlerCount goroutines that consume events
// from e.eventChan until either shutdownChan is closed (graceful stop:
// drain remaining events, then exit) or eventChan itself is closed by
// the message bus (abnormal: report a terminal error and exit).
// Each goroutine calls e.wg.Done on exit; the corresponding Add is the
// caller's responsibility.
func (e *Eventd) startHandlers() {
	for i := 0; i < e.handlerCount; i++ {
		go func() {
			defer e.wg.Done()
			for {
				select {
				case <-e.shutdownChan:
					// drain the event channel.
					for msg := range e.eventChan {
						if err := e.handleMessage(msg); err != nil {
							logger.WithError(err).Error("eventd - error handling event")
						}
					}
					return
				case msg, ok := <-e.eventChan:
					// The message bus will close channels when it's shut down which means
					// we will end up reading from a closed channel. If it's closed,
					// return from this goroutine and emit a fatal error. It is then
					// the responsility of eventd's parent to shutdown eventd.
					//
					// NOTE: Should that be the case? If eventd is signalling that it has,
					// effectively, shutdown, why would something else be responsible for
					// shutting it down.
					if !ok {
						// This only buffers a single error. We can't block on
						// sending these or shutdown will block indefinitely.
						select {
						case e.errChan <- errors.New("event channel closed"):
						default:
						}
						return
					}
					if err := e.handleMessage(msg); err != nil {
						logger.WithError(err).Error("eventd - error handling event")
					}
				}
			}
		}()
	}
}
// eventKey derives the liveness-monitoring key for an event. Events are
// normally keyed per entity (namespace/check/entity), but a round robin
// check that did not execute against a proxy entity is keyed by check
// alone (namespace/check), since no single entity owns it.
func eventKey(event *corev2.Event) string {
	check, entity := event.Check, event.Entity
	if check.RoundRobin && entity.EntityClass != corev2.EntityProxyClass {
		return path.Join(check.Namespace, check.Name)
	}
	return path.Join(entity.Namespace, check.Name, entity.Name)
}
// handleMessage processes one message from the event channel: it
// validates the event, applies silencing, persists it, updates the
// check-TTL liveness switch, and republishes the stored event on
// TopicEvent for downstream pipelines. Check-less (metrics-only) events
// bypass the store entirely.
func (e *Eventd) handleMessage(msg interface{}) error {
	event, ok := msg.(*corev2.Event)
	if !ok {
		return errors.New("received non-Event on event channel")
	}
	// Validate the received event
	if err := event.Validate(); err != nil {
		return err
	}
	// If the event does not contain a check (rather, it contains metrics)
	// publish the event without writing to the store
	if !event.HasCheck() {
		e.Logger.Println(event)
		return e.bus.Publish(messaging.TopicEvent, event)
	}
	ctx := context.WithValue(context.Background(), corev2.NamespaceKey, event.Entity.Namespace)
	// Add any silenced subscriptions to the event
	if err := getSilenced(ctx, event, e.store); err != nil {
		return err
	}
	// Handle expire on resolve silenced entries
	if err := handleExpireOnResolveEntries(ctx, event, e.store); err != nil {
		return err
	}
	// Persist the event. prevEvent is the previously stored event for this
	// entity/check pair, or nil on first occurrence.
	event, prevEvent, err := e.eventStore.UpdateEvent(ctx, event)
	if err != nil {
		return err
	}
	e.Logger.Println(event)
	switches := e.livenessFactory("eventd", e.dead, e.alive, logger)
	switchKey := eventKey(event)
	if event.Check.Ttl > 0 {
		// Reset the switch
		timeout := int64(event.Check.Ttl)
		if err := switches.Alive(context.TODO(), switchKey, timeout); err != nil {
			return err
		}
	} else if prevEvent != nil && prevEvent.Check.Ttl > 0 {
		// The check TTL has been disabled, there is no longer a need to track it
		if err := switches.Bury(context.TODO(), switchKey); err != nil {
			// It's better to publish the event even if this fails, so
			// don't return the error here.
			logger.WithError(err).Error("error burying switch")
		}
	}
	EventsProcessed.WithLabelValues(EventsProcessedLabelSuccess).Inc()
	return e.bus.Publish(messaging.TopicEvent, event)
}
// alive is the liveness callback invoked when a check TTL switch is
// reset to the alive state. It only logs the transition; it never
// requests that the switch be buried.
func (e *Eventd) alive(key string, prev liveness.State, leader bool) (bury bool) {
	fields := logrus.Fields{
		"status":          liveness.Alive.String(),
		"previous_status": prev.String(),
	}
	namespace, check, entity, err := parseKey(key)
	if err != nil {
		logger.WithFields(fields).Error(err)
		return false
	}
	fields["check"] = check
	fields["entity"] = entity
	fields["namespace"] = namespace
	logger.WithFields(fields).Info("check TTL reset")
	return false
}
// dead is the liveness callback invoked when a check TTL switch expires.
// It returns true (burying the switch) only when the monitored entity no
// longer exists; otherwise, if this member is the leader, it generates a
// TTL-failure event for the entity/check pair.
func (e *Eventd) dead(key string, prev liveness.State, leader bool) (bury bool) {
	lager := logger.WithFields(logrus.Fields{
		"status":          liveness.Dead.String(),
		"previous_status": prev.String()})
	namespace, check, entity, err := parseKey(key)
	if err != nil {
		lager.Error(err)
		return false
	}
	lager = lager.WithFields(logrus.Fields{
		"check":     check,
		"entity":    entity,
		"namespace": namespace})
	lager.Warn("check TTL expired")
	// NOTE: To support check TTL for round robin scheduling, load all events
	// here, filter by check, and update all events involved in the round robin
	if entity == "" {
		// A two-part key (no entity) identifies a round robin check; see
		// eventKey. TTL handling for those is not implemented.
		lager.Error("round robin check TTL not supported")
		return false
	}
	ctx := store.NamespaceContext(context.Background(), namespace)
	// The entity has been deleted, and so there is no reason to track check
	// TTL for it anymore.
	if ent, err := e.store.GetEntityByName(ctx, entity); err == nil && ent == nil {
		return true
	}
	event, err := e.eventStore.GetEventByEntityCheck(ctx, entity, check)
	if err != nil {
		lager.WithError(err).Error("can't handle check TTL failure")
		return false
	}
	if event == nil {
		// The user deleted the check event but not the entity
		lager.Error("event is nil")
		return false
	}
	// Only the leader emits the failure event, so the cluster produces it
	// exactly once.
	if leader {
		if err := e.handleFailure(event); err != nil {
			lager.WithError(err).Error("can't handle check TTL failure")
		}
	}
	return false
}
// parseKey splits a liveness switch key (as produced by eventKey) into
// its namespace, check, and entity components. Two-part keys — used for
// round robin checks — yield an empty entity. Any other shape is an
// error.
func parseKey(key string) (namespace, check, entity string, err error) {
	parts := strings.Split(key, "/")
	switch len(parts) {
	case 2:
		return parts[0], parts[1], "", nil
	case 3:
		return parts[0], parts[1], parts[2], nil
	default:
		return "", "", "", errors.New("bad key")
	}
}
// handleFailure creates a check event with a warn status, persists it,
// and publishes the stored result to TopicEvent.
func (e *Eventd) handleFailure(event *corev2.Event) error {
	ctx := context.WithValue(
		context.Background(), corev2.NamespaceKey, event.Entity.Namespace,
	)
	failedCheckEvent, err := e.createFailedCheckEvent(ctx, event)
	if err != nil {
		return err
	}
	updatedEvent, _, err := e.eventStore.UpdateEvent(ctx, failedCheckEvent)
	if err != nil {
		return err
	}
	return e.bus.Publish(messaging.TopicEvent, updatedEvent)
}
// createFailedCheckEvent builds a TTL-failure event for the given
// event's entity/check pair. It re-fetches the latest stored event so
// the failure is derived from current state, then overlays a warning
// (status 1, failing) check whose output reports how long ago the check
// last executed.
func (e *Eventd) createFailedCheckEvent(ctx context.Context, event *corev2.Event) (*corev2.Event, error) {
	if !event.HasCheck() {
		return nil, errors.New("event does not contain a check")
	}
	event, err := e.eventStore.GetEventByEntityCheck(
		ctx, event.Entity.Name, event.Check.Name,
	)
	if err != nil {
		return nil, err
	}
	// GetEventByEntityCheck can return (nil, nil) when the stored event
	// was deleted between the TTL callback and this lookup (dead()
	// handles that same case explicitly). Without this guard the
	// dereferences below would panic.
	if event == nil {
		return nil, errors.New("event not found")
	}
	check := corev2.NewCheck(corev2.NewCheckConfigFromFace(event.Check))
	output := fmt.Sprintf("Last check execution was %d seconds ago", time.Now().Unix()-event.Check.Executed)
	check.Output = output
	check.Status = 1
	check.State = corev2.EventFailingState
	check.Executed = time.Now().Unix()
	check.MergeWith(event.Check)
	event.Timestamp = time.Now().Unix()
	event.Check = check
	return event, nil
}
// Stop eventd. The bus subscription is cancelled first so no new events
// are delivered, then eventChan is closed so handler goroutines can
// drain any remaining buffered events before wg.Wait returns.
func (e *Eventd) Stop() error {
	logger.Info("shutting down eventd")
	if err := e.subscription.Cancel(); err != nil {
		logger.WithError(err).Error("unable to unsubscribe from message bus")
	}
	close(e.eventChan)
	close(e.shutdownChan)
	e.wg.Wait()
	return nil
}
// Err returns a channel to listen for terminal errors on. The channel
// is buffered with capacity 1; at most one terminal error is reported.
func (e *Eventd) Err() <-chan error {
	return e.errChan
}
// Name returns the daemon name.
func (e *Eventd) Name() string {
	return "eventd"
}