Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added metrics on deadLetterHandler, refactored HandleDeadLetter inter… #122

Merged
merged 7 commits into from
Aug 18, 2019
2 changes: 1 addition & 1 deletion gbus/abstractions.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ type Saga interface {

//RegisterDeadletterHandler provides the ability to handle messages that were rejected as poision and arrive to the deadletter queue
type Deadlettering interface {
HandleDeadletter(handler func(tx *sql.Tx, poision amqp.Delivery) error)
HandleDeadletter(handler DeadLetterMessageHandler)
ReturnDeadToQueue(ctx context.Context, publishing *amqp.Publishing) error
}

Expand Down
9 changes: 7 additions & 2 deletions gbus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,8 +556,8 @@ func (b *DefaultBus) HandleEvent(exchange, topic string, event Message, handler
}

//HandleDeadletter implements GBus.HandleDeadletter
func (b *DefaultBus) HandleDeadletter(handler func(tx *sql.Tx, poision amqp.Delivery) error) {
b.deadletterHandler = handler
func (b *DefaultBus) HandleDeadletter(handler DeadLetterMessageHandler) {
b.registerDeadLetterHandler(handler)
}

//ReturnDeadToQueue returns a message to its original destination
Expand Down Expand Up @@ -699,6 +699,11 @@ func (b *DefaultBus) registerHandlerImpl(exchange, routingKey string, msg Messag
return nil
}

func (b *DefaultBus) registerDeadLetterHandler(handler DeadLetterMessageHandler) {
metrics.AddHandlerMetrics(handler.Name())
b.deadletterHandler = handler
}

func (b *DefaultBus) bindQueue(topic, exchange string) error {
return b.ingressChannel.QueueBind(b.serviceQueue.Name, topic, exchange, false /*noWait*/, nil /*args*/)
}
Expand Down
14 changes: 13 additions & 1 deletion gbus/message_handler.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package gbus

import (
"database/sql"
"github.com/streadway/amqp"
"reflect"
"runtime"
"strings"
Expand All @@ -9,8 +11,18 @@ import (
//MessageHandler signature for all command handlers
type MessageHandler func(invocation Invocation, message *BusMessage) error

type DeadLetterMessageHandler func(tx *sql.Tx, poision amqp.Delivery) error
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add documentation for this exported type


rhinof marked this conversation as resolved.
Show resolved Hide resolved
func (mg MessageHandler) Name() string {
funName := runtime.FuncForPC(reflect.ValueOf(mg).Pointer()).Name()
return nameFromFunc(mg)
}

func (dlmg DeadLetterMessageHandler) Name() string {
rhinof marked this conversation as resolved.
Show resolved Hide resolved
return nameFromFunc(dlmg)
}

func nameFromFunc(function interface{}) string {
funName := runtime.FuncForPC(reflect.ValueOf(function).Pointer()).Name()
splits := strings.Split(funName, ".")
fn := strings.Replace(splits[len(splits)-1], "-fm", "", -1)
return fn
Expand Down
7 changes: 4 additions & 3 deletions gbus/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package gbus

import (
"context"
"database/sql"
"errors"
"fmt"
"math/rand"
Expand Down Expand Up @@ -36,7 +35,7 @@ type worker struct {
handlersLock *sync.Mutex
registrations []*Registration
rpcHandlers map[string]MessageHandler
deadletterHandler func(tx *sql.Tx, poision amqp.Delivery) error
deadletterHandler DeadLetterMessageHandler
b *DefaultBus
serializer Serializer
txProvider TxProvider
Expand Down Expand Up @@ -236,7 +235,9 @@ func (worker *worker) invokeDeadletterHandler(delivery amqp.Delivery) {
_ = worker.reject(true, delivery)
return
}
err := worker.deadletterHandler(tx, delivery)
err := metrics.RunHandlerWithMetric(func() error {
return worker.deadletterHandler(tx, delivery)
}, worker.deadletterHandler.Name(), worker.log())
var reject bool
if err != nil {
worker.log().WithError(err).Error("failed handling deadletter")
Expand Down
10 changes: 10 additions & 0 deletions tests/bus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,16 @@ func TestDeadlettering(t *testing.T) {
if count != 1 {
t.Error("Should have one rejected message")
}

// because deadMessageHandler is an anonymous function and is registered first its name will be "func1"
handlerMetrics := metrics.GetHandlerMetrics("func1")
if handlerMetrics == nil {
t.Fatal("DeadLetterHandler should be registered for metrics")
}
failureCount, _ := handlerMetrics.GetFailureCount()
if failureCount != 0 {
t.Errorf("DeadLetterHandler should not have failed, but it failed %f times", failureCount)
}
}

func TestReturnDeadToQueue(t *testing.T) {
Expand Down