Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support handling raw message #138

Merged
merged 17 commits into from
Aug 25, 2019
30 changes: 29 additions & 1 deletion gbus/abstractions.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type BusConfiguration struct {
type Bus interface {
HandlerRegister
Deadlettering
RawMessageHandling
Copy link
Contributor

@vladshub vladshub Aug 25, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't a chain design with default chain components would be more effective here and more clear?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps, we can explore this design idea

BusSwitch
Messaging
SagaRegister
Expand Down Expand Up @@ -129,10 +130,37 @@ type Saga interface {

//Deadlettering provides the ability to handle messages that were rejected as poision and arrive to the deadletter queue
type Deadlettering interface {
HandleDeadletter(handler DeadLetterMessageHandler)
/*
HandleDeadletter is deprecated use RawMessageHandling.SetGlobalRawMessageHandler instead.
This function will be removed in future grabbit releases
*/
HandleDeadletter(handler RawMessageHandler)
ReturnDeadToQueue(ctx context.Context, publishing *amqp.Publishing) error
}

/*
RawMessageHandling provides the ability to consume and send raq amqp messages with the transactional guarantees
that the bus provides
*/
type RawMessageHandling interface {
/*
SetGlobalRawMessageHandler registers a handler that gets called for each amqp.Delivery that is delivered
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please fix indentation :)

        to the service queue.
        The handler will get called with a scoped transaction that is a different transaction than the ones that
        regular message handlers are scoped by as we want the RawMessage handler to get executed even if the amqp.Delivery
        can not be serialized by the bus to one of the registered schemas

        In case a bus has both a raw message handler and regular ones the bus will first call the raw message handler
        and afterward will call any registered message handlers.
        if the global raw handler returns an error the message gets rejected and any additional
        handlers will not be called.
        You should not use the global raw message handler to drive business logic as it breaks the local transactivity
        guarantees grabbit provides and should only be used in specialized cases.
        If you do decide to use this feature try not shooting yourself in the foot.
*/
SetGlobalRawMessageHandler(handler RawMessageHandler)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why Global?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not see the case in which more than one is needed. and if a client would like some logic being routed to according to specific criteria they should use the regular message handlers

}

//RequestSagaTimeout is the interface a saga needs to implement to get timeout servicess
type RequestSagaTimeout interface {
TimeoutDuration() time.Duration
Expand Down
22 changes: 15 additions & 7 deletions gbus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ type DefaultBus struct {
amqpOutbox *AMQPOutbox

RPCHandlers map[string]MessageHandler
deadletterHandler DeadLetterMessageHandler
deadletterHandler RawMessageHandler
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Going back to the chain question from before? :)

globalRawHandler RawMessageHandler
HandlersLock *sync.Mutex
RPCLock *sync.Mutex
SenderLock *sync.Mutex
Expand Down Expand Up @@ -73,8 +74,8 @@ var (
//BaseRetryDuration defines the basic milliseconds that the retry algorithm uses
//for a random retry time. Default is 10 but it is configurable.
BaseRetryDuration = 10 * time.Millisecond
//RpcHeaderName used to define the header in grabbit for RPC
RpcHeaderName = "x-grabbit-msg-rpc-id"
//RPCHeaderName used to define the header in grabbit for RPC
RPCHeaderName = "x-grabbit-msg-rpc-id"
)

func (b *DefaultBus) createRPCQueue() (amqp.Queue, error) {
Expand Down Expand Up @@ -286,6 +287,7 @@ func (b *DefaultBus) createBusWorkers(workerNum uint) ([]*worker, error) {
rpcLock: b.RPCLock,
rpcHandlers: b.RPCHandlers,
deadletterHandler: b.deadletterHandler,
globalRawHandler: b.globalRawHandler,
handlersLock: &sync.Mutex{},
registrations: b.Registrations,
serializer: b.Serializer,
Expand Down Expand Up @@ -547,11 +549,17 @@ func (b *DefaultBus) HandleEvent(exchange, topic string, event Message, handler
return b.registerHandlerImpl(exchange, topic, event, handler)
}

//HandleDeadletter implements GBus.HandleDeadletter
func (b *DefaultBus) HandleDeadletter(handler DeadLetterMessageHandler) {
//HandleDeadletter implements Deadlettering.HandleDeadletter
func (b *DefaultBus) HandleDeadletter(handler RawMessageHandler) {
b.registerDeadLetterHandler(handler)
}

//HandleDeadletter implements RawMessageHandling.SetGlobalRawMessageHandler
func (b *DefaultBus) SetGlobalRawMessageHandler(handler RawMessageHandler) {
metrics.AddHandlerMetrics(handler.Name())
b.globalRawHandler = handler
}

//ReturnDeadToQueue returns a message to its original destination
func (b *DefaultBus) ReturnDeadToQueue(ctx context.Context, publishing *amqp.Publishing) error {
return b.returnDeadToQueue(ctx, nil, publishing)
Expand Down Expand Up @@ -691,7 +699,7 @@ func (b *DefaultBus) registerHandlerImpl(exchange, routingKey string, msg Messag
return nil
}

func (b *DefaultBus) registerDeadLetterHandler(handler DeadLetterMessageHandler) {
func (b *DefaultBus) registerDeadLetterHandler(handler RawMessageHandler) {
metrics.AddHandlerMetrics(handler.Name())
b.deadletterHandler = handler
}
Expand All @@ -705,7 +713,7 @@ type rpcPolicy struct {
}

func (p rpcPolicy) Apply(publishing *amqp.Publishing) {
publishing.Headers[RpcHeaderName] = p.rpcID
publishing.Headers[RPCHeaderName] = p.rpcID
}

//Log returns the default logrus.FieldLogger for the bus via the Glogged helper
Expand Down
9 changes: 5 additions & 4 deletions gbus/message_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,26 @@ package gbus

import (
"database/sql"
"github.com/streadway/amqp"
"reflect"
"runtime"
"strings"

"github.com/streadway/amqp"
)

//MessageHandler signature for all command handlers
type MessageHandler func(invocation Invocation, message *BusMessage) error

//DeadLetterMessageHandler signature for dead letter handler
type DeadLetterMessageHandler func(tx *sql.Tx, poison amqp.Delivery) error
//RawMessageHandler signature for handlers that handle raw amqp deliveries
type RawMessageHandler func(tx *sql.Tx, delivery *amqp.Delivery) error

//Name is a helper function returning the runtime name of the function bound to an instance of the MessageHandler type
func (mg MessageHandler) Name() string {
return nameFromFunc(mg)
}

//Name is a helper function returning the runtime name of the function bound to an instance of the DeadLetterMessageHandler type
func (dlmg DeadLetterMessageHandler) Name() string {
func (dlmg RawMessageHandler) Name() string {
return nameFromFunc(dlmg)
}

Expand Down
34 changes: 27 additions & 7 deletions gbus/messages.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package gbus

import (
"errors"
"fmt"

"github.com/opentracing/opentracing-go/log"
"github.com/rs/xid"
"github.com/streadway/amqp"
Expand All @@ -27,11 +30,28 @@ func NewBusMessage(payload Message) *BusMessage {
return bm
}

//NewFromAMQPHeaders creates a BusMessage from headers of an amqp message
func NewFromAMQPHeaders(headers amqp.Table) *BusMessage {
//NewFromDelivery creates a BusMessage from an amqp delivery
func NewFromDelivery(delivery amqp.Delivery) (*BusMessage, error) {
bm := &BusMessage{}
bm.SetFromAMQPHeaders(headers)
return bm
bm.SetFromAMQPHeaders(delivery)

bm.ID = delivery.MessageId
bm.CorrelationID = delivery.CorrelationId
if delivery.Exchange != "" {
bm.Semantics = EVT
} else {
bm.Semantics = CMD
}
if bm.PayloadFQN == "" || bm.Semantics == "" {
errMsg := fmt.Sprintf("missing critical headers. message_name:%s semantics: %s", bm.PayloadFQN, bm.Semantics)
return nil, errors.New(errMsg)
}
return bm, nil
}

//GetMessageName extracts the valuee of the custom x-msg-name header from an amq delivery
func GetMessageName(delivery amqp.Delivery) string {
return castToString(delivery.Headers["x-msg-name"])
}

//GetAMQPHeaders convert to AMQP headers Table everything but a payload
Expand All @@ -46,12 +66,12 @@ func (bm *BusMessage) GetAMQPHeaders() (headers amqp.Table) {
}

//SetFromAMQPHeaders convert from AMQP headers Table everything but a payload
func (bm *BusMessage) SetFromAMQPHeaders(headers amqp.Table) {

func (bm *BusMessage) SetFromAMQPHeaders(delivery amqp.Delivery) {
headers := delivery.Headers
bm.SagaID = castToString(headers["x-msg-saga-id"])
bm.SagaCorrelationID = castToString(headers["x-msg-saga-correlation-id"])
bm.RPCID = castToString(headers["x-grabbit-msg-rpc-id"])
bm.PayloadFQN = castToString(headers["x-msg-name"])
bm.PayloadFQN = GetMessageName(delivery)

}

Expand Down
9 changes: 5 additions & 4 deletions gbus/tx/mysql/txoutbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@ import (
"database/sql"
"encoding/gob"
"fmt"
"github.com/rs/xid"
log "github.com/sirupsen/logrus"
"github.com/streadway/amqp"
"github.com/wework/grabbit/gbus"
"strconv"
"strings"
"sync"
"time"

"github.com/rs/xid"
log "github.com/sirupsen/logrus"
"github.com/streadway/amqp"
"github.com/wework/grabbit/gbus"
)

var (
Expand Down
Loading