Skip to content

Commit

Permalink
Support pipeline and batch commands
Browse files Browse the repository at this point in the history
pipeline and mset/mget commands are supported now.

* PipeLiner is a new redis.Conn interface used to send pipeline requests
  to all nodes according to the slots concurrently and build a response
  to the caller when all responses returned from the nodes.
  If redirect errors occur, the PipeLiner will handle them automatically.

* The mset/mget commands are implemented based on the PipeLiner.
  • Loading branch information
rmker committed Oct 27, 2022
1 parent f91cdaf commit 04ab71f
Show file tree
Hide file tree
Showing 5 changed files with 505 additions and 14 deletions.
67 changes: 67 additions & 0 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -543,3 +543,70 @@ func (c *Cluster) Stats() map[string]redis.PoolStats {
}
return stats
}

// return the addr list corresponding slots
func (c *Cluster) getAddrForSlots(slots []int, readOnly bool) (addrs []string, err error) {
var (
sliceAddrs [][]string
)
if len(slots) <= 0 {
return nil, errors.New("empty slots")
}
sliceAddrs = make([][]string, len(slots))
addrs = make([]string, len(slots))
rslot := -1
c.mu.Lock()
for i, slot := range slots {
// if slot is out of c.mapping range then set to a fixed random value
if slot < 0 || slot >= HashSlots {
if rslot < 0 {
rnd.Lock()
rslot = rnd.Intn(HashSlots - 1)
rnd.Unlock()
}
slot = rslot
}
sliceAddrs[i] = c.mapping[slot]
}
c.mu.Unlock()

for i, as := range sliceAddrs {
// mapping slices are never altered, they are replaced when refreshing
// or on a MOVED response, so it's non-racy to read them outside the lock.
if len(as) > 0 {
addr := as[0]
// get the address of a replica
if len(as) == 2 {
addr = as[1]
} else if len(as) > 2 {
rnd.Lock()
ix := rnd.Intn(len(as) - 1)
rnd.Unlock()
addr = as[ix+1] // +1 because 0 is the master
}
addrs[i] = addr
}
}
return
}

// GetPipelinerConn returns a Conn interface that can handle Redis Pipeline
func (c *Cluster) GetPipelinerConn() redis.Conn {
return newPipeliner(c)
}

// GetConnForAddr returns a Conn interface bound to the addr
func (c *Cluster) GetConnForAddr(addr string, forceDial, readOnly bool) redis.Conn {
c.mu.Lock()
err := c.err
c.mu.Unlock()

conn := &Conn{
cluster: c,
err: err,
forceDial: forceDial,
readOnly: readOnly,
}
conn.BindAddr(addr)
return conn
}
4 changes: 2 additions & 2 deletions cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -856,13 +856,13 @@ func TestCommands(t *testing.T) {
{"APPEND", redis.Args{"s1", "a"}, int64(1), ""},
{"BITCOUNT", redis.Args{"s1"}, int64(3), ""},
{"GET", redis.Args{"s1"}, []byte("a"), ""},
{"MSET", redis.Args{"s2", "b", "s3", "c"}, "", "CROSSSLOT"},
{"MSET", redis.Args{"s2", "b", "s3", "c"}, "OK", ""},
{"SET", redis.Args{"s{b}", "b"}, "OK", ""},
{"SET", redis.Args{"s{bcd}", "c"}, "OK", ""},
// keys "b" (3300) and "bcd" (1872) are both in a hash slot < 5000, so on same node for this test
// yet it still fails with CROSSSLOT (i.e. redis does not accept multi-key commands that don't
// strictly hash to the same slot, regardless of which host serves them).
{"MGET", redis.Args{"s{b}", "s{bcd}"}, "", "CROSSSLOT"},
{"MGET", redis.Args{"s{b}", "s{bcd}"}, []interface{}{[]uint8{uint8('b')}, []uint8{uint8('c')}}, ""},
},
"transactions": {
{"DISCARD", nil, "", "ERR DISCARD without MULTI"},
Expand Down
76 changes: 64 additions & 12 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,29 +11,36 @@ import (
"github.com/gomodule/redigo/redis"
)

const (
CMDMSET = "MSET"
CMDMGET = "MGET"
CMDEVAL = "EVAL"
CMDEVALSHA = "EVALSHA"
)

var _ redis.ConnWithTimeout = (*Conn)(nil)

// Conn is a redis cluster connection. When returned by Get or Dial, it is not
// yet bound to any node in the cluster. Only when a call to Do, Send, Receive
// or Bind is made is a connection to a specific node established:
//
// - if Do or Send is called first, the command's first parameter is
// - if Do or Send is called first, the command's first parameter is
// assumed to be the key, and its slot is used to find the node
// - if Receive is called first, or if Do or Send is called first but with
// - if Receive is called first, or if Do or Send is called first but with
// no parameter for the command (or no command), a random node is selected
// in the cluster
// - if Bind is called first, the node corresponding to the slot of the
// - if Bind is called first, the node corresponding to the slot of the
// specified key(s) is selected
//
// Because Get and Dial return a redis.Conn interface, a type assertion can be
// used to call Bind or ReadOnly on this concrete Conn type:
//
// redisConn := cluster.Get()
// if conn, ok := redisConn.(*redisc.Conn); ok {
// if err := conn.Bind("my-key"); err != nil {
// // handle error
// }
// }
// redisConn := cluster.Get()
// if conn, ok := redisConn.(*redisc.Conn); ok {
// if err := conn.Bind("my-key"); err != nil {
// // handle error
// }
// }
//
// Alternatively, the package-level BindConn or ReadOnlyConn helper functions
// may be used.
Expand Down Expand Up @@ -137,10 +144,17 @@ func (c *Conn) bind(slot int) (rc redis.Conn, ok bool, err error) {
return rc, ok, err
}

func cmdSlot(_ string, args []interface{}) int {
func cmdSlot(commandName string, args []interface{}) int {
slot := -1
if len(args) > 0 {
key := fmt.Sprintf("%s", args[0])
sk := 0

// for script command, use the first key to calculate slot, so we can do a script with only one key handling
// NOTE: you should not handle muliple keys in one script command in a redis cluster
if commandName == CMDEVAL || commandName == CMDEVALSHA {
sk = 2
}
if len(args) > sk {
key := fmt.Sprintf("%s", args[sk])
slot = Slot(key)
}
return slot
Expand Down Expand Up @@ -249,6 +263,13 @@ func (c *Conn) DoWithTimeout(timeout time.Duration, cmd string, args ...interfac
}
}

// hooked for special command like mset/mget
hooked := false
hooked, v, err = c.hookCommand(timeout, cmd, args...)
if hooked {
return
}

rc, _, err := c.bind(cmdSlot(cmd, args))
if err != nil {
return nil, err
Expand Down Expand Up @@ -359,3 +380,34 @@ func (c *Conn) closeLocked() (err error) {
}
return err
}

func (c *Conn) hookCommand(timeout time.Duration, cmd string, args ...interface{}) (bool, interface{}, error) {
if cmd == CMDMSET {
v, err := multiset(c, args...)
return true, v, err
} else if cmd == CMDMGET {
v, err := multiget(c, args...)
return true, v, err
}
return false, nil, nil
}

// BindAddr binds the connection to a specific node with addr
func (c *Conn) BindAddr(addr string) (rc redis.Conn, ok bool, err error) {
c.mu.Lock()
rc, err = c.rc, c.err
if err == nil {
if rc == nil {
conn, err2 := c.cluster.getConnForAddr(addr, c.forceDial)
if err2 != nil {
err = err2
} else {
c.rc, rc = conn, conn
c.boundAddr = addr
ok = true
}
}
}
c.mu.Unlock()
return rc, ok, err
}
123 changes: 123 additions & 0 deletions multi.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package redisc

import (
"errors"
"fmt"
)

// Support MSET/MGET command based on the PipeLiner, which is a redis.Conn interface for redis pipeline.
// multixxx function receives keys from args, calculates all slots of keys, and then generates multiple new MSET/MGET commands based on slots.
// All these commands will be sent to the corresponding nodes by PipeLiner. The PipeLiner would handle that automatically if redirect error occurs.
// Eventually, all responses from different nodes will be rebuilt into a single response based on the command key order.

// multiset supports MSET command for a redis cluster based on pipeLiner
func multiset(c *Conn, args ...interface{}) (interface{}, error) {
var (
orderSlots []int
res interface{}
)
if len(args) <= 0 {
return nil, nil
}
cmdMap := make(map[int][]interface{})
for i := 0; i < len(args); i = i + 2 {
vk := i + 1
if vk >= len(args) {
// args length exeption for mset
return nil, errors.New("args length exeption for mset")
}
key := fmt.Sprintf("%s", args[i])
slot := Slot(key)
_, exist := cmdMap[slot]
if !exist {
cmdMap[slot] = []interface{}{key, args[vk]}
} else {
cmdMap[slot] = append(cmdMap[slot], key, args[vk])
}
}

pipeLiner := c.cluster.GetPipelinerConn()
defer pipeLiner.Close()
for slot := range cmdMap {
err := pipeLiner.Send(CMDMSET, cmdMap[slot]...)
if err != nil {
return nil, err
}
orderSlots = append(orderSlots, slot)
}
err := pipeLiner.Flush()
if err != nil {
return nil, err
}
for range orderSlots {
reply, err := pipeLiner.Receive()
if err != nil {
return nil, err
}
res = reply
}
return res, nil
}

// multiget supports MGET command for a redis cluster based on pipeLiner
func multiget(c *Conn, args ...interface{}) (interface{}, error) {
var (
orderSlots []int
res []interface{}
keys []string
)
if len(args) <= 0 {
return nil, nil
}
cmdMap := make(map[int][]interface{})
for _, arg := range args {
key := fmt.Sprintf("%s", arg)
keys = append(keys, key)
slot := Slot(key)
_, exist := cmdMap[slot]
if !exist {
cmdMap[slot] = []interface{}{key}
} else {
cmdMap[slot] = append(cmdMap[slot], key)
}
}
pipeLiner := c.cluster.GetPipelinerConn()
defer pipeLiner.Close()
for slot := range cmdMap {
err := pipeLiner.Send(CMDMGET, cmdMap[slot]...)
if err != nil {
return nil, err
}
orderSlots = append(orderSlots, slot)
}
err := pipeLiner.Flush()
if err != nil {
return nil, err
}

resMap := make(map[string]interface{})
for _, slot := range orderSlots {
reply, err := pipeLiner.Receive()
if err != nil {
return nil, err
}
keyCount := len(cmdMap[slot])
replySlice, ok := reply.([]interface{})
validReply := false
if ok && keyCount == len(replySlice) {
validReply = true
}
for i := 0; i < keyCount; i++ {
key := fmt.Sprintf("%s", cmdMap[slot][i])
if validReply {
resMap[key] = replySlice[i]
} else {
resMap[key] = nil
}
}
}
for i := range keys {
res = append(res, resMap[keys[i]])
}
return res, nil
}
Loading

0 comments on commit 04ab71f

Please sign in to comment.