diff --git a/Makefile b/Makefile index 7f9d1cf..d6c335c 100644 --- a/Makefile +++ b/Makefile @@ -1,2 +1,5 @@ test: scripts/test.sh + +run-cover: + go tool cover -html=coverage.out diff --git a/bokchoy.go b/bokchoy.go index 1c9d28c..b4a9a7d 100644 --- a/bokchoy.go +++ b/bokchoy.go @@ -37,7 +37,7 @@ func New(ctx context.Context, cfg Config, options ...Option) (*Bokchoy, error) { tracer Tracer ) - logger, _ := logging.NewNopLogger() + logger := logging.NewNopLogger() if opts.Logger != nil { logger = opts.Logger } @@ -60,9 +60,16 @@ func New(ctx context.Context, cfg Config, options ...Option) (*Bokchoy, error) { bok.serializer = opts.Serializer } - bok.broker, err = newBroker(ctx, cfg.Broker, logger.With(logging.String("component", "broker"))) - if err != nil { - return nil, errors.Wrap(err, "unable to initialize broker") + bok.broker = newBroker(ctx, cfg.Broker, + logger.With(logging.String("component", "broker"))) + + if opts.Initialize { + err = bok.broker.Initialize(ctx) + if err != nil { + if err != nil { + return nil, errors.Wrap(err, "unable to initialize broker") + } + } } for i := range cfg.Queues { diff --git a/bokchoy_test.go b/bokchoy_test.go index 1e3b7b0..fc5847e 100644 --- a/bokchoy_test.go +++ b/bokchoy_test.go @@ -14,3 +14,11 @@ func TestBokchoy_Queue(t *testing.T) { is.Equal(queue.Name(), "tests.task.message") }) } + +func TestBokchoy_Flush(t *testing.T) { + run(t, func(t *testing.T, s *suite) { + is := assert.New(t) + err := s.bokchoy.Flush() + is.NoError(err) + }) +} diff --git a/broker.go b/broker.go index 39bd10c..1299a68 100644 --- a/broker.go +++ b/broker.go @@ -9,7 +9,10 @@ import ( // Broker is the common interface to define a Broker. type Broker interface { - // Ping pings the redis broker to ensure it's well connected. + // Initialize initializes the broker. + Initialize(context.Context) error + + // Ping pings the broker to ensure it's well connected. Ping() error // Get returns raw data stored in broker. @@ -35,19 +38,15 @@ type Broker interface { } // newBroker initializes a new Broker instance. -func newBroker(ctx context.Context, cfg BrokerConfig, logger logging.Logger) (Broker, error) { +func newBroker(ctx context.Context, cfg BrokerConfig, logger logging.Logger) Broker { var ( broker Broker - err error ) switch cfg.Type { default: - broker, err = newRedisBroker(ctx, cfg.Redis, logger) - if err != nil { - return nil, err - } + broker = newRedisBroker(ctx, cfg.Redis, logger) } - return broker, err + return broker } diff --git a/broker_redis.go b/broker_redis.go index 4a61c55..b2fa44f 100644 --- a/broker_redis.go +++ b/broker_redis.go @@ -71,7 +71,7 @@ return cjson.encode(data)`, } // newRedisBroker initializes a new redis client. -func newRedisBroker(ctx context.Context, cfg RedisConfig, logger logging.Logger) (*redisBroker, error) { +func newRedisBroker(ctx context.Context, cfg RedisConfig, logger logging.Logger) *redisBroker { var clt redisClient switch cfg.Type { @@ -125,21 +125,15 @@ func newRedisBroker(ctx context.Context, cfg RedisConfig, logger logging.Logger) } - b := &redisBroker{ + return &redisBroker{ clt: clt, prefix: cfg.Prefix, logger: logger, } - - err := b.initialize(ctx) - if err != nil { - return nil, err - } - - return b, nil } -func (p *redisBroker) initialize(ctx context.Context) error { +// Initialize initializes the redis broker. +func (p *redisBroker) Initialize(ctx context.Context) error { err := p.clt.Ping().Err() if err != nil { return err diff --git a/broker_test.go b/broker_test.go new file mode 100644 index 0000000..849c825 --- /dev/null +++ b/broker_test.go @@ -0,0 +1,33 @@ +package bokchoy_test + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/thoas/bokchoy" +) + +func TestBroker_Redis(t *testing.T) { + is := assert.New(t) + ctx := context.Background() + + _, err := bokchoy.New(ctx, bokchoy.Config{ + Broker: bokchoy.BrokerConfig{ + Type: "redis", + Redis: bokchoy.RedisConfig{ + Type: "sentinel", + }, + }, + }, bokchoy.WithInitialize(false)) + _, err = bokchoy.New(ctx, bokchoy.Config{ + Broker: bokchoy.BrokerConfig{ + Type: "redis", + Redis: bokchoy.RedisConfig{ + Type: "cluster", + }, + }, + }, bokchoy.WithInitialize(false)) + + is.NoError(err) +} diff --git a/consumer_test.go b/consumer_test.go index 6e03f15..bbdff7f 100644 --- a/consumer_test.go +++ b/consumer_test.go @@ -10,6 +10,14 @@ import ( "github.com/thoas/bokchoy" ) +type noopconsumer struct { + ticker chan struct{} +} + +func (c noopconsumer) Consume(r *bokchoy.Request) error { + return nil +} + func TestConsumer_Consume(t *testing.T) { run(t, func(t *testing.T, s *suite) { is := assert.New(t) @@ -22,17 +30,24 @@ func TestConsumer_Consume(t *testing.T) { queue.SubscribeFunc(func(r *bokchoy.Request) error { time.Sleep(time.Millisecond * 500) + r.Task.Result = r.Task.Payload + ticker <- struct{}{} return nil - }) + }, bokchoy.WithConcurrency(1)) + consumer := &noopconsumer{} + queue.OnStart(consumer). + OnComplete(consumer). + OnFailure(consumer). + OnSuccess(consumer) go func() { err := s.bokchoy.Run(ctx) is.NoError(err) }() - task, err := queue.Publish(ctx, "world") + task, err := queue.Publish(ctx, "world", bokchoy.WithSerializer(&bokchoy.JSONSerializer{})) is.NoError(err) ctx, cancel := context.WithTimeout(ctx, 5*time.Second) @@ -50,6 +65,7 @@ func TestConsumer_Consume(t *testing.T) { task, err = queue.Get(ctx, task.ID) is.NoError(err) is.True(task.IsStatusSucceeded()) + is.Equal(task.Payload, task.Result) is.Equal(fmt.Sprintf("%.1f", task.ExecTime), "0.5") }) diff --git a/logging/logging.go b/logging/logging.go index efb6f89..9a30918 100644 --- a/logging/logging.go +++ b/logging/logging.go @@ -8,9 +8,16 @@ import ( "go.uber.org/zap/zapcore" ) +// DefaultLogger is the default logger. +var DefaultLogger = NewNopLogger() + +// Field is the logger field. type Field = zapcore.Field + +// ObjectEncoder is the logger representation of a structure. type ObjectEncoder = zapcore.ObjectEncoder +// Logger is the standart logger interface. type Logger interface { Panic(context.Context, string, ...Field) Info(context.Context, string, ...Field) @@ -20,38 +27,47 @@ type Logger interface { With(fields ...Field) Logger } +// String appends a string field. func String(k, v string) Field { return zap.String(k, v) } +// Duration appends a duration field. func Duration(k string, d time.Duration) Field { return zap.Duration(k, d) } +// Float64 appends a float64 field. func Float64(key string, val float64) Field { return zap.Float64(key, val) } +// Time appends a time field. func Time(key string, val time.Time) Field { return zap.Time(key, val) } +// Int appends an int field. func Int(k string, i int) Field { return zap.Int(k, i) } +// Int64 appends an int64 field. func Int64(k string, i int64) Field { return zap.Int64(k, i) } +// Error appends an error field. func Error(v error) Field { return zap.Error(v) } +// Object appends an object field with implements ObjectMarshaler interface. func Object(key string, val zapcore.ObjectMarshaler) Field { return zap.Object(key, val) } +// NewProductionLogger initializes a production logger. func NewProductionLogger() (Logger, error) { logger, err := zap.NewProduction() if err != nil { @@ -61,6 +77,7 @@ func NewProductionLogger() (Logger, error) { return &wrapLogger{logger}, nil } +// NewDevelopmentLogger initializes a development logger. func NewDevelopmentLogger() (Logger, error) { logger, err := zap.NewDevelopment() if err != nil { @@ -70,30 +87,44 @@ func NewDevelopmentLogger() (Logger, error) { return &wrapLogger{logger}, nil } -func NewNopLogger() (Logger, error) { - return &wrapLogger{zap.NewNop()}, nil +// NewNopLogger initializes a noop logger. +func NewNopLogger() Logger { + return &wrapLogger{zap.NewNop()} } type wrapLogger struct { *zap.Logger } +// With creates a child logger and adds structured context to it. Fields added +// to the child don't affect the parent, and vice versa. func (l wrapLogger) With(fields ...Field) Logger { return &wrapLogger{l.Logger.With(fields...)} } +// Panic logs a message at PanicLevel. The message includes any fields passed +// at the log site, as well as any fields accumulated on the logger. +// +// The logger then panics, even if logging at PanicLevel is disabled. func (l wrapLogger) Panic(ctx context.Context, msg string, fields ...Field) { l.Logger.Panic(msg, fields...) } +// Info logs a message at InfoLevel. The message includes any fields passed +// at the log site, as well as any fields accumulated on the logger. func (l wrapLogger) Info(ctx context.Context, msg string, fields ...Field) { l.Logger.Info(msg, fields...) } + +// Error logs a message at ErrorLevel. The message includes any fields passed +// at the log site, as well as any fields accumulated on the logger. func (l wrapLogger) Error(ctx context.Context, msg string, fields ...Field) { l.Logger.Error(msg, fields...) } +// Debug logs a message at DebugLevel. The message includes any fields passed +// at the log site, as well as any fields accumulated on the logger. func (l wrapLogger) Debug(ctx context.Context, msg string, fields ...Field) { l.Logger.Debug(msg, fields...) } diff --git a/options.go b/options.go index 84a47e2..1c8c6f6 100644 --- a/options.go +++ b/options.go @@ -17,16 +17,26 @@ type Options struct { Timeout time.Duration RetryIntervals []time.Duration Serializer Serializer + Initialize bool } func newOptions() *Options { - return &Options{ - Concurrency: defaultConcurrency, - MaxRetries: defaultMaxRetries, - TTL: defaultTTL, - Timeout: defaultTimeout, - RetryIntervals: defaultRetryIntervals, + opts := &Options{} + + options := []Option{ + WithConcurrency(defaultConcurrency), + WithMaxRetries(defaultMaxRetries), + WithTTL(defaultTTL), + WithTimeout(defaultTimeout), + WithRetryIntervals(defaultRetryIntervals), + WithInitialize(true), + } + + for i := range options { + options[i](opts) } + + return opts } // Option is an option unit. @@ -39,6 +49,13 @@ func WithSerializer(serializer Serializer) Option { } } +// WithInitialize defines if the broker needs to be initialized. +func WithInitialize(initialize bool) Option { + return func(opts *Options) { + opts.Initialize = initialize + } +} + // WithTracer defines the Tracer. func WithTracer(tracer Tracer) Option { return func(opts *Options) { diff --git a/queue.go b/queue.go index 433123d..afbf87c 100644 --- a/queue.go +++ b/queue.go @@ -325,7 +325,8 @@ func (q *Queue) consume(ctx context.Context, name string, prefix string, eta tim return tasks, nil } -func (q *Queue) consumer() *consumer { +// Consumer returns a random consumer. +func (q *Queue) Consumer() *consumer { rand.Seed(time.Now().Unix()) n := rand.Int() % len(q.consumers) @@ -378,7 +379,7 @@ func (q *Queue) fireEvents(r *Request) error { // HandleRequest handles a request synchronously with a consumer. func (q *Queue) HandleRequest(ctx context.Context, r *Request) error { - consumer := q.consumer() + consumer := q.Consumer() return consumer.Consume(r) } diff --git a/queue_test.go b/queue_test.go index c0eb995..daa4ff4 100644 --- a/queue_test.go +++ b/queue_test.go @@ -9,6 +9,32 @@ import ( "github.com/thoas/bokchoy" ) +func TestQueue_Consumer(t *testing.T) { + run(t, func(t *testing.T, s *suite) { + is := assert.New(t) + queue := s.bokchoy.Queue("tests.task.message") + queue.SubscribeFunc(func(r *bokchoy.Request) error { + return nil + }) + is.NotZero(queue.Consumer()) + }) +} + +func TestQueue_Cancel(t *testing.T) { + run(t, func(t *testing.T, s *suite) { + is := assert.New(t) + ctx := context.Background() + queue := s.bokchoy.Queue("tests.task.message") + task1, err := queue.Publish(ctx, "hello", bokchoy.WithTTL(10*time.Second)) + is.NotZero(task1) + is.NoError(err) + task2, err := queue.Cancel(ctx, task1.ID) + is.NotZero(task2) + is.NoError(err) + is.True(task2.IsStatusCanceled()) + }) +} + func TestQueue_Save(t *testing.T) { run(t, func(t *testing.T, s *suite) { is := assert.New(t) @@ -70,26 +96,35 @@ func TestQueue_Publish(t *testing.T) { }) } +type consumer struct { + ticker chan struct{} +} + +func (c consumer) Consume(r *bokchoy.Request) error { + c.ticker <- struct{}{} + + return nil +} + func TestQueue_ConsumeDelayed(t *testing.T) { run(t, func(t *testing.T, s *suite) { is := assert.New(t) ctx := context.Background() - queue := s.bokchoy.Queue("tests.task.message") - ticker := make(chan struct{}) + consumer := &consumer{ + ticker: make(chan struct{}), + } - queue.SubscribeFunc(func(r *bokchoy.Request) error { - ticker <- struct{}{} + queueName := "tests.task.message" - return nil - }) + s.bokchoy.Subscribe(queueName, consumer) go func() { err := s.bokchoy.Run(ctx) is.NoError(err) }() - task, err := queue.Publish(ctx, "world", bokchoy.WithCountdown(2*time.Second)) + task, err := s.bokchoy.Publish(ctx, queueName, "world", bokchoy.WithCountdown(2*time.Second)) is.NotZero(task) is.NoError(err) @@ -99,7 +134,7 @@ func TestQueue_ConsumeDelayed(t *testing.T) { select { case <-ctx.Done(): is.True(false) - case <-ticker: + case <-consumer.ticker: is.True(true) } diff --git a/request_test.go b/request_test.go new file mode 100644 index 0000000..75fe6fd --- /dev/null +++ b/request_test.go @@ -0,0 +1,16 @@ +package bokchoy_test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/thoas/bokchoy" +) + +func TestRequest_String(t *testing.T) { + is := assert.New(t) + + req := &bokchoy.Request{Task: &bokchoy.Task{}} + is.NotZero(req.String()) +} diff --git a/suite_test.go b/suite_test.go index dba9b6b..05f50e2 100644 --- a/suite_test.go +++ b/suite_test.go @@ -50,10 +50,6 @@ func run(t *testing.T, f FuncTest) { panic(err) } - defer func() { - bok.Stop(ctx) - }() - suite := &suite{bok} f(t, suite) diff --git a/task.go b/task.go index e55e242..e620167 100644 --- a/task.go +++ b/task.go @@ -162,6 +162,15 @@ func TaskFromPayload(data map[string]interface{}, serializer Serializer) (*Task, } } + rawResult, ok := data["result"].(string) + if ok { + err = serializer.Loads([]byte(rawResult), &t.Result) + + if err != nil { + return nil, errors.Wrapf(ErrAttributeError, "cannot unserialize `result`") + } + } + return t, nil } diff --git a/tracer.go b/tracer.go index 6d28d19..16b2195 100644 --- a/tracer.go +++ b/tracer.go @@ -6,6 +6,9 @@ import ( "github.com/thoas/bokchoy/logging" ) +// DefaultTracer is the default tracer. +var DefaultTracer = NewTracerLogger(logging.DefaultLogger) + // Tracer is a component used to trace errors. type Tracer interface { Log(context.Context, string, error) diff --git a/tracer_test.go b/tracer_test.go new file mode 100644 index 0000000..7d17e13 --- /dev/null +++ b/tracer_test.go @@ -0,0 +1,18 @@ +package bokchoy_test + +import ( + "context" + "fmt" + "testing" + + "github.com/thoas/bokchoy" +) + +func TestTracer_Error(t *testing.T) { + var ( + ctx = context.Background() + err = fmt.Errorf("Unexpected error") + ) + + bokchoy.DefaultTracer.Log(ctx, "An error has occured", err) +}