Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

merge v1.x into master #99

Merged
merged 19 commits into from
Jul 4, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@ A lightweight transactional message bus on top of RabbitMQ supporting:
4) Publisher confirms
5) [Reliable messaging](https://github.com/wework/grabbit/blob/master/docs/OUTBOX.md) and local service transactivity via Transaction Outbox pattern
6) Deadlettering
7) [Structured logging](https://github.com/wework/grabbit/blob/master/docs/LOGGING.md)

Planned:

1) Deduplication of inbound messages

## Stable release
the v1.x branch contains the latest stable releases of grabbit and one should track that branch to get point and minor release updates.
the v1.x branch contains the latest stable releases of grabbit and one should track that branch to get point and minor release updates.

## Supported transactional resources
1) MySql > 8.0 (InnoDB)
Expand Down Expand Up @@ -78,8 +79,8 @@ Register a command handler
```Go


handler := func(invocation gbus.Invocation, message *gbus.BusMessage) error
cmd, ok := message.Payload.(SomeCommand)
handler := func(invocation gbus.Invocation, message *gbus.BusMessage) error{
cmd, ok := message.Payload.(*SomeCommand)
if ok {
fmt.Printf("handler invoked with message %v", cmd)
return nil
Expand All @@ -96,7 +97,7 @@ Register an event handler


eventHandler := func(invocation gbus.Invocation, message *gbus.BusMessage) {
evt, ok := message.Payload.(SomeEvent)
evt, ok := message.Payload.(*SomeEvent)
if ok {
fmt.Printf("handler invoked with event %v", evt)
return nil
Expand Down
22 changes: 22 additions & 0 deletions docs/LOGGING.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Logging

grabbit supports structured logging via the [logrus](https://github.com/sirupsen/logrus) logging package.
The logger is accessible to message handlers via the past in invocation instance.

```go

func SomeHandler(invocation gbus.Invocation, message *gbus.BusMessage) error{
invocation.Log().WithField("name", "rhinof").Info("handler invoked")
return nil
}

```

grabbit will create a default instance of logrus FieldLogger if no such logger is set when the bus is created.
In order to set a custom logger when creating the bus you need to call the Builder.WithLogger method passing it
a logrus.FieldLogger instance.

```go


```
1 change: 1 addition & 0 deletions gbus/abstractions.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ type Invocation interface {
Tx() *sql.Tx
Ctx() context.Context
Routing() (exchange, routingKey string)
DeliveryInfo() DeliveryInfo
}

//Serializer is the base interface for all message serializers
Expand Down
42 changes: 24 additions & 18 deletions gbus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,11 @@ type DefaultBus struct {
Outbox TxOutbox
PrefetchCount uint
AmqpConnStr string
amqpConn *amqp.Connection
ingressConn *amqp.Connection
egressConn *amqp.Connection
workers []*worker
AMQPChannel *amqp.Channel
outAMQPChannel *amqp.Channel
ingressChannel *amqp.Channel
egressChannel *amqp.Channel
serviceQueue amqp.Queue
rpcQueue amqp.Queue
SvcName string
Expand Down Expand Up @@ -83,7 +84,7 @@ func (b *DefaultBus) createRPCQueue() (amqp.Queue, error) {
*/
uid := xid.New().String()
qName := b.SvcName + "_rpc_" + uid
q, e := b.AMQPChannel.QueueDeclare(qName,
q, e := b.ingressChannel.QueueDeclare(qName,
false, /*durable*/
true, /*autoDelete*/
false, /*exclusive*/
Expand All @@ -97,7 +98,7 @@ func (b *DefaultBus) createServiceQueue() (amqp.Queue, error) {
var q amqp.Queue

if b.PurgeOnStartup {
msgsPurged, purgeError := b.AMQPChannel.QueueDelete(qName, false /*ifUnused*/, false /*ifEmpty*/, false /*noWait*/)
msgsPurged, purgeError := b.ingressChannel.QueueDelete(qName, false /*ifUnused*/, false /*ifEmpty*/, false /*noWait*/)
if purgeError != nil {
b.Log().WithError(purgeError).WithField("deleted_messages", msgsPurged).Error("failed to purge queue")
return q, purgeError
Expand All @@ -108,7 +109,7 @@ func (b *DefaultBus) createServiceQueue() (amqp.Queue, error) {
if b.DLX != "" {
args["x-dead-letter-exchange"] = b.DLX
}
q, e := b.AMQPChannel.QueueDeclare(qName,
q, e := b.ingressChannel.QueueDeclare(qName,
true, /*durable*/
false, /*autoDelete*/
false, /*exclusive*/
Expand All @@ -124,7 +125,7 @@ func (b *DefaultBus) createServiceQueue() (amqp.Queue, error) {
func (b *DefaultBus) bindServiceQueue() error {

if b.deadletterHandler != nil && b.DLX != "" {
err := b.AMQPChannel.ExchangeDeclare(b.DLX, /*name*/
err := b.ingressChannel.ExchangeDeclare(b.DLX, /*name*/
"fanout", /*kind*/
true, /*durable*/
false, /*autoDelete*/
Expand All @@ -144,7 +145,7 @@ func (b *DefaultBus) bindServiceQueue() error {
for _, subscription := range b.DelayedSubscriptions {
topic := subscription[0]
exchange := subscription[1]
e := b.AMQPChannel.ExchangeDeclare(exchange, /*name*/
e := b.ingressChannel.ExchangeDeclare(exchange, /*name*/
"topic", /*kind*/
true, /*durable*/
false, /*autoDelete*/
Expand Down Expand Up @@ -178,27 +179,32 @@ func (b *DefaultBus) Start() error {

var e error
//create amqo connection and channel
if b.amqpConn, e = b.connect(MaxRetryCount); e != nil {
if b.ingressConn, e = b.connect(MaxRetryCount); e != nil {
return e
}
if b.egressConn, e = b.connect(MaxRetryCount); e != nil {
return e
}

if b.AMQPChannel, e = b.createAMQPChannel(b.amqpConn); e != nil {
if b.ingressChannel, e = b.createAMQPChannel(b.ingressConn); e != nil {
return e
}
if b.outAMQPChannel, e = b.createAMQPChannel(b.amqpConn); e != nil {
if b.egressChannel, e = b.createAMQPChannel(b.egressConn); e != nil {
return e
}

//register on failure notifications
b.amqpErrors = make(chan *amqp.Error)
b.amqpBlocks = make(chan amqp.Blocking)
b.amqpConn.NotifyClose(b.amqpErrors)
b.amqpConn.NotifyBlocked(b.amqpBlocks)
b.outAMQPChannel.NotifyClose(b.amqpErrors)
b.ingressConn.NotifyClose(b.amqpErrors)
b.ingressConn.NotifyBlocked(b.amqpBlocks)
b.egressConn.NotifyClose(b.amqpErrors)
b.egressConn.NotifyBlocked(b.amqpBlocks)
b.egressChannel.NotifyClose(b.amqpErrors)
//TODO:Figure out what should be done

//init the outbox that sends the messages to the amqp transport and handles publisher confirms
if e := b.Outgoing.init(b.outAMQPChannel, b.Confirm, true); e != nil {
if e := b.Outgoing.init(b.egressChannel, b.Confirm, true); e != nil {
return e
}
/*
Expand All @@ -208,7 +214,7 @@ func (b *DefaultBus) Start() error {
if b.IsTxnl {

var amqpChan *amqp.Channel
if amqpChan, e = b.createAMQPChannel(b.amqpConn); e != nil {
if amqpChan, e = b.createAMQPChannel(b.egressConn); e != nil {
b.Log().WithError(e).Error("failed to create amqp channel for transactional outbox")
return e
}
Expand Down Expand Up @@ -269,7 +275,7 @@ func (b *DefaultBus) createBusWorkers(workerNum uint) ([]*worker, error) {
workers := make([]*worker, 0)
for i := uint(0); i < workerNum; i++ {
//create a channel per worker as we can't share channels across go routines
amqpChan, createChanErr := b.createAMQPChannel(b.amqpConn)
amqpChan, createChanErr := b.createAMQPChannel(b.ingressConn)
if createChanErr != nil {
return nil, createChanErr
}
Expand Down Expand Up @@ -681,7 +687,7 @@ func (b *DefaultBus) registerHandlerImpl(exchange, routingKey string, msg Messag
}

func (b *DefaultBus) bindQueue(topic, exchange string) error {
return b.AMQPChannel.QueueBind(b.serviceQueue.Name, topic, exchange, false /*noWait*/, nil /*args*/)
return b.ingressChannel.QueueBind(b.serviceQueue.Name, topic, exchange, false /*noWait*/, nil /*args*/)
}

type rpcPolicy struct {
Expand Down
24 changes: 17 additions & 7 deletions gbus/invocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,19 @@ var _ Messaging = &defaultInvocationContext{}

type defaultInvocationContext struct {
*Glogged
invocingSvc string
bus *DefaultBus
inboundMsg *BusMessage
tx *sql.Tx
ctx context.Context
exchange string
routingKey string
invocingSvc string
bus *DefaultBus
inboundMsg *BusMessage
tx *sql.Tx
ctx context.Context
exchange string
routingKey string
deliveryInfo DeliveryInfo
}

type DeliveryInfo struct {
Attempt uint
MaxRetryCount uint
}

func (dfi *defaultInvocationContext) Log() logrus.FieldLogger {
Expand Down Expand Up @@ -79,3 +85,7 @@ func (dfi *defaultInvocationContext) Ctx() context.Context {
func (dfi *defaultInvocationContext) Routing() (exchange, routingKey string) {
return dfi.exchange, dfi.routingKey
}

func (dfi *defaultInvocationContext) DeliveryInfo() DeliveryInfo {
return dfi.deliveryInfo
}
4 changes: 4 additions & 0 deletions gbus/saga/invocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ func (si *sagaInvocation) Routing() (exchange, routingKey string) {
return si.decoratedInvocation.Routing()
}

func (si *sagaInvocation) DeliveryInfo() gbus.DeliveryInfo {
return si.decoratedInvocation.DeliveryInfo()
}

//func (si *sagaInvocation) Log() logrus.FieldLogger {
// return si.decoratedInvocation.Log().WithField("saga_id", si.sagaID)
//}
8 changes: 6 additions & 2 deletions gbus/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ func (worker *worker) invokeHandlers(sctx context.Context, handlers []MessageHan
//this is the action that will get retried
// each retry should run a new and separate transaction which should end with a commit or rollback

action := func(attempts uint) (actionErr error) {
action := func(attempt uint) (actionErr error) {
var tx *sql.Tx
var txCreateErr error
if worker.isTxnl {
Expand All @@ -343,7 +343,7 @@ func (worker *worker) invokeHandlers(sctx context.Context, handlers []MessageHan
}

worker.span, sctx = opentracing.StartSpanFromContext(sctx, "invokeHandlers")
worker.span.LogFields(slog.Uint64("attempt", uint64(attempts+1)))
worker.span.LogFields(slog.Uint64("attempt", uint64(attempt+1)))
defer func() {
if p := recover(); p != nil {
pncMsg := fmt.Sprintf("%v\n%s", p, debug.Stack())
Expand Down Expand Up @@ -373,6 +373,10 @@ func (worker *worker) invokeHandlers(sctx context.Context, handlers []MessageHan
ctx: hsctx,
exchange: delivery.Exchange,
routingKey: delivery.RoutingKey,
deliveryInfo: DeliveryInfo{
Attempt: attempt,
MaxRetryCount: MaxRetryCount,
},
}
ctx.SetLogger(worker.log().WithField("handler", runtime.FuncForPC(reflect.ValueOf(handler).Pointer()).Name()))
handlerErr = handler(ctx, message)
Expand Down