/
redis.go
98 lines (84 loc) · 2.78 KB
/
redis.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
package persistence
import (
redigo "github.com/gomodule/redigo/redis"
"github.com/morgeq/iotfast/server/mqtt/persistence/queue"
redis_queue "github.com/morgeq/iotfast/server/mqtt/persistence/queue/redis"
"github.com/morgeq/iotfast/server/mqtt/persistence/session"
redis_sess "github.com/morgeq/iotfast/server/mqtt/persistence/session/redis"
"github.com/morgeq/iotfast/server/mqtt/persistence/subscription"
redis_sub "github.com/morgeq/iotfast/server/mqtt/persistence/subscription/redis"
"github.com/morgeq/iotfast/server/mqtt/persistence/unack"
redis_unack "github.com/morgeq/iotfast/server/mqtt/persistence/unack/redis"
"github.com/morgeq/iotfast/server/mqtt/config"
"github.com/morgeq/iotfast/server/mqtt/server"
)
func init() {
server.RegisterPersistenceFactory("redis", NewRedis)
}
func NewRedis(config config.Config) (server.Persistence, error) {
return &redis{
config: config,
}, nil
}
type redis struct {
pool *redigo.Pool
config config.Config
onMsgDropped server.OnMsgDropped
}
func (r *redis) NewUnackStore(config config.Config, clientID string) (unack.Store, error) {
return redis_unack.New(redis_unack.Options{
ClientID: clientID,
Pool: r.pool,
}), nil
}
func (r *redis) NewSessionStore(config config.Config) (session.Store, error) {
return redis_sess.New(r.pool), nil
}
func newPool(config config.Config) *redigo.Pool {
return &redigo.Pool{
// Dial or DialContext must be set. When both are set, DialContext takes precedence over Dial.
Dial: func() (redigo.Conn, error) {
c, err := redigo.Dial("tcp", config.Persistence.Redis.Addr)
if err != nil {
return nil, err
}
if pswd := config.Persistence.Redis.Password; pswd != "" {
if _, err := c.Do("AUTH", pswd); err != nil {
c.Close()
return nil, err
}
}
if _, err := c.Do("SELECT", config.Persistence.Redis.Database); err != nil {
c.Close()
return nil, err
}
return c, nil
},
}
}
func (r *redis) Open() error {
r.pool = newPool(r.config)
r.pool.MaxIdle = int(*r.config.Persistence.Redis.MaxIdle)
r.pool.MaxActive = int(*r.config.Persistence.Redis.MaxActive)
r.pool.IdleTimeout = r.config.Persistence.Redis.IdleTimeout
conn := r.pool.Get()
defer conn.Close()
// Test the connection
_, err := conn.Do("PING")
return err
}
func (r *redis) NewQueueStore(config config.Config, defaultNotifier queue.Notifier, clientID string) (queue.Store, error) {
return redis_queue.New(redis_queue.Options{
MaxQueuedMsg: config.MQTT.MaxQueuedMsg,
InflightExpiry: config.MQTT.InflightExpiry,
ClientID: clientID,
Pool: r.pool,
DefaultNotifier: defaultNotifier,
})
}
func (r *redis) NewSubscriptionStore(config config.Config) (subscription.Store, error) {
return redis_sub.New(r.pool), nil
}
func (r *redis) Close() error {
return r.pool.Close()
}