Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support handling raw message #138

Merged
merged 17 commits into from
Aug 25, 2019
Merged
7 changes: 2 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,8 @@ A lightweight transactional message bus on top of RabbitMQ supporting:
5) [Reliable messaging](https://github.com/wework/grabbit/blob/master/docs/OUTBOX.md) and local service transactivity via Transaction Outbox pattern
6) Deadlettering
7) [Structured logging](https://github.com/wework/grabbit/blob/master/docs/LOGGING.md)
8) Reporting [metrics](https://github.com/wework/grabbit/blob/master/docs/METRICS.md) via Prometheus

Planned:

1) Deduplication of inbound messages
8) Reporting [Metrics](https://github.com/wework/grabbit/blob/master/docs/METRICS.md) via Prometheus
9) Distributed [Tracing](https://github.com/wework/grabbit/blob/master/docs/TRACING.md) via OpenTracing

## Stable release
the v1.x branch contains the latest stable releases of grabbit and one should track that branch to get point and minor release updates.
Expand Down
25 changes: 25 additions & 0 deletions docs/TRACING.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Tracing

grabbit supports reporting standard [OpenTracing](https://opentracing.io/) tracing spans to a compatable OpenTracing backend (such as [Jaeger](https://www.jaegertracing.io/)).

NOTE: In your hosting process you will need to set up a global tracer to collect and forward the traces reported by grabbit. See Jaeger go client for an [example](https://github.com/jaegertracing/jaeger-client-go)

Once the global tracer is set up you will need to make sure that in your message handlers you carry over the passed in context to successive messages sent by the handler.

```go

func SomeHandler(invocation gbus.Invocation, message *gbus.BusMessage) error{
reply := gbus.NewBusMessage(MyReply{})
cmd := gbus.NewBusMessage(MyCommand{})
ctx := invocation.Ctx()

if err := invocation.Send(ctx, "another-service", cmd); err != nil{
return err
}
if err := invocation.Reply(ctx, reply); err != nil{
return err
}
return nil
}

```
30 changes: 29 additions & 1 deletion gbus/abstractions.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type BusConfiguration struct {
type Bus interface {
HandlerRegister
Deadlettering
RawMessageHandling
Copy link
Contributor

@vladshub vladshub Aug 25, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't a chain design with default chain components would be more effective here and more clear?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps, we can explore this design idea

BusSwitch
Messaging
SagaRegister
Expand Down Expand Up @@ -129,10 +130,37 @@ type Saga interface {

//Deadlettering provides the ability to handle messages that were rejected as poision and arrive to the deadletter queue
type Deadlettering interface {
HandleDeadletter(handler DeadLetterMessageHandler)
/*
HandleDeadletter is deprecated use RawMessageHandling.SetGlobalRawMessageHandler instead.
This function will be removed in future grabbit releases
*/
HandleDeadletter(handler RawMessageHandler)
ReturnDeadToQueue(ctx context.Context, publishing *amqp.Publishing) error
}

/*
RawMessageHandling provides the ability to consume and send raq amqp messages with the transactional guarantees
that the bus provides
*/
type RawMessageHandling interface {
/*
SetGlobalRawMessageHandler registers a handler that gets called for each amqp.Delivery that is delivered
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please fix indentation :)

        to the service queue.
        The handler will get called with a scoped transaction that is a different transaction than the ones that
        regular message handlers are scoped by as we want the RawMessage handler to get executed even if the amqp.Delivery
        can not be serialized by the bus to one of the registered schemas

        In case a bus has both a raw message handler and regular ones the bus will first call the raw message handler
        and afterward will call any registered message handlers.
        if the global raw handler returns an error the message gets rejected and any additional
        handlers will not be called.
        You should not use the global raw message handler to drive business logic as it breaks the local transactivity
        guarantees grabbit provides and should only be used in specialized cases.
        If you do decide to use this feature try not shooting yourself in the foot.
*/
SetGlobalRawMessageHandler(handler RawMessageHandler)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why Global?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not see the case in which more than one is needed. and if a client would like some logic being routed to according to specific criteria they should use the regular message handlers

}

//RequestSagaTimeout is the interface a saga needs to implement to get timeout servicess
type RequestSagaTimeout interface {
TimeoutDuration() time.Duration
Expand Down
25 changes: 17 additions & 8 deletions gbus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ type DefaultBus struct {
amqpOutbox *AMQPOutbox

RPCHandlers map[string]MessageHandler
deadletterHandler DeadLetterMessageHandler
deadletterHandler RawMessageHandler
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Going back to the chain question from before? :)

globalRawHandler RawMessageHandler
HandlersLock *sync.Mutex
RPCLock *sync.Mutex
SenderLock *sync.Mutex
Expand Down Expand Up @@ -73,8 +74,8 @@ var (
//BaseRetryDuration defines the basic milliseconds that the retry algorithm uses
//for a random retry time. Default is 10 but it is configurable.
BaseRetryDuration = 10 * time.Millisecond
//RpcHeaderName used to define the header in grabbit for RPC
RpcHeaderName = "x-grabbit-msg-rpc-id"
//RPCHeaderName used to define the header in grabbit for RPC
RPCHeaderName = "x-grabbit-msg-rpc-id"
)

func (b *DefaultBus) createRPCQueue() (amqp.Queue, error) {
Expand Down Expand Up @@ -286,6 +287,7 @@ func (b *DefaultBus) createBusWorkers(workerNum uint) ([]*worker, error) {
rpcLock: b.RPCLock,
rpcHandlers: b.RPCHandlers,
deadletterHandler: b.deadletterHandler,
globalRawHandler: b.globalRawHandler,
handlersLock: &sync.Mutex{},
registrations: b.Registrations,
serializer: b.Serializer,
Expand Down Expand Up @@ -547,11 +549,17 @@ func (b *DefaultBus) HandleEvent(exchange, topic string, event Message, handler
return b.registerHandlerImpl(exchange, topic, event, handler)
}

//HandleDeadletter implements GBus.HandleDeadletter
func (b *DefaultBus) HandleDeadletter(handler DeadLetterMessageHandler) {
//HandleDeadletter implements Deadlettering.HandleDeadletter
func (b *DefaultBus) HandleDeadletter(handler RawMessageHandler) {
b.registerDeadLetterHandler(handler)
}

//HandleDeadletter implements RawMessageHandling.SetGlobalRawMessageHandler
func (b *DefaultBus) SetGlobalRawMessageHandler(handler RawMessageHandler) {
metrics.AddHandlerMetrics(handler.Name())
b.globalRawHandler = handler
}

//ReturnDeadToQueue returns a message to its original destination
func (b *DefaultBus) ReturnDeadToQueue(ctx context.Context, publishing *amqp.Publishing) error {
return b.returnDeadToQueue(ctx, nil, publishing)
Expand Down Expand Up @@ -623,7 +631,8 @@ func (b *DefaultBus) publish(tx *sql.Tx, exchange, routingKey string, msg *amqp.
func (b *DefaultBus) sendImpl(sctx context.Context, tx *sql.Tx, toService, replyTo, exchange, topic string, message *BusMessage, policies ...MessagePolicy) (er error) {
b.SenderLock.Lock()
defer b.SenderLock.Unlock()
span, _ := opentracing.StartSpanFromContext(sctx, "sendImpl")
span, _ := opentracing.StartSpanFromContext(sctx, "SendMessage")

defer func() {
if err := recover(); err != nil {
errMsg := fmt.Sprintf("panic recovered panicking err:\n%v\n%s", err, debug.Stack())
Expand Down Expand Up @@ -691,7 +700,7 @@ func (b *DefaultBus) registerHandlerImpl(exchange, routingKey string, msg Messag
return nil
}

func (b *DefaultBus) registerDeadLetterHandler(handler DeadLetterMessageHandler) {
func (b *DefaultBus) registerDeadLetterHandler(handler RawMessageHandler) {
metrics.AddHandlerMetrics(handler.Name())
b.deadletterHandler = handler
}
Expand All @@ -705,7 +714,7 @@ type rpcPolicy struct {
}

func (p rpcPolicy) Apply(publishing *amqp.Publishing) {
publishing.Headers[RpcHeaderName] = p.rpcID
publishing.Headers[RPCHeaderName] = p.rpcID
}

//Log returns the default logrus.FieldLogger for the bus via the Glogged helper
Expand Down
9 changes: 5 additions & 4 deletions gbus/message_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,26 @@ package gbus

import (
"database/sql"
"github.com/streadway/amqp"
"reflect"
"runtime"
"strings"

"github.com/streadway/amqp"
)

//MessageHandler signature for all command handlers
type MessageHandler func(invocation Invocation, message *BusMessage) error

//DeadLetterMessageHandler signature for dead letter handler
type DeadLetterMessageHandler func(tx *sql.Tx, poison amqp.Delivery) error
//RawMessageHandler signature for handlers that handle raw amqp deliveries
type RawMessageHandler func(tx *sql.Tx, delivery *amqp.Delivery) error

//Name is a helper function returning the runtime name of the function bound to an instance of the MessageHandler type
func (mg MessageHandler) Name() string {
return nameFromFunc(mg)
}

//Name is a helper function returning the runtime name of the function bound to an instance of the DeadLetterMessageHandler type
func (dlmg DeadLetterMessageHandler) Name() string {
func (dlmg RawMessageHandler) Name() string {
return nameFromFunc(dlmg)
}

Expand Down
34 changes: 27 additions & 7 deletions gbus/messages.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package gbus

import (
"errors"
"fmt"

"github.com/opentracing/opentracing-go/log"
"github.com/rs/xid"
"github.com/streadway/amqp"
Expand All @@ -27,11 +30,28 @@ func NewBusMessage(payload Message) *BusMessage {
return bm
}

//NewFromAMQPHeaders creates a BusMessage from headers of an amqp message
func NewFromAMQPHeaders(headers amqp.Table) *BusMessage {
//NewFromDelivery creates a BusMessage from an amqp delivery
func NewFromDelivery(delivery amqp.Delivery) (*BusMessage, error) {
bm := &BusMessage{}
bm.SetFromAMQPHeaders(headers)
return bm
bm.SetFromAMQPHeaders(delivery)

bm.ID = delivery.MessageId
bm.CorrelationID = delivery.CorrelationId
if delivery.Exchange != "" {
bm.Semantics = EVT
} else {
bm.Semantics = CMD
}
if bm.PayloadFQN == "" || bm.Semantics == "" {
errMsg := fmt.Sprintf("missing critical headers. message_name:%s semantics: %s", bm.PayloadFQN, bm.Semantics)
return nil, errors.New(errMsg)
}
return bm, nil
}

//GetMessageName extracts the valuee of the custom x-msg-name header from an amq delivery
func GetMessageName(delivery amqp.Delivery) string {
return castToString(delivery.Headers["x-msg-name"])
}

//GetAMQPHeaders convert to AMQP headers Table everything but a payload
Expand All @@ -46,12 +66,12 @@ func (bm *BusMessage) GetAMQPHeaders() (headers amqp.Table) {
}

//SetFromAMQPHeaders convert from AMQP headers Table everything but a payload
func (bm *BusMessage) SetFromAMQPHeaders(headers amqp.Table) {

func (bm *BusMessage) SetFromAMQPHeaders(delivery amqp.Delivery) {
headers := delivery.Headers
bm.SagaID = castToString(headers["x-msg-saga-id"])
bm.SagaCorrelationID = castToString(headers["x-msg-saga-correlation-id"])
bm.RPCID = castToString(headers["x-grabbit-msg-rpc-id"])
bm.PayloadFQN = castToString(headers["x-msg-name"])
bm.PayloadFQN = GetMessageName(delivery)

}

Expand Down
35 changes: 26 additions & 9 deletions gbus/saga/glue.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package saga

import (
"context"
"database/sql"
"errors"
"fmt"
"reflect"
"strings"
"sync"

"github.com/opentracing/opentracing-go"
slog "github.com/opentracing/opentracing-go/log"
"github.com/sirupsen/logrus"
"github.com/wework/grabbit/gbus"
"github.com/wework/grabbit/gbus/metrics"
Expand Down Expand Up @@ -98,7 +101,8 @@ func (imsm *Glue) getDefsForMsgName(msgName string) []*Def {
return defs
}

func (imsm *Glue) handler(invocation gbus.Invocation, message *gbus.BusMessage) error {
//SagaHandler is the generic handler invoking saga instances
func (imsm *Glue) SagaHandler(invocation gbus.Invocation, message *gbus.BusMessage) error {

imsm.lock.Lock()
defer imsm.lock.Unlock()
Expand All @@ -117,11 +121,12 @@ func (imsm *Glue) handler(invocation gbus.Invocation, message *gbus.BusMessage)
*/
startNew := def.shouldStartNewSaga(message)
if startNew {

newInstance := def.newInstance()
imsm.Log().
WithFields(logrus.Fields{"saga_def": def.String(), "saga_id": newInstance.ID}).
Info("created new saga")
if invkErr := imsm.invokeSagaInstance(newInstance, invocation, message); invkErr != nil {
if invkErr := imsm.invokeSagaInstance(def, newInstance, invocation, message); invkErr != nil {
imsm.Log().WithError(invkErr).WithField("saga_id", newInstance.ID).Error("failed to invoke saga")
return invkErr
}
Expand Down Expand Up @@ -154,7 +159,7 @@ func (imsm *Glue) handler(invocation gbus.Invocation, message *gbus.BusMessage)
return e
}
def.configureSaga(instance)
if invkErr := imsm.invokeSagaInstance(instance, invocation, message); invkErr != nil {
if invkErr := imsm.invokeSagaInstance(def, instance, invocation, message); invkErr != nil {
imsm.Log().WithError(invkErr).WithField("saga_id", instance.ID).Error("failed to invoke saga")
return invkErr
}
Expand All @@ -176,7 +181,7 @@ func (imsm *Glue) handler(invocation gbus.Invocation, message *gbus.BusMessage)

for _, instance := range instances {
def.configureSaga(instance)
if invkErr := imsm.invokeSagaInstance(instance, invocation, message); invkErr != nil {
if invkErr := imsm.invokeSagaInstance(def, instance, invocation, message); invkErr != nil {
imsm.Log().WithError(invkErr).WithField("saga_id", instance.ID).Error("failed to invoke saga")
return invkErr
}
Expand All @@ -191,13 +196,16 @@ func (imsm *Glue) handler(invocation gbus.Invocation, message *gbus.BusMessage)
return nil
}

func (imsm *Glue) invokeSagaInstance(instance *Instance, invocation gbus.Invocation, message *gbus.BusMessage) error {
func (imsm *Glue) invokeSagaInstance(def *Def, instance *Instance, invocation gbus.Invocation, message *gbus.BusMessage) error {

span, sctx := opentracing.StartSpanFromContext(invocation.Ctx(), def.String())
defer span.Finish()
sginv := &sagaInvocation{
decoratedBus: invocation.Bus(),
decoratedInvocation: invocation,
inboundMsg: message,
sagaID: instance.ID,
ctx: invocation.Ctx(),
ctx: sctx,
invokingService: imsm.svcName,
}
sginv.SetLogger(imsm.Log().WithFields(logrus.Fields{
Expand All @@ -207,7 +215,11 @@ func (imsm *Glue) invokeSagaInstance(instance *Instance, invocation gbus.Invocat
}))

exchange, routingKey := invocation.Routing()
return instance.invoke(exchange, routingKey, sginv, message)
err := instance.invoke(exchange, routingKey, sginv, message)
if err != nil {
span.LogFields(slog.Error(err))
}
return err
}

func (imsm *Glue) completeOrUpdateSaga(tx *sql.Tx, instance *Instance) error {
Expand All @@ -232,7 +244,7 @@ func (imsm *Glue) registerMessage(message gbus.Message) error {
return nil
}
imsm.alreadyRegistred[message.SchemaName()] = true
return imsm.bus.HandleMessage(message, imsm.handler)
return imsm.bus.HandleMessage(message, imsm.SagaHandler)
}

func (imsm *Glue) registerEvent(exchange, topic string, event gbus.Message) error {
Expand All @@ -241,7 +253,7 @@ func (imsm *Glue) registerEvent(exchange, topic string, event gbus.Message) erro
return nil
}
imsm.alreadyRegistred[event.SchemaName()] = true
return imsm.bus.HandleEvent(exchange, topic, event, imsm.handler)
return imsm.bus.HandleEvent(exchange, topic, event, imsm.SagaHandler)
}

//TimeoutSaga fetches a saga instance and calls its timeout interface
Expand All @@ -257,7 +269,12 @@ func (imsm *Glue) TimeoutSaga(tx *sql.Tx, sagaID string) error {
if err != nil {
return err
}

span, _ := opentracing.StartSpanFromContext(context.Background(), "SagaTimeout")
span.SetTag("saga_type", saga.String())
defer span.Finish()
timeoutErr := saga.timeout(tx, imsm.bus)

if timeoutErr != nil {
imsm.Log().WithError(timeoutErr).WithField("sagaID", sagaID).Error("failed to timeout saga")
return timeoutErr
Expand Down
Loading