Merge pull request #37 from rueian/fix-cluster-failover
fix: cluster failover to the same hostname and broken pipe in sync mode
rueian committed May 15, 2022
2 parents ba6444e + fdedc0a commit f7c3276
Showing 13 changed files with 472 additions and 73 deletions.
8 changes: 8 additions & 0 deletions client_test.go
@@ -22,6 +22,7 @@ type mockConn struct {
AcquireFn func() wire
StoreFn func(w wire)
OverrideFn func(c conn)
IsFn func(addr string) bool
}

func (m *mockConn) Override(c conn) {
@@ -98,6 +99,13 @@ func (m *mockConn) Close() {
}
}

func (m *mockConn) Is(addr string) bool {
if m.IsFn != nil {
return m.IsFn(addr)
}
return false
}

func TestNewSingleClientNoNode(t *testing.T) {
if _, err := newSingleClient(&ClientOption{}, nil, func(dst string, opt *ClientOption) conn {
return nil
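The mock's new Is method mirrors an Is(addr string) bool method on the conn interface, which lets the cluster client ask whether an existing connection already targets a given address. A minimal sketch of such a method on a connection type that remembers its dial destination (the type and field names here are assumptions for illustration, not taken from this diff):

type hostConn struct {
	dst string // the address this connection was dialed to (hypothetical field)
	opt *ClientOption
}

// Is reports whether this connection was established to the given address.
func (c *hostConn) Is(addr string) bool {
	return c.dst == addr
}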
49 changes: 29 additions & 20 deletions cluster.go
@@ -94,26 +94,34 @@ retry:
groups := parseSlots(reply)

// TODO support read from replicas
masters := make(map[string]conn, len(groups))
for addr := range groups {
masters[addr] = c.connFn(addr, c.opt)
conns := make(map[string]conn, len(groups))
for _, g := range groups {
for _, addr := range g.nodes {
conns[addr] = c.connFn(addr, c.opt)
}
}
// make sure InitAddress is always present
for _, addr := range c.opt.InitAddress {
if _, ok := conns[addr]; !ok {
conns[addr] = c.connFn(addr, c.opt)
}
}

var removes []conn

c.mu.RLock()
for addr, cc := range c.conns {
if _, ok := masters[addr]; ok {
masters[addr] = cc
if _, ok := conns[addr]; ok {
conns[addr] = cc
} else {
removes = append(removes, cc)
}
}
c.mu.RUnlock()

slots := [16384]conn{}
for addr, g := range groups {
cc := masters[addr]
for master, g := range groups {
cc := conns[master]
for _, slot := range g.slots {
for i := slot[0]; i <= slot[1]; i++ {
slots[i] = cc
@@ -123,7 +131,7 @@

c.mu.Lock()
c.slots = slots
c.conns = masters
c.conns = conns
c.mu.Unlock()

for _, cc := range removes {
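The refresh above now connects to every node reported by CLUSTER SLOTS plus the configured InitAddress, reuses connections that are still part of the topology, and collects the rest for removal. A condensed, standalone illustration of that merge, with dial standing in for c.connFn (not library code):

func mergeConns(old map[string]conn, fresh []string, dial func(addr string) conn) (next map[string]conn, stale []conn) {
	next = make(map[string]conn, len(fresh))
	for _, addr := range fresh {
		if cc, ok := old[addr]; ok {
			next[addr] = cc // keep the live connection
		} else {
			next[addr] = dial(addr) // newly discovered node or InitAddress
		}
	}
	for addr, cc := range old {
		if _, ok := next[addr]; !ok {
			stale = append(stale, cc) // no longer part of the topology
		}
	}
	return next, stale
}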
@@ -197,17 +205,24 @@ func (c *clusterClient) pick(slot uint16) (p conn, err error) {
return p, nil
}

func (c *clusterClient) pickOrNew(addr string) (p conn) {
func (c *clusterClient) redirectOrNew(addr string) (p conn) {
c.mu.RLock()
p = c.conns[addr]
c.mu.RUnlock()
if p != nil {
if p != nil && !p.Is(addr) {
return p
}
c.mu.Lock()
if p = c.conns[addr]; p == nil {
p = c.connFn(addr, c.opt)
c.conns[addr] = p
} else if p.Is(addr) {
// try reconnecting if the MOVED redirect points to the same host,
// because the same hostname may actually resolve to another destination
// depending on the fail-over implementation, e.g. AWS MemoryDB's resize process.
go p.Close()
p = c.connFn(addr, c.opt)
c.conns[addr] = p
}
c.mu.Unlock()
return p
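The reconnection branch exists because a MOVED pointing back at the address already in use usually means the hostname now resolves to a different backend, as during a managed fail-over such as an AWS MemoryDB resize. A standalone illustration of that situation using only the standard library (not library code):

import "net"

// resolvesElsewhere reports whether host no longer resolves to lastIP,
// i.e. whether a connection cached under that hostname is likely stale.
func resolvesElsewhere(host string, lastIP net.IP) (bool, error) {
	ips, err := net.LookupIP(host)
	if err != nil {
		return false, err
	}
	for _, ip := range ips {
		if ip.Equal(lastIP) {
			return false, nil // still pointing at the previous destination
		}
	}
	return true, nil // the name moved; re-dialing picks up the new target
}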
@@ -232,10 +247,10 @@ process:
if err := resp.RedisError(); err != nil {
if addr, ok := err.IsMoved(); ok {
go c.refresh()
resp = c.pickOrNew(addr).Do(ctx, cmd)
resp = c.redirectOrNew(addr).Do(ctx, cmd)
goto process
} else if addr, ok = err.IsAsk(); ok {
resp = c.pickOrNew(addr).DoMulti(ctx, cmds.AskingCmd, cmd)[1]
resp = c.redirectOrNew(addr).DoMulti(ctx, cmds.AskingCmd, cmd)[1]
goto process
} else if err.IsTryAgain() {
runtime.Gosched()
@@ -262,11 +277,11 @@ process:
if err := resp.RedisError(); err != nil {
if addr, ok := err.IsMoved(); ok {
go c.refresh()
resp = c.pickOrNew(addr).DoCache(ctx, cmd, ttl)
resp = c.redirectOrNew(addr).DoCache(ctx, cmd, ttl)
goto process
} else if addr, ok = err.IsAsk(); ok {
// TODO ASKING OPT-IN Caching
resp = c.pickOrNew(addr).DoMulti(ctx, cmds.AskingCmd, cmds.Completed(cmd))[1]
resp = c.redirectOrNew(addr).DoMulti(ctx, cmds.AskingCmd, cmds.Completed(cmd))[1]
goto process
} else if err.IsTryAgain() {
runtime.Gosched()
@@ -335,12 +350,6 @@ func (c *dedicatedClusterClient) check(slot uint16) {
}
}

func (c *dedicatedClusterClient) getConn() conn {
c.mu.Lock()
defer c.mu.Unlock()
return c.conn
}

func (c *dedicatedClusterClient) acquire() (wire wire, err error) {
c.mu.Lock()
defer c.mu.Unlock()
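Both Do and DoCache above follow the same redirect pattern: a MOVED error triggers a topology refresh plus a retry against the redirected node, while an ASK error retries once with a prefixed ASKING command. The target address is carried in the error text itself; a rough sketch of extracting it, which is what err.IsMoved and err.IsAsk provide inside the library:

import "strings"

// parseRedirect extracts the target address from a cluster redirect error
// such as "MOVED 3999 127.0.0.1:6381" or "ASK 3999 127.0.0.1:6381".
// Illustrative only; real code should use RedisError.IsMoved / IsAsk.
func parseRedirect(msg string) (addr string, ok bool) {
	parts := strings.Split(msg, " ")
	if len(parts) == 3 && (parts[0] == "MOVED" || parts[0] == "ASK") {
		return parts[2], true
	}
	return "", false
}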
116 changes: 111 additions & 5 deletions cluster_test.go
@@ -4,7 +4,9 @@ import (
"context"
"errors"
"reflect"
"sort"
"strings"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -40,6 +42,18 @@ var singleSlotResp = newResult(RedisMessage{typ: '*', values: []RedisMessage{
}},
}}, nil)

var singleSlotResp2 = newResult(RedisMessage{typ: '*', values: []RedisMessage{
{typ: '*', values: []RedisMessage{
{typ: ':', integer: 0},
{typ: ':', integer: 0},
{typ: '*', values: []RedisMessage{ // master
{typ: '+', string: ""},
{typ: ':', integer: 3},
{typ: '+', string: ""},
}},
}},
}}, nil)
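Each entry of a CLUSTER SLOTS reply has the shape [start-slot, end-slot, [master-host, master-port, master-id], replicas...], so the fixture above describes one shard whose master has an empty host and port 3, i.e. the address ":3". A small helper showing how such a fixture maps to addresses (hypothetical, and relying on package-internal RedisMessage fields just as the fixtures do):

import "fmt"

// slotAddrs flattens a CLUSTER SLOTS reply into the node addresses it
// announces; applied to singleSlotResp2 it would yield []string{":3"}.
func slotAddrs(reply RedisMessage) (addrs []string) {
	for _, entry := range reply.values {
		for _, node := range entry.values[2:] { // values[0] and values[1] are the slot range
			addrs = append(addrs, fmt.Sprintf("%s:%d", node.values[0].string, node.values[1].integer))
		}
	}
	return addrs
}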

//gocyclo:ignore
func TestClusterClientInit(t *testing.T) {
t.Run("Init no nodes", func(t *testing.T) {
@@ -105,15 +119,41 @@ func TestClusterClientInit(t *testing.T) {
})

t.Run("Refresh replace", func(t *testing.T) {
if client, err := newClusterClient(&ClientOption{InitAddress: []string{":1", ":2"}}, func(dst string, opt *ClientOption) conn {
first := true
client, err := newClusterClient(&ClientOption{InitAddress: []string{":1", ":2"}}, func(dst string, opt *ClientOption) conn {
return &mockConn{
DoFn: func(cmd cmds.Completed) RedisResult {
return slotsResp
if first {
first = false
return slotsResp
}
return singleSlotResp2
},
}
}); err != nil {
})
if err != nil {
t.Fatalf("unexpected err %v", err)
} else if nodes := client.nodes(); len(nodes) != 1 || nodes[0] != ":0" {
}

nodes := client.nodes()
sort.Strings(nodes)
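// the first discovery via slotsResp covers ":0", and the InitAddress ":1" and
// ":2" are now always kept, so three nodes are expected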
if len(nodes) != 3 ||
nodes[0] != ":0" ||
nodes[1] != ":1" ||
nodes[2] != ":2" {
t.Fatalf("unexpected nodes %v", nodes)
}

if err = client.refresh(); err != nil {
t.Fatalf("unexpected err %v", err)
}

nodes = client.nodes()
sort.Strings(nodes)
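// after the refresh, singleSlotResp2 only announces ":3", so ":0" is dropped
// while the InitAddress ":1" and ":2" remain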
if len(nodes) != 3 ||
nodes[0] != ":1" ||
nodes[1] != ":2" ||
nodes[2] != ":3" {
t.Fatalf("unexpected nodes %v", nodes)
}
})
@@ -191,9 +231,10 @@ func TestClusterClient(t *testing.T) {
})

t.Run("Delegate Close", func(t *testing.T) {
once := sync.Once{}
called := make(chan struct{})
m.CloseFn = func() {
close(called)
once.Do(func() { close(called) })
}
client.Close()
<-called
@@ -368,6 +409,40 @@ func TestClusterClientErr(t *testing.T) {
}
})

t.Run("slot reconnect", func(t *testing.T) {
count := 0
check := 0
m := &mockConn{DoFn: func(cmd cmds.Completed) RedisResult {
if strings.Join(cmd.Commands(), " ") == "CLUSTER SLOTS" {
return slotsResp
}
if count < 3 {
count++
return newResult(RedisMessage{typ: '-', string: "MOVED 0 :0"}, nil)
}
return newResult(RedisMessage{typ: '+', string: "b"}, nil)
}, IsFn: func(addr string) bool {
is := addr == ":0"
if is {
check++
}
return is
}}

client, err := newClusterClient(&ClientOption{InitAddress: []string{":0"}}, func(dst string, opt *ClientOption) conn {
return m
})
if err != nil {
t.Fatalf("unexpected err %v", err)
}
if v, err := client.Do(context.Background(), client.B().Get().Key("a").Build()).ToString(); err != nil || v != "b" {
t.Fatalf("unexpected resp %v %v", v, err)
}
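// redirectOrNew consults Is twice per MOVED redirect (once on the read-locked
// fast path and once under the write lock), so three MOVED replies yield six checks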
if check != 6 {
t.Fatalf("unexpected check count %v", check)
}
})

t.Run("slot moved", func(t *testing.T) {
count := 0
m := &mockConn{DoFn: func(cmd cmds.Completed) RedisResult {
@@ -391,6 +466,37 @@ func TestClusterClientErr(t *testing.T) {
}
})

t.Run("slot moved new", func(t *testing.T) {
count := 0
var check bool
m := &mockConn{DoFn: func(cmd cmds.Completed) RedisResult {
if strings.Join(cmd.Commands(), " ") == "CLUSTER SLOTS" {
return slotsResp
}
if count < 3 {
count++
return newResult(RedisMessage{typ: '-', string: "MOVED 0 :2"}, nil)
}
return newResult(RedisMessage{typ: '+', string: "b"}, nil)
}}

client, err := newClusterClient(&ClientOption{InitAddress: []string{":0"}}, func(dst string, opt *ClientOption) conn {
if dst == ":2" {
check = true
}
return m
})
if err != nil {
t.Fatalf("unexpected err %v", err)
}
if v, err := client.Do(context.Background(), client.B().Get().Key("a").Build()).ToString(); err != nil || v != "b" {
t.Fatalf("unexpected resp %v %v", v, err)
}
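// the MOVED target ":2" is not among the connections created from slotsResp,
// so redirectOrNew has to dial it through connFn, which is what sets check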
if !check {
t.Fatalf("unexpected check value %v", check)
}
})

t.Run("slot moved (cache)", func(t *testing.T) {
count := 0
m := &mockConn{
4 changes: 4 additions & 0 deletions internal/cmds/cmds.go
@@ -35,6 +35,10 @@ var (
QuitCmd = Completed{
cs: &CommandSlice{s: []string{"QUIT"}},
}
// PingCmd is predefined PING
PingCmd = Completed{
cs: &CommandSlice{s: []string{"PING"}},
}
// SlotCmd is predefined CLUSTER SLOTS
SlotCmd = Completed{
cs: &CommandSlice{s: []string{"CLUSTER", "SLOTS"}},
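The predefined PingCmd joins QuitCmd and SlotCmd as a reusable, pre-built command value; its call sites are in parts of this diff not shown here. A hypothetical health probe of the shape it enables, written against the public API (cmds.PingCmd is roughly the internal equivalent of client.B().Ping().Build()):

import "context"

// healthy pings the server and reports whether the round trip succeeded.
// Hypothetical helper for illustration, not part of this change.
func healthy(ctx context.Context, client Client) bool {
	_, err := client.Do(ctx, client.B().Ping().Build()).ToString()
	return err == nil
}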
26 changes: 24 additions & 2 deletions lru.go
@@ -21,6 +21,7 @@ const (
type cache interface {
GetOrPrepare(key, cmd string, ttl time.Duration) (v RedisMessage, entry *entry)
Update(key, cmd string, value RedisMessage, pttl int64)
Cancel(key, cmd string, value RedisMessage, err error)
Delete(keys []RedisMessage)
FreeAndClose(notice RedisMessage)
}
@@ -30,12 +31,13 @@ type entry struct {
key string
cmd string
val RedisMessage
err error
size int
}

func (e *entry) Wait() RedisMessage {
func (e *entry) Wait() (RedisMessage, error) {
<-e.ch
return e.val
return e.val, e.err
}

type keyCache struct {
@@ -160,6 +162,26 @@ func (c *lru) Update(key, cmd string, value RedisMessage, pttl int64) {
}
}

func (c *lru) Cancel(key, cmd string, val RedisMessage, err error) {
var ch chan struct{}
c.mu.Lock()
if store, ok := c.store[key]; ok {
if ele, ok := store.cache[cmd]; ok {
if e := ele.Value.(*entry); e.val.typ == 0 {
e.val = val
e.err = err
ch = e.ch
delete(c.store[key].cache, cmd)
c.list.Remove(ele)
}
}
}
c.mu.Unlock()
if ch != nil {
close(ch)
}
}

func (c *lru) purge(store *keyCache) {
if store != nil {
for cmd, ele := range store.cache {
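Cancel is the failure-path counterpart of Update: it releases anyone blocked on an in-flight cache entry and hands them an error instead of a value, which is how a broken pipe in sync mode stops hanging cached requests, and it is why Wait now returns (RedisMessage, error). A minimal sketch of the consuming side, assuming GetOrPrepare returns a non-nil *entry when another request for the same key and command is already in flight:

import "time"

// waitCached shows how a caller observes the change: Wait is unblocked either
// by Update (value) or by Cancel (error), so a dead connection propagates to
// the waiter instead of blocking it forever. Illustrative only.
func waitCached(c cache, key, cmd string, ttl time.Duration) RedisResult {
	v, e := c.GetOrPrepare(key, cmd, ttl)
	if e != nil {
		val, err := e.Wait()
		return newResult(val, err)
	}
	return newResult(v, nil) // already cached, or this caller must fetch it itself
}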
