Skip to content

Commit

Permalink
Merge branch 'v1.x' into backpressure-typo-fix
Browse files Browse the repository at this point in the history
  • Loading branch information
Guy Baron committed May 30, 2019
2 parents 592852b + 853a7d6 commit 60f76d1
Show file tree
Hide file tree
Showing 17 changed files with 428 additions and 89 deletions.
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ jobs:
command: go get -v -t -d ./...
- run:
name: Run tests
command: go test -v -covermode=count -coverprofile=coverage.out -coverpkg=./... -timeout 30s ./...
command: go test -v -covermode=count -coverprofile=coverage.out -coverpkg=./... -timeout 60s ./...
- run:
name: Report Coverage
command: |
Expand Down
6 changes: 3 additions & 3 deletions docs/SAGA.md
Original file line number Diff line number Diff line change
Expand Up @@ -178,21 +178,21 @@ In order to define a timeout for the saga and have grabbit call the saga instanc
```go
type RequestSagaTimeout interface {
TimeoutDuration() time.Duration
Timeout(invocation Invocation, message *BusMessage) error
Timeout(tx *sql.Tx, bus Messaging) error
}
```

So in order to fulfill our requerment we will need to add the following to our saga

```go

func (s *BookVacationSaga) RequestTimeout() time.Duration {
func (s *BookVacationSaga) TimeoutDuration() time.Duration {
//request to timeout if after 15 minutes the saga is not complete
return time.Minute * 15
}

func (s *BookVacationSaga) Timeout(invocation gbus.Invocation, message *gbus.BusMessage) error {
return invocation.Bus().Publish(invocation.Ctx(), "some_exchange", "some.topic.1", gbus.NewBusMessage(VacationBookingTimedOut{}))
return bus.Publish(context.Background(), "some_exchange", "some.topic.1", gbus.NewBusMessage(VacationBookingTimedOut{}))
}

```
Expand Down
2 changes: 1 addition & 1 deletion gbus/abstractions.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ type RegisterDeadletterHandler interface {
//RequestSagaTimeout is the interface a saga needs to implement to get timeout servicess
type RequestSagaTimeout interface {
TimeoutDuration() time.Duration
Timeout(invocation Invocation, message *BusMessage) error
Timeout(tx *sql.Tx, bus Messaging) error
}

//SagaConfFn is a function to allow configuration of a saga in the context of the gbus
Expand Down
4 changes: 2 additions & 2 deletions gbus/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus {
RPCHandlers: make(map[string]gbus.MessageHandler),
Serializer: builder.serializer,
DLX: builder.dlx,
DefaultPolicies: make([]gbus.MessagePolicy, 0),
DefaultPolicies: builder.defaultPolicies,
DbPingTimeout: 3}

gb.Confirm = builder.confirm
Expand Down Expand Up @@ -99,7 +99,7 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus {
panic(err)
}
}
gb.Glue = saga.NewGlue(gb, sagaStore, svcName)
gb.Glue = saga.NewGlue(gb, sagaStore, svcName, gb.TxProvider)
return gb
}

Expand Down
21 changes: 11 additions & 10 deletions gbus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ type DefaultBus struct {
Confirm bool
healthChan chan error
backpressure bool
rabbitFailure bool
DbPingTimeout time.Duration
amqpConnected bool
}

var (
Expand Down Expand Up @@ -257,7 +257,7 @@ func (b *DefaultBus) Start() error {
//start monitoring on amqp related errors
go b.monitorAMQPErrors()
//start consuming messags from service queue

b.amqpConnected = true
return nil
}

Expand Down Expand Up @@ -293,12 +293,11 @@ func (b *DefaultBus) createBusWorkers(workerNum uint) ([]*worker, error) {
serializer: b.Serializer,
b: b,
amqpErrors: b.amqpErrors}
go func() {
err := w.Start()
if err != nil {
log.WithError(err)
}
}()

err := w.Start()
if err != nil {
log.WithError(err).Error("failed to start worker")
}

workers = append(workers, w)
}
Expand All @@ -321,6 +320,7 @@ func (b *DefaultBus) Shutdown() (shutdwonErr error) {
err := worker.Stop()
if err != nil {
b.log().WithError(err).Error("could not stop worker")
return err
}
}
b.Outgoing.shutdown()
Expand Down Expand Up @@ -359,7 +359,8 @@ func (b *DefaultBus) GetHealth() HealthCard {
return HealthCard{
DbConnected: dbConnected,
RabbitBackPressure: b.backpressure,
RabbitConnected: !b.rabbitFailure,
RabbitConnected: b.amqpConnected,

}
}

Expand Down Expand Up @@ -577,7 +578,7 @@ func (b *DefaultBus) monitorAMQPErrors() {
}
b.backpressure = blocked.Active
case amqpErr := <-b.amqpErrors:
b.rabbitFailure = true
b.amqpConnected = false
b.log().WithField("amqp_error", amqpErr).Error("amqp error")
if b.healthChan != nil {
b.healthChan <- amqpErr
Expand Down
10 changes: 9 additions & 1 deletion gbus/saga/def.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,16 @@ func getFunNameFromHandler(handler gbus.MessageHandler) string {
}

func (sd *Def) newInstance() *Instance {
return NewInstance(sd.sagaType, sd.msgToFunc, sd.sagaConfFns...)
instance := NewInstance(sd.sagaType, sd.msgToFunc)
return sd.configureSaga(instance)
}

func (sd *Def) configureSaga(instance *Instance) *Instance {
saga := instance.UnderlyingInstance
for _, conf := range sd.sagaConfFns {
saga = conf(saga)
}
return instance
}

func (sd *Def) shouldStartNewSaga(message *gbus.BusMessage) bool {
Expand Down
44 changes: 24 additions & 20 deletions gbus/saga/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,6 @@ func (imsm *Glue) RegisterSaga(saga gbus.Saga, conf ...gbus.SagaConfFn) error {
WithFields(log.Fields{"saga_type": def.sagaType.String(), "handles_messages": len(msgNames)}).
Info("registered saga with messages")

//register on timeout messages
timeoutEtfs, requestsTimeout := saga.(gbus.RequestSagaTimeout)
if requestsTimeout {
timeoutMessage := gbus.SagaTimeoutMessage{}
timeoutMsgName := timeoutMessage.SchemaName()
_ = def.HandleMessage(timeoutMessage, timeoutEtfs.Timeout)
imsm.addMsgNameToDef(timeoutMsgName, def)

}
return nil
}

Expand Down Expand Up @@ -154,13 +145,13 @@ func (imsm *Glue) handler(invocation gbus.Invocation, message *gbus.BusMessage)
e := fmt.Errorf("Warning:Failed message routed with SagaCorrelationID:%v but no saga instance with the same id found ", message.SagaCorrelationID)
return e
}

def.configureSaga(instance)
if invkErr := imsm.invokeSagaInstance(instance, invocation, message); invkErr != nil {
imsm.log().WithError(invkErr).WithField("saga_id", instance.ID).Error("failed to invoke saga")
return invkErr
}

return imsm.completeOrUpdateSaga(invocation.Tx(), instance, message)
return imsm.completeOrUpdateSaga(invocation.Tx(), instance)

} else if message.Semantics == gbus.CMD {
e := fmt.Errorf("Warning:Command or Reply message with no saga reference received. message will be dropped.\nmessage as of type:%v", reflect.TypeOf(message).Name())
Expand All @@ -176,12 +167,12 @@ func (imsm *Glue) handler(invocation gbus.Invocation, message *gbus.BusMessage)
imsm.log().WithFields(log.Fields{"message": msgName, "instances_fetched": len(instances)}).Info("fetched saga instances")

for _, instance := range instances {

def.configureSaga(instance)
if invkErr := imsm.invokeSagaInstance(instance, invocation, message); invkErr != nil {
imsm.log().WithError(invkErr).WithField("saga_id", instance.ID).Error("failed to invoke saga")
return invkErr
}
e = imsm.completeOrUpdateSaga(invocation.Tx(), instance, message)
e = imsm.completeOrUpdateSaga(invocation.Tx(), instance)
if e != nil {
return e
}
Expand All @@ -205,11 +196,9 @@ func (imsm *Glue) invokeSagaInstance(instance *Instance, invocation gbus.Invocat
return instance.invoke(exchange, routingKey, sginv, message)
}

func (imsm *Glue) completeOrUpdateSaga(tx *sql.Tx, instance *Instance, lastMessage *gbus.BusMessage) error {

_, timedOut := lastMessage.Payload.(gbus.SagaTimeoutMessage)
func (imsm *Glue) completeOrUpdateSaga(tx *sql.Tx, instance *Instance) error {

if instance.isComplete() || timedOut {
if instance.isComplete() {
imsm.log().WithField("saga_id", instance.ID).Info("saga has completed and will be deleted")

return imsm.sagaStore.DeleteSaga(tx, instance)
Expand All @@ -236,20 +225,35 @@ func (imsm *Glue) registerEvent(exchange, topic string, event gbus.Message) erro
return imsm.bus.HandleEvent(exchange, topic, event, imsm.handler)
}

func (imsm *Glue) timeoutSaga(tx *sql.Tx, sagaID string) error {

saga, err := imsm.sagaStore.GetSagaByID(tx, sagaID)
if err != nil {
return err
}
timeoutErr := saga.timeout(tx, imsm.bus)
if timeoutErr != nil {
imsm.log().WithError(timeoutErr).WithField("sagaID", sagaID).Error("failed to timeout saga")
return timeoutErr
}
return imsm.completeOrUpdateSaga(tx, saga)
}

func (imsm *Glue) log() *log.Entry {
return log.WithField("_service", imsm.svcName)
}

//NewGlue creates a new Sagamanager
func NewGlue(bus gbus.Bus, sagaStore Store, svcName string) *Glue {
return &Glue{
func NewGlue(bus gbus.Bus, sagaStore Store, svcName string, txp gbus.TxProvider) *Glue {
g := &Glue{
svcName: svcName,
bus: bus,
sagaDefs: make([]*Def, 0),
lock: &sync.Mutex{},
alreadyRegistred: make(map[string]bool),
msgToDefMap: make(map[string][]*Def),
timeoutManger: TimeoutManager{bus: bus},
sagaStore: sagaStore,
}
g.timeoutManger = TimeoutManager{bus: bus, txp: txp, glue: g}
return g
}
18 changes: 13 additions & 5 deletions gbus/saga/instance.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package saga

import (
"database/sql"
"fmt"
"log"
"reflect"
Expand Down Expand Up @@ -77,9 +78,18 @@ func (si *Instance) requestsTimeout() (bool, time.Duration) {
return canTimeout, timeoutDuration
}

//NewInstance create a new instance of a Saga
func (si *Instance) timeout(tx *sql.Tx, bus gbus.Messaging) error {

saga, canTimeout := si.UnderlyingInstance.(gbus.RequestSagaTimeout)
if !canTimeout {
return fmt.Errorf("saga instance does not support timeouts")

}
return saga.Timeout(tx, bus)
}

func NewInstance(sagaType reflect.Type, msgToMethodMap []*MsgToFuncPair) *Instance {

func NewInstance(sagaType reflect.Type, msgToMethodMap []*MsgToFuncPair, confFns ...gbus.SagaConfFn) *Instance {

var newSagaPtr interface{}
if sagaType.Kind() == reflect.Ptr {
Expand All @@ -91,9 +101,7 @@ func NewInstance(sagaType reflect.Type, msgToMethodMap []*MsgToFuncPair, confFns
saga := newSagaPtr.(gbus.Saga)

newSaga := saga.New()
for _, conf := range confFns {
newSaga = conf(newSaga)
}

//newSagaPtr := reflect.New(sagaType).Elem()
newInstance := &Instance{
ID: xid.New().String(),
Expand Down
38 changes: 28 additions & 10 deletions gbus/saga/timeout.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
package saga

import (
"context"
"time"

"github.com/sirupsen/logrus"
"github.com/wework/grabbit/gbus"
)

//TimeoutManager manages timeouts for sagas
//TODO:Make it persistent
type TimeoutManager struct {
bus gbus.Bus
bus gbus.Bus
glue *Glue
txp gbus.TxProvider
}

//RequestTimeout requests a timeout from the timeout manager
Expand All @@ -20,13 +20,31 @@ func (tm *TimeoutManager) RequestTimeout(svcName, sagaID string, duration time.D
go func(svcName, sagaID string, tm *TimeoutManager) {
c := time.After(duration)
<-c
reuqestTimeout := gbus.SagaTimeoutMessage{
SagaID: sagaID}
msg := gbus.NewBusMessage(reuqestTimeout)
msg.SagaCorrelationID = sagaID
if err := tm.bus.Send(context.Background(), svcName, msg); err != nil {
//TODO: add logger
logrus.WithError(err).Error("could not send timeout to bus")
//TODO:if the bus is not transactional, moving forward we should not allow using sagas in a non transactional bus
if tm.txp == nil {
tme := tm.glue.timeoutSaga(nil, sagaID)
if tme != nil {
tm.glue.log().WithError(tme).WithField("sagaID", sagaID).Error("timing out a saga failed")
}
return
}
tx, txe := tm.txp.New()
if txe != nil {
tm.glue.log().WithError(txe).Warn("timeout manager failed to create a transaction")
} else {
callErr := tm.glue.timeoutSaga(tx, sagaID)
if callErr != nil {
tm.glue.log().WithError(callErr).WithField("sagaID", sagaID).Error("timing out a saga failed")
rlbe := tx.Rollback()
if rlbe != nil {
tm.glue.log().WithError(rlbe).Warn("timeout manager failed to rollback transaction")
}
} else {
cmte := tx.Commit()
if cmte != nil {
tm.glue.log().WithError(cmte).Warn("timeout manager failed to rollback transaction")
}
}
}

}(svcName, sagaID, tm)
Expand Down
3 changes: 2 additions & 1 deletion gbus/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type worker struct {
func (worker *worker) Start() error {

worker.log().Info("starting worker")
worker.stop = make(chan bool)
worker.channel.NotifyClose(worker.amqpErrors)

var (
Expand All @@ -62,7 +63,7 @@ func (worker *worker) Start() error {
}
worker.messages = messages
worker.rpcMessages = rpcmsgs
worker.stop = make(chan bool)

go worker.consumeMessages()

return nil
Expand Down
Loading

0 comments on commit 60f76d1

Please sign in to comment.