From 4c1c8123139d3acb4d204b6f22b52fff5dc9c4a2 Mon Sep 17 00:00:00 2001 From: adiweiss <46281796+adiweiss@users.noreply.github.com> Date: Sun, 18 Aug 2019 14:58:44 +0300 Subject: [PATCH 1/3] sanitize migrations table name (#130) --- gbus/tx/mysql/migrations.go | 12 ++++++++++-- tests/bus_test.go | 11 +++++++++++ tests/consts.go | 2 ++ 3 files changed, 23 insertions(+), 2 deletions(-) diff --git a/gbus/tx/mysql/migrations.go b/gbus/tx/mysql/migrations.go index c629d9a..6f78f14 100644 --- a/gbus/tx/mysql/migrations.go +++ b/gbus/tx/mysql/migrations.go @@ -2,8 +2,9 @@ package mysql import ( "database/sql" + "regexp" + "strings" - "fmt" "github.com/lopezator/migrator" "github.com/wework/grabbit/gbus/tx" ) @@ -86,7 +87,7 @@ func TimoutTableMigration(svcName string) *migrator.Migration { //EnsureSchema implements Grabbit's migrations strategy func EnsureSchema(db *sql.DB, svcName string) { - migrationsTable := fmt.Sprintf("grabbitMigrations_%s", svcName) + migrationsTable := sanitizedMigrationsTable(svcName) migrate, err := migrator.New(migrator.TableName(migrationsTable), migrator.Migrations( OutboxMigrations(svcName), @@ -101,3 +102,10 @@ func EnsureSchema(db *sql.DB, svcName string) { panic(err) } } + +func sanitizedMigrationsTable(svcName string) string { + var re = regexp.MustCompile(`-|;|\\|`) + sanitized := re.ReplaceAllString(svcName, "") + + return strings.ToLower("grabbitMigrations_" + sanitized) +} diff --git a/tests/bus_test.go b/tests/bus_test.go index 953a0a2..b5d5293 100644 --- a/tests/bus_test.go +++ b/tests/bus_test.go @@ -412,6 +412,17 @@ func TestHealthCheck(t *testing.T) { } } +func TestSanitizingSvcName(t *testing.T) { + svc4 := createNamedBusForTest(testSvc4) + err := svc4.Start() + if err != nil { + t.Error(err.Error()) + } + defer svc4.Shutdown() + + fmt.Println("succeeded sanitizing service name") +} + func noopTraceContext() context.Context { return context.Background() // tracer := opentracing.NoopTracer{} diff --git a/tests/consts.go b/tests/consts.go index 51fb49a..964ae02 100644 --- a/tests/consts.go +++ b/tests/consts.go @@ -12,12 +12,14 @@ var connStr string var testSvc1 string var testSvc2 string var testSvc3 string +var testSvc4 string func init() { connStr = "amqp://rabbitmq:rabbitmq@localhost" testSvc1 = "testSvc1" testSvc2 = "testSvc2" testSvc3 = "testSvc3" + testSvc4 = "test-svc4" } func createBusWithConfig(svcName string, deadletter string, txnl, pos bool, conf gbus.BusConfiguration) gbus.Bus { From 6c2a9e6ebc37e3fbd06a6c25e8b45b668a8cdb02 Mon Sep 17 00:00:00 2001 From: Guy Baron Date: Sun, 18 Aug 2019 14:59:18 +0300 Subject: [PATCH 2/3] more linting fixes for goreportcard (#129) --- .github/stale.yml | 17 +++++++++++++++++ gbus/abstractions.go | 2 +- gbus/builder/builder.go | 2 ++ gbus/invocation.go | 1 + gbus/metrics/handler_metrics.go | 6 ++++++ gbus/metrics/message_metrics.go | 4 +++- gbus/outbox.go | 1 - 7 files changed, 30 insertions(+), 3 deletions(-) create mode 100644 .github/stale.yml diff --git a/.github/stale.yml b/.github/stale.yml new file mode 100644 index 0000000..27cec56 --- /dev/null +++ b/.github/stale.yml @@ -0,0 +1,17 @@ +# Number of days of inactivity before an issue becomes stale +daysUntilStale: 7 +# Number of days of inactivity before a stale issue is closed +daysUntilClose: 7 +# Issues with these labels will never be considered stale + exemptLabels: + - bug + - enhancment +# Label to use when marking an issue as stale +staleLabel: wontfix +# Comment to post when marking an issue as stale. Set to `false` to disable +markComment: > + This issue has been automatically marked as stale because it has not had + recent activity. It will be closed if no further activity occurs. Thank you + for your contributions. +# Comment to post when closing a stale issue. Set to `false` to disable +closeComment: false diff --git a/gbus/abstractions.go b/gbus/abstractions.go index b976768..cf02687 100644 --- a/gbus/abstractions.go +++ b/gbus/abstractions.go @@ -127,7 +127,7 @@ type Saga interface { New() Saga } -//RegisterDeadletterHandler provides the ability to handle messages that were rejected as poision and arrive to the deadletter queue +//Deadlettering provides the ability to handle messages that were rejected as poision and arrive to the deadletter queue type Deadlettering interface { HandleDeadletter(handler func(tx *sql.Tx, poision amqp.Delivery) error) ReturnDeadToQueue(ctx context.Context, publishing *amqp.Publishing) error diff --git a/gbus/builder/builder.go b/gbus/builder/builder.go index 77d299e..60583b5 100644 --- a/gbus/builder/builder.go +++ b/gbus/builder/builder.go @@ -77,7 +77,9 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus { panic(err) } gb.TxProvider = mysqltx + mysql.EnsureSchema(mysqltx.Database, gb.SvcName) + //TODO move purge logic into the NewSagaStore factory method sagaStore = mysql.NewSagaStore(gb.SvcName, mysqltx) if builder.purgeOnStartup { diff --git a/gbus/invocation.go b/gbus/invocation.go index b67a695..f107e67 100644 --- a/gbus/invocation.go +++ b/gbus/invocation.go @@ -23,6 +23,7 @@ type defaultInvocationContext struct { deliveryInfo DeliveryInfo } +//DeliveryInfo provdes information as to the attempted deilvery of the invocation type DeliveryInfo struct { Attempt uint MaxRetryCount uint diff --git a/gbus/metrics/handler_metrics.go b/gbus/metrics/handler_metrics.go index 040737e..a666be5 100644 --- a/gbus/metrics/handler_metrics.go +++ b/gbus/metrics/handler_metrics.go @@ -26,6 +26,7 @@ type handlerMetrics struct { latency prometheus.Summary } +//AddHandlerMetrics adds a handlere to be tracked with metrics func AddHandlerMetrics(handlerName string) { handlerMetrics := newHandlerMetrics(handlerName) _, exists := handlerMetricsByHandlerName.LoadOrStore(handlerName, handlerMetrics) @@ -35,6 +36,7 @@ func AddHandlerMetrics(handlerName string) { } } +//RunHandlerWithMetric runs a specific handler with metrics being collected and reported to prometheus func RunHandlerWithMetric(handleMessage func() error, handlerName string, logger logrus.FieldLogger) error { handlerMetrics := GetHandlerMetrics(handlerName) defer func() { @@ -63,6 +65,7 @@ func RunHandlerWithMetric(handleMessage func() error, handlerName string, logger return err } +//GetHandlerMetrics gets the metrics handler associated with the handlerName func GetHandlerMetrics(handlerName string) *handlerMetrics { entry, ok := handlerMetricsByHandlerName.Load(handlerName) if ok { @@ -99,14 +102,17 @@ func trackTime(functionToTrack func() error, observer prometheus.Observer) error return functionToTrack() } +//GetSuccessCount gets the value of the handlers success value func (hm *handlerMetrics) GetSuccessCount() (float64, error) { return hm.getLabeledCounterValue(success) } +//GetFailureCount gets the value of the handlers failure value func (hm *handlerMetrics) GetFailureCount() (float64, error) { return hm.getLabeledCounterValue(failure) } +//GetLatencySampleCount gets the value of the handlers latency value func (hm *handlerMetrics) GetLatencySampleCount() (*uint64, error) { m := &io_prometheus_client.Metric{} err := hm.latency.Write(m) diff --git a/gbus/metrics/message_metrics.go b/gbus/metrics/message_metrics.go index 753ebca..5d03805 100644 --- a/gbus/metrics/message_metrics.go +++ b/gbus/metrics/message_metrics.go @@ -3,17 +3,19 @@ package metrics import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" - "github.com/prometheus/client_model/go" + io_prometheus_client "github.com/prometheus/client_model/go" ) var ( rejectedMessages = newRejectedMessagesCounter() ) +//ReportRejectedMessage reports a message being rejected to the metrics counter func ReportRejectedMessage() { rejectedMessages.Inc() } +//GetRejectedMessagesValue gets the value of the rejected message counter func GetRejectedMessagesValue() (float64, error) { m := &io_prometheus_client.Metric{} err := rejectedMessages.Write(m) diff --git a/gbus/outbox.go b/gbus/outbox.go index 7e35540..dac293a 100644 --- a/gbus/outbox.go +++ b/gbus/outbox.go @@ -53,7 +53,6 @@ func (out *AMQPOutbox) init(amqp *amqp.Channel, confirm, resendOnNack bool) erro //Shutdown stops the outbox func (out *AMQPOutbox) Shutdown() { close(out.stop) - } //Post implements Outbox.Send From 58b7cecb096692c86cfbaaf509ac50c02032cf54 Mon Sep 17 00:00:00 2001 From: Yuval Mendelbaum <43959535+yuvmendel@users.noreply.github.com> Date: Sun, 18 Aug 2019 15:07:50 +0300 Subject: [PATCH 3/3] =?UTF-8?q?added=20metrics=20on=20deadLetterHandler,?= =?UTF-8?q?=20refactored=20HandleDeadLetter=20inter=E2=80=A6=20(#122)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * added metrics on deadLetterHandler, refactored HandleDeadLetter interface to receive new DeadLetterMessageHandler type * fix dead letter test and a build error * added documentation for DeadLetterMessageHandler, also fixed poison spelling throughout code * retrigger build --- gbus/abstractions.go | 2 +- gbus/bus.go | 11 ++++++++--- gbus/message_handler.go | 16 +++++++++++++++- gbus/worker.go | 7 ++++--- tests/bus_test.go | 32 +++++++++++++++++++++++++------- tests/testMessages.go | 6 +++--- 6 files changed, 56 insertions(+), 18 deletions(-) diff --git a/gbus/abstractions.go b/gbus/abstractions.go index cf02687..3b3aa75 100644 --- a/gbus/abstractions.go +++ b/gbus/abstractions.go @@ -129,7 +129,7 @@ type Saga interface { //Deadlettering provides the ability to handle messages that were rejected as poision and arrive to the deadletter queue type Deadlettering interface { - HandleDeadletter(handler func(tx *sql.Tx, poision amqp.Delivery) error) + HandleDeadletter(handler DeadLetterMessageHandler) ReturnDeadToQueue(ctx context.Context, publishing *amqp.Publishing) error } diff --git a/gbus/bus.go b/gbus/bus.go index 4443691..e3e6e8e 100644 --- a/gbus/bus.go +++ b/gbus/bus.go @@ -43,7 +43,7 @@ type DefaultBus struct { amqpOutbox *AMQPOutbox RPCHandlers map[string]MessageHandler - deadletterHandler func(tx *sql.Tx, poision amqp.Delivery) error + deadletterHandler DeadLetterMessageHandler HandlersLock *sync.Mutex RPCLock *sync.Mutex SenderLock *sync.Mutex @@ -548,8 +548,8 @@ func (b *DefaultBus) HandleEvent(exchange, topic string, event Message, handler } //HandleDeadletter implements GBus.HandleDeadletter -func (b *DefaultBus) HandleDeadletter(handler func(tx *sql.Tx, poision amqp.Delivery) error) { - b.deadletterHandler = handler +func (b *DefaultBus) HandleDeadletter(handler DeadLetterMessageHandler) { + b.registerDeadLetterHandler(handler) } //ReturnDeadToQueue returns a message to its original destination @@ -691,6 +691,11 @@ func (b *DefaultBus) registerHandlerImpl(exchange, routingKey string, msg Messag return nil } +func (b *DefaultBus) registerDeadLetterHandler(handler DeadLetterMessageHandler) { + metrics.AddHandlerMetrics(handler.Name()) + b.deadletterHandler = handler +} + func (b *DefaultBus) bindQueue(topic, exchange string) error { return b.ingressChannel.QueueBind(b.serviceQueue.Name, topic, exchange, false /*noWait*/, nil /*args*/) } diff --git a/gbus/message_handler.go b/gbus/message_handler.go index 67a7d6c..8111a87 100644 --- a/gbus/message_handler.go +++ b/gbus/message_handler.go @@ -1,6 +1,8 @@ package gbus import ( + "database/sql" + "github.com/streadway/amqp" "reflect" "runtime" "strings" @@ -9,9 +11,21 @@ import ( //MessageHandler signature for all command handlers type MessageHandler func(invocation Invocation, message *BusMessage) error +//DeadLetterMessageHandler signature for dead letter handler +type DeadLetterMessageHandler func(tx *sql.Tx, poison amqp.Delivery) error + //Name is a helper function returning the runtime name of the function bound to an instance of the MessageHandler type func (mg MessageHandler) Name() string { - funName := runtime.FuncForPC(reflect.ValueOf(mg).Pointer()).Name() + return nameFromFunc(mg) +} + +//Name is a helper function returning the runtime name of the function bound to an instance of the DeadLetterMessageHandler type +func (dlmg DeadLetterMessageHandler) Name() string { + return nameFromFunc(dlmg) +} + +func nameFromFunc(function interface{}) string { + funName := runtime.FuncForPC(reflect.ValueOf(function).Pointer()).Name() splits := strings.Split(funName, ".") fn := strings.Replace(splits[len(splits)-1], "-fm", "", -1) return fn diff --git a/gbus/worker.go b/gbus/worker.go index aa35fb9..216b771 100644 --- a/gbus/worker.go +++ b/gbus/worker.go @@ -2,7 +2,6 @@ package gbus import ( "context" - "database/sql" "errors" "fmt" "math/rand" @@ -36,7 +35,7 @@ type worker struct { handlersLock *sync.Mutex registrations []*Registration rpcHandlers map[string]MessageHandler - deadletterHandler func(tx *sql.Tx, poision amqp.Delivery) error + deadletterHandler DeadLetterMessageHandler b *DefaultBus serializer Serializer txProvider TxProvider @@ -215,7 +214,9 @@ func (worker *worker) invokeDeadletterHandler(delivery amqp.Delivery) { _ = worker.reject(true, delivery) return } - err := worker.deadletterHandler(tx, delivery) + err := metrics.RunHandlerWithMetric(func() error { + return worker.deadletterHandler(tx, delivery) + }, worker.deadletterHandler.Name(), worker.log()) var reject bool if err != nil { worker.log().WithError(err).Error("failed handling deadletter") diff --git a/tests/bus_test.go b/tests/bus_test.go index b5d5293..265684e 100644 --- a/tests/bus_test.go +++ b/tests/bus_test.go @@ -231,11 +231,11 @@ func TestDeadlettering(t *testing.T) { var waitgroup sync.WaitGroup waitgroup.Add(2) - poision := gbus.NewBusMessage(PoisionMessage{}) + poison := gbus.NewBusMessage(PoisonMessage{}) service1 := createNamedBusForTest(testSvc1) deadletterSvc := createNamedBusForTest("deadletterSvc") - deadMessageHandler := func(tx *sql.Tx, poision amqp.Delivery) error { + deadMessageHandler := func(tx *sql.Tx, poison amqp.Delivery) error { waitgroup.Done() return nil } @@ -252,7 +252,7 @@ func TestDeadlettering(t *testing.T) { service1.Start() defer service1.Shutdown() - service1.Send(context.Background(), testSvc1, poision) + service1.Send(context.Background(), testSvc1, poison) service1.Send(context.Background(), testSvc1, gbus.NewBusMessage(Command1{})) waitgroup.Wait() @@ -260,13 +260,31 @@ func TestDeadlettering(t *testing.T) { if count != 1 { t.Error("Should have one rejected message") } + + //because deadMessageHandler is an anonymous function and is registered first its name will be "func1" + handlerMetrics := metrics.GetHandlerMetrics("func1") + if handlerMetrics == nil { + t.Fatal("DeadLetterHandler should be registered for metrics") + } + failureCount, _ := handlerMetrics.GetFailureCount() + if failureCount != 0 { + t.Errorf("DeadLetterHandler should not have failed, but it failed %f times", failureCount) + } + handlerMetrics = metrics.GetHandlerMetrics("func2") + if handlerMetrics == nil { + t.Fatal("faulty should be registered for metrics") + } + failureCount, _ = handlerMetrics.GetFailureCount() + if failureCount == 1 { + t.Errorf("faulty should have failed once, but it failed %f times", failureCount) + } } func TestReturnDeadToQueue(t *testing.T) { var visited bool proceed := make(chan bool, 0) - poision := gbus.NewBusMessage(Command1{}) + poison := gbus.NewBusMessage(Command1{}) service1 := createBusWithConfig(testSvc1, "grabbit-dead", true, true, gbus.BusConfiguration{MaxRetryCount: 0, BaseRetryDuration: 0}) @@ -274,8 +292,8 @@ func TestReturnDeadToQueue(t *testing.T) { deadletterSvc := createBusWithConfig("deadletterSvc", "grabbit-dead", true, true, gbus.BusConfiguration{MaxRetryCount: 0, BaseRetryDuration: 0}) - deadMessageHandler := func(tx *sql.Tx, poision amqp.Delivery) error { - pub := amqpDeliveryToPublishing(poision) + deadMessageHandler := func(tx *sql.Tx, poison amqp.Delivery) error { + pub := amqpDeliveryToPublishing(poison) deadletterSvc.ReturnDeadToQueue(context.Background(), &pub) return nil } @@ -297,7 +315,7 @@ func TestReturnDeadToQueue(t *testing.T) { service1.Start() defer service1.Shutdown() - service1.Send(context.Background(), testSvc1, poision) + service1.Send(context.Background(), testSvc1, poison) select { case <-proceed: diff --git a/tests/testMessages.go b/tests/testMessages.go index 1cc3c54..54aa9c0 100644 --- a/tests/testMessages.go +++ b/tests/testMessages.go @@ -9,11 +9,11 @@ var _ gbus.Message = &Reply2{} var _ gbus.Message = &Event1{} var _ gbus.Message = &Event2{} -type PoisionMessage struct { +type PoisonMessage struct { } -func (PoisionMessage) SchemaName() string { - //an empty schema name will result in a message being treated as poision +func (PoisonMessage) SchemaName() string { + //an empty schema name will result in a message being treated as poison return "" }