Skip to content

Commit

Permalink
fix race in Cluster.connForSlot
Browse files Browse the repository at this point in the history
  • Loading branch information
funny-falcon committed Apr 22, 2019
1 parent 8a1b7b8 commit b212f1f
Showing 1 changed file with 30 additions and 23 deletions.
53 changes: 30 additions & 23 deletions rediscluster/mapping.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,10 +193,10 @@ func (cfg *clusterConfig) slot2shard(slot uint16) *shard {
return shard
}

var rr, rs = func() ([]uint32, []uint32) {
rr := make([]uint32, 32) // {1, 1, 1, ...}
rs := make([]uint32, 32) // {1, 3, 3, ...}
for i := range rr {
var rr, rs = func() ([32]uint32, [32]uint32) {
var rr [32]uint32 // {1, 1, 1, ...}
var rs [32]uint32 // {1, 3, 3, ...}
for i := range rr[:] {
rr[i] = 1
rs[i] = 3
}
Expand Down Expand Up @@ -225,44 +225,51 @@ func (c *Cluster) connForSlot(slot uint16, policy ReplicaPolicyEnum, seen []*red
}
conn = node.getConn(c.opts.ConnHostPolicy, preferConnected, seen)
case MasterAndSlaves, PreferSlaves:
weights := shard.weights
var ws [32]uint32
if atomic.LoadUint32(&c.latencyAwareness) == 0 {
weights = rr
ws = rr
if policy == PreferSlaves {
weights = rs
ws = rs
}
} else {
for i := range shard.weights {
ws[i] = atomic.LoadUint32(&shard.weights[i])
}
}
weights := ws[:len(shard.weights)]

health := atomic.LoadUint32(&shard.good) // load health information
healthWeight := uint32(0)
for i, w := range weights {
if health&(1<<uint(i)) == 0 {
continue
}
healthWeight += w
}

off := c.opts.RoundRobinSeed.Current()

// First, we try already established connections.
// If no one found, then connections thar are connecting at the moment are tried.
for _, needState := range []int{needConnected, mayBeConnected} {
mask := atomic.LoadUint32(&shard.good) // load health information
mask, maskWeight := health, healthWeight
// a bit of quadratic algorithms
for mask != 0 && conn == nil {
sumWeight := uint32(0)
for k, mm := uint32(0), mask; mm != 0; k++ {
if mm&(1<<k) == 0 {
continue
}
mm &^= 1 << k
sumWeight += atomic.LoadUint32(&weights[k])
}

r := nextRng(&off, sumWeight)

k := uint32(0)
for ; ; k++ {
if mask&(1<<k) == 0 {
r := nextRng(&off, maskWeight)
k := uint(0)
for i, w := range weights {
if mask&(1<<uint(i)) == 0 {
continue
}
w := atomic.LoadUint32(&weights[k])
if r < w {
k = uint(i)
break
}
r -= w
}

mask &^= 1 << k
maskWeight -= weights[k]
addr = shard.addr[k]
node := nodes[addr]
if node == nil {
Expand Down

0 comments on commit b212f1f

Please sign in to comment.