Skip to content

Commit

Permalink
chore: initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
thoas committed Jul 7, 2019
1 parent 8e66fda commit d1fb48c
Show file tree
Hide file tree
Showing 26 changed files with 2,917 additions and 0 deletions.
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
test:
go test -v -p 1
180 changes: 180 additions & 0 deletions bokchoy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
package bokchoy

import (
"context"
"fmt"
"sync"

"github.com/thoas/bokchoy/logging"

"github.com/pkg/errors"
)

// Bokchoy is the main object which stores all configuration, queues
// and broker.
type Bokchoy struct {
logger logging.Logger
tracer Tracer
serializer Serializer
cfg Config
queue *Queue
wg *sync.WaitGroup
defaultOptions *Options
broker Broker
queues map[string]*Queue
middlewares []Subscriber
}

// New initializes a new Bokchoy instance.
func New(ctx context.Context, cfg Config, options ...Option) (*Bokchoy, error) {
opts := newOptions()
for i := range options {
options[i](opts)
}

var (
err error
tracer Tracer
)

logger, _ := logging.NewNopLogger()
if opts.Logger != nil {
logger = opts.Logger
}

if opts.Tracer == nil {
tracer = NewTracerLogger(logger)
}

bok := &Bokchoy{
cfg: cfg,
serializer: newSerializer(cfg.Serializer),
queues: make(map[string]*Queue),
wg: &sync.WaitGroup{},
logger: logger,
tracer: tracer,
defaultOptions: opts,
}

bok.broker, err = newBroker(ctx, cfg.Broker, logger.With(logging.String("component", "broker")))
if err != nil {
return nil, errors.Wrap(err, "unable to initialize broker")
}

for i := range cfg.Queues {
bok.Queue(cfg.Queues[i].Name)
}

return bok, nil
}

func (b *Bokchoy) Use(sub Subscriber) *Bokchoy {
b.middlewares = append(b.middlewares, sub)

return b
}

// Empty empties initialized queues.
func (b *Bokchoy) Empty(ctx context.Context) error {
for i := range b.queues {
err := b.queues[i].Empty(ctx)
if err != nil {
return err
}
}

return nil
}

// Flush flushes data of the entire system.
func (b *Bokchoy) Flush() error {
return b.broker.Flush()
}

// Queue gets or creates a new queue.
func (b *Bokchoy) Queue(name string) *Queue {
queue, ok := b.queues[name]
if !ok {
queue = &Queue{
name: name,
broker: b.broker,
serializer: b.serializer,
logger: b.logger.With(logging.String("component", "queue")),
tracer: b.tracer,
wg: b.wg,
defaultOptions: b.defaultOptions,
middlewares: b.middlewares,
}

b.queues[name] = queue
}

return queue
}

// Stop stops all queues and consumers.
func (b *Bokchoy) Stop(ctx context.Context) {
fields := make([]logging.Field, len(b.queues))
for i, name := range b.QueueNames() {
fields[i] = logging.String(fmt.Sprintf("queue_%d", i), name)
}

b.logger.Debug(ctx, "Stopping queues...", fields...)

for i := range b.queues {
b.queues[i].stop(ctx)
}

b.logger.Debug(ctx, "Queues stopped", fields...)
}

// QueueNames returns the managed queue names.
func (b *Bokchoy) QueueNames() []string {
names := make([]string, 0, len(b.queues))

for k := range b.queues {
names = append(names, k)
}

return names
}

// Run runs the system and block the current goroutine.
func (b *Bokchoy) Run(ctx context.Context) error {
err := b.broker.Ping()
if err != nil {
return err
}

fields := make([]logging.Field, len(b.queues))
for i, name := range b.QueueNames() {
fields[i] = logging.String(fmt.Sprintf("queue_%d", i), name)
}

b.logger.Debug(ctx, "Starting queues...", fields...)

for i := range b.queues {
b.queues[i].start(ctx)
}

b.logger.Debug(ctx, "Queues started", fields...)

b.wg.Wait()

return nil
}

// Publish publishes a new payload to a queue.
func (b *Bokchoy) Publish(ctx context.Context, queueName string, payload interface{}, options ...Option) (*Task, error) {
return b.Queue(queueName).Publish(ctx, payload, options...)
}

// Subscribe registers a new subscriber to consume tasks for a queue.
func (b *Bokchoy) Subscribe(queueName string, sub Subscriber, options ...Option) {
b.SubscribeFunc(queueName, sub.Consume)
}

// Subscribe registers a new subscriber function to consume tasks for a queue.
func (b *Bokchoy) SubscribeFunc(queueName string, f SubscriberFunc, options ...Option) {
b.Queue(queueName).SubscribeFunc(f, options...)
}
16 changes: 16 additions & 0 deletions bokchoy_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package bokchoy

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestBokchoy_Queue(t *testing.T) {
run(t, func(t *testing.T, s *suite) {
is := assert.New(t)
queue := s.bokchoy.Queue("tests.task.message")
is.NotZero(queue)
is.Equal(queue.name, "tests.task.message")
})
}
52 changes: 52 additions & 0 deletions broker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package bokchoy

import (
"context"
"time"

"github.com/thoas/bokchoy/logging"
)

type Broker interface {
// Ping pings the redis broker to ensure it's well connected.
Ping() error

// Get returns raw data stored in broker.
Get(string) (map[string]interface{}, error)

// Empty empties a queue.
Empty(string) error

// Flush flushes the entire broker.
Flush() error

// Count returns number of items from a queue name.
Count(string) (int, error)

// Save synchronizes the stored item.
Set(string, map[string]interface{}, time.Duration) error

// Publish publishes raw data.
Publish(string, string, string, map[string]interface{}, time.Time) error

// Consume returns an array of raw data.
Consume(string, string, time.Time) ([]map[string]interface{}, error)
}

// newBroker initializes a new Broker instance.
func newBroker(ctx context.Context, cfg BrokerConfig, logger logging.Logger) (Broker, error) {
var (
broker Broker
err error
)

switch cfg.Type {
default:
broker, err = newRedisBroker(ctx, cfg.Redis, logger)
if err != nil {
return nil, err
}
}

return broker, err
}
Loading

0 comments on commit d1fb48c

Please sign in to comment.