Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ref: upgrade to redis v8 #28

Merged
merged 1 commit into from
May 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions bokchoy.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"strings"
"sync"

"github.com/go-redis/redis/v7"
"github.com/go-redis/redis/v8"
"github.com/thoas/bokchoy/logging"
"github.com/thoas/go-funk"

Expand Down Expand Up @@ -136,8 +136,8 @@ func (b *Bokchoy) Empty(ctx context.Context) error {
}

// Flush flushes data of the entire system.
func (b *Bokchoy) Flush() error {
return b.broker.Flush()
func (b *Bokchoy) Flush(ctx context.Context) error {
return b.broker.Flush(ctx)
}

// Queue gets or creates a new queue.
Expand Down Expand Up @@ -265,7 +265,7 @@ func (b *Bokchoy) Run(ctx context.Context, options ...Option) error {
b.servers = opts.Servers
}

err := b.broker.Ping()
err := b.broker.Ping(ctx)
if err != nil {
return err
}
Expand Down
3 changes: 2 additions & 1 deletion bokchoy_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package bokchoy_test

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
Expand All @@ -18,7 +19,7 @@ func TestBokchoy_Queue(t *testing.T) {
func TestBokchoy_Flush(t *testing.T) {
run(t, func(t *testing.T, s *suite) {
is := assert.New(t)
err := s.bokchoy.Flush()
err := s.bokchoy.Flush(context.Background())
is.NoError(err)
})
}
18 changes: 9 additions & 9 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,31 +13,31 @@ type Broker interface {
Initialize(context.Context) error

// Ping pings the broker to ensure it's well connected.
Ping() error
Ping(context.Context) error

// Get returns raw data stored in broker.
Get(string) (map[string]interface{}, error)
Get(context.Context, string) (map[string]interface{}, error)

// Delete deletes raw data in broker based on key.
Delete(string, string) error
Delete(context.Context, string, string) error

// List returns raw data stored in broker.
List(string) ([]map[string]interface{}, error)
List(context.Context, string) ([]map[string]interface{}, error)

// Empty empties a queue.
Empty(string) error
Empty(context.Context, string) error

// Flush flushes the entire broker.
Flush() error
Flush(context.Context) error

// Count returns number of items from a queue name.
Count(string) (BrokerStats, error)
Count(context.Context, string) (BrokerStats, error)

// Save synchronizes the stored item.
Set(string, map[string]interface{}, time.Duration) error
Set(context.Context, string, map[string]interface{}, time.Duration) error

// Publish publishes raw data.
Publish(string, string, map[string]interface{}, time.Time) error
Publish(context.Context, string, string, map[string]interface{}, time.Time) error

// Consume returns an array of raw data.
Consume(context.Context, string, time.Time) ([]map[string]interface{}, error)
Expand Down
80 changes: 40 additions & 40 deletions broker_redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (

"github.com/thoas/bokchoy/logging"

"github.com/go-redis/redis/v7"
"github.com/go-redis/redis/v8"
"github.com/pkg/errors"
)

Expand Down Expand Up @@ -149,14 +149,14 @@ func (p RedisBroker) String() string {

// Initialize initializes the redis broker.
func (p *RedisBroker) Initialize(ctx context.Context) error {
err := p.Client.Ping().Err()
err := p.Client.Ping(ctx).Err()
if err != nil {
return err
}

p.scripts = make(map[string]string)
for key := range redisScripts {
sha, err := p.Client.ScriptLoad(redisScripts[key]).Result()
sha, err := p.Client.ScriptLoad(ctx, redisScripts[key]).Result()
if err != nil {
return errors.Wrapf(err, "Unable to load script %s", key)
}
Expand All @@ -168,8 +168,8 @@ func (p *RedisBroker) Initialize(ctx context.Context) error {
}

// Ping pings the redis broker to ensure it's well connected.
func (p RedisBroker) Ping() error {
_, err := p.Client.Ping().Result()
func (p RedisBroker) Ping(ctx context.Context) error {
_, err := p.Client.Ping(ctx).Result()
if err != nil {
return errors.Wrapf(err, "unable to ping redis %s", p.ClientType)
}
Expand Down Expand Up @@ -206,21 +206,21 @@ func (p *RedisBroker) consumeDelayed(ctx context.Context, name string, duration
continue
}

_, err = p.Client.TxPipelined(func(pipe redis.Pipeliner) error {
_, err = p.Client.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
for i := range results {
taskID, ok := results[i]["id"].(string)
if !ok {
continue
}

err := p.publish(pipe, name, taskID, results[i], time.Time{})
err := p.publish(ctx, pipe, name, taskID, results[i], time.Time{})
if err != nil {
return err
}
}

// To avoid data loss, we only remove the range when results are processed
_, err = pipe.ZRemRangeByScore(delayName, "0", fmt.Sprintf("%d", max.Unix())).Result()
_, err = pipe.ZRemRangeByScore(ctx, delayName, "0", fmt.Sprintf("%d", max.Unix())).Result()
if err != nil {
return err
}
Expand All @@ -247,14 +247,14 @@ func (p *RedisBroker) consume(ctx context.Context, name string, taskPrefix strin
if eta.IsZero() {
p.consumeDelayed(ctx, name, 1*time.Second)

result, err = p.Client.BRPop(1*time.Second, queueKey).Result()
result, err = p.Client.BRPop(ctx, 1*time.Second, queueKey).Result()

if err != nil && err != redis.Nil {
return nil, errors.Wrapf(err, "unable to BRPOP %s", queueKey)
}
} else {
max := fmt.Sprintf("%d", eta.UTC().Unix())
results := p.Client.ZRangeByScore(queueKey, &redis.ZRangeBy{
results := p.Client.ZRangeByScore(ctx, queueKey, &redis.ZRangeBy{
Min: "0",
Max: max,
})
Expand All @@ -279,7 +279,7 @@ func (p *RedisBroker) consume(ctx context.Context, name string, taskPrefix strin
taskKeys = append(taskKeys, p.prefixed(taskPrefix, ":", result[i]))
}

values, err := p.payloadsFromKeys(taskKeys)
values, err := p.payloadsFromKeys(ctx, taskKeys)
if err != nil {
return nil, err
}
Expand All @@ -302,8 +302,8 @@ func (p *RedisBroker) Consume(ctx context.Context, name string, eta time.Time) (

}

func (p *RedisBroker) payloadsFromKeys(taskKeys []string) (map[string]map[string]interface{}, error) {
vals, err := p.Client.EvalSha(p.scripts["MULTIHGETALL"], taskKeys).Result()
func (p *RedisBroker) payloadsFromKeys(ctx context.Context, taskKeys []string) (map[string]map[string]interface{}, error) {
vals, err := p.Client.EvalSha(ctx, p.scripts["MULTIHGETALL"], taskKeys).Result()
if err != nil {
return nil, errors.Wrapf(err, "unable to MULTIHGETALL %s", strings.Join(taskKeys, ", "))
}
Expand All @@ -318,10 +318,10 @@ func (p *RedisBroker) payloadsFromKeys(taskKeys []string) (map[string]map[string
}

// Get returns stored raw data from task key.
func (p *RedisBroker) Get(taskKey string) (map[string]interface{}, error) {
func (p *RedisBroker) Get(ctx context.Context, taskKey string) (map[string]interface{}, error) {
taskKey = p.prefixed(taskKey)

res, err := p.Client.HGetAll(taskKey).Result()
res, err := p.Client.HGetAll(ctx, taskKey).Result()
if err != nil {
return nil, errors.Wrapf(err, "unable to HGETALL %s", taskKey)
}
Expand All @@ -335,25 +335,25 @@ func (p *RedisBroker) Get(taskKey string) (map[string]interface{}, error) {
}

// Delete deletes raw data in broker based on key.
func (p *RedisBroker) Delete(name string, taskID string) error {
return p.delete(p.Client, name, taskID)
func (p *RedisBroker) Delete(ctx context.Context, name string, taskID string) error {
return p.delete(ctx, p.Client, name, taskID)
}

func (p *RedisBroker) delete(client redis.Cmdable, name string, taskID string) error {
func (p *RedisBroker) delete(ctx context.Context, client redis.Cmdable, name string, taskID string) error {
var (
prefixedTaskKey = p.prefixed(name, ":", taskID)
)

_, err := client.Del(prefixedTaskKey).Result()
_, err := client.Del(ctx, prefixedTaskKey).Result()
if err != nil {
return errors.Wrapf(err, "unable to DEL %s", prefixedTaskKey)
}

return nil
}

func (p *RedisBroker) List(name string) ([]map[string]interface{}, error) {
taskIDs, err := p.Client.LRange(name, 0, -1).Result()
func (p *RedisBroker) List(ctx context.Context, name string) ([]map[string]interface{}, error) {
taskIDs, err := p.Client.LRange(ctx, name, 0, -1).Result()
if err != nil {
return nil, errors.Wrapf(err, "unable to LRANGE %s", name)
}
Expand All @@ -363,7 +363,7 @@ func (p *RedisBroker) List(name string) ([]map[string]interface{}, error) {
taskKeys = append(taskKeys, p.prefixed(name, ":", taskIDs[i]))
}

payloads, err := p.payloadsFromKeys(taskKeys)
payloads, err := p.payloadsFromKeys(ctx, taskKeys)
if err != nil {
return nil, err
}
Expand All @@ -381,21 +381,21 @@ func (p *RedisBroker) List(name string) ([]map[string]interface{}, error) {
}

// Count returns number of items from a queue name.
func (p *RedisBroker) Count(queueName string) (BrokerStats, error) {
func (p *RedisBroker) Count(ctx context.Context, queueName string) (BrokerStats, error) {
var (
stats = BrokerStats{}
err error
)

queueName = p.prefixed(queueName)
direct, err := p.Client.LLen(queueName).Result()
direct, err := p.Client.LLen(ctx, queueName).Result()
if err != nil && err != redis.Nil {
return stats, err
}

stats.Direct = int(direct)

delayed, err := p.Client.ZCount(fmt.Sprint(queueName, ":delay"), "-inf", "+inf").Result()
delayed, err := p.Client.ZCount(ctx, fmt.Sprint(queueName, ":delay"), "-inf", "+inf").Result()
if err != nil && err != redis.Nil {
return stats, err
}
Expand All @@ -408,11 +408,11 @@ func (p *RedisBroker) Count(queueName string) (BrokerStats, error) {
}

// Save synchronizes the stored item in redis.
func (p *RedisBroker) Set(taskKey string, data map[string]interface{}, expiration time.Duration) error {
func (p *RedisBroker) Set(ctx context.Context, taskKey string, data map[string]interface{}, expiration time.Duration) error {
prefixedTaskKey := p.prefixed(taskKey)

if int(expiration.Seconds()) == 0 {
_, err := p.Client.HMSet(prefixedTaskKey, data).Result()
_, err := p.Client.HMSet(ctx, prefixedTaskKey, data).Result()
if err != nil {
return errors.Wrapf(err, "unable to HMSET %s", prefixedTaskKey)
}
Expand All @@ -423,7 +423,7 @@ func (p *RedisBroker) Set(taskKey string, data map[string]interface{}, expiratio
values := []interface{}{int(expiration.Seconds())}
values = append(values, unpack(data)...)

_, err := p.Client.EvalSha(p.scripts["HMSETEXPIRE"], []string{prefixedTaskKey}, values...).Result()
_, err := p.Client.EvalSha(ctx, p.scripts["HMSETEXPIRE"], []string{prefixedTaskKey}, values...).Result()
if err != nil {
return errors.Wrapf(err, "unable to HMSETEXPIRE %s", prefixedTaskKey)
}
Expand All @@ -434,11 +434,11 @@ func (p *RedisBroker) Set(taskKey string, data map[string]interface{}, expiratio
// Publish publishes raw data.
// it uses a hash to store the task itself
// pushes the task id to the list or a zset if the task is delayed.
func (p *RedisBroker) Publish(queueName string,
func (p *RedisBroker) Publish(ctx context.Context, queueName string,
taskID string, data map[string]interface{}, eta time.Time) error {

_, err := p.Client.Pipelined(func(pipe redis.Pipeliner) error {
return p.publish(pipe, queueName, taskID, data, eta)
_, err := p.Client.Pipelined(ctx, func(pipe redis.Pipeliner) error {
return p.publish(ctx, pipe, queueName, taskID, data, eta)
})
if err != nil {
return err
Expand All @@ -447,25 +447,25 @@ func (p *RedisBroker) Publish(queueName string,
return nil
}

func (p *RedisBroker) publish(client redis.Cmdable, queueName string,
func (p *RedisBroker) publish(ctx context.Context, client redis.Cmdable, queueName string,
taskID string, data map[string]interface{}, eta time.Time) error {

var (
prefixedTaskKey = p.prefixed(queueName, ":", taskID)
err error
)

err = client.HMSet(prefixedTaskKey, data).Err()
err = client.HMSet(ctx, prefixedTaskKey, data).Err()
if err == nil {
if eta.IsZero() {
err = client.RPush(p.prefixed(queueName), taskID).Err()
err = client.RPush(ctx, p.prefixed(queueName), taskID).Err()
} else {
// if eta is before now, then we should push this
// taskID in priority
if eta.Before(time.Now().UTC()) {
err = client.LPush(p.prefixed(queueName), taskID).Err()
err = client.LPush(ctx, p.prefixed(queueName), taskID).Err()
} else {
err = client.ZAdd(p.prefixed(fmt.Sprint(queueName, ":delay")), &redis.Z{
err = client.ZAdd(ctx, p.prefixed(fmt.Sprint(queueName, ":delay")), &redis.Z{
Score: float64(eta.UTC().Unix()),
Member: taskID,
}).Err()
Expand All @@ -480,8 +480,8 @@ func (p *RedisBroker) publish(client redis.Cmdable, queueName string,
}

// Empty removes the redis key for a queue.
func (p *RedisBroker) Empty(name string) error {
err := p.Client.Del(p.prefixed(name)).Err()
func (p *RedisBroker) Empty(ctx context.Context, name string) error {
err := p.Client.Del(ctx, p.prefixed(name)).Err()
if err != nil && err != redis.Nil {
return errors.Wrapf(err, "unable to DEL %s", p.prefixed(name))
}
Expand All @@ -490,8 +490,8 @@ func (p *RedisBroker) Empty(name string) error {
}

// Flush flushes the entire redis database.
func (p *RedisBroker) Flush() error {
err := p.Client.FlushDB().Err()
func (p *RedisBroker) Flush(ctx context.Context) error {
err := p.Client.FlushDB(ctx).Err()
if err != nil {
return errors.Wrap(err, "unable to FLUSHDB")
}
Expand Down
2 changes: 1 addition & 1 deletion config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package bokchoy

import "github.com/go-redis/redis/v7"
import "github.com/go-redis/redis/v8"

// RedisClusterConfig contains the redis cluster configuration.
type RedisClusterConfig redis.ClusterOptions
Expand Down
2 changes: 1 addition & 1 deletion examples/crawler/handler/crawl.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (h *CrawlHandler) Crawls() []string {
h.mu.RLock()
crawls := make([]string, len(h.crawls))
i := 0
for url, _ := range h.crawls {
for url := range h.crawls {
crawls[i] = url
i++
}
Expand Down
4 changes: 2 additions & 2 deletions examples/custom-broker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"log"
"time"

"github.com/go-redis/redis/v7"
"github.com/go-redis/redis/v8"

"github.com/thoas/bokchoy"
"github.com/thoas/bokchoy/logging"
Expand All @@ -20,7 +20,7 @@ func main() {
clt := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})

// define a new Redis broker with the 'tasks' prefix
bkr := bokchoy.NewRedisBroker(clt, "client", "tasks", logger)

Expand Down
Loading