diff --git a/README.md b/README.md index 665dc4c..bcba0e2 100644 --- a/README.md +++ b/README.md @@ -21,11 +21,8 @@ A lightweight transactional message bus on top of RabbitMQ supporting: 5) [Reliable messaging](https://github.com/wework/grabbit/blob/master/docs/OUTBOX.md) and local service transactivity via Transaction Outbox pattern 6) Deadlettering 7) [Structured logging](https://github.com/wework/grabbit/blob/master/docs/LOGGING.md) -8) Reporting [metrics](https://github.com/wework/grabbit/blob/master/docs/METRICS.md) via Prometheus - -Planned: - -1) Deduplication of inbound messages +8) Reporting [Metrics](https://github.com/wework/grabbit/blob/master/docs/METRICS.md) via Prometheus +9) Distributed [Tracing](https://github.com/wework/grabbit/blob/master/docs/TRACING.md) via OpenTracing ## Stable release the v1.x branch contains the latest stable releases of grabbit and one should track that branch to get point and minor release updates. diff --git a/docs/TRACING.md b/docs/TRACING.md new file mode 100644 index 0000000..d2f6a46 --- /dev/null +++ b/docs/TRACING.md @@ -0,0 +1,25 @@ +# Tracing + +grabbit supports reporting standard [OpenTracing](https://opentracing.io/) tracing spans to a compatible OpenTracing backend (such as [Jaeger](https://www.jaegertracing.io/)). + +NOTE: In your hosting process you will need to set up a global tracer to collect and forward the traces reported by grabbit. See Jaeger go client for an [example](https://github.com/jaegertracing/jaeger-client-go) + +Once the global tracer is set up you will need to make sure that in your message handlers you carry over the passed in context to successive messages sent by the handler. 
+ +```go + +func SomeHandler(invocation gbus.Invocation, message *gbus.BusMessage) error{ + reply := gbus.NewBusMessage(MyReply{}) + cmd := gbus.NewBusMessage(MyCommand{}) + ctx := invocation.Ctx() + + if err := invocation.Send(ctx, "another-service", cmd); err != nil{ + return err + } + if err := invocation.Reply(ctx, reply); err != nil{ + return err + } + return nil + } + +``` \ No newline at end of file diff --git a/gbus/abstractions.go b/gbus/abstractions.go index 3b3aa75..a23244b 100644 --- a/gbus/abstractions.go +++ b/gbus/abstractions.go @@ -30,6 +30,7 @@ type BusConfiguration struct { type Bus interface { HandlerRegister Deadlettering + RawMessageHandling BusSwitch Messaging SagaRegister @@ -129,10 +130,37 @@ type Saga interface { //Deadlettering provides the ability to handle messages that were rejected as poision and arrive to the deadletter queue type Deadlettering interface { - HandleDeadletter(handler DeadLetterMessageHandler) + /* + HandleDeadletter is deprecated use RawMessageHandling.SetGlobalRawMessageHandler instead. + This function will be removed in future grabbit releases + */ + HandleDeadletter(handler RawMessageHandler) ReturnDeadToQueue(ctx context.Context, publishing *amqp.Publishing) error } +/* + RawMessageHandling provides the ability to consume and send raw amqp messages with the transactional guarantees + that the bus provides +*/ +type RawMessageHandling interface { + /* + SetGlobalRawMessageHandler registers a handler that gets called for each amqp.Delivery that is delivered +         to the service queue. 
+         The handler will get called with a scoped transaction that is a different transaction than the ones that +         regular message handlers are scoped by as we want the RawMessage handler to get executed even if the amqp.Delivery +         can not be serialized by the bus to one of the registered schemas + +         In case a bus has both a raw message handler and regular ones the bus will first call the raw message handler +         and afterward will call any registered message handlers. +         if the global raw handler returns an error the message gets rejected and any additional +         handlers will not be called. +         You should not use the global raw message handler to drive business logic as it breaks the local transactivity +         guarantees grabbit provides and should only be used in specialized cases. +         If you do decide to use this feature try not shooting yourself in the foot. + */ + SetGlobalRawMessageHandler(handler RawMessageHandler) +} + //RequestSagaTimeout is the interface a saga needs to implement to get timeout servicess type RequestSagaTimeout interface { TimeoutDuration() time.Duration diff --git a/gbus/bus.go b/gbus/bus.go index e3e6e8e..419e921 100644 --- a/gbus/bus.go +++ b/gbus/bus.go @@ -43,7 +43,8 @@ type DefaultBus struct { amqpOutbox *AMQPOutbox RPCHandlers map[string]MessageHandler - deadletterHandler DeadLetterMessageHandler + deadletterHandler RawMessageHandler + globalRawHandler RawMessageHandler HandlersLock *sync.Mutex RPCLock *sync.Mutex SenderLock *sync.Mutex @@ -73,8 +74,8 @@ var ( //BaseRetryDuration defines the basic milliseconds that the retry algorithm uses //for a random retry time. Default is 10 but it is configurable. 
BaseRetryDuration = 10 * time.Millisecond - //RpcHeaderName used to define the header in grabbit for RPC - RpcHeaderName = "x-grabbit-msg-rpc-id" + //RPCHeaderName used to define the header in grabbit for RPC + RPCHeaderName = "x-grabbit-msg-rpc-id" ) func (b *DefaultBus) createRPCQueue() (amqp.Queue, error) { @@ -286,6 +287,7 @@ func (b *DefaultBus) createBusWorkers(workerNum uint) ([]*worker, error) { rpcLock: b.RPCLock, rpcHandlers: b.RPCHandlers, deadletterHandler: b.deadletterHandler, + globalRawHandler: b.globalRawHandler, handlersLock: &sync.Mutex{}, registrations: b.Registrations, serializer: b.Serializer, @@ -547,11 +549,17 @@ func (b *DefaultBus) HandleEvent(exchange, topic string, event Message, handler return b.registerHandlerImpl(exchange, topic, event, handler) } -//HandleDeadletter implements GBus.HandleDeadletter -func (b *DefaultBus) HandleDeadletter(handler DeadLetterMessageHandler) { +//HandleDeadletter implements Deadlettering.HandleDeadletter +func (b *DefaultBus) HandleDeadletter(handler RawMessageHandler) { b.registerDeadLetterHandler(handler) } +//SetGlobalRawMessageHandler implements RawMessageHandling.SetGlobalRawMessageHandler +func (b *DefaultBus) SetGlobalRawMessageHandler(handler RawMessageHandler) { + metrics.AddHandlerMetrics(handler.Name()) + b.globalRawHandler = handler +} + //ReturnDeadToQueue returns a message to its original destination func (b *DefaultBus) ReturnDeadToQueue(ctx context.Context, publishing *amqp.Publishing) error { return b.returnDeadToQueue(ctx, nil, publishing) @@ -623,7 +631,8 @@ func (b *DefaultBus) publish(tx *sql.Tx, exchange, routingKey string, msg *amqp. 
func (b *DefaultBus) sendImpl(sctx context.Context, tx *sql.Tx, toService, replyTo, exchange, topic string, message *BusMessage, policies ...MessagePolicy) (er error) { b.SenderLock.Lock() defer b.SenderLock.Unlock() - span, _ := opentracing.StartSpanFromContext(sctx, "sendImpl") + span, _ := opentracing.StartSpanFromContext(sctx, "SendMessage") + defer func() { if err := recover(); err != nil { errMsg := fmt.Sprintf("panic recovered panicking err:\n%v\n%s", err, debug.Stack()) @@ -691,7 +700,7 @@ func (b *DefaultBus) registerHandlerImpl(exchange, routingKey string, msg Messag return nil } -func (b *DefaultBus) registerDeadLetterHandler(handler DeadLetterMessageHandler) { +func (b *DefaultBus) registerDeadLetterHandler(handler RawMessageHandler) { metrics.AddHandlerMetrics(handler.Name()) b.deadletterHandler = handler } @@ -705,7 +714,7 @@ type rpcPolicy struct { } func (p rpcPolicy) Apply(publishing *amqp.Publishing) { - publishing.Headers[RpcHeaderName] = p.rpcID + publishing.Headers[RPCHeaderName] = p.rpcID } //Log returns the default logrus.FieldLogger for the bus via the Glogged helper diff --git a/gbus/message_handler.go b/gbus/message_handler.go index 8111a87..bfaf14d 100644 --- a/gbus/message_handler.go +++ b/gbus/message_handler.go @@ -2,17 +2,18 @@ package gbus import ( "database/sql" - "github.com/streadway/amqp" "reflect" "runtime" "strings" + + "github.com/streadway/amqp" ) //MessageHandler signature for all command handlers type MessageHandler func(invocation Invocation, message *BusMessage) error -//DeadLetterMessageHandler signature for dead letter handler -type DeadLetterMessageHandler func(tx *sql.Tx, poison amqp.Delivery) error +//RawMessageHandler signature for handlers that handle raw amqp deliveries +type RawMessageHandler func(tx *sql.Tx, delivery *amqp.Delivery) error //Name is a helper function returning the runtime name of the function bound to an instance of the MessageHandler type func (mg MessageHandler) Name() string { @@ -20,7 +21,7 
@@ func (mg MessageHandler) Name() string { } //Name is a helper function returning the runtime name of the function bound to an instance of the DeadLetterMessageHandler type -func (dlmg DeadLetterMessageHandler) Name() string { +func (dlmg RawMessageHandler) Name() string { return nameFromFunc(dlmg) } diff --git a/gbus/messages.go b/gbus/messages.go index 123256f..6c78c60 100644 --- a/gbus/messages.go +++ b/gbus/messages.go @@ -1,6 +1,9 @@ package gbus import ( + "errors" + "fmt" + "github.com/opentracing/opentracing-go/log" "github.com/rs/xid" "github.com/streadway/amqp" @@ -27,11 +30,28 @@ func NewBusMessage(payload Message) *BusMessage { return bm } -//NewFromAMQPHeaders creates a BusMessage from headers of an amqp message -func NewFromAMQPHeaders(headers amqp.Table) *BusMessage { +//NewFromDelivery creates a BusMessage from an amqp delivery +func NewFromDelivery(delivery amqp.Delivery) (*BusMessage, error) { bm := &BusMessage{} - bm.SetFromAMQPHeaders(headers) - return bm + bm.SetFromAMQPHeaders(delivery) + + bm.ID = delivery.MessageId + bm.CorrelationID = delivery.CorrelationId + if delivery.Exchange != "" { + bm.Semantics = EVT + } else { + bm.Semantics = CMD + } + if bm.PayloadFQN == "" || bm.Semantics == "" { + errMsg := fmt.Sprintf("missing critical headers. 
message_name:%s semantics: %s", bm.PayloadFQN, bm.Semantics) + return nil, errors.New(errMsg) + } + return bm, nil +} + +//GetMessageName extracts the value of the custom x-msg-name header from an amqp delivery +func GetMessageName(delivery amqp.Delivery) string { + return castToString(delivery.Headers["x-msg-name"]) } //GetAMQPHeaders convert to AMQP headers Table everything but a payload @@ -46,12 +66,12 @@ func (bm *BusMessage) GetAMQPHeaders() (headers amqp.Table) { } //SetFromAMQPHeaders convert from AMQP headers Table everything but a payload -func (bm *BusMessage) SetFromAMQPHeaders(headers amqp.Table) { - +func (bm *BusMessage) SetFromAMQPHeaders(delivery amqp.Delivery) { + headers := delivery.Headers bm.SagaID = castToString(headers["x-msg-saga-id"]) bm.SagaCorrelationID = castToString(headers["x-msg-saga-correlation-id"]) bm.RPCID = castToString(headers["x-grabbit-msg-rpc-id"]) - bm.PayloadFQN = castToString(headers["x-msg-name"]) + bm.PayloadFQN = GetMessageName(delivery) } diff --git a/gbus/saga/glue.go b/gbus/saga/glue.go index d6e71e2..e5d20c8 100644 --- a/gbus/saga/glue.go +++ b/gbus/saga/glue.go @@ -1,6 +1,7 @@ package saga import ( + "context" "database/sql" "errors" "fmt" @@ -8,6 +9,8 @@ import ( "strings" "sync" + "github.com/opentracing/opentracing-go" + slog "github.com/opentracing/opentracing-go/log" "github.com/sirupsen/logrus" "github.com/wework/grabbit/gbus" "github.com/wework/grabbit/gbus/metrics" @@ -98,7 +101,8 @@ func (imsm *Glue) getDefsForMsgName(msgName string) []*Def { return defs } -func (imsm *Glue) handler(invocation gbus.Invocation, message *gbus.BusMessage) error { +//SagaHandler is the generic handler invoking saga instances +func (imsm *Glue) SagaHandler(invocation gbus.Invocation, message *gbus.BusMessage) error { imsm.lock.Lock() defer imsm.lock.Unlock() @@ -117,11 +121,12 @@ func (imsm *Glue) handler(invocation gbus.Invocation, message *gbus.BusMessage) */ startNew := def.shouldStartNewSaga(message) if startNew { + 
newInstance := def.newInstance() imsm.Log(). WithFields(logrus.Fields{"saga_def": def.String(), "saga_id": newInstance.ID}). Info("created new saga") - if invkErr := imsm.invokeSagaInstance(newInstance, invocation, message); invkErr != nil { + if invkErr := imsm.invokeSagaInstance(def, newInstance, invocation, message); invkErr != nil { imsm.Log().WithError(invkErr).WithField("saga_id", newInstance.ID).Error("failed to invoke saga") return invkErr } @@ -154,7 +159,7 @@ func (imsm *Glue) handler(invocation gbus.Invocation, message *gbus.BusMessage) return e } def.configureSaga(instance) - if invkErr := imsm.invokeSagaInstance(instance, invocation, message); invkErr != nil { + if invkErr := imsm.invokeSagaInstance(def, instance, invocation, message); invkErr != nil { imsm.Log().WithError(invkErr).WithField("saga_id", instance.ID).Error("failed to invoke saga") return invkErr } @@ -176,7 +181,7 @@ func (imsm *Glue) handler(invocation gbus.Invocation, message *gbus.BusMessage) for _, instance := range instances { def.configureSaga(instance) - if invkErr := imsm.invokeSagaInstance(instance, invocation, message); invkErr != nil { + if invkErr := imsm.invokeSagaInstance(def, instance, invocation, message); invkErr != nil { imsm.Log().WithError(invkErr).WithField("saga_id", instance.ID).Error("failed to invoke saga") return invkErr } @@ -191,13 +196,16 @@ func (imsm *Glue) handler(invocation gbus.Invocation, message *gbus.BusMessage) return nil } -func (imsm *Glue) invokeSagaInstance(instance *Instance, invocation gbus.Invocation, message *gbus.BusMessage) error { +func (imsm *Glue) invokeSagaInstance(def *Def, instance *Instance, invocation gbus.Invocation, message *gbus.BusMessage) error { + + span, sctx := opentracing.StartSpanFromContext(invocation.Ctx(), def.String()) + defer span.Finish() sginv := &sagaInvocation{ decoratedBus: invocation.Bus(), decoratedInvocation: invocation, inboundMsg: message, sagaID: instance.ID, - ctx: invocation.Ctx(), + ctx: sctx, 
invokingService: imsm.svcName, } sginv.SetLogger(imsm.Log().WithFields(logrus.Fields{ @@ -207,7 +215,11 @@ func (imsm *Glue) invokeSagaInstance(instance *Instance, invocation gbus.Invocat })) exchange, routingKey := invocation.Routing() - return instance.invoke(exchange, routingKey, sginv, message) + err := instance.invoke(exchange, routingKey, sginv, message) + if err != nil { + span.LogFields(slog.Error(err)) + } + return err } func (imsm *Glue) completeOrUpdateSaga(tx *sql.Tx, instance *Instance) error { @@ -232,7 +244,7 @@ func (imsm *Glue) registerMessage(message gbus.Message) error { return nil } imsm.alreadyRegistred[message.SchemaName()] = true - return imsm.bus.HandleMessage(message, imsm.handler) + return imsm.bus.HandleMessage(message, imsm.SagaHandler) } func (imsm *Glue) registerEvent(exchange, topic string, event gbus.Message) error { @@ -241,7 +253,7 @@ func (imsm *Glue) registerEvent(exchange, topic string, event gbus.Message) erro return nil } imsm.alreadyRegistred[event.SchemaName()] = true - return imsm.bus.HandleEvent(exchange, topic, event, imsm.handler) + return imsm.bus.HandleEvent(exchange, topic, event, imsm.SagaHandler) } //TimeoutSaga fetches a saga instance and calls its timeout interface @@ -257,7 +269,12 @@ func (imsm *Glue) TimeoutSaga(tx *sql.Tx, sagaID string) error { if err != nil { return err } + + span, _ := opentracing.StartSpanFromContext(context.Background(), "SagaTimeout") + span.SetTag("saga_type", saga.String()) + defer span.Finish() timeoutErr := saga.timeout(tx, imsm.bus) + if timeoutErr != nil { imsm.Log().WithError(timeoutErr).WithField("sagaID", sagaID).Error("failed to timeout saga") return timeoutErr diff --git a/gbus/saga/instance.go b/gbus/saga/instance.go index a178384..bbf982c 100644 --- a/gbus/saga/instance.go +++ b/gbus/saga/instance.go @@ -6,11 +6,11 @@ import ( "reflect" "time" - "github.com/sirupsen/logrus" - "github.com/wework/grabbit/gbus/metrics" - + "github.com/opentracing/opentracing-go" 
"github.com/rs/xid" + "github.com/sirupsen/logrus" "github.com/wework/grabbit/gbus" + "github.com/wework/grabbit/gbus/metrics" ) //Instance represent a living instance of a saga of a particular definition @@ -22,7 +22,7 @@ type Instance struct { Log logrus.FieldLogger } -func (si *Instance) invoke(exchange, routingKey string, invocation gbus.Invocation, message *gbus.BusMessage) error { +func (si *Instance) invoke(exchange, routingKey string, invocation *sagaInvocation, message *gbus.BusMessage) error { methodsToInvoke := si.getSagaMethodNameToInvoke(exchange, routingKey, message) @@ -48,6 +48,13 @@ func (si *Instance) invoke(exchange, routingKey string, invocation gbus.Invocati "method_name": methodName, "saga_id": si.ID, }).Info("invoking method on saga") + span, sctx := opentracing.StartSpanFromContext(invocation.Ctx(), methodName) + // replace the original context with the context built around the span so we can + // trace the saga handler that is invoked + invocation.ctx = sctx + + defer span.Finish() + err := metrics.RunHandlerWithMetric(func() error { returns := method.Call(params) diff --git a/gbus/saga/instance_test.go b/gbus/saga/instance_test.go index f5bc56a..44581b4 100644 --- a/gbus/saga/instance_test.go +++ b/gbus/saga/instance_test.go @@ -1,6 +1,7 @@ package saga import ( + "context" "errors" "reflect" "testing" @@ -19,7 +20,9 @@ func TestInstanceInvocationReturnsErrors(t *testing.T) { m2 := TestMsg2{} exchange, routingKey := "", "kong" - invocationStub := &sagaInvocation{} + invocationStub := &sagaInvocation{ + ctx: context.Background(), + } failName := gbus.MessageHandler(s.Fail).Name() failFilter := gbus.NewMessageFilter(exchange, routingKey, m1) diff --git a/gbus/tx/mysql/migrations.go b/gbus/tx/mysql/migrations.go index 6f78f14..5b92d4e 100644 --- a/gbus/tx/mysql/migrations.go +++ b/gbus/tx/mysql/migrations.go @@ -2,16 +2,14 @@ package mysql import ( "database/sql" - "regexp" "strings" "github.com/lopezator/migrator" 
"github.com/wework/grabbit/gbus/tx" ) -//SagaStoreTableMigration creates the service saga store table -func SagaStoreTableMigration(svcName string) *migrator.Migration { - tblName := tx.GetSagatableName(svcName) +func sagaStoreTableMigration(svcName string) *migrator.Migration { + tblName := tx.GrabbitTableNameTemplate(svcName, "sagas") createTableQuery := `CREATE TABLE IF NOT EXISTS ` + tblName + ` ( rec_id INT PRIMARY KEY AUTO_INCREMENT, @@ -33,10 +31,10 @@ func SagaStoreTableMigration(svcName string) *migrator.Migration { } } -//OutboxMigrations creates service outbox table -func OutboxMigrations(svcName string) *migrator.Migration { +func outboxMigrations(svcName string) *migrator.Migration { - query := `CREATE TABLE IF NOT EXISTS ` + getOutboxName(svcName) + ` ( + tblName := tx.GrabbitTableNameTemplate(svcName, "outbox") + query := `CREATE TABLE IF NOT EXISTS ` + tblName + ` ( rec_id int NOT NULL AUTO_INCREMENT, message_id varchar(50) NOT NULL UNIQUE, message_type varchar(50) NOT NULL, @@ -62,8 +60,7 @@ func OutboxMigrations(svcName string) *migrator.Migration { } } -//TimoutTableMigration creates the service timeout table, where timeouts are persisted -func TimoutTableMigration(svcName string) *migrator.Migration { +func timoutTableMigration(svcName string) *migrator.Migration { tblName := GetTimeoutsTableName(svcName) createTableQuery := `CREATE TABLE IF NOT EXISTS ` + tblName + ` ( @@ -85,14 +82,31 @@ func TimoutTableMigration(svcName string) *migrator.Migration { } } +func legacyMigrationsTable(svcName string) *migrator.Migration { + + query := `DROP TABLE IF EXISTS grabbitmigrations_` + sanitizeSvcName(svcName) + + return &migrator.Migration{ + Name: "drop legacy migrations table", + Func: func(tx *sql.Tx) error { + if _, err := tx.Exec(query); err != nil { + return err + } + return nil + }, + } +} + //EnsureSchema implements Grabbit's migrations strategy func EnsureSchema(db *sql.DB, svcName string) { - migrationsTable := 
sanitizedMigrationsTable(svcName) - migrate, err := migrator.New(migrator.TableName(migrationsTable), migrator.Migrations( - OutboxMigrations(svcName), - SagaStoreTableMigration(svcName), - TimoutTableMigration(svcName), + tblName := tx.GrabbitTableNameTemplate(svcName, "migrations") + + migrate, err := migrator.New(migrator.TableName(tblName), migrator.Migrations( + outboxMigrations(svcName), + sagaStoreTableMigration(svcName), + timoutTableMigration(svcName), + legacyMigrationsTable(svcName), )) if err != nil { panic(err) @@ -103,9 +117,8 @@ func EnsureSchema(db *sql.DB, svcName string) { } } -func sanitizedMigrationsTable(svcName string) string { - var re = regexp.MustCompile(`-|;|\\|`) - sanitized := re.ReplaceAllString(svcName, "") +func sanitizeSvcName(svcName string) string { - return strings.ToLower("grabbitMigrations_" + sanitized) + sanitized := tx.SanitizeTableName(svcName) + return strings.ToLower(sanitized) } diff --git a/gbus/tx/mysql/sanitize.go b/gbus/tx/mysql/sanitize.go deleted file mode 100644 index 224b0d1..0000000 --- a/gbus/tx/mysql/sanitize.go +++ /dev/null @@ -1,10 +0,0 @@ -package mysql - -import "regexp" - -func sanitizeTableName(dirty string) string { - - var re = regexp.MustCompile(`-|;|\\|`) - sanitized := re.ReplaceAllString(dirty, "") - return sanitized -} diff --git a/gbus/tx/mysql/timeout.go b/gbus/tx/mysql/timeout.go index c72a73b..2a605f6 100644 --- a/gbus/tx/mysql/timeout.go +++ b/gbus/tx/mysql/timeout.go @@ -2,13 +2,12 @@ package mysql import ( "database/sql" - "regexp" - "strings" "time" "github.com/sirupsen/logrus" "github.com/wework/grabbit/gbus" + "github.com/wework/grabbit/gbus/tx" ) var _ gbus.TimeoutManager = &TimeoutManager{} @@ -173,11 +172,7 @@ func (tm *TimeoutManager) SetTimeoutFunction(timeoutFunc func(tx *sql.Tx, sagaID //GetTimeoutsTableName returns the table name in which to store timeouts func GetTimeoutsTableName(svcName string) string { - - var re = regexp.MustCompile(`-|;|\\|`) - sanitized := 
re.ReplaceAllString(svcName, "") - - return strings.ToLower("grabbit_" + sanitized + "_timeouts") + return tx.GrabbitTableNameTemplate(svcName, "timeouts") } //NewTimeoutManager creates a new instance of a mysql based TimeoutManager diff --git a/gbus/tx/mysql/txoutbox.go b/gbus/tx/mysql/txoutbox.go index 21c8d79..5ea3e9b 100644 --- a/gbus/tx/mysql/txoutbox.go +++ b/gbus/tx/mysql/txoutbox.go @@ -5,14 +5,15 @@ import ( "database/sql" "encoding/gob" "fmt" + "strconv" + "sync" + "time" + "github.com/rs/xid" log "github.com/sirupsen/logrus" "github.com/streadway/amqp" "github.com/wework/grabbit/gbus" - "strconv" - "strings" - "sync" - "time" + "github.com/wework/grabbit/gbus/tx" ) var ( @@ -315,5 +316,5 @@ func (outbox *TxOutbox) sendMessages(recordSelector func(tx *sql.Tx) (*sql.Rows, func getOutboxName(svcName string) string { - return strings.ToLower("grabbit_" + sanitizeTableName(svcName) + "_outbox") + return tx.GrabbitTableNameTemplate(svcName, "outbox") } diff --git a/gbus/tx/sanitize.go b/gbus/tx/sanitize.go new file mode 100644 index 0000000..619799a --- /dev/null +++ b/gbus/tx/sanitize.go @@ -0,0 +1,23 @@ +package tx + +import ( + "fmt" + "regexp" + "strings" +) + +//SanitizeTableName returns a sanitized and lower cased string for creating a table +func SanitizeTableName(dirty string) string { + + var re = regexp.MustCompile(`-|;|\\|`) + sanitized := re.ReplaceAllString(dirty, "") + return strings.ToLower(sanitized) +} + +//GrabbitTableNameTemplate returns the templated grabbit table name for the table type and service +func GrabbitTableNameTemplate(svcName, table string) string { + sanitized := SanitizeTableName(svcName) + templated := fmt.Sprintf("grabbit_%s_%s", sanitized, table) + return strings.ToLower(templated) + +} diff --git a/gbus/worker.go b/gbus/worker.go index 216b771..5bb6740 100644 --- a/gbus/worker.go +++ b/gbus/worker.go @@ -2,6 +2,7 @@ package gbus import ( "context" + "database/sql" "errors" "fmt" "math/rand" @@ -35,7 +36,8 @@ type worker 
struct { handlersLock *sync.Mutex registrations []*Registration rpcHandlers map[string]MessageHandler - deadletterHandler DeadLetterMessageHandler + deadletterHandler RawMessageHandler + globalRawHandler RawMessageHandler b *DefaultBus serializer Serializer txProvider TxProvider @@ -111,19 +113,10 @@ func (worker *worker) consumeRPC() { } func (worker *worker) extractBusMessage(delivery amqp.Delivery) (*BusMessage, error) { - bm := NewFromAMQPHeaders(delivery.Headers) - bm.ID = delivery.MessageId - bm.CorrelationID = delivery.CorrelationId - if delivery.Exchange != "" { - bm.Semantics = EVT - } else { - bm.Semantics = CMD - } - if bm.PayloadFQN == "" || bm.Semantics == "" { - //TODO: Log poison pill message - worker.log().WithFields(logrus.Fields{"message_name": bm.PayloadFQN, "semantics": bm.Semantics}).Warn("message received but no headers found...rejecting message") - - return nil, errors.New("missing critical headers") + bm, err := NewFromDelivery(delivery) + if err != nil { + worker.log().Warn("failed creating BusMessage from AMQP delivery") + return nil, err } var decErr error @@ -135,10 +128,10 @@ func (worker *worker) extractBusMessage(delivery amqp.Delivery) (*BusMessage, er return bm, nil } -func (worker *worker) resolveHandlers(isRPCreply bool, bm *BusMessage, delivery amqp.Delivery) []MessageHandler { +func (worker *worker) resolveHandlers(isRPCreply bool, delivery amqp.Delivery) []MessageHandler { handlers := make([]MessageHandler, 0) if isRPCreply { - rpcID, rpcHeaderFound := delivery.Headers[RpcHeaderName].(string) + rpcID, rpcHeaderFound := delivery.Headers[RPCHeaderName].(string) if !rpcHeaderFound { worker.log().Warn("rpc message received but no rpc header found...rejecting message") return handlers @@ -157,15 +150,17 @@ func (worker *worker) resolveHandlers(isRPCreply bool, bm *BusMessage, delivery } else { worker.handlersLock.Lock() defer worker.handlersLock.Unlock() - + msgName := GetMessageName(delivery) for _, registration := range 
worker.registrations { - if registration.Matches(delivery.Exchange, delivery.RoutingKey, bm.PayloadFQN) { + if registration.Matches(delivery.Exchange, delivery.RoutingKey, msgName) { handlers = append(handlers, registration.Handler) } } } + if len(handlers) > 0 { + worker.log().WithFields(logrus.Fields{"number_of_handlers": len(handlers)}).Info("found message handlers") + } - worker.log().WithFields(logrus.Fields{"number_of_handlers": len(handlers)}).Info("found message handlers") return handlers } @@ -215,8 +210,9 @@ func (worker *worker) invokeDeadletterHandler(delivery amqp.Delivery) { return } err := metrics.RunHandlerWithMetric(func() error { - return worker.deadletterHandler(tx, delivery) + return worker.deadletterHandler(tx, &delivery) }, worker.deadletterHandler.Name(), worker.log()) + var reject bool if err != nil { worker.log().WithError(err).Error("failed handling deadletter") @@ -240,18 +236,43 @@ func (worker *worker) invokeDeadletterHandler(delivery amqp.Delivery) { } } -func (worker *worker) processMessage(delivery amqp.Delivery, isRPCreply bool) { - var ctx context.Context +func (worker *worker) extractOpenTracingSpan(delivery amqp.Delivery, actionName string) (opentracing.Span, context.Context) { + var spanOptions []opentracing.StartSpanOption spCtx, err := amqptracer.Extract(delivery.Headers) + if err != nil { worker.log().WithError(err).Debug("could not extract SpanContext from headers") } else { spanOptions = append(spanOptions, opentracing.FollowsFrom(spCtx)) } - worker.span, ctx = opentracing.StartSpanFromContext(context.Background(), "processMessage", spanOptions...) + return opentracing.StartSpanFromContext(context.Background(), actionName, spanOptions...) 
+ +} +func (worker *worker) runGlobalHandler(delivery *amqp.Delivery) error { + if worker.globalRawHandler != nil { + handlerName := worker.globalRawHandler.Name() + retryAction := func() error { + metricsWrapper := func() error { + txWrapper := func(tx *sql.Tx) error { + return worker.globalRawHandler(tx, delivery) + } + //run the global handler inside a transactions + return worker.withTx(txWrapper) + } + //run the global handler with metrics + return metrics.RunHandlerWithMetric(metricsWrapper, handlerName, worker.log()) + } + return worker.SafeWithRetries(retryAction, MaxRetryCount) + } + return nil +} + +func (worker *worker) processMessage(delivery amqp.Delivery, isRPCreply bool) { + span, ctx := worker.extractOpenTracingSpan(delivery, "ProcessMessage") + worker.span = span //catch all error handling so goroutine will not crash defer func() { if r := recover(); r != nil { @@ -265,42 +286,48 @@ func (worker *worker) processMessage(delivery amqp.Delivery, isRPCreply bool) { worker.span.LogFields(slog.String("panic", "failed to process message")) logEntry.Error("failed to process message") } - worker.span.Finish() }() worker.log().WithFields(logrus.Fields{"worker": worker.consumerTag, "message_id": delivery.MessageId}).Info("GOT MSG") //handle a message that originated from a deadletter exchange - if worker.isDead(delivery) { + if worker.isDead(delivery) && worker.deadletterHandler != nil { worker.span.LogFields(slog.Error(errors.New("handling dead-letter delivery"))) worker.log().Info("invoking deadletter handler") worker.invokeDeadletterHandler(delivery) return } - bm, err := worker.extractBusMessage(delivery) - if err != nil { - worker.span.LogFields(slog.Error(err), slog.String("grabbit", "message is poison")) - //reject poison message - _ = worker.reject(false, delivery) - return + if err := worker.runGlobalHandler(&delivery); err != nil { + //when the global handler fails terminate execution and reject the message + _ = worker.reject(true, delivery) } - 
worker.span.LogFields(bm.GetTraceLog()...) //TODO:Dedup message - handlers := worker.resolveHandlers(isRPCreply, bm, delivery) + msgName := GetMessageName(delivery) + handlers := worker.resolveHandlers(isRPCreply, delivery) if len(handlers) == 0 { worker.log(). WithFields( - logrus.Fields{"message-name": bm.PayloadFQN, - "message-type": bm.Semantics}). + logrus.Fields{"message-name": msgName}). Warn("Message received but no handlers found") worker.span.LogFields(slog.String("grabbit", "no handlers found")) - // worker.log("Message received but no handlers found\nMessage name:%v\nMessage Type:%v\nRejecting message", bm.PayloadFQN, bm.Semantics) //remove the message by acking it and not rejecting it so it will not be routed to a deadletter queue _ = worker.ack(delivery) return } + /* + extract the bus message only after we are sure there are registered handlers since + it includes deserializing the amqp payload which we want to avoid if no handlers are found + (for instance if a reply message arrives but no handler is registered for that type of message) + */ + bm, err := worker.extractBusMessage(delivery) + if err != nil { + worker.span.LogFields(slog.Error(err), slog.String("grabbit", "message is poison")) + //reject poison message + _ = worker.reject(false, delivery) + return + } err = worker.invokeHandlers(ctx, handlers, bm, &delivery) if err == nil { @@ -311,85 +338,108 @@ func (worker *worker) processMessage(delivery amqp.Delivery, isRPCreply bool) { } } -func (worker *worker) invokeHandlers(sctx context.Context, handlers []MessageHandler, message *BusMessage, delivery *amqp.Delivery) (err error) { - - //this is the action that will get retried - // each retry should run a new and separate transaction which should end with a commit or rollback - action := func(attempt uint) (actionErr error) { - - tx, txCreateErr := worker.txProvider.New() - if txCreateErr != nil { - 
worker.log().WithError(txCreateErr).Error("failed creating new tx") - worker.span.LogFields(slog.Error(txCreateErr)) - return txCreateErr - } - - worker.span, sctx = opentracing.StartSpanFromContext(sctx, "invokeHandlers") - worker.span.LogFields(slog.Uint64("attempt", uint64(attempt+1))) - defer func() { - if p := recover(); p != nil { - pncMsg := fmt.Sprintf("%v\n%s", p, debug.Stack()) - worker.log().WithField("stack", pncMsg).Error("recovered from panic while invoking handler") - actionErr = errors.New(pncMsg) + var tx *sql.Tx + defer func() { + if p := recover(); p != nil { + pncMsg := fmt.Sprintf("%v\n%s", p, debug.Stack()) + worker.log().WithField("stack", pncMsg).Error("recovered from panic while invoking handler") + actionErr = errors.New(pncMsg) + if tx != nil { rbkErr := tx.Rollback() if rbkErr != nil { worker.log().WithError(rbkErr).Error("failed rolling back transaction when recovering from handler panic") } - worker.span.LogFields(slog.Error(actionErr)) } - worker.span.Finish() - }() - var handlerErr error - var hspan opentracing.Span - var hsctx context.Context + worker.span.LogFields(slog.Error(actionErr)) + } + }() + tx, txCreateErr := worker.txProvider.New() + if txCreateErr != nil { + worker.log().WithError(txCreateErr).Error("failed creating new tx") + worker.span.LogFields(slog.Error(txCreateErr)) + return txCreateErr + } + //execute the wrapper that eventually calls the handler + handlerErr := handlerWrapper(tx) + if handlerErr != nil { + rbkErr := tx.Rollback() + if rbkErr != nil { + worker.log().WithError(rbkErr).Error("failed rolling back transaction when recovering from handler error") + return rbkErr + } + return handlerErr + } + cmtErr := tx.Commit() + if cmtErr != nil { + worker.log().WithError(cmtErr).Error("failed committing transaction after invoking handlers") + return cmtErr + } + return nil +} + +func (worker *worker) createInvocation(ctx context.Context, delivery *amqp.Delivery, tx *sql.Tx, attempt uint, message *BusMessage) 
*defaultInvocationContext { + invocation := &defaultInvocationContext{ + invocingSvc: delivery.ReplyTo, + bus: worker.b, + inboundMsg: message, + tx: tx, + ctx: ctx, + exchange: delivery.Exchange, + routingKey: delivery.RoutingKey, + deliveryInfo: DeliveryInfo{ + Attempt: attempt, + MaxRetryCount: MaxRetryCount, + }, + } + return invocation +} + +func (worker *worker) invokeHandlers(sctx context.Context, handlers []MessageHandler, message *BusMessage, delivery *amqp.Delivery) (err error) { + + //this is the action that will get retried + // each retry should run a new and separate transaction which should end with a commit or rollback + retryAction := func(attempt uint) (actionErr error) { + attemptSpan, sctx := opentracing.StartSpanFromContext(sctx, "InvokeHandler") + defer attemptSpan.Finish() + + attemptSpan.LogFields(slog.Uint64("attempt", uint64(attempt+1))) + for _, handler := range handlers { - hspan, hsctx = opentracing.StartSpanFromContext(sctx, handler.Name()) - - ctx := &defaultInvocationContext{ - invocingSvc: delivery.ReplyTo, - bus: worker.b, - inboundMsg: message, - tx: tx, - ctx: hsctx, - exchange: delivery.Exchange, - routingKey: delivery.RoutingKey, - deliveryInfo: DeliveryInfo{ - Attempt: attempt, - MaxRetryCount: MaxRetryCount, - }, - } - ctx.SetLogger(worker.log().WithField("handler", handler.Name())) - handlerErr = metrics.RunHandlerWithMetric(func() error { - return handler(ctx, message) - }, handler.Name(), worker.log()) - if handlerErr != nil { - hspan.LogFields(slog.Error(handlerErr)) - break + //this function accepts the scoped transaction and executes the handler with metrics + handlerWrapper := func(tx *sql.Tx) error { + + pinedHandler := handler //https://github.com/kyoh86/scopelint + handlerName := pinedHandler.Name() + hspan, hsctx := opentracing.StartSpanFromContext(sctx, handlerName) + invocation := worker.createInvocation(hsctx, delivery, tx, attempt, message) + invocation.SetLogger(worker.log().WithField("handler", handlerName)) 
+ //execute the handler with metrics + handlerErr := metrics.RunHandlerWithMetric(func() error { + return pinedHandler(invocation, message) + }, handlerName, worker.log()) + + if handlerErr != nil { + hspan.LogFields(slog.Error(handlerErr)) + } + hspan.Finish() + return handlerErr } - hspan.Finish() - } - if handlerErr != nil { - hspan.LogFields(slog.Error(handlerErr)) - rbkErr := tx.Rollback() - if rbkErr != nil { - worker.log().WithError(rbkErr).Error("failed rolling back transaction when recovering from handler error") + + err := worker.withTx(handlerWrapper) + if err != nil { + return err } - hspan.Finish() - return handlerErr - } - cmtErr := tx.Commit() - if cmtErr != nil { - worker.log().WithError(cmtErr).Error("failed committing transaction after invoking handlers") - return cmtErr } + return nil } //retry for MaxRetryCount, back off by a jittered strategy seed := time.Now().UnixNano() random := rand.New(rand.NewSource(seed)) - return retry.Retry(action, + return retry.Retry(retryAction, strategy.Limit(MaxRetryCount), strategy.BackoffWithJitter( backoff.BinaryExponential(BaseRetryDuration), diff --git a/go.mod b/go.mod index ca69856..b447f2d 100644 --- a/go.mod +++ b/go.mod @@ -36,7 +36,7 @@ require ( golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4 // indirect golang.org/x/net v0.0.0-20190628185345-da137c7871d7 // indirect golang.org/x/sys v0.0.0-20190712062909-fae7ac547cb7 // indirect - golang.org/x/tools v0.0.0-20190712213246-8b927904ee0d // indirect + golang.org/x/tools v0.0.0-20190822191935-b1e2c8edcefd // indirect google.golang.org/appengine v1.6.1 // indirect gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect gopkg.in/jcmturner/gokrb5.v7 v7.3.0 // indirect diff --git a/go.sum b/go.sum index fd5057a..8b9100e 100644 --- a/go.sum +++ b/go.sum @@ -174,6 +174,9 @@ golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGm golang.org/x/tools v0.0.0-20181030221726-6c7e314b6563/go.mod 
h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190606124116-d0a3d012864b/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= golang.org/x/tools v0.0.0-20190712213246-8b927904ee0d/go.mod h1:jcCCGcm9btYwXyDqrUWc6MKQKKGJCWEQ3AfLSRIbEuI= +golang.org/x/tools v0.0.0-20190822191935-b1e2c8edcefd h1:sl3cZ9UhakOcf0k3nWTLpJFHPGbvWf5Cao9HxvzkDos= +golang.org/x/tools v0.0.0-20190822191935-b1e2c8edcefd/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/appengine v1.6.0 h1:Tfd7cKwKbFRsI8RMAD3oqqw7JPFRrvFlOsfbgVkjOOw= google.golang.org/appengine v1.6.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= diff --git a/tests/bus_test.go b/tests/bus_test.go index 265684e..dfd201c 100644 --- a/tests/bus_test.go +++ b/tests/bus_test.go @@ -5,12 +5,12 @@ import ( "database/sql" "errors" "fmt" - "github.com/wework/grabbit/gbus/metrics" "reflect" - "sync" "testing" "time" + "github.com/wework/grabbit/gbus/metrics" + "github.com/opentracing/opentracing-go" olog "github.com/opentracing/opentracing-go/log" "github.com/opentracing/opentracing-go/mocktracer" @@ -229,14 +229,13 @@ func TestRPC(t *testing.T) { func TestDeadlettering(t *testing.T) { - var waitgroup sync.WaitGroup - waitgroup.Add(2) + proceed := make(chan bool) poison := gbus.NewBusMessage(PoisonMessage{}) service1 := createNamedBusForTest(testSvc1) deadletterSvc := createNamedBusForTest("deadletterSvc") - deadMessageHandler := func(tx *sql.Tx, poison amqp.Delivery) error { - waitgroup.Done() + deadMessageHandler := func(tx *sql.Tx, poison *amqp.Delivery) error { + proceed <- true return nil } @@ -255,7 +254,7 @@ func TestDeadlettering(t *testing.T) { service1.Send(context.Background(), testSvc1, poison) service1.Send(context.Background(), testSvc1, 
gbus.NewBusMessage(Command1{})) - waitgroup.Wait() + <-proceed count, _ := metrics.GetRejectedMessagesValue() if count != 1 { t.Error("Should have one rejected message") @@ -280,6 +279,25 @@ func TestDeadlettering(t *testing.T) { } } +func TestRawMessageHandling(t *testing.T) { + + proceed := make(chan bool) + handler := func(tx *sql.Tx, delivery *amqp.Delivery) error { + proceed <- true + return nil + } + svc1 := createNamedBusForTest(testSvc1) + svc1.SetGlobalRawMessageHandler(handler) + _ = svc1.Start() + + cmd1 := gbus.NewBusMessage(Command1{}) + _ = svc1.Send(context.Background(), testSvc1, cmd1) + + <-proceed + _ = svc1.Shutdown() + +} + func TestReturnDeadToQueue(t *testing.T) { var visited bool @@ -292,7 +310,7 @@ func TestReturnDeadToQueue(t *testing.T) { deadletterSvc := createBusWithConfig("deadletterSvc", "grabbit-dead", true, true, gbus.BusConfiguration{MaxRetryCount: 0, BaseRetryDuration: 0}) - deadMessageHandler := func(tx *sql.Tx, poison amqp.Delivery) error { + deadMessageHandler := func(tx *sql.Tx, poison *amqp.Delivery) error { pub := amqpDeliveryToPublishing(poison) deadletterSvc.ReturnDeadToQueue(context.Background(), &pub) return nil @@ -449,7 +467,7 @@ func noopTraceContext() context.Context { // return ctx } -func amqpDeliveryToPublishing(del amqp.Delivery) (pub amqp.Publishing) { +func amqpDeliveryToPublishing(del *amqp.Delivery) (pub amqp.Publishing) { pub.Headers = del.Headers pub.ContentType = del.ContentType pub.ContentEncoding = del.ContentEncoding