This repository has been archived by the owner on Oct 11, 2019. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 15
/
redis.go
469 lines (393 loc) · 15.7 KB
/
redis.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
// Package redis provides a Redis-backed persistent feature store for the LaunchDarkly Go SDK.
//
// For more details about how and why you can use a persistent feature store, see:
// https://docs.launchdarkly.com/v2.0/docs/using-a-persistent-feature-store
//
// To use the Redis feature store with the LaunchDarkly client:
//
// store, err := redis.NewRedisFeatureStoreWithDefaults()
// if err != nil { ... }
//
// config := ld.DefaultConfig
// config.FeatureStore = store
// client, err := ld.MakeCustomClient("sdk-key", config, 5*time.Second)
//
// The default Redis pool configuration uses an address of localhost:6379, a maximum of 16
// concurrent connections, and blocking connection requests. To customize any properties of
// the connection pool, use the Pool option or the NewRedisFeatureStoreWithPool constructor.
// You may also customize other properties of the feature store by providing options to
// NewRedisFeatureStoreWithDefaults, for example:
//
// store, err := redis.NewRedisFeatureStoreWithDefaults(redis.URL(myRedisURL),
// redis.CacheTTL(30*time.Second))
//
// If you are also using Redis for other purposes, the feature store can coexist with
// other data as long as you are not using the same keys. By default, the keys used by the
// feature store will always start with "launchdarkly:"; you can change this to another
// prefix if desired.
package redis
import (
"encoding/json"
"fmt"
"log"
"os"
"time"
r "github.com/garyburd/redigo/redis"
ld "gopkg.in/launchdarkly/go-client.v4"
"gopkg.in/launchdarkly/go-client.v4/utils"
)
const (
// DefaultURL is the default URL for connecting to Redis, if you use
// NewRedisFeatureStoreWithDefaults. You can specify otherwise with the URL or HostAndPort
// option. If you are using the other constructors, you must specify the URL explicitly.
DefaultURL = "redis://localhost:6379"
// DefaultPrefix is a string that is prepended (along with a colon) to all Redis keys used
// by the feature store. You can change this value with the Prefix() option for
// NewRedisFeatureStoreWithDefaults, or with the "prefix" parameter to the other constructors.
DefaultPrefix = "launchdarkly"
// DefaultCacheTTL is the default amount of time that recently read or updated items will
// be cached in memory, if you use NewRedisFeatureStoreWithDefaults. You can specify otherwise
// with the CacheTTL option. If you are using the other constructors, their "timeout"
// parameter serves the same purpose and there is no default.
DefaultCacheTTL = 15 * time.Second
)
// FeatureStoreOption is the interface for optional configuration parameters that can be
// passed to NewRedisFeatureStoreWithDefaults. These include URL, HostAndPort, Pool, Prefix,
// CacheTTL, and Logger.
type FeatureStoreOption interface {
// apply writes this option's setting into the store core being configured. A non-nil
// error makes newRedisFeatureStoreInternal abandon construction and return that error.
apply(store *redisFeatureStoreCore) error
}
// redisURLOption is the FeatureStoreOption produced by URL and HostAndPort; it carries the
// Redis URL to connect to.
type redisURLOption struct {
	url string
}

// apply stores the option's URL in the core's redisURL field. It always returns nil.
func (opt redisURLOption) apply(store *redisFeatureStoreCore) error {
	store.redisURL = opt.url
	return nil
}
// URL creates an option for NewRedisFeatureStoreWithDefaults that sets the Redis host URL.
// When no URL (or HostAndPort) option is given, DefaultURL is used.
//
//     store, err := redis.NewRedisFeatureStoreWithDefaults(redis.URL("redis://my-redis-host:6379"))
func URL(url string) FeatureStoreOption {
	return redisURLOption{url: url}
}
// HostAndPort creates an option for NewRedisFeatureStoreWithDefaults that sets the Redis
// address from a hostname and port. The two are formatted into a "redis://host:port" URL,
// so this is equivalent to passing that URL to the URL option.
//
//     store, err := redis.NewRedisFeatureStoreWithDefaults(redis.HostAndPort("my-redis-host", 6379))
func HostAndPort(host string, port int) FeatureStoreOption {
	address := fmt.Sprintf("redis://%s:%d", host, port)
	return redisURLOption{url: address}
}
// redisPoolOption is the FeatureStoreOption produced by Pool; it carries a caller-supplied
// redigo connection pool.
type redisPoolOption struct {
	pool *r.Pool
}

// apply stores the option's pool in the core's pool field. It always returns nil.
func (opt redisPoolOption) apply(store *redisFeatureStoreCore) error {
	store.pool = opt.pool
	return nil
}
// Pool creates an option for NewRedisFeatureStoreWithDefaults that makes the feature store
// use the given connection pool instead of building a default one (see package description).
// When a non-nil pool is supplied, any address given with the URL or HostAndPort option is
// ignored, because the address is only used to build the default pool.
//
//     store, err := redis.NewRedisFeatureStoreWithDefaults(redis.Pool(myPool))
func Pool(pool *r.Pool) FeatureStoreOption {
	return redisPoolOption{pool: pool}
}
// prefixOption is the FeatureStoreOption produced by Prefix; it carries the key prefix.
type prefixOption struct {
	prefix string
}

// apply sets the core's key prefix, substituting DefaultPrefix when the option's prefix is
// empty. It always returns nil.
func (opt prefixOption) apply(store *redisFeatureStoreCore) error {
	prefix := opt.prefix
	if prefix == "" {
		prefix = DefaultPrefix
	}
	store.prefix = prefix
	return nil
}
// Prefix creates an option for NewRedisFeatureStoreWithDefaults that sets the string
// prepended to every Redis key used by the feature store. A colon is inserted between the
// prefix and the rest of the key automatically. An unspecified or empty prefix means
// DefaultPrefix.
//
//     store, err := redis.NewRedisFeatureStoreWithDefaults(redis.Prefix("ld-data"))
func Prefix(prefix string) FeatureStoreOption {
	return prefixOption{prefix: prefix}
}
// cacheTTLOption is the FeatureStoreOption produced by CacheTTL; it carries the in-memory
// cache duration.
type cacheTTLOption struct {
	cacheTTL time.Duration
}

// apply stores the option's duration in the core's cacheTTL field. It always returns nil.
func (opt cacheTTLOption) apply(store *redisFeatureStoreCore) error {
	store.cacheTTL = opt.cacheTTL
	return nil
}
// CacheTTL creates an option for NewRedisFeatureStoreWithDefaults that sets how long
// recently read or updated items stay in the in-memory cache. Caching reduces database
// access when the same feature flags are evaluated repeatedly. A value of zero disables
// in-memory caching. The default is DefaultCacheTTL.
//
//     store, err := redis.NewRedisFeatureStoreWithDefaults(redis.CacheTTL(30*time.Second))
func CacheTTL(ttl time.Duration) FeatureStoreOption {
	return cacheTTLOption{cacheTTL: ttl}
}
// loggerOption is the FeatureStoreOption produced by Logger; it carries the log destination.
type loggerOption struct {
	logger ld.Logger
}

// apply stores the option's logger in the core's logger field. It always returns nil.
func (opt loggerOption) apply(store *redisFeatureStoreCore) error {
	store.logger = opt.logger
	return nil
}
// Logger creates an option for NewRedisFeatureStoreWithDefaults that specifies where the
// feature store sends its log output. If no logger is given (or it is nil), a log.Logger
// writing to standard error is used.
//
//     store, err := redis.NewRedisFeatureStoreWithDefaults(redis.Logger(myLogger))
func Logger(logger ld.Logger) FeatureStoreOption {
	return loggerOption{logger: logger}
}
// RedisFeatureStore is a Redis-backed feature store implementation. It is the concrete type
// returned by the deprecated NewRedisFeatureStore* constructors, and every method simply
// forwards to the wrapped FeatureStoreWrapper.
type RedisFeatureStore struct { // nolint:golint // package name in type name
// wrapper is the FeatureStoreWrapper built around the Redis core.
wrapper *utils.FeatureStoreWrapper
}
// redisFeatureStoreCore is the internal implementation, using the simpler interface defined in
// utils.FeatureStoreCore. The FeatureStoreWrapper wraps this to add caching. The only reason that
// there is a separate RedisFeatureStore type, instead of just using the FeatureStoreWrapper itself
// as the outermost object, is a historical one: the NewRedisFeatureStore constructors had already
// been defined as returning *RedisFeatureStore rather than the interface type.
type redisFeatureStoreCore struct {
// prefix is joined with ":" in front of every Redis key this store uses.
prefix string
// pool supplies the connection for every Redis command issued by the store.
pool *r.Pool
// redisURL is only consulted when no pool was supplied, to build the default pool.
redisURL string
// cacheTTL is the value reported by GetCacheTTL.
cacheTTL time.Duration
// logger receives the store's diagnostic messages.
logger ld.Logger
// testTxHook, when non-nil, is called by UpsertInternal after WATCH and before the
// current item is read; it is instrumentation for unit tests.
testTxHook func()
}
// newPool builds the default redigo connection pool for the given Redis URL. The pool is
// configured with MaxActive 16 and Wait enabled, MaxIdle 20, and a 300-second IdleTimeout.
// Connections are dialed with DialURL, and TestOnBorrow issues a PING on each borrowed
// connection.
func newPool(url string) *r.Pool {
	dial := func() (r.Conn, error) {
		return r.DialURL(url)
	}
	ping := func(c r.Conn, _ time.Time) error {
		_, err := c.Do("PING")
		return err
	}
	return &r.Pool{
		MaxIdle:      20,
		MaxActive:    16,
		Wait:         true,
		IdleTimeout:  300 * time.Second,
		Dial:         dial,
		TestOnBorrow: ping,
	}
}
// initedKey is the key suffix (appended after the store prefix and a colon) that InitInternal
// sets and InitializedInternal checks for to tell whether the store has been populated.
const initedKey = "$inited"
// NewRedisFeatureStoreFromUrl constructs a new Redis-backed feature store that connects to
// the given URL using the default connection pool configuration (see package description).
// The "prefix", "timeout", and "logger" parameters behave like the Prefix, CacheTTL, and
// Logger options for NewRedisFeatureStoreWithDefaults.
//
// Deprecated: It is simpler to use NewRedisFeatureStoreWithDefaults(redis.URL(url)) and override
// any other defaults as needed.
func NewRedisFeatureStoreFromUrl(url, prefix string, timeout time.Duration, logger ld.Logger) *RedisFeatureStore {
	options := []FeatureStoreOption{
		URL(url),
		Prefix(prefix),
		CacheTTL(timeout),
		Logger(logger),
	}
	return newStoreForDeprecatedConstructors(options...)
}
// NewRedisFeatureStoreWithPool constructs a new Redis-backed feature store that uses the
// given redigo connection pool. The "prefix", "timeout", and "logger" parameters behave like
// the Prefix, CacheTTL, and Logger options for NewRedisFeatureStoreWithDefaults.
//
// Deprecated: It is simpler to use NewRedisFeatureStoreWithDefaults(redis.Pool(pool)) and override
// any other defaults as needed.
func NewRedisFeatureStoreWithPool(pool *r.Pool, prefix string, timeout time.Duration, logger ld.Logger) *RedisFeatureStore {
	options := []FeatureStoreOption{
		Pool(pool),
		Prefix(prefix),
		CacheTTL(timeout),
		Logger(logger),
	}
	return newStoreForDeprecatedConstructors(options...)
}
// NewRedisFeatureStore constructs a new Redis-backed feature store that connects to the given
// host and port using the default connection pool configuration (see package description).
// The "prefix", "timeout", and "logger" parameters behave like the Prefix, CacheTTL, and
// Logger options for NewRedisFeatureStoreWithDefaults.
//
// Deprecated: It is simpler to use NewRedisFeatureStoreWithDefaults(redis.HostAndPort(host, port))
// and override any other defaults as needed.
func NewRedisFeatureStore(host string, port int, prefix string, timeout time.Duration, logger ld.Logger) *RedisFeatureStore {
	options := []FeatureStoreOption{
		HostAndPort(host, port),
		Prefix(prefix),
		CacheTTL(timeout),
		Logger(logger),
	}
	return newStoreForDeprecatedConstructors(options...)
}
// NewRedisFeatureStoreWithDefaults constructs a new Redis-backed feature store.
//
// By default, it uses DefaultURL as the Redis address, DefaultPrefix as the prefix for all keys,
// DefaultCacheTTL as the duration for in-memory caching, and a default connection pool
// configuration (see package description for details). Any of these can be overridden with
// FeatureStoreOption values created by URL, HostAndPort, Pool, Prefix, CacheTTL, or Logger.
//
// It returns an error if any option fails to apply.
func NewRedisFeatureStoreWithDefaults(options ...FeatureStoreOption) (ld.FeatureStore, error) {
	impl, err := newRedisFeatureStoreInternal(options...)
	if err != nil {
		return nil, err
	}
	wrapped := utils.NewFeatureStoreWrapper(impl)
	return wrapped, nil
}
// newStoreForDeprecatedConstructors is the shared back end of the deprecated constructors,
// which return *RedisFeatureStore and have no error result. If the core cannot be built, the
// error is discarded and nil is returned.
func newStoreForDeprecatedConstructors(options ...FeatureStoreOption) *RedisFeatureStore {
	impl, err := newRedisFeatureStoreInternal(options...)
	if err != nil {
		return nil
	}
	store := new(RedisFeatureStore)
	store.wrapper = utils.NewFeatureStoreWrapper(impl)
	return store
}
// newRedisFeatureStoreInternal builds the store core: it starts from the package defaults,
// applies each option in the order given (stopping at the first error), and then fills in a
// default logger and a default connection pool for whichever of those is still unset.
func newRedisFeatureStoreInternal(options ...FeatureStoreOption) (*redisFeatureStoreCore, error) {
	core := &redisFeatureStoreCore{
		prefix:   DefaultPrefix,
		redisURL: DefaultURL,
		cacheTTL: DefaultCacheTTL,
	}
	for _, option := range options {
		if err := option.apply(core); err != nil {
			return nil, err
		}
	}

	// The logger must be resolved before the pool, because creating the default pool logs.
	if core.logger == nil {
		core.logger = defaultLogger()
	}
	if core.pool == nil {
		// NOTE(review): the URL is logged verbatim, including any credentials embedded in it.
		core.logger.Printf("RedisFeatureStore: Using url: %s", core.redisURL)
		core.pool = newPool(core.redisURL)
	}
	return core, nil
}
// Get returns an individual object of a given type from the store. The call is forwarded
// unchanged to the wrapped FeatureStoreWrapper.
func (store *RedisFeatureStore) Get(kind ld.VersionedDataKind, key string) (ld.VersionedData, error) {
return store.wrapper.Get(kind, key)
}
// All returns all the objects of a given kind from the store. The call is forwarded
// unchanged to the wrapped FeatureStoreWrapper.
func (store *RedisFeatureStore) All(kind ld.VersionedDataKind) (map[string]ld.VersionedData, error) {
return store.wrapper.All(kind)
}
// Init populates the store with a complete set of versioned data. The call is forwarded
// unchanged to the wrapped FeatureStoreWrapper.
func (store *RedisFeatureStore) Init(allData map[ld.VersionedDataKind]map[string]ld.VersionedData) error {
return store.wrapper.Init(allData)
}
// Upsert inserts or replaces an item in the store unless it already contains an item with
// an equal or larger version. The call is forwarded unchanged to the wrapped
// FeatureStoreWrapper.
func (store *RedisFeatureStore) Upsert(kind ld.VersionedDataKind, item ld.VersionedData) error {
return store.wrapper.Upsert(kind, item)
}
// Delete removes an item of a given kind from the store. The call, including the version
// argument, is forwarded unchanged to the wrapped FeatureStoreWrapper.
func (store *RedisFeatureStore) Delete(kind ld.VersionedDataKind, key string, version int) error {
return store.wrapper.Delete(kind, key, version)
}
// Initialized returns whether redis contains an entry for this environment. The call is
// forwarded unchanged to the wrapped FeatureStoreWrapper.
func (store *RedisFeatureStore) Initialized() bool {
return store.wrapper.Initialized()
}
// Actual implementation methods are below - these are called by FeatureStoreWrapper, which adds
// caching behavior if necessary.

// GetCacheTTL returns the in-memory cache duration configured for this store: DefaultCacheTTL
// unless it was overridden by the CacheTTL option.
func (store *redisFeatureStoreCore) GetCacheTTL() time.Duration {
return store.cacheTTL
}
// GetInternal reads a single item of the given kind from Redis with HGET on the kind's hash
// and unmarshals it. A missing key is not an error: it is logged at debug level and
// (nil, nil) is returned. Any other Redis error, or an unmarshaling error, is returned as is.
func (store *redisFeatureStoreCore) GetInternal(kind ld.VersionedDataKind, key string) (ld.VersionedData, error) {
	conn := store.getConn()
	defer conn.Close() // nolint:errcheck

	raw, err := r.String(conn.Do("HGET", store.featuresKey(kind), key))
	switch {
	case err == r.ErrNil:
		store.logger.Printf("RedisFeatureStore: DEBUG: Key: %s not found in \"%s\"", key, kind.GetNamespace())
		return nil, nil
	case err != nil:
		return nil, err
	}

	item, parseErr := utils.UnmarshalItem(kind, []byte(raw))
	if parseErr != nil {
		return nil, parseErr
	}
	return item, nil
}
// GetAllInternal reads every item of the given kind from Redis with HGETALL on the kind's
// hash and unmarshals each value, returning them keyed by item key. An ErrNil reply is
// treated as an empty result. Any other Redis error, or the first unmarshaling error
// encountered, is returned with a nil map.
func (store *redisFeatureStoreCore) GetAllInternal(kind ld.VersionedDataKind) (map[string]ld.VersionedData, error) {
	c := store.getConn()
	defer c.Close() // nolint:errcheck

	values, err := r.StringMap(c.Do("HGETALL", store.featuresKey(kind)))
	if err != nil && err != r.ErrNil {
		return nil, err
	}

	results := make(map[string]ld.VersionedData, len(values))
	for k, v := range values {
		item, jsonErr := utils.UnmarshalItem(kind, []byte(v))
		if jsonErr != nil {
			// Report the unmarshaling failure itself; err from HGETALL is nil or ErrNil
			// here and would hide the problem from the caller.
			return nil, jsonErr
		}
		results[k] = item
	}
	return results, nil
}
// InitInternal replaces the store's contents with a complete set of versioned data. Inside
// a single MULTI/EXEC transaction it deletes each kind's hash, writes every item of that
// kind as JSON with HSET, and finally sets the "inited" marker key. A JSON marshaling error
// aborts before EXEC is sent and is returned; otherwise the result of EXEC is returned.
func (store *redisFeatureStoreCore) InitInternal(allData map[ld.VersionedDataKind]map[string]ld.VersionedData) error {
	conn := store.getConn()
	defer conn.Close() // nolint:errcheck

	_ = conn.Send("MULTI")
	for kind, items := range allData {
		hashKey := store.featuresKey(kind)
		_ = conn.Send("DEL", hashKey)
		for itemKey, item := range items {
			encoded, marshalErr := json.Marshal(item)
			if marshalErr != nil {
				return marshalErr
			}
			_ = conn.Send("HSET", hashKey, itemKey, encoded)
		}
	}
	_ = conn.Send("SET", store.initedKey(), "")

	_, execErr := conn.Do("EXEC")
	return execErr
}
// UpsertInternal writes newItem to Redis unless the store already holds an item with the
// same key and an equal or higher version. It uses optimistic locking (WATCH + MULTI/EXEC)
// and retries for as long as EXEC reports that the watched hash was modified concurrently.
//
// It returns the item that is in the store afterwards: newItem if it was written, or the
// existing item if that one was kept. Any Redis or JSON error, including a failed EXEC, is
// returned with a nil item.
func (store *redisFeatureStoreCore) UpsertInternal(kind ld.VersionedDataKind, newItem ld.VersionedData) (ld.VersionedData, error) {
	for {
		item, retry, err := store.tryUpsert(kind, newItem)
		if err != nil {
			return nil, err
		}
		if !retry {
			return item, nil
		}
		store.logger.Printf("RedisFeatureStore: DEBUG: Concurrent modification detected, retrying")
	}
}

// tryUpsert makes one optimistic attempt at the upsert. Each attempt runs in its own call so
// that its connection is closed when the attempt ends, rather than every retry's connection
// being held until UpsertInternal returns. retry is true when EXEC returned a nil reply,
// meaning the watch was triggered and the caller should try again.
func (store *redisFeatureStoreCore) tryUpsert(kind ld.VersionedDataKind, newItem ld.VersionedData) (ld.VersionedData, bool, error) {
	baseKey := store.featuresKey(kind)
	key := newItem.GetKey()

	c := store.getConn()
	defer c.Close() // nolint:errcheck

	if _, err := c.Do("WATCH", baseKey); err != nil {
		return nil, false, err
	}
	defer c.Send("UNWATCH") // nolint:errcheck // this should always succeed

	if store.testTxHook != nil { // instrumentation for unit tests
		store.testTxHook()
	}

	oldItem, err := store.GetInternal(kind, key)
	if err != nil {
		return nil, false, err
	}
	if oldItem != nil && oldItem.GetVersion() >= newItem.GetVersion() {
		return oldItem, false, nil
	}

	data, err := json.Marshal(newItem)
	if err != nil {
		return nil, false, err
	}

	_ = c.Send("MULTI")
	if err := c.Send("HSET", baseKey, key, data); err != nil {
		return nil, false, err
	}
	result, err := c.Do("EXEC")
	if err != nil {
		// A failed EXEC means the write did not happen; it must not be reported as success.
		return nil, false, err
	}
	if result == nil {
		// if exec returned nothing, it means the watch was triggered and we should retry
		return nil, true, nil
	}
	return newItem, false, nil
}
// InitializedInternal reports whether the "inited" marker key exists in Redis. Any error
// from the EXISTS command is discarded, and whatever boolean the reply conversion produced
// is returned.
func (store *redisFeatureStoreCore) InitializedInternal() bool {
	conn := store.getConn()
	defer conn.Close() // nolint:errcheck

	exists, _ := r.Bool(conn.Do("EXISTS", store.initedKey()))
	return exists
}
// featuresKey returns the Redis hash key that holds all items of the given kind: the store
// prefix, a colon, and the kind's namespace.
func (store *redisFeatureStoreCore) featuresKey(kind ld.VersionedDataKind) string {
	namespace := kind.GetNamespace()
	return store.prefix + ":" + namespace
}
// initedKey returns the Redis key of the "store has been initialized" marker: the store
// prefix, a colon, and the package-level initedKey suffix.
func (store *redisFeatureStoreCore) initedKey() string {
	const separator = ":"
	return store.prefix + separator + initedKey
}
// getConn obtains a connection from the store's pool. Every caller in this file closes the
// returned connection with a deferred Close.
func (store *redisFeatureStoreCore) getConn() r.Conn {
return store.pool.Get()
}
// defaultLogger returns the logger used when none is configured: a standard-library logger
// that writes to standard error with the "[LaunchDarkly]" prefix and the standard date/time
// flags.
func defaultLogger() *log.Logger {
	const prefix = "[LaunchDarkly]"
	logger := log.New(os.Stderr, prefix, log.LstdFlags)
	return logger
}