// File: event_publisher.go

package events
import (
"encoding/json"
"net/http"
"net/url"
"reflect"
"strconv"
"strings"
"sync"
"time"
"github.com/launchdarkly/ld-relay/v8/internal/credential"
"github.com/launchdarkly/ld-relay/v8/internal/httpconfig"
"github.com/launchdarkly/go-sdk-common/v3/ldlog"
ldevents "github.com/launchdarkly/go-sdk-events/v3"
)
// Default settings used by NewHTTPEventPublisher when no overriding option is supplied.
const (
// defaultFlushInterval is the automatic flush period used unless a positive OptionFlushInterval is given.
defaultFlushInterval = time.Minute
// defaultCapacity is the initial value of the publisher's capacity field (see OptionCapacity).
defaultCapacity = 1000
// inputQueueSize is the buffer size of the channel that carries Publish and Flush requests to the event loop.
inputQueueSize = 100
// defaultEventsURIPath is the path that OptionBaseURI appends to a candidate base URI when validating it.
defaultEventsURIPath = "/bulk"
)
var (
// defaultEventsBaseURI is the events service address copied into a new publisher's eventsURI field.
// The parse error is discarded because the input is a constant, well-formed URL.
defaultEventsBaseURI, _ = url.Parse("https://events.launchdarkly.com") //nolint:gochecknoglobals
)
// EventPublisher is the interface for the component that buffers events and delivers them to LaunchDarkly.
// Events are treated as raw JSON data; the component does not do any parsing or transformation of them.
//
// A single instance of the component exists for each environment, associated with a single credential (such
// as an SDK key). However, it can maintain multiple buffers if it is necessary to deliver events in
// separate batches due to SDKs providing different header metadata, as represented by EventPayloadMetadata.
//
// The only implementation of this in Relay is HTTPEventPublisher. It is an interface only so that it can
// be mocked in test code.
type EventPublisher interface {
// Publish adds any number of JSON elements to the queue.
//
// The EventPayloadMetadata value provides a way to distinguish between batches of events that have
// different header metadata. If no such distinction is needed, it can simply be an empty
// EventPayloadMetadata{}. Otherwise, each distinct value of EventPayloadMetadata gets its own event
// queue, all of which will be flushed at the same time but delivered in separate HTTP posts.
Publish(EventPayloadMetadata, ...json.RawMessage)
// Flush attempts to deliver all queued events.
Flush()
// ReplaceCredential changes the authorization credential used when sending events, if the previous
// credential was of the same type. A credential of a different type is ignored.
ReplaceCredential(credential.SDKCredential)
// Close releases all resources used by this object.
Close()
}
// EventPayloadMetadata represents HTTP header metadata that may be included in an event post from an SDK, which
// Relay should copy when it forwards the events to LaunchDarkly.
//
// Values of this type are used as map keys by HTTPEventPublisher (one event queue per distinct value), so
// every field must remain a comparable type.
type EventPayloadMetadata struct {
// SchemaVersion is the numeric value of the X-LaunchDarkly-Event-Schema header, or 1 if unknown
// (in version 1, this header was not used).
SchemaVersion int
// Tags is the value of the X-LaunchDarkly-Tags header, or "" if none.
Tags string
}
// GetEventPayloadMetadata builds an EventPayloadMetadata from the headers of an incoming request.
//
// Tags is copied verbatim from the TagsHeader header. SchemaVersion is read from the EventSchemaHeader
// header; a missing, non-numeric, zero, or negative value is reported as version 1.
func GetEventPayloadMetadata(req *http.Request) EventPayloadMetadata {
	// The conversion error is deliberately ignored: an unparseable header yields 0, which is
	// normalized to 1 below.
	version, _ := strconv.Atoi(req.Header.Get(EventSchemaHeader))
	if version < 1 {
		version = 1
	}
	return EventPayloadMetadata{
		SchemaVersion: version,
		Tags:          req.Header.Get(TagsHeader),
	}
}
// HTTPEventPublisher is the standard implementation of EventPublisher.
//
// Publish and Flush requests are passed through inputQueue to a single event loop goroutine started by
// NewHTTPEventPublisher. That goroutine buffers events in per-metadata queues and, on each flush, hands
// every non-empty queue's payload to a separate goroutine for delivery.
type HTTPEventPublisher struct {
// baseURI is the custom events service base URI set by OptionBaseURI; it is passed as the BaseURI of
// the sender configuration on each delivery. It is "" unless that option was applied.
baseURI string
// uriPath is the custom endpoint path set by OptionURIPath; it is passed to the sender on each delivery.
uriPath string
// eventsURI is initialized from defaultEventsBaseURI by the constructor.
// NOTE(review): no code in this file reads this field - confirm whether it is still needed.
eventsURI url.URL
// loggers receives warnings and errors from the event loop and is passed to the event sender.
loggers ldlog.Loggers
// client is the HTTP client used for all deliveries.
client *http.Client
// authKey is the credential used to build the Authorization header. It is guarded by lock because
// ReplaceCredential can change it while the event loop is running.
authKey credential.SDKCredential
// baseHeaders is a copy of the configured default headers with Authorization removed.
baseHeaders http.Header
// closer is closed by Close to tell the event loop to exit.
closer chan<- struct{}
// closeOnce ensures the shutdown sequence in Close runs at most once.
closeOnce sync.Once
// wg counts the event loop goroutine plus every in-flight delivery goroutine; Close waits on it.
wg sync.WaitGroup
// inputQueue carries eventBatch and flush values from Publish/Flush to the event loop.
inputQueue chan interface{}
// Acts as a signal to tell the publisher any future events can just be
// dropped.
//
// When event sending encounters an unrecoverable failure, we don't want to
// continue sending events. However, the publisher needs to continue draining
// the publisher channel to prevent goroutines from backing up.
disableQueue chan interface{}
// disabled is set by the event loop once disableQueue has signaled; after that, every message taken
// from inputQueue is discarded.
disabled bool
// queues holds one event buffer per distinct EventPayloadMetadata value.
queues map[EventPayloadMetadata]*publisherQueue
// capacity is the maximum number of events held in any one queue between flushes.
capacity int
// overflowed records that the most recent append had to drop events, so that the capacity warning
// is logged once per overflow episode rather than on every batch.
overflowed bool
// lock guards authKey.
lock sync.RWMutex
}
// eventBatch is the message sent on inputQueue by Publish: a set of raw events plus the header metadata
// that selects which queue they are appended to.
//
// Publish constructs this type with a positional composite literal, so the field order must not change.
type eventBatch struct {
metadata EventPayloadMetadata
events []json.RawMessage
}
// publisherQueue is the buffer of not-yet-delivered events for one EventPayloadMetadata value.
type publisherQueue struct {
events []json.RawMessage
}
// flush is the message sent on inputQueue by Flush to request an immediate flush of all queues.
type flush struct{}
// OptionType defines optional parameters for NewHTTPEventPublisher.
//
// Each option is applied to the publisher during construction, before its event loop is started. A
// non-nil error returned by apply makes NewHTTPEventPublisher fail with that error.
type OptionType interface {
apply(*HTTPEventPublisher) error
}
// OptionBaseURI specifies a custom base URI for the events service.
//
// The value is accepted only if it still parses as a URL once trailing slashes are trimmed and the
// default events path is appended. An unparseable value is ignored without reporting an error, leaving
// the publisher's base URI unchanged. The string stored on the publisher is the original, untrimmed value.
type OptionBaseURI string

func (o OptionBaseURI) apply(p *HTTPEventPublisher) error {
	candidate := strings.TrimRight(string(o), "/") + defaultEventsURIPath
	if _, err := url.Parse(candidate); err != nil {
		// Invalid URIs are skipped rather than treated as a construction failure.
		return nil
	}
	p.baseURI = string(o)
	return nil
}
// OptionURIPath specifies a custom endpoint URI path for the events service.
//
// The value is stored as given, with no validation, and is passed to the event sender on every delivery.
type OptionURIPath string

func (o OptionURIPath) apply(p *HTTPEventPublisher) error {
	path := string(o)
	p.uriPath = path
	return nil
}
// OptionFlushInterval specifies the interval for automatic flushes.
//
// This option does not modify the publisher itself: NewHTTPEventPublisher recognizes it by type and,
// when the value is positive, uses it as the period of the flush ticker.
type OptionFlushInterval time.Duration

// apply is a no-op; it exists only so that OptionFlushInterval satisfies OptionType.
func (OptionFlushInterval) apply(*HTTPEventPublisher) error {
	return nil
}
// OptionCapacity specifies the event queue capacity, i.e. the maximum number of events that each
// queue will hold between flushes.
//
// A negative value is ignored and the publisher keeps its current capacity. Storing it would make the
// queue allocation (a make call with a negative capacity) panic at runtime when the first event
// arrives. A value of zero is accepted and results in every published event being dropped.
type OptionCapacity int

func (o OptionCapacity) apply(p *HTTPEventPublisher) error {
	if o < 0 {
		// Same policy as for a non-positive flush interval: keep the existing setting.
		return nil
	}
	p.capacity = int(o)
	return nil
}
// NewHTTPEventPublisher creates a new HTTPEventPublisher and starts its event loop goroutine.
//
// Parameters:
//   - authKey is the credential whose authorization header value is attached to each delivery.
//   - httpConfig supplies the HTTP client and the default headers; any Authorization header among the
//     defaults is removed from the publisher's copy.
//   - loggers receives the publisher's warnings and errors.
//   - options are applied in order; the first one whose apply method returns an error aborts
//     construction and that error is returned.
//
// The event loop runs until Close is called. It appends published batches to the per-metadata queues,
// flushes on request and on every tick of the flush interval, and, once an unrecoverable delivery
// failure has been signaled, discards all buffered and future events.
func NewHTTPEventPublisher(authKey credential.SDKCredential, httpConfig httpconfig.HTTPConfig, loggers ldlog.Loggers, options ...OptionType) (*HTTPEventPublisher, error) {
	closer := make(chan struct{})
	client := httpConfig.Client()

	baseHeaders := make(http.Header)
	for k, v := range httpConfig.SDKHTTPConfig.DefaultHeaders {
		baseHeaders[k] = v
	}
	// We don't necessarily want an SDK key here; flush() adds the Authorization header for each
	// delivery from whatever credential is current at that time.
	baseHeaders.Del("Authorization")

	inputQueue := make(chan interface{}, inputQueueSize)
	disableQueue := make(chan interface{}, 1)
	p := &HTTPEventPublisher{
		baseHeaders:  baseHeaders,
		client:       client,
		eventsURI:    *defaultEventsBaseURI,
		authKey:      authKey,
		closer:       closer,
		capacity:     defaultCapacity,
		inputQueue:   inputQueue,
		disableQueue: disableQueue,
		loggers:      loggers,
	}

	flushInterval := defaultFlushInterval
	for _, o := range options {
		if err := o.apply(p); err != nil {
			return nil, err // COVERAGE: can't happen in unit tests
		}
		// The flush interval is not stored on the publisher; it only configures the ticker below.
		if o, ok := o.(OptionFlushInterval); ok && o > 0 {
			flushInterval = time.Duration(o)
		}
	}

	p.queues = make(map[EventPayloadMetadata]*publisherQueue)
	p.wg.Add(1)
	ticker := time.NewTicker(flushInterval)

	// runLoop processes messages until the publisher is closed. It returns true after a normal
	// shutdown and false if a panic interrupted it. recover only has an effect when called from a
	// deferred function, so the panic guard must be a defer inside this function.
	runLoop := func() (closed bool) {
		defer func() {
			if err := recover(); err != nil { // COVERAGE: can't happen in unit tests
				p.loggers.Errorf("Unexpected panic in event relay : %+v", err)
			}
		}()
		for {
			select {
			case <-disableQueue:
				p.loggers.Warnf("Discarding in-memory and all future events due to unrecoverable failure when sending events.")
				ticker.Stop()
				// Ensure we free up as much memory as we can by clearing any pending events
				p.queues = make(map[EventPayloadMetadata]*publisherQueue)
				p.disabled = true
			case e := <-inputQueue:
				if p.disabled {
					// Keep draining the channel so that callers of Publish/Flush never block.
					continue
				}
				switch e := e.(type) {
				case flush:
					p.flush()
				case eventBatch:
					p.append(e)
				}
			case <-ticker.C:
				p.flush()
			case <-closer:
				return true
			}
		}
	}

	go func() {
		// Deferred calls run in reverse order: the ticker is stopped first, then the WaitGroup is
		// released so that Close can proceed.
		defer p.wg.Done()
		defer ticker.Stop()
		for !runLoop() {
			// A panic was logged and recovered; resume processing messages.
		}
	}()
	return p, nil
}
// append adds the events of one batch to the queue that matches the batch's metadata, creating that
// queue on first use.
//
// A queue never holds more than p.capacity events: when the batch does not fit, only the leading
// events that do fit are kept and the rest are dropped. A warning is logged for the first batch that
// overflows; it is not logged again until a batch has fit completely.
func (p *HTTPEventPublisher) append(batch eventBatch) {
	q := p.queues[batch.metadata]
	if q == nil {
		q = &publisherQueue{events: make([]json.RawMessage, 0, p.capacity)}
		p.queues[batch.metadata] = q
	}

	room := p.capacity - len(q.events)
	accepted := batch.events
	switch {
	case room >= len(batch.events):
		// Everything fits, so the next overflow will be reported again.
		p.overflowed = false
	default:
		if !p.overflowed {
			p.loggers.Warnf("Exceeded event queue capacity of %d. Increase capacity to avoid dropping events.", p.capacity)
			p.overflowed = true
		}
		accepted = batch.events[:room]
	}
	q.events = append(q.events, accepted...)
}
// ReplaceCredential swaps in a new credential for subsequent deliveries, provided its dynamic type is
// the same as that of the credential currently in use; otherwise the call has no effect. The update is
// made while holding the write lock, because flush reads the credential under the same lock.
func (p *HTTPEventPublisher) ReplaceCredential(newCredential credential.SDKCredential) { //nolint:golint // method is already documented in interface
	p.lock.Lock()
	defer p.lock.Unlock()

	if reflect.TypeOf(p.authKey) != reflect.TypeOf(newCredential) {
		return
	}
	p.authKey = newCredential
}
// Publish hands the events, tagged with their metadata, to the event loop by sending a batch on the
// input channel. The channel is buffered, so the call blocks only while that buffer is full.
func (p *HTTPEventPublisher) Publish(metadata EventPayloadMetadata, events ...json.RawMessage) { //nolint:golint // method is already documented in interface
	batch := eventBatch{metadata: metadata, events: events}
	p.inputQueue <- batch
}
// Flush asks the event loop to deliver all queued events by sending a flush request on the input
// channel. It does not wait for the delivery itself; it blocks only while the channel buffer is full.
func (p *HTTPEventPublisher) Flush() { //nolint:golint // method is already documented in interface
	var request flush
	p.inputQueue <- request
}
// flush delivers the contents of every non-empty queue, each as its own payload on its own goroutine.
//
// Notes on implementation of this method:
//   - Sender configuration is built for each payload delivery, because potentially each one could
//     have different headers (based on EventPayloadMetadata) and also because the authorization key
//     could change at any time via ReplaceCredential.
//   - In the common case where we do *not* receive events with multiple distinct EventPayloadMetadata
//     values, we can save a tiny bit of overhead by reusing a single buffer. But if there are
//     multiple values (and therefore multiple queues), we don't want to keep accumulating buffers
//     that are never deallocated just because we received different metadata at some point. So in
//     the multiple-queue case, we will discard any buffers that haven't been used since last flush.
//   - Each delivery goroutine is counted in p.wg. It signals an unrecoverable failure on
//     p.disableQueue *before* releasing the WaitGroup, and does so without blocking. Close closes
//     p.disableQueue only after p.wg.Wait() returns, so this ordering guarantees that the send can
//     never hit a closed channel, and the non-blocking send guarantees that a goroutine can never
//     hang on a full buffer once the event loop has stopped reading.
func (p *HTTPEventPublisher) flush() {
	if len(p.queues) == 0 {
		return
	}

	queues := p.queues
	discardingUnusedBuffers := false
	if len(p.queues) > 1 {
		// Recreate the map - we will re-add only the used buffers to it
		p.queues = make(map[EventPayloadMetadata]*publisherQueue)
		discardingUnusedBuffers = true
	}

	// We access p.authKey under lock because it can change
	p.lock.RLock()
	authKey := p.authKey
	p.lock.RUnlock()

	for metadata, queue := range queues {
		count := len(queue.events)
		if count == 0 {
			continue
		}
		payload, err := json.Marshal(queue.events)
		// The marshalled payload is an independent copy, so the buffer can be reused right away.
		queue.events = queue.events[0:0]
		if discardingUnusedBuffers {
			p.queues[metadata] = queue
		}
		if err != nil { // COVERAGE: can't happen in unit tests
			p.loggers.Errorf("Unexpected error marshalling event json: %+v", err)
			continue
		}

		p.wg.Add(1)
		schemaVersion := metadata.SchemaVersion
		tags := metadata.Tags
		getBaseHeaders := func() http.Header {
			ret := make(http.Header)
			for k, v := range p.baseHeaders {
				ret[k] = v
			}
			if authKey != nil && authKey.GetAuthorizationHeaderValue() != "" {
				ret.Set("Authorization", authKey.GetAuthorizationHeaderValue())
			}
			if tags != "" {
				ret.Set(TagsHeader, tags)
			}
			return ret
		}

		go func() {
			defer p.wg.Done()
			// The sender implements the standard retry behavior, and error logging, in
			// SendEventDataWithRetry. Retries could cause this call to block for a while,
			// so it's run on a separate goroutine.
			sendConfig := ldevents.EventSenderConfiguration{
				Client:        p.client,
				BaseURI:       p.baseURI,
				BaseHeaders:   getBaseHeaders,
				SchemaVersion: schemaVersion,
				Loggers:       p.loggers,
			}
			result := ldevents.SendEventDataWithRetry(sendConfig, ldevents.AnalyticsEventDataKind, p.uriPath, payload, count)
			if result.MustShutDown {
				select {
				case p.disableQueue <- struct{}{}:
				default:
					// A disable signal is already pending; one is enough.
				}
			}
		}()
	}
}
// Close shuts the publisher down. It signals the event loop to exit, waits for the event loop and all
// in-flight delivery goroutines to finish, and then closes the disable-signal channel. Close does not
// itself trigger a flush. Calling it more than once is safe; only the first call has any effect.
func (p *HTTPEventPublisher) Close() { //nolint:golint // method is already documented in interface
	shutdown := func() {
		close(p.closer)
		p.wg.Wait()
		close(p.disableQueue)
	}
	p.closeOnce.Do(shutdown)
}