package ldevents

import (
	"sync"
	"time"

	"gopkg.in/launchdarkly/go-jsonstream.v1/jwriter"
	"gopkg.in/launchdarkly/go-sdk-common.v2/ldlog"
	"gopkg.in/launchdarkly/go-sdk-common.v2/ldtime"
	"gopkg.in/launchdarkly/go-sdk-common.v2/ldvalue"
)

// defaultEventProcessor is the default EventProcessor implementation. It does no processing
// itself; every event and command is posted to inboxCh and handled by the dispatcher goroutine
// started in NewDefaultEventProcessor.
type defaultEventProcessor struct {
inboxCh chan eventDispatcherMessage
inboxFullOnce sync.Once
closeOnce sync.Once
loggers ldlog.Loggers
}

// eventDispatcher holds the state owned by the background goroutine that consumes inboxCh,
// accumulates events in the outbox, and hands completed payloads to the flush workers.
type eventDispatcher struct {
config EventsConfiguration
outbox *eventsOutbox
flushCh chan *flushPayload
workersGroup *sync.WaitGroup
userKeys lruCache
lastKnownPastTime ldtime.UnixMillisecondTime
deduplicatedUsers int
eventsInLastBatch int
disabled bool
currentTimestampFn func() ldtime.UnixMillisecondTime
stateLock sync.Mutex
}

// flushPayload is the unit of work given to a flush worker: either a single diagnostic event or a
// batch of analytics events plus their summary.
type flushPayload struct {
diagnosticEvent ldvalue.Value
events []Event
summary eventSummary
}

// eventDispatcherMessage is the type of every message posted to inboxCh.
type eventDispatcherMessage interface{}

// sendEventMessage asks the dispatcher to process a single analytics event.
type sendEventMessage struct {
	event Event
}

// flushEventsMessage asks the dispatcher to start a flush as soon as possible.
type flushEventsMessage struct{}

// shutdownEventsMessage asks the dispatcher to shut down; replyCh is signalled when shutdown is complete.
type shutdownEventsMessage struct {
	replyCh chan struct{}
}

// syncEventsMessage asks the dispatcher to wait for all in-flight flushes before replying on replyCh.
type syncEventsMessage struct {
	replyCh chan struct{}
}

const (
	// maxFlushWorkers is the fixed number of worker goroutines available to send flush payloads.
	maxFlushWorkers = 5
)
// NewDefaultEventProcessor creates an instance of the default implementation of analytics event processing.
func NewDefaultEventProcessor(config EventsConfiguration) EventProcessor {
inboxCh := make(chan eventDispatcherMessage, config.Capacity)
startEventDispatcher(config, inboxCh)
return &defaultEventProcessor{
inboxCh: inboxCh,
loggers: config.Loggers,
}
}
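
// Illustrative usage sketch (shown only as a comment, not part of this package's API surface): the
// configuration fields below are the ones referenced elsewhere in this file, and mySender stands in
// for a hypothetical EventSender implementation supplied by the caller.
//
//	config := EventsConfiguration{
//		Capacity:      1000,            // size of the inbox channel and the outbox
//		FlushInterval: 5 * time.Second, // how often runMainLoop triggers an automatic flush
//		Loggers:       ldlog.NewDefaultLoggers(),
//		EventSender:   mySender,
//	}
//	ep := NewDefaultEventProcessor(config)
//	defer ep.Close() // blocks until a final flush and the shutdown handshake complete
//	// ... RecordFeatureRequestEvent / RecordIdentifyEvent / RecordCustomEvent ...
//	ep.Flush() // non-blocking; the actual send happens on a flush worker
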
func (ep *defaultEventProcessor) RecordFeatureRequestEvent(e FeatureRequestEvent) {
ep.postNonBlockingMessageToInbox(sendEventMessage{event: e})
}
func (ep *defaultEventProcessor) RecordIdentifyEvent(e IdentifyEvent) {
ep.postNonBlockingMessageToInbox(sendEventMessage{event: e})
}
func (ep *defaultEventProcessor) RecordCustomEvent(e CustomEvent) {
ep.postNonBlockingMessageToInbox(sendEventMessage{event: e})
}
func (ep *defaultEventProcessor) Flush() {
ep.postNonBlockingMessageToInbox(flushEventsMessage{})
}
func (ep *defaultEventProcessor) postNonBlockingMessageToInbox(e eventDispatcherMessage) {
select {
case ep.inboxCh <- e:
return
default: // COVERAGE: no way to simulate this condition in unit tests
}
// If the inbox is full, it means the eventDispatcher is seriously backed up with not-yet-processed events.
// This is unlikely, but if it happens, it means the application is probably doing a ton of flag evaluations
	// across many goroutines, so if we wait for a space in the inbox, we risk a very serious slowdown of the
// app. To avoid that, we'll just drop the event. The log warning about this will only be shown once.
ep.inboxFullOnce.Do(func() { // COVERAGE: no way to simulate this condition in unit tests
ep.loggers.Warn("Events are being produced faster than they can be processed; some events will be dropped")
})
}
func (ep *defaultEventProcessor) Close() error {
ep.closeOnce.Do(func() {
// We put the flush and shutdown messages directly into the channel instead of calling
// postNonBlockingMessageToInbox, because we *do* want to block to make sure there is room in the channel;
// these aren't analytics events, they are messages that are necessary for an orderly shutdown.
ep.inboxCh <- flushEventsMessage{}
m := shutdownEventsMessage{replyCh: make(chan struct{})}
ep.inboxCh <- m
<-m.replyCh
})
return nil
}
func startEventDispatcher(
config EventsConfiguration,
inboxCh <-chan eventDispatcherMessage,
) {
ed := &eventDispatcher{
config: config,
outbox: newEventsOutbox(config.Capacity, config.Loggers),
flushCh: make(chan *flushPayload, 1),
workersGroup: &sync.WaitGroup{},
userKeys: newLruCache(config.UserKeysCapacity),
currentTimestampFn: config.currentTimeProvider,
}
if ed.currentTimestampFn == nil {
ed.currentTimestampFn = ldtime.UnixMillisNow
}
	// Start a fixed-size pool of workers that wait on flushCh. This is the
	// maximum number of flushes we can do concurrently.
for i := 0; i < maxFlushWorkers; i++ {
go runFlushTask(config, ed.flushCh, ed.workersGroup, ed.handleResult)
}
if config.DiagnosticsManager != nil {
event := config.DiagnosticsManager.CreateInitEvent()
ed.sendDiagnosticsEvent(event)
}
go ed.runMainLoop(inboxCh)
}
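
// The concurrency shape set up above, reduced to its essentials as an illustrative sketch (not code
// that belongs to this package): a single dispatcher goroutine owns all mutable state and feeds a
// fixed pool of workers through one channel, so at most maxFlushWorkers payloads are ever being
// sent concurrently, and the workers exit when the channel is closed.
//
//	jobs := make(chan *flushPayload, 1)
//	for i := 0; i < maxFlushWorkers; i++ {
//		go func() {
//			for p := range jobs {
//				_ = p // serialize and deliver the payload
//			}
//		}()
//	}
//	// dispatcher goroutine: jobs <- payload when a flush is triggered; close(jobs) on shutdown
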
func (ed *eventDispatcher) runMainLoop(
inboxCh <-chan eventDispatcherMessage,
) {
	// recover has no effect unless it is called inside a deferred function, so wrap it in one;
	// this logs, rather than crashes on, any unexpected panic in the dispatcher goroutine.
	defer func() {
		if err := recover(); err != nil { // COVERAGE: no way to simulate this condition in unit tests
			ed.config.Loggers.Errorf("Unexpected panic in event processing thread: %+v", err)
		}
	}()
flushInterval := ed.config.FlushInterval
if flushInterval <= 0 { // COVERAGE: no way to test this logic in unit tests
flushInterval = DefaultFlushInterval
}
userKeysFlushInterval := ed.config.UserKeysFlushInterval
if userKeysFlushInterval <= 0 { // COVERAGE: no way to test this logic in unit tests
userKeysFlushInterval = DefaultUserKeysFlushInterval
}
flushTicker := time.NewTicker(flushInterval)
usersResetTicker := time.NewTicker(userKeysFlushInterval)
var diagnosticsTicker *time.Ticker
var diagnosticsTickerCh <-chan time.Time
diagnosticsManager := ed.config.DiagnosticsManager
if diagnosticsManager != nil {
interval := ed.config.DiagnosticRecordingInterval
if interval > 0 {
if interval < MinimumDiagnosticRecordingInterval { // COVERAGE: no way to test this logic in unit tests
interval = DefaultDiagnosticRecordingInterval
}
} else {
if ed.config.forceDiagnosticRecordingInterval > 0 {
interval = ed.config.forceDiagnosticRecordingInterval
} else {
interval = DefaultDiagnosticRecordingInterval
}
}
diagnosticsTicker = time.NewTicker(interval)
diagnosticsTickerCh = diagnosticsTicker.C
}
for {
		// Handle inbox messages and timer ticks. Everything that touches the outbox, the user-key
		// cache, or the per-interval diagnostic counters happens here on the dispatcher goroutine,
		// so those structures need no locking of their own.
select {
case message := <-inboxCh:
switch m := message.(type) {
case sendEventMessage:
ed.processEvent(m.event)
case flushEventsMessage:
ed.triggerFlush()
case syncEventsMessage:
ed.workersGroup.Wait()
m.replyCh <- struct{}{}
case shutdownEventsMessage:
flushTicker.Stop()
usersResetTicker.Stop()
if diagnosticsTicker != nil {
diagnosticsTicker.Stop()
}
ed.workersGroup.Wait() // Wait for all in-progress flushes to complete
close(ed.flushCh) // Causes all idle flush workers to terminate
m.replyCh <- struct{}{}
return
}
case <-flushTicker.C:
ed.triggerFlush()
case <-usersResetTicker.C:
ed.userKeys.clear()
case <-diagnosticsTickerCh:
if diagnosticsManager == nil || !diagnosticsManager.CanSendStatsEvent() {
// COVERAGE: no way to test this logic in unit tests
break
}
event := diagnosticsManager.CreateStatsEventAndReset(
ed.outbox.droppedEvents,
ed.deduplicatedUsers,
ed.eventsInLastBatch,
)
ed.outbox.droppedEvents = 0
ed.deduplicatedUsers = 0
ed.eventsInLastBatch = 0
ed.sendDiagnosticsEvent(event)
}
}
}
func (ed *eventDispatcher) processEvent(evt Event) {
// Decide whether to add the event to the payload. Feature events may be added twice, once for
// the event (if tracked) and once for debugging.
willAddFullEvent := true
var debugEvent Event
inlinedUser := ed.config.InlineUsersInEvents
switch evt := evt.(type) {
case FeatureRequestEvent:
ed.outbox.addToSummary(evt) // add all feature events to summaries
willAddFullEvent = evt.TrackEvents
if ed.shouldDebugEvent(&evt) {
de := evt
de.Debug = true
debugEvent = de
}
case IdentifyEvent:
inlinedUser = true
}
// For each user we haven't seen before, we add an index event before the event that referenced
// the user - unless the event must contain an inline user (i.e. if InlineUsersInEvents is true,
// or if it is an identify event).
user := evt.GetBase().User
alreadySeenUser := ed.userKeys.add(user.GetKey())
if !(willAddFullEvent && inlinedUser) {
if alreadySeenUser {
ed.deduplicatedUsers++
} else {
indexEvent := indexEvent{
BaseEvent{CreationDate: evt.GetBase().CreationDate, User: user},
}
ed.outbox.addEvent(indexEvent)
}
}
if willAddFullEvent {
ed.outbox.addEvent(evt)
}
if debugEvent != nil {
ed.outbox.addEvent(debugEvent)
}
}
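
// Worked example (illustrative): with InlineUsersInEvents false, a FeatureRequestEvent with
// TrackEvents true for a user key not seen before causes processEvent to (1) add the evaluation to
// the summary, (2) enqueue an index event carrying the full user, and (3) enqueue the feature event
// itself. A later event for the same key within the user-key cache window skips (2) and just
// increments deduplicatedUsers; if TrackEvents were false, only the summary (and possibly a debug
// copy) would record the evaluation.
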
func (ed *eventDispatcher) shouldDebugEvent(evt *FeatureRequestEvent) bool {
if evt.DebugEventsUntilDate == 0 {
return false
}
// The "last known past time" comes from the last HTTP response we got from the server.
// In case the client's time is set wrong, at least we know that any expiration date
// earlier than that point is definitely in the past. If there's any discrepancy, we
// want to err on the side of cutting off event debugging sooner.
ed.stateLock.Lock() // This should be done infrequently since it's only for debug events
defer ed.stateLock.Unlock()
return evt.DebugEventsUntilDate > ed.lastKnownPastTime &&
evt.DebugEventsUntilDate > ed.currentTimestampFn()
}
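
// Example (illustrative): if DebugEventsUntilDate is 1000000 but the server most recently reported
// a time of 1200000, the debug window has already ended no matter what the local clock says, so no
// debug copy is made; if the server-reported time were 900000 and the local clock also read earlier
// than 1000000, both checks pass and a debug copy of the event is enqueued.
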
// Signal that we would like to do a flush as soon as possible.
func (ed *eventDispatcher) triggerFlush() {
if ed.isDisabled() {
ed.outbox.clear()
return
}
// Is there anything to flush?
payload := ed.outbox.getPayload()
totalEventCount := len(payload.events)
if len(payload.summary.counters) > 0 {
totalEventCount++
}
if totalEventCount == 0 {
ed.eventsInLastBatch = 0
return
}
ed.workersGroup.Add(1) // Increment the count of active flushes
select {
case ed.flushCh <- &payload:
// If the channel wasn't full, then there is a worker available who will pick up
// this flush payload and send it. The event outbox and summary state can now be
// cleared from the main goroutine.
ed.eventsInLastBatch = totalEventCount
ed.outbox.clear()
default:
// We can't start a flush right now because we're waiting for one of the workers
// to pick up the last one. Do not reset the event outbox or summary state.
ed.workersGroup.Done()
}
}
func (ed *eventDispatcher) isDisabled() bool {
// Since we're using a mutex, we should avoid calling this often.
ed.stateLock.Lock()
defer ed.stateLock.Unlock()
return ed.disabled
}
func (ed *eventDispatcher) handleResult(result EventSenderResult) {
if result.MustShutDown {
ed.stateLock.Lock()
defer ed.stateLock.Unlock()
ed.disabled = true
} else if result.TimeFromServer > 0 {
ed.stateLock.Lock()
defer ed.stateLock.Unlock()
ed.lastKnownPastTime = result.TimeFromServer
}
}
func (ed *eventDispatcher) sendDiagnosticsEvent(
event ldvalue.Value,
) {
payload := flushPayload{diagnosticEvent: event}
ed.workersGroup.Add(1) // Increment the count of active flushes
select {
case ed.flushCh <- &payload:
// If the channel wasn't full, then there is a worker available who will pick up
// this flush payload and send it.
default:
// We can't start a flush right now because we're waiting for one of the workers
// to pick up the last one. We'll just discard this diagnostic event - presumably
// we'll send another one later anyway, and we don't want this kind of nonessential
// data to cause any kind of back-pressure.
ed.workersGroup.Done() // COVERAGE: no way to simulate this condition in unit tests
}
}

// runFlushTask is the body of each flush worker goroutine: it serializes and delivers payloads
// from flushCh until the channel is closed.
func runFlushTask(config EventsConfiguration, flushCh <-chan *flushPayload,
	workersGroup *sync.WaitGroup, resultFn func(EventSenderResult)) {
formatter := eventOutputFormatter{
userFilter: newUserFilter(config),
config: config,
}
for {
payload, more := <-flushCh
if !more {
// Channel has been closed - we're shutting down
break
}
if !payload.diagnosticEvent.IsNull() {
w := jwriter.NewWriter()
payload.diagnosticEvent.WriteToJSONWriter(&w)
bytes := w.Bytes()
_ = config.EventSender.SendEventData(DiagnosticEventDataKind, bytes, 1)
} else {
bytes, count := formatter.makeOutputEvents(payload.events, payload.summary)
if len(bytes) > 0 {
result := config.EventSender.SendEventData(AnalyticsEventDataKind, bytes, count)
resultFn(result)
}
}
workersGroup.Done() // Decrement the count of in-progress flushes
}
}
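
// Testing sketch (illustrative): the flush workers only need something satisfying the EventSender
// interface used above - a SendEventData method taking the data kind, the serialized payload, and
// the event count, and returning an EventSenderResult. Assuming the kind parameter's type is named
// EventDataKind (only the AnalyticsEventDataKind and DiagnosticEventDataKind values appear in this
// file), a hypothetical in-memory sender might look like:
//
//	type capturingSender struct{ payloads [][]byte }
//
//	func (s *capturingSender) SendEventData(kind EventDataKind, data []byte, count int) EventSenderResult {
//		s.payloads = append(s.payloads, data)
//		return EventSenderResult{} // zero value: no shutdown requested, no server time reported
//	}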