Skip to content

Commit

Permalink
feat: better coverage
Browse files Browse the repository at this point in the history
  • Loading branch information
thoas committed Jul 9, 2019
1 parent cd2f316 commit 65c4bf9
Show file tree
Hide file tree
Showing 16 changed files with 232 additions and 46 deletions.
3 changes: 3 additions & 0 deletions Makefile
@@ -1,2 +1,5 @@
test:
scripts/test.sh

run-cover:
go tool cover -html=coverage.out
15 changes: 11 additions & 4 deletions bokchoy.go
Expand Up @@ -37,7 +37,7 @@ func New(ctx context.Context, cfg Config, options ...Option) (*Bokchoy, error) {
tracer Tracer
)

logger, _ := logging.NewNopLogger()
logger := logging.NewNopLogger()
if opts.Logger != nil {
logger = opts.Logger
}
Expand All @@ -60,9 +60,16 @@ func New(ctx context.Context, cfg Config, options ...Option) (*Bokchoy, error) {
bok.serializer = opts.Serializer
}

bok.broker, err = newBroker(ctx, cfg.Broker, logger.With(logging.String("component", "broker")))
if err != nil {
return nil, errors.Wrap(err, "unable to initialize broker")
bok.broker = newBroker(ctx, cfg.Broker,
logger.With(logging.String("component", "broker")))

if opts.Initialize {
err = bok.broker.Initialize(ctx)
if err != nil {
if err != nil {
return nil, errors.Wrap(err, "unable to initialize broker")
}
}
}

for i := range cfg.Queues {
Expand Down
8 changes: 8 additions & 0 deletions bokchoy_test.go
Expand Up @@ -14,3 +14,11 @@ func TestBokchoy_Queue(t *testing.T) {
is.Equal(queue.Name(), "tests.task.message")
})
}

func TestBokchoy_Flush(t *testing.T) {
run(t, func(t *testing.T, s *suite) {
is := assert.New(t)
err := s.bokchoy.Flush()
is.NoError(err)
})
}
15 changes: 7 additions & 8 deletions broker.go
Expand Up @@ -9,7 +9,10 @@ import (

// Broker is the common interface to define a Broker.
type Broker interface {
// Ping pings the redis broker to ensure it's well connected.
// Initialize initializes the broker.
Initialize(context.Context) error

// Ping pings the broker to ensure it's well connected.
Ping() error

// Get returns raw data stored in broker.
Expand All @@ -35,19 +38,15 @@ type Broker interface {
}

// newBroker initializes a new Broker instance.
func newBroker(ctx context.Context, cfg BrokerConfig, logger logging.Logger) (Broker, error) {
func newBroker(ctx context.Context, cfg BrokerConfig, logger logging.Logger) Broker {
var (
broker Broker
err error
)

switch cfg.Type {
default:
broker, err = newRedisBroker(ctx, cfg.Redis, logger)
if err != nil {
return nil, err
}
broker = newRedisBroker(ctx, cfg.Redis, logger)
}

return broker, err
return broker
}
14 changes: 4 additions & 10 deletions broker_redis.go
Expand Up @@ -71,7 +71,7 @@ return cjson.encode(data)`,
}

// newRedisBroker initializes a new redis client.
func newRedisBroker(ctx context.Context, cfg RedisConfig, logger logging.Logger) (*redisBroker, error) {
func newRedisBroker(ctx context.Context, cfg RedisConfig, logger logging.Logger) *redisBroker {
var clt redisClient

switch cfg.Type {
Expand Down Expand Up @@ -125,21 +125,15 @@ func newRedisBroker(ctx context.Context, cfg RedisConfig, logger logging.Logger)

}

b := &redisBroker{
return &redisBroker{
clt: clt,
prefix: cfg.Prefix,
logger: logger,
}

err := b.initialize(ctx)
if err != nil {
return nil, err
}

return b, nil
}

func (p *redisBroker) initialize(ctx context.Context) error {
// Initialize initializes the redis broker.
func (p *redisBroker) Initialize(ctx context.Context) error {
err := p.clt.Ping().Err()
if err != nil {
return err
Expand Down
33 changes: 33 additions & 0 deletions broker_test.go
@@ -0,0 +1,33 @@
package bokchoy_test

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"github.com/thoas/bokchoy"
)

func TestBroker_Redis(t *testing.T) {
is := assert.New(t)
ctx := context.Background()

_, err := bokchoy.New(ctx, bokchoy.Config{
Broker: bokchoy.BrokerConfig{
Type: "redis",
Redis: bokchoy.RedisConfig{
Type: "sentinel",
},
},
}, bokchoy.WithInitialize(false))
_, err = bokchoy.New(ctx, bokchoy.Config{
Broker: bokchoy.BrokerConfig{
Type: "redis",
Redis: bokchoy.RedisConfig{
Type: "cluster",
},
},
}, bokchoy.WithInitialize(false))

is.NoError(err)
}
20 changes: 18 additions & 2 deletions consumer_test.go
Expand Up @@ -10,6 +10,14 @@ import (
"github.com/thoas/bokchoy"
)

type noopconsumer struct {
ticker chan struct{}
}

func (c noopconsumer) Consume(r *bokchoy.Request) error {
return nil
}

func TestConsumer_Consume(t *testing.T) {
run(t, func(t *testing.T, s *suite) {
is := assert.New(t)
Expand All @@ -22,17 +30,24 @@ func TestConsumer_Consume(t *testing.T) {
queue.SubscribeFunc(func(r *bokchoy.Request) error {
time.Sleep(time.Millisecond * 500)

r.Task.Result = r.Task.Payload

ticker <- struct{}{}

return nil
})
}, bokchoy.WithConcurrency(1))
consumer := &noopconsumer{}
queue.OnStart(consumer).
OnComplete(consumer).
OnFailure(consumer).
OnSuccess(consumer)

go func() {
err := s.bokchoy.Run(ctx)
is.NoError(err)
}()

task, err := queue.Publish(ctx, "world")
task, err := queue.Publish(ctx, "world", bokchoy.WithSerializer(&bokchoy.JSONSerializer{}))
is.NoError(err)

ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
Expand All @@ -50,6 +65,7 @@ func TestConsumer_Consume(t *testing.T) {
task, err = queue.Get(ctx, task.ID)
is.NoError(err)
is.True(task.IsStatusSucceeded())
is.Equal(task.Payload, task.Result)

is.Equal(fmt.Sprintf("%.1f", task.ExecTime), "0.5")
})
Expand Down
35 changes: 33 additions & 2 deletions logging/logging.go
Expand Up @@ -8,9 +8,16 @@ import (
"go.uber.org/zap/zapcore"
)

// DefaultLogger is the default logger.
var DefaultLogger = NewNopLogger()

// Field is the logger field.
type Field = zapcore.Field

// ObjectEncoder is the logger representation of a structure.
type ObjectEncoder = zapcore.ObjectEncoder

// Logger is the standart logger interface.
type Logger interface {
Panic(context.Context, string, ...Field)
Info(context.Context, string, ...Field)
Expand All @@ -20,38 +27,47 @@ type Logger interface {
With(fields ...Field) Logger
}

// String appends a string field.
func String(k, v string) Field {
return zap.String(k, v)
}

// Duration appends a duration field.
func Duration(k string, d time.Duration) Field {
return zap.Duration(k, d)
}

// Float64 appends a float64 field.
func Float64(key string, val float64) Field {
return zap.Float64(key, val)
}

// Time appends a time field.
func Time(key string, val time.Time) Field {
return zap.Time(key, val)
}

// Int appends an int field.
func Int(k string, i int) Field {
return zap.Int(k, i)
}

// Int64 appends an int64 field.
func Int64(k string, i int64) Field {
return zap.Int64(k, i)
}

// Error appends an error field.
func Error(v error) Field {
return zap.Error(v)
}

// Object appends an object field with implements ObjectMarshaler interface.
func Object(key string, val zapcore.ObjectMarshaler) Field {
return zap.Object(key, val)
}

// NewProductionLogger initializes a production logger.
func NewProductionLogger() (Logger, error) {
logger, err := zap.NewProduction()
if err != nil {
Expand All @@ -61,6 +77,7 @@ func NewProductionLogger() (Logger, error) {
return &wrapLogger{logger}, nil
}

// NewDevelopmentLogger initializes a development logger.
func NewDevelopmentLogger() (Logger, error) {
logger, err := zap.NewDevelopment()
if err != nil {
Expand All @@ -70,30 +87,44 @@ func NewDevelopmentLogger() (Logger, error) {
return &wrapLogger{logger}, nil
}

func NewNopLogger() (Logger, error) {
return &wrapLogger{zap.NewNop()}, nil
// NewNopLogger initializes a noop logger.
func NewNopLogger() Logger {
return &wrapLogger{zap.NewNop()}
}

type wrapLogger struct {
*zap.Logger
}

// With creates a child logger and adds structured context to it. Fields added
// to the child don't affect the parent, and vice versa.
func (l wrapLogger) With(fields ...Field) Logger {
return &wrapLogger{l.Logger.With(fields...)}
}

// Panic logs a message at PanicLevel. The message includes any fields passed
// at the log site, as well as any fields accumulated on the logger.
//
// The logger then panics, even if logging at PanicLevel is disabled.
func (l wrapLogger) Panic(ctx context.Context, msg string, fields ...Field) {
l.Logger.Panic(msg, fields...)
}

// Info logs a message at InfoLevel. The message includes any fields passed
// at the log site, as well as any fields accumulated on the logger.
func (l wrapLogger) Info(ctx context.Context, msg string, fields ...Field) {
l.Logger.Info(msg, fields...)

}

// Error logs a message at ErrorLevel. The message includes any fields passed
// at the log site, as well as any fields accumulated on the logger.
func (l wrapLogger) Error(ctx context.Context, msg string, fields ...Field) {
l.Logger.Error(msg, fields...)
}

// Debug logs a message at DebugLevel. The message includes any fields passed
// at the log site, as well as any fields accumulated on the logger.
func (l wrapLogger) Debug(ctx context.Context, msg string, fields ...Field) {
l.Logger.Debug(msg, fields...)
}
29 changes: 23 additions & 6 deletions options.go
Expand Up @@ -17,16 +17,26 @@ type Options struct {
Timeout time.Duration
RetryIntervals []time.Duration
Serializer Serializer
Initialize bool
}

func newOptions() *Options {
return &Options{
Concurrency: defaultConcurrency,
MaxRetries: defaultMaxRetries,
TTL: defaultTTL,
Timeout: defaultTimeout,
RetryIntervals: defaultRetryIntervals,
opts := &Options{}

options := []Option{
WithConcurrency(defaultConcurrency),
WithMaxRetries(defaultMaxRetries),
WithTTL(defaultTTL),
WithTimeout(defaultTimeout),
WithRetryIntervals(defaultRetryIntervals),
WithInitialize(true),
}

for i := range options {
options[i](opts)
}

return opts
}

// Option is an option unit.
Expand All @@ -39,6 +49,13 @@ func WithSerializer(serializer Serializer) Option {
}
}

// WithInitialize defines if the broker needs to be initialized.
func WithInitialize(initialize bool) Option {
return func(opts *Options) {
opts.Initialize = initialize
}
}

// WithTracer defines the Tracer.
func WithTracer(tracer Tracer) Option {
return func(opts *Options) {
Expand Down
5 changes: 3 additions & 2 deletions queue.go
Expand Up @@ -325,7 +325,8 @@ func (q *Queue) consume(ctx context.Context, name string, prefix string, eta tim
return tasks, nil
}

func (q *Queue) consumer() *consumer {
// Consumer returns a random consumer.
func (q *Queue) Consumer() *consumer {
rand.Seed(time.Now().Unix())

n := rand.Int() % len(q.consumers)
Expand Down Expand Up @@ -378,7 +379,7 @@ func (q *Queue) fireEvents(r *Request) error {

// HandleRequest handles a request synchronously with a consumer.
func (q *Queue) HandleRequest(ctx context.Context, r *Request) error {
consumer := q.consumer()
consumer := q.Consumer()

return consumer.Consume(r)
}
Expand Down

0 comments on commit 65c4bf9

Please sign in to comment.