Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added metrics on deadLetterHandler, refactored HandleDeadLetter inter… #122

Merged
merged 7 commits into from
Aug 18, 2019
2 changes: 1 addition & 1 deletion gbus/abstractions.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ type Saga interface {

//Deadlettering provides the ability to handle messages that were rejected as poision and arrive to the deadletter queue
type Deadlettering interface {
HandleDeadletter(handler func(tx *sql.Tx, poision amqp.Delivery) error)
HandleDeadletter(handler DeadLetterMessageHandler)
ReturnDeadToQueue(ctx context.Context, publishing *amqp.Publishing) error
}

Expand Down
11 changes: 8 additions & 3 deletions gbus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type DefaultBus struct {
amqpOutbox *AMQPOutbox

RPCHandlers map[string]MessageHandler
deadletterHandler func(tx *sql.Tx, poision amqp.Delivery) error
deadletterHandler DeadLetterMessageHandler
HandlersLock *sync.Mutex
RPCLock *sync.Mutex
SenderLock *sync.Mutex
Expand Down Expand Up @@ -548,8 +548,8 @@ func (b *DefaultBus) HandleEvent(exchange, topic string, event Message, handler
}

//HandleDeadletter implements GBus.HandleDeadletter
func (b *DefaultBus) HandleDeadletter(handler func(tx *sql.Tx, poision amqp.Delivery) error) {
b.deadletterHandler = handler
func (b *DefaultBus) HandleDeadletter(handler DeadLetterMessageHandler) {
b.registerDeadLetterHandler(handler)
}

//ReturnDeadToQueue returns a message to its original destination
Expand Down Expand Up @@ -691,6 +691,11 @@ func (b *DefaultBus) registerHandlerImpl(exchange, routingKey string, msg Messag
return nil
}

func (b *DefaultBus) registerDeadLetterHandler(handler DeadLetterMessageHandler) {
metrics.AddHandlerMetrics(handler.Name())
b.deadletterHandler = handler
}

func (b *DefaultBus) bindQueue(topic, exchange string) error {
return b.ingressChannel.QueueBind(b.serviceQueue.Name, topic, exchange, false /*noWait*/, nil /*args*/)
}
Expand Down
16 changes: 15 additions & 1 deletion gbus/message_handler.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package gbus

import (
"database/sql"
"github.com/streadway/amqp"
"reflect"
"runtime"
"strings"
Expand All @@ -9,9 +11,21 @@ import (
//MessageHandler signature for all command handlers
type MessageHandler func(invocation Invocation, message *BusMessage) error

//DeadLetterMessageHandler signature for dead letter handler
type DeadLetterMessageHandler func(tx *sql.Tx, poison amqp.Delivery) error

rhinof marked this conversation as resolved.
Show resolved Hide resolved
//Name is a helper function returning the runtime name of the function bound to an instance of the MessageHandler type
func (mg MessageHandler) Name() string {
funName := runtime.FuncForPC(reflect.ValueOf(mg).Pointer()).Name()
return nameFromFunc(mg)
}

//Name is a helper function returning the runtime name of the function bound to an instance of the DeadLetterMessageHandler type
func (dlmg DeadLetterMessageHandler) Name() string {
rhinof marked this conversation as resolved.
Show resolved Hide resolved
return nameFromFunc(dlmg)
}

func nameFromFunc(function interface{}) string {
funName := runtime.FuncForPC(reflect.ValueOf(function).Pointer()).Name()
splits := strings.Split(funName, ".")
fn := strings.Replace(splits[len(splits)-1], "-fm", "", -1)
return fn
Expand Down
7 changes: 4 additions & 3 deletions gbus/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package gbus

import (
"context"
"database/sql"
"errors"
"fmt"
"math/rand"
Expand Down Expand Up @@ -36,7 +35,7 @@ type worker struct {
handlersLock *sync.Mutex
registrations []*Registration
rpcHandlers map[string]MessageHandler
deadletterHandler func(tx *sql.Tx, poision amqp.Delivery) error
deadletterHandler DeadLetterMessageHandler
b *DefaultBus
serializer Serializer
txProvider TxProvider
Expand Down Expand Up @@ -215,7 +214,9 @@ func (worker *worker) invokeDeadletterHandler(delivery amqp.Delivery) {
_ = worker.reject(true, delivery)
return
}
err := worker.deadletterHandler(tx, delivery)
err := metrics.RunHandlerWithMetric(func() error {
return worker.deadletterHandler(tx, delivery)
}, worker.deadletterHandler.Name(), worker.log())
var reject bool
if err != nil {
worker.log().WithError(err).Error("failed handling deadletter")
Expand Down
32 changes: 25 additions & 7 deletions tests/bus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,11 +231,11 @@ func TestDeadlettering(t *testing.T) {

var waitgroup sync.WaitGroup
waitgroup.Add(2)
poision := gbus.NewBusMessage(PoisionMessage{})
poison := gbus.NewBusMessage(PoisonMessage{})
service1 := createNamedBusForTest(testSvc1)
deadletterSvc := createNamedBusForTest("deadletterSvc")

deadMessageHandler := func(tx *sql.Tx, poision amqp.Delivery) error {
deadMessageHandler := func(tx *sql.Tx, poison amqp.Delivery) error {
waitgroup.Done()
return nil
}
Expand All @@ -252,30 +252,48 @@ func TestDeadlettering(t *testing.T) {
service1.Start()
defer service1.Shutdown()

service1.Send(context.Background(), testSvc1, poision)
service1.Send(context.Background(), testSvc1, poison)
service1.Send(context.Background(), testSvc1, gbus.NewBusMessage(Command1{}))

waitgroup.Wait()
count, _ := metrics.GetRejectedMessagesValue()
if count != 1 {
t.Error("Should have one rejected message")
}

//because deadMessageHandler is an anonymous function and is registered first its name will be "func1"
handlerMetrics := metrics.GetHandlerMetrics("func1")
if handlerMetrics == nil {
t.Fatal("DeadLetterHandler should be registered for metrics")
}
failureCount, _ := handlerMetrics.GetFailureCount()
if failureCount != 0 {
t.Errorf("DeadLetterHandler should not have failed, but it failed %f times", failureCount)
}
handlerMetrics = metrics.GetHandlerMetrics("func2")
if handlerMetrics == nil {
t.Fatal("faulty should be registered for metrics")
}
failureCount, _ = handlerMetrics.GetFailureCount()
if failureCount == 1 {
t.Errorf("faulty should have failed once, but it failed %f times", failureCount)
}
}

func TestReturnDeadToQueue(t *testing.T) {

var visited bool
proceed := make(chan bool, 0)
poision := gbus.NewBusMessage(Command1{})
poison := gbus.NewBusMessage(Command1{})

service1 := createBusWithConfig(testSvc1, "grabbit-dead", true, true,
gbus.BusConfiguration{MaxRetryCount: 0, BaseRetryDuration: 0})

deadletterSvc := createBusWithConfig("deadletterSvc", "grabbit-dead", true, true,
gbus.BusConfiguration{MaxRetryCount: 0, BaseRetryDuration: 0})

deadMessageHandler := func(tx *sql.Tx, poision amqp.Delivery) error {
pub := amqpDeliveryToPublishing(poision)
deadMessageHandler := func(tx *sql.Tx, poison amqp.Delivery) error {
pub := amqpDeliveryToPublishing(poison)
deadletterSvc.ReturnDeadToQueue(context.Background(), &pub)
return nil
}
Expand All @@ -297,7 +315,7 @@ func TestReturnDeadToQueue(t *testing.T) {
service1.Start()
defer service1.Shutdown()

service1.Send(context.Background(), testSvc1, poision)
service1.Send(context.Background(), testSvc1, poison)

select {
case <-proceed:
Expand Down
6 changes: 3 additions & 3 deletions tests/testMessages.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ var _ gbus.Message = &Reply2{}
var _ gbus.Message = &Event1{}
var _ gbus.Message = &Event2{}

type PoisionMessage struct {
type PoisonMessage struct {
}

func (PoisionMessage) SchemaName() string {
//an empty schema name will result in a message being treated as poision
func (PoisonMessage) SchemaName() string {
//an empty schema name will result in a message being treated as poison
return ""
}

Expand Down