Skip to content

Commit

Permalink
Merge 274f0d6 into 13b26dc
Browse files Browse the repository at this point in the history
  • Loading branch information
theckman committed May 25, 2020
2 parents 13b26dc + 274f0d6 commit 756046e
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 28 deletions.
22 changes: 17 additions & 5 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,13 @@ type ConsumerOptions struct {
BufferSize int
// Concurrency dictates how many goroutines to spawn to handle the messages.
Concurrency int
// RedisOptions is how you configure the underlying Redis connection. More
// info here: https://godoc.org/github.com/go-redis/redis#Options.
// RedisClient supersedes the RedisOptions field, and allows you to inject
// an already-made *redis.Client for use in the consumer.
RedisClient *redis.Client
// RedisOptions allows you to configure the underlying Redis connection.
// More info here: https://godoc.org/github.com/go-redis/redis#Options.
//
// This field is used if RedisClient field is nil.
RedisOptions *RedisOptions
}

Expand Down Expand Up @@ -115,9 +120,16 @@ func NewConsumerWithOptions(options *ConsumerOptions) (*Consumer, error) {
options.ReclaimInterval = 1 * time.Second
}

r, err := newRedisClient(options.RedisOptions)
if err != nil {
return nil, errors.Wrap(err, "error creating redis client")
var r *redis.Client

if options.RedisClient != nil {
r = options.RedisClient
} else {
r = newRedisClient(options.RedisOptions)
}

if err := redisPreflightChecks(r); err != nil {
return nil, err
}

return &Consumer{
Expand Down
6 changes: 5 additions & 1 deletion consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,19 @@ func TestNewConsumerWithOptions(t *testing.T) {
assert.Equal(tt, 1*time.Second, c.options.ReclaimInterval)
})

t.Run("allows override of Name, GroupName, BlockingTimeout, and ReclaimTimeout", func(tt *testing.T) {
t.Run("allows override of Name, GroupName, BlockingTimeout, ReclaimTimeout, and RedisClient", func(tt *testing.T) {
rc := newRedisClient(nil)

c, err := NewConsumerWithOptions(&ConsumerOptions{
Name: "test_name",
GroupName: "test_group_name",
BlockingTimeout: 10 * time.Second,
ReclaimInterval: 10 * time.Second,
RedisClient: rc,
})
require.NoError(tt, err)

assert.Equal(tt, rc, c.redis)
assert.Equal(tt, "test_name", c.options.Name)
assert.Equal(tt, "test_group_name", c.options.GroupName)
assert.Equal(tt, 10*time.Second, c.options.BlockingTimeout)
Expand Down
20 changes: 16 additions & 4 deletions producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,13 @@ type ProducerOptions struct {
// option. This allows the stream trimming to done in a more efficient
// manner. More info here: https://redis.io/commands/xadd#capped-streams.
ApproximateMaxLength bool
// RedisOptions is how you configure the underlying Redis connection. More
// info here: https://godoc.org/github.com/go-redis/redis#Options.
// RedisClient supersedes the RedisOptions field, and allows you to inject
// an already-made *redis.Client for use in the consumer.
RedisClient *redis.Client
// RedisOptions allows you to configure the underlying Redis connection.
// More info here: https://godoc.org/github.com/go-redis/redis#Options.
//
// This field is used if RedisClient field is nil.
RedisOptions *RedisOptions
}

Expand All @@ -45,8 +50,15 @@ func NewProducer() (*Producer, error) {

// NewProducerWithOptions creates a Producer using custom ProducerOptions.
func NewProducerWithOptions(options *ProducerOptions) (*Producer, error) {
r, err := newRedisClient(options.RedisOptions)
if err != nil {
var r *redis.Client

if options.RedisClient != nil {
r = options.RedisClient
} else {
r = newRedisClient(options.RedisOptions)
}

if err := redisPreflightChecks(r); err != nil {
return nil, err
}

Expand Down
12 changes: 12 additions & 0 deletions producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,18 @@ func TestNewProducerWithOptions(t *testing.T) {
assert.NotNil(tt, p)
})

t.Run("allows custom *redis.Client", func(tt *testing.T) {
rc := newRedisClient(nil)

p, err := NewProducerWithOptions(&ProducerOptions{
RedisClient: rc,
})
require.NoError(tt, err)

assert.NotNil(tt, p)
assert.Equal(tt, rc, p.redis)
})

t.Run("bubbles up errors", func(tt *testing.T) {
_, err := NewProducerWithOptions(&ProducerOptions{
RedisOptions: &RedisOptions{Addr: "localhost:0"},
Expand Down
25 changes: 14 additions & 11 deletions redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,36 +17,39 @@ var redisVersionRE = regexp.MustCompile(`redis_version:(.+)`)
type RedisOptions = redis.Options

// newRedisClient creates a new Redis client with the given options. If options
// is nil, it will use default options. In addition to creating the client, it
// also ensures that it can connect to the actual instance and that the instance
// supports Redis streams (i.e. it's at least v5).
func newRedisClient(options *RedisOptions) (*redis.Client, error) {
// is nil, it will use default options.
func newRedisClient(options *RedisOptions) *redis.Client {
if options == nil {
options = &RedisOptions{}
}
client := redis.NewClient(options)
return redis.NewClient(options)
}

// make sure Redis supports streams (i.e. is at least v5)
// redisPreflightChecks makes sure the Redis instance backing the *redis.Client
// offers the functionality we need. Specifically, it also that it can connect
// to the actual instance and that the instance supports Redis streams (i.e.
// it's at least v5).
func redisPreflightChecks(client *redis.Client) error {
info, err := client.Info("server").Result()
if err != nil {
return nil, err
return err
}

match := redisVersionRE.FindAllStringSubmatch(info, -1)
if len(match) < 1 {
return nil, fmt.Errorf("could not extract redis version")
return fmt.Errorf("could not extract redis version")
}
version := strings.TrimSpace(match[0][1])
parts := strings.Split(version, ".")
major, err := strconv.Atoi(parts[0])
if err != nil {
return nil, err
return err
}
if major < 5 {
return nil, fmt.Errorf("redis streams are not supported in version %q", version)
return fmt.Errorf("redis streams are not supported in version %q", version)
}

return client, nil
return nil
}

// incrementMessageID takes in a message ID (e.g. 1564886140363-0) and
Expand Down
16 changes: 9 additions & 7 deletions redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,24 +10,26 @@ import (
func TestNewRedisClient(t *testing.T) {
t.Run("returns a new redis client", func(tt *testing.T) {
options := &RedisOptions{}
r, err := newRedisClient(options)
require.NoError(tt, err)
r := newRedisClient(options)

err = r.Ping().Err()
err := r.Ping().Err()
assert.NoError(tt, err)
})

t.Run("defaults options if it's nil", func(tt *testing.T) {
r, err := newRedisClient(nil)
require.NoError(tt, err)
r := newRedisClient(nil)

err = r.Ping().Err()
err := r.Ping().Err()
assert.NoError(tt, err)
})
}

func TestRedisPreflightChecks(t *testing.T) {
t.Run("bubbles up errors", func(tt *testing.T) {
options := &RedisOptions{Addr: "localhost:0"}
_, err := newRedisClient(options)
r := newRedisClient(options)

err := redisPreflightChecks(r)
require.Error(tt, err)

assert.Contains(tt, err.Error(), "dial tcp")
Expand Down

0 comments on commit 756046e

Please sign in to comment.