11package redis
22
33import (
4- "errors"
5- "io"
64 "math/rand"
7- "net"
85 "strings"
96 "sync"
107 "sync/atomic"
118 "time"
129)
1310
11+ func removeDuplicates (slice []string ) []string {
12+ seen := make (map [string ]struct {}, len (slice ))
13+ for i := 0 ; i < len (slice ); {
14+ addr := slice [i ]
15+ if _ , ok := seen [addr ]; ok {
16+ slice = append (slice [:i ], slice [i + 1 :]... )
17+ } else {
18+ seen [addr ] = struct {}{}
19+ i ++
20+ }
21+ }
22+ return slice
23+ }
24+
1425type ClusterClient struct {
1526 commandable
1627
17- addrs map [ string ] struct {}
28+ addrs [] string
1829 slots [][]string
1930 slotsMx sync.RWMutex // protects slots & addrs cache
2031
21- conns map [string ]* Client
22- connsMx sync.Mutex // protects conns
32+ clients map [string ]* Client
33+ clientsMx sync.RWMutex // protects clients
2334
2435 opt * ClusterOptions
2536
@@ -28,95 +39,122 @@ type ClusterClient struct {
2839
2940// NewClusterClient initializes a new cluster-aware client using given options.
3041// A list of seed addresses must be provided.
31- func NewClusterClient (opt * ClusterOptions ) (* ClusterClient , error ) {
32- addrs , err := opt .getAddrSet ()
33- if err != nil {
34- return nil , err
35- }
36-
42+ func NewClusterClient (opt * ClusterOptions ) * ClusterClient {
3743 client := & ClusterClient {
38- addrs : addrs ,
39- conns : make (map [string ]* Client ),
44+ addrs : opt . getAddrs () ,
45+ clients : make (map [string ]* Client ),
4046 opt : opt ,
4147 _reload : 1 ,
4248 }
4349 client .commandable .process = client .process
44- client .reloadIfDue ()
4550 go client .reaper (time .NewTicker (5 * time .Minute ))
46- return client , nil
51+ return client
4752}
4853
49- // Close closes the cluster connection
54+ // Close closes the cluster client.
5055func (c * ClusterClient ) Close () error {
51- c .slotsMx .Lock ()
52- defer c .slotsMx .Unlock ()
53-
54- return c .reset ()
56+ // TODO: close should make client unusable
57+ c .setSlots (nil )
58+ return nil
5559}
5660
5761// ------------------------------------------------------------------------
5862
59- // Finds the current master address for a given hash slot
60- func (c * ClusterClient ) getMasterAddrBySlot (hashSlot int ) string {
61- if addrs := c .slots [hashSlot ]; len (addrs ) > 0 {
62- return addrs [0 ]
63+ // getClient returns a Client for a given address.
64+ func (c * ClusterClient ) getClient (addr string ) * Client {
65+ c .clientsMx .RLock ()
66+ client , ok := c .clients [addr ]
67+ if ok {
68+ c .clientsMx .RUnlock ()
69+ return client
6370 }
64- return ""
65- }
71+ c .clientsMx .RUnlock ()
6672
67- // Returns a node's client for a given address
68- func (c * ClusterClient ) getNodeClientByAddr (addr string ) * Client {
69- c .connsMx .Lock ()
70- client , ok := c .conns [addr ]
73+ c .clientsMx .Lock ()
74+ client , ok = c .clients [addr ]
7175 if ! ok {
7276 opt := c .opt .clientOptions ()
7377 opt .Addr = addr
7478 client = NewTCPClient (opt )
75- c .conns [addr ] = client
79+ c .clients [addr ] = client
7680 }
77- c .connsMx .Unlock ()
81+ c .clientsMx .Unlock ()
82+
7883 return client
7984}
8085
86+ // randomClient returns a Client for the first live node.
87+ func (c * ClusterClient ) randomClient () (client * Client , err error ) {
88+ for i := 0 ; i < 10 ; i ++ {
89+ n := rand .Intn (len (c .addrs ))
90+ client = c .getClient (c .addrs [n ])
91+ err = client .Ping ().Err ()
92+ if err == nil {
93+ return client , nil
94+ }
95+ }
96+ return nil , err
97+ }
98+
8199// Process a command
82100func (c * ClusterClient ) process (cmd Cmder ) {
101+ var client * Client
83102 var ask bool
84103
85104 c .reloadIfDue ()
86105
87- hashSlot := hashSlot (cmd .clusterKey ())
106+ slot := hashSlot (cmd .clusterKey ())
88107
89108 c .slotsMx .RLock ()
90109 defer c .slotsMx .RUnlock ()
91110
92- tried := make (map [string ]struct {}, len (c .addrs ))
93- addr := c .getMasterAddrBySlot (hashSlot )
94- for attempt := 0 ; attempt <= c .opt .getMaxRedirects (); attempt ++ {
95- tried [addr ] = struct {}{}
111+ addrs := c .slots [slot ]
112+ if len (addrs ) > 0 {
113+ // First address is master.
114+ client = c .getClient (addrs [0 ])
115+ } else {
116+ var err error
117+ client , err = c .randomClient ()
118+ if err != nil {
119+ cmd .setErr (err )
120+ return
121+ }
122+ }
123+
124+ // Index in the addrs slice pointing to the next replica.
125+ replicaIndex := 1
96126
97- // Pick the connection, process request
98- conn := c .getNodeClientByAddr (addr )
127+ for attempt := 0 ; attempt <= c .opt .getMaxRedirects (); attempt ++ {
99128 if ask {
100- pipe := conn .Pipeline ()
129+ pipe := client .Pipeline ()
101130 pipe .Process (NewCmd ("ASKING" ))
102131 pipe .Process (cmd )
103132 _ , _ = pipe .Exec ()
104133 ask = false
105134 } else {
106- conn .Process (cmd )
135+ client .Process (cmd )
107136 }
108137
109138 // If there is no (real) error, we are done!
110139 err := cmd .Err ()
111- if err == nil || err == Nil {
140+ if err == nil || err == Nil || err == TxFailedErr {
112141 return
113142 }
114143
115- // On connection errors, pick a random, previosuly untried connection
116- // and request again.
117- if _ , ok := err .(* net.OpError ); ok || err == io .EOF {
118- if addr = c .findNextAddr (tried ); addr == "" {
119- return
144+ // On network errors try another node.
145+ if isNetworkError (err ) {
146+ if replicaIndex < len (addrs ) {
147+ // Try next available replica.
148+ client = c .getClient (addrs [replicaIndex ])
149+ replicaIndex ++
150+ cmd .reset ()
151+ continue
152+ } else {
153+ // Otherwise try random node.
154+ client , err = c .randomClient ()
155+ if err != nil {
156+ return
157+ }
120158 }
121159 cmd .reset ()
122160 continue
@@ -131,94 +169,70 @@ func (c *ClusterClient) process(cmd Cmder) {
131169 // Handle MOVE and ASK redirections, return on any other error
132170 switch parts [0 ] {
133171 case "MOVED" :
134- c .forceReload ()
135- addr = parts [2 ]
172+ c .scheduleReload ()
173+ client = c . getClient ( parts [2 ])
136174 case "ASK" :
137175 ask = true
138- addr = parts [2 ]
176+ client = c . getClient ( parts [2 ])
139177 default :
140178 return
141179 }
142180 cmd .reset ()
143181 }
144182}
145183
146- // Closes all connections and reloads slot cache, if due
147- func (c * ClusterClient ) reloadIfDue () (err error ) {
148- if ! atomic .CompareAndSwapUint32 (& c ._reload , 1 , 0 ) {
149- return
184+ // Closes all clients and returns last error if there are any.
185+ func (c * ClusterClient ) resetClients () (err error ) {
186+ c .clientsMx .Lock ()
187+ for addr , client := range c .clients {
188+ if e := client .Close (); e != nil {
189+ err = e
190+ }
191+ delete (c .clients , addr )
150192 }
193+ c .clientsMx .Unlock ()
194+ return err
195+ }
151196
152- var infos []ClusterSlotInfo
153-
197+ func (c * ClusterClient ) setSlots (slots []ClusterSlotInfo ) {
154198 c .slotsMx .Lock ()
155- defer c .slotsMx .Unlock ()
156-
157- // Try known addresses in random order (map interation order is random in Go)
158- // http://redis.io/topics/cluster-spec#clients-first-connection-and-handling-of-redirections
159- // https://github.com/antirez/redis-rb-cluster/blob/fd931ed/cluster.rb#L157
160- for addr := range c .addrs {
161- c .reset ()
162199
163- infos , err = c . fetchClusterSlots ( addr )
164- if err == nil {
165- c . update ( infos )
166- break
200+ c . slots = make ([][] string , hashSlots )
201+ for _ , info := range slots {
202+ for i := info . Start ; i <= info . End ; i ++ {
203+ c . slots [ i ] = info . Addrs
167204 }
205+ c .addrs = append (c .addrs , info .Addrs ... )
168206 }
169- return
170- }
207+ c . addrs = removeDuplicates ( c . addrs )
208+ c . resetClients ()
171209
172- // Closes all connections and flushes slots cache
173- func (c * ClusterClient ) reset () (err error ) {
174- c .connsMx .Lock ()
175- for addr , client := range c .conns {
176- if e := client .Close (); e != nil {
177- err = e
178- }
179- delete (c .conns , addr )
180- }
181- c .connsMx .Unlock ()
182- c .slots = make ([][]string , hashSlots )
183- return
210+ c .slotsMx .Unlock ()
184211}
185212
186- // Forces a cache reload on next request
187- func (c * ClusterClient ) forceReload () {
188- atomic .StoreUint32 (& c ._reload , 1 )
189- }
213+ // Closes all connections and reloads slot cache, if due.
214+ func (c * ClusterClient ) reloadIfDue () (err error ) {
215+ if ! atomic .CompareAndSwapUint32 (& c ._reload , 1 , 0 ) {
216+ return
217+ }
190218
191- // Find the next untried address
192- func (c * ClusterClient ) findNextAddr (tried map [string ]struct {}) string {
193- for addr := range c .addrs {
194- if _ , ok := tried [addr ]; ! ok {
195- return addr
196- }
219+ client , err := c .randomClient ()
220+ if err != nil {
221+ return err
197222 }
198- return ""
199- }
200223
201- // Fetch slot information
202- func (c * ClusterClient ) fetchClusterSlots (addr string ) ([]ClusterSlotInfo , error ) {
203- opt := c .opt .clientOptions ()
204- opt .Addr = addr
205- client := NewClient (opt )
206- defer client .Close ()
224+ slots , err := client .ClusterSlots ().Result ()
225+ if err != nil {
226+ return err
227+ }
228+ c .setSlots (slots )
207229
208- return client . ClusterSlots (). Result ()
230+ return nil
209231}
210232
211- // Update slot information, populate slots
212- func (c * ClusterClient ) update (infos []ClusterSlotInfo ) {
213- for _ , info := range infos {
214- for i := info .Start ; i <= info .End ; i ++ {
215- c .slots [i ] = info .Addrs
216- }
217-
218- for _ , addr := range info .Addrs {
219- c .addrs [addr ] = struct {}{}
220- }
221- }
233+ // Schedules slots reload on next request.
234+ func (c * ClusterClient ) scheduleReload () {
235+ atomic .StoreUint32 (& c ._reload , 1 )
222236}
223237
224238// reaper closes idle connections to the cluster.
@@ -237,8 +251,6 @@ func (c *ClusterClient) reaper(ticker *time.Ticker) {
237251
238252//------------------------------------------------------------------------------
239253
240- var errNoAddrs = errors .New ("redis: no addresses" )
241-
242254type ClusterOptions struct {
243255 // A seed-list of host:port addresses of known cluster nodes
244256 Addrs []string
@@ -278,17 +290,9 @@ func (opt *ClusterOptions) getMaxRedirects() int {
278290 return opt .MaxRedirects
279291}
280292
281- func (opt * ClusterOptions ) getAddrSet () (map [string ]struct {}, error ) {
282- size := len (opt .Addrs )
283- if size < 1 {
284- return nil , errNoAddrs
285- }
286-
287- addrs := make (map [string ]struct {}, size )
288- for _ , addr := range opt .Addrs {
289- addrs [addr ] = struct {}{}
290- }
291- return addrs , nil
293+ func (opt * ClusterOptions ) getAddrs () []string {
294+ opt .Addrs = removeDuplicates (opt .Addrs )
295+ return opt .Addrs
292296}
293297
294298func (opt * ClusterOptions ) clientOptions () * Options {
0 commit comments