Skip to content

Commit

Permalink
Merge branch 'v1.x' into fix-outbox
Browse files Browse the repository at this point in the history
  • Loading branch information
Guy Baron committed Oct 16, 2019
2 parents 11d7841 + d605440 commit 7d21ad9
Show file tree
Hide file tree
Showing 9 changed files with 111 additions and 29 deletions.
11 changes: 8 additions & 3 deletions gbus/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,13 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus {
Confirm: builder.confirm,
}

var finalLogger logrus.FieldLogger
if builder.logger != nil {
gb.SetLogger(builder.logger)
finalLogger = builder.logger.WithField("_service", gb.SvcName)
} else {
gb.SetLogger(logrus.New())
finalLogger = logrus.WithField("_service", gb.SvcName)
}
gb.SetLogger(finalLogger)

if builder.workerNum < 1 {
gb.WorkerNum = 1
Expand All @@ -74,6 +76,7 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus {
switch builder.txnlProvider {

case "mysql":
providerLogger := gb.Log().WithField("provider", "mysql")
mysqltx, err := mysql.NewTxProvider(builder.txConnStr)
if err != nil {
panic(err)
Expand All @@ -84,14 +87,15 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus {

//TODO move purge logic into the NewSagaStore factory method
sagaStore = mysql.NewSagaStore(gb.SvcName, mysqltx)
sagaStore.SetLogger(providerLogger)
if builder.purgeOnStartup {
err := sagaStore.Purge()
if err != nil {
panic(err)
}
}
gb.Outbox = mysql.NewOutbox(gb.SvcName, mysqltx, builder.purgeOnStartup, builder.busCfg.OutboxCfg)
gb.Outbox.SetLogger(gb.Log())
gb.Outbox.SetLogger(providerLogger)
timeoutManager = mysql.NewTimeoutManager(gb, gb.TxProvider, gb.Log, svcName, builder.purgeOnStartup)

default:
Expand All @@ -111,6 +115,7 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus {
}
glue := saga.NewGlue(gb, sagaStore, svcName, gb.TxProvider, gb.Log, timeoutManager)
glue.SetLogger(gb.Log())
sagaStore.SetLogger(glue.Log())
gb.Glue = glue
return gb
}
Expand Down
10 changes: 0 additions & 10 deletions gbus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -717,13 +717,3 @@ type rpcPolicy struct {
func (p rpcPolicy) Apply(publishing *amqp.Publishing) {
publishing.Headers[RPCHeaderName] = p.rpcID
}

//Log returns the default logrus.FieldLogger for the bus via the Glogged helper
func (b *DefaultBus) Log() logrus.FieldLogger {
if b.Glogged == nil {
b.Glogged = &Glogged{
log: logrus.WithField("_service", b.SvcName),
}
}
return b.Glogged.Log()
}
10 changes: 10 additions & 0 deletions gbus/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package gbus
import (
"errors"
"fmt"
"strings"

"github.com/opentracing/opentracing-go/log"
"github.com/rs/xid"
Expand All @@ -12,6 +13,7 @@ import (
//BusMessage the structure that gets sent to the underlying transport
type BusMessage struct {
ID string
IdempotencyKey string
CorrelationID string
SagaID string
SagaCorrelationID string
Expand All @@ -26,6 +28,7 @@ func NewBusMessage(payload Message) *BusMessage {
bm := &BusMessage{
ID: xid.New().String(),
}
bm.SetIdempotencyKey(bm.ID)
bm.SetPayload(payload)
return bm
}
Expand Down Expand Up @@ -57,6 +60,7 @@ func GetMessageName(delivery amqp.Delivery) string {
//GetAMQPHeaders convert to AMQP headers Table everything but a payload
func (bm *BusMessage) GetAMQPHeaders() (headers amqp.Table) {
headers = amqp.Table{}
headers["x-idempotency-key"] = bm.IdempotencyKey
headers["x-msg-saga-id"] = bm.SagaID
headers["x-msg-saga-correlation-id"] = bm.SagaCorrelationID
headers["x-grabbit-msg-rpc-id"] = bm.RPCID
Expand All @@ -68,6 +72,7 @@ func (bm *BusMessage) GetAMQPHeaders() (headers amqp.Table) {
//SetFromAMQPHeaders convert from AMQP headers Table everything but a payload
func (bm *BusMessage) SetFromAMQPHeaders(delivery amqp.Delivery) {
headers := delivery.Headers
bm.IdempotencyKey = castToString(headers["x-idempotency-key"])
bm.SagaID = castToString(headers["x-msg-saga-id"])
bm.SagaCorrelationID = castToString(headers["x-msg-saga-correlation-id"])
bm.RPCID = castToString(headers["x-grabbit-msg-rpc-id"])
Expand All @@ -81,6 +86,10 @@ func (bm *BusMessage) SetPayload(payload Message) {
bm.Payload = payload
}

func (bm *BusMessage) SetIdempotencyKey(idempotencyKey string) {
bm.IdempotencyKey = strings.TrimSpace(idempotencyKey)
}

//TargetSaga allows sending the message to a specific Saga instance
func (bm *BusMessage) TargetSaga(sagaID string) {
bm.SagaCorrelationID = sagaID
Expand All @@ -91,6 +100,7 @@ func (bm *BusMessage) GetTraceLog() (fields []log.Field) {
return []log.Field{
log.String("message", bm.PayloadFQN),
log.String("ID", bm.ID),
log.String("IdempotencyKey", bm.IdempotencyKey),
log.String("SagaID", bm.SagaID),
log.String("CorrelationID", bm.CorrelationID),
log.String("SagaCorrelationID", bm.SagaCorrelationID),
Expand Down
1 change: 1 addition & 0 deletions gbus/saga/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

//Store abtracts the way sagas get persisted
type Store interface {
gbus.Logged
RegisterSagaType(saga gbus.Saga)
GetSagaByID(tx *sql.Tx, sagaID string) (*Instance, error)
GetSagasByType(tx *sql.Tx, sagaType reflect.Type) ([]*Instance, error)
Expand Down
1 change: 1 addition & 0 deletions gbus/tx/mysql/sagastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type SagaStore struct {
func NewSagaStore(svcName string, txProvider gbus.TxProvider) saga.Store {

base := &tx.SagaStore{
Glogged: &gbus.Glogged{},
Tx: txProvider,
SvcName: svcName,
ParamsMarkers: getParamsMarker()}
Expand Down
30 changes: 14 additions & 16 deletions gbus/tx/sagastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (

//SagaStore base type for embedding for new transactional saga stores
type SagaStore struct {
*gbus.Glogged
Tx gbus.TxProvider
SvcName string
ParamsMarkers []string
Expand All @@ -41,7 +42,7 @@ func (store *SagaStore) scanInstances(rows *sql.Rows) ([]*saga.Instance, error)
if error == sql.ErrNoRows {
return nil, error
} else if error != nil {
store.log().WithError(error).Error("failed to scan saga row")
store.Log().WithError(error).Error("failed to scan saga row")
return nil, error
}

Expand All @@ -67,7 +68,7 @@ func (store *SagaStore) scanInstances(rows *sql.Rows) ([]*saga.Instance, error)
}

if decErr != nil {
store.log().WithError(decErr).Error("failed to decode saga instance")
store.Log().WithError(decErr).Error("failed to decode saga instance")
return nil, decErr
}

Expand All @@ -88,7 +89,7 @@ func (store *SagaStore) GetSagasByType(tx *sql.Tx, sagaType reflect.Type) (insta
defer func() {
err := rows.Close()
if err != nil {
store.log().WithError(err).Error("could not close rows")
store.Log().WithError(err).Error("could not close rows")
}
}()

Expand All @@ -97,7 +98,7 @@ func (store *SagaStore) GetSagasByType(tx *sql.Tx, sagaType reflect.Type) (insta
}

if instances, err = store.scanInstances(rows); err != nil {
store.log().WithError(err).Error("SagaStore failed to scan saga db record")
store.Log().WithError(err).Error("SagaStore failed to scan saga db record")
return nil, err
}
return instances, nil
Expand All @@ -111,7 +112,7 @@ func (store *SagaStore) UpdateSaga(tx *sql.Tx, instance *saga.Instance) (err err
instance.ConcurrencyCtrl = nextVersion
var buf []byte
if buf, err = store.serilizeSaga(instance); err != nil {
store.log().WithError(err).WithField("saga_id", instance.ID).Error("SagaStore failed to encode saga")
store.Log().WithError(err).WithField("saga_id", instance.ID).Error("SagaStore failed to encode saga")
return err
}

Expand Down Expand Up @@ -158,15 +159,15 @@ func (store *SagaStore) GetSagaByID(tx *sql.Tx, sagaID string) (*saga.Instance,
defer func() {
err := rows.Close()
if err != nil {
store.log().WithError(err).Error("could not close rows")
store.Log().WithError(err).Error("could not close rows")
}
}()

if err != nil && err == sql.ErrNoRows {
return nil, saga.ErrInstanceNotFound
}
if err != nil {
store.log().WithError(err).
store.Log().WithError(err).
WithFields(log.Fields{"saga_id": sagaID, "table_name": GetSagatableName(store.SvcName)}).
Error("Failed to fetch saga")

Expand All @@ -190,12 +191,12 @@ func (store *SagaStore) SaveNewSaga(tx *sql.Tx, sagaType reflect.Type, newInstan

var buf []byte
if buf, err = store.serilizeSaga(newInstance); err != nil {
store.log().WithError(err).WithField("saga_id", newInstance.ID).Error("failed to encode saga with sagaID")
store.Log().WithError(err).WithField("saga_id", newInstance.ID).Error("failed to encode saga with sagaID")
return err
}
_, err = tx.Exec(insertSQL, newInstance.ID, sagaType.String(), buf, newInstance.StartedBy, newInstance.StartedByMessageID, newInstance.StartedByRPCID, newInstance.StartedBySaga, newInstance.ConcurrencyCtrl)
if err != nil {
store.log().WithError(err).Error("failed saving new saga")
store.Log().WithError(err).Error("failed saving new saga")
return err
}
return nil
Expand All @@ -204,21 +205,21 @@ func (store *SagaStore) SaveNewSaga(tx *sql.Tx, sagaType reflect.Type, newInstan
//Purge cleans up the saga store, to be used in tests and in extreme situations in production
func (store *SagaStore) Purge() error {
tx := store.NewTx()
store.log().WithField("saga_table", GetSagatableName(store.SvcName)).Info("Purging saga table")
store.Log().WithField("saga_table", GetSagatableName(store.SvcName)).Info("Purging saga table")
deleteSQL := fmt.Sprintf("DELETE FROM %s", GetSagatableName(store.SvcName))
results, err := tx.Exec(deleteSQL)
if err != nil {
store.log().WithError(err).Error("failed to purge saga table")
store.Log().WithError(err).Error("failed to purge saga table")
return err
}
if txErr := tx.Commit(); txErr != nil {
return txErr
}
rowsEffected, resultsErr := results.RowsAffected()
if resultsErr != nil {
store.log().WithError(err).Warn("failed to fetch number of deleted saga records")
store.Log().WithError(err).Warn("failed to fetch number of deleted saga records")
} else {
store.log().WithField("deleted_instances", rowsEffected).Info("purged saga store")
store.Log().WithField("deleted_instances", rowsEffected).Info("purged saga store")
}

return nil
Expand Down Expand Up @@ -251,6 +252,3 @@ func GetSagatableName(svcName string) string {
return strings.ToLower("grabbit_" + sanitized + "_sagas")
}

func (store *SagaStore) log() *log.Entry {
return log.WithField("store", "mysql")
}
35 changes: 35 additions & 0 deletions tests/bus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"reflect"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -760,6 +761,40 @@ func TestSanitizingSvcName(t *testing.T) {
fmt.Println("succeeded sanitizing service name")
}

func TestIdempotencyKeyHeaders(t *testing.T) {
var wg sync.WaitGroup
wg.Add(2)
keys := make([]string, 0)
handler := func(invocation gbus.Invocation, message *gbus.BusMessage) error {
keys = append(keys, message.IdempotencyKey)
wg.Done()
return nil
}

bus := createNamedBusForTest(testSvc1)

bus.HandleMessage(Command1{}, handler)
bus.Start()
defer bus.Shutdown()

cmd1 := gbus.NewBusMessage(Command1{})
cmd1.SetIdempotencyKey("some-unique-key")

cmd2 := gbus.NewBusMessage(Command1{})
cmd2.SetIdempotencyKey("some-unique-key")

//send two commands with the same IdempotencyKey to test that the same IdempotencyKey is propogated
bus.Send(context.Background(), testSvc1, cmd1)
bus.Send(context.Background(), testSvc1, cmd2)

wg.Wait()

if keys[0] != keys[1] && keys[0] != "" {
t.Errorf("expected same IdempotencyKey. actual key1:%s, key2%s", keys[0], keys[1])
}

}

func amqpDeliveryToPublishing(del *amqp.Delivery) (pub amqp.Publishing) {
pub.Headers = del.Headers
pub.ContentType = del.ContentType
Expand Down
4 changes: 4 additions & 0 deletions tests/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package tests
import (
"time"

"github.com/sirupsen/logrus"
"github.com/wework/grabbit/gbus"
"github.com/wework/grabbit/gbus/builder"
"github.com/wework/grabbit/gbus/policy"
Expand All @@ -25,12 +26,15 @@ func init() {
type configBilder func(builder gbus.Builder)

func createBusWithConfig(svcName string, deadletter string, txnl, pos bool, conf gbus.BusConfiguration, cf ...configBilder) gbus.Bus {

log := logrus.WithField("_test", "true")
busBuilder := builder.
New().
Bus(connStr).
WithPolicies(&policy.Durable{}, &policy.TTL{Duration: time.Second * 3600}).
WorkerNum(3, 1).
WithConfirms().
WithLogger(log).
WithConfiguration(conf)

if txnl {
Expand Down
38 changes: 38 additions & 0 deletions tests/logging_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package tests

import (
"testing"

"github.com/sirupsen/logrus/hooks/test"
"github.com/wework/grabbit/gbus"
"github.com/wework/grabbit/gbus/builder"
)

func TestCustomLogger(t *testing.T) {

svcName := "kong-service"
testLogger, hook := test.NewNullLogger()
buslogger := testLogger.WithField("_custom", "true")
bus := getBaseBusBuilder().
WithLogger(buslogger).
Build(svcName)

bus.Start()
defer bus.Shutdown()
bus.Log().Info("testing custom logger")

for index := 0; index < len(hook.Entries); index++ {
loggedServiceName := hook.Entries[index].Data["_service"].(string)
loggedCustomField := hook.Entries[index].Data["_custom"].(string)
if loggedServiceName != svcName || loggedCustomField != "true" {
t.Errorf("_service value was %s. _custom was %s", loggedServiceName, loggedCustomField)
}
}
}

func getBaseBusBuilder() gbus.Builder {
return builder.
New().
Bus(connStr).
Txnl("mysql", "rhinof:rhinof@/rhinof")
}

0 comments on commit 7d21ad9

Please sign in to comment.