Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Persisted timeouts #107

Merged
merged 16 commits into from
Aug 4, 2019
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 28 additions & 1 deletion gbus/abstractions.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,20 @@ package gbus
import (
"context"
"database/sql"
"github.com/sirupsen/logrus"
"time"

"github.com/sirupsen/logrus"
rhinof marked this conversation as resolved.
Show resolved Hide resolved

"github.com/streadway/amqp"
)

//Semantics represents the semantics of a grabbit message
type Semantics string

const (
//CMD represents a message with command semantics in grabbit
CMD Semantics = "cmd"
//EVT represents a message with event semantics in grabbit
EVT Semantics = "evt"
)

Expand Down Expand Up @@ -142,6 +146,14 @@ type SagaRegister interface {
RegisterSaga(saga Saga, conf ...SagaConfFn) error
}

//SagaGlue glues together all the parts needed in order to orchestrate saga instances
type SagaGlue interface {
rhinof marked this conversation as resolved.
Show resolved Hide resolved
SagaRegister
Logged
Start() error
Stop() error
}

//Builder is the main interface that should be used to create an instance of a Bus
type Builder interface {
PurgeOnStartUp() Builder
Expand Down Expand Up @@ -214,6 +226,21 @@ type TxOutbox interface {
Stop() error
}

//TimeoutManager abstracts the implementation of determining when a saga should be timed out
type TimeoutManager interface {
//RegisterTimeout requests the TimeoutManager to register a timeout for a specific saga instance.
//tx is the transaction supplied by the caller, sagaID identifies the saga instance and
//duration is the requested timeout period
RegisterTimeout(tx *sql.Tx, sagaID string, duration time.Duration) error
//ClearTimeout clears a timeout for a specific saga, identified by sagaID, using the caller supplied transaction
ClearTimeout(tx *sql.Tx, sagaID string) error
//SetTimeoutFunction accepts the function that the TimeoutManager should invoke once a timeout expires.
//The function receives a transaction and the id of the saga whose timeout expired
SetTimeoutFunction(func(tx *sql.Tx, sagaID string) error)
//Start starts the timeout manager
Start() error
//Stop shuts the timeout manager down
Stop() error
}

//Logged represents a grabbit component that can be logged
type Logged interface {
SetLogger(entry logrus.FieldLogger)
Log() logrus.FieldLogger
Expand Down
13 changes: 10 additions & 3 deletions gbus/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ package builder

import (
"fmt"
"github.com/sirupsen/logrus"
"sync"
"time"

"github.com/sirupsen/logrus"

"github.com/wework/grabbit/gbus"
"github.com/wework/grabbit/gbus/saga"
"github.com/wework/grabbit/gbus/saga/stores"
Expand Down Expand Up @@ -68,7 +69,8 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus {
gb.WorkerNum = builder.workerNum
}
var (
sagaStore saga.Store
sagaStore saga.Store
timeoutManager gbus.TimeoutManager
rhinof marked this conversation as resolved.
Show resolved Hide resolved
)
if builder.txnl {
gb.IsTxnl = true
Expand All @@ -80,6 +82,7 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus {
panic(err)
}
gb.TxProvider = mysqltx
//TODO move purge logic into the NewSagaStore factory method
sagaStore = mysql.NewSagaStore(gb.SvcName, mysqltx)
if builder.purgeOnStartup {
err := sagaStore.Purge()
Expand All @@ -88,26 +91,30 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus {
}
}
gb.Outbox = mysql.NewOutbox(gb.SvcName, mysqltx, builder.purgeOnStartup)
timeoutManager = mysql.NewTimeoutManager(gb, gb.TxProvider, gb.Log, svcName, builder.purgeOnStartup)

default:
err := fmt.Errorf("no provider found for passed in value %v", builder.txnlProvider)
panic(err)
}
} else {
sagaStore = stores.NewInMemoryStore()
timeoutManager = &saga.InMemoryTimeoutManager{}
}

if builder.usingPingTimeout {
gb.DbPingTimeout = builder.dbPingTimeout
}

//TODO move this into the NewSagaStore factory methods
if builder.purgeOnStartup {
err := sagaStore.Purge()
if err != nil {
panic(err)
}
}
gb.Glue = saga.NewGlue(gb, sagaStore, svcName, gb.TxProvider)
glue := saga.NewGlue(gb, sagaStore, svcName, gb.TxProvider, gb.Log, timeoutManager)
rhinof marked this conversation as resolved.
Show resolved Hide resolved
gb.Glue = glue
return gb
}

Expand Down
13 changes: 11 additions & 2 deletions gbus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@ import (
"database/sql"
"errors"
"fmt"
"github.com/wework/grabbit/gbus/metrics"
"runtime/debug"
"sync"
"time"

"github.com/wework/grabbit/gbus/metrics"

"github.com/opentracing-contrib/go-amqp/amqptracer"
"github.com/opentracing/opentracing-go"
slog "github.com/opentracing/opentracing-go/log"
Expand Down Expand Up @@ -51,7 +52,7 @@ type DefaultBus struct {
DelayedSubscriptions [][]string
PurgeOnStartup bool
started bool
Glue SagaRegister
Glue SagaGlue
TxProvider TxProvider
IsTxnl bool
WorkerNum uint
Expand Down Expand Up @@ -263,6 +264,10 @@ func (b *DefaultBus) Start() error {

return createWorkersErr
}

if err := b.Glue.Start(); err != nil {
return err
}
b.workers = workers
b.started = true
//start monitoring on amqp related errors
Expand Down Expand Up @@ -335,6 +340,10 @@ func (b *DefaultBus) Shutdown() (shutdwonErr error) {
}
}
b.Outgoing.shutdown()

if err := b.Glue.Stop(); err != nil {
return err
}
b.started = false
if b.IsTxnl {

Expand Down
74 changes: 50 additions & 24 deletions gbus/saga/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package saga

import (
"database/sql"
"errors"
"fmt"
"reflect"
"strings"
Expand All @@ -20,18 +21,22 @@ func fqnsFromMessages(objs []gbus.Message) []string {
return fqns
}

var _ gbus.SagaRegister = &Glue{}
//ErrInstanceNotFound is returned by the saga store if a saga lookup by saga id returns no valid instances.
//It is a sentinel value: callers should compare against it rather than inspect the message text
var ErrInstanceNotFound = errors.New("saga could not be found")

//Glue ties the incoming messages from the Bus with the needed Saga instances
var _ gbus.SagaGlue = &Glue{}

//Glue ties the incoming messages from the Bus with the needed Saga instances
type Glue struct {
*gbus.Glogged
svcName string
bus gbus.Bus
sagaDefs []*Def
lock *sync.Mutex
alreadyRegistred map[string]bool
msgToDefMap map[string][]*Def
sagaStore Store
timeoutManger TimeoutManager
timeoutManager gbus.TimeoutManager
}

func (imsm *Glue) isSagaAlreadyRegistered(sagaType reflect.Type) bool {
Expand Down Expand Up @@ -71,7 +76,7 @@ func (imsm *Glue) RegisterSaga(saga gbus.Saga, conf ...gbus.SagaConfFn) error {
imsm.addMsgNameToDef(msgName, def)
}

imsm.log().
imsm.Log().
WithFields(logrus.Fields{"saga_type": def.sagaType.String(), "handles_messages": len(msgNames)}).
Info("registered saga with messages")

Expand Down Expand Up @@ -112,33 +117,35 @@ func (imsm *Glue) handler(invocation gbus.Invocation, message *gbus.BusMessage)
startNew := def.shouldStartNewSaga(message)
if startNew {
newInstance := def.newInstance()
imsm.log().
imsm.Log().
WithFields(logrus.Fields{"saga_def": def.String(), "saga_id": newInstance.ID}).
Info("created new saga")
if invkErr := imsm.invokeSagaInstance(newInstance, invocation, message); invkErr != nil {
imsm.log().WithError(invkErr).WithField("saga_id", newInstance.ID).Error("failed to invoke saga")
imsm.Log().WithError(invkErr).WithField("saga_id", newInstance.ID).Error("failed to invoke saga")
return invkErr
}

if !newInstance.isComplete() {
imsm.log().WithField("saga_id", newInstance.ID).Info("saving new saga")
imsm.Log().WithField("saga_id", newInstance.ID).Info("saving new saga")

if e := imsm.sagaStore.SaveNewSaga(invocation.Tx(), def.sagaType, newInstance); e != nil {
imsm.log().WithError(e).WithField("saga_id", newInstance.ID).Error("saving new saga failed")
imsm.Log().WithError(e).WithField("saga_id", newInstance.ID).Error("saving new saga failed")
return e
}

if requestsTimeout, duration := newInstance.requestsTimeout(); requestsTimeout {
imsm.log().WithFields(logrus.Fields{"saga_id": newInstance.ID, "timeout_duration": duration}).Info("new saga requested timeout")
imsm.timeoutManger.RequestTimeout(imsm.svcName, newInstance.ID, duration)
imsm.Log().WithFields(logrus.Fields{"saga_id": newInstance.ID, "timeout_duration": duration}).Info("new saga requested timeout")
if tme := imsm.timeoutManager.RegisterTimeout(invocation.Tx(), newInstance.ID, duration); tme != nil {
return tme
}
}
}
return nil
} else if message.SagaCorrelationID != "" {
instance, getErr := imsm.sagaStore.GetSagaByID(invocation.Tx(), message.SagaCorrelationID)

if getErr != nil {
imsm.log().WithError(getErr).WithField("saga_id", message.SagaCorrelationID).Error("failed to fetch saga by id")
imsm.Log().WithError(getErr).WithField("saga_id", message.SagaCorrelationID).Error("failed to fetch saga by id")
return getErr
}
if instance == nil {
Expand All @@ -147,7 +154,7 @@ func (imsm *Glue) handler(invocation gbus.Invocation, message *gbus.BusMessage)
}
def.configureSaga(instance)
if invkErr := imsm.invokeSagaInstance(instance, invocation, message); invkErr != nil {
imsm.log().WithError(invkErr).WithField("saga_id", instance.ID).Error("failed to invoke saga")
imsm.Log().WithError(invkErr).WithField("saga_id", instance.ID).Error("failed to invoke saga")
return invkErr
}

Expand All @@ -158,18 +165,18 @@ func (imsm *Glue) handler(invocation gbus.Invocation, message *gbus.BusMessage)
return e
} else {

imsm.log().WithFields(logrus.Fields{"saga_type": def.sagaType, "message": msgName}).Info("fetching saga instances by type")
imsm.Log().WithFields(logrus.Fields{"saga_type": def.sagaType, "message": msgName}).Info("fetching saga instances by type")
instances, e := imsm.sagaStore.GetSagasByType(invocation.Tx(), def.sagaType)

if e != nil {
return e
}
imsm.log().WithFields(logrus.Fields{"message": msgName, "instances_fetched": len(instances)}).Info("fetched saga instances")
imsm.Log().WithFields(logrus.Fields{"message": msgName, "instances_fetched": len(instances)}).Info("fetched saga instances")

for _, instance := range instances {
def.configureSaga(instance)
if invkErr := imsm.invokeSagaInstance(instance, invocation, message); invkErr != nil {
imsm.log().WithError(invkErr).WithField("saga_id", instance.ID).Error("failed to invoke saga")
imsm.Log().WithError(invkErr).WithField("saga_id", instance.ID).Error("failed to invoke saga")
return invkErr
}
e = imsm.completeOrUpdateSaga(invocation.Tx(), instance)
Expand All @@ -192,7 +199,7 @@ func (imsm *Glue) invokeSagaInstance(instance *Instance, invocation gbus.Invocat
ctx: invocation.Ctx(),
invokingService: imsm.svcName,
}
sginv.SetLogger(imsm.log().WithFields(logrus.Fields{
sginv.SetLogger(imsm.Log().WithFields(logrus.Fields{
"saga_id": instance.ID,
"saga_type": instance.String(),
"message_name": message.PayloadFQN,
Expand All @@ -205,9 +212,14 @@ func (imsm *Glue) invokeSagaInstance(instance *Instance, invocation gbus.Invocat
func (imsm *Glue) completeOrUpdateSaga(tx *sql.Tx, instance *Instance) error {

if instance.isComplete() {
imsm.log().WithField("saga_id", instance.ID).Info("saga has completed and will be deleted")
imsm.Log().WithField("saga_id", instance.ID).Info("saga has completed and will be deleted")

deleteErr := imsm.sagaStore.DeleteSaga(tx, instance)
if deleteErr != nil {
return deleteErr
}

return imsm.sagaStore.DeleteSaga(tx, instance)
return imsm.timeoutManager.ClearTimeout(tx, instance.ID)

}
return imsm.sagaStore.UpdateSaga(tx, instance)
Expand All @@ -231,26 +243,38 @@ func (imsm *Glue) registerEvent(exchange, topic string, event gbus.Message) erro
return imsm.bus.HandleEvent(exchange, topic, event, imsm.handler)
}

func (imsm *Glue) timeoutSaga(tx *sql.Tx, sagaID string) error {
//TimeoutSaga fetches a saga instance and calls its timeout interface
func (imsm *Glue) TimeoutSaga(tx *sql.Tx, sagaID string) error {

saga, err := imsm.sagaStore.GetSagaByID(tx, sagaID)
//we are assuming that if the TimeoutSaga has been called but no instance returned from the store the saga
//has been completed already and there is nothing left to time out
if err == ErrInstanceNotFound {
return nil
}
if err != nil {
return err
}
timeoutErr := saga.timeout(tx, imsm.bus)
if timeoutErr != nil {
imsm.log().WithError(timeoutErr).WithField("sagaID", sagaID).Error("failed to timeout saga")
imsm.Log().WithError(timeoutErr).WithField("sagaID", sagaID).Error("failed to timeout saga")
return timeoutErr
}
return imsm.completeOrUpdateSaga(tx, saga)
}

func (imsm *Glue) log() logrus.FieldLogger {
return imsm.bus.Log().WithField("_service", imsm.svcName)
//Start starts the glue instance up by starting its timeout manager.
//Any error returned by the timeout manager is returned to the caller unchanged
func (imsm *Glue) Start() error {
return imsm.timeoutManager.Start()
}

//Stop shuts the glue instance down by stopping its timeout manager.
//Any error returned by the timeout manager is returned to the caller unchanged
func (imsm *Glue) Stop() error {
return imsm.timeoutManager.Stop()
}

//NewGlue creates a new Glue and registers its TimeoutSaga method as the timeout function of the passed in timeout manager
func NewGlue(bus gbus.Bus, sagaStore Store, svcName string, txp gbus.TxProvider) *Glue {
func NewGlue(bus gbus.Bus, sagaStore Store, svcName string, txp gbus.TxProvider, getLog func() logrus.FieldLogger, timeoutManager gbus.TimeoutManager) *Glue {
g := &Glue{
svcName: svcName,
bus: bus,
Expand All @@ -259,7 +283,9 @@ func NewGlue(bus gbus.Bus, sagaStore Store, svcName string, txp gbus.TxProvider)
alreadyRegistred: make(map[string]bool),
msgToDefMap: make(map[string][]*Def),
sagaStore: sagaStore,
timeoutManager: timeoutManager,
}
g.timeoutManger = TimeoutManager{bus: bus, txp: txp, glue: g}

timeoutManager.SetTimeoutFunction(g.TimeoutSaga)
return g
}
Loading