From c7ceb8318cd38bfeb7218be270ab83c57030f2ac Mon Sep 17 00:00:00 2001 From: Guy Baron Date: Sat, 29 Jun 2019 19:30:58 +0300 Subject: [PATCH 01/15] decouple transaction manager from glue --- gbus/builder/builder.go | 8 ++++++-- gbus/saga/glue.go | 9 ++++++--- gbus/saga/timeout.go | 27 +++++++++++++++------------ 3 files changed, 27 insertions(+), 17 deletions(-) diff --git a/gbus/builder/builder.go b/gbus/builder/builder.go index 016b100..534db91 100644 --- a/gbus/builder/builder.go +++ b/gbus/builder/builder.go @@ -2,10 +2,11 @@ package builder import ( "fmt" - "github.com/sirupsen/logrus" "sync" "time" + "github.com/sirupsen/logrus" + "github.com/wework/grabbit/gbus" "github.com/wework/grabbit/gbus/saga" "github.com/wework/grabbit/gbus/saga/stores" @@ -107,7 +108,10 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus { panic(err) } } - gb.Glue = saga.NewGlue(gb, sagaStore, svcName, gb.TxProvider) + tm := saga.TimeoutManager{ + Bus: gb, Txp: gb.TxProvider, Log: gb.Log, + } + gb.Glue = saga.NewGlue(gb, sagaStore, svcName, gb.TxProvider, gb.Log, tm) return gb } diff --git a/gbus/saga/glue.go b/gbus/saga/glue.go index 8367b29..d7eddde 100644 --- a/gbus/saga/glue.go +++ b/gbus/saga/glue.go @@ -32,6 +32,7 @@ type Glue struct { msgToDefMap map[string][]*Def sagaStore Store timeoutManger TimeoutManager + getLog func() logrus.FieldLogger } func (imsm *Glue) isSagaAlreadyRegistered(sagaType reflect.Type) bool { @@ -246,11 +247,11 @@ func (imsm *Glue) timeoutSaga(tx *sql.Tx, sagaID string) error { } func (imsm *Glue) log() logrus.FieldLogger { - return imsm.bus.Log().WithField("_service", imsm.svcName) + return imsm.getLog() } //NewGlue creates a new Sagamanager -func NewGlue(bus gbus.Bus, sagaStore Store, svcName string, txp gbus.TxProvider) *Glue { +func NewGlue(bus gbus.Bus, sagaStore Store, svcName string, txp gbus.TxProvider, getLog func() logrus.FieldLogger, tm TimeoutManager) *Glue { g := &Glue{ svcName: svcName, bus: bus, @@ -259,7 +260,9 @@ func 
NewGlue(bus gbus.Bus, sagaStore Store, svcName string, txp gbus.TxProvider) alreadyRegistred: make(map[string]bool), msgToDefMap: make(map[string][]*Def), sagaStore: sagaStore, + getLog: getLog, } - g.timeoutManger = TimeoutManager{bus: bus, txp: txp, glue: g} + tm.TimeoutSaga = g.timeoutSaga + g.timeoutManger = tm return g } diff --git a/gbus/saga/timeout.go b/gbus/saga/timeout.go index 4012353..4935d4c 100644 --- a/gbus/saga/timeout.go +++ b/gbus/saga/timeout.go @@ -1,17 +1,20 @@ package saga import ( + "database/sql" "time" + "github.com/sirupsen/logrus" "github.com/wework/grabbit/gbus" ) //TimeoutManager manages timeouts for sagas //TODO:Make it persistent type TimeoutManager struct { - bus gbus.Bus - glue *Glue - txp gbus.TxProvider + Bus gbus.Bus + Log func() logrus.FieldLogger + TimeoutSaga func(*sql.Tx, string) error + Txp gbus.TxProvider } //RequestTimeout requests a timeout from the timeout manager @@ -21,28 +24,28 @@ func (tm *TimeoutManager) RequestTimeout(svcName, sagaID string, duration time.D c := time.After(duration) <-c //TODO:if the bus is not transactional, moving forward we should not allow using sagas in a non transactional bus - if tm.txp == nil { - tme := tm.glue.timeoutSaga(nil, sagaID) + if tm.Txp == nil { + tme := tm.TimeoutSaga(nil, sagaID) if tme != nil { - tm.glue.log().WithError(tme).WithField("sagaID", sagaID).Error("timing out a saga failed") + tm.Log().WithError(tme).WithField("sagaID", sagaID).Error("timing out a saga failed") } return } - tx, txe := tm.txp.New() + tx, txe := tm.Txp.New() if txe != nil { - tm.glue.log().WithError(txe).Warn("timeout manager failed to create a transaction") + tm.Log().WithError(txe).Warn("timeout manager failed to create a transaction") } else { - callErr := tm.glue.timeoutSaga(tx, sagaID) + callErr := tm.TimeoutSaga(tx, sagaID) if callErr != nil { - tm.glue.log().WithError(callErr).WithField("sagaID", sagaID).Error("timing out a saga failed") + tm.Log().WithError(callErr).WithField("sagaID", 
sagaID).Error("timing out a saga failed") rlbe := tx.Rollback() if rlbe != nil { - tm.glue.log().WithError(rlbe).Warn("timeout manager failed to rollback transaction") + tm.Log().WithError(rlbe).Warn("timeout manager failed to rollback transaction") } } else { cmte := tx.Commit() if cmte != nil { - tm.glue.log().WithError(cmte).Warn("timeout manager failed to rollback transaction") + tm.Log().WithError(cmte).Warn("timeout manager failed to rollback transaction") } } } From 323ddc0aef3b43165aa81be11a60c5695ff8e446 Mon Sep 17 00:00:00 2001 From: Guy Baron Date: Sat, 29 Jun 2019 20:29:46 +0300 Subject: [PATCH 02/15] moved timeout manager to gbus/tx package --- gbus/builder/builder.go | 7 +++++-- gbus/saga/glue.go | 12 ++++++------ gbus/{saga => tx}/timeout.go | 2 +- 3 files changed, 12 insertions(+), 9 deletions(-) rename gbus/{saga => tx}/timeout.go (99%) diff --git a/gbus/builder/builder.go b/gbus/builder/builder.go index 534db91..3e16a3b 100644 --- a/gbus/builder/builder.go +++ b/gbus/builder/builder.go @@ -11,6 +11,7 @@ import ( "github.com/wework/grabbit/gbus/saga" "github.com/wework/grabbit/gbus/saga/stores" "github.com/wework/grabbit/gbus/serialization" + "github.com/wework/grabbit/gbus/tx" "github.com/wework/grabbit/gbus/tx/mysql" ) @@ -108,10 +109,12 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus { panic(err) } } - tm := saga.TimeoutManager{ + tm := tx.TimeoutManager{ Bus: gb, Txp: gb.TxProvider, Log: gb.Log, } - gb.Glue = saga.NewGlue(gb, sagaStore, svcName, gb.TxProvider, gb.Log, tm) + glue := saga.NewGlue(gb, sagaStore, svcName, gb.TxProvider, gb.Log, tm.RequestTimeout) + gb.Glue = glue + tm.TimeoutSaga = glue.TimeoutSaga return gb } diff --git a/gbus/saga/glue.go b/gbus/saga/glue.go index d7eddde..77ecfc1 100644 --- a/gbus/saga/glue.go +++ b/gbus/saga/glue.go @@ -6,6 +6,7 @@ import ( "reflect" "strings" "sync" + "time" "github.com/sirupsen/logrus" "github.com/wework/grabbit/gbus" @@ -31,7 +32,7 @@ type Glue struct { alreadyRegistred 
map[string]bool msgToDefMap map[string][]*Def sagaStore Store - timeoutManger TimeoutManager + requestTimeout func(string, string, time.Duration) getLog func() logrus.FieldLogger } @@ -131,7 +132,7 @@ func (imsm *Glue) handler(invocation gbus.Invocation, message *gbus.BusMessage) if requestsTimeout, duration := newInstance.requestsTimeout(); requestsTimeout { imsm.log().WithFields(logrus.Fields{"saga_id": newInstance.ID, "timeout_duration": duration}).Info("new saga requested timeout") - imsm.timeoutManger.RequestTimeout(imsm.svcName, newInstance.ID, duration) + imsm.requestTimeout(imsm.svcName, newInstance.ID, duration) } } return nil @@ -232,7 +233,7 @@ func (imsm *Glue) registerEvent(exchange, topic string, event gbus.Message) erro return imsm.bus.HandleEvent(exchange, topic, event, imsm.handler) } -func (imsm *Glue) timeoutSaga(tx *sql.Tx, sagaID string) error { +func (imsm *Glue) TimeoutSaga(tx *sql.Tx, sagaID string) error { saga, err := imsm.sagaStore.GetSagaByID(tx, sagaID) if err != nil { @@ -251,7 +252,7 @@ func (imsm *Glue) log() logrus.FieldLogger { } //NewGlue creates a new Sagamanager -func NewGlue(bus gbus.Bus, sagaStore Store, svcName string, txp gbus.TxProvider, getLog func() logrus.FieldLogger, tm TimeoutManager) *Glue { +func NewGlue(bus gbus.Bus, sagaStore Store, svcName string, txp gbus.TxProvider, getLog func() logrus.FieldLogger, requestTimeoutFunc func(string, string, time.Duration)) *Glue { g := &Glue{ svcName: svcName, bus: bus, @@ -262,7 +263,6 @@ func NewGlue(bus gbus.Bus, sagaStore Store, svcName string, txp gbus.TxProvider, sagaStore: sagaStore, getLog: getLog, } - tm.TimeoutSaga = g.timeoutSaga - g.timeoutManger = tm + g.requestTimeout = requestTimeoutFunc return g } diff --git a/gbus/saga/timeout.go b/gbus/tx/timeout.go similarity index 99% rename from gbus/saga/timeout.go rename to gbus/tx/timeout.go index 4935d4c..2a2c8a6 100644 --- a/gbus/saga/timeout.go +++ b/gbus/tx/timeout.go @@ -1,4 +1,4 @@ -package saga +package tx import ( 
"database/sql" From 52a569e7156e78b62d76de553fa7c0c0baea0d25 Mon Sep 17 00:00:00 2001 From: Guy Baron Date: Thu, 4 Jul 2019 19:32:47 +0300 Subject: [PATCH 03/15] initial commit in order to support persisted timeouts --- gbus/builder/builder.go | 21 ++++++++--- gbus/saga/glue.go | 5 +-- gbus/tx/mysql/timeout.go | 80 ++++++++++++++++++++++++++++++++++++++++ gbus/tx/timeout.go | 12 ++++++ 4 files changed, 109 insertions(+), 9 deletions(-) create mode 100644 gbus/tx/mysql/timeout.go diff --git a/gbus/builder/builder.go b/gbus/builder/builder.go index 3e16a3b..8403813 100644 --- a/gbus/builder/builder.go +++ b/gbus/builder/builder.go @@ -1,6 +1,7 @@ package builder import ( + "database/sql" "fmt" "sync" "time" @@ -70,7 +71,9 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus { gb.WorkerNum = builder.workerNum } var ( - sagaStore saga.Store + sagaStore saga.Store + requestTimeoutFunc func(svcName, sagaID string, duration time.Duration) + timeoutSagaFunc func(tx *sql.Tx, sagaID string) error ) if builder.txnl { gb.IsTxnl = true @@ -91,6 +94,14 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus { } gb.Outbox = mysql.NewOutbox(gb.SvcName, mysqltx, builder.purgeOnStartup) + //setup timeout manager + baseTimeoutManager := &tx.TimeoutManager{ + Bus: gb, Txp: gb.TxProvider, Log: gb.Log, SvcName: svcName, + } + tm := mysql.NewTimeoutManager(baseTimeoutManager, builder.purgeOnStartup) + requestTimeoutFunc = tm.RequestTimeout + tm.TimeoutSaga = timeoutSagaFunc + default: err := fmt.Errorf("no provider found for passed in value %v", builder.txnlProvider) panic(err) @@ -109,12 +120,10 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus { panic(err) } } - tm := tx.TimeoutManager{ - Bus: gb, Txp: gb.TxProvider, Log: gb.Log, - } - glue := saga.NewGlue(gb, sagaStore, svcName, gb.TxProvider, gb.Log, tm.RequestTimeout) + + glue := saga.NewGlue(gb, sagaStore, svcName, gb.TxProvider, gb.Log, requestTimeoutFunc) + timeoutSagaFunc = glue.TimeoutSaga 
gb.Glue = glue - tm.TimeoutSaga = glue.TimeoutSaga return gb } diff --git a/gbus/saga/glue.go b/gbus/saga/glue.go index 77ecfc1..8b31869 100644 --- a/gbus/saga/glue.go +++ b/gbus/saga/glue.go @@ -253,7 +253,7 @@ func (imsm *Glue) log() logrus.FieldLogger { //NewGlue creates a new Sagamanager func NewGlue(bus gbus.Bus, sagaStore Store, svcName string, txp gbus.TxProvider, getLog func() logrus.FieldLogger, requestTimeoutFunc func(string, string, time.Duration)) *Glue { - g := &Glue{ + return &Glue{ svcName: svcName, bus: bus, sagaDefs: make([]*Def, 0), @@ -262,7 +262,6 @@ func NewGlue(bus gbus.Bus, sagaStore Store, svcName string, txp gbus.TxProvider, msgToDefMap: make(map[string][]*Def), sagaStore: sagaStore, getLog: getLog, + requestTimeout: requestTimeoutFunc, } - g.requestTimeout = requestTimeoutFunc - return g } diff --git a/gbus/tx/mysql/timeout.go b/gbus/tx/mysql/timeout.go new file mode 100644 index 0000000..7e11216 --- /dev/null +++ b/gbus/tx/mysql/timeout.go @@ -0,0 +1,80 @@ +package mysql + +import ( + "database/sql" + + "github.com/wework/grabbit/gbus/tx" +) + +//TimeoutManager is a mysql implementation of a persistent timeoutmanager +type TimeoutManager struct { + *tx.TimeoutManager +} + +func (tm *TimeoutManager) ensureSchema() error { + tblName := tm.GetTimeoutsTableName() + tx, e := tm.Txp.New() + if e != nil { + tm.Log().WithError(e).Error("failed to create schema for mysql timeout manager") + return e + } + + selectSQL := `SELECT 1 FROM ` + tblName + ` LIMIT 1;` + + tm.Log().Info(selectSQL) + + row := tx.QueryRow(selectSQL) + var exists int + err := row.Scan(&exists) + if err != nil && err != sql.ErrNoRows { + + createTableSQL := `CREATE TABLE ` + tblName + ` ( + rec_id INT PRIMARY KEY AUTO_INCREMENT, + saga_id VARCHAR(255) UNIQUE NOT NULL, + timeout DATETIME NOT NULL + )` + + createSagaTypeIndex := `CREATE INDEX ` + tblName + `_timeout_idx ON ` + tblName + ` (timeout)` + + if _, e := tx.Exec(createTableSQL); e != nil { + tx.Rollback() + return e + 
} + + if _, e := tx.Exec(createSagaTypeIndex); e != nil { + tx.Rollback() + return e + } + return tx.Commit() + } else if err != nil { + return err + } + + return nil +} + +func (tm *TimeoutManager) purge() error { + purgeSQL := `DELETE FROM ` + tm.GetTimeoutsTableName() + + tx, e := tm.Txp.New() + if e != nil { + tm.Log().WithError(e).Error("failed to purge timeout manager") + return e + } + + if _, execErr := tx.Exec(purgeSQL); execErr != nil { + tm.Log().WithError(execErr).Error("failed to purge timeout manager") + return tx.Rollback() + } + return tx.Commit() +} + +//NewTimeoutManager creates a new instance of a mysql based TimeoutManager +func NewTimeoutManager(base *tx.TimeoutManager, purge bool) *TimeoutManager { + tm := &TimeoutManager{TimeoutManager: base} + tm.ensureSchema() + if purge { + tm.purge() + } + return tm +} diff --git a/gbus/tx/timeout.go b/gbus/tx/timeout.go index 2a2c8a6..eed4d9c 100644 --- a/gbus/tx/timeout.go +++ b/gbus/tx/timeout.go @@ -2,6 +2,8 @@ package tx import ( "database/sql" + "regexp" + "strings" "time" "github.com/sirupsen/logrus" @@ -15,6 +17,7 @@ type TimeoutManager struct { Log func() logrus.FieldLogger TimeoutSaga func(*sql.Tx, string) error Txp gbus.TxProvider + SvcName string } //RequestTimeout requests a timeout from the timeout manager @@ -52,3 +55,12 @@ func (tm *TimeoutManager) RequestTimeout(svcName, sagaID string, duration time.D }(svcName, sagaID, tm) } + +//GetTimeoutsTableName returns the table name in which to store timeouts +func (tm *TimeoutManager) GetTimeoutsTableName() string { + + var re = regexp.MustCompile(`-|;|\\|`) + sanitized := re.ReplaceAllString(tm.SvcName, "") + + return strings.ToLower("grabbit_" + sanitized + "_timeouts") +} From 8d1c4c3c0f65844cb91b80064b8ab3645f7f3525 Mon Sep 17 00:00:00 2001 From: Guy Baron Date: Thu, 1 Aug 2019 22:54:14 +0300 Subject: [PATCH 04/15] first working version of a mysql persisted timeout manager --- gbus/abstractions.go | 13 ++- gbus/builder/builder.go | 21 ++--- 
gbus/saga/glue.go | 33 ++++++-- gbus/saga/inmemory_timeout.go | 63 +++++++++++++++ gbus/tx/mysql/timeout.go | 148 +++++++++++++++++++++++++++++++--- gbus/tx/sagastore.go | 4 + gbus/tx/timeout.go | 66 --------------- 7 files changed, 248 insertions(+), 100 deletions(-) create mode 100644 gbus/saga/inmemory_timeout.go delete mode 100644 gbus/tx/timeout.go diff --git a/gbus/abstractions.go b/gbus/abstractions.go index eb47e30..799a0dc 100644 --- a/gbus/abstractions.go +++ b/gbus/abstractions.go @@ -3,9 +3,10 @@ package gbus import ( "context" "database/sql" - "github.com/sirupsen/logrus" "time" + "github.com/sirupsen/logrus" + "github.com/streadway/amqp" ) @@ -217,6 +218,16 @@ type TxOutbox interface { Stop() error } +//TimeoutManager abstracts the implementation of determining when a saga should be timed out +type TimeoutManager interface { + //RegisterTimeout requests the TimeoutManager to register a timeout for a specific saga instance + RegisterTimeout(tx *sql.Tx, sagaID string, duration time.Duration) error + //ClearTimeout clears a timeout for a specific saga + ClearTimeout(tx *sql.Tx, sagaID string) error + //AcceptTimeoutFunction accepts the function that the TimeoutManager should invoke once a timeout expires + AcceptTimeoutFunction(func(tx *sql.Tx, sagaID string) error) +} + type Logged interface { SetLogger(entry logrus.FieldLogger) Log() logrus.FieldLogger diff --git a/gbus/builder/builder.go b/gbus/builder/builder.go index 8403813..386c5ec 100644 --- a/gbus/builder/builder.go +++ b/gbus/builder/builder.go @@ -1,7 +1,6 @@ package builder import ( - "database/sql" "fmt" "sync" "time" @@ -12,7 +11,6 @@ import ( "github.com/wework/grabbit/gbus/saga" "github.com/wework/grabbit/gbus/saga/stores" "github.com/wework/grabbit/gbus/serialization" - "github.com/wework/grabbit/gbus/tx" "github.com/wework/grabbit/gbus/tx/mysql" ) @@ -72,8 +70,7 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus { } var ( sagaStore saga.Store - requestTimeoutFunc 
func(svcName, sagaID string, duration time.Duration) - timeoutSagaFunc func(tx *sql.Tx, sagaID string) error + timeoutManager gbus.TimeoutManager ) if builder.txnl { gb.IsTxnl = true @@ -85,6 +82,7 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus { panic(err) } gb.TxProvider = mysqltx + //TODO move purge logic into the NewSagaStore factory method sagaStore = mysql.NewSagaStore(gb.SvcName, mysqltx) if builder.purgeOnStartup { err := sagaStore.Purge() @@ -93,14 +91,7 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus { } } gb.Outbox = mysql.NewOutbox(gb.SvcName, mysqltx, builder.purgeOnStartup) - - //setup timeout manager - baseTimeoutManager := &tx.TimeoutManager{ - Bus: gb, Txp: gb.TxProvider, Log: gb.Log, SvcName: svcName, - } - tm := mysql.NewTimeoutManager(baseTimeoutManager, builder.purgeOnStartup) - requestTimeoutFunc = tm.RequestTimeout - tm.TimeoutSaga = timeoutSagaFunc + timeoutManager = mysql.NewTimeoutManager(gb, gb.TxProvider, gb.Log, svcName, builder.purgeOnStartup) default: err := fmt.Errorf("no provider found for passed in value %v", builder.txnlProvider) @@ -108,12 +99,14 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus { } } else { sagaStore = stores.NewInMemoryStore() + timeoutManager = &saga.InMemoryTimeoutManager{} } if builder.usingPingTimeout { gb.DbPingTimeout = builder.dbPingTimeout } + //TODO move this into the NewSagaStore factory methods if builder.purgeOnStartup { err := sagaStore.Purge() if err != nil { @@ -121,8 +114,8 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus { } } - glue := saga.NewGlue(gb, sagaStore, svcName, gb.TxProvider, gb.Log, requestTimeoutFunc) - timeoutSagaFunc = glue.TimeoutSaga + glue := saga.NewGlue(gb, sagaStore, svcName, gb.TxProvider, gb.Log, timeoutManager) + gb.Glue = glue return gb } diff --git a/gbus/saga/glue.go b/gbus/saga/glue.go index 8b31869..a8ed7d0 100644 --- a/gbus/saga/glue.go +++ b/gbus/saga/glue.go @@ -2,11 +2,11 @@ package saga import ( 
"database/sql" + "errors" "fmt" "reflect" "strings" "sync" - "time" "github.com/sirupsen/logrus" "github.com/wework/grabbit/gbus" @@ -21,6 +21,9 @@ func fqnsFromMessages(objs []gbus.Message) []string { return fqns } +//ErrInstanceNotFound is returned by the saga store if a saga lookup by saga id returns no valid instances +var ErrInstanceNotFound = errors.New("saga not be found") + var _ gbus.SagaRegister = &Glue{} //Glue ties the incoming messages from the Bus with the needed Saga instances @@ -32,8 +35,8 @@ type Glue struct { alreadyRegistred map[string]bool msgToDefMap map[string][]*Def sagaStore Store - requestTimeout func(string, string, time.Duration) getLog func() logrus.FieldLogger + timeoutManager gbus.TimeoutManager } func (imsm *Glue) isSagaAlreadyRegistered(sagaType reflect.Type) bool { @@ -132,7 +135,9 @@ func (imsm *Glue) handler(invocation gbus.Invocation, message *gbus.BusMessage) if requestsTimeout, duration := newInstance.requestsTimeout(); requestsTimeout { imsm.log().WithFields(logrus.Fields{"saga_id": newInstance.ID, "timeout_duration": duration}).Info("new saga requested timeout") - imsm.requestTimeout(imsm.svcName, newInstance.ID, duration) + if tme := imsm.timeoutManager.RegisterTimeout(invocation.Tx(), newInstance.ID, duration); tme != nil { + return tme + } } } return nil @@ -209,7 +214,12 @@ func (imsm *Glue) completeOrUpdateSaga(tx *sql.Tx, instance *Instance) error { if instance.isComplete() { imsm.log().WithField("saga_id", instance.ID).Info("saga has completed and will be deleted") - return imsm.sagaStore.DeleteSaga(tx, instance) + deleteErr := imsm.sagaStore.DeleteSaga(tx, instance) + if deleteErr != nil { + return deleteErr + } + + return imsm.timeoutManager.ClearTimeout(tx, instance.ID) } return imsm.sagaStore.UpdateSaga(tx, instance) @@ -233,9 +243,15 @@ func (imsm *Glue) registerEvent(exchange, topic string, event gbus.Message) erro return imsm.bus.HandleEvent(exchange, topic, event, imsm.handler) } +//TimeoutSaga fetches a saga 
instance and calls its timeout interface func (imsm *Glue) TimeoutSaga(tx *sql.Tx, sagaID string) error { saga, err := imsm.sagaStore.GetSagaByID(tx, sagaID) + //we are assuming that if the TimeoutSaga has been called but no instance returned from the store the saga + //has been completed already and + if err == ErrInstanceNotFound { + return nil + } if err != nil { return err } @@ -252,8 +268,8 @@ func (imsm *Glue) log() logrus.FieldLogger { } //NewGlue creates a new Sagamanager -func NewGlue(bus gbus.Bus, sagaStore Store, svcName string, txp gbus.TxProvider, getLog func() logrus.FieldLogger, requestTimeoutFunc func(string, string, time.Duration)) *Glue { - return &Glue{ +func NewGlue(bus gbus.Bus, sagaStore Store, svcName string, txp gbus.TxProvider, getLog func() logrus.FieldLogger, timeoutManager gbus.TimeoutManager) *Glue { + g := &Glue{ svcName: svcName, bus: bus, sagaDefs: make([]*Def, 0), @@ -262,6 +278,9 @@ func NewGlue(bus gbus.Bus, sagaStore Store, svcName string, txp gbus.TxProvider, msgToDefMap: make(map[string][]*Def), sagaStore: sagaStore, getLog: getLog, - requestTimeout: requestTimeoutFunc, + timeoutManager: timeoutManager, } + + timeoutManager.AcceptTimeoutFunction(g.TimeoutSaga) + return g } diff --git a/gbus/saga/inmemory_timeout.go b/gbus/saga/inmemory_timeout.go new file mode 100644 index 0000000..a6f3248 --- /dev/null +++ b/gbus/saga/inmemory_timeout.go @@ -0,0 +1,63 @@ +package saga + +import ( + "database/sql" + "time" + + "github.com/wework/grabbit/gbus" +) + +var _ gbus.TimeoutManager = &InMemoryTimeoutManager{} + +//InMemoryTimeoutManager should not be used in production +type InMemoryTimeoutManager struct { + bus gbus.Bus + glue *Glue + txp gbus.TxProvider +} + +//RegisterTimeout requests a timeout from the timeout manager +func (tm *InMemoryTimeoutManager) RegisterTimeout(tx *sql.Tx, sagaID string, duration time.Duration) error { + + go func(svcName, sagaID string, tm *InMemoryTimeoutManager) { + c := time.After(duration) + <-c + 
//TODO:if the bus is not transactional, moving forward we should not allow using sagas in a non transactional bus + if tm.txp == nil { + tme := tm.glue.TimeoutSaga(nil, sagaID) + if tme != nil { + tm.glue.log().WithError(tme).WithField("sagaID", sagaID).Error("timing out a saga failed") + } + return + } + tx, txe := tm.txp.New() + if txe != nil { + tm.glue.log().WithError(txe).Warn("timeout manager failed to create a transaction") + } else { + callErr := tm.glue.TimeoutSaga(tx, sagaID) + if callErr != nil { + tm.glue.log().WithError(callErr).WithField("sagaID", sagaID).Error("timing out a saga failed") + rlbe := tx.Rollback() + if rlbe != nil { + tm.glue.log().WithError(rlbe).Warn("timeout manager failed to rollback transaction") + } + } else { + cmte := tx.Commit() + if cmte != nil { + tm.glue.log().WithError(cmte).Warn("timeout manager failed to rollback transaction") + } + } + } + + }(tm.glue.svcName, sagaID, tm) + + return nil +} + +//ClearTimeout clears a timeout for a specific saga +func (tm *InMemoryTimeoutManager) ClearTimeout(tx *sql.Tx, sagaID string) error { + return nil +} + +//AcceptTimeoutFunction accepts the timeouting function +func (tm *InMemoryTimeoutManager) AcceptTimeoutFunction(fun func(tx *sql.Tx, sagaID string) error) {} diff --git a/gbus/tx/mysql/timeout.go b/gbus/tx/mysql/timeout.go index 7e11216..2afec11 100644 --- a/gbus/tx/mysql/timeout.go +++ b/gbus/tx/mysql/timeout.go @@ -2,13 +2,26 @@ package mysql import ( "database/sql" + "regexp" + "strings" + "time" - "github.com/wework/grabbit/gbus/tx" + "github.com/sirupsen/logrus" + + "github.com/wework/grabbit/gbus" ) +var _ gbus.TimeoutManager = &TimeoutManager{} + //TimeoutManager is a mysql implementation of a persistent timeoutmanager type TimeoutManager struct { - *tx.TimeoutManager + Bus gbus.Bus + Log func() logrus.FieldLogger + TimeoutSaga func(*sql.Tx, string) error + Txp gbus.TxProvider + SvcName string + paramMarker func(int) string + exit chan bool } func (tm *TimeoutManager) 
ensureSchema() error { @@ -28,23 +41,18 @@ func (tm *TimeoutManager) ensureSchema() error { err := row.Scan(&exists) if err != nil && err != sql.ErrNoRows { - createTableSQL := `CREATE TABLE ` + tblName + ` ( + createTableSQL := `CREATE TABLE IF NOT EXISTS ` + tblName + ` ( rec_id INT PRIMARY KEY AUTO_INCREMENT, saga_id VARCHAR(255) UNIQUE NOT NULL, - timeout DATETIME NOT NULL + timeout DATETIME NOT NULL, + INDEX ix_` + tm.GetTimeoutsTableName() + `_timeout_date(timeout) )` - createSagaTypeIndex := `CREATE INDEX ` + tblName + `_timeout_idx ON ` + tblName + ` (timeout)` - if _, e := tx.Exec(createTableSQL); e != nil { tx.Rollback() return e } - if _, e := tx.Exec(createSagaTypeIndex); e != nil { - tx.Rollback() - return e - } return tx.Commit() } else if err != nil { return err @@ -69,12 +77,128 @@ func (tm *TimeoutManager) purge() error { return tx.Commit() } +func (tm *TimeoutManager) start() { + go tm.trackTimeouts() +} + +func (tm *TimeoutManager) trackTimeouts() { + tick := time.NewTicker(time.Second * 1).C + for { + select { + case <-tick: + tx, txe := tm.Txp.New() + if txe != nil { + tm.Log().WithError(txe).Warn("timeout manager failed to create a transaction") + continue + } + now := time.Now().UTC() + getTimeoutsSQL := `select saga_id from ` + tm.GetTimeoutsTableName() + ` where timeout < ? 
LIMIT 100` + rows, selectErr := tx.Query(getTimeoutsSQL, now) + if selectErr != nil { + tm.Log().WithError(selectErr).Error("timeout manager failed to query for pending timeouts") + rows.Close() + continue + } + + sagaIDs := make([]string, 0) + for rows.Next() { + var sagaID string + + if err := rows.Scan(&sagaID); err != nil { + tm.Log().WithError(err).Error("failed to scan timeout record") + } + sagaIDs = append(sagaIDs, sagaID) + } + tm.executeTimeout(sagaIDs) + case <-tm.exit: + return + } + } +} + +func (tm *TimeoutManager) executeTimeout(sagaIDs []string) { + + for _, sagaID := range sagaIDs { + tx, txe := tm.Txp.New() + if txe != nil { + tm.Log().WithError(txe).Warn("timeout manager failed to create a transaction") + return + } + + callErr := tm.TimeoutSaga(tx, sagaID) + clrErr := tm.ClearTimeout(tx, sagaID) + + if callErr != nil || clrErr != nil { + logEntry := tm.Log() + if callErr != nil { + logEntry = logEntry.WithError(callErr) + } else { + logEntry = logEntry.WithError(clrErr) + } + logEntry.WithField("sagaID", sagaID).Error("timing out a saga failed") + rlbe := tx.Rollback() + if rlbe != nil { + tm.Log().WithError(rlbe).Warn("timeout manager failed to rollback transaction") + } + return + } + + cmte := tx.Commit() + if cmte != nil { + tm.Log().WithError(cmte).Warn("timeout manager failed to commit transaction") + } + } + +} + +//RegisterTimeout requests a timeout from the timeout manager +func (tm *TimeoutManager) RegisterTimeout(tx *sql.Tx, sagaID string, duration time.Duration) error { + + timeoutTime := time.Now().UTC().Add(duration) + + insertSQL := "INSERT INTO " + tm.GetTimeoutsTableName() + " (saga_id, timeout) VALUES(?, ?)" + _, insertErr := tx.Exec(insertSQL, sagaID, timeoutTime) + if insertErr == nil { + tm.Log().Info("timout inserted into timeout manager") + } + + return insertErr +} + +//ClearTimeout clears a timeout for a specific saga +func (tm *TimeoutManager) ClearTimeout(tx *sql.Tx, sagaID string) error { + + deleteSQL := `delete from 
` + tm.GetTimeoutsTableName() + ` where saga_id = ?` + _, err := tx.Exec(deleteSQL, sagaID) + return err +} + +//AcceptTimeoutFunction accepts the timeouting function +func (tm *TimeoutManager) AcceptTimeoutFunction(timeoutFunc func(tx *sql.Tx, sagaID string) error) { + tm.TimeoutSaga = timeoutFunc +} + +//GetTimeoutsTableName returns the table name in which to store timeouts +func (tm *TimeoutManager) GetTimeoutsTableName() string { + + var re = regexp.MustCompile(`-|;|\\|`) + sanitized := re.ReplaceAllString(tm.SvcName, "") + + return strings.ToLower("grabbit_" + sanitized + "_timeouts") +} + //NewTimeoutManager creates a new instance of a mysql based TimeoutManager -func NewTimeoutManager(base *tx.TimeoutManager, purge bool) *TimeoutManager { - tm := &TimeoutManager{TimeoutManager: base} +func NewTimeoutManager(bus gbus.Bus, txp gbus.TxProvider, logger func() logrus.FieldLogger, svcName string, purge bool) *TimeoutManager { + tm := &TimeoutManager{ + Log: logger, + Bus: bus, + Txp: txp, + SvcName: svcName} + tm.ensureSchema() if purge { tm.purge() } + tm.start() return tm } diff --git a/gbus/tx/sagastore.go b/gbus/tx/sagastore.go index c7e1547..c933051 100644 --- a/gbus/tx/sagastore.go +++ b/gbus/tx/sagastore.go @@ -133,6 +133,10 @@ func (store *SagaStore) GetSagaByID(tx *sql.Tx, sagaID string) (*saga.Instance, store.log().WithError(err).Error("could not close rows") } }() + + if err != nil && err == sql.ErrNoRows { + return nil, saga.ErrInstanceNotFound + } if err != nil { store.log().WithError(err). WithFields(log.Fields{"saga_id": sagaID, "table_name": store.GetSagatableName()}). 
diff --git a/gbus/tx/timeout.go b/gbus/tx/timeout.go deleted file mode 100644 index eed4d9c..0000000 --- a/gbus/tx/timeout.go +++ /dev/null @@ -1,66 +0,0 @@ -package tx - -import ( - "database/sql" - "regexp" - "strings" - "time" - - "github.com/sirupsen/logrus" - "github.com/wework/grabbit/gbus" -) - -//TimeoutManager manages timeouts for sagas -//TODO:Make it persistent -type TimeoutManager struct { - Bus gbus.Bus - Log func() logrus.FieldLogger - TimeoutSaga func(*sql.Tx, string) error - Txp gbus.TxProvider - SvcName string -} - -//RequestTimeout requests a timeout from the timeout manager -func (tm *TimeoutManager) RequestTimeout(svcName, sagaID string, duration time.Duration) { - - go func(svcName, sagaID string, tm *TimeoutManager) { - c := time.After(duration) - <-c - //TODO:if the bus is not transactional, moving forward we should not allow using sagas in a non transactional bus - if tm.Txp == nil { - tme := tm.TimeoutSaga(nil, sagaID) - if tme != nil { - tm.Log().WithError(tme).WithField("sagaID", sagaID).Error("timing out a saga failed") - } - return - } - tx, txe := tm.Txp.New() - if txe != nil { - tm.Log().WithError(txe).Warn("timeout manager failed to create a transaction") - } else { - callErr := tm.TimeoutSaga(tx, sagaID) - if callErr != nil { - tm.Log().WithError(callErr).WithField("sagaID", sagaID).Error("timing out a saga failed") - rlbe := tx.Rollback() - if rlbe != nil { - tm.Log().WithError(rlbe).Warn("timeout manager failed to rollback transaction") - } - } else { - cmte := tx.Commit() - if cmte != nil { - tm.Log().WithError(cmte).Warn("timeout manager failed to rollback transaction") - } - } - } - - }(svcName, sagaID, tm) -} - -//GetTimeoutsTableName returns the table name in which to store timeouts -func (tm *TimeoutManager) GetTimeoutsTableName() string { - - var re = regexp.MustCompile(`-|;|\\|`) - sanitized := re.ReplaceAllString(tm.SvcName, "") - - return strings.ToLower("grabbit_" + sanitized + "_timeouts") -} From 
b4f1d259f9caca370ec40a5df09098d0da87d7e9 Mon Sep 17 00:00:00 2001 From: Guy Baron Date: Thu, 1 Aug 2019 23:38:15 +0300 Subject: [PATCH 05/15] fixing ci lint errors --- gbus/saga/inmemory_timeout.go | 1 - gbus/tx/mysql/timeout.go | 13 +++++++++---- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/gbus/saga/inmemory_timeout.go b/gbus/saga/inmemory_timeout.go index a6f3248..a600968 100644 --- a/gbus/saga/inmemory_timeout.go +++ b/gbus/saga/inmemory_timeout.go @@ -11,7 +11,6 @@ var _ gbus.TimeoutManager = &InMemoryTimeoutManager{} //InMemoryTimeoutManager should not be used in production type InMemoryTimeoutManager struct { - bus gbus.Bus glue *Glue txp gbus.TxProvider } diff --git a/gbus/tx/mysql/timeout.go b/gbus/tx/mysql/timeout.go index 2afec11..4e66b25 100644 --- a/gbus/tx/mysql/timeout.go +++ b/gbus/tx/mysql/timeout.go @@ -20,7 +20,6 @@ type TimeoutManager struct { TimeoutSaga func(*sql.Tx, string) error Txp gbus.TxProvider SvcName string - paramMarker func(int) string exit chan bool } @@ -49,7 +48,9 @@ func (tm *TimeoutManager) ensureSchema() error { )` if _, e := tx.Exec(createTableSQL); e != nil { - tx.Rollback() + if rbkErr := tx.Rollback(); rbkErr != nil { + tm.Log().Warn("timeout manager failed to rollback transaction") + } return e } @@ -195,9 +196,13 @@ func NewTimeoutManager(bus gbus.Bus, txp gbus.TxProvider, logger func() logrus.F Txp: txp, SvcName: svcName} - tm.ensureSchema() + if err := tm.ensureSchema(); err != nil { + panic(err) + } if purge { - tm.purge() + if err := tm.purge(); err != nil { + panic(err) + } } tm.start() return tm From a589559905cc70697f6bb0720b891a1e9e1d1739 Mon Sep 17 00:00:00 2001 From: Guy Baron Date: Thu, 1 Aug 2019 23:51:00 +0300 Subject: [PATCH 06/15] refactored ensure schema of timeout manager --- gbus/tx/mysql/timeout.go | 27 ++++++--------------------- 1 file changed, 6 insertions(+), 21 deletions(-) diff --git a/gbus/tx/mysql/timeout.go b/gbus/tx/mysql/timeout.go index 4e66b25..23a3d46 100644 --- 
a/gbus/tx/mysql/timeout.go +++ b/gbus/tx/mysql/timeout.go @@ -31,35 +31,20 @@ func (tm *TimeoutManager) ensureSchema() error { return e } - selectSQL := `SELECT 1 FROM ` + tblName + ` LIMIT 1;` - - tm.Log().Info(selectSQL) - - row := tx.QueryRow(selectSQL) - var exists int - err := row.Scan(&exists) - if err != nil && err != sql.ErrNoRows { - - createTableSQL := `CREATE TABLE IF NOT EXISTS ` + tblName + ` ( + createTableSQL := `CREATE TABLE IF NOT EXISTS ` + tblName + ` ( rec_id INT PRIMARY KEY AUTO_INCREMENT, saga_id VARCHAR(255) UNIQUE NOT NULL, timeout DATETIME NOT NULL, INDEX ix_` + tm.GetTimeoutsTableName() + `_timeout_date(timeout) )` - if _, e := tx.Exec(createTableSQL); e != nil { - if rbkErr := tx.Rollback(); rbkErr != nil { - tm.Log().Warn("timeout manager failed to rollback transaction") - } - return e + if _, e := tx.Exec(createTableSQL); e != nil { + if rbkErr := tx.Rollback(); rbkErr != nil { + tm.Log().Warn("timeout manager failed to rollback transaction") } - - return tx.Commit() - } else if err != nil { - return err + return e } - - return nil + return tx.Commit() } func (tm *TimeoutManager) purge() error { From a618addc4d1417fececf1e669f249c3804084fee Mon Sep 17 00:00:00 2001 From: Guy Baron Date: Fri, 2 Aug 2019 11:38:16 +0300 Subject: [PATCH 07/15] cleanup timeout manager when bs shuts down --- gbus/abstractions.go | 15 +++++++++++++++ gbus/bus.go | 13 +++++++++++-- gbus/saga/glue.go | 14 ++++++++++++-- gbus/saga/inmemory_timeout.go | 10 ++++++++++ gbus/tx/mysql/timeout.go | 15 ++++++++++++--- 5 files changed, 60 insertions(+), 7 deletions(-) diff --git a/gbus/abstractions.go b/gbus/abstractions.go index d9c693e..848f3ee 100644 --- a/gbus/abstractions.go +++ b/gbus/abstractions.go @@ -10,10 +10,13 @@ import ( "github.com/streadway/amqp" ) +//Semantics reopresents the semantics of a grabbit message type Semantics string const ( + //CMD represenst a messge with command semantics in grabbit CMD Semantics = "cmd" + //EVT represenst a messge with 
event semantics in grabbit EVT Semantics = "evt" ) @@ -143,6 +146,13 @@ type SagaRegister interface { RegisterSaga(saga Saga, conf ...SagaConfFn) error } +//SagaGlue glues together all the parts needed in order to orchistrate saga instances +type SagaGlue interface { + SagaRegister + Start() error + Stop() error +} + //Builder is the main interface that should be used to create an instance of a Bus type Builder interface { PurgeOnStartUp() Builder @@ -223,8 +233,13 @@ type TimeoutManager interface { ClearTimeout(tx *sql.Tx, sagaID string) error //AcceptTimeoutFunction accepts the function that the TimeoutManager should invoke once a timeout expires AcceptTimeoutFunction(func(tx *sql.Tx, sagaID string) error) + //Start starts the timeout manager + Start() error + //Stop shuts the timeout manager down + Stop() error } +//Logged represents a grabbit component that can be logged type Logged interface { SetLogger(entry logrus.FieldLogger) Log() logrus.FieldLogger diff --git a/gbus/bus.go b/gbus/bus.go index c4e34cb..79f3ad7 100644 --- a/gbus/bus.go +++ b/gbus/bus.go @@ -5,11 +5,12 @@ import ( "database/sql" "errors" "fmt" - "github.com/wework/grabbit/gbus/metrics" "runtime/debug" "sync" "time" + "github.com/wework/grabbit/gbus/metrics" + "github.com/opentracing-contrib/go-amqp/amqptracer" "github.com/opentracing/opentracing-go" slog "github.com/opentracing/opentracing-go/log" @@ -51,7 +52,7 @@ type DefaultBus struct { DelayedSubscriptions [][]string PurgeOnStartup bool started bool - Glue SagaRegister + Glue SagaGlue TxProvider TxProvider IsTxnl bool WorkerNum uint @@ -263,6 +264,10 @@ func (b *DefaultBus) Start() error { return createWorkersErr } + + if err := b.Glue.Start(); err != nil { + return err + } b.workers = workers b.started = true //start monitoring on amqp related errors @@ -335,6 +340,10 @@ func (b *DefaultBus) Shutdown() (shutdwonErr error) { } } b.Outgoing.shutdown() + + if err := b.Glue.Stop(); err != nil { + return err + } b.started = false if b.IsTxnl 
{ diff --git a/gbus/saga/glue.go b/gbus/saga/glue.go index a8ed7d0..44270f8 100644 --- a/gbus/saga/glue.go +++ b/gbus/saga/glue.go @@ -24,9 +24,9 @@ func fqnsFromMessages(objs []gbus.Message) []string { //ErrInstanceNotFound is returned by the saga store if a saga lookup by saga id returns no valid instances var ErrInstanceNotFound = errors.New("saga not be found") -var _ gbus.SagaRegister = &Glue{} +var _ gbus.SagaGlue = &Glue{} -//Glue ties the incoming messages from the Bus with the needed Saga instances +//Glue t/* */ies the incoming messages from the Bus with the needed Saga instances type Glue struct { svcName string bus gbus.Bus @@ -267,6 +267,16 @@ func (imsm *Glue) log() logrus.FieldLogger { return imsm.getLog() } +//Start starts the glue instance up +func (imsm *Glue) Start() error { + return imsm.timeoutManager.Start() +} + +//Stop shuts the glue instance down +func (imsm *Glue) Stop() error { + return imsm.timeoutManager.Stop() +} + //NewGlue creates a new Sagamanager func NewGlue(bus gbus.Bus, sagaStore Store, svcName string, txp gbus.TxProvider, getLog func() logrus.FieldLogger, timeoutManager gbus.TimeoutManager) *Glue { g := &Glue{ diff --git a/gbus/saga/inmemory_timeout.go b/gbus/saga/inmemory_timeout.go index a600968..690a77a 100644 --- a/gbus/saga/inmemory_timeout.go +++ b/gbus/saga/inmemory_timeout.go @@ -60,3 +60,13 @@ func (tm *InMemoryTimeoutManager) ClearTimeout(tx *sql.Tx, sagaID string) error //AcceptTimeoutFunction accepts the timeouting function func (tm *InMemoryTimeoutManager) AcceptTimeoutFunction(fun func(tx *sql.Tx, sagaID string) error) {} + +//Start starts the timeout manager +func (tm *InMemoryTimeoutManager) Start() error { + return nil +} + +//Stop shuts the timeout manager down +func (tm *InMemoryTimeoutManager) Stop() error { + return nil +} diff --git a/gbus/tx/mysql/timeout.go b/gbus/tx/mysql/timeout.go index 23a3d46..e1d95e6 100644 --- a/gbus/tx/mysql/timeout.go +++ b/gbus/tx/mysql/timeout.go @@ -63,8 +63,16 @@ func (tm 
*TimeoutManager) purge() error { return tx.Commit() } -func (tm *TimeoutManager) start() { +//Start starts the timeout manager +func (tm *TimeoutManager) Start() error { go tm.trackTimeouts() + return nil +} + +//Stop shuts down the timeout manager +func (tm *TimeoutManager) Stop() error { + tm.exit <- true + return nil } func (tm *TimeoutManager) trackTimeouts() { @@ -179,7 +187,8 @@ func NewTimeoutManager(bus gbus.Bus, txp gbus.TxProvider, logger func() logrus.F Log: logger, Bus: bus, Txp: txp, - SvcName: svcName} + SvcName: svcName, + exit: make(chan bool)} if err := tm.ensureSchema(); err != nil { panic(err) @@ -189,6 +198,6 @@ func NewTimeoutManager(bus gbus.Bus, txp gbus.TxProvider, logger func() logrus.F panic(err) } } - tm.start() return tm + } From 3d857024e20d561282eac968d774a7a7d2633e23 Mon Sep 17 00:00:00 2001 From: Guy Baron Date: Sat, 3 Aug 2019 10:34:19 +0300 Subject: [PATCH 08/15] fixing formatting issues --- gbus/builder/builder.go | 4 +--- gbus/saga/glue.go | 2 +- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/gbus/builder/builder.go b/gbus/builder/builder.go index 386c5ec..80d2439 100644 --- a/gbus/builder/builder.go +++ b/gbus/builder/builder.go @@ -69,7 +69,7 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus { gb.WorkerNum = builder.workerNum } var ( - sagaStore saga.Store + sagaStore saga.Store timeoutManager gbus.TimeoutManager ) if builder.txnl { @@ -113,9 +113,7 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus { panic(err) } } - glue := saga.NewGlue(gb, sagaStore, svcName, gb.TxProvider, gb.Log, timeoutManager) - gb.Glue = glue return gb } diff --git a/gbus/saga/glue.go b/gbus/saga/glue.go index 44270f8..dda7f85 100644 --- a/gbus/saga/glue.go +++ b/gbus/saga/glue.go @@ -22,7 +22,7 @@ func fqnsFromMessages(objs []gbus.Message) []string { } //ErrInstanceNotFound is returned by the saga store if a saga lookup by saga id returns no valid instances -var ErrInstanceNotFound = errors.New("saga not be 
found") +var ErrInstanceNotFound = errors.New("saga not be found") var _ gbus.SagaGlue = &Glue{} From 3b3239d2060118845b8c088682a68147fcfcd731 Mon Sep 17 00:00:00 2001 From: Guy Baron Date: Sat, 3 Aug 2019 10:37:02 +0300 Subject: [PATCH 09/15] changed logging level from Info to Debug when inserting a new timeout --- gbus/tx/mysql/timeout.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gbus/tx/mysql/timeout.go b/gbus/tx/mysql/timeout.go index e1d95e6..170d4e5 100644 --- a/gbus/tx/mysql/timeout.go +++ b/gbus/tx/mysql/timeout.go @@ -153,7 +153,7 @@ func (tm *TimeoutManager) RegisterTimeout(tx *sql.Tx, sagaID string, duration ti insertSQL := "INSERT INTO " + tm.GetTimeoutsTableName() + " (saga_id, timeout) VALUES(?, ?)" _, insertErr := tx.Exec(insertSQL, sagaID, timeoutTime) if insertErr == nil { - tm.Log().Info("timout inserted into timeout manager") + tm.Log().WithField("timeout_duration", duration).Debug("timout inserted into timeout manager") } return insertErr From 870b3335df04f68e85df70360c27de466c4a623b Mon Sep 17 00:00:00 2001 From: Guy Baron Date: Sat, 3 Aug 2019 10:45:31 +0300 Subject: [PATCH 10/15] resusing timeouts tablename (PR review) --- gbus/tx/mysql/timeout.go | 42 ++++++++++++++++++++++------------------ 1 file changed, 23 insertions(+), 19 deletions(-) diff --git a/gbus/tx/mysql/timeout.go b/gbus/tx/mysql/timeout.go index 170d4e5..cd50e02 100644 --- a/gbus/tx/mysql/timeout.go +++ b/gbus/tx/mysql/timeout.go @@ -15,16 +15,17 @@ var _ gbus.TimeoutManager = &TimeoutManager{} //TimeoutManager is a mysql implementation of a persistent timeoutmanager type TimeoutManager struct { - Bus gbus.Bus - Log func() logrus.FieldLogger - TimeoutSaga func(*sql.Tx, string) error - Txp gbus.TxProvider - SvcName string - exit chan bool + Bus gbus.Bus + Log func() logrus.FieldLogger + TimeoutSaga func(*sql.Tx, string) error + Txp gbus.TxProvider + SvcName string + timeoutsTableName string + exit chan bool } func (tm *TimeoutManager) ensureSchema() 
error { - tblName := tm.GetTimeoutsTableName() + tblName := tm.timeoutsTableName tx, e := tm.Txp.New() if e != nil { tm.Log().WithError(e).Error("failed to create schema for mysql timeout manager") @@ -35,7 +36,7 @@ func (tm *TimeoutManager) ensureSchema() error { rec_id INT PRIMARY KEY AUTO_INCREMENT, saga_id VARCHAR(255) UNIQUE NOT NULL, timeout DATETIME NOT NULL, - INDEX ix_` + tm.GetTimeoutsTableName() + `_timeout_date(timeout) + INDEX ix_` + tm.timeoutsTableName + `_timeout_date(timeout) )` if _, e := tx.Exec(createTableSQL); e != nil { @@ -48,7 +49,7 @@ func (tm *TimeoutManager) ensureSchema() error { } func (tm *TimeoutManager) purge() error { - purgeSQL := `DELETE FROM ` + tm.GetTimeoutsTableName() + purgeSQL := `DELETE FROM ` + tm.timeoutsTableName tx, e := tm.Txp.New() if e != nil { @@ -86,7 +87,7 @@ func (tm *TimeoutManager) trackTimeouts() { continue } now := time.Now().UTC() - getTimeoutsSQL := `select saga_id from ` + tm.GetTimeoutsTableName() + ` where timeout < ? LIMIT 100` + getTimeoutsSQL := `select saga_id from ` + tm.timeoutsTableName + ` where timeout < ? 
LIMIT 100` rows, selectErr := tx.Query(getTimeoutsSQL, now) if selectErr != nil { tm.Log().WithError(selectErr).Error("timeout manager failed to query for pending timeouts") @@ -150,7 +151,7 @@ func (tm *TimeoutManager) RegisterTimeout(tx *sql.Tx, sagaID string, duration ti timeoutTime := time.Now().UTC().Add(duration) - insertSQL := "INSERT INTO " + tm.GetTimeoutsTableName() + " (saga_id, timeout) VALUES(?, ?)" + insertSQL := "INSERT INTO " + tm.timeoutsTableName + " (saga_id, timeout) VALUES(?, ?)" _, insertErr := tx.Exec(insertSQL, sagaID, timeoutTime) if insertErr == nil { tm.Log().WithField("timeout_duration", duration).Debug("timout inserted into timeout manager") @@ -162,7 +163,7 @@ func (tm *TimeoutManager) RegisterTimeout(tx *sql.Tx, sagaID string, duration ti //ClearTimeout clears a timeout for a specific saga func (tm *TimeoutManager) ClearTimeout(tx *sql.Tx, sagaID string) error { - deleteSQL := `delete from ` + tm.GetTimeoutsTableName() + ` where saga_id = ?` + deleteSQL := `delete from ` + tm.timeoutsTableName + ` where saga_id = ?` _, err := tx.Exec(deleteSQL, sagaID) return err } @@ -173,22 +174,25 @@ func (tm *TimeoutManager) AcceptTimeoutFunction(timeoutFunc func(tx *sql.Tx, sag } //GetTimeoutsTableName returns the table name in which to store timeouts -func (tm *TimeoutManager) GetTimeoutsTableName() string { +func getTimeoutsTableName(svcName string) string { var re = regexp.MustCompile(`-|;|\\|`) - sanitized := re.ReplaceAllString(tm.SvcName, "") + sanitized := re.ReplaceAllString(svcName, "") return strings.ToLower("grabbit_" + sanitized + "_timeouts") } //NewTimeoutManager creates a new instance of a mysql based TimeoutManager func NewTimeoutManager(bus gbus.Bus, txp gbus.TxProvider, logger func() logrus.FieldLogger, svcName string, purge bool) *TimeoutManager { + + timeoutsTableName := getTimeoutsTableName(svcName) tm := &TimeoutManager{ - Log: logger, - Bus: bus, - Txp: txp, - SvcName: svcName, - exit: make(chan bool)} + Log: logger, + Bus: 
bus, + Txp: txp, + SvcName: svcName, + timeoutsTableName: timeoutsTableName, + exit: make(chan bool)} if err := tm.ensureSchema(); err != nil { panic(err) From baf7c88d3013268cc11770e323da5cb50db42c13 Mon Sep 17 00:00:00 2001 From: Guy Baron Date: Sat, 3 Aug 2019 10:48:31 +0300 Subject: [PATCH 11/15] renamed AcceptTimeoutFunction to SetTimeoutFunction on the TimeoutManager interface (PR review) --- gbus/abstractions.go | 4 ++-- gbus/saga/glue.go | 2 +- gbus/saga/inmemory_timeout.go | 4 ++-- gbus/tx/mysql/timeout.go | 4 ++-- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/gbus/abstractions.go b/gbus/abstractions.go index 848f3ee..8ee7f65 100644 --- a/gbus/abstractions.go +++ b/gbus/abstractions.go @@ -231,8 +231,8 @@ type TimeoutManager interface { RegisterTimeout(tx *sql.Tx, sagaID string, duration time.Duration) error //ClearTimeout clears a timeout for a specific saga ClearTimeout(tx *sql.Tx, sagaID string) error - //AcceptTimeoutFunction accepts the function that the TimeoutManager should invoke once a timeout expires - AcceptTimeoutFunction(func(tx *sql.Tx, sagaID string) error) + //SetTimeoutFunction accepts the function that the TimeoutManager should invoke once a timeout expires + SetTimeoutFunction(func(tx *sql.Tx, sagaID string) error) //Start starts the timeout manager Start() error //Stop shuts the timeout manager down diff --git a/gbus/saga/glue.go b/gbus/saga/glue.go index dda7f85..06d9a7c 100644 --- a/gbus/saga/glue.go +++ b/gbus/saga/glue.go @@ -291,6 +291,6 @@ func NewGlue(bus gbus.Bus, sagaStore Store, svcName string, txp gbus.TxProvider, timeoutManager: timeoutManager, } - timeoutManager.AcceptTimeoutFunction(g.TimeoutSaga) + timeoutManager.SetTimeoutFunction(g.TimeoutSaga) return g } diff --git a/gbus/saga/inmemory_timeout.go b/gbus/saga/inmemory_timeout.go index 690a77a..750791e 100644 --- a/gbus/saga/inmemory_timeout.go +++ b/gbus/saga/inmemory_timeout.go @@ -58,8 +58,8 @@ func (tm *InMemoryTimeoutManager) ClearTimeout(tx *sql.Tx, 
sagaID string) error return nil } -//AcceptTimeoutFunction accepts the timeouting function -func (tm *InMemoryTimeoutManager) AcceptTimeoutFunction(fun func(tx *sql.Tx, sagaID string) error) {} +//SetTimeoutFunction accepts the timeouting function +func (tm *InMemoryTimeoutManager) SetTimeoutFunction(fun func(tx *sql.Tx, sagaID string) error) {} //Start starts the timeout manager func (tm *InMemoryTimeoutManager) Start() error { diff --git a/gbus/tx/mysql/timeout.go b/gbus/tx/mysql/timeout.go index cd50e02..d3112cc 100644 --- a/gbus/tx/mysql/timeout.go +++ b/gbus/tx/mysql/timeout.go @@ -168,8 +168,8 @@ func (tm *TimeoutManager) ClearTimeout(tx *sql.Tx, sagaID string) error { return err } -//AcceptTimeoutFunction accepts the timeouting function -func (tm *TimeoutManager) AcceptTimeoutFunction(timeoutFunc func(tx *sql.Tx, sagaID string) error) { +//SetTimeoutFunction accepts the timeouting function +func (tm *TimeoutManager) SetTimeoutFunction(timeoutFunc func(tx *sql.Tx, sagaID string) error) { tm.TimeoutSaga = timeoutFunc } From 76df53b0361a66c49cf8062a9964424b01949f40 Mon Sep 17 00:00:00 2001 From: Guy Baron Date: Sat, 3 Aug 2019 11:06:12 +0300 Subject: [PATCH 12/15] refactored glue to implement the Logged inetrface and use the GLogged helper struct --- gbus/abstractions.go | 1 + gbus/saga/glue.go | 35 +++++++++++++++-------------------- gbus/saga/inmemory_timeout.go | 10 +++++----- 3 files changed, 21 insertions(+), 25 deletions(-) diff --git a/gbus/abstractions.go b/gbus/abstractions.go index 8ee7f65..0e459b5 100644 --- a/gbus/abstractions.go +++ b/gbus/abstractions.go @@ -149,6 +149,7 @@ type SagaRegister interface { //SagaGlue glues together all the parts needed in order to orchistrate saga instances type SagaGlue interface { SagaRegister + Logged Start() error Stop() error } diff --git a/gbus/saga/glue.go b/gbus/saga/glue.go index 06d9a7c..9e6cc48 100644 --- a/gbus/saga/glue.go +++ b/gbus/saga/glue.go @@ -28,6 +28,7 @@ var _ gbus.SagaGlue = &Glue{} //Glue t/* 
*/ies the incoming messages from the Bus with the needed Saga instances type Glue struct { + *gbus.Glogged svcName string bus gbus.Bus sagaDefs []*Def @@ -35,7 +36,6 @@ type Glue struct { alreadyRegistred map[string]bool msgToDefMap map[string][]*Def sagaStore Store - getLog func() logrus.FieldLogger timeoutManager gbus.TimeoutManager } @@ -76,7 +76,7 @@ func (imsm *Glue) RegisterSaga(saga gbus.Saga, conf ...gbus.SagaConfFn) error { imsm.addMsgNameToDef(msgName, def) } - imsm.log(). + imsm.Log(). WithFields(logrus.Fields{"saga_type": def.sagaType.String(), "handles_messages": len(msgNames)}). Info("registered saga with messages") @@ -117,24 +117,24 @@ func (imsm *Glue) handler(invocation gbus.Invocation, message *gbus.BusMessage) startNew := def.shouldStartNewSaga(message) if startNew { newInstance := def.newInstance() - imsm.log(). + imsm.Log(). WithFields(logrus.Fields{"saga_def": def.String(), "saga_id": newInstance.ID}). Info("created new saga") if invkErr := imsm.invokeSagaInstance(newInstance, invocation, message); invkErr != nil { - imsm.log().WithError(invkErr).WithField("saga_id", newInstance.ID).Error("failed to invoke saga") + imsm.Log().WithError(invkErr).WithField("saga_id", newInstance.ID).Error("failed to invoke saga") return invkErr } if !newInstance.isComplete() { - imsm.log().WithField("saga_id", newInstance.ID).Info("saving new saga") + imsm.Log().WithField("saga_id", newInstance.ID).Info("saving new saga") if e := imsm.sagaStore.SaveNewSaga(invocation.Tx(), def.sagaType, newInstance); e != nil { - imsm.log().WithError(e).WithField("saga_id", newInstance.ID).Error("saving new saga failed") + imsm.Log().WithError(e).WithField("saga_id", newInstance.ID).Error("saving new saga failed") return e } if requestsTimeout, duration := newInstance.requestsTimeout(); requestsTimeout { - imsm.log().WithFields(logrus.Fields{"saga_id": newInstance.ID, "timeout_duration": duration}).Info("new saga requested timeout") + 
imsm.Log().WithFields(logrus.Fields{"saga_id": newInstance.ID, "timeout_duration": duration}).Info("new saga requested timeout") if tme := imsm.timeoutManager.RegisterTimeout(invocation.Tx(), newInstance.ID, duration); tme != nil { return tme } @@ -145,7 +145,7 @@ func (imsm *Glue) handler(invocation gbus.Invocation, message *gbus.BusMessage) instance, getErr := imsm.sagaStore.GetSagaByID(invocation.Tx(), message.SagaCorrelationID) if getErr != nil { - imsm.log().WithError(getErr).WithField("saga_id", message.SagaCorrelationID).Error("failed to fetch saga by id") + imsm.Log().WithError(getErr).WithField("saga_id", message.SagaCorrelationID).Error("failed to fetch saga by id") return getErr } if instance == nil { @@ -154,7 +154,7 @@ func (imsm *Glue) handler(invocation gbus.Invocation, message *gbus.BusMessage) } def.configureSaga(instance) if invkErr := imsm.invokeSagaInstance(instance, invocation, message); invkErr != nil { - imsm.log().WithError(invkErr).WithField("saga_id", instance.ID).Error("failed to invoke saga") + imsm.Log().WithError(invkErr).WithField("saga_id", instance.ID).Error("failed to invoke saga") return invkErr } @@ -165,18 +165,18 @@ func (imsm *Glue) handler(invocation gbus.Invocation, message *gbus.BusMessage) return e } else { - imsm.log().WithFields(logrus.Fields{"saga_type": def.sagaType, "message": msgName}).Info("fetching saga instances by type") + imsm.Log().WithFields(logrus.Fields{"saga_type": def.sagaType, "message": msgName}).Info("fetching saga instances by type") instances, e := imsm.sagaStore.GetSagasByType(invocation.Tx(), def.sagaType) if e != nil { return e } - imsm.log().WithFields(logrus.Fields{"message": msgName, "instances_fetched": len(instances)}).Info("fetched saga instances") + imsm.Log().WithFields(logrus.Fields{"message": msgName, "instances_fetched": len(instances)}).Info("fetched saga instances") for _, instance := range instances { def.configureSaga(instance) if invkErr := imsm.invokeSagaInstance(instance, 
invocation, message); invkErr != nil { - imsm.log().WithError(invkErr).WithField("saga_id", instance.ID).Error("failed to invoke saga") + imsm.Log().WithError(invkErr).WithField("saga_id", instance.ID).Error("failed to invoke saga") return invkErr } e = imsm.completeOrUpdateSaga(invocation.Tx(), instance) @@ -199,7 +199,7 @@ func (imsm *Glue) invokeSagaInstance(instance *Instance, invocation gbus.Invocat ctx: invocation.Ctx(), invokingService: imsm.svcName, } - sginv.SetLogger(imsm.log().WithFields(logrus.Fields{ + sginv.SetLogger(imsm.Log().WithFields(logrus.Fields{ "saga_id": instance.ID, "saga_type": instance.String(), "message_name": message.PayloadFQN, @@ -212,7 +212,7 @@ func (imsm *Glue) invokeSagaInstance(instance *Instance, invocation gbus.Invocat func (imsm *Glue) completeOrUpdateSaga(tx *sql.Tx, instance *Instance) error { if instance.isComplete() { - imsm.log().WithField("saga_id", instance.ID).Info("saga has completed and will be deleted") + imsm.Log().WithField("saga_id", instance.ID).Info("saga has completed and will be deleted") deleteErr := imsm.sagaStore.DeleteSaga(tx, instance) if deleteErr != nil { @@ -257,16 +257,12 @@ func (imsm *Glue) TimeoutSaga(tx *sql.Tx, sagaID string) error { } timeoutErr := saga.timeout(tx, imsm.bus) if timeoutErr != nil { - imsm.log().WithError(timeoutErr).WithField("sagaID", sagaID).Error("failed to timeout saga") + imsm.Log().WithError(timeoutErr).WithField("sagaID", sagaID).Error("failed to timeout saga") return timeoutErr } return imsm.completeOrUpdateSaga(tx, saga) } -func (imsm *Glue) log() logrus.FieldLogger { - return imsm.getLog() -} - //Start starts the glue instance up func (imsm *Glue) Start() error { return imsm.timeoutManager.Start() @@ -287,7 +283,6 @@ func NewGlue(bus gbus.Bus, sagaStore Store, svcName string, txp gbus.TxProvider, alreadyRegistred: make(map[string]bool), msgToDefMap: make(map[string][]*Def), sagaStore: sagaStore, - getLog: getLog, timeoutManager: timeoutManager, } diff --git 
a/gbus/saga/inmemory_timeout.go b/gbus/saga/inmemory_timeout.go index 750791e..f9fd499 100644 --- a/gbus/saga/inmemory_timeout.go +++ b/gbus/saga/inmemory_timeout.go @@ -25,25 +25,25 @@ func (tm *InMemoryTimeoutManager) RegisterTimeout(tx *sql.Tx, sagaID string, dur if tm.txp == nil { tme := tm.glue.TimeoutSaga(nil, sagaID) if tme != nil { - tm.glue.log().WithError(tme).WithField("sagaID", sagaID).Error("timing out a saga failed") + tm.glue.Log().WithError(tme).WithField("sagaID", sagaID).Error("timing out a saga failed") } return } tx, txe := tm.txp.New() if txe != nil { - tm.glue.log().WithError(txe).Warn("timeout manager failed to create a transaction") + tm.glue.Log().WithError(txe).Warn("timeout manager failed to create a transaction") } else { callErr := tm.glue.TimeoutSaga(tx, sagaID) if callErr != nil { - tm.glue.log().WithError(callErr).WithField("sagaID", sagaID).Error("timing out a saga failed") + tm.glue.Log().WithError(callErr).WithField("sagaID", sagaID).Error("timing out a saga failed") rlbe := tx.Rollback() if rlbe != nil { - tm.glue.log().WithError(rlbe).Warn("timeout manager failed to rollback transaction") + tm.glue.Log().WithError(rlbe).Warn("timeout manager failed to rollback transaction") } } else { cmte := tx.Commit() if cmte != nil { - tm.glue.log().WithError(cmte).Warn("timeout manager failed to rollback transaction") + tm.glue.Log().WithError(cmte).Warn("timeout manager failed to rollback transaction") } } } From 4daca9066e9d5ab6a6b894e1b5edb31466ca07c4 Mon Sep 17 00:00:00 2001 From: Guy Baron Date: Sat, 3 Aug 2019 11:49:39 +0300 Subject: [PATCH 13/15] locking timeout record before executing timeout In order to prevent having a timeout beeing executed twice due to two concurrent grabbit instances running the same service a lock (FOR UPDATE) has been placed on the timeout record in the scope of the executing transaction --- gbus/tx/mysql/timeout.go | 24 ++++++++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git 
a/gbus/tx/mysql/timeout.go b/gbus/tx/mysql/timeout.go index d3112cc..f2d2869 100644 --- a/gbus/tx/mysql/timeout.go +++ b/gbus/tx/mysql/timeout.go @@ -36,8 +36,9 @@ func (tm *TimeoutManager) ensureSchema() error { rec_id INT PRIMARY KEY AUTO_INCREMENT, saga_id VARCHAR(255) UNIQUE NOT NULL, timeout DATETIME NOT NULL, - INDEX ix_` + tm.timeoutsTableName + `_timeout_date(timeout) - )` + INDEX (timeout), + INDEX (saga_id) + )` if _, e := tx.Exec(createTableSQL); e != nil { if rbkErr := tx.Rollback(); rbkErr != nil { @@ -97,6 +98,7 @@ func (tm *TimeoutManager) trackTimeouts() { sagaIDs := make([]string, 0) for rows.Next() { + var sagaID string if err := rows.Scan(&sagaID); err != nil { @@ -111,6 +113,15 @@ func (tm *TimeoutManager) trackTimeouts() { } } +func (tm *TimeoutManager) lockTimeoutRecord(tx *sql.Tx, sagaID string) error { + + selectTimeout := `SELECT saga_id FROM ` + tm.timeoutsTableName + ` WHERE saga_id = ? FOR UPDATE` + row := tx.QueryRow(selectTimeout, sagaID) + //scan the row so we can determine if the lock has been successfully acquired + var x string + return row.Scan(&x) +} + func (tm *TimeoutManager) executeTimeout(sagaIDs []string) { for _, sagaID := range sagaIDs { @@ -119,7 +130,12 @@ func (tm *TimeoutManager) executeTimeout(sagaIDs []string) { tm.Log().WithError(txe).Warn("timeout manager failed to create a transaction") return } - + lckErr := tm.lockTimeoutRecord(tx, sagaID) + if lckErr != nil { + tm.Log().WithField("saga_id", sagaID).Info("failed to obtain lock for saga timeout") + _ = tx.Rollback() + continue + } callErr := tm.TimeoutSaga(tx, sagaID) clrErr := tm.ClearTimeout(tx, sagaID) @@ -163,7 +179,7 @@ func (tm *TimeoutManager) RegisterTimeout(tx *sql.Tx, sagaID string, duration ti //ClearTimeout clears a timeout for a specific saga func (tm *TimeoutManager) ClearTimeout(tx *sql.Tx, sagaID string) error { - deleteSQL := `delete from ` + tm.timeoutsTableName + ` where saga_id = ?` + deleteSQL := `delete from ` + tm.timeoutsTableName + ` 
where saga_id = ?` _, err := tx.Exec(deleteSQL, sagaID) return err } From cc697ecdec5b3952619700aff34ef89ef364b924 Mon Sep 17 00:00:00 2001 From: Guy Baron Date: Sun, 4 Aug 2019 11:49:01 +0300 Subject: [PATCH 14/15] Commiting the select transaction when querying for pending timeouts --- gbus/tx/mysql/timeout.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/gbus/tx/mysql/timeout.go b/gbus/tx/mysql/timeout.go index f2d2869..d0f405b 100644 --- a/gbus/tx/mysql/timeout.go +++ b/gbus/tx/mysql/timeout.go @@ -106,6 +106,9 @@ func (tm *TimeoutManager) trackTimeouts() { } sagaIDs = append(sagaIDs, sagaID) } + if cmtErr := tx.Commit(); cmtErr != nil { + continue + } tm.executeTimeout(sagaIDs) case <-tm.exit: return @@ -118,6 +121,8 @@ func (tm *TimeoutManager) lockTimeoutRecord(tx *sql.Tx, sagaID string) error { selectTimeout := `SELECT saga_id FROM ` + tm.timeoutsTableName + ` WHERE saga_id = ? FOR UPDATE` row := tx.QueryRow(selectTimeout, sagaID) //scan the row so we can determine if the lock has been successfully acquired + //in case the timeout has been already executed by a different instance of grabbit the scan will + //return an error and the timeout will not get processed var x string return row.Scan(&x) } From 9207f65436a8a9352812e5866321e2439f4eea85 Mon Sep 17 00:00:00 2001 From: Vladislav Shub <1196390+vladshub@users.noreply.github.com> Date: Sun, 4 Aug 2019 17:17:37 +0300 Subject: [PATCH 15/15] feat(timeout:lock): This is done in order to reduce contention and allow for parallel processing in case of multiple grabbit instances --- gbus/tx/mysql/timeout.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/gbus/tx/mysql/timeout.go b/gbus/tx/mysql/timeout.go index d0f405b..56ab6da 100644 --- a/gbus/tx/mysql/timeout.go +++ b/gbus/tx/mysql/timeout.go @@ -118,7 +118,9 @@ func (tm *TimeoutManager) trackTimeouts() { func (tm *TimeoutManager) lockTimeoutRecord(tx *sql.Tx, sagaID string) error { - selectTimeout := `SELECT saga_id FROM ` + 
tm.timeoutsTableName + ` WHERE saga_id = ? FOR UPDATE` + // FOR UPDATE should create a lock on the row that we are processing and skip lock should ensure that we don't wait for a locked row but skip it. + // This is done in order to reduce contention and allow for parallel processing in case of multiple grabbit instances + selectTimeout := `SELECT saga_id FROM ` + tm.timeoutsTableName + ` WHERE saga_id = ? FOR UPDATE SKIP LOCKED` row := tx.QueryRow(selectTimeout, sagaID) //scan the row so we can determine if the lock has been successfully acquired //in case the timeout has been already executed by a different instance of grabbit the scan will