Skip to content

Commit

Permalink
Merge branch 'v1.x' into fix_handle_empty_body
Browse files Browse the repository at this point in the history
  • Loading branch information
adiweiss committed Sep 10, 2019
2 parents e1cf06b + 61c7dc4 commit 27ff0eb
Show file tree
Hide file tree
Showing 12 changed files with 303 additions and 15 deletions.
57 changes: 54 additions & 3 deletions docs/SAGA.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type BookVacationSaga struct {
BookingId string
GotCarSvcResponse bool
GotHotelSvcResponse bool
SomeConfigData string
}
```

Expand Down Expand Up @@ -171,7 +172,7 @@ func (s *BookVacationSaga) HandleBookFlightResponse(invocation gbus.Invocation,

}
```
### Step 4 - Handling the timeout requirement
### Step 4 - Handling the timeout requirement

In order to define a timeout for the saga and have grabbit call the saga instance once that timeout is reached (assuming the saga hasn't completed yet) the saga needs to implement the gbus.RequestSagaTimeout interface

Expand All @@ -191,7 +192,7 @@ func (s *BookVacationSaga) TimeoutDuration() time.Duration {
return time.Minute * 15
}

func (s *BookVacationSaga) Timeout(invocation gbus.Invocation, message *gbus.BusMessage) error {
func (s *BookVacationSaga) Timeout(tx *sql.Tx, bus Messaging) error {
return bus.Publish(context.Background(), "some_exchange", "some.topic.1", gbus.NewBusMessage(VacationBookingTimedOut{}))
}

Expand All @@ -202,7 +203,7 @@ func (s *BookVacationSaga) Timeout(invocation gbus.Invocation, message *gbus.Bus
```go

gb := getBus("vacationSvc")
gb.RegisterSaga(BookVacationSaga{})
gb.RegisterSaga(&BookVacationSaga{})

```

Expand All @@ -218,3 +219,53 @@ It is recommended to follow [semantic versioning](https://semver.org/) of the go

grabbit automatically implements an optimistic concurrency model when processing a message and persisting saga instances, detecting when the saga state turns stale due to processing concurrently a different message.
When the above is detected grabbit will rollback the bounded transaction and retry the execution of the saga.

### Configuring a Saga Instance

It is sometimes needed to configure a saga instance with some data before it gets executed.
grrabit allows you to do so by providing a saga configuration function when registering the saga.
Each time a saga instance gets created or inflated from the persistent store the configuration function will be executed.

The saga configuration function accepts a single gbus.Saga parameter and returns a single gbus.Saga return value.
The passed in gbus.Saga is the instance that will be executed and will be the type of the saga being registered meaning it can safely be casted to your specific saga type.
Once you casted to the specific saga type you can configure the instance and access its fields as needed.
After the instance is configured the function returns the configured saga instance so grabbit can proceed and execute it.

The following snippet is an example of how to pass in a saga configuration function
```go
configSaga := func(saga gbus.Saga) gbus.Saga {
s := saga.(*BookVacationSaga)
s.SomeConfigData = "config value"
return s
}
svc1.RegisterSaga(&BookVacationSaga{}, configSaga)

```

### Replying to the saga initiator

It is common that during its life cycle a saga will need to report back and send messages with the service that initiated it (sent the command that started the saga).
In the example above when the booking has completed we would like to send a message to the service initiating the booking saga.
The way we have implemented this in the example above is by publishing an event which the initiating service would need to subscribe to and handle to get notified when the booking is complete.
And although this would work it won't be an elegant solution especially if the initiator of the saga is another saga since it means that the initiating saga will need to filter all events and select the single event that correlates to that particular instance.
To relive client code to do so grabbit provides a way for a saga to directly send a message to its initiator, and if the initiator is another saga grabbit will automatically correlate the message with the correct saga instance and invoke the relevant handler.

To send a message to the saga initiator the message handler attached to the saga instance will need to cast the passed in gbus.Invocation argument to a gbus.SagaInvocation and then invoke the ReplyToInitiator function.
We can replace the following code from the above example

```go
if s.IsComplete(){
event := gbus.NewBusMessage(VacationBookingComplete{})
invocation.Bus().Publish(invocation.Ctx(), "some_exchange", "some.topic", event)
}
```

to this:

```go
sagaInvocation := invocation.(gbus.SagaInvocation)
if s.IsComplete(){
msg := gbus.NewBusMessage(VacationBookingComplete{})
sagaInvocation.ReplyToInitiator(invocation.Ctx(), msg)
}
```
13 changes: 13 additions & 0 deletions gbus/abstractions.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,10 +226,23 @@ type Invocation interface {
Bus() Messaging
Tx() *sql.Tx
Ctx() context.Context
InvokingSvc() string
Routing() (exchange, routingKey string)
DeliveryInfo() DeliveryInfo
}

/*
SagaInvocation allows saga instances to reply to their creator even when not in the conext of handling
the message that starts the saga.
A message handler that is attached to a saga instance can safly cast the passed in invocation to SagaInvocation
and use the ReplyToInitiator function to send a message to the originating service that sent the message that started the saga
*/
type SagaInvocation interface {
ReplyToInitiator(ctx context.Context, message *BusMessage) error
//HostingSvc returns the svc name that is executing the service
HostingSvc() string
}

//Serializer is the base interface for all message serializers
type Serializer interface {
Name() string
Expand Down
10 changes: 7 additions & 3 deletions gbus/invocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ var _ Messaging = &defaultInvocationContext{}

type defaultInvocationContext struct {
*Glogged
invocingSvc string
invokingSvc string
bus *DefaultBus
inboundMsg *BusMessage
tx *sql.Tx
Expand All @@ -33,6 +33,10 @@ func (dfi *defaultInvocationContext) Log() logrus.FieldLogger {
return dfi.Glogged.Log().WithFields(logrus.Fields{"routing_key": dfi.routingKey, "message_id": dfi.inboundMsg.ID})
}

func (dfi *defaultInvocationContext) InvokingSvc() string {
return dfi.invokingSvc
}

//Reply implements the Invocation.Reply signature
func (dfi *defaultInvocationContext) Reply(ctx context.Context, replyMessage *BusMessage) error {
if dfi.inboundMsg != nil {
Expand All @@ -43,9 +47,9 @@ func (dfi *defaultInvocationContext) Reply(ctx context.Context, replyMessage *Bu
var err error

if dfi.tx != nil {
return dfi.bus.sendWithTx(ctx, dfi.tx, dfi.invocingSvc, replyMessage)
return dfi.bus.sendWithTx(ctx, dfi.tx, dfi.invokingSvc, replyMessage)
}
if err = dfi.bus.Send(ctx, dfi.invocingSvc, replyMessage); err != nil {
if err = dfi.bus.Send(ctx, dfi.invokingSvc, replyMessage); err != nil {
//TODO: add logs?
logrus.WithError(err).Error("could not send reply")

Expand Down
8 changes: 7 additions & 1 deletion gbus/saga/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,9 @@ func (imsm *Glue) SagaHandler(invocation gbus.Invocation, message *gbus.BusMessa
if startNew {

newInstance := def.newInstance()
newInstance.StartedBy = invocation.InvokingSvc()
newInstance.StartedBySaga = message.SagaCorrelationID
// newInstance.StartedBy =
imsm.Log().
WithFields(logrus.Fields{"saga_def": def.String(), "saga_id": newInstance.ID}).
Info("created new saga")
Expand Down Expand Up @@ -199,14 +202,17 @@ func (imsm *Glue) SagaHandler(invocation gbus.Invocation, message *gbus.BusMessa
func (imsm *Glue) invokeSagaInstance(def *Def, instance *Instance, invocation gbus.Invocation, message *gbus.BusMessage) error {

span, sctx := opentracing.StartSpanFromContext(invocation.Ctx(), def.String())

defer span.Finish()
sginv := &sagaInvocation{
decoratedBus: invocation.Bus(),
decoratedInvocation: invocation,
inboundMsg: message,
sagaID: instance.ID,
ctx: sctx,
invokingService: imsm.svcName,
hostingSvc: imsm.svcName,
startedBy: instance.StartedBy,
startedBySaga: instance.StartedBySaga,
}
sginv.SetLogger(imsm.Log().WithFields(logrus.Fields{
"saga_id": instance.ID,
Expand Down
9 changes: 9 additions & 0 deletions gbus/saga/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,15 @@ type Instance struct {
UnderlyingInstance gbus.Saga
MsgToMethodMap []*MsgToFuncPair
Log logrus.FieldLogger
/*
Will hold the service name that sent the command or event that started the saga
*/
StartedBy string
/*
If this saga has been started by a message originating from another saga instance
this field will hold the saga_id of that instance
*/
StartedBySaga string
}

func (si *Instance) invoke(exchange, routingKey string, invocation *sagaInvocation, message *gbus.BusMessage) error {
Expand Down
29 changes: 27 additions & 2 deletions gbus/saga/invocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
)

var _ gbus.Invocation = &sagaInvocation{}
var _ gbus.SagaInvocation = &sagaInvocation{}

type sagaInvocation struct {
*gbus.Glogged
Expand All @@ -17,7 +18,15 @@ type sagaInvocation struct {
inboundMsg *gbus.BusMessage
sagaID string
ctx context.Context
invokingService string
//the service that is executing the saga instance
hostingSvc string
//the service that sent the command/event that triggered the creation of the saga
startedBy string
/*
in case the command/event that triggered the creation of the saga was sent from a saga
then this field will hold the saga id of that instance
*/
startedBySaga string
}

func (si *sagaInvocation) setCorrelationIDs(message *gbus.BusMessage, isEvent bool) {
Expand All @@ -33,20 +42,36 @@ func (si *sagaInvocation) setCorrelationIDs(message *gbus.BusMessage, isEvent bo
//if the saga is potentially invoking itself then set the SagaCorrelationID to reflect that
//https://github.com/wework/grabbit/issues/64
_, targetService := si.decoratedInvocation.Routing()
if targetService == si.invokingService {
if targetService == si.hostingSvc {
message.SagaCorrelationID = message.SagaID
}

}

}
func (si *sagaInvocation) HostingSvc() string {
return si.hostingSvc
}

func (si *sagaInvocation) InvokingSvc() string {
return si.decoratedInvocation.InvokingSvc()
}

func (si *sagaInvocation) Reply(ctx context.Context, message *gbus.BusMessage) error {

si.setCorrelationIDs(message, false)
return si.decoratedInvocation.Reply(ctx, message)
}

func (si *sagaInvocation) ReplyToInitiator(ctx context.Context, message *gbus.BusMessage) error {

si.setCorrelationIDs(message, false)

//overridethe SagaCorrelationID to the one of the saga id of the creating service
message.SagaCorrelationID = si.startedBySaga
return si.decoratedInvocation.Bus().Send(ctx, si.startedBy, message)
}

func (si *sagaInvocation) Bus() gbus.Messaging {
return si
}
Expand Down
17 changes: 17 additions & 0 deletions gbus/tx/mysql/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,22 @@ func sagaStoreTableMigration(svcName string) *migrator.Migration {
}
}

func sagaStoreAddSagaCreatorDetails(svcName string) *migrator.Migration {
tblName := tx.GrabbitTableNameTemplate(svcName, "sagas")

addCreatorDetailsSQL := `ALTER TABLE ` + tblName + ` ADD COLUMN started_by_request_of_svc VARCHAR(2048) AFTER saga_data, ADD COLUMN started_by_request_of_saga VARCHAR(255) AFTER started_by_request_of_svc`

return &migrator.Migration{
Name: "create saga store table",
Func: func(tx *sql.Tx) error {
if _, err := tx.Exec(addCreatorDetailsSQL); err != nil {
return err
}
return nil
},
}
}

func outboxMigrations(svcName string) *migrator.Migration {

tblName := tx.GrabbitTableNameTemplate(svcName, "outbox")
Expand Down Expand Up @@ -124,6 +140,7 @@ func EnsureSchema(db *sql.DB, svcName string) {
timoutTableMigration(svcName),
legacyMigrationsTable(svcName),
outboxChangeColumnLength(svcName),
sagaStoreAddSagaCreatorDetails(svcName),
))
if err != nil {
panic(err)
Expand Down
21 changes: 16 additions & 5 deletions gbus/tx/sagastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@ func (store *SagaStore) scanInstances(rows *sql.Rows) ([]*saga.Instance, error)
var sagaID, sagaType string
var version int
var sagaData []byte
var startedBy sql.NullString
var startedBySaga sql.NullString

error := rows.Scan(&sagaID, &sagaType, &sagaData, &version)
error := rows.Scan(&sagaID, &sagaType, &sagaData, &startedBy, &startedBySaga, &version)
if error == sql.ErrNoRows {
return nil, error
} else if error != nil {
Expand All @@ -46,7 +48,16 @@ func (store *SagaStore) scanInstances(rows *sql.Rows) ([]*saga.Instance, error)
dec := gob.NewDecoder(reader)
var instance saga.Instance
instance.ConcurrencyCtrl = version

decErr := dec.Decode(&instance)

if startedBy.Valid {
instance.StartedBy = startedBy.String
}
if startedBySaga.Valid {
instance.StartedBySaga = startedBySaga.String
}

if decErr != nil {
store.log().WithError(decErr).Error("failed to decode saga instance")
return nil, decErr
Expand All @@ -63,7 +74,7 @@ func (store *SagaStore) scanInstances(rows *sql.Rows) ([]*saga.Instance, error)
func (store *SagaStore) GetSagasByType(tx *sql.Tx, sagaType reflect.Type) (instances []*saga.Instance, err error) {

tblName := GetSagatableName(store.SvcName)
selectSQL := "SELECT saga_id, saga_type, saga_data, version FROM " + tblName + " WHERE saga_type=" + store.ParamsMarkers[0]
selectSQL := "SELECT saga_id, saga_type, saga_data, started_by_request_of_svc, started_by_request_of_saga, version FROM " + tblName + " WHERE saga_type=" + store.ParamsMarkers[0]

rows, err := tx.Query(selectSQL, sagaType.String())
defer func() {
Expand Down Expand Up @@ -133,7 +144,7 @@ func (store *SagaStore) DeleteSaga(tx *sql.Tx, instance *saga.Instance) error {
func (store *SagaStore) GetSagaByID(tx *sql.Tx, sagaID string) (*saga.Instance, error) {

tblName := GetSagatableName(store.SvcName)
selectSQL := `SELECT saga_id, saga_type, saga_data, version FROM ` + tblName + ` WHERE saga_id=` + store.ParamsMarkers[0] + ``
selectSQL := `SELECT saga_id, saga_type, saga_data, started_by_request_of_svc, started_by_request_of_saga, version FROM ` + tblName + ` WHERE saga_id=` + store.ParamsMarkers[0] + ``

rows, err := tx.Query(selectSQL, sagaID)
defer func() {
Expand Down Expand Up @@ -167,14 +178,14 @@ func (store *SagaStore) GetSagaByID(tx *sql.Tx, sagaID string) (*saga.Instance,
func (store *SagaStore) SaveNewSaga(tx *sql.Tx, sagaType reflect.Type, newInstance *saga.Instance) (err error) {
store.RegisterSagaType(newInstance.UnderlyingInstance)
tblName := GetSagatableName(store.SvcName)
insertSQL := `INSERT INTO ` + tblName + ` (saga_id, saga_type, saga_data, version) VALUES (?, ?, ?, ?)`
insertSQL := `INSERT INTO ` + tblName + ` (saga_id, saga_type, saga_data, started_by_request_of_svc, started_by_request_of_saga, version) VALUES (?, ?, ?, ?, ?, ?)`

var buf []byte
if buf, err = store.serilizeSaga(newInstance); err != nil {
store.log().WithError(err).WithField("saga_id", newInstance.ID).Error("failed to encode saga with sagaID")
return err
}
_, err = tx.Exec(insertSQL, newInstance.ID, sagaType.String(), buf, newInstance.ConcurrencyCtrl)
_, err = tx.Exec(insertSQL, newInstance.ID, sagaType.String(), buf, newInstance.StartedBy, newInstance.StartedBySaga, newInstance.ConcurrencyCtrl)
if err != nil {
store.log().WithError(err).Error("failed saving new saga")
return err
Expand Down
2 changes: 1 addition & 1 deletion gbus/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ func (worker *worker) withTx(handlerWrapper func(tx *sql.Tx) error) (actionErr e

func (worker *worker) createInvocation(ctx context.Context, delivery *amqp.Delivery, tx *sql.Tx, attempt uint, message *BusMessage) *defaultInvocationContext {
invocation := &defaultInvocationContext{
invocingSvc: delivery.ReplyTo,
invokingSvc: delivery.ReplyTo,
bus: worker.b,
inboundMsg: message,
tx: tx,
Expand Down

0 comments on commit 27ff0eb

Please sign in to comment.