Skip to content

Commit

Permalink
change latency awareness in runtime
Browse files Browse the repository at this point in the history
  • Loading branch information
funny-falcon committed Apr 9, 2019
1 parent 8aed481 commit 8a1b7b8
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 8 deletions.
17 changes: 17 additions & 0 deletions rediscluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/joomcode/errorx"
Expand Down Expand Up @@ -111,6 +112,8 @@ type Cluster struct {

opts Opts

latencyAwareness uint32

m sync.Mutex

config *clusterConfig
Expand Down Expand Up @@ -215,6 +218,11 @@ func NewCluster(ctx context.Context, initAddrs []string, opts Opts) (*Cluster, e
cluster.opts.WaitToMigrate = 100 * time.Millisecond
}

cluster.latencyAwareness = 0
if cluster.opts.LatencyOrientedRR {
cluster.latencyAwareness = 1
}

config := &clusterConfig{
nodes: make(nodeMap),
shards: make(shardMap),
Expand Down Expand Up @@ -278,6 +286,15 @@ func (c *Cluster) Handle() interface{} {
return c.opts.Handle
}

// SetLatencyOrientedRR changes "latency awareness" on the fly.
func (c *Cluster) SetLatencyOrientedRR(v bool) {
if v {
atomic.StoreUint32(&c.latencyAwareness, 1)
} else {
atomic.StoreUint32(&c.latencyAwareness, 0)
}
}

func (c *Cluster) control() {
t := time.NewTicker(c.opts.CheckInterval)
defer t.Stop()
Expand Down
15 changes: 7 additions & 8 deletions rediscluster/mapping.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,15 +194,13 @@ func (cfg *clusterConfig) slot2shard(slot uint16) *shard {
}

var rr, rs = func() ([]uint32, []uint32) {
rr := make([]uint32, 32)
rs := make([]uint32, 32)
rr := make([]uint32, 32) // {1, 1, 1, ...}
rs := make([]uint32, 32) // {1, 3, 3, ...}
for i := range rr {
rr[i] = 3
rr[i] = 1
rs[i] = 3
if i == 0 {
rs[i] = 1
}
}
rs[0] = 1
return rr, rs
}()

Expand All @@ -228,7 +226,7 @@ func (c *Cluster) connForSlot(slot uint16, policy ReplicaPolicyEnum, seen []*red
conn = node.getConn(c.opts.ConnHostPolicy, preferConnected, seen)
case MasterAndSlaves, PreferSlaves:
weights := shard.weights
if !c.opts.LatencyOrientedRR {
if atomic.LoadUint32(&c.latencyAwareness) == 0 {
weights = rr
if policy == PreferSlaves {
weights = rs
Expand All @@ -242,10 +240,11 @@ func (c *Cluster) connForSlot(slot uint16, policy ReplicaPolicyEnum, seen []*red
// a bit of quadratic algorithms
for mask != 0 && conn == nil {
sumWeight := uint32(0)
for k, mm := uint32(0), mask; mm != 0; k, mm = k+1, mm&^(1<<k) {
for k, mm := uint32(0), mask; mm != 0; k++ {
if mm&(1<<k) == 0 {
continue
}
mm &^= 1 << k
sumWeight += atomic.LoadUint32(&weights[k])
}

Expand Down

0 comments on commit 8a1b7b8

Please sign in to comment.