Skip to content

Commit

Permalink
Merge pull request #73 from junmipan/master
Browse files Browse the repository at this point in the history
Pagination for iterators
  • Loading branch information
jozef-slezak committed Aug 16, 2017
2 parents e52d491 + 0683d28 commit 77c4124
Show file tree
Hide file tree
Showing 59 changed files with 1,156 additions and 9,289 deletions.
616 changes: 200 additions & 416 deletions db/keyval/redis/bytes_broker_impl.go

Large diffs are not rendered by default.

355 changes: 148 additions & 207 deletions db/keyval/redis/bytes_suite_test.go

Large diffs are not rendered by default.

64 changes: 16 additions & 48 deletions db/keyval/redis/bytes_txn_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
goredis "github.com/go-redis/redis"
"github.com/howeyc/crc16"
"github.com/ligato/cn-infra/db/keyval"
"github.com/ligato/cn-infra/utils/safeclose"
)

type op struct {
Expand All @@ -33,13 +32,9 @@ type op struct {
// Txn allows to group operations into the transaction. Transaction executes multiple operations
// in a more efficient way in contrast to executing them one by one.
type Txn struct {
db *BytesConnectionRedis
ops []op
prefix string
}

func (tx *Txn) addPrefix(key string) string {
return tx.prefix + key
db *BytesConnectionRedis
ops []op
addPrefix func(key string) string
}

// Put adds a new 'put' operation to a previously created transaction.
Expand All @@ -48,14 +43,20 @@ func (tx *Txn) addPrefix(key string) string {
// the existing value will be overwritten with the value from this
// operation.
func (tx *Txn) Put(key string, value []byte) keyval.BytesTxn {
tx.ops = append(tx.ops, op{tx.addPrefix(key), value, false})
if tx.addPrefix != nil {
key = tx.addPrefix(key)
}
tx.ops = append(tx.ops, op{key, value, false})
return tx
}

// Delete adds a new 'delete' operation to a previously created
// transaction.
func (tx *Txn) Delete(key string) keyval.BytesTxn {
tx.ops = append(tx.ops, op{tx.addPrefix(key), nil, true})
if tx.addPrefix != nil {
key = tx.addPrefix(key)
}
tx.ops = append(tx.ops, op{key, nil, true})
return tx
}

Expand All @@ -72,11 +73,6 @@ func (tx *Txn) Commit() (err error) {
return nil
}

// redigo
if tx.db.pool != nil {
return redigoPseudoTxn(tx)
}

// go-redis

pipeline := tx.db.client.TxPipeline()
Expand All @@ -99,36 +95,6 @@ func (tx *Txn) Commit() (err error) {
return nil
}

func redigoPseudoTxn(tx *Txn) error {
toBeDeleted := []interface{}{}
msetArgs := []interface{}{}
for _, op := range tx.ops {
if op.del {
toBeDeleted = append(toBeDeleted, op.key)
} else {
msetArgs = append(msetArgs, op.key)
msetArgs = append(msetArgs, string(op.value))
}
}

conn := tx.db.pool.Get()
defer safeclose.Close(conn)

if len(toBeDeleted) > 0 {
_, err := conn.Do("DEL", toBeDeleted...)
if err != nil {
return fmt.Errorf("Do(DEL) failed: %s", err)
}
}
if len(msetArgs) > 0 {
_, err := conn.Do("MSET", msetArgs...)
if err != nil {
return fmt.Errorf("Do(MSET) failed: %s", err)
}
}
return nil
}

// CROSSSLOT Keys in request don't hash to the same slot
// https://stackoverflow.com/questions/38042629/redis-cross-slot-error
// https://redis.io/topics/cluster-spec#keys-hash-tags
Expand Down Expand Up @@ -158,14 +124,16 @@ func checkCrossSlot(tx *Txn) bool {
}

func getHashSlot(key string) uint16 {
var tag string
start := strings.Index(key, "{")
if start != -1 {
start++
end := strings.Index(key[start:], "}")
tagSlice := key[start:]
end := strings.Index(tagSlice, "}")
if end != -1 {
key = key[start:end]
tag = tagSlice[:end]
}
}
const redisHashSlotCount = 16384
return crc16.ChecksumCCITT([]byte(key)) % redisHashSlotCount
return crc16.ChecksumCCITT([]byte(tag)) % redisHashSlotCount
}
119 changes: 7 additions & 112 deletions db/keyval/redis/bytes_watcher_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (

"fmt"

redigo "github.com/garyburd/redigo/redis"
goredis "github.com/go-redis/redis"
"github.com/ligato/cn-infra/db"
"github.com/ligato/cn-infra/db/keyval"
Expand Down Expand Up @@ -97,18 +96,16 @@ func (db *BytesConnectionRedis) Watch(respChan chan keyval.BytesWatchResp, keys
if db.closed {
return fmt.Errorf("Watch(%v) called on a closed connection", keys)
}
return watch(db, respChan, db.closeCh, nil, keys...)
return watch(db, respChan, db.closeCh, nil, nil, keys...)
}

func watch(db *BytesConnectionRedis, respChan chan<- keyval.BytesWatchResp,
closeChan <-chan struct{}, trimPrefix func(key string) string, keys ...string) error {

if db.pool != nil {
return redigoWatch(db, respChan, closeChan, trimPrefix, keys...)
}

func watch(db *BytesConnectionRedis, respChan chan<- keyval.BytesWatchResp, closeChan <-chan struct{},
addPrefix func(key string) string, trimPrefix func(key string) string, keys ...string) error {
patterns := make([]string, len(keys))
for i, k := range keys {
if addPrefix != nil {
k = addPrefix(k)
}
patterns[i] = keySpaceEventPrefix + wildcard(k)
}
pubSub := db.client.PSubscribe(patterns...)
Expand Down Expand Up @@ -177,112 +174,10 @@ func startWatch(db *BytesConnectionRedis, pubSub *goredis.PubSub,
}()
}

func redigoWatch(db *BytesConnectionRedis, respChan chan<- keyval.BytesWatchResp,
closeChan <-chan struct{}, trimPrefix func(key string) string, keys ...string) error {

patterns := make([]interface{}, len(keys))
for i, k := range keys {
patterns[i] = keySpaceEventPrefix + wildcard(k)
}

// Allocate 1 connection per watch...
conn := db.pool.Get()
pubSub := redigo.PubSubConn{Conn: conn}
err := pubSub.PSubscribe(patterns...)
if err != nil {
safeclose.Close(pubSub)
db.Errorf("PSubscribe %v failed: %s", patterns, err)
return err
}
go func() {
defer func() { db.Debugf("Watch(%v) exited", patterns) }()
for {
val := pubSub.Receive()
closing, err := db.redigoHandleChange(val, respChan, closeChan, trimPrefix)
if err != nil && !db.closed {
db.Errorf("Watch(%v) encountered error: %s", patterns, err)
}
if closing {
return
}
}
}()
go func() {
_, active := <-closeChan
if !active {
db.Debugf("Received signal to close Watch(%v)", patterns)
err := pubSub.PUnsubscribe(patterns...)
if err != nil {
db.Errorf("PUnsubscribe %v failed: %s", patterns, err)
}
safeclose.Close(pubSub)
}
}()
return nil
}

func (db *BytesConnectionRedis) redigoHandleChange(val interface{}, respChan chan<- keyval.BytesWatchResp,
closeChan <-chan struct{}, trimPrefix func(key string) string) (close bool, err error) {
defer func() {
if r := recover(); r != nil {
// In case something like this happens:
// panic: send on closed channel
var ok bool
err, ok = r.(error)
if !ok {
err = fmt.Errorf("pkg: %v", r)
}
}
}()

switch n := val.(type) {
case redigo.Subscription:
db.Debugf("Subscription: %s %s %d", n.Kind, n.Channel, n.Count)
if n.Count == 0 {
return true, nil
}
case redigo.PMessage:
db.Debugf("PMessage: %s %s %s", n.Pattern, n.Channel, n.Data)
key := n.Channel[strings.Index(n.Channel, ":")+1:]
switch cmd := string(n.Data); cmd {
case "set":
// Ouch, keyspace event does not convey value. Need to retrieve it.
val, _, rev, err := db.GetValue(key)
if err != nil {
db.Errorf("GetValue(%s) failed with error %s", key, err)
}
if val == nil {
db.Errorf("GetValue(%s) returned nil", key)
}
if trimPrefix != nil {
key = trimPrefix(key)
}
respChan <- NewBytesWatchPutResp(key, val, rev)
case "del", "expired":
if trimPrefix != nil {
key = trimPrefix(key)
}
respChan <- NewBytesWatchDelResp(key, 0)
}
//TODO NICE-to-HAVE no block here if buffer is overflown
case redigo.Message:
// Not subscribing to this event type yet
db.Debugf("Message: %s %s which I did not subscribe !", n.Channel, n.Data)
case error:
return true, n
}

return false, nil
}

// Watch starts subscription for changes associated with the selected key. Watch events will be delivered to respChan.
func (pdb *BytesBrokerWatcherRedis) Watch(respChan chan keyval.BytesWatchResp, keys ...string) error {
if pdb.delegate.closed {
return fmt.Errorf("Watch(%v) called on a closed connection", keys)
}
prefixedKeys := make([]string, len(keys))
for i, k := range keys {
prefixedKeys[i] = pdb.prefix + k
}
return watch(pdb.delegate, respChan, pdb.closeCh, pdb.trimPrefix, prefixedKeys...)
return watch(pdb.delegate, respChan, pdb.closeCh, pdb.addPrefix, pdb.trimPrefix, keys...)
}
49 changes: 5 additions & 44 deletions db/keyval/redis/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"fmt"

"github.com/coreos/etcd/pkg/tlsutil"
redigo "github.com/garyburd/redigo/redis"
"github.com/ghodss/yaml"
goredis "github.com/go-redis/redis"
)
Expand Down Expand Up @@ -99,11 +98,11 @@ type Client interface {

// ClientConfig Configuration common to all types of Redis clients
type ClientConfig struct {
Password string `json:"password"` // password, if required
DialTimeout time.Duration `json:"dial-timeout"` // timeout for connection operations, in seconds
ReadTimeout time.Duration `json:"read-timeout"` // timeout for read operations, in seconds
WriteTimeout time.Duration `json:"write-timeout"` // timeout for write operations, in seconds
Pool PoolConfig `json:"pool"` // connection pool configuration
Password string `json:"password"` // Password for authentication, if required
DialTimeout time.Duration `json:"dial-timeout"` // Dial timeout for establishing new connections. Default is 5 seconds.
ReadTimeout time.Duration `json:"read-timeout"` // Timeout for socket reads. If reached, commands will fail with a timeout instead of blocking. Default is 3 seconds.
WriteTimeout time.Duration `json:"write-timeout"` // Timeout for socket writes. If reached, commands will fail with a timeout instead of blocking. Default is ReadTimeout.
Pool PoolConfig `json:"pool"` // Connection pool configuration
}

// NodeConfig Node client configuration
Expand Down Expand Up @@ -338,41 +337,3 @@ func LoadConfig(configFile string) (cfg interface{}, err error) {
}
return c, nil
}

///////////////////////////////////////////////////////////////////////////////
// Redigo - https://github.com/garyburd/redigo/redis

// ConnPool provides abstraction of connection pool.
//
// Deprecated: See the documentation of CreateNodeClientConnPool().
type ConnPool interface {
// Get returns a vlid connection. The application must close the returned connection.
Get() redigo.Conn
// Close releases the resources used by the pool.
Close() error
}

// CreateNodeClientConnPool creates a Redis connection pool
//
// Deprecated: Use CreateNodeClient() or CreateClient() instead for single node connection.
func CreateNodeClientConnPool(config NodeConfig) (ConnPool, error) {
options := append([]redigo.DialOption{}, redigo.DialDatabase(config.DB))
options = append(options, redigo.DialPassword(config.Password))
options = append(options, redigo.DialReadTimeout(config.ReadTimeout))
options = append(options, redigo.DialWriteTimeout(config.WriteTimeout))
if config.TLS.Enabled {
tlsConfig, err := createTLSConfig(config.TLS)
if err != nil {
return nil, err
}
options = append(options, redigo.DialTLSConfig(tlsConfig))
options = append(options, redigo.DialTLSSkipVerify(config.TLS.SkipVerify))
}
return &redigo.Pool{
MaxIdle: config.Pool.PoolSize,
MaxActive: config.Pool.PoolSize,
IdleTimeout: config.Pool.IdleTimeout,
Wait: true,
Dial: func() (redigo.Conn, error) { return redigo.Dial("tcp", config.Endpoint, options...) },
}, nil
}
Loading

0 comments on commit 77c4124

Please sign in to comment.