Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add handler metrics to bus and saga #101

Merged
merged 11 commits into from
Jul 22, 2019
3 changes: 0 additions & 3 deletions gbus/abstractions.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,6 @@ type HandlerRegister interface {
HandleEvent(exchange, topic string, event Message, handler MessageHandler) error
}

//MessageHandler signature for all command handlers
type MessageHandler func(invocation Invocation, message *BusMessage) error

//Saga is the base interface for all Sagas.
type Saga interface {
//StartedBy returns the messages that when received should create a new saga instance
Expand Down
3 changes: 2 additions & 1 deletion gbus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"database/sql"
"errors"
"fmt"
"github.com/wework/grabbit/gbus/metrics"
"runtime/debug"
"sync"
"time"
Expand Down Expand Up @@ -670,14 +671,14 @@ func (b *DefaultBus) sendImpl(sctx context.Context, tx *sql.Tx, toService, reply
}

func (b *DefaultBus) registerHandlerImpl(exchange, routingKey string, msg Message, handler MessageHandler) error {

b.HandlersLock.Lock()
defer b.HandlersLock.Unlock()

if msg != nil {
b.Serializer.Register(msg)
}

metrics.AddHandlerMetrics(handler.Name())
registration := NewRegistration(exchange, routingKey, msg, handler)
b.Registrations = append(b.Registrations, registration)
for _, worker := range b.workers {
Expand Down
17 changes: 17 additions & 0 deletions gbus/message_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package gbus

import (
"reflect"
"runtime"
"strings"
)

//MessageHandler signature for all command handlers
type MessageHandler func(invocation Invocation, message *BusMessage) error

func (mg MessageHandler) Name() string {
funName := runtime.FuncForPC(reflect.ValueOf(mg).Pointer()).Name()
splits := strings.Split(funName, ".")
fn := strings.Replace(splits[len(splits)-1], "-fm", "", -1)
return fn
}
128 changes: 128 additions & 0 deletions gbus/metrics/handler_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package metrics

import (
"fmt"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_model/go"
"github.com/sirupsen/logrus"
"sync"
)

var (
handlerMetricsByHandlerName = &sync.Map{}
)

const (
failure = "failure"
success = "success"
handlerResult = "result"
handlers = "handlers"
grabbitPrefix = "grabbit"
)

type HandlerMetrics struct {
result *prometheus.CounterVec
latency prometheus.Summary
}

func AddHandlerMetrics(handlerName string) {
handlerMetrics := newHandlerMetrics(handlerName)
_, exists := handlerMetricsByHandlerName.LoadOrStore(handlerName, handlerMetrics)

if !exists {
prometheus.MustRegister(handlerMetrics.latency, handlerMetrics.result)
}
}

func RunHandlerWithMetric(handleMessage func() error, handlerName string, logger logrus.FieldLogger) error {
handlerMetrics := GetHandlerMetrics(handlerName)
defer func() {
if p := recover(); p != nil {
if handlerMetrics != nil {
handlerMetrics.result.WithLabelValues(failure).Inc()
}

panic(p)
}
}()

if handlerMetrics == nil {
logger.WithField("handler", handlerName).Warn("Running with metrics - couldn't find metrics for the given handler")
return handleMessage()
}

err := trackTime(handleMessage, handlerMetrics.latency)

if err != nil {
handlerMetrics.result.WithLabelValues(failure).Inc()
} else {
handlerMetrics.result.WithLabelValues(success).Inc()
}

return err
}

func GetHandlerMetrics(handlerName string) *HandlerMetrics {
entry, ok := handlerMetricsByHandlerName.Load(handlerName)
if ok {
return entry.(*HandlerMetrics)
}

return nil
}

func newHandlerMetrics(handlerName string) *HandlerMetrics {
return &HandlerMetrics{
result: prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: grabbitPrefix,
Subsystem: handlers,
Name: fmt.Sprintf("%s_result", handlerName),
Help: fmt.Sprintf("The %s's result", handlerName),
},
[]string{handlerResult}),
latency: prometheus.NewSummary(
prometheus.SummaryOpts{
Namespace: grabbitPrefix,
Subsystem: handlers,
Name: fmt.Sprintf("%s_latency", handlerName),
Help: fmt.Sprintf("The %s's latency", handlerName),
}),
}
}

func trackTime(functionToTrack func() error, observer prometheus.Observer) error {
timer := prometheus.NewTimer(observer)
defer timer.ObserveDuration()

return functionToTrack()
}

func (hm *HandlerMetrics) GetSuccessCount() (float64, error) {
return hm.getLabeledCounterValue(success)
}

func (hm *HandlerMetrics) GetFailureCount() (float64, error) {
return hm.getLabeledCounterValue(failure)
}

func (hm *HandlerMetrics) GetLatencySampleCount() (*uint64, error) {
m := &io_prometheus_client.Metric{}
err := hm.latency.Write(m)
if err != nil {
return nil, err
}

return m.GetSummary().SampleCount, nil
}

func (hm *HandlerMetrics) getLabeledCounterValue(label string) (float64, error) {
m := &io_prometheus_client.Metric{}
err := hm.result.WithLabelValues(label).Write(m)

if err != nil {
return 0, err
}

return m.GetCounter().GetValue(), nil
}
35 changes: 35 additions & 0 deletions gbus/metrics/message_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package metrics

import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_model/go"
)

var (
rejectedMessages = newRejectedMessagesCounter()
)

func ReportRejectedMessage() {
rejectedMessages.Inc()
}

func GetRejectedMessagesValue() (float64, error) {
m := &io_prometheus_client.Metric{}
err := rejectedMessages.Write(m)

if err != nil {
return 0, err
}

return m.GetCounter().GetValue(), nil
}

func newRejectedMessagesCounter() prometheus.Counter {
return promauto.NewCounter(prometheus.CounterOpts{
Namespace: grabbitPrefix,
Subsystem: "messages",
Name: "rejected_messages",
Help: "counting the rejected messages",
})
}
17 changes: 4 additions & 13 deletions gbus/saga/def.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
package saga

import (
"github.com/wework/grabbit/gbus/metrics"
"reflect"
"runtime"
"strings"
"sync"

"github.com/wework/grabbit/gbus"
Expand Down Expand Up @@ -49,22 +48,14 @@ func (sd *Def) getHandledMessages() []string {
}

func (sd *Def) addMsgToHandlerMapping(exchange, routingKey string, message gbus.Message, handler gbus.MessageHandler) {

fn := getFunNameFromHandler(handler)

handlerName := handler.Name()
metrics.AddHandlerMetrics(handlerName)
msgToFunc := &MsgToFuncPair{
Filter: gbus.NewMessageFilter(exchange, routingKey, message),
SagaFuncName: fn}
SagaFuncName: handlerName}
sd.msgToFunc = append(sd.msgToFunc, msgToFunc)
}

func getFunNameFromHandler(handler gbus.MessageHandler) string {
funName := runtime.FuncForPC(reflect.ValueOf(handler).Pointer()).Name()
splits := strings.Split(funName, ".")
fn := strings.Replace(splits[len(splits)-1], "-fm", "", -1)
return fn
}

func (sd *Def) newInstance() *Instance {
instance := NewInstance(sd.sagaType, sd.msgToFunc)
return sd.configureSaga(instance)
Expand Down
18 changes: 14 additions & 4 deletions gbus/saga/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"database/sql"
"fmt"
"github.com/sirupsen/logrus"
"github.com/wework/grabbit/gbus/metrics"
"reflect"
"time"

Expand Down Expand Up @@ -45,12 +46,21 @@ func (si *Instance) invoke(exchange, routingKey string, invocation gbus.Invocati
invocation.Log().WithFields(logrus.Fields{
"method_name": methodName, "saga_id": si.ID,
}).Info("invoking method on saga")
returns := method.Call(params)

val := returns[0]
if !val.IsNil() {
return val.Interface().(error)
err := metrics.RunHandlerWithMetric(func() error {
returns := method.Call(params)

val := returns[0]
if !val.IsNil() {
return val.Interface().(error)
}
return nil
}, methodName, invocation.Log())

if err != nil {
return err
}

invocation.Log().WithFields(logrus.Fields{
"method_name": methodName, "saga_id": si.ID,
}).Info("saga instance invoked")
Expand Down
4 changes: 2 additions & 2 deletions gbus/saga/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ func TestInstanceInvocationReturnsErrors(t *testing.T) {
exchange, routingKey := "", "kong"
invocationStub := &sagaInvocation{}

failName := getFunNameFromHandler(s.Fail)
failName := gbus.MessageHandler(s.Fail).Name()
failFilter := gbus.NewMessageFilter(exchange, routingKey, m1)

passName := getFunNameFromHandler(s.Pass)
passName := gbus.MessageHandler(s.Pass).Name()
passFilter := gbus.NewMessageFilter(exchange, routingKey, m2)

//map the filter to correct saga function name
Expand Down
12 changes: 7 additions & 5 deletions gbus/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@ import (
"database/sql"
"errors"
"fmt"
"github.com/wework/grabbit/gbus/metrics"
"math/rand"
"reflect"
"runtime"
"runtime/debug"
"sync"
"time"
Expand Down Expand Up @@ -322,6 +321,7 @@ func (worker *worker) processMessage(delivery amqp.Delivery, isRPCreply bool) {
_ = worker.ack(delivery)
} else {
_ = worker.reject(false, delivery)
metrics.ReportRejectedMessage()
}
}

Expand Down Expand Up @@ -363,7 +363,7 @@ func (worker *worker) invokeHandlers(sctx context.Context, handlers []MessageHan
var hspan opentracing.Span
var hsctx context.Context
for _, handler := range handlers {
hspan, hsctx = opentracing.StartSpanFromContext(sctx, runtime.FuncForPC(reflect.ValueOf(handler).Pointer()).Name())
hspan, hsctx = opentracing.StartSpanFromContext(sctx, handler.Name())

ctx := &defaultInvocationContext{
invocingSvc: delivery.ReplyTo,
Expand All @@ -378,8 +378,10 @@ func (worker *worker) invokeHandlers(sctx context.Context, handlers []MessageHan
MaxRetryCount: MaxRetryCount,
},
}
ctx.SetLogger(worker.log().WithField("handler", runtime.FuncForPC(reflect.ValueOf(handler).Pointer()).Name()))
handlerErr = handler(ctx, message)
ctx.SetLogger(worker.log().WithField("handler", handler.Name()))
handlerErr = metrics.RunHandlerWithMetric(func() error {
return handler(ctx, message)
}, handler.Name(), worker.log())
if handlerErr != nil {
hspan.LogFields(slog.Error(handlerErr))
break
Expand Down
27 changes: 21 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,42 @@ module github.com/wework/grabbit
require (
github.com/DataDog/zstd v1.4.0 // indirect
github.com/Rican7/retry v0.1.0
github.com/Shopify/sarama v1.22.1 // indirect
github.com/Shopify/sarama v1.23.0 // indirect
github.com/bsm/sarama-cluster v2.1.15+incompatible // indirect
github.com/dangkaka/go-kafka-avro v0.0.0-20181108134201-d57aece51a15
github.com/eapache/go-resiliency v1.2.0 // indirect
github.com/go-kit/kit v0.9.0 // indirect
github.com/go-sql-driver/mysql v1.4.1
github.com/golang/protobuf v1.3.1
github.com/gogo/protobuf v1.2.1 // indirect
github.com/golang/protobuf v1.3.2
github.com/jcmturner/gofork v1.0.0 // indirect
github.com/kisielk/errcheck v1.2.0 // indirect
github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect
github.com/kr/pretty v0.1.0 // indirect
github.com/kr/pty v1.1.8 // indirect
github.com/linkedin/goavro v2.1.0+incompatible
github.com/onsi/ginkgo v1.8.0 // indirect
github.com/onsi/gomega v1.5.0 // indirect
github.com/opentracing-contrib/go-amqp v0.0.0-20171102191528-e26701f95620
github.com/opentracing/opentracing-go v1.1.0
github.com/pierrec/lz4 v2.0.5+incompatible // indirect
github.com/pkg/errors v0.8.1 // indirect
github.com/prometheus/client_golang v1.0.0
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90
github.com/prometheus/common v0.6.0 // indirect
github.com/prometheus/procfs v0.0.3 // indirect
github.com/rcrowley/go-metrics v0.0.0-20190706150252-9beb055b7962 // indirect
github.com/rs/xid v1.2.1
github.com/sirupsen/logrus v1.4.2
github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94
golang.org/x/net v0.0.0-20190603091049-60506f45cf65 // indirect
golang.org/x/sys v0.0.0-20190602015325-4c4f7f33c9ed // indirect
golang.org/x/text v0.3.2 // indirect
google.golang.org/appengine v1.6.0 // indirect
github.com/stretchr/objx v0.2.0 // indirect
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4 // indirect
golang.org/x/net v0.0.0-20190628185345-da137c7871d7 // indirect
golang.org/x/sys v0.0.0-20190712062909-fae7ac547cb7 // indirect
golang.org/x/tools v0.0.0-20190712213246-8b927904ee0d // indirect
google.golang.org/appengine v1.6.1 // indirect
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect
gopkg.in/jcmturner/gokrb5.v7 v7.3.0 // indirect
gopkg.in/linkedin/goavro.v1 v1.0.5 // indirect
gopkg.in/yaml.v2 v2.2.2 // indirect
)
Loading