Skip to content

Commit 2559f32

Browse files
committed
cluster: optimize newClusterState
1 parent 4939454 commit 2559f32

File tree

6 files changed

+109
-61
lines changed

6 files changed

+109
-61
lines changed

cluster.go

Lines changed: 45 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import (
88
"math"
99
"math/rand"
1010
"net"
11-
"strings"
11+
"sort"
1212
"sync"
1313
"sync/atomic"
1414
"time"
@@ -387,12 +387,31 @@ func (c *clusterNodes) Random() (*clusterNode, error) {
387387

388388
//------------------------------------------------------------------------------
389389

390+
type clusterSlot struct {
391+
start, end int
392+
nodes []*clusterNode
393+
}
394+
395+
type clusterSlotSlice []*clusterSlot
396+
397+
func (p clusterSlotSlice) Len() int {
398+
return len(p)
399+
}
400+
401+
func (p clusterSlotSlice) Less(i, j int) bool {
402+
return p[i].start < p[j].start
403+
}
404+
405+
func (p clusterSlotSlice) Swap(i, j int) {
406+
p[i], p[j] = p[j], p[i]
407+
}
408+
390409
type clusterState struct {
391410
nodes *clusterNodes
392411
Masters []*clusterNode
393412
Slaves []*clusterNode
394413

395-
slots [][]*clusterNode
414+
slots []*clusterSlot
396415

397416
generation uint32
398417
createdAt time.Time
@@ -404,7 +423,7 @@ func newClusterState(
404423
c := clusterState{
405424
nodes: nodes,
406425

407-
slots: make([][]*clusterNode, hashtag.SlotNumber),
426+
slots: make([]*clusterSlot, 0, len(slots)),
408427

409428
generation: nodes.NextGeneration(),
410429
createdAt: time.Now(),
@@ -434,11 +453,15 @@ func newClusterState(
434453
}
435454
}
436455

437-
for i := slot.Start; i <= slot.End; i++ {
438-
c.slots[i] = nodes
439-
}
456+
c.slots = append(c.slots, &clusterSlot{
457+
start: slot.Start,
458+
end: slot.End,
459+
nodes: nodes,
460+
})
440461
}
441462

463+
sort.Sort(clusterSlotSlice(c.slots))
464+
442465
time.AfterFunc(time.Minute, func() {
443466
nodes.GC(c.generation)
444467
})
@@ -506,8 +529,15 @@ func (c *clusterState) slotRandomNode(slot int) *clusterNode {
506529
}
507530

508531
func (c *clusterState) slotNodes(slot int) []*clusterNode {
509-
if slot >= 0 && slot < len(c.slots) {
510-
return c.slots[slot]
532+
i := sort.Search(len(c.slots), func(i int) bool {
533+
return c.slots[i].end >= slot
534+
})
535+
if i >= len(c.slots) {
536+
return nil
537+
}
538+
x := c.slots[i]
539+
if slot >= x.start && slot <= x.end {
540+
return x.nodes
511541
}
512542
return nil
513543
}
@@ -516,26 +546,7 @@ func (c *clusterState) IsConsistent() bool {
516546
if c.nodes.opt.ClusterSlots != nil {
517547
return true
518548
}
519-
520-
if len(c.Masters) > len(c.Slaves) {
521-
return false
522-
}
523-
524-
for _, master := range c.Masters {
525-
s := master.Client.Info("replication").Val()
526-
if !strings.Contains(s, "role:master") {
527-
return false
528-
}
529-
}
530-
531-
for _, slave := range c.Slaves {
532-
s := slave.Client.Info("replication").Val()
533-
if !strings.Contains(s, "role:slave") {
534-
return false
535-
}
536-
}
537-
538-
return true
549+
return len(c.Masters) <= len(c.Slaves)
539550
}
540551

541552
//------------------------------------------------------------------------------
@@ -563,7 +574,7 @@ func (c *clusterStateHolder) Reload() (*clusterState, error) {
563574
return nil, err
564575
}
565576
if !state.IsConsistent() {
566-
c.LazyReload()
577+
time.AfterFunc(time.Second, c.LazyReload)
567578
}
568579
return state, nil
569580
}
@@ -843,6 +854,7 @@ func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error {
843854
}
844855

845856
if internal.IsRetryableError(err, true) {
857+
c.state.LazyReload()
846858
continue
847859
}
848860

@@ -929,12 +941,14 @@ func (c *ClusterClient) defaultProcess(cmd Cmder) error {
929941
}
930942

931943
if internal.IsRetryableError(err, true) {
932-
// Firstly retry the same node.
944+
c.state.LazyReload()
945+
946+
// First retry the same node.
933947
if attempt == 0 {
934948
continue
935949
}
936950

937-
// Secondly try random node.
951+
// Second try random node.
938952
node, err = c.nodes.Random()
939953
if err != nil {
940954
break

cluster_test.go

Lines changed: 48 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -51,13 +51,19 @@ func (s *clusterScenario) addrs() []string {
5151
func (s *clusterScenario) clusterClient(opt *redis.ClusterOptions) *redis.ClusterClient {
5252
opt.Addrs = s.addrs()
5353
client := redis.NewClusterClient(opt)
54-
Eventually(func() bool {
54+
err := eventually(func() error {
5555
state, err := client.GetState()
5656
if err != nil {
57-
return false
57+
return err
58+
}
59+
if !state.IsConsistent() {
60+
return fmt.Errorf("cluster state is not conistent")
5861
}
59-
return state.IsConsistent()
60-
}, 30*time.Second).Should(BeTrue())
62+
return nil
63+
}, 30*time.Second)
64+
if err != nil {
65+
panic(err)
66+
}
6167
return client
6268
}
6369

@@ -935,18 +941,21 @@ var _ = Describe("ClusterClient timeout", func() {
935941

936942
//------------------------------------------------------------------------------
937943

938-
func BenchmarkRedisClusterPing(b *testing.B) {
939-
if testing.Short() {
940-
b.Skip("skipping in short mode")
941-
}
942-
943-
cluster := &clusterScenario{
944+
func newClusterScenario() *clusterScenario {
945+
return &clusterScenario{
944946
ports: []string{"8220", "8221", "8222", "8223", "8224", "8225"},
945947
nodeIds: make([]string, 6),
946948
processes: make(map[string]*redisProcess, 6),
947949
clients: make(map[string]*redis.Client, 6),
948950
}
951+
}
952+
953+
func BenchmarkRedisClusterPing(b *testing.B) {
954+
if testing.Short() {
955+
b.Skip("skipping in short mode")
956+
}
949957

958+
cluster := newClusterScenario()
950959
if err := startCluster(cluster); err != nil {
951960
b.Fatal(err)
952961
}
@@ -959,7 +968,8 @@ func BenchmarkRedisClusterPing(b *testing.B) {
959968

960969
b.RunParallel(func(pb *testing.PB) {
961970
for pb.Next() {
962-
if err := client.Ping().Err(); err != nil {
971+
err := client.Ping().Err()
972+
if err != nil {
963973
b.Fatal(err)
964974
}
965975
}
@@ -971,13 +981,7 @@ func BenchmarkRedisClusterSetString(b *testing.B) {
971981
b.Skip("skipping in short mode")
972982
}
973983

974-
cluster := &clusterScenario{
975-
ports: []string{"8220", "8221", "8222", "8223", "8224", "8225"},
976-
nodeIds: make([]string, 6),
977-
processes: make(map[string]*redisProcess, 6),
978-
clients: make(map[string]*redis.Client, 6),
979-
}
980-
984+
cluster := newClusterScenario()
981985
if err := startCluster(cluster); err != nil {
982986
b.Fatal(err)
983987
}
@@ -992,9 +996,34 @@ func BenchmarkRedisClusterSetString(b *testing.B) {
992996

993997
b.RunParallel(func(pb *testing.PB) {
994998
for pb.Next() {
995-
if err := client.Set("key", value, 0).Err(); err != nil {
999+
err := client.Set("key", value, 0).Err()
1000+
if err != nil {
9961001
b.Fatal(err)
9971002
}
9981003
}
9991004
})
10001005
}
1006+
1007+
func BenchmarkRedisClusterReloadState(b *testing.B) {
1008+
if testing.Short() {
1009+
b.Skip("skipping in short mode")
1010+
}
1011+
1012+
cluster := newClusterScenario()
1013+
if err := startCluster(cluster); err != nil {
1014+
b.Fatal(err)
1015+
}
1016+
defer stopCluster(cluster)
1017+
1018+
client := cluster.clusterClient(redisClusterOptions())
1019+
defer client.Close()
1020+
1021+
b.ResetTimer()
1022+
1023+
for i := 0; i < b.N; i++ {
1024+
err := client.ReloadState()
1025+
if err != nil {
1026+
b.Fatal(err)
1027+
}
1028+
}
1029+
}

export_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ func (c *ClusterClient) Nodes(key string) ([]*clusterNode, error) {
4949
}
5050

5151
slot := hashtag.Slot(key)
52-
nodes := state.slots[slot]
52+
nodes := state.slotNodes(slot)
5353
if len(nodes) != 2 {
5454
return nil, fmt.Errorf("slot=%d does not have enough nodes: %v", slot, nodes)
5555
}

internal/hashtag/hashtag.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import (
55
"strings"
66
)
77

8-
const SlotNumber = 16384
8+
const slotNumber = 16384
99

1010
// CRC16 implementation according to CCITT standards.
1111
// Copyright 2001-2010 Georges Menie (www.menie.org)
@@ -56,7 +56,7 @@ func Key(key string) string {
5656
}
5757

5858
func RandomSlot() int {
59-
return rand.Intn(SlotNumber)
59+
return rand.Intn(slotNumber)
6060
}
6161

6262
// hashSlot returns a consistent slot number between 0 and 16383
@@ -66,7 +66,7 @@ func Slot(key string) int {
6666
return RandomSlot()
6767
}
6868
key = Key(key)
69-
return int(crc16sum(key)) % SlotNumber
69+
return int(crc16sum(key)) % slotNumber
7070
}
7171

7272
func crc16sum(key string) (crc uint16) {

main_test.go

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88
"os/exec"
99
"path/filepath"
1010
"sync"
11-
"sync/atomic"
1211
"testing"
1312
"time"
1413

@@ -169,32 +168,36 @@ func perform(n int, cbs ...func(int)) {
169168
}
170169

171170
func eventually(fn func() error, timeout time.Duration) error {
172-
var exit int32
173171
errCh := make(chan error)
174172
done := make(chan struct{})
173+
exit := make(chan struct{})
175174

176175
go func() {
177-
defer GinkgoRecover()
178-
179-
for atomic.LoadInt32(&exit) == 0 {
176+
for {
180177
err := fn()
181178
if err == nil {
182179
close(done)
183180
return
184181
}
182+
185183
select {
186184
case errCh <- err:
187185
default:
188186
}
189-
time.Sleep(timeout / 100)
187+
188+
select {
189+
case <-exit:
190+
return
191+
case <-time.After(timeout / 100):
192+
}
190193
}
191194
}()
192195

193196
select {
194197
case <-done:
195198
return nil
196199
case <-time.After(timeout):
197-
atomic.StoreInt32(&exit, 1)
200+
close(exit)
198201
select {
199202
case err := <-errCh:
200203
return err

ring.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,8 @@ type ringShards struct {
170170

171171
func newRingShards(opt *RingOptions) *ringShards {
172172
return &ringShards{
173+
opt: opt,
174+
173175
hash: newConsistentHash(opt),
174176
shards: make(map[string]*ringShard),
175177
}

0 commit comments

Comments
 (0)