Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
bbbeac7
Merge pull request #69 from wework/rhinof-patch-2
May 7, 2019
c875004
added a test for health prob and fixed a minor race condition in worker
May 7, 2019
592287c
fixed builder to set default bolicies when building the bus and chang…
May 7, 2019
a3069de
improved deadlettering test
May 7, 2019
665d2d1
Merge pull request #71 from wework/coverage
May 8, 2019
905f904
added tests for protobuf serlializer
May 9, 2019
9e5407b
added tests for protobuf serializer error scenarios
May 9, 2019
c1040a5
added more error testing for proto serializer
May 9, 2019
5490d3d
fixed validation in proto serialization error tests
May 9, 2019
11dc2d7
Merge pull request #72 from wework/coverage
May 12, 2019
6d1480d
fixing a bug where saga configuration functions were run only when sa…
May 13, 2019
3470c20
Merge pull request #75 from wework/sagainit
vladshub May 13, 2019
fc06b81
Update SAGA.md
May 15, 2019
526db83
Merge pull request #76 from wework/adiweiss-patch-1
May 16, 2019
a019efb
increasing test timeouts to 60s
May 16, 2019
4dfb283
Merge pull request #77 from wework/rhinof-patch-1
May 16, 2019
212181d
first commit towards supporting persistent timeouts
May 19, 2019
e45a8a1
a timed out saga only gets deleted if it is complete
May 19, 2019
dd62b43
deleteing the saga instance after timeout only if it is complete
May 19, 2019
d05664f
removing unused field in saga instance (linting error)
May 19, 2019
2d190e2
Merge branch 'v1.x' into fixsagatimeout
May 19, 2019
8d9296e
Merge pull request #78 from wework/fixsagatimeout
May 19, 2019
80be1cc
Update SAGA.md
May 22, 2019
853a7d6
Merge pull request #79 from wework/adiweiss-fix-doc
May 22, 2019
bcb1974
fixing a bug in which all handler retires executed within the same
Jun 5, 2019
382bf3f
Merge pull request #82 from wework/fixtx
Jun 5, 2019
bad2ebf
making the retry num configurable and more random apart
avigailberger Jun 5, 2019
4d54fc7
Merge branch 'v1.x' into allow-configuring-max-retries
avigailberger Jun 5, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ jobs:
command: go get -v -t -d ./...
- run:
name: Run tests
command: go test -v -covermode=count -coverprofile=coverage.out -coverpkg=./... -timeout 30s ./...
command: go test -v -covermode=count -coverprofile=coverage.out -coverpkg=./... -timeout 60s ./...
- run:
name: Report Coverage
command: |
Expand Down
6 changes: 3 additions & 3 deletions docs/SAGA.md
Original file line number Diff line number Diff line change
Expand Up @@ -178,21 +178,21 @@ In order to define a timeout for the saga and have grabbit call the saga instanc
```go
type RequestSagaTimeout interface {
TimeoutDuration() time.Duration
Timeout(invocation Invocation, message *BusMessage) error
Timeout(tx *sql.Tx, bus Messaging) error
}
```

So in order to fulfill our requerment we will need to add the following to our saga

```go

func (s *BookVacationSaga) RequestTimeout() time.Duration {
func (s *BookVacationSaga) TimeoutDuration() time.Duration {
//request to timeout if after 15 minutes the saga is not complete
return time.Minute * 15
}

func (s *BookVacationSaga) Timeout(invocation gbus.Invocation, message *gbus.BusMessage) error {
return invocation.Bus().Publish(invocation.Ctx(), "some_exchange", "some.topic.1", gbus.NewBusMessage(VacationBookingTimedOut{}))
return bus.Publish(context.Background(), "some_exchange", "some.topic.1", gbus.NewBusMessage(VacationBookingTimedOut{}))
}

```
Expand Down
5 changes: 4 additions & 1 deletion gbus/abstractions.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ type RegisterDeadletterHandler interface {
//RequestSagaTimeout is the interface a saga needs to implement to get timeout servicess
type RequestSagaTimeout interface {
TimeoutDuration() time.Duration
Timeout(invocation Invocation, message *BusMessage) error
Timeout(tx *sql.Tx, bus Messaging) error
}

//SagaConfFn is a function to allow configuration of a saga in the context of the gbus
Expand Down Expand Up @@ -166,6 +166,9 @@ type Builder interface {
//ConfigureHealthCheck defines the default timeout in seconds for the db ping check
ConfigureHealthCheck(timeoutInSeconds time.Duration) Builder

//RetriesNum defines the number of retries upon error
RetriesNum(retries uint) Builder

//Build the bus
Build(svcName string) Bus
}
Expand Down
9 changes: 7 additions & 2 deletions gbus/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus {
RPCHandlers: make(map[string]gbus.MessageHandler),
Serializer: builder.serializer,
DLX: builder.dlx,
DefaultPolicies: make([]gbus.MessagePolicy, 0),
DefaultPolicies: builder.defaultPolicies,
DbPingTimeout: 3}

gb.Confirm = builder.confirm
Expand Down Expand Up @@ -99,7 +99,7 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus {
panic(err)
}
}
gb.Glue = saga.NewGlue(gb, sagaStore, svcName)
gb.Glue = saga.NewGlue(gb, sagaStore, svcName, gb.TxProvider)
return gb
}

Expand Down Expand Up @@ -170,6 +170,11 @@ func (builder *defaultBuilder) ConfigureHealthCheck(timeoutInSeconds time.Durati
return builder
}

func (builder *defaultBuilder) RetriesNum(retries uint) gbus.Builder {
gbus.MaxRetryCount = retries
return builder
}

//New :)
func New() Nu {
return Nu{}
Expand Down
25 changes: 12 additions & 13 deletions gbus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,13 @@ type DefaultBus struct {
Confirm bool
healthChan chan error
backpreasure bool
rabbitFailure bool
DbPingTimeout time.Duration
amqpConnected bool
}

var (
//TODO: Replace constants with configuration

//MaxRetryCount defines the max times a retry can run
//MaxRetryCount defines the max times a retry can run.
//Default is 3 but it is configurable
MaxRetryCount uint = 3
//RpcHeaderName used to define the header in grabbit for RPC
RpcHeaderName = "x-grabbit-msg-rpc-id"
Expand Down Expand Up @@ -257,7 +256,7 @@ func (b *DefaultBus) Start() error {
//start monitoring on amqp related errors
go b.monitorAMQPErrors()
//start consuming messags from service queue

b.amqpConnected = true
return nil
}

Expand Down Expand Up @@ -293,12 +292,11 @@ func (b *DefaultBus) createBusWorkers(workerNum uint) ([]*worker, error) {
serializer: b.Serializer,
b: b,
amqpErrors: b.amqpErrors}
go func() {
err := w.Start()
if err != nil {
log.WithError(err)
}
}()

err := w.Start()
if err != nil {
log.WithError(err).Error("failed to start worker")
}

workers = append(workers, w)
}
Expand All @@ -321,6 +319,7 @@ func (b *DefaultBus) Shutdown() (shutdwonErr error) {
err := worker.Stop()
if err != nil {
b.log().WithError(err).Error("could not stop worker")
return err
}
}
b.Outgoing.shutdown()
Expand Down Expand Up @@ -359,7 +358,7 @@ func (b *DefaultBus) GetHealth() HealthCard {
return HealthCard{
DbConnected: dbConnected,
RabbitBackPressure: b.backpreasure,
RabbitConnected: !b.rabbitFailure,
RabbitConnected: b.amqpConnected,
}
}

Expand Down Expand Up @@ -577,7 +576,7 @@ func (b *DefaultBus) monitorAMQPErrors() {
}
b.backpreasure = blocked.Active
case amqpErr := <-b.amqpErrors:
b.rabbitFailure = true
b.amqpConnected = false
b.log().WithField("amqp_error", amqpErr).Error("amqp error")
if b.healthChan != nil {
b.healthChan <- amqpErr
Expand Down
10 changes: 9 additions & 1 deletion gbus/saga/def.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,16 @@ func getFunNameFromHandler(handler gbus.MessageHandler) string {
}

func (sd *Def) newInstance() *Instance {
return NewInstance(sd.sagaType, sd.msgToFunc, sd.sagaConfFns...)
instance := NewInstance(sd.sagaType, sd.msgToFunc)
return sd.configureSaga(instance)
}

func (sd *Def) configureSaga(instance *Instance) *Instance {
saga := instance.UnderlyingInstance
for _, conf := range sd.sagaConfFns {
saga = conf(saga)
}
return instance
}

func (sd *Def) shouldStartNewSaga(message *gbus.BusMessage) bool {
Expand Down
44 changes: 24 additions & 20 deletions gbus/saga/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,6 @@ func (imsm *Glue) RegisterSaga(saga gbus.Saga, conf ...gbus.SagaConfFn) error {
WithFields(log.Fields{"saga_type": def.sagaType.String(), "handles_messages": len(msgNames)}).
Info("registered saga with messages")

//register on timeout messages
timeoutEtfs, requestsTimeout := saga.(gbus.RequestSagaTimeout)
if requestsTimeout {
timeoutMessage := gbus.SagaTimeoutMessage{}
timeoutMsgName := timeoutMessage.SchemaName()
_ = def.HandleMessage(timeoutMessage, timeoutEtfs.Timeout)
imsm.addMsgNameToDef(timeoutMsgName, def)

}
return nil
}

Expand Down Expand Up @@ -154,13 +145,13 @@ func (imsm *Glue) handler(invocation gbus.Invocation, message *gbus.BusMessage)
e := fmt.Errorf("Warning:Failed message routed with SagaCorrelationID:%v but no saga instance with the same id found ", message.SagaCorrelationID)
return e
}

def.configureSaga(instance)
if invkErr := imsm.invokeSagaInstance(instance, invocation, message); invkErr != nil {
imsm.log().WithError(invkErr).WithField("saga_id", instance.ID).Error("failed to invoke saga")
return invkErr
}

return imsm.completeOrUpdateSaga(invocation.Tx(), instance, message)
return imsm.completeOrUpdateSaga(invocation.Tx(), instance)

} else if message.Semantics == gbus.CMD {
e := fmt.Errorf("Warning:Command or Reply message with no saga reference received. message will be dropped.\nmessage as of type:%v", reflect.TypeOf(message).Name())
Expand All @@ -176,12 +167,12 @@ func (imsm *Glue) handler(invocation gbus.Invocation, message *gbus.BusMessage)
imsm.log().WithFields(log.Fields{"message": msgName, "instances_fetched": len(instances)}).Info("fetched saga instances")

for _, instance := range instances {

def.configureSaga(instance)
if invkErr := imsm.invokeSagaInstance(instance, invocation, message); invkErr != nil {
imsm.log().WithError(invkErr).WithField("saga_id", instance.ID).Error("failed to invoke saga")
return invkErr
}
e = imsm.completeOrUpdateSaga(invocation.Tx(), instance, message)
e = imsm.completeOrUpdateSaga(invocation.Tx(), instance)
if e != nil {
return e
}
Expand All @@ -205,11 +196,9 @@ func (imsm *Glue) invokeSagaInstance(instance *Instance, invocation gbus.Invocat
return instance.invoke(exchange, routingKey, sginv, message)
}

func (imsm *Glue) completeOrUpdateSaga(tx *sql.Tx, instance *Instance, lastMessage *gbus.BusMessage) error {

_, timedOut := lastMessage.Payload.(gbus.SagaTimeoutMessage)
func (imsm *Glue) completeOrUpdateSaga(tx *sql.Tx, instance *Instance) error {

if instance.isComplete() || timedOut {
if instance.isComplete() {
imsm.log().WithField("saga_id", instance.ID).Info("saga has completed and will be deleted")

return imsm.sagaStore.DeleteSaga(tx, instance)
Expand All @@ -236,20 +225,35 @@ func (imsm *Glue) registerEvent(exchange, topic string, event gbus.Message) erro
return imsm.bus.HandleEvent(exchange, topic, event, imsm.handler)
}

func (imsm *Glue) timeoutSaga(tx *sql.Tx, sagaID string) error {

saga, err := imsm.sagaStore.GetSagaByID(tx, sagaID)
if err != nil {
return err
}
timeoutErr := saga.timeout(tx, imsm.bus)
if timeoutErr != nil {
imsm.log().WithError(timeoutErr).WithField("sagaID", sagaID).Error("failed to timeout saga")
return timeoutErr
}
return imsm.completeOrUpdateSaga(tx, saga)
}

func (imsm *Glue) log() *log.Entry {
return log.WithField("_service", imsm.svcName)
}

//NewGlue creates a new Sagamanager
func NewGlue(bus gbus.Bus, sagaStore Store, svcName string) *Glue {
return &Glue{
func NewGlue(bus gbus.Bus, sagaStore Store, svcName string, txp gbus.TxProvider) *Glue {
g := &Glue{
svcName: svcName,
bus: bus,
sagaDefs: make([]*Def, 0),
lock: &sync.Mutex{},
alreadyRegistred: make(map[string]bool),
msgToDefMap: make(map[string][]*Def),
timeoutManger: TimeoutManager{bus: bus},
sagaStore: sagaStore,
}
g.timeoutManger = TimeoutManager{bus: bus, txp: txp, glue: g}
return g
}
18 changes: 13 additions & 5 deletions gbus/saga/instance.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package saga

import (
"database/sql"
"fmt"
"log"
"reflect"
Expand Down Expand Up @@ -77,9 +78,18 @@ func (si *Instance) requestsTimeout() (bool, time.Duration) {
return canTimeout, timeoutDuration
}

//NewInstance create a new instance of a Saga
func (si *Instance) timeout(tx *sql.Tx, bus gbus.Messaging) error {

saga, canTimeout := si.UnderlyingInstance.(gbus.RequestSagaTimeout)
if !canTimeout {
return fmt.Errorf("saga instance does not support timeouts")

}
return saga.Timeout(tx, bus)
}

func NewInstance(sagaType reflect.Type, msgToMethodMap []*MsgToFuncPair) *Instance {

func NewInstance(sagaType reflect.Type, msgToMethodMap []*MsgToFuncPair, confFns ...gbus.SagaConfFn) *Instance {

var newSagaPtr interface{}
if sagaType.Kind() == reflect.Ptr {
Expand All @@ -91,9 +101,7 @@ func NewInstance(sagaType reflect.Type, msgToMethodMap []*MsgToFuncPair, confFns
saga := newSagaPtr.(gbus.Saga)

newSaga := saga.New()
for _, conf := range confFns {
newSaga = conf(newSaga)
}

//newSagaPtr := reflect.New(sagaType).Elem()
newInstance := &Instance{
ID: xid.New().String(),
Expand Down
38 changes: 28 additions & 10 deletions gbus/saga/timeout.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
package saga

import (
"context"
"time"

"github.com/sirupsen/logrus"
"github.com/wework/grabbit/gbus"
)

//TimeoutManager manages timeouts for sagas
//TODO:Make it persistent
type TimeoutManager struct {
bus gbus.Bus
bus gbus.Bus
glue *Glue
txp gbus.TxProvider
}

//RequestTimeout requests a timeout from the timeout manager
Expand All @@ -20,13 +20,31 @@ func (tm *TimeoutManager) RequestTimeout(svcName, sagaID string, duration time.D
go func(svcName, sagaID string, tm *TimeoutManager) {
c := time.After(duration)
<-c
reuqestTimeout := gbus.SagaTimeoutMessage{
SagaID: sagaID}
msg := gbus.NewBusMessage(reuqestTimeout)
msg.SagaCorrelationID = sagaID
if err := tm.bus.Send(context.Background(), svcName, msg); err != nil {
//TODO: add logger
logrus.WithError(err).Error("could not send timeout to bus")
//TODO:if the bus is not transactional, moving forward we should not allow using sagas in a non transactional bus
if tm.txp == nil {
tme := tm.glue.timeoutSaga(nil, sagaID)
if tme != nil {
tm.glue.log().WithError(tme).WithField("sagaID", sagaID).Error("timing out a saga failed")
}
return
}
tx, txe := tm.txp.New()
if txe != nil {
tm.glue.log().WithError(txe).Warn("timeout manager failed to create a transaction")
} else {
callErr := tm.glue.timeoutSaga(tx, sagaID)
if callErr != nil {
tm.glue.log().WithError(callErr).WithField("sagaID", sagaID).Error("timing out a saga failed")
rlbe := tx.Rollback()
if rlbe != nil {
tm.glue.log().WithError(rlbe).Warn("timeout manager failed to rollback transaction")
}
} else {
cmte := tx.Commit()
if cmte != nil {
tm.glue.log().WithError(cmte).Warn("timeout manager failed to rollback transaction")
}
}
}

}(svcName, sagaID, tm)
Expand Down
Loading