Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

first commit towards supporting persistent timeouts #78

Merged
merged 5 commits into from
May 19, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion gbus/abstractions.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ type RegisterDeadletterHandler interface {
//RequestSagaTimeout is the interface a saga needs to implement to get timeout servicess
type RequestSagaTimeout interface {
TimeoutDuration() time.Duration
Timeout(invocation Invocation, message *BusMessage) error
Timeout(tx *sql.Tx, bus Messaging) error
}

//SagaConfFn is a function to allow configuration of a saga in the context of the gbus
Expand Down
2 changes: 1 addition & 1 deletion gbus/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus {
panic(err)
}
}
gb.Glue = saga.NewGlue(gb, sagaStore, svcName)
gb.Glue = saga.NewGlue(gb, sagaStore, svcName, gb.TxProvider)
return gb
}

Expand Down
40 changes: 22 additions & 18 deletions gbus/saga/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,6 @@ func (imsm *Glue) RegisterSaga(saga gbus.Saga, conf ...gbus.SagaConfFn) error {
WithFields(log.Fields{"saga_type": def.sagaType.String(), "handles_messages": len(msgNames)}).
Info("registered saga with messages")

//register on timeout messages
timeoutEtfs, requestsTimeout := saga.(gbus.RequestSagaTimeout)
if requestsTimeout {
timeoutMessage := gbus.SagaTimeoutMessage{}
timeoutMsgName := timeoutMessage.SchemaName()
_ = def.HandleMessage(timeoutMessage, timeoutEtfs.Timeout)
imsm.addMsgNameToDef(timeoutMsgName, def)

}
return nil
}

Expand Down Expand Up @@ -160,7 +151,7 @@ func (imsm *Glue) handler(invocation gbus.Invocation, message *gbus.BusMessage)
return invkErr
}

return imsm.completeOrUpdateSaga(invocation.Tx(), instance, message)
return imsm.completeOrUpdateSaga(invocation.Tx(), instance)

} else if message.Semantics == gbus.CMD {
e := fmt.Errorf("Warning:Command or Reply message with no saga reference received. message will be dropped.\nmessage as of type:%v", reflect.TypeOf(message).Name())
Expand All @@ -181,7 +172,7 @@ func (imsm *Glue) handler(invocation gbus.Invocation, message *gbus.BusMessage)
imsm.log().WithError(invkErr).WithField("saga_id", instance.ID).Error("failed to invoke saga")
return invkErr
}
e = imsm.completeOrUpdateSaga(invocation.Tx(), instance, message)
e = imsm.completeOrUpdateSaga(invocation.Tx(), instance)
if e != nil {
return e
}
Expand All @@ -205,11 +196,9 @@ func (imsm *Glue) invokeSagaInstance(instance *Instance, invocation gbus.Invocat
return instance.invoke(exchange, routingKey, sginv, message)
}

func (imsm *Glue) completeOrUpdateSaga(tx *sql.Tx, instance *Instance, lastMessage *gbus.BusMessage) error {
func (imsm *Glue) completeOrUpdateSaga(tx *sql.Tx, instance *Instance) error {

_, timedOut := lastMessage.Payload.(gbus.SagaTimeoutMessage)

if instance.isComplete() || timedOut {
if instance.isComplete() {
imsm.log().WithField("saga_id", instance.ID).Info("saga has completed and will be deleted")

return imsm.sagaStore.DeleteSaga(tx, instance)
Expand All @@ -236,20 +225,35 @@ func (imsm *Glue) registerEvent(exchange, topic string, event gbus.Message) erro
return imsm.bus.HandleEvent(exchange, topic, event, imsm.handler)
}

func (imsm *Glue) timeoutSaga(tx *sql.Tx, sagaID string) error {

saga, err := imsm.sagaStore.GetSagaByID(tx, sagaID)
if err != nil {
return err
}
timeoutErr := saga.timeout(tx, imsm.bus)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should check out if the saga can time out and only then check if it has timed out

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is already encapsulated in the Instance. a call to timeout on an instance that does not support timeouts returns an error.

if timeoutErr != nil {
imsm.log().WithError(timeoutErr).WithField("sagaID", sagaID).Error("failed to timeout saga")
return timeoutErr
}
return imsm.completeOrUpdateSaga(tx, saga)
}

func (imsm *Glue) log() *log.Entry {
return log.WithField("_service", imsm.svcName)
}

//NewGlue creates a new Sagamanager
func NewGlue(bus gbus.Bus, sagaStore Store, svcName string) *Glue {
return &Glue{
func NewGlue(bus gbus.Bus, sagaStore Store, svcName string, txp gbus.TxProvider) *Glue {
g := &Glue{
svcName: svcName,
bus: bus,
sagaDefs: make([]*Def, 0),
lock: &sync.Mutex{},
alreadyRegistred: make(map[string]bool),
msgToDefMap: make(map[string][]*Def),
timeoutManger: TimeoutManager{bus: bus},
sagaStore: sagaStore,
}
g.timeoutManger = TimeoutManager{bus: bus, txp: txp, glue: g}
return g
}
12 changes: 11 additions & 1 deletion gbus/saga/instance.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package saga

import (
"database/sql"
"fmt"
"log"
"reflect"
Expand Down Expand Up @@ -77,10 +78,19 @@ func (si *Instance) requestsTimeout() (bool, time.Duration) {
return canTimeout, timeoutDuration
}

//NewInstance create a new instance of a Saga
func (si *Instance) timeout(tx *sql.Tx, bus gbus.Messaging) error {

saga, canTimeout := si.UnderlyingInstance.(gbus.RequestSagaTimeout)
if !canTimeout {
return fmt.Errorf("saga instance does not support timeouts")

}
return saga.Timeout(tx, bus)
}

func NewInstance(sagaType reflect.Type, msgToMethodMap []*MsgToFuncPair) *Instance {


var newSagaPtr interface{}
if sagaType.Kind() == reflect.Ptr {
newSagaPtr = reflect.New(sagaType).Elem().Interface()
Expand Down
38 changes: 28 additions & 10 deletions gbus/saga/timeout.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
package saga

import (
"context"
"time"

"github.com/sirupsen/logrus"
"github.com/wework/grabbit/gbus"
)

//TimeoutManager manages timeouts for sagas
//TODO:Make it persistent
type TimeoutManager struct {
bus gbus.Bus
bus gbus.Bus
glue *Glue
txp gbus.TxProvider
}

//RequestTimeout requests a timeout from the timeout manager
Expand All @@ -20,13 +20,31 @@ func (tm *TimeoutManager) RequestTimeout(svcName, sagaID string, duration time.D
go func(svcName, sagaID string, tm *TimeoutManager) {
c := time.After(duration)
<-c
reuqestTimeout := gbus.SagaTimeoutMessage{
SagaID: sagaID}
msg := gbus.NewBusMessage(reuqestTimeout)
msg.SagaCorrelationID = sagaID
if err := tm.bus.Send(context.Background(), svcName, msg); err != nil {
//TODO: add logger
logrus.WithError(err).Error("could not send timeout to bus")
//TODO:if the bus is not transactional, moving forward we should not allow using sagas in a non transactional bus
if tm.txp == nil {
tme := tm.glue.timeoutSaga(nil, sagaID)
if tme != nil {
tm.glue.log().WithError(tme).WithField("sagaID", sagaID).Error("timing out a saga failed")
}
return
}
tx, txe := tm.txp.New()
if txe != nil {
tm.glue.log().WithError(txe).Warn("timeout manager failed to create a transaction")
} else {
callErr := tm.glue.timeoutSaga(tx, sagaID)
if callErr != nil {
tm.glue.log().WithError(callErr).WithField("sagaID", sagaID).Error("timing out a saga failed")
rlbe := tx.Rollback()
if rlbe != nil {
tm.glue.log().WithError(rlbe).Warn("timeout manager failed to rollback transaction")
}
} else {
cmte := tx.Commit()
if cmte != nil {
tm.glue.log().WithError(cmte).Warn("timeout manager failed to rollback transaction")
}
}
}

}(svcName, sagaID, tm)
Expand Down
7 changes: 3 additions & 4 deletions tests/saga_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package tests

import (
"context"
"database/sql"
"log"
"reflect"
"testing"
Expand Down Expand Up @@ -431,11 +432,9 @@ func (s *TimingOutSaga) TimeoutDuration() time.Duration {
return time.Second * 1
}

func (s *TimingOutSaga) Timeout(invocation gbus.Invocation, message *gbus.BusMessage) error {
func (s *TimingOutSaga) Timeout(tx *sql.Tx, bus gbus.Messaging) error {
s.TimedOut = true
return invocation.Bus().Publish(noopTraceContext(), "test_exchange", "some.topic.1", gbus.NewBusMessage(Event1{
Data: "TimingOutSaga.Timeout",
}))
return bus.Publish(context.Background(), "test_exchange", "some.topic.1", gbus.NewBusMessage(Event1{}))
}

type SelfSendingSaga struct {
Expand Down