/
pushredisdb.go
660 lines (591 loc) · 23 KB
/
pushredisdb.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
/*
* Copyright 2011 Nan Deng
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package db
import (
"errors"
"fmt"
"strconv"
"strings"
"time"
"github.com/go-redis/redis"
"github.com/uniqush/log"
"github.com/uniqush/uniqush-push/push"
)
// PushRedisDB is currently the only uniqush pushRawDatabase implementation.
// It stores push service providers, delivery points, etc. in redis.
type PushRedisDB struct {
client redisClient
psm *push.PushServiceManager
}
type redisClient interface {
Decr(key string) *redis.IntCmd
Del(keys ...string) *redis.IntCmd
Exists(keys ...string) *redis.IntCmd
FlushDB() *redis.StatusCmd // for tests only
Get(key string) *redis.StringCmd
Incr(key string) *redis.IntCmd
Keys(key string) *redis.StringSliceCmd
MGet(keys ...string) *redis.SliceCmd
Save() *redis.StatusCmd
SAdd(key string, members ...interface{}) *redis.IntCmd
SRem(key string, members ...interface{}) *redis.IntCmd
Set(key string, value interface{}, expiration time.Duration) *redis.StatusCmd
SMembers(key string) *redis.StringSliceCmd
}
type redisMultiClient struct {
masterClient *redis.Client
slaveClient *redis.Client
}
func (mc *redisMultiClient) Decr(key string) *redis.IntCmd {
return mc.masterClient.Decr(key)
}
func (mc *redisMultiClient) Del(keys ...string) *redis.IntCmd {
return mc.masterClient.Del(keys...)
}
func (mc *redisMultiClient) Exists(keys ...string) *redis.IntCmd {
return mc.slaveClient.Exists(keys...)
}
func (mc *redisMultiClient) FlushDB() *redis.StatusCmd {
return mc.masterClient.FlushDB()
}
func (mc *redisMultiClient) Get(key string) *redis.StringCmd {
return mc.slaveClient.Get(key)
}
func (mc *redisMultiClient) Incr(key string) *redis.IntCmd {
return mc.masterClient.Incr(key)
}
func (mc *redisMultiClient) Keys(key string) *redis.StringSliceCmd {
return mc.slaveClient.Keys(key)
}
func (mc *redisMultiClient) MGet(keys ...string) *redis.SliceCmd {
return mc.slaveClient.MGet(keys...)
}
func (mc *redisMultiClient) Save() *redis.StatusCmd {
return mc.masterClient.Save()
}
func (mc *redisMultiClient) SAdd(key string, members ...interface{}) *redis.IntCmd {
return mc.masterClient.SAdd(key, members...)
}
func (mc *redisMultiClient) SRem(key string, members ...interface{}) *redis.IntCmd {
return mc.masterClient.SRem(key, members...)
}
func (mc *redisMultiClient) Set(key string, value interface{}, expiration time.Duration) *redis.StatusCmd {
return mc.masterClient.Set(key, value, expiration)
}
func (mc *redisMultiClient) SMembers(key string) *redis.StringSliceCmd {
return mc.slaveClient.SMembers(key)
}
var _ redisClient = &redis.Client{}
var _ pushRawDatabase = &PushRedisDB{}
const (
// DeliveryPointPrefix is the prefix of keys for a redis STRING - Maps the delivery point name to a json blob of information about a delivery point.
DeliveryPointPrefix string = "delivery.point:"
// PushServiceProviderPrefix is the prefix of keys for a redis STRING - Maps a push service provider name to a json blob of information about it.
PushServiceProviderPrefix string = "push.service.provider:"
// ServiceSubscriberToDeliveryPointsPrefix is the prefix of keys for a redis SET - Maps a service name + subscriber to a set of delivery point names
ServiceSubscriberToDeliveryPointsPrefix string = "srv.sub-2-dp:"
// ServiceDeliveryPointToPushServiceProviderPrefix is the prefix of keys for a redis STRING - Maps a service name + delivery point name to the push service provider
ServiceDeliveryPointToPushServiceProviderPrefix string = "srv.dp-2-psp:"
// ServiceToPushServiceProvidersPrefix is the prefix of keys for a redis SET - Maps a service name to a set of PSP names
ServiceToPushServiceProvidersPrefix string = "srv-2-psp:"
// DeliveryPointCounterPrefix is the prefix of keys for a redis STRING - Maps a delivery point name to the number of subcribers(summed across each service).
DeliveryPointCounterPrefix string = "delivery.point.counter:"
// ServicesSet is the key for a redis SET - This is a set of service names.
ServicesSet string = "services{0}"
)
// buildRedisSlaveClient will optionally returns a redis client for uniqush-push to use for read-only operations (such as fetching subscriptions and services).
func buildRedisSlaveClient(c *DatabaseConfig) (*redis.Client, error) {
host := c.SlaveHost
port := c.SlavePort
name := c.Name
if host == "" && port <= 0 {
return nil, nil
}
if host == "" || port <= 0 {
return nil, errors.New("Missing redis slave host or port")
}
db, err := strconv.ParseInt(name, 10, 64)
if err != nil {
db = 0
}
ret := redis.NewClient(&redis.Options{
Addr: fmt.Sprintf("%s:%d", host, port),
Password: c.Password,
DB: int(db),
})
return ret, nil
}
// buildRedisClient will build the client used to fetch and update subscriptions, services, etc.
func buildRedisClient(c *DatabaseConfig) (redisClient, error) {
if c == nil {
return nil, errors.New("Invalid Database Config")
}
if strings.ToLower(c.Engine) != "redis" {
return nil, errors.New("Unsupported Database Engine")
}
if c.Host == "" {
c.Host = "localhost"
}
if c.Port <= 0 {
c.Port = 6379
}
if c.Name == "" {
c.Name = "0"
}
db, err := strconv.ParseInt(c.Name, 10, 64)
if err != nil {
db = 0
}
client := redis.NewClient(&redis.Options{
Addr: fmt.Sprintf("%s:%d", c.Host, c.Port),
Password: c.Password,
DB: int(db),
})
if slaveClient, err := buildRedisSlaveClient(c); slaveClient != nil || err != nil {
if err != nil {
return nil, fmt.Errorf("Invalid Redis Slave Database Config: %s", err.Error())
}
dualClient := &redisMultiClient{
masterClient: client,
slaveClient: slaveClient,
}
return dualClient, nil
}
return client, nil
}
func buildPushRedisDB(client redisClient, psm *push.PushServiceManager) *PushRedisDB {
ret := new(PushRedisDB)
ret.client = client
ret.psm = psm
if ret.psm == nil {
ret.psm = push.GetPushServiceManager()
}
return ret
}
func newPushRedisDB(c *DatabaseConfig) (*PushRedisDB, error) {
client, err := buildRedisClient(c)
if err != nil {
return nil, err
}
ret := buildPushRedisDB(client, c.PushServiceManager)
return ret, nil
}
func (r *PushRedisDB) keyValueToDeliveryPoint(value []byte) (dp *push.DeliveryPoint, err error) {
psm := r.psm
dp, err = psm.BuildDeliveryPointFromBytes(value)
if err != nil {
dp = nil
}
return
}
func (r *PushRedisDB) keyValueToPushServiceProvider(value []byte) (psp *push.PushServiceProvider, err error) {
psm := r.psm
psp, err = psm.BuildPushServiceProviderFromBytes(value)
if err != nil {
psp = nil
}
return
}
func deliveryPointToValue(dp *push.DeliveryPoint) []byte {
return dp.Marshal()
}
func pushServiceProviderToValue(psp *push.PushServiceProvider) []byte {
return psp.Marshal()
}
func (r *PushRedisDB) mgetStrings(keys ...string) ([][]byte, error) {
data, err := r.client.MGet(keys...).Result()
if err != nil {
return nil, err
}
results := make([][]byte, len(data))
for i, result := range data {
if r, ok := result.(string); ok {
results[i] = []byte(r)
} else if result == nil {
results[i] = nil
} else {
// Nil?
return nil, fmt.Errorf("Unexpected mget result type got %T %#v", result, result)
}
}
return results, nil
}
func (r *PushRedisDB) mgetRawDeliveryPoints(deliveryPointNames ...string) ([][]byte, error) {
var deliveryPointKeys []string
for _, deliveryPointName := range deliveryPointNames {
deliveryPointKeys = append(deliveryPointKeys, DeliveryPointPrefix+deliveryPointName)
}
deliveryPointData, err := r.mgetStrings(deliveryPointKeys...)
if err != nil {
return nil, fmt.Errorf("Error getting deliveryPointKeys: %v", err)
}
return deliveryPointData, nil
}
// GetDeliveryPoint fetches the delivery point with a given generated name.
func (r *PushRedisDB) GetDeliveryPoint(name string) (*push.DeliveryPoint, error) {
b, err := r.client.Get(DeliveryPointPrefix + name).Bytes()
if err != nil {
return nil, fmt.Errorf("GetDeliveryPoint failed: %v", err)
}
if len(b) == 0 {
return nil, nil
}
return r.keyValueToDeliveryPoint(b)
}
// SetDeliveryPoint sets (adds or updates) the delivery point representation in the database.
func (r *PushRedisDB) SetDeliveryPoint(dp *push.DeliveryPoint) error {
err := r.client.Set(DeliveryPointPrefix+dp.Name(), deliveryPointToValue(dp), 0).Err()
return err
}
// GetPushServiceProvider will fetch and unserialize the push service provider with the given name.
func (r *PushRedisDB) GetPushServiceProvider(name string) (*push.PushServiceProvider, error) {
cmd := r.client.Get(PushServiceProviderPrefix + name)
b, err := cmd.Bytes()
if err != nil {
return nil, fmt.Errorf("GetPushServiceProvider failed: %v", err)
}
if len(b) == 0 {
return nil, nil
}
return r.keyValueToPushServiceProvider(b)
}
// GetPushServiceProviderConfigs will fetch and unserialize the push service providers with the given names.
func (r *PushRedisDB) GetPushServiceProviderConfigs(names []string) ([]*push.PushServiceProvider, []error) {
if len(names) == 0 {
return nil, nil
}
keys := make([]string, len(names))
for i, name := range names {
keys[i] = PushServiceProviderPrefix + name
}
values, err := r.mgetStrings(keys...)
if err != nil {
return nil, []error{fmt.Errorf("GetPushServiceProviderConfigs: %v", err)}
}
errors := make([]error, 0)
psps := make([]*push.PushServiceProvider, 0)
for i, value := range values {
if value == nil {
errors = append(errors, fmt.Errorf("Missing a PushServiceProvider for %q, key %q", names[i], keys[i]))
continue
}
psp, err := r.keyValueToPushServiceProvider(value)
if err != nil {
errors = append(errors, fmt.Errorf("Invalid psp for %s: %v", names[i], err))
} else {
psps = append(psps, psp)
}
}
return psps, errors
}
// SetPushServiceProvider will add or update the push service provider psp. The redis key is based on a hash of FixedData.
func (r *PushRedisDB) SetPushServiceProvider(psp *push.PushServiceProvider) error {
if err := r.client.Set(PushServiceProviderPrefix+psp.Name(), pushServiceProviderToValue(psp), 0).Err(); err != nil {
return fmt.Errorf("SetPushServiceProvider %q failed: %v", psp.Name(), err)
}
return nil
}
// RemoveDeliveryPoint will remove the data for a delivery point.
func (r *PushRedisDB) RemoveDeliveryPoint(dp string) error {
err := r.client.Del(DeliveryPointPrefix + dp).Err()
if err != nil {
return fmt.Errorf("RemoveDP %q failed: %v", dp, err)
}
return nil
}
// RemovePushServiceProvider will remove a push service provider's configuration
func (r *PushRedisDB) RemovePushServiceProvider(psp string) error {
err := r.client.Del(PushServiceProviderPrefix + psp).Err()
if err != nil {
return fmt.Errorf("RemovePSP %q failed: %v", psp, err)
}
return nil
}
// GetDeliveryPointsNameByServiceSubscriber will get the delivery point for a service and it's subscriber
func (r *PushRedisDB) GetDeliveryPointsNameByServiceSubscriber(srv, sub string) (map[string][]string, error) {
keys := make([]string, 1)
if !strings.Contains(sub, "*") && !strings.Contains(srv, "*") {
keys[0] = ServiceSubscriberToDeliveryPointsPrefix + srv + ":" + sub
} else {
var err error
keys, err = r.client.Keys(ServiceSubscriberToDeliveryPointsPrefix + srv + ":" + sub).Result()
if err != nil {
return nil, fmt.Errorf("GetDPsNameByServiceSubscriber dp lookup '%s:%s' failed: %v", srv, sub, err)
}
}
ret := make(map[string][]string, len(keys))
for _, k := range keys {
m, err := r.client.SMembers(k).Result()
if err != nil {
return nil, fmt.Errorf("GetDPsNameByServiceSubscriber smembers %q failed: %v", k, err)
}
if m == nil {
continue
}
elem := strings.Split(k, ":")
s := elem[1]
if l, ok := ret[s]; !ok || l == nil {
ret[s] = make([]string, 0, len(keys))
}
for _, bm := range m {
dpl := ret[s]
dpl = append(dpl, bm)
ret[s] = dpl
}
}
return ret, nil
}
// GetPushServiceProviderNameByServiceDeliveryPoint returns the push service provider name of a delivery point belonging to a given service name.
func (r *PushRedisDB) GetPushServiceProviderNameByServiceDeliveryPoint(srv, dp string) (string, error) {
b, err := r.client.Get(ServiceDeliveryPointToPushServiceProviderPrefix + srv + ":" + dp).Result()
if err != nil {
return "", fmt.Errorf("GetPSPNameByServiceDP failed: %v", err)
}
return b, nil
}
// AddDeliveryPointToServiceSubscriber will associate the name of the given delivery point with the given service name and subscriber name.
func (r *PushRedisDB) AddDeliveryPointToServiceSubscriber(srv, sub, dp string) error {
i, err := r.client.SAdd(ServiceSubscriberToDeliveryPointsPrefix+srv+":"+sub, dp).Result()
if err != nil {
return fmt.Errorf("AddDPToServiceSubscriber failed: %v", err)
}
if i == 0 { // Already exists
return nil
}
err = r.client.Incr(DeliveryPointCounterPrefix + dp).Err()
if err != nil {
return fmt.Errorf("AddDPToServiceSubscriber count tracking failed: %v", err)
}
return nil
}
// RemoveDeliveryPointFromServiceSubscriber will remove the given delivery point's name from the subscriber of the provided service.
func (r *PushRedisDB) RemoveDeliveryPointFromServiceSubscriber(srv, sub, dp string) error {
j, err := r.client.SRem(ServiceSubscriberToDeliveryPointsPrefix+srv+":"+sub, dp).Result()
if err != nil {
return fmt.Errorf("Removing the delivery point pointer %q from \"%s:%s\" failed", dp, srv, sub)
}
if j == 0 {
return nil
}
i, e := r.client.Decr(DeliveryPointCounterPrefix + dp).Result()
if e != nil {
return fmt.Errorf("Failed to decrement number of subscribers using dp %q: %v", dp, e)
}
if i <= 0 {
e0 := r.client.Del(DeliveryPointCounterPrefix + dp).Err()
if e0 != nil {
return fmt.Errorf("Failed to remove counter for %q: %v", dp, e0)
}
e1 := r.client.Del(DeliveryPointPrefix + dp).Err()
if e1 != nil {
return fmt.Errorf("Failed to remove delivery point info for %q: %v", dp, e1)
}
}
return nil
}
// removeMissingDeliveryPointFromServiceSubscriber removes any associations from a subscription list to a dp with missing subscriptions.
func (r *PushRedisDB) removeMissingDeliveryPointFromServiceSubscriber(service, subscriber, dpName string, logger log.Logger) {
// Precondition: DeliveryPointPrefix + dp was already missing. No need to remove it.
e0 := r.client.SRem(ServiceSubscriberToDeliveryPointsPrefix+service+":"+subscriber, dpName).Err()
if e0 != nil {
logger.Errorf("Error cleaning up delivery point with missing data for dp %q service %q FROM user %q's delivery points: %v", dpName, subscriber, service, e0)
}
e1 := r.client.Del(DeliveryPointCounterPrefix + dpName).Err() // TODO: Err instead
if e1 != nil {
logger.Errorf("Error cleaning up count for delivery point with missing data for delivery point %q (while processing subscriber %q, service %q): %v", dpName, subscriber, service, e1)
}
}
// SetPushServiceProviderOfServiceDeliveryPoint will set the name of the push service provider
// to use when sending pushes to the given delivery point of this service name.
func (r *PushRedisDB) SetPushServiceProviderOfServiceDeliveryPoint(srv, dp, psp string) error {
err := r.client.Set(ServiceDeliveryPointToPushServiceProviderPrefix+srv+":"+dp, psp, 0).Err()
if err != nil {
return fmt.Errorf("SetPSPOfServiceDP failed for \"%s:%s\": %v", srv, dp, err)
}
return nil
}
// RemovePushServiceProviderOfServiceDeliveryPoint is used when removing a push service provider, to clean up the association to the name of the push service provider for the delivery point+service name.
func (r *PushRedisDB) RemovePushServiceProviderOfServiceDeliveryPoint(srv, dp string) error {
err := r.client.Del(ServiceDeliveryPointToPushServiceProviderPrefix + srv + ":" + dp).Err()
if err != nil {
return fmt.Errorf("RemovePSPOfServiceDP failed for \"%s:%s\": %v", srv, dp, err)
}
return err
}
// GetPushServiceProvidersByService will return a list of the names of push service providers belonging to the given service name
func (r *PushRedisDB) GetPushServiceProvidersByService(srv string) ([]string, error) {
m, err := r.client.SMembers(ServiceToPushServiceProvidersPrefix + srv).Result()
if err != nil {
return nil, fmt.Errorf("GetPSPsByService failed for %q: %v", srv, err)
}
if m == nil {
return nil, nil
}
ret := append([]string{}, m...)
return ret, nil
}
// RemovePushServiceProviderFromService will remove the given push service provider from the list of services (and remove the service from the list of services, if this results in the service having 0 subscriptions)
func (r *PushRedisDB) RemovePushServiceProviderFromService(srv, psp string) error {
err := r.client.SRem(ServiceToPushServiceProvidersPrefix+srv, psp).Err()
if err != nil {
return fmt.Errorf("RemovePSPFromService failed for psp %q of service %q: %v", psp, srv, err)
}
// A service name can be associated with multiple push service providers, so we must first check if there are no more push service providers of that type
// The API /addpsp allows psps with the same service name but different pushservicetypes (e.g. gcm, apns).
exists, err := r.client.Exists(ServiceToPushServiceProvidersPrefix + srv).Result()
if err != nil {
return fmt.Errorf("Unable to determine if service %q still exists after removing psp %q: %v", srv, psp, err)
}
if exists == 0 {
err := r.client.SRem(ServicesSet, srv).Err() // Non-essential. Used to list services in API.
if err != nil {
return fmt.Errorf("Unable to remove %q from set of services", srv)
}
}
return nil
}
// AddPushServiceProviderToService will add the push service provider's name to the list of PSPs for this service.
func (r *PushRedisDB) AddPushServiceProviderToService(srv, psp string) error {
// TODO: pipelined
err := r.client.SAdd(ServicesSet, srv).Err() // Used to list services in API.
if err != nil {
return fmt.Errorf("Unable to add %q to set of services", srv)
}
err = r.client.SAdd(ServiceToPushServiceProvidersPrefix+srv, psp).Err()
if err != nil {
return fmt.Errorf("AddPSPToService failed for psp %q of service %q: %v", psp, srv, err)
}
return nil
}
// GetServiceNames will return the list of all services that have 1 or more push service providers.
func (r *PushRedisDB) GetServiceNames() ([]string, error) {
serviceList, err := r.client.SMembers(ServicesSet).Result()
if err != nil {
return nil, fmt.Errorf("Could not get services from redis: %v", err)
}
return serviceList, nil
}
// RebuildServiceSet builds the set of unique service. It should only be needed for migrating from old uniqush installations.
func (r *PushRedisDB) RebuildServiceSet() error {
// Run KEYS, then replace the PSP set with the result of KEYS.
// If any step fails, then return an error.
pspKeys, err := r.client.Keys(PushServiceProviderPrefix + "*").Result()
if err != nil {
return fmt.Errorf("Failed to fetch PSPs using redis KEYS command: %v", err)
}
if len(pspKeys) == 0 {
return nil
}
pspNames := make([]string, len(pspKeys))
N := len(PushServiceProviderPrefix)
for i, key := range pspKeys {
if len(key) < N || key[:N] != PushServiceProviderPrefix {
return fmt.Errorf("KEYS %s* returned %q - this shouldn't happen", PushServiceProviderPrefix, key)
}
pspNames[i] = key[N:]
}
psps, errs := r.GetPushServiceProviderConfigs(pspNames)
if len(errs) > 0 {
return fmt.Errorf("RebuildServiceSet: found one or more invalid psps: %v", errs)
}
serviceNameSet := make(map[string]bool)
for i, psp := range psps {
serviceName, ok := psp.FixedData["service"]
if !ok || serviceName == "" {
return fmt.Errorf("RebuildServiceSet: found PSP %q with empty service name: data=%v", pspNames[i], psp)
}
serviceNameSet[serviceName] = true
}
var serviceNameList []interface{}
for serviceName := range serviceNameSet {
serviceNameList = append(serviceNameList, serviceName)
}
if len(serviceNameList) > 0 {
err := r.client.SAdd(ServicesSet, serviceNameList...).Err()
if err != nil {
return err
}
}
return nil
}
// FlushCache will ensure that redis data has been saved to disk.
func (r *PushRedisDB) FlushCache() error {
// TODO: Make this configurable, allow uniqush configs to prevent redis flushes, e.g. if redis backups are set up already.
return r.client.Save().Err()
}
// GetSubscriptions will fetch the subscriptions of the given subscriber belonging to the given service list.
// If queryServices is empty, then this will fetch subscriptions from all known services.
func (r *PushRedisDB) GetSubscriptions(queryServices []string, subscriber string, logger log.Logger) ([]map[string]string, error) {
if len(queryServices) == 0 {
definedServices, err := r.GetServiceNames()
if err != nil {
return nil, fmt.Errorf("GetSubscriptions: %v", err)
}
queryServices = definedServices
}
var serviceForDeliveryPointNames []string
var deliveryPointNames []string
for _, service := range queryServices {
if service == "" {
logger.Errorf("empty service defined")
continue
}
deliveryPoints, err := r.client.SMembers(ServiceSubscriberToDeliveryPointsPrefix + service + ":" + subscriber).Result()
if err != nil {
return nil, fmt.Errorf("Could not get subscriber information")
}
if len(deliveryPoints) == 0 {
// it is OK to not have delivery points for a service
continue
}
for _, deliveryPointName := range deliveryPoints {
deliveryPointNames = append(deliveryPointNames, deliveryPointName)
serviceForDeliveryPointNames = append(serviceForDeliveryPointNames, service)
}
}
if len(deliveryPointNames) == 0 {
// Return empty map without error.
return make([]map[string]string, 0), nil
}
deliveryPointData, err := r.mgetRawDeliveryPoints(deliveryPointNames...)
if err != nil {
return nil, err
}
// Unserialize the subscriptions. If there are any invalid subscriptions, remove them and log it.
// serviceForDeliveryPointNames, deliveryPointNames, and deliveryPointData all use the same index i.
var subscriptions []map[string]string
for i, data := range deliveryPointData {
dpName := deliveryPointNames[i]
service := serviceForDeliveryPointNames[i]
if data != nil {
subscriptionData, err := push.UnserializeSubscription(data)
if err != nil {
logger.Errorf("Error unserializing subscription for delivery point data for dp %q user %q service %q data %v: %v", dpName, subscriber, service, subscriptionData, err)
continue
}
// DeliveryPointID is for use by clients which wish to remove subscriptions unambiguously
subscriptionData[DeliveryPointID] = dpName
subscriptions = append(subscriptions, subscriptionData)
} else {
logger.Errorf("Redis error fetching subscriber delivery point data for dp %q user %q service %q, removing...", dpName, subscriber, service)
// The multi-get returned nil, so this key is missing.
// Try to remove this delivery point as cleanly as possible, removing counts, etc.
r.removeMissingDeliveryPointFromServiceSubscriber(service, subscriber, dpName, logger)
}
}
return subscriptions, nil
}