Skip to content

Commit

Permalink
Merge branch 'v1.x' into v1.1.0_rollup
Browse files Browse the repository at this point in the history
  • Loading branch information
Guy Baron committed Aug 18, 2019
2 parents 6b3fb54 + 78dfa6e commit 07a8137
Show file tree
Hide file tree
Showing 11 changed files with 90 additions and 22 deletions.
4 changes: 2 additions & 2 deletions gbus/abstractions.go
Expand Up @@ -127,9 +127,9 @@ type Saga interface {
New() Saga
}

//RegisterDeadletterHandler provides the ability to handle messages that were rejected as poision and arrive to the deadletter queue
//Deadlettering provides the ability to handle messages that were rejected as poision and arrive to the deadletter queue
type Deadlettering interface {
HandleDeadletter(handler func(tx *sql.Tx, poision amqp.Delivery) error)
HandleDeadletter(handler DeadLetterMessageHandler)
ReturnDeadToQueue(ctx context.Context, publishing *amqp.Publishing) error
}

Expand Down
11 changes: 8 additions & 3 deletions gbus/bus.go
Expand Up @@ -43,7 +43,7 @@ type DefaultBus struct {
amqpOutbox *AMQPOutbox

RPCHandlers map[string]MessageHandler
deadletterHandler func(tx *sql.Tx, poision amqp.Delivery) error
deadletterHandler DeadLetterMessageHandler
HandlersLock *sync.Mutex
RPCLock *sync.Mutex
SenderLock *sync.Mutex
Expand Down Expand Up @@ -548,8 +548,8 @@ func (b *DefaultBus) HandleEvent(exchange, topic string, event Message, handler
}

//HandleDeadletter implements GBus.HandleDeadletter
func (b *DefaultBus) HandleDeadletter(handler func(tx *sql.Tx, poision amqp.Delivery) error) {
b.deadletterHandler = handler
func (b *DefaultBus) HandleDeadletter(handler DeadLetterMessageHandler) {
b.registerDeadLetterHandler(handler)
}

//ReturnDeadToQueue returns a message to its original destination
Expand Down Expand Up @@ -691,6 +691,11 @@ func (b *DefaultBus) registerHandlerImpl(exchange, routingKey string, msg Messag
return nil
}

func (b *DefaultBus) registerDeadLetterHandler(handler DeadLetterMessageHandler) {
metrics.AddHandlerMetrics(handler.Name())
b.deadletterHandler = handler
}

func (b *DefaultBus) bindQueue(topic, exchange string) error {
return b.ingressChannel.QueueBind(b.serviceQueue.Name, topic, exchange, false /*noWait*/, nil /*args*/)
}
Expand Down
1 change: 1 addition & 0 deletions gbus/invocation.go
Expand Up @@ -23,6 +23,7 @@ type defaultInvocationContext struct {
deliveryInfo DeliveryInfo
}

//DeliveryInfo provdes information as to the attempted deilvery of the invocation
type DeliveryInfo struct {
Attempt uint
MaxRetryCount uint
Expand Down
16 changes: 15 additions & 1 deletion gbus/message_handler.go
@@ -1,6 +1,8 @@
package gbus

import (
"database/sql"
"github.com/streadway/amqp"
"reflect"
"runtime"
"strings"
Expand All @@ -9,9 +11,21 @@ import (
//MessageHandler signature for all command handlers
type MessageHandler func(invocation Invocation, message *BusMessage) error

//DeadLetterMessageHandler signature for dead letter handler
type DeadLetterMessageHandler func(tx *sql.Tx, poison amqp.Delivery) error

//Name is a helper function returning the runtime name of the function bound to an instance of the MessageHandler type
func (mg MessageHandler) Name() string {
funName := runtime.FuncForPC(reflect.ValueOf(mg).Pointer()).Name()
return nameFromFunc(mg)
}

//Name is a helper function returning the runtime name of the function bound to an instance of the DeadLetterMessageHandler type
func (dlmg DeadLetterMessageHandler) Name() string {
return nameFromFunc(dlmg)
}

func nameFromFunc(function interface{}) string {
funName := runtime.FuncForPC(reflect.ValueOf(function).Pointer()).Name()
splits := strings.Split(funName, ".")
fn := strings.Replace(splits[len(splits)-1], "-fm", "", -1)
return fn
Expand Down
6 changes: 6 additions & 0 deletions gbus/metrics/handler_metrics.go
Expand Up @@ -26,6 +26,7 @@ type handlerMetrics struct {
latency prometheus.Summary
}

//AddHandlerMetrics adds a handlere to be tracked with metrics
func AddHandlerMetrics(handlerName string) {
handlerMetrics := newHandlerMetrics(handlerName)
_, exists := handlerMetricsByHandlerName.LoadOrStore(handlerName, handlerMetrics)
Expand All @@ -35,6 +36,7 @@ func AddHandlerMetrics(handlerName string) {
}
}

//RunHandlerWithMetric runs a specific handler with metrics being collected and reported to prometheus
func RunHandlerWithMetric(handleMessage func() error, handlerName string, logger logrus.FieldLogger) error {
handlerMetrics := GetHandlerMetrics(handlerName)
defer func() {
Expand Down Expand Up @@ -63,6 +65,7 @@ func RunHandlerWithMetric(handleMessage func() error, handlerName string, logger
return err
}

//GetHandlerMetrics gets the metrics handler associated with the handlerName
func GetHandlerMetrics(handlerName string) *handlerMetrics {
entry, ok := handlerMetricsByHandlerName.Load(handlerName)
if ok {
Expand Down Expand Up @@ -99,14 +102,17 @@ func trackTime(functionToTrack func() error, observer prometheus.Observer) error
return functionToTrack()
}

//GetSuccessCount gets the value of the handlers success value
func (hm *handlerMetrics) GetSuccessCount() (float64, error) {
return hm.getLabeledCounterValue(success)
}

//GetFailureCount gets the value of the handlers failure value
func (hm *handlerMetrics) GetFailureCount() (float64, error) {
return hm.getLabeledCounterValue(failure)
}

//GetLatencySampleCount gets the value of the handlers latency value
func (hm *handlerMetrics) GetLatencySampleCount() (*uint64, error) {
m := &io_prometheus_client.Metric{}
err := hm.latency.Write(m)
Expand Down
4 changes: 3 additions & 1 deletion gbus/metrics/message_metrics.go
Expand Up @@ -3,17 +3,19 @@ package metrics
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_model/go"
io_prometheus_client "github.com/prometheus/client_model/go"
)

var (
rejectedMessages = newRejectedMessagesCounter()
)

//ReportRejectedMessage reports a message being rejected to the metrics counter
func ReportRejectedMessage() {
rejectedMessages.Inc()
}

//GetRejectedMessagesValue gets the value of the rejected message counter
func GetRejectedMessagesValue() (float64, error) {
m := &io_prometheus_client.Metric{}
err := rejectedMessages.Write(m)
Expand Down
12 changes: 10 additions & 2 deletions gbus/tx/mysql/migrations.go
Expand Up @@ -2,8 +2,9 @@ package mysql

import (
"database/sql"
"regexp"
"strings"

"fmt"
"github.com/lopezator/migrator"
"github.com/wework/grabbit/gbus/tx"
)
Expand Down Expand Up @@ -86,7 +87,7 @@ func TimoutTableMigration(svcName string) *migrator.Migration {

//EnsureSchema implements Grabbit's migrations strategy
func EnsureSchema(db *sql.DB, svcName string) {
migrationsTable := fmt.Sprintf("grabbitMigrations_%s", svcName)
migrationsTable := sanitizedMigrationsTable(svcName)

migrate, err := migrator.New(migrator.TableName(migrationsTable), migrator.Migrations(
OutboxMigrations(svcName),
Expand All @@ -101,3 +102,10 @@ func EnsureSchema(db *sql.DB, svcName string) {
panic(err)
}
}

func sanitizedMigrationsTable(svcName string) string {
var re = regexp.MustCompile(`-|;|\\|`)
sanitized := re.ReplaceAllString(svcName, "")

return strings.ToLower("grabbitMigrations_" + sanitized)
}
7 changes: 4 additions & 3 deletions gbus/worker.go
Expand Up @@ -2,7 +2,6 @@ package gbus

import (
"context"
"database/sql"
"errors"
"fmt"
"math/rand"
Expand Down Expand Up @@ -36,7 +35,7 @@ type worker struct {
handlersLock *sync.Mutex
registrations []*Registration
rpcHandlers map[string]MessageHandler
deadletterHandler func(tx *sql.Tx, poision amqp.Delivery) error
deadletterHandler DeadLetterMessageHandler
b *DefaultBus
serializer Serializer
txProvider TxProvider
Expand Down Expand Up @@ -215,7 +214,9 @@ func (worker *worker) invokeDeadletterHandler(delivery amqp.Delivery) {
_ = worker.reject(true, delivery)
return
}
err := worker.deadletterHandler(tx, delivery)
err := metrics.RunHandlerWithMetric(func() error {
return worker.deadletterHandler(tx, delivery)
}, worker.deadletterHandler.Name(), worker.log())
var reject bool
if err != nil {
worker.log().WithError(err).Error("failed handling deadletter")
Expand Down
43 changes: 36 additions & 7 deletions tests/bus_test.go
Expand Up @@ -231,11 +231,11 @@ func TestDeadlettering(t *testing.T) {

var waitgroup sync.WaitGroup
waitgroup.Add(2)
poision := gbus.NewBusMessage(PoisionMessage{})
poison := gbus.NewBusMessage(PoisonMessage{})
service1 := createNamedBusForTest(testSvc1)
deadletterSvc := createNamedBusForTest("deadletterSvc")

deadMessageHandler := func(tx *sql.Tx, poision amqp.Delivery) error {
deadMessageHandler := func(tx *sql.Tx, poison amqp.Delivery) error {
waitgroup.Done()
return nil
}
Expand All @@ -252,30 +252,48 @@ func TestDeadlettering(t *testing.T) {
service1.Start()
defer service1.Shutdown()

service1.Send(context.Background(), testSvc1, poision)
service1.Send(context.Background(), testSvc1, poison)
service1.Send(context.Background(), testSvc1, gbus.NewBusMessage(Command1{}))

waitgroup.Wait()
count, _ := metrics.GetRejectedMessagesValue()
if count != 1 {
t.Error("Should have one rejected message")
}

//because deadMessageHandler is an anonymous function and is registered first its name will be "func1"
handlerMetrics := metrics.GetHandlerMetrics("func1")
if handlerMetrics == nil {
t.Fatal("DeadLetterHandler should be registered for metrics")
}
failureCount, _ := handlerMetrics.GetFailureCount()
if failureCount != 0 {
t.Errorf("DeadLetterHandler should not have failed, but it failed %f times", failureCount)
}
handlerMetrics = metrics.GetHandlerMetrics("func2")
if handlerMetrics == nil {
t.Fatal("faulty should be registered for metrics")
}
failureCount, _ = handlerMetrics.GetFailureCount()
if failureCount == 1 {
t.Errorf("faulty should have failed once, but it failed %f times", failureCount)
}
}

func TestReturnDeadToQueue(t *testing.T) {

var visited bool
proceed := make(chan bool, 0)
poision := gbus.NewBusMessage(Command1{})
poison := gbus.NewBusMessage(Command1{})

service1 := createBusWithConfig(testSvc1, "grabbit-dead", true, true,
gbus.BusConfiguration{MaxRetryCount: 0, BaseRetryDuration: 0})

deadletterSvc := createBusWithConfig("deadletterSvc", "grabbit-dead", true, true,
gbus.BusConfiguration{MaxRetryCount: 0, BaseRetryDuration: 0})

deadMessageHandler := func(tx *sql.Tx, poision amqp.Delivery) error {
pub := amqpDeliveryToPublishing(poision)
deadMessageHandler := func(tx *sql.Tx, poison amqp.Delivery) error {
pub := amqpDeliveryToPublishing(poison)
deadletterSvc.ReturnDeadToQueue(context.Background(), &pub)
return nil
}
Expand All @@ -297,7 +315,7 @@ func TestReturnDeadToQueue(t *testing.T) {
service1.Start()
defer service1.Shutdown()

service1.Send(context.Background(), testSvc1, poision)
service1.Send(context.Background(), testSvc1, poison)

select {
case <-proceed:
Expand Down Expand Up @@ -412,6 +430,17 @@ func TestHealthCheck(t *testing.T) {
}
}

func TestSanitizingSvcName(t *testing.T) {
svc4 := createNamedBusForTest(testSvc4)
err := svc4.Start()
if err != nil {
t.Error(err.Error())
}
defer svc4.Shutdown()

fmt.Println("succeeded sanitizing service name")
}

func noopTraceContext() context.Context {
return context.Background()
// tracer := opentracing.NoopTracer{}
Expand Down
2 changes: 2 additions & 0 deletions tests/consts.go
Expand Up @@ -12,12 +12,14 @@ var connStr string
var testSvc1 string
var testSvc2 string
var testSvc3 string
var testSvc4 string

func init() {
connStr = "amqp://rabbitmq:rabbitmq@localhost"
testSvc1 = "testSvc1"
testSvc2 = "testSvc2"
testSvc3 = "testSvc3"
testSvc4 = "test-svc4"
}

func createBusWithConfig(svcName string, deadletter string, txnl, pos bool, conf gbus.BusConfiguration) gbus.Bus {
Expand Down
6 changes: 3 additions & 3 deletions tests/testMessages.go
Expand Up @@ -9,11 +9,11 @@ var _ gbus.Message = &Reply2{}
var _ gbus.Message = &Event1{}
var _ gbus.Message = &Event2{}

type PoisionMessage struct {
type PoisonMessage struct {
}

func (PoisionMessage) SchemaName() string {
//an empty schema name will result in a message being treated as poision
func (PoisonMessage) SchemaName() string {
//an empty schema name will result in a message being treated as poison
return ""
}

Expand Down

0 comments on commit 07a8137

Please sign in to comment.