Skip to content

Commit

Permalink
Support handling raw message (#138)
Browse files Browse the repository at this point in the history
  • Loading branch information
Guy Baron committed Aug 25, 2019
1 parent b4d07df commit 488c31a
Show file tree
Hide file tree
Showing 8 changed files with 251 additions and 127 deletions.
30 changes: 29 additions & 1 deletion gbus/abstractions.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type BusConfiguration struct {
type Bus interface {
HandlerRegister
Deadlettering
RawMessageHandling
BusSwitch
Messaging
SagaRegister
Expand Down Expand Up @@ -129,10 +130,37 @@ type Saga interface {

//Deadlettering provides the ability to handle messages that were rejected as poision and arrive to the deadletter queue
type Deadlettering interface {
HandleDeadletter(handler DeadLetterMessageHandler)
/*
HandleDeadletter is deprecated use RawMessageHandling.SetGlobalRawMessageHandler instead.
This function will be removed in future grabbit releases
*/
HandleDeadletter(handler RawMessageHandler)
ReturnDeadToQueue(ctx context.Context, publishing *amqp.Publishing) error
}

/*
RawMessageHandling provides the ability to consume and send raq amqp messages with the transactional guarantees
that the bus provides
*/
type RawMessageHandling interface {
/*
SetGlobalRawMessageHandler registers a handler that gets called for each amqp.Delivery that is delivered
        to the service queue.
        The handler will get called with a scoped transaction that is a different transaction than the ones that
        regular message handlers are scoped by as we want the RawMessage handler to get executed even if the amqp.Delivery
        can not be serialized by the bus to one of the registered schemas
        In case a bus has both a raw message handler and regular ones the bus will first call the raw message handler
        and afterward will call any registered message handlers.
        if the global raw handler returns an error the message gets rejected and any additional
        handlers will not be called.
        You should not use the global raw message handler to drive business logic as it breaks the local transactivity
        guarantees grabbit provides and should only be used in specialized cases.
        If you do decide to use this feature try not shooting yourself in the foot.
*/
SetGlobalRawMessageHandler(handler RawMessageHandler)
}

//RequestSagaTimeout is the interface a saga needs to implement to get timeout servicess
type RequestSagaTimeout interface {
TimeoutDuration() time.Duration
Expand Down
22 changes: 15 additions & 7 deletions gbus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ type DefaultBus struct {
amqpOutbox *AMQPOutbox

RPCHandlers map[string]MessageHandler
deadletterHandler DeadLetterMessageHandler
deadletterHandler RawMessageHandler
globalRawHandler RawMessageHandler
HandlersLock *sync.Mutex
RPCLock *sync.Mutex
SenderLock *sync.Mutex
Expand Down Expand Up @@ -73,8 +74,8 @@ var (
//BaseRetryDuration defines the basic milliseconds that the retry algorithm uses
//for a random retry time. Default is 10 but it is configurable.
BaseRetryDuration = 10 * time.Millisecond
//RpcHeaderName used to define the header in grabbit for RPC
RpcHeaderName = "x-grabbit-msg-rpc-id"
//RPCHeaderName used to define the header in grabbit for RPC
RPCHeaderName = "x-grabbit-msg-rpc-id"
)

func (b *DefaultBus) createRPCQueue() (amqp.Queue, error) {
Expand Down Expand Up @@ -286,6 +287,7 @@ func (b *DefaultBus) createBusWorkers(workerNum uint) ([]*worker, error) {
rpcLock: b.RPCLock,
rpcHandlers: b.RPCHandlers,
deadletterHandler: b.deadletterHandler,
globalRawHandler: b.globalRawHandler,
handlersLock: &sync.Mutex{},
registrations: b.Registrations,
serializer: b.Serializer,
Expand Down Expand Up @@ -547,11 +549,17 @@ func (b *DefaultBus) HandleEvent(exchange, topic string, event Message, handler
return b.registerHandlerImpl(exchange, topic, event, handler)
}

//HandleDeadletter implements GBus.HandleDeadletter
func (b *DefaultBus) HandleDeadletter(handler DeadLetterMessageHandler) {
//HandleDeadletter implements Deadlettering.HandleDeadletter
func (b *DefaultBus) HandleDeadletter(handler RawMessageHandler) {
b.registerDeadLetterHandler(handler)
}

//HandleDeadletter implements RawMessageHandling.SetGlobalRawMessageHandler
func (b *DefaultBus) SetGlobalRawMessageHandler(handler RawMessageHandler) {
metrics.AddHandlerMetrics(handler.Name())
b.globalRawHandler = handler
}

//ReturnDeadToQueue returns a message to its original destination
func (b *DefaultBus) ReturnDeadToQueue(ctx context.Context, publishing *amqp.Publishing) error {
return b.returnDeadToQueue(ctx, nil, publishing)
Expand Down Expand Up @@ -692,7 +700,7 @@ func (b *DefaultBus) registerHandlerImpl(exchange, routingKey string, msg Messag
return nil
}

func (b *DefaultBus) registerDeadLetterHandler(handler DeadLetterMessageHandler) {
func (b *DefaultBus) registerDeadLetterHandler(handler RawMessageHandler) {
metrics.AddHandlerMetrics(handler.Name())
b.deadletterHandler = handler
}
Expand All @@ -706,7 +714,7 @@ type rpcPolicy struct {
}

func (p rpcPolicy) Apply(publishing *amqp.Publishing) {
publishing.Headers[RpcHeaderName] = p.rpcID
publishing.Headers[RPCHeaderName] = p.rpcID
}

//Log returns the default logrus.FieldLogger for the bus via the Glogged helper
Expand Down
9 changes: 5 additions & 4 deletions gbus/message_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,26 @@ package gbus

import (
"database/sql"
"github.com/streadway/amqp"
"reflect"
"runtime"
"strings"

"github.com/streadway/amqp"
)

//MessageHandler signature for all command handlers
type MessageHandler func(invocation Invocation, message *BusMessage) error

//DeadLetterMessageHandler signature for dead letter handler
type DeadLetterMessageHandler func(tx *sql.Tx, poison amqp.Delivery) error
//RawMessageHandler signature for handlers that handle raw amqp deliveries
type RawMessageHandler func(tx *sql.Tx, delivery *amqp.Delivery) error

//Name is a helper function returning the runtime name of the function bound to an instance of the MessageHandler type
func (mg MessageHandler) Name() string {
return nameFromFunc(mg)
}

//Name is a helper function returning the runtime name of the function bound to an instance of the DeadLetterMessageHandler type
func (dlmg DeadLetterMessageHandler) Name() string {
func (dlmg RawMessageHandler) Name() string {
return nameFromFunc(dlmg)
}

Expand Down
34 changes: 27 additions & 7 deletions gbus/messages.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package gbus

import (
"errors"
"fmt"

"github.com/opentracing/opentracing-go/log"
"github.com/rs/xid"
"github.com/streadway/amqp"
Expand All @@ -27,11 +30,28 @@ func NewBusMessage(payload Message) *BusMessage {
return bm
}

//NewFromAMQPHeaders creates a BusMessage from headers of an amqp message
func NewFromAMQPHeaders(headers amqp.Table) *BusMessage {
//NewFromDelivery creates a BusMessage from an amqp delivery
func NewFromDelivery(delivery amqp.Delivery) (*BusMessage, error) {
bm := &BusMessage{}
bm.SetFromAMQPHeaders(headers)
return bm
bm.SetFromAMQPHeaders(delivery)

bm.ID = delivery.MessageId
bm.CorrelationID = delivery.CorrelationId
if delivery.Exchange != "" {
bm.Semantics = EVT
} else {
bm.Semantics = CMD
}
if bm.PayloadFQN == "" || bm.Semantics == "" {
errMsg := fmt.Sprintf("missing critical headers. message_name:%s semantics: %s", bm.PayloadFQN, bm.Semantics)
return nil, errors.New(errMsg)
}
return bm, nil
}

//GetMessageName extracts the valuee of the custom x-msg-name header from an amq delivery
func GetMessageName(delivery amqp.Delivery) string {
return castToString(delivery.Headers["x-msg-name"])
}

//GetAMQPHeaders convert to AMQP headers Table everything but a payload
Expand All @@ -46,12 +66,12 @@ func (bm *BusMessage) GetAMQPHeaders() (headers amqp.Table) {
}

//SetFromAMQPHeaders convert from AMQP headers Table everything but a payload
func (bm *BusMessage) SetFromAMQPHeaders(headers amqp.Table) {

func (bm *BusMessage) SetFromAMQPHeaders(delivery amqp.Delivery) {
headers := delivery.Headers
bm.SagaID = castToString(headers["x-msg-saga-id"])
bm.SagaCorrelationID = castToString(headers["x-msg-saga-correlation-id"])
bm.RPCID = castToString(headers["x-grabbit-msg-rpc-id"])
bm.PayloadFQN = castToString(headers["x-msg-name"])
bm.PayloadFQN = GetMessageName(delivery)

}

Expand Down
Loading

0 comments on commit 488c31a

Please sign in to comment.