forked from saiya/dsps
/
redis_connection.go
109 lines (91 loc) · 3.23 KB
/
redis_connection.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
package internal
import (
"context"
"github.com/go-redis/redis/extra/redisotel"
"github.com/go-redis/redis/v8"
"github.com/m3dev/dsps/server/config"
"github.com/m3dev/dsps/server/logger"
"github.com/m3dev/dsps/server/storage/redis/internal/pubsub"
)
// RedisConnection bundles a Redis command interface with the metadata and
// teardown hook of the client that backs it. Values are produced by
// NewRedisConnection.
type RedisConnection struct {
// RedisCmd is the command wrapper built by NewRedisCmd around the underlying client.
RedisCmd RedisCmd
// Close closes the underlying go-redis client and returns its error, if any.
Close func() error
// IsSingleNode is set to true when the connection was created from the single-node configuration.
IsSingleNode bool
// IsCluster is set to true when the connection was created from the cluster configuration.
IsCluster bool
// MaxConnections is copied from config.Connection.Max, the same value used as the client's PoolSize.
MaxConnections int
}
// NewRedisConnection creates a Redis client from the given configuration and
// verifies it with an initial ping.
//
// A single-node client is created when config.SingleNode is set; otherwise a
// cluster client is created. If the ping fails, the client is closed (a close
// failure is only logged) and the ping error is returned together with a zero
// RedisConnection.
func NewRedisConnection(ctx context.Context, config *config.RedisStorageConfig) (RedisConnection, error) {
	// Choose the constructor first; both share the same signature.
	newClient := createClientCluster
	if config.SingleNode != nil {
		newClient = createClientSingleNode
	}
	conn := newClient(ctx, config)

	if pingErr := conn.RedisCmd.Ping(ctx); pingErr != nil {
		// Release the client before reporting the failure; the ping error is
		// the one the caller receives, so a close failure is logged only.
		if closeErr := conn.Close(); closeErr != nil {
			logger.Of(ctx).InfoError(logger.CatStorage, "Failed to close Redis connection after initial ping failure", closeErr)
		}
		return RedisConnection{}, pingErr
	}
	return conn, nil
}
// createClientSingleNode builds a RedisConnection backed by a single-node
// go-redis client configured from config. The returned connection has
// IsSingleNode set and has not been pinged yet.
func createClientSingleNode(ctx context.Context, config *config.RedisStorageConfig) RedisConnection {
	// The retry backoff window is the configured interval widened by the jitter on both sides.
	interval := config.Retry.Interval.Duration
	jitter := config.Retry.IntervalJitter.Duration

	opts := &redis.Options{
		Addr:     *config.SingleNode,
		DB:       config.DBNumber,
		Username: config.Username,
		Password: config.Password,

		DialTimeout:  config.Timeout.Connect.Duration,
		ReadTimeout:  config.Timeout.Read.Duration,
		WriteTimeout: config.Timeout.Write.Duration,

		MaxRetries:      *config.Retry.Count,
		MinRetryBackoff: interval - jitter,
		MaxRetryBackoff: interval + jitter,

		MinIdleConns: *config.Connection.Min,
		PoolSize:     *config.Connection.Max,
		IdleTimeout:  config.Connection.MaxIdleTime.Duration,
	}

	client := redis.NewClient(opts)
	client.AddHook(redisotel.TracingHook{})

	// subscribe opens a PSubscribe subscription on this client for the given channel ID.
	subscribe := func(ctx context.Context, channel pubsub.RedisChannelID) pubsub.RedisRawPubSub {
		return client.PSubscribe(ctx, string(channel))
	}

	return RedisConnection{
		RedisCmd:       NewRedisCmd(client, subscribe),
		Close:          client.Close,
		IsSingleNode:   true,
		MaxConnections: *config.Connection.Max,
	}
}
// createClientCluster builds a RedisConnection backed by a go-redis cluster
// client configured from config. The returned connection has IsCluster set
// and has not been pinged yet.
func createClientCluster(ctx context.Context, config *config.RedisStorageConfig) RedisConnection {
	// The retry backoff window is the configured interval widened by the jitter on both sides.
	interval := config.Retry.Interval.Duration
	jitter := config.Retry.IntervalJitter.Duration

	opts := &redis.ClusterOptions{
		Addrs:    *config.Cluster,
		Username: config.Username,
		Password: config.Password,

		DialTimeout:  config.Timeout.Connect.Duration,
		ReadTimeout:  config.Timeout.Read.Duration,
		WriteTimeout: config.Timeout.Write.Duration,

		MaxRetries:      *config.Retry.Count,
		MinRetryBackoff: interval - jitter,
		MaxRetryBackoff: interval + jitter,

		MinIdleConns: *config.Connection.Min,
		PoolSize:     *config.Connection.Max,
		IdleTimeout:  config.Connection.MaxIdleTime.Duration,

		// Settings that exist only on the cluster client; these are fixed
		// here rather than read from config.
		MaxRedirects: 3,
		ReadOnly:     false,
	}

	client := redis.NewClusterClient(opts)
	client.AddHook(redisotel.TracingHook{})

	// subscribe opens a PSubscribe subscription on this client for the given channel ID.
	subscribe := func(ctx context.Context, channel pubsub.RedisChannelID) pubsub.RedisRawPubSub {
		return client.PSubscribe(ctx, string(channel))
	}

	return RedisConnection{
		RedisCmd:       NewRedisCmd(client, subscribe),
		Close:          client.Close,
		IsCluster:      true,
		MaxConnections: *config.Connection.Max,
	}
}