From 488c31afd8f82bd59f1da0314c721c2f51b1b76c Mon Sep 17 00:00:00 2001 From: Guy Baron Date: Sun, 25 Aug 2019 21:45:09 +0300 Subject: [PATCH] Support handling raw message (#138) --- gbus/abstractions.go | 30 ++++- gbus/bus.go | 22 ++-- gbus/message_handler.go | 9 +- gbus/messages.go | 34 ++++-- gbus/worker.go | 242 ++++++++++++++++++++++++---------------- go.mod | 2 +- go.sum | 3 + tests/bus_test.go | 36 ++++-- 8 files changed, 251 insertions(+), 127 deletions(-) diff --git a/gbus/abstractions.go b/gbus/abstractions.go index 3b3aa75..a23244b 100644 --- a/gbus/abstractions.go +++ b/gbus/abstractions.go @@ -30,6 +30,7 @@ type BusConfiguration struct { type Bus interface { HandlerRegister Deadlettering + RawMessageHandling BusSwitch Messaging SagaRegister @@ -129,10 +130,37 @@ type Saga interface { //Deadlettering provides the ability to handle messages that were rejected as poision and arrive to the deadletter queue type Deadlettering interface { - HandleDeadletter(handler DeadLetterMessageHandler) + /* + HandleDeadletter is deprecated use RawMessageHandling.SetGlobalRawMessageHandler instead. + This function will be removed in future grabbit releases + */ + HandleDeadletter(handler RawMessageHandler) ReturnDeadToQueue(ctx context.Context, publishing *amqp.Publishing) error } +/* + RawMessageHandling provides the ability to consume and send raq amqp messages with the transactional guarantees + that the bus provides +*/ +type RawMessageHandling interface { + /* + SetGlobalRawMessageHandler registers a handler that gets called for each amqp.Delivery that is delivered +         to the service queue. +         The handler will get called with a scoped transaction that is a different transaction than the ones that +         regular message handlers are scoped by as we want the RawMessage handler to get executed even if the amqp.Delivery +         can not be serialized by the bus to one of the registered schemas + +         In case a bus has both a raw message handler and regular ones the bus will first call the raw message handler +         and afterward will call any registered message handlers. +         if the global raw handler returns an error the message gets rejected and any additional +         handlers will not be called. +         You should not use the global raw message handler to drive business logic as it breaks the local transactivity +         guarantees grabbit provides and should only be used in specialized cases. +         If you do decide to use this feature try not shooting yourself in the foot. + */ + SetGlobalRawMessageHandler(handler RawMessageHandler) +} + //RequestSagaTimeout is the interface a saga needs to implement to get timeout servicess type RequestSagaTimeout interface { TimeoutDuration() time.Duration diff --git a/gbus/bus.go b/gbus/bus.go index 5291722..419e921 100644 --- a/gbus/bus.go +++ b/gbus/bus.go @@ -43,7 +43,8 @@ type DefaultBus struct { amqpOutbox *AMQPOutbox RPCHandlers map[string]MessageHandler - deadletterHandler DeadLetterMessageHandler + deadletterHandler RawMessageHandler + globalRawHandler RawMessageHandler HandlersLock *sync.Mutex RPCLock *sync.Mutex SenderLock *sync.Mutex @@ -73,8 +74,8 @@ var ( //BaseRetryDuration defines the basic milliseconds that the retry algorithm uses //for a random retry time. Default is 10 but it is configurable. BaseRetryDuration = 10 * time.Millisecond - //RpcHeaderName used to define the header in grabbit for RPC - RpcHeaderName = "x-grabbit-msg-rpc-id" + //RPCHeaderName used to define the header in grabbit for RPC + RPCHeaderName = "x-grabbit-msg-rpc-id" ) func (b *DefaultBus) createRPCQueue() (amqp.Queue, error) { @@ -286,6 +287,7 @@ func (b *DefaultBus) createBusWorkers(workerNum uint) ([]*worker, error) { rpcLock: b.RPCLock, rpcHandlers: b.RPCHandlers, deadletterHandler: b.deadletterHandler, + globalRawHandler: b.globalRawHandler, handlersLock: &sync.Mutex{}, registrations: b.Registrations, serializer: b.Serializer, @@ -547,11 +549,17 @@ func (b *DefaultBus) HandleEvent(exchange, topic string, event Message, handler return b.registerHandlerImpl(exchange, topic, event, handler) } -//HandleDeadletter implements GBus.HandleDeadletter -func (b *DefaultBus) HandleDeadletter(handler DeadLetterMessageHandler) { +//HandleDeadletter implements Deadlettering.HandleDeadletter +func (b *DefaultBus) HandleDeadletter(handler RawMessageHandler) { b.registerDeadLetterHandler(handler) } +//HandleDeadletter implements RawMessageHandling.SetGlobalRawMessageHandler +func (b *DefaultBus) SetGlobalRawMessageHandler(handler RawMessageHandler) { + metrics.AddHandlerMetrics(handler.Name()) + b.globalRawHandler = handler +} + //ReturnDeadToQueue returns a message to its original destination func (b *DefaultBus) ReturnDeadToQueue(ctx context.Context, publishing *amqp.Publishing) error { return b.returnDeadToQueue(ctx, nil, publishing) @@ -692,7 +700,7 @@ func (b *DefaultBus) registerHandlerImpl(exchange, routingKey string, msg Messag return nil } -func (b *DefaultBus) registerDeadLetterHandler(handler DeadLetterMessageHandler) { +func (b *DefaultBus) registerDeadLetterHandler(handler RawMessageHandler) { metrics.AddHandlerMetrics(handler.Name()) b.deadletterHandler = handler } @@ -706,7 +714,7 @@ type rpcPolicy struct { } func (p rpcPolicy) Apply(publishing *amqp.Publishing) { - publishing.Headers[RpcHeaderName] = p.rpcID + publishing.Headers[RPCHeaderName] = p.rpcID } //Log returns the default logrus.FieldLogger for the bus via the Glogged helper diff --git a/gbus/message_handler.go b/gbus/message_handler.go index 8111a87..bfaf14d 100644 --- a/gbus/message_handler.go +++ b/gbus/message_handler.go @@ -2,17 +2,18 @@ package gbus import ( "database/sql" - "github.com/streadway/amqp" "reflect" "runtime" "strings" + + "github.com/streadway/amqp" ) //MessageHandler signature for all command handlers type MessageHandler func(invocation Invocation, message *BusMessage) error -//DeadLetterMessageHandler signature for dead letter handler -type DeadLetterMessageHandler func(tx *sql.Tx, poison amqp.Delivery) error +//RawMessageHandler signature for handlers that handle raw amqp deliveries +type RawMessageHandler func(tx *sql.Tx, delivery *amqp.Delivery) error //Name is a helper function returning the runtime name of the function bound to an instance of the MessageHandler type func (mg MessageHandler) Name() string { @@ -20,7 +21,7 @@ func (mg MessageHandler) Name() string { } //Name is a helper function returning the runtime name of the function bound to an instance of the DeadLetterMessageHandler type -func (dlmg DeadLetterMessageHandler) Name() string { +func (dlmg RawMessageHandler) Name() string { return nameFromFunc(dlmg) } diff --git a/gbus/messages.go b/gbus/messages.go index 123256f..6c78c60 100644 --- a/gbus/messages.go +++ b/gbus/messages.go @@ -1,6 +1,9 @@ package gbus import ( + "errors" + "fmt" + "github.com/opentracing/opentracing-go/log" "github.com/rs/xid" "github.com/streadway/amqp" @@ -27,11 +30,28 @@ func NewBusMessage(payload Message) *BusMessage { return bm } -//NewFromAMQPHeaders creates a BusMessage from headers of an amqp message -func NewFromAMQPHeaders(headers amqp.Table) *BusMessage { +//NewFromDelivery creates a BusMessage from an amqp delivery +func NewFromDelivery(delivery amqp.Delivery) (*BusMessage, error) { bm := &BusMessage{} - bm.SetFromAMQPHeaders(headers) - return bm + bm.SetFromAMQPHeaders(delivery) + + bm.ID = delivery.MessageId + bm.CorrelationID = delivery.CorrelationId + if delivery.Exchange != "" { + bm.Semantics = EVT + } else { + bm.Semantics = CMD + } + if bm.PayloadFQN == "" || bm.Semantics == "" { + errMsg := fmt.Sprintf("missing critical headers. message_name:%s semantics: %s", bm.PayloadFQN, bm.Semantics) + return nil, errors.New(errMsg) + } + return bm, nil +} + +//GetMessageName extracts the valuee of the custom x-msg-name header from an amq delivery +func GetMessageName(delivery amqp.Delivery) string { + return castToString(delivery.Headers["x-msg-name"]) } //GetAMQPHeaders convert to AMQP headers Table everything but a payload @@ -46,12 +66,12 @@ func (bm *BusMessage) GetAMQPHeaders() (headers amqp.Table) { } //SetFromAMQPHeaders convert from AMQP headers Table everything but a payload -func (bm *BusMessage) SetFromAMQPHeaders(headers amqp.Table) { - +func (bm *BusMessage) SetFromAMQPHeaders(delivery amqp.Delivery) { + headers := delivery.Headers bm.SagaID = castToString(headers["x-msg-saga-id"]) bm.SagaCorrelationID = castToString(headers["x-msg-saga-correlation-id"]) bm.RPCID = castToString(headers["x-grabbit-msg-rpc-id"]) - bm.PayloadFQN = castToString(headers["x-msg-name"]) + bm.PayloadFQN = GetMessageName(delivery) } diff --git a/gbus/worker.go b/gbus/worker.go index 55cc168..5bb6740 100644 --- a/gbus/worker.go +++ b/gbus/worker.go @@ -2,6 +2,7 @@ package gbus import ( "context" + "database/sql" "errors" "fmt" "math/rand" @@ -35,7 +36,8 @@ type worker struct { handlersLock *sync.Mutex registrations []*Registration rpcHandlers map[string]MessageHandler - deadletterHandler DeadLetterMessageHandler + deadletterHandler RawMessageHandler + globalRawHandler RawMessageHandler b *DefaultBus serializer Serializer txProvider TxProvider @@ -111,19 +113,10 @@ func (worker *worker) consumeRPC() { } func (worker *worker) extractBusMessage(delivery amqp.Delivery) (*BusMessage, error) { - bm := NewFromAMQPHeaders(delivery.Headers) - bm.ID = delivery.MessageId - bm.CorrelationID = delivery.CorrelationId - if delivery.Exchange != "" { - bm.Semantics = EVT - } else { - bm.Semantics = CMD - } - if bm.PayloadFQN == "" || bm.Semantics == "" { - //TODO: Log poison pill message - worker.log().WithFields(logrus.Fields{"message_name": bm.PayloadFQN, "semantics": bm.Semantics}).Warn("message received but no headers found...rejecting message") - - return nil, errors.New("missing critical headers") + bm, err := NewFromDelivery(delivery) + if err != nil { + worker.log().Warn("failed creating BusMessage from AMQP delivery") + return nil, err } var decErr error @@ -135,10 +128,10 @@ func (worker *worker) extractBusMessage(delivery amqp.Delivery) (*BusMessage, er return bm, nil } -func (worker *worker) resolveHandlers(isRPCreply bool, bm *BusMessage, delivery amqp.Delivery) []MessageHandler { +func (worker *worker) resolveHandlers(isRPCreply bool, delivery amqp.Delivery) []MessageHandler { handlers := make([]MessageHandler, 0) if isRPCreply { - rpcID, rpcHeaderFound := delivery.Headers[RpcHeaderName].(string) + rpcID, rpcHeaderFound := delivery.Headers[RPCHeaderName].(string) if !rpcHeaderFound { worker.log().Warn("rpc message received but no rpc header found...rejecting message") return handlers @@ -157,15 +150,17 @@ func (worker *worker) resolveHandlers(isRPCreply bool, bm *BusMessage, delivery } else { worker.handlersLock.Lock() defer worker.handlersLock.Unlock() - + msgName := GetMessageName(delivery) for _, registration := range worker.registrations { - if registration.Matches(delivery.Exchange, delivery.RoutingKey, bm.PayloadFQN) { + if registration.Matches(delivery.Exchange, delivery.RoutingKey, msgName) { handlers = append(handlers, registration.Handler) } } } + if len(handlers) > 0 { + worker.log().WithFields(logrus.Fields{"number_of_handlers": len(handlers)}).Info("found message handlers") + } - worker.log().WithFields(logrus.Fields{"number_of_handlers": len(handlers)}).Info("found message handlers") return handlers } @@ -215,8 +210,9 @@ func (worker *worker) invokeDeadletterHandler(delivery amqp.Delivery) { return } err := metrics.RunHandlerWithMetric(func() error { - return worker.deadletterHandler(tx, delivery) + return worker.deadletterHandler(tx, &delivery) }, worker.deadletterHandler.Name(), worker.log()) + var reject bool if err != nil { worker.log().WithError(err).Error("failed handling deadletter") @@ -240,9 +236,8 @@ func (worker *worker) invokeDeadletterHandler(delivery amqp.Delivery) { } } -func (worker *worker) processMessage(delivery amqp.Delivery, isRPCreply bool) { +func (worker *worker) extractOpenTracingSpan(delivery amqp.Delivery, actionName string) (opentracing.Span, context.Context) { - rootCtx := context.Background() var spanOptions []opentracing.StartSpanOption spCtx, err := amqptracer.Extract(delivery.Headers) @@ -252,10 +247,32 @@ func (worker *worker) processMessage(delivery amqp.Delivery, isRPCreply bool) { } else { spanOptions = append(spanOptions, opentracing.FollowsFrom(spCtx)) } - span, ctx := opentracing.StartSpanFromContext(rootCtx, "ProcessMessage", spanOptions...) - worker.span = span - //defer worker.span.Finish() + return opentracing.StartSpanFromContext(context.Background(), actionName, spanOptions...) + +} + +func (worker *worker) runGlobalHandler(delivery *amqp.Delivery) error { + if worker.globalRawHandler != nil { + handlerName := worker.globalRawHandler.Name() + retryAction := func() error { + metricsWrapper := func() error { + txWrapper := func(tx *sql.Tx) error { + return worker.globalRawHandler(tx, delivery) + } + //run the global handler inside a transactions + return worker.withTx(txWrapper) + } + //run the global handler with metrics + return metrics.RunHandlerWithMetric(metricsWrapper, handlerName, worker.log()) + } + return worker.SafeWithRetries(retryAction, MaxRetryCount) + } + return nil +} +func (worker *worker) processMessage(delivery amqp.Delivery, isRPCreply bool) { + span, ctx := worker.extractOpenTracingSpan(delivery, "ProcessMessage") + worker.span = span //catch all error handling so goroutine will not crash defer func() { if r := recover(); r != nil { @@ -274,36 +291,43 @@ func (worker *worker) processMessage(delivery amqp.Delivery, isRPCreply bool) { worker.log().WithFields(logrus.Fields{"worker": worker.consumerTag, "message_id": delivery.MessageId}).Info("GOT MSG") //handle a message that originated from a deadletter exchange - if worker.isDead(delivery) { + if worker.isDead(delivery) && worker.deadletterHandler != nil { worker.span.LogFields(slog.Error(errors.New("handling dead-letter delivery"))) worker.log().Info("invoking deadletter handler") worker.invokeDeadletterHandler(delivery) return } - bm, err := worker.extractBusMessage(delivery) - if err != nil { - worker.span.LogFields(slog.Error(err), slog.String("grabbit", "message is poison")) - //reject poison message - _ = worker.reject(false, delivery) - return + if err := worker.runGlobalHandler(&delivery); err != nil { + //when the global handler fails terminate executation and reject the message + _ = worker.reject(true, delivery) } - worker.span.LogFields(bm.GetTraceLog()...) //TODO:Dedup message - handlers := worker.resolveHandlers(isRPCreply, bm, delivery) + msgName := GetMessageName(delivery) + handlers := worker.resolveHandlers(isRPCreply, delivery) if len(handlers) == 0 { worker.log(). WithFields( - logrus.Fields{"message-name": bm.PayloadFQN, - "message-type": bm.Semantics}). + logrus.Fields{"message-name": msgName}). Warn("Message received but no handlers found") worker.span.LogFields(slog.String("grabbit", "no handlers found")) - // worker.log("Message received but no handlers found\nMessage name:%v\nMessage Type:%v\nRejecting message", bm.PayloadFQN, bm.Semantics) //remove the message by acking it and not rejecting it so it will not be routed to a deadletter queue _ = worker.ack(delivery) return } + /* + extract the bus message only after we are sure there are registered handlers since + it includes deserializing the amqp payload which we want to avoid if no handlers are found + (for instance if a reply message arrives but bo handler is registered for that type of message) + */ + bm, err := worker.extractBusMessage(delivery) + if err != nil { + worker.span.LogFields(slog.Error(err), slog.String("grabbit", "message is poison")) + //reject poison message + _ = worker.reject(false, delivery) + return + } err = worker.invokeHandlers(ctx, handlers, bm, &delivery) if err == nil { @@ -314,86 +338,108 @@ func (worker *worker) processMessage(delivery amqp.Delivery, isRPCreply bool) { } } +func (worker *worker) withTx(handlerWrapper func(tx *sql.Tx) error) (actionErr error) { + + var tx *sql.Tx + defer func() { + if p := recover(); p != nil { + pncMsg := fmt.Sprintf("%v\n%s", p, debug.Stack()) + worker.log().WithField("stack", pncMsg).Error("recovered from panic while invoking handler") + actionErr = errors.New(pncMsg) + if tx != nil { + rbkErr := tx.Rollback() + if rbkErr != nil { + worker.log().WithError(rbkErr).Error("failed rolling back transaction when recovering from handler panic") + } + } + worker.span.LogFields(slog.Error(actionErr)) + } + }() + tx, txCreateErr := worker.txProvider.New() + if txCreateErr != nil { + worker.log().WithError(txCreateErr).Error("failed creating new tx") + worker.span.LogFields(slog.Error(txCreateErr)) + return txCreateErr + } + //execute the wrapper that eventually calls the handler + handlerErr := handlerWrapper(tx) + if handlerErr != nil { + rbkErr := tx.Rollback() + if rbkErr != nil { + worker.log().WithError(rbkErr).Error("failed rolling back transaction when recovering from handler error") + return rbkErr + } + return handlerErr + } + cmtErr := tx.Commit() + if cmtErr != nil { + worker.log().WithError(cmtErr).Error("failed committing transaction after invoking handlers") + return cmtErr + } + return nil +} + +func (worker *worker) createInvocation(ctx context.Context, delivery *amqp.Delivery, tx *sql.Tx, attempt uint, message *BusMessage) *defaultInvocationContext { + invocation := &defaultInvocationContext{ + invocingSvc: delivery.ReplyTo, + bus: worker.b, + inboundMsg: message, + tx: tx, + ctx: ctx, + exchange: delivery.Exchange, + routingKey: delivery.RoutingKey, + deliveryInfo: DeliveryInfo{ + Attempt: attempt, + MaxRetryCount: MaxRetryCount, + }, + } + return invocation +} + func (worker *worker) invokeHandlers(sctx context.Context, handlers []MessageHandler, message *BusMessage, delivery *amqp.Delivery) (err error) { //this is the action that will get retried // each retry should run a new and separate transaction which should end with a commit or rollback - - action := func(attempt uint) (actionErr error) { - + retryAction := func(attempt uint) (actionErr error) { attemptSpan, sctx := opentracing.StartSpanFromContext(sctx, "InvokeHandler") defer attemptSpan.Finish() - tx, txCreateErr := worker.txProvider.New() - if txCreateErr != nil { - worker.log().WithError(txCreateErr).Error("failed creating new tx") - attemptSpan.LogFields(slog.Error(txCreateErr)) - return txCreateErr - } - attemptSpan.LogFields(slog.Uint64("attempt", uint64(attempt+1))) - defer func() { - if p := recover(); p != nil { - pncMsg := fmt.Sprintf("%v\n%s", p, debug.Stack()) - worker.log().WithField("stack", pncMsg).Error("recovered from panic while invoking handler") - actionErr = errors.New(pncMsg) - rbkErr := tx.Rollback() - if rbkErr != nil { - worker.log().WithError(rbkErr).Error("failed rolling back transaction when recovering from handler panic") - } - worker.span.LogFields(slog.Error(actionErr)) - } - worker.span.Finish() - }() - var handlerErr error for _, handler := range handlers { - hspan, hsctx := opentracing.StartSpanFromContext(sctx, handler.Name()) - - ctx := &defaultInvocationContext{ - invocingSvc: delivery.ReplyTo, - bus: worker.b, - inboundMsg: message, - tx: tx, - ctx: hsctx, - exchange: delivery.Exchange, - routingKey: delivery.RoutingKey, - deliveryInfo: DeliveryInfo{ - Attempt: attempt, - MaxRetryCount: MaxRetryCount, - }, - } - ctx.SetLogger(worker.log().WithField("handler", handler.Name())) - handlerErr = metrics.RunHandlerWithMetric(func() error { - return handler(ctx, message) - }, handler.Name(), worker.log()) - if handlerErr != nil { - break + //this function accepets the scoped transaction and executes the handler with metrics + handlerWrapper := func(tx *sql.Tx) error { + + pinedHandler := handler //https://github.com/kyoh86/scopelint + handlerName := pinedHandler.Name() + hspan, hsctx := opentracing.StartSpanFromContext(sctx, handlerName) + invocation := worker.createInvocation(hsctx, delivery, tx, attempt, message) + invocation.SetLogger(worker.log().WithField("handler", handlerName)) + //execute the handler with metrics + handlerErr := metrics.RunHandlerWithMetric(func() error { + return pinedHandler(invocation, message) + }, handlerName, worker.log()) + + if handlerErr != nil { + hspan.LogFields(slog.Error(handlerErr)) + } + hspan.Finish() + return handlerErr } - hspan.Finish() - } - if handlerErr != nil { - attemptSpan.LogFields(slog.Error(handlerErr)) - rbkErr := tx.Rollback() - if rbkErr != nil { - attemptSpan.LogFields(slog.Error(rbkErr)) - worker.log().WithError(rbkErr).Error("failed rolling back transaction when recovering from handler error") + + err := worker.withTx(handlerWrapper) + if err != nil { + return err } - return handlerErr - } - cmtErr := tx.Commit() - if cmtErr != nil { - attemptSpan.LogFields(slog.Error(cmtErr)) - worker.log().WithError(cmtErr).Error("failed committing transaction after invoking handlers") - return cmtErr } + return nil } //retry for MaxRetryCount, back off by a jittered strategy seed := time.Now().UnixNano() random := rand.New(rand.NewSource(seed)) - return retry.Retry(action, + return retry.Retry(retryAction, strategy.Limit(MaxRetryCount), strategy.BackoffWithJitter( backoff.BinaryExponential(BaseRetryDuration), diff --git a/go.mod b/go.mod index ca69856..b447f2d 100644 --- a/go.mod +++ b/go.mod @@ -36,7 +36,7 @@ require ( golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4 // indirect golang.org/x/net v0.0.0-20190628185345-da137c7871d7 // indirect golang.org/x/sys v0.0.0-20190712062909-fae7ac547cb7 // indirect - golang.org/x/tools v0.0.0-20190712213246-8b927904ee0d // indirect + golang.org/x/tools v0.0.0-20190822191935-b1e2c8edcefd // indirect google.golang.org/appengine v1.6.1 // indirect gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect gopkg.in/jcmturner/gokrb5.v7 v7.3.0 // indirect diff --git a/go.sum b/go.sum index fd5057a..8b9100e 100644 --- a/go.sum +++ b/go.sum @@ -174,6 +174,9 @@ golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGm golang.org/x/tools v0.0.0-20181030221726-6c7e314b6563/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190606124116-d0a3d012864b/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= golang.org/x/tools v0.0.0-20190712213246-8b927904ee0d/go.mod h1:jcCCGcm9btYwXyDqrUWc6MKQKKGJCWEQ3AfLSRIbEuI= +golang.org/x/tools v0.0.0-20190822191935-b1e2c8edcefd h1:sl3cZ9UhakOcf0k3nWTLpJFHPGbvWf5Cao9HxvzkDos= +golang.org/x/tools v0.0.0-20190822191935-b1e2c8edcefd/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/appengine v1.6.0 h1:Tfd7cKwKbFRsI8RMAD3oqqw7JPFRrvFlOsfbgVkjOOw= google.golang.org/appengine v1.6.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= diff --git a/tests/bus_test.go b/tests/bus_test.go index 265684e..dfd201c 100644 --- a/tests/bus_test.go +++ b/tests/bus_test.go @@ -5,12 +5,12 @@ import ( "database/sql" "errors" "fmt" - "github.com/wework/grabbit/gbus/metrics" "reflect" - "sync" "testing" "time" + "github.com/wework/grabbit/gbus/metrics" + "github.com/opentracing/opentracing-go" olog "github.com/opentracing/opentracing-go/log" "github.com/opentracing/opentracing-go/mocktracer" @@ -229,14 +229,13 @@ func TestRPC(t *testing.T) { func TestDeadlettering(t *testing.T) { - var waitgroup sync.WaitGroup - waitgroup.Add(2) + proceed := make(chan bool) poison := gbus.NewBusMessage(PoisonMessage{}) service1 := createNamedBusForTest(testSvc1) deadletterSvc := createNamedBusForTest("deadletterSvc") - deadMessageHandler := func(tx *sql.Tx, poison amqp.Delivery) error { - waitgroup.Done() + deadMessageHandler := func(tx *sql.Tx, poison *amqp.Delivery) error { + proceed <- true return nil } @@ -255,7 +254,7 @@ func TestDeadlettering(t *testing.T) { service1.Send(context.Background(), testSvc1, poison) service1.Send(context.Background(), testSvc1, gbus.NewBusMessage(Command1{})) - waitgroup.Wait() + <-proceed count, _ := metrics.GetRejectedMessagesValue() if count != 1 { t.Error("Should have one rejected message") @@ -280,6 +279,25 @@ func TestDeadlettering(t *testing.T) { } } +func TestRawMessageHandling(t *testing.T) { + + proceed := make(chan bool) + handler := func(tx *sql.Tx, delivery *amqp.Delivery) error { + proceed <- true + return nil + } + svc1 := createNamedBusForTest(testSvc1) + svc1.SetGlobalRawMessageHandler(handler) + _ = svc1.Start() + + cmd1 := gbus.NewBusMessage(Command1{}) + _ = svc1.Send(context.Background(), testSvc1, cmd1) + + <-proceed + _ = svc1.Shutdown() + +} + func TestReturnDeadToQueue(t *testing.T) { var visited bool @@ -292,7 +310,7 @@ func TestReturnDeadToQueue(t *testing.T) { deadletterSvc := createBusWithConfig("deadletterSvc", "grabbit-dead", true, true, gbus.BusConfiguration{MaxRetryCount: 0, BaseRetryDuration: 0}) - deadMessageHandler := func(tx *sql.Tx, poison amqp.Delivery) error { + deadMessageHandler := func(tx *sql.Tx, poison *amqp.Delivery) error { pub := amqpDeliveryToPublishing(poison) deadletterSvc.ReturnDeadToQueue(context.Background(), &pub) return nil @@ -449,7 +467,7 @@ func noopTraceContext() context.Context { // return ctx } -func amqpDeliveryToPublishing(del amqp.Delivery) (pub amqp.Publishing) { +func amqpDeliveryToPublishing(del *amqp.Delivery) (pub amqp.Publishing) { pub.Headers = del.Headers pub.ContentType = del.ContentType pub.ContentEncoding = del.ContentEncoding