Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 5 additions & 7 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,6 @@ jobs:
RABBITMQ_DEFAULT_USER: rabbitmq
RABBITMQ_DEFAULT_PASS: rabbitmq
RABBITMQ_DEFAULT_VHOST: /
- image: postgres
environment:
POSTGRES_PASSWORD: rhinof
POSTGRES_USER: rhinof
- image: mysql
command: --default-authentication-plugin=mysql_native_password
environment:
Expand All @@ -28,9 +24,6 @@ jobs:
command: wget https://github.com/jwilder/dockerize/releases/download/$DOCKERIZE_VERSION/dockerize-linux-amd64-$DOCKERIZE_VERSION.tar.gz && tar -C /usr/local/bin -xzvf dockerize-linux-amd64-$DOCKERIZE_VERSION.tar.gz && rm dockerize-linux-amd64-$DOCKERIZE_VERSION.tar.gz
environment:
DOCKERIZE_VERSION: v0.6.1
- run:
name: Wait for pg
command: dockerize -wait tcp://localhost:5432 -timeout 1m
- run:
name: Wait for mysql
command: dockerize -wait tcp://localhost:3306 -timeout 1m
Expand All @@ -48,3 +41,8 @@ jobs:
command: |
go get github.com/mattn/goveralls
goveralls -coverprofile=coverage.out -service=circle-ci -repotoken=$COVERALLS_TOKEN
- run:
name: Run linter
command: |
curl -sfL https://install.goreleaser.com/github.com/golangci/golangci-lint.sh | sh -s -- -b $(go env GOPATH)/bin v1.16.0
golangci-lint run --skip-dirs=tests
20 changes: 13 additions & 7 deletions gbus/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package builder

import (
"fmt"
"go/types"
"sync"
"time"

Expand All @@ -14,7 +13,6 @@ import (
)

type defaultBuilder struct {
handlers []types.Type
PrefetchCount uint
connStr string
purgeOnStartup bool
Expand All @@ -34,9 +32,11 @@ type defaultBuilder struct {
func (builder *defaultBuilder) Build(svcName string) gbus.Bus {

gb := &gbus.DefaultBus{
AmqpConnStr: builder.connStr,
PrefetchCount: 1,
Outgoing: &gbus.AMQPOutbox{},
AmqpConnStr: builder.connStr,
PrefetchCount: 1,
Outgoing: &gbus.AMQPOutbox{
SvcName: svcName,
},
SvcName: svcName,
PurgeOnStartup: builder.purgeOnStartup,
DelayedSubscriptions: [][]string{},
Expand Down Expand Up @@ -74,7 +74,10 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus {
gb.TxProvider = mysqltx
sagaStore = mysql.NewSagaStore(gb.SvcName, mysqltx)
if builder.purgeOnStartup {
sagaStore.Purge()
err := sagaStore.Purge()
if err != nil {
panic(err)
}
}
gb.Outbox = mysql.NewOutbox(gb.SvcName, mysqltx, builder.purgeOnStartup)

Expand All @@ -91,7 +94,10 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus {
}

if builder.purgeOnStartup {
sagaStore.Purge()
err := sagaStore.Purge()
if err != nil {
panic(err)
}
}
gb.Glue = saga.NewGlue(gb, sagaStore, svcName)
return gb
Expand Down
71 changes: 55 additions & 16 deletions gbus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@ type DefaultBus struct {

RPCHandlers map[string]MessageHandler
deadletterHandler func(tx *sql.Tx, poision amqp.Delivery) error
msgs <-chan amqp.Delivery
rpcMsgs <-chan amqp.Delivery
HandlersLock *sync.Mutex
RPCLock *sync.Mutex
SenderLock *sync.Mutex
Expand Down Expand Up @@ -118,17 +116,25 @@ func (b *DefaultBus) createServiceQueue() (amqp.Queue, error) {
return q, e
}

func (b *DefaultBus) bindServiceQueue() {
func (b *DefaultBus) bindServiceQueue() error {

if b.deadletterHandler != nil && b.DLX != "" {
b.AMQPChannel.ExchangeDeclare(b.DLX, /*name*/
err := b.AMQPChannel.ExchangeDeclare(b.DLX, /*name*/
"fanout", /*kind*/
true, /*durable*/
false, /*autoDelete*/
false, /*internal*/
false, /*noWait*/
nil /*args amqp.Table*/)
b.bindQueue("", b.DLX)
if err != nil {
b.log().WithError(err).Error("could not declare exchange")
return err
}
err = b.bindQueue("", b.DLX)
if err != nil {
b.log().WithError(err).Error("could not bind exchange")
return err
}
}
for _, subscription := range b.DelayedSubscriptions {
topic := subscription[0]
Expand All @@ -142,13 +148,16 @@ func (b *DefaultBus) bindServiceQueue() {
nil /*args amqp.Table*/)
if e != nil {
b.log().WithError(e).WithField("exchange", exchange).Error("failed to declare exchange")
return e
} else {
e = b.bindQueue(topic, exchange)
if e != nil {
b.log().WithError(e).WithFields(log.Fields{"topic": topic, "exchange": exchange}).Error("failed to bind topic to exchange")
return e
}
}
}
return nil
}

func (b *DefaultBus) createAMQPChannel(conn *amqp.Connection) (*amqp.Channel, error) {
Expand Down Expand Up @@ -184,7 +193,7 @@ func (b *DefaultBus) Start() error {
//TODO:Figure out what should be done

//init the outbox that sends the messages to the amqp transport and handles publisher confirms
if b.Outgoing.init(b.outAMQPChannel, b.Confirm, true); e != nil {
if e := b.Outgoing.init(b.outAMQPChannel, b.Confirm, true); e != nil {
return e
}
/*
Expand All @@ -199,8 +208,14 @@ func (b *DefaultBus) Start() error {
return e
}
amqpChan.NotifyClose(b.amqpErrors)
amqpOutbox := &AMQPOutbox{}
amqpOutbox.init(amqpChan, b.Confirm, false)
amqpOutbox := &AMQPOutbox{
SvcName: b.SvcName,
}
err := amqpOutbox.init(amqpChan, b.Confirm, false)
if err != nil {
b.log().WithError(err).Error("failed initializing amqpOutbox")
return err
}
if startErr := b.Outbox.Start(amqpOutbox); startErr != nil {
b.log().WithError(startErr).Error("failed to start transactional outbox")
return startErr
Expand All @@ -216,7 +231,11 @@ func (b *DefaultBus) Start() error {
b.serviceQueue = q

//bind queue
b.bindServiceQueue()
err := b.bindServiceQueue()
if err != nil {
b.log().WithError(err).Error("could not bind service to queue")
return err
}

//declare rpc queue

Expand Down Expand Up @@ -273,7 +292,12 @@ func (b *DefaultBus) createBusWorkers(workerNum uint) ([]*worker, error) {
serializer: b.Serializer,
b: b,
amqpErrors: b.amqpErrors}
go w.Start()
go func() {
err := w.Start()
if err != nil {
log.WithError(err)
}
}()

workers = append(workers, w)
}
Expand All @@ -293,15 +317,22 @@ func (b *DefaultBus) Shutdown() (shutdwonErr error) {
}()

for _, worker := range b.workers {
worker.Stop()
err := worker.Stop()
if err != nil {
b.log().WithError(err).Error("could not stop worker")
}
}

b.Outgoing.shutdown()
b.started = false

if b.IsTxnl {
b.TxProvider.Dispose()
b.Outbox.Stop()
err := b.Outbox.Stop()
if err != nil {
b.log().WithError(err).Error("could not shutdown outbox")
return err
}
}
return nil
}
Expand Down Expand Up @@ -366,10 +397,14 @@ func (b *DefaultBus) withTx(action func(tx *sql.Tx) error, ambientTx *sql.Tx) er
*/
if b.IsTxnl && shouldCommitTx {
if actionErr != nil {
activeTx.Rollback()
err := activeTx.Rollback()
if err != nil {
b.log().WithError(err).Error("could not rollback transaction")
}
} else {
commitErr := activeTx.Commit()
if commitErr != nil {
b.log().WithError(commitErr).Error("could not commit transaction")
return commitErr
}
}
Expand Down Expand Up @@ -406,7 +441,11 @@ func (b *DefaultBus) RPC(ctx context.Context, service string, request, reply *Bu
rpcID: rpcID}

b.Serializer.Register(reply.Payload)
b.sendImpl(ctx, nil, service, b.rpcQueue.Name, "", "", request, rpc)
err := b.sendImpl(ctx, nil, service, b.rpcQueue.Name, "", "", request, rpc)
if err != nil {
b.log().WithError(err).Error("could not send message")
return nil, err
}

//wait for reply or timeout
select {
Expand Down Expand Up @@ -549,10 +588,10 @@ func (b *DefaultBus) monitorAMQPErrors() {
}
}

func (b *DefaultBus) sendImpl(ctx context.Context, tx *sql.Tx, toService, replyTo, exchange, topic string, message *BusMessage, policies ...MessagePolicy) (er error) {
func (b *DefaultBus) sendImpl(sctx context.Context, tx *sql.Tx, toService, replyTo, exchange, topic string, message *BusMessage, policies ...MessagePolicy) (er error) {
b.SenderLock.Lock()
defer b.SenderLock.Unlock()
span, ctx := opentracing.StartSpanFromContext(ctx, "sendImpl")
span, _ := opentracing.StartSpanFromContext(sctx, "sendImpl")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't the wrapped context (the one that is returned from the StartSpanFromContext method) be used ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't use it anywhere down the road...

defer func() {
if err := recover(); err != nil {
errMsg := fmt.Sprintf("panic recovered panicking err:\n%v\n%s", err, debug.Stack())
Expand Down
16 changes: 12 additions & 4 deletions gbus/outbox.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package gbus

import (
"log"
"sync"

log "github.com/sirupsen/logrus"
"github.com/streadway/amqp"
)

Expand All @@ -19,6 +19,7 @@ type AMQPOutbox struct {
locker *sync.Mutex
pending map[uint64]pendingConfirmation
stop chan bool
SvcName string
}

func (out *AMQPOutbox) init(amqp *amqp.Channel, confirm, resendOnNack bool) error {
Expand Down Expand Up @@ -96,23 +97,26 @@ func (out *AMQPOutbox) confirmationLoop() {
out.locker.Lock()
pending := out.pending[ack]
if pending.deliveryTag > 0 {
log.Printf("ack received for a pending delivery with tag %v", ack)
out.log().WithField("tag", ack).Debug("ack received for a pending delivery with tag")
}
delete(out.pending, ack)
out.locker.Unlock()
case nack := <-out.nack:
if nack <= 0 {
continue
}
log.Printf("nack received for delivery tag %v", nack)
out.log().WithField("tag", nack).Debug("nack received for a pending delivery with tag")
out.locker.Lock()
pending := out.pending[nack]
pending.deliveryTag = nack
out.resends <- pending
delete(out.pending, nack)
out.locker.Unlock()
case resend := <-out.resends:
out.Post(resend.exchange, resend.routingKey, resend.amqpMessage)
_, err := out.Post(resend.exchange, resend.routingKey, resend.amqpMessage)
if err != nil {
out.log().WithError(err).Error("could not post message to exchange")
}
}
}
}
Expand All @@ -131,6 +135,10 @@ func (out *AMQPOutbox) NotifyConfirm(ack, nack chan uint64) {
out.channel.NotifyConfirm(ack, nack)
}

func (out *AMQPOutbox) log() *log.Entry {
return log.WithField("_service", out.SvcName)
}

type pendingConfirmation struct {
deliveryTag uint64
exchange string
Expand Down
2 changes: 0 additions & 2 deletions gbus/saga/def.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,8 @@ type Def struct {
sagaType reflect.Type
startedBy []string
lock *sync.Mutex
instances []*Instance
sagaConfFns []gbus.SagaConfFn
msgToFunc []*MsgToFuncPair
msgHandler gbus.MessageHandler
}

//HandleMessage implements HandlerRegister interface
Expand Down
19 changes: 9 additions & 10 deletions gbus/saga/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,14 @@ var _ gbus.SagaRegister = &Glue{}

//Glue ties the incoming messages from the Bus with the needed Saga instances
type Glue struct {
svcName string
bus gbus.Bus
sagaDefs []*Def
lock *sync.Mutex
alreadyRegistred map[string]bool
msgToDefMap map[string][]*Def
sagaStore Store
timeoutManger TimeoutManager
subscribedOnTimeouts bool
svcName string
bus gbus.Bus
sagaDefs []*Def
lock *sync.Mutex
alreadyRegistred map[string]bool
msgToDefMap map[string][]*Def
sagaStore Store
timeoutManger TimeoutManager
}

func (imsm *Glue) isSagaAlreadyRegistered(sagaType reflect.Type) bool {
Expand Down Expand Up @@ -139,7 +138,7 @@ func (imsm *Glue) handler(invocation gbus.Invocation, message *gbus.BusMessage)
return e
}

if requestsTimeout, duration := newInstance.requestsTimeout(); requestsTimeout == true {
if requestsTimeout, duration := newInstance.requestsTimeout(); requestsTimeout {
imsm.log().WithFields(log.Fields{"saga_id": newInstance.ID, "timeout_duration": duration}).Info("new saga requested timeout")
imsm.timeoutManger.RequestTimeout(imsm.svcName, newInstance.ID, duration)
}
Expand Down
2 changes: 1 addition & 1 deletion gbus/saga/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (si *Instance) invoke(exchange, routingKey string, invocation gbus.Invocati
returns := method.Call(params)

val := returns[0]
if val.IsNil() == false {
if !val.IsNil() {
return val.Interface().(error)
}

Expand Down
2 changes: 1 addition & 1 deletion gbus/saga/invocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func (si *sagaInvocation) setCorrelationIDs(message *gbus.BusMessage, isEvent bo

message.CorrelationID = si.inboundMsg.ID

if isEvent == false {
if !isEvent {
//support saga-to-saga communication
if si.inboundMsg.SagaID != "" {
message.SagaCorrelationID = message.SagaID
Expand Down
9 changes: 0 additions & 9 deletions gbus/serialization/avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,15 +193,6 @@ func (as *Avro) RegisterAvroMessage(schemaName, namespace, schema string, obj Av
return
}

//getSchema get schema id from schema-registry service
func (as *Avro) getSchema(id int) (codec *goavro.Codec, err error) {
codec, err = as.schemaRegistryClient.GetSchema(id)
if err != nil {
return nil, err
}
return codec, nil
}

//registerOrGetSchemaID get schema id from schema-registry service
func (as *Avro) registerOrGetSchemaID(topic string, avroCodec *goavro.Codec) (schemaID int, err error) {
schemaID = 0
Expand Down
Loading