From 31d9fc551794c6078f80c347e4f2c26ca7118888 Mon Sep 17 00:00:00 2001 From: Guy Baron Date: Wed, 14 Aug 2019 10:42:29 +0300 Subject: [PATCH] v1.x rollup to master (#121) * add handler metrics to bus and saga (#101) * add handler metrics to bus and saga + tests * fix build * add 0 to the default buckets to catch fast message handling * PR correction - changed latency to summary (removed bucket configuration), add registration for saga handlers * PR correction - getting logger as a param * PR correction - new line in eof * PR corrections message handler + sync.map + latency as summary * add rejected messages metric * dead letter handler should reject messages on failures and rollbacks and ack on commit success (#105) * dead letter handler should reject messages on failures and rollbacks * dead letter handler should reject messages on failures and rollbacks * dead letter handler should reject messages on failures and rollbacks * dead letter handler should reject messages on failures and rollbacks * return an error from the saga store when deleting a saga if saga can not be found (#110). In order to deal with concurrent deletes of the same saga instance we would want to indicate that deleting the saga failed if the saga is not stored so callers can take proper action * Persisted timeouts (#107) * decouple transaction manager from glue * moved timeout manager to gbus/tx package * initial commit in order to support persisted timeouts * first working version of a mysql persisted timeout manager * fixing ci lint errors * refactored ensure schema of timeout manager * cleanup timeout manager when bus shuts down * fixing formatting issues * changed logging level from Info to Debug when inserting a new timeout * reusing timeouts tablename (PR review) * renamed AcceptTimeoutFunction to SetTimeoutFunction on the TimeoutManager interface (PR review) * refactored glue to implement the Logged interface and use the GLogged helper struct * locking timeout record before executing timeout. In order to prevent having a timeout being executed twice due to two concurrent grabbit instances running the same service, a lock (FOR UPDATE) has been placed on the timeout record in the scope of the executing transaction * Committing the select transaction when querying for pending timeouts * feat(timeout:lock): This is done in order to reduce contention and allow for parallel processing in case of multiple grabbit instances * Enable returning a message back from the dead to the queue (#112) * enable sending raw messages * enable sending raw messages * enable sending raw messages * enable sending raw messages * enable sending raw messages * enable sending raw messages * enable sending raw messages * enable sending raw messages * enable sending raw messages * enable sending raw messages * enable sending raw messages * enable sending raw messages * enable sending raw messages * return to q * return to q * return to q * return to q * return dead to q * allow no retries * test - resend dead to queue * test - resend dead to queue * test - resend dead to queue * test - resend dead to queue - fixes after cr * test - resend dead to queue - fixes after cr * test - resend dead to queue - fixes after cr * added metric report on saga timeout (#114) 1) added reporting saga timeouts to the glue component 2) fixed mysql timeoutmanager error when trying to clear a timeout * Added documentation for grabbit metrics (#117) * added initial documentation for grabbit metrics * including metrics section in readme.md * fixing goreportcard issues
(#118) * removed logging a warning when worker message channel returns an error (#116) * corrected saga metrics name and added to metrics documentation (#119) * corrected saga metrics name and added documentation * corrected saga metric name * corrected typos * removed non transactional bus mode (#120) --- .gitignore | 2 +- README.md | 1 + docs/METRICS.md | 10 ++ gbus/abstractions.go | 37 ++++- gbus/builder/builder.go | 61 ++++----- gbus/bus.go | 200 ++++++++++++++------------- gbus/logged.go | 6 +- gbus/message_handler.go | 17 +++ gbus/messages.go | 1 + gbus/metrics/handler_metrics.go | 128 ++++++++++++++++++ gbus/metrics/message_metrics.go | 35 +++++ gbus/metrics/saga_metrics.go | 29 ++++ gbus/outbox.go | 4 +- gbus/registration.go | 6 + gbus/saga/def.go | 19 +-- gbus/saga/glue.go | 78 +++++++---- gbus/saga/instance.go | 22 ++- gbus/saga/instance_test.go | 4 +- gbus/saga/invocation.go | 4 - gbus/saga/stores/memstore.go | 100 -------------- gbus/saga/timeout.go | 51 ------- gbus/tx/mysql/timeout.go | 230 ++++++++++++++++++++++++++++++++ gbus/tx/mysql/txoutbox.go | 6 +- gbus/tx/sagastore.go | 17 ++- gbus/worker.go | 55 ++++---- go.mod | 27 +++- go.sum | 93 +++++++++++++ tests/bus_test.go | 122 ++++++++++++++--- tests/consts.go | 14 +- tests/metrics_test.go | 137 +++++++++++++++++++ tests/saga_test.go | 5 + 31 files changed, 1120 insertions(+), 401 deletions(-) create mode 100644 docs/METRICS.md create mode 100644 gbus/message_handler.go create mode 100644 gbus/metrics/handler_metrics.go create mode 100644 gbus/metrics/message_metrics.go create mode 100644 gbus/metrics/saga_metrics.go delete mode 100644 gbus/saga/stores/memstore.go delete mode 100644 gbus/saga/timeout.go create mode 100644 gbus/tx/mysql/timeout.go create mode 100644 tests/metrics_test.go diff --git a/.gitignore b/.gitignore index c4fe1d4..63ca3bf 100644 --- a/.gitignore +++ b/.gitignore @@ -9,7 +9,7 @@ debug # Test binary, built with `go test -c` *.test - +.vscode # Output of the go coverage tool, specifically when used with LiteIDE *.out .DS_Store diff --git a/README.md b/README.md index bcfcdab..665dc4c 100644 --- a/README.md +++ b/README.md @@ -21,6 +21,7 @@ A lightweight transactional message bus on top of RabbitMQ supporting: 5) [Reliable messaging](https://github.com/wework/grabbit/blob/master/docs/OUTBOX.md) and local service transactivity via Transaction Outbox pattern 6) Deadlettering 7) [Structured logging](https://github.com/wework/grabbit/blob/master/docs/LOGGING.md) +8) Reporting [metrics](https://github.com/wework/grabbit/blob/master/docs/METRICS.md) via Prometheus Planned: diff --git a/docs/METRICS.md b/docs/METRICS.md new file mode 100644 index 0000000..d2dec8d --- /dev/null +++ b/docs/METRICS.md @@ -0,0 +1,10 @@ +# Metrics + +grabbit exposes and reports the following metrics to Prometheus + +| Namespace | Subsystem | Name | Description | | ------------- | ------------- | ----------------------------------| --------------------------------------------------------------------------- | +| grabbit | handlers | [name of message handler]_result | records and counts each successful or failed execution of a message handler | +| grabbit | handlers | [name of message handler]_latency | records the execution time of each handler | +| grabbit | messages | rejected_messages | increments each time a message gets rejected | +| grabbit | saga | timedout_sagas | counts the number of timed out saga instances | diff --git a/gbus/abstractions.go b/gbus/abstractions.go index eb47e30..b976768 100644 --- a/gbus/abstractions.go +++ 
b/gbus/abstractions.go @@ -3,16 +3,20 @@ package gbus import ( "context" "database/sql" - "github.com/sirupsen/logrus" "time" + "github.com/sirupsen/logrus" + "github.com/streadway/amqp" ) +//Semantics reopresents the semantics of a grabbit message type Semantics string const ( + //CMD represenst a messge with command semantics in grabbit CMD Semantics = "cmd" + //EVT represenst a messge with event semantics in grabbit EVT Semantics = "evt" ) @@ -25,7 +29,7 @@ type BusConfiguration struct { //Bus interface provides the majority of functionality to Send, Reply and Publish messages to the Bus type Bus interface { HandlerRegister - RegisterDeadletterHandler + Deadlettering BusSwitch Messaging SagaRegister @@ -106,9 +110,6 @@ type HandlerRegister interface { HandleEvent(exchange, topic string, event Message, handler MessageHandler) error } -//MessageHandler signature for all command handlers -type MessageHandler func(invocation Invocation, message *BusMessage) error - //Saga is the base interface for all Sagas. type Saga interface { //StartedBy returns the messages that when received should create a new saga instance @@ -127,8 +128,9 @@ type Saga interface { } //RegisterDeadletterHandler provides the ability to handle messages that were rejected as poision and arrive to the deadletter queue -type RegisterDeadletterHandler interface { +type Deadlettering interface { HandleDeadletter(handler func(tx *sql.Tx, poision amqp.Delivery) error) + ReturnDeadToQueue(ctx context.Context, publishing *amqp.Publishing) error } //RequestSagaTimeout is the interface a saga needs to implement to get timeout servicess @@ -145,6 +147,14 @@ type SagaRegister interface { RegisterSaga(saga Saga, conf ...SagaConfFn) error } +//SagaGlue glues together all the parts needed in order to orchistrate saga instances +type SagaGlue interface { + SagaRegister + Logged + Start() error + Stop() error +} + //Builder is the main interface that should be used to create an instance of a Bus type Builder interface { PurgeOnStartUp() Builder @@ -217,6 +227,21 @@ type TxOutbox interface { Stop() error } +//TimeoutManager abstracts the implementation of determining when a saga should be timed out +type TimeoutManager interface { + //RegisterTimeout requests the TimeoutManager to register a timeout for a specific saga instance + RegisterTimeout(tx *sql.Tx, sagaID string, duration time.Duration) error + //ClearTimeout clears a timeout for a specific saga + ClearTimeout(tx *sql.Tx, sagaID string) error + //SetTimeoutFunction accepts the function that the TimeoutManager should invoke once a timeout expires + SetTimeoutFunction(func(tx *sql.Tx, sagaID string) error) + //Start starts the timeout manager + Start() error + //Stop shuts the timeout manager down + Stop() error +} + +//Logged represents a grabbit component that can be logged type Logged interface { SetLogger(entry logrus.FieldLogger) Log() logrus.FieldLogger diff --git a/gbus/builder/builder.go b/gbus/builder/builder.go index 016b100..9b4c0d1 100644 --- a/gbus/builder/builder.go +++ b/gbus/builder/builder.go @@ -2,13 +2,13 @@ package builder import ( "fmt" - "github.com/sirupsen/logrus" "sync" "time" + "github.com/sirupsen/logrus" + "github.com/wework/grabbit/gbus" "github.com/wework/grabbit/gbus/saga" - "github.com/wework/grabbit/gbus/saga/stores" "github.com/wework/grabbit/gbus/serialization" "github.com/wework/grabbit/gbus/tx/mysql" ) @@ -36,9 +36,7 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus { gb := &gbus.DefaultBus{ AmqpConnStr: builder.connStr, 
PrefetchCount: builder.PrefetchCount, - Outgoing: &gbus.AMQPOutbox{ - SvcName: svcName, - }, + SvcName: svcName, PurgeOnStartup: builder.purgeOnStartup, DelayedSubscriptions: [][]string{}, @@ -46,7 +44,6 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus { RPCLock: &sync.Mutex{}, SenderLock: &sync.Mutex{}, ConsumerLock: &sync.Mutex{}, - IsTxnl: builder.txnl, Registrations: make([]*gbus.Registration, 0), RPCHandlers: make(map[string]gbus.MessageHandler), Serializer: builder.serializer, @@ -68,46 +65,46 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus { gb.WorkerNum = builder.workerNum } var ( - sagaStore saga.Store + sagaStore saga.Store + timeoutManager gbus.TimeoutManager ) - if builder.txnl { - gb.IsTxnl = true - switch builder.txnlProvider { - case "mysql": - mysqltx, err := mysql.NewTxProvider(builder.txConnStr) + switch builder.txnlProvider { + + case "mysql": + mysqltx, err := mysql.NewTxProvider(builder.txConnStr) + if err != nil { + panic(err) + } + gb.TxProvider = mysqltx + //TODO move purge logic into the NewSagaStore factory method + sagaStore = mysql.NewSagaStore(gb.SvcName, mysqltx) + if builder.purgeOnStartup { + err := sagaStore.Purge() if err != nil { panic(err) } - gb.TxProvider = mysqltx - sagaStore = mysql.NewSagaStore(gb.SvcName, mysqltx) - if builder.purgeOnStartup { - err := sagaStore.Purge() - if err != nil { - panic(err) - } - } - gb.Outbox = mysql.NewOutbox(gb.SvcName, mysqltx, builder.purgeOnStartup) - - default: - err := fmt.Errorf("no provider found for passed in value %v", builder.txnlProvider) - panic(err) } - } else { - sagaStore = stores.NewInMemoryStore() - } + gb.Outbox = mysql.NewOutbox(gb.SvcName, mysqltx, builder.purgeOnStartup) + timeoutManager = mysql.NewTimeoutManager(gb, gb.TxProvider, gb.Log, svcName, builder.purgeOnStartup) + default: + err := fmt.Errorf("no provider found for passed in value %v", builder.txnlProvider) + panic(err) + } if builder.usingPingTimeout { gb.DbPingTimeout = builder.dbPingTimeout } + //TODO move this into the NewSagaStore factory methods if builder.purgeOnStartup { err := sagaStore.Purge() if err != nil { panic(err) } } - gb.Glue = saga.NewGlue(gb, sagaStore, svcName, gb.TxProvider) + glue := saga.NewGlue(gb, sagaStore, svcName, gb.TxProvider, gb.Log, timeoutManager) + gb.Glue = glue return gb } @@ -179,9 +176,9 @@ func (builder *defaultBuilder) ConfigureHealthCheck(timeoutInSeconds time.Durati } func (builder *defaultBuilder) WithConfiguration(config gbus.BusConfiguration) gbus.Builder { - if config.MaxRetryCount > 0 { - gbus.MaxRetryCount = config.MaxRetryCount - } + + gbus.MaxRetryCount = config.MaxRetryCount + if config.BaseRetryDuration > 0 { gbus.BaseRetryDuration = time.Millisecond * time.Duration(config.BaseRetryDuration) } diff --git a/gbus/bus.go b/gbus/bus.go index cf4dc26..a0e7c4c 100644 --- a/gbus/bus.go +++ b/gbus/bus.go @@ -9,6 +9,8 @@ import ( "sync" "time" + "github.com/wework/grabbit/gbus/metrics" + "github.com/opentracing-contrib/go-amqp/amqptracer" "github.com/opentracing/opentracing-go" slog "github.com/opentracing/opentracing-go/log" @@ -24,7 +26,6 @@ var _ Bus = &DefaultBus{} type DefaultBus struct { *Safety *Glogged - Outgoing *AMQPOutbox Outbox TxOutbox PrefetchCount uint AmqpConnStr string @@ -39,6 +40,7 @@ type DefaultBus struct { amqpErrors chan *amqp.Error amqpBlocks chan amqp.Blocking Registrations []*Registration + amqpOutbox *AMQPOutbox RPCHandlers map[string]MessageHandler deadletterHandler func(tx *sql.Tx, poision amqp.Delivery) error @@ -50,18 +52,18 @@ type 
DefaultBus struct { DelayedSubscriptions [][]string PurgeOnStartup bool started bool - Glue SagaRegister + Glue SagaGlue TxProvider TxProvider - IsTxnl bool - WorkerNum uint - Serializer Serializer - DLX string - DefaultPolicies []MessagePolicy - Confirm bool - healthChan chan error - backpressure bool - DbPingTimeout time.Duration - amqpConnected bool + + WorkerNum uint + Serializer Serializer + DLX string + DefaultPolicies []MessagePolicy + Confirm bool + healthChan chan error + backpressure bool + DbPingTimeout time.Duration + amqpConnected bool } var ( @@ -201,37 +203,28 @@ func (b *DefaultBus) Start() error { b.egressConn.NotifyClose(b.amqpErrors) b.egressConn.NotifyBlocked(b.amqpBlocks) b.egressChannel.NotifyClose(b.amqpErrors) - //TODO:Figure out what should be done - //init the outbox that sends the messages to the amqp transport and handles publisher confirms - if e := b.Outgoing.init(b.egressChannel, b.Confirm, true); e != nil { - return e - } /* start the transactional outbox, make sure calling b.TxOutgoing.Start() is done only after b.Outgoing.init is called TODO://the design is crap and needs to be refactored */ - if b.IsTxnl { - - var amqpChan *amqp.Channel - if amqpChan, e = b.createAMQPChannel(b.egressConn); e != nil { - b.Log().WithError(e).Error("failed to create amqp channel for transactional outbox") - return e - } - amqpChan.NotifyClose(b.amqpErrors) - amqpOutbox := &AMQPOutbox{ - SvcName: b.SvcName, - } - err := amqpOutbox.init(amqpChan, b.Confirm, false) - if err != nil { - b.Log().WithError(err).Error("failed initializing amqpOutbox") - return err - } - if startErr := b.Outbox.Start(amqpOutbox); startErr != nil { - b.Log().WithError(startErr).Error("failed to start transactional outbox") - return startErr - } - + var amqpChan *amqp.Channel + if amqpChan, e = b.createAMQPChannel(b.egressConn); e != nil { + b.Log().WithError(e).Error("failed to create amqp channel for transactional outbox") + return e + } + amqpChan.NotifyClose(b.amqpErrors) + b.amqpOutbox = &AMQPOutbox{ + SvcName: b.SvcName, + } + err := b.amqpOutbox.init(amqpChan, b.Confirm, false) + if err != nil { + b.Log().WithError(err).Error("failed initializing amqpOutbox") + return err + } + if startErr := b.Outbox.Start(b.amqpOutbox); startErr != nil { + b.Log().WithError(startErr).Error("failed to start transactional outbox") + return startErr } //declare queue @@ -242,10 +235,10 @@ func (b *DefaultBus) Start() error { b.serviceQueue = q //bind queue - err := b.bindServiceQueue() - if err != nil { + bindErr := b.bindServiceQueue() + if bindErr != nil { b.Log().WithError(err).Error("could not bind service to queue") - return err + return bindErr } //declare rpc queue @@ -262,6 +255,10 @@ func (b *DefaultBus) Start() error { return createWorkersErr } + + if err := b.Glue.Start(); err != nil { + return err + } b.workers = workers b.started = true //start monitoring on amqp related errors @@ -293,7 +290,6 @@ func (b *DefaultBus) createBusWorkers(workerNum uint) ([]*worker, error) { q: b.serviceQueue, rpcq: b.rpcQueue, svcName: b.SvcName, - isTxnl: b.IsTxnl, txProvider: b.TxProvider, rpcLock: b.RPCLock, rpcHandlers: b.RPCHandlers, @@ -333,19 +329,19 @@ func (b *DefaultBus) Shutdown() (shutdwonErr error) { return err } } - b.Outgoing.shutdown() - b.started = false - if b.IsTxnl { - - err := b.Outbox.Stop() - if err != nil { - b.Log().WithError(err).Error("could not shutdown outbox") - return err - } - b.TxProvider.Dispose() + if err := b.Glue.Stop(); err != nil { + return err + } + b.started = false + err := 
b.Outbox.Stop() + if err != nil { + b.Log().WithError(err).Error("could not shutdown outbox") + return err } + b.amqpOutbox.Shutdown() + b.TxProvider.Dispose() return nil } @@ -360,11 +356,8 @@ func (b *DefaultBus) NotifyHealth(health chan error) { //GetHealth implements Health.GetHealth func (b *DefaultBus) GetHealth() HealthCard { - var dbConnected bool - if b.IsTxnl { - dbConnected = b.TxProvider.Ping(b.DbPingTimeout) - } + dbConnected := b.TxProvider.Ping(b.DbPingTimeout) return HealthCard{ DbConnected: dbConnected, @@ -377,11 +370,11 @@ func (b *DefaultBus) withTx(action func(tx *sql.Tx) error, ambientTx *sql.Tx) er var shouldCommitTx bool var activeTx *sql.Tx //create a new transaction only if there is no active one already passed in - if b.IsTxnl && ambientTx == nil { + if ambientTx == nil { /* if the passed in ambient transaction is not nil it means that some caller has created the transaction - and knows when should this transaction bee committed or rolledback. + and knows when should this transaction be committed or rolledback. In these cases we only invoke the passed in action with the passed in transaction and do not commit/rollback the transaction.action If no ambient transaction is passed in then we create a new transaction and commit or rollback after @@ -404,11 +397,7 @@ func (b *DefaultBus) withTx(action func(tx *sql.Tx) error, ambientTx *sql.Tx) er } actionErr := b.SafeWithRetries(retryAction, MaxRetryCount) - /* - if the bus is transactional and there is no ambient transaction then create a new one else use the ambient tranaction. - if the bus is not transactional a nil transaction reference will be passed - */ - if b.IsTxnl && shouldCommitTx { + if shouldCommitTx { if actionErr != nil { err := activeTx.Rollback() if err != nil { @@ -454,7 +443,12 @@ func (b *DefaultBus) RPC(ctx context.Context, service string, request, reply *Bu rpcID: rpcID} b.Serializer.Register(reply.Payload) - err := b.sendImpl(ctx, nil, service, b.rpcQueue.Name, "", "", request, rpc) + + sendRPC := func(tx *sql.Tx) error { + return b.sendImpl(ctx, tx, service, b.rpcQueue.Name, "", "", request, rpc) + } + + err := b.withTx(sendRPC, nil) if err != nil { b.Log().WithError(err).Error("could not send message") return nil, err @@ -498,6 +492,25 @@ func (b *DefaultBus) sendWithTx(ctx context.Context, ambientTx *sql.Tx, toServic return b.withTx(send, ambientTx) } +func (b *DefaultBus) returnDeadToQueue(ctx context.Context, ambientTx *sql.Tx, publishing *amqp.Publishing) error { + if !b.started { + return errors.New("bus not strated or already shutdown, make sure you call bus.Start() before sending messages") + } + //publishing.Headers. + exchange := fmt.Sprintf("%v", publishing.Headers["x-first-death-exchange"]) + routingKey := fmt.Sprintf("%v", publishing.Headers["x-first-death-queue"]) + + delete(publishing.Headers, "x-death") + delete(publishing.Headers, "x-first-death-queue") + delete(publishing.Headers, "x-first-death-reason") + delete(publishing.Headers, "x-first-death-exchange") + + send := func(tx *sql.Tx) error { + return b.publish(tx, exchange, routingKey, publishing) + } + return b.withTx(send, ambientTx) +} + //Publish implements GBus.Publish(topic, message) func (b *DefaultBus) Publish(ctx context.Context, exchange, topic string, message *BusMessage, policies ...MessagePolicy) error { return b.publishWithTx(ctx, nil, exchange, topic, message, policies...) 
@@ -547,6 +560,11 @@ func (b *DefaultBus) HandleDeadletter(handler func(tx *sql.Tx, poision amqp.Deli b.deadletterHandler = handler } +//ReturnDeadToQueue returns a message to its original destination +func (b *DefaultBus) ReturnDeadToQueue(ctx context.Context, publishing *amqp.Publishing) error { + return b.returnDeadToQueue(ctx, nil, publishing) +} + //RegisterSaga impements GBus.RegisterSaga func (b *DefaultBus) RegisterSaga(saga Saga, conf ...SagaConfFn) error { if b.Glue == nil { @@ -588,6 +606,28 @@ func (b *DefaultBus) monitorAMQPErrors() { } } +func (b *DefaultBus) publish(tx *sql.Tx, exchange, routingKey string, msg *amqp.Publishing) error { + publish := func() error { + + b.Log().WithField("message_id", msg.MessageId).Debug("sending message to outbox") + saveErr := b.Outbox.Save(tx, exchange, routingKey, *msg) + if saveErr != nil { + b.Log().WithError(saveErr).Error("failed to save to transactional outbox") + } + return saveErr + } + //currently only one thread can publish at a time + //TODO:add a publishing workers + + err := b.SafeWithRetries(publish, MaxRetryCount) + + if err != nil { + b.Log().Printf("failed publishing message.\n error:%v", err) + return err + } + return err +} + func (b *DefaultBus) sendImpl(sctx context.Context, tx *sql.Tx, toService, replyTo, exchange, topic string, message *BusMessage, policies ...MessagePolicy) (er error) { b.SenderLock.Lock() defer b.SenderLock.Unlock() @@ -639,38 +679,10 @@ func (b *DefaultBus) sendImpl(sctx context.Context, tx *sql.Tx, toService, reply key = topic } - publish := func() error { - //send to the transactional outbox if the bus is transactional - //otherwise send directly to amqp - if b.IsTxnl && tx != nil { - b.Log().WithField("message_id", msg.MessageId).Debug("sending message to outbox") - saveErr := b.Outbox.Save(tx, exchange, key, msg) - if saveErr != nil { - b.Log().WithError(saveErr).Error("failed to save to transactional outbox") - } - return saveErr - } - //do not attempt to contact the borker if backpressure is being applied - if b.backpressure { - return errors.New("can't send message due to backpressure from amqp broker") - } - _, outgoingErr := b.Outgoing.Post(exchange, key, msg) - return outgoingErr - } - //currently only one thread can publish at a time - //TODO:add a publishing workers - - err = b.SafeWithRetries(publish, MaxRetryCount) - - if err != nil { - b.Log().Printf("failed publishing message.\n error:%v", err) - return err - } - return err + return b.publish(tx, exchange, key, &msg) } func (b *DefaultBus) registerHandlerImpl(exchange, routingKey string, msg Message, handler MessageHandler) error { - b.HandlersLock.Lock() defer b.HandlersLock.Unlock() @@ -678,6 +690,7 @@ func (b *DefaultBus) registerHandlerImpl(exchange, routingKey string, msg Messag b.Serializer.Register(msg) } + metrics.AddHandlerMetrics(handler.Name()) registration := NewRegistration(exchange, routingKey, msg, handler) b.Registrations = append(b.Registrations, registration) for _, worker := range b.workers { @@ -698,6 +711,7 @@ func (p rpcPolicy) Apply(publishing *amqp.Publishing) { publishing.Headers[RpcHeaderName] = p.rpcID } +//Log returns the default logrus.FieldLogger for the bus via the Glogged helper func (b *DefaultBus) Log() logrus.FieldLogger { if b.Glogged == nil { b.Glogged = &Glogged{ diff --git a/gbus/logged.go b/gbus/logged.go index 1590de8..36963aa 100644 --- a/gbus/logged.go +++ b/gbus/logged.go @@ -1,6 +1,3 @@ -// Copyright © 2019 Vladislav Shub -// All rights reserved to the We Company. 
- package gbus import ( @@ -9,10 +6,12 @@ import ( var _ Logged = &Glogged{} +//Glogged provides an easy way for structs with in the grabbit package to participate in the general logging schema of the bus type Glogged struct { log logrus.FieldLogger } +//SetLogger sets the default logrus.FieldLogger that should be used when logging a new message func (gl *Glogged) SetLogger(entry logrus.FieldLogger) { if gl == nil { gl = &Glogged{} @@ -20,6 +19,7 @@ func (gl *Glogged) SetLogger(entry logrus.FieldLogger) { gl.log = entry } +//Log returns the set default log or a new instance of a logrus.FieldLogger func (gl *Glogged) Log() logrus.FieldLogger { if gl == nil { gl = &Glogged{} diff --git a/gbus/message_handler.go b/gbus/message_handler.go new file mode 100644 index 0000000..776927b --- /dev/null +++ b/gbus/message_handler.go @@ -0,0 +1,17 @@ +package gbus + +import ( + "reflect" + "runtime" + "strings" +) + +//MessageHandler signature for all command handlers +type MessageHandler func(invocation Invocation, message *BusMessage) error + +func (mg MessageHandler) Name() string { + funName := runtime.FuncForPC(reflect.ValueOf(mg).Pointer()).Name() + splits := strings.Split(funName, ".") + fn := strings.Replace(splits[len(splits)-1], "-fm", "", -1) + return fn +} diff --git a/gbus/messages.go b/gbus/messages.go index 6e9631a..123256f 100644 --- a/gbus/messages.go +++ b/gbus/messages.go @@ -61,6 +61,7 @@ func (bm *BusMessage) SetPayload(payload Message) { bm.Payload = payload } +//GetTraceLog returns an array of log entires containing all of the message properties func (bm *BusMessage) GetTraceLog() (fields []log.Field) { return []log.Field{ log.String("message", bm.PayloadFQN), diff --git a/gbus/metrics/handler_metrics.go b/gbus/metrics/handler_metrics.go new file mode 100644 index 0000000..4311796 --- /dev/null +++ b/gbus/metrics/handler_metrics.go @@ -0,0 +1,128 @@ +package metrics + +import ( + "fmt" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_model/go" + "github.com/sirupsen/logrus" + "sync" +) + +var ( + handlerMetricsByHandlerName = &sync.Map{} +) + +const ( + failure = "failure" + success = "success" + handlerResult = "result" + handlers = "handlers" + grabbitPrefix = "grabbit" +) + +type HandlerMetrics struct { + result *prometheus.CounterVec + latency prometheus.Summary +} + +func AddHandlerMetrics(handlerName string) { + handlerMetrics := newHandlerMetrics(handlerName) + _, exists := handlerMetricsByHandlerName.LoadOrStore(handlerName, handlerMetrics) + + if !exists { + prometheus.MustRegister(handlerMetrics.latency, handlerMetrics.result) + } +} + +func RunHandlerWithMetric(handleMessage func() error, handlerName string, logger logrus.FieldLogger) error { + handlerMetrics := GetHandlerMetrics(handlerName) + defer func() { + if p := recover(); p != nil { + if handlerMetrics != nil { + handlerMetrics.result.WithLabelValues(failure).Inc() + } + + panic(p) + } + }() + + if handlerMetrics == nil { + logger.WithField("handler", handlerName).Warn("Running with metrics - couldn't find metrics for the given handler") + return handleMessage() + } + + err := trackTime(handleMessage, handlerMetrics.latency) + + if err != nil { + handlerMetrics.result.WithLabelValues(failure).Inc() + } else { + handlerMetrics.result.WithLabelValues(success).Inc() + } + + return err +} + +func GetHandlerMetrics(handlerName string) *HandlerMetrics { + entry, ok := handlerMetricsByHandlerName.Load(handlerName) + if ok { + return entry.(*HandlerMetrics) + } + + return nil +} + 
+func newHandlerMetrics(handlerName string) *HandlerMetrics { + return &HandlerMetrics{ + result: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: grabbitPrefix, + Subsystem: handlers, + Name: fmt.Sprintf("%s_result", handlerName), + Help: fmt.Sprintf("The %s's result", handlerName), + }, + []string{handlerResult}), + latency: prometheus.NewSummary( + prometheus.SummaryOpts{ + Namespace: grabbitPrefix, + Subsystem: handlers, + Name: fmt.Sprintf("%s_latency", handlerName), + Help: fmt.Sprintf("The %s's latency", handlerName), + }), + } +} + +func trackTime(functionToTrack func() error, observer prometheus.Observer) error { + timer := prometheus.NewTimer(observer) + defer timer.ObserveDuration() + + return functionToTrack() +} + +func (hm *HandlerMetrics) GetSuccessCount() (float64, error) { + return hm.getLabeledCounterValue(success) +} + +func (hm *HandlerMetrics) GetFailureCount() (float64, error) { + return hm.getLabeledCounterValue(failure) +} + +func (hm *HandlerMetrics) GetLatencySampleCount() (*uint64, error) { + m := &io_prometheus_client.Metric{} + err := hm.latency.Write(m) + if err != nil { + return nil, err + } + + return m.GetSummary().SampleCount, nil +} + +func (hm *HandlerMetrics) getLabeledCounterValue(label string) (float64, error) { + m := &io_prometheus_client.Metric{} + err := hm.result.WithLabelValues(label).Write(m) + + if err != nil { + return 0, err + } + + return m.GetCounter().GetValue(), nil +} diff --git a/gbus/metrics/message_metrics.go b/gbus/metrics/message_metrics.go new file mode 100644 index 0000000..8bf0dfb --- /dev/null +++ b/gbus/metrics/message_metrics.go @@ -0,0 +1,35 @@ +package metrics + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/client_model/go" +) + +var ( + rejectedMessages = newRejectedMessagesCounter() +) + +func ReportRejectedMessage() { + rejectedMessages.Inc() +} + +func GetRejectedMessagesValue() (float64, error) { + m := &io_prometheus_client.Metric{} + err := rejectedMessages.Write(m) + + if err != nil { + return 0, err + } + + return m.GetCounter().GetValue(), nil +} + +func newRejectedMessagesCounter() prometheus.Counter { + return promauto.NewCounter(prometheus.CounterOpts{ + Namespace: grabbitPrefix, + Subsystem: "messages", + Name: "rejected_messages", + Help: "counting the rejected messages", + }) +} \ No newline at end of file diff --git a/gbus/metrics/saga_metrics.go b/gbus/metrics/saga_metrics.go new file mode 100644 index 0000000..188f4a0 --- /dev/null +++ b/gbus/metrics/saga_metrics.go @@ -0,0 +1,29 @@ +package metrics + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + io_prometheus_client "github.com/prometheus/client_model/go" +) + +var SagaTimeoutCounter = newSagaTimeoutCounter() + +func GetSagaTimeoutCounterValue() (float64, error) { + m := &io_prometheus_client.Metric{} + err := SagaTimeoutCounter.Write(m) + + if err != nil { + return 0, err + } + + return m.GetCounter().GetValue(), nil +} + +func newSagaTimeoutCounter() prometheus.Counter { + return promauto.NewCounter(prometheus.CounterOpts{ + Namespace: grabbitPrefix, + Subsystem: "saga", + Name: "timedout_sagas", + Help: "counting the number of timedout saga instances", + }) +} diff --git a/gbus/outbox.go b/gbus/outbox.go index 323a37a..3358250 100644 --- a/gbus/outbox.go +++ b/gbus/outbox.go @@ -50,8 +50,8 @@ func (out *AMQPOutbox) init(amqp *amqp.Channel, confirm, resendOnNack 
bool) erro return nil } -func (out *AMQPOutbox) shutdown() { - out.stop <- true +func (out *AMQPOutbox) Shutdown() { + close(out.stop) } diff --git a/gbus/registration.go b/gbus/registration.go index 9dbdc10..3122e92 100644 --- a/gbus/registration.go +++ b/gbus/registration.go @@ -4,12 +4,14 @@ import ( "strings" ) +//MessageFilter matches rabbitmq topic patterns type MessageFilter struct { Exchange string RoutingKey string MsgName string } +//Matches the passed in exchange, routingKey, msgName with the defined filter func (filter *MessageFilter) Matches(exchange, routingKey, msgName string) bool { targetExchange := strings.ToLower(exchange) @@ -59,15 +61,18 @@ func matchWords(input, pattern []string) bool { return false } +//Registration represents a message handler's registration for a given exchange, topic and msg combination type Registration struct { info *MessageFilter Handler MessageHandler } +//Matches the registration with the given xchange, routingKey, msgName func (sub *Registration) Matches(exchange, routingKey, msgName string) bool { return sub.info.Matches(exchange, routingKey, msgName) } +//NewRegistration creates a new registration func NewRegistration(exchange, routingKey string, message Message, handler MessageHandler) *Registration { reg := Registration{ info: NewMessageFilter(exchange, routingKey, message), @@ -75,6 +80,7 @@ func NewRegistration(exchange, routingKey string, message Message, handler Messa return ® } +//NewMessageFilter creates a new MessageFilter func NewMessageFilter(exchange, routingKey string, message Message) *MessageFilter { filter := &MessageFilter{ Exchange: strings.ToLower(exchange), diff --git a/gbus/saga/def.go b/gbus/saga/def.go index e390817..14a8792 100644 --- a/gbus/saga/def.go +++ b/gbus/saga/def.go @@ -2,15 +2,16 @@ package saga import ( "reflect" - "runtime" - "strings" "sync" + "github.com/wework/grabbit/gbus/metrics" + "github.com/wework/grabbit/gbus" ) var _ gbus.HandlerRegister = &Def{} +//MsgToFuncPair helper struct type MsgToFuncPair struct { Filter *gbus.MessageFilter SagaFuncName string @@ -49,22 +50,14 @@ func (sd *Def) getHandledMessages() []string { } func (sd *Def) addMsgToHandlerMapping(exchange, routingKey string, message gbus.Message, handler gbus.MessageHandler) { - - fn := getFunNameFromHandler(handler) - + handlerName := handler.Name() + metrics.AddHandlerMetrics(handlerName) msgToFunc := &MsgToFuncPair{ Filter: gbus.NewMessageFilter(exchange, routingKey, message), - SagaFuncName: fn} + SagaFuncName: handlerName} sd.msgToFunc = append(sd.msgToFunc, msgToFunc) } -func getFunNameFromHandler(handler gbus.MessageHandler) string { - funName := runtime.FuncForPC(reflect.ValueOf(handler).Pointer()).Name() - splits := strings.Split(funName, ".") - fn := strings.Replace(splits[len(splits)-1], "-fm", "", -1) - return fn -} - func (sd *Def) newInstance() *Instance { instance := NewInstance(sd.sagaType, sd.msgToFunc) return sd.configureSaga(instance) diff --git a/gbus/saga/glue.go b/gbus/saga/glue.go index 8367b29..d6e71e2 100644 --- a/gbus/saga/glue.go +++ b/gbus/saga/glue.go @@ -2,6 +2,7 @@ package saga import ( "database/sql" + "errors" "fmt" "reflect" "strings" @@ -9,6 +10,7 @@ import ( "github.com/sirupsen/logrus" "github.com/wework/grabbit/gbus" + "github.com/wework/grabbit/gbus/metrics" ) func fqnsFromMessages(objs []gbus.Message) []string { @@ -20,10 +22,14 @@ func fqnsFromMessages(objs []gbus.Message) []string { return fqns } -var _ gbus.SagaRegister = &Glue{} +//ErrInstanceNotFound is returned by the saga store if a saga 
lookup by saga id returns no valid instances +var ErrInstanceNotFound = errors.New("saga not be found") -//Glue ties the incoming messages from the Bus with the needed Saga instances +var _ gbus.SagaGlue = &Glue{} + +//Glue t/* */ies the incoming messages from the Bus with the needed Saga instances type Glue struct { + *gbus.Glogged svcName string bus gbus.Bus sagaDefs []*Def @@ -31,7 +37,7 @@ type Glue struct { alreadyRegistred map[string]bool msgToDefMap map[string][]*Def sagaStore Store - timeoutManger TimeoutManager + timeoutManager gbus.TimeoutManager } func (imsm *Glue) isSagaAlreadyRegistered(sagaType reflect.Type) bool { @@ -71,7 +77,7 @@ func (imsm *Glue) RegisterSaga(saga gbus.Saga, conf ...gbus.SagaConfFn) error { imsm.addMsgNameToDef(msgName, def) } - imsm.log(). + imsm.Log(). WithFields(logrus.Fields{"saga_type": def.sagaType.String(), "handles_messages": len(msgNames)}). Info("registered saga with messages") @@ -112,25 +118,27 @@ func (imsm *Glue) handler(invocation gbus.Invocation, message *gbus.BusMessage) startNew := def.shouldStartNewSaga(message) if startNew { newInstance := def.newInstance() - imsm.log(). + imsm.Log(). WithFields(logrus.Fields{"saga_def": def.String(), "saga_id": newInstance.ID}). Info("created new saga") if invkErr := imsm.invokeSagaInstance(newInstance, invocation, message); invkErr != nil { - imsm.log().WithError(invkErr).WithField("saga_id", newInstance.ID).Error("failed to invoke saga") + imsm.Log().WithError(invkErr).WithField("saga_id", newInstance.ID).Error("failed to invoke saga") return invkErr } if !newInstance.isComplete() { - imsm.log().WithField("saga_id", newInstance.ID).Info("saving new saga") + imsm.Log().WithField("saga_id", newInstance.ID).Info("saving new saga") if e := imsm.sagaStore.SaveNewSaga(invocation.Tx(), def.sagaType, newInstance); e != nil { - imsm.log().WithError(e).WithField("saga_id", newInstance.ID).Error("saving new saga failed") + imsm.Log().WithError(e).WithField("saga_id", newInstance.ID).Error("saving new saga failed") return e } if requestsTimeout, duration := newInstance.requestsTimeout(); requestsTimeout { - imsm.log().WithFields(logrus.Fields{"saga_id": newInstance.ID, "timeout_duration": duration}).Info("new saga requested timeout") - imsm.timeoutManger.RequestTimeout(imsm.svcName, newInstance.ID, duration) + imsm.Log().WithFields(logrus.Fields{"saga_id": newInstance.ID, "timeout_duration": duration}).Info("new saga requested timeout") + if tme := imsm.timeoutManager.RegisterTimeout(invocation.Tx(), newInstance.ID, duration); tme != nil { + return tme + } } } return nil @@ -138,7 +146,7 @@ func (imsm *Glue) handler(invocation gbus.Invocation, message *gbus.BusMessage) instance, getErr := imsm.sagaStore.GetSagaByID(invocation.Tx(), message.SagaCorrelationID) if getErr != nil { - imsm.log().WithError(getErr).WithField("saga_id", message.SagaCorrelationID).Error("failed to fetch saga by id") + imsm.Log().WithError(getErr).WithField("saga_id", message.SagaCorrelationID).Error("failed to fetch saga by id") return getErr } if instance == nil { @@ -147,7 +155,7 @@ func (imsm *Glue) handler(invocation gbus.Invocation, message *gbus.BusMessage) } def.configureSaga(instance) if invkErr := imsm.invokeSagaInstance(instance, invocation, message); invkErr != nil { - imsm.log().WithError(invkErr).WithField("saga_id", instance.ID).Error("failed to invoke saga") + imsm.Log().WithError(invkErr).WithField("saga_id", instance.ID).Error("failed to invoke saga") return invkErr } @@ -158,18 +166,18 @@ func (imsm *Glue) 
handler(invocation gbus.Invocation, message *gbus.BusMessage) return e } else { - imsm.log().WithFields(logrus.Fields{"saga_type": def.sagaType, "message": msgName}).Info("fetching saga instances by type") + imsm.Log().WithFields(logrus.Fields{"saga_type": def.sagaType, "message": msgName}).Info("fetching saga instances by type") instances, e := imsm.sagaStore.GetSagasByType(invocation.Tx(), def.sagaType) if e != nil { return e } - imsm.log().WithFields(logrus.Fields{"message": msgName, "instances_fetched": len(instances)}).Info("fetched saga instances") + imsm.Log().WithFields(logrus.Fields{"message": msgName, "instances_fetched": len(instances)}).Info("fetched saga instances") for _, instance := range instances { def.configureSaga(instance) if invkErr := imsm.invokeSagaInstance(instance, invocation, message); invkErr != nil { - imsm.log().WithError(invkErr).WithField("saga_id", instance.ID).Error("failed to invoke saga") + imsm.Log().WithError(invkErr).WithField("saga_id", instance.ID).Error("failed to invoke saga") return invkErr } e = imsm.completeOrUpdateSaga(invocation.Tx(), instance) @@ -192,7 +200,7 @@ func (imsm *Glue) invokeSagaInstance(instance *Instance, invocation gbus.Invocat ctx: invocation.Ctx(), invokingService: imsm.svcName, } - sginv.SetLogger(imsm.log().WithFields(logrus.Fields{ + sginv.SetLogger(imsm.Log().WithFields(logrus.Fields{ "saga_id": instance.ID, "saga_type": instance.String(), "message_name": message.PayloadFQN, @@ -205,9 +213,14 @@ func (imsm *Glue) invokeSagaInstance(instance *Instance, invocation gbus.Invocat func (imsm *Glue) completeOrUpdateSaga(tx *sql.Tx, instance *Instance) error { if instance.isComplete() { - imsm.log().WithField("saga_id", instance.ID).Info("saga has completed and will be deleted") + imsm.Log().WithField("saga_id", instance.ID).Info("saga has completed and will be deleted") + + deleteErr := imsm.sagaStore.DeleteSaga(tx, instance) + if deleteErr != nil { + return deleteErr + } - return imsm.sagaStore.DeleteSaga(tx, instance) + return imsm.timeoutManager.ClearTimeout(tx, instance.ID) } return imsm.sagaStore.UpdateSaga(tx, instance) @@ -231,26 +244,41 @@ func (imsm *Glue) registerEvent(exchange, topic string, event gbus.Message) erro return imsm.bus.HandleEvent(exchange, topic, event, imsm.handler) } -func (imsm *Glue) timeoutSaga(tx *sql.Tx, sagaID string) error { +//TimeoutSaga fetches a saga instance and calls its timeout interface +func (imsm *Glue) TimeoutSaga(tx *sql.Tx, sagaID string) error { saga, err := imsm.sagaStore.GetSagaByID(tx, sagaID) + + //we are assuming that if the TimeoutSaga has been called but no instance returned from the store the saga + //has been completed already and + if err == ErrInstanceNotFound { + return nil + } if err != nil { return err } timeoutErr := saga.timeout(tx, imsm.bus) if timeoutErr != nil { - imsm.log().WithError(timeoutErr).WithField("sagaID", sagaID).Error("failed to timeout saga") + imsm.Log().WithError(timeoutErr).WithField("sagaID", sagaID).Error("failed to timeout saga") return timeoutErr } + + metrics.SagaTimeoutCounter.Inc() return imsm.completeOrUpdateSaga(tx, saga) } -func (imsm *Glue) log() logrus.FieldLogger { - return imsm.bus.Log().WithField("_service", imsm.svcName) +//Start starts the glue instance up +func (imsm *Glue) Start() error { + return imsm.timeoutManager.Start() +} + +//Stop starts the glue instance up +func (imsm *Glue) Stop() error { + return imsm.timeoutManager.Stop() } //NewGlue creates a new Sagamanager -func NewGlue(bus gbus.Bus, sagaStore Store, svcName 
string, txp gbus.TxProvider) *Glue { +func NewGlue(bus gbus.Bus, sagaStore Store, svcName string, txp gbus.TxProvider, getLog func() logrus.FieldLogger, timeoutManager gbus.TimeoutManager) *Glue { g := &Glue{ svcName: svcName, bus: bus, @@ -259,7 +287,9 @@ func NewGlue(bus gbus.Bus, sagaStore Store, svcName string, txp gbus.TxProvider) alreadyRegistred: make(map[string]bool), msgToDefMap: make(map[string][]*Def), sagaStore: sagaStore, + timeoutManager: timeoutManager, } - g.timeoutManger = TimeoutManager{bus: bus, txp: txp, glue: g} + + timeoutManager.SetTimeoutFunction(g.TimeoutSaga) return g } diff --git a/gbus/saga/instance.go b/gbus/saga/instance.go index 9513c40..a178384 100644 --- a/gbus/saga/instance.go +++ b/gbus/saga/instance.go @@ -3,10 +3,12 @@ package saga import ( "database/sql" "fmt" - "github.com/sirupsen/logrus" "reflect" "time" + "github.com/sirupsen/logrus" + "github.com/wework/grabbit/gbus/metrics" + "github.com/rs/xid" "github.com/wework/grabbit/gbus" ) @@ -45,12 +47,21 @@ func (si *Instance) invoke(exchange, routingKey string, invocation gbus.Invocati invocation.Log().WithFields(logrus.Fields{ "method_name": methodName, "saga_id": si.ID, }).Info("invoking method on saga") - returns := method.Call(params) - val := returns[0] - if !val.IsNil() { - return val.Interface().(error) + err := metrics.RunHandlerWithMetric(func() error { + returns := method.Call(params) + + val := returns[0] + if !val.IsNil() { + return val.Interface().(error) + } + return nil + }, methodName, invocation.Log()) + + if err != nil { + return err } + invocation.Log().WithFields(logrus.Fields{ "method_name": methodName, "saga_id": si.ID, }).Info("saga instance invoked") @@ -94,6 +105,7 @@ func (si *Instance) timeout(tx *sql.Tx, bus gbus.Messaging) error { return saga.Timeout(tx, bus) } +//NewInstance creates a new saga instance func NewInstance(sagaType reflect.Type, msgToMethodMap []*MsgToFuncPair) *Instance { var newSagaPtr interface{} diff --git a/gbus/saga/instance_test.go b/gbus/saga/instance_test.go index 4d828dd..f5bc56a 100644 --- a/gbus/saga/instance_test.go +++ b/gbus/saga/instance_test.go @@ -21,10 +21,10 @@ func TestInstanceInvocationReturnsErrors(t *testing.T) { exchange, routingKey := "", "kong" invocationStub := &sagaInvocation{} - failName := getFunNameFromHandler(s.Fail) + failName := gbus.MessageHandler(s.Fail).Name() failFilter := gbus.NewMessageFilter(exchange, routingKey, m1) - passName := getFunNameFromHandler(s.Pass) + passName := gbus.MessageHandler(s.Pass).Name() passFilter := gbus.NewMessageFilter(exchange, routingKey, m2) //map the filter to correct saga function name diff --git a/gbus/saga/invocation.go b/gbus/saga/invocation.go index 18c85e6..b2cfbed 100644 --- a/gbus/saga/invocation.go +++ b/gbus/saga/invocation.go @@ -83,7 +83,3 @@ func (si *sagaInvocation) Routing() (exchange, routingKey string) { func (si *sagaInvocation) DeliveryInfo() gbus.DeliveryInfo { return si.decoratedInvocation.DeliveryInfo() } - -//func (si *sagaInvocation) Log() logrus.FieldLogger { -// return si.decoratedInvocation.Log().WithField("saga_id", si.sagaID) -//} diff --git a/gbus/saga/stores/memstore.go b/gbus/saga/stores/memstore.go deleted file mode 100644 index 0d43734..0000000 --- a/gbus/saga/stores/memstore.go +++ /dev/null @@ -1,100 +0,0 @@ -package stores - -import ( - "database/sql" - "errors" - "reflect" - - "github.com/wework/grabbit/gbus" - "github.com/wework/grabbit/gbus/saga" -) - -//InMemorySagaStore stores the saga instances in-memory, not intended for production use -type 
InMemorySagaStore struct { - instances map[reflect.Type][]*saga.Instance -} - -//GetSagaByID implements SagaStore.GetSagaByID -func (store *InMemorySagaStore) GetSagaByID(tx *sql.Tx, sagaID string) (*saga.Instance, error) { - for _, instances := range store.instances { - for _, instance := range instances { - if instance.ID == sagaID { - return instance, nil - } - } - } - return nil, errors.New("no saga found for provided id") -} - -//RegisterSagaType implements SagaStore.RegisterSagaType -func (store *InMemorySagaStore) RegisterSagaType(saga gbus.Saga) { - -} - -//SaveNewSaga implements SagaStore.SaveNewSaga -func (store *InMemorySagaStore) SaveNewSaga(tx *sql.Tx, sagaType reflect.Type, newInstance *saga.Instance) error { - instances := store.instances[sagaType] - if instances == nil { - instances = make([]*saga.Instance, 0) - - } - instances = append(instances, newInstance) - store.instances[sagaType] = instances - - return nil - -} - -//UpdateSaga implements SagaStore.UpdateSaga -func (store *InMemorySagaStore) UpdateSaga(tx *sql.Tx, instance *saga.Instance) error { - - return nil -} - -//DeleteSaga implements SagaStore.DeleteSaga -func (store *InMemorySagaStore) DeleteSaga(tx *sql.Tx, instance *saga.Instance) error { - - for key, value := range store.instances { - var sagaIndexFound bool - var sagaIndexToDelete int - for i := 0; i < len(value); i++ { - candidate := value[i] - if candidate.ID == instance.ID { - sagaIndexToDelete = i - sagaIndexFound = true - break - } - } - if sagaIndexFound { - value[sagaIndexToDelete] = value[len(value)-1] - value = value[:len(value)-1] - store.instances[key] = value - - } - } - return nil -} - -//GetSagasByType implements SagaStore.GetSagasByType -func (store *InMemorySagaStore) GetSagasByType(tx *sql.Tx, t reflect.Type) ([]*saga.Instance, error) { - instances := make([]*saga.Instance, 0) - - for key, val := range store.instances { - if key == t { - instances = append(instances, val...) 
- } - } - - return instances, nil -} - -//Purge is used for nothing in this case since the store is not persisted use pgstore or mysqlstore -func (store *InMemorySagaStore) Purge() error { - return nil -} - -//NewInMemoryStore is a factory method for the InMemorySagaStore -func NewInMemoryStore() saga.Store { - return &InMemorySagaStore{ - instances: make(map[reflect.Type][]*saga.Instance)} -} diff --git a/gbus/saga/timeout.go b/gbus/saga/timeout.go deleted file mode 100644 index 4012353..0000000 --- a/gbus/saga/timeout.go +++ /dev/null @@ -1,51 +0,0 @@ -package saga - -import ( - "time" - - "github.com/wework/grabbit/gbus" -) - -//TimeoutManager manages timeouts for sagas -//TODO:Make it persistent -type TimeoutManager struct { - bus gbus.Bus - glue *Glue - txp gbus.TxProvider -} - -//RequestTimeout requests a timeout from the timeout manager -func (tm *TimeoutManager) RequestTimeout(svcName, sagaID string, duration time.Duration) { - - go func(svcName, sagaID string, tm *TimeoutManager) { - c := time.After(duration) - <-c - //TODO:if the bus is not transactional, moving forward we should not allow using sagas in a non transactional bus - if tm.txp == nil { - tme := tm.glue.timeoutSaga(nil, sagaID) - if tme != nil { - tm.glue.log().WithError(tme).WithField("sagaID", sagaID).Error("timing out a saga failed") - } - return - } - tx, txe := tm.txp.New() - if txe != nil { - tm.glue.log().WithError(txe).Warn("timeout manager failed to create a transaction") - } else { - callErr := tm.glue.timeoutSaga(tx, sagaID) - if callErr != nil { - tm.glue.log().WithError(callErr).WithField("sagaID", sagaID).Error("timing out a saga failed") - rlbe := tx.Rollback() - if rlbe != nil { - tm.glue.log().WithError(rlbe).Warn("timeout manager failed to rollback transaction") - } - } else { - cmte := tx.Commit() - if cmte != nil { - tm.glue.log().WithError(cmte).Warn("timeout manager failed to rollback transaction") - } - } - } - - }(svcName, sagaID, tm) -} diff --git a/gbus/tx/mysql/timeout.go b/gbus/tx/mysql/timeout.go new file mode 100644 index 0000000..ae8ca33 --- /dev/null +++ b/gbus/tx/mysql/timeout.go @@ -0,0 +1,230 @@ +package mysql + +import ( + "database/sql" + "regexp" + "strings" + "time" + + "github.com/sirupsen/logrus" + + "github.com/wework/grabbit/gbus" +) + +var _ gbus.TimeoutManager = &TimeoutManager{} + +//TimeoutManager is a mysql implementation of a persistent timeoutmanager +type TimeoutManager struct { + Bus gbus.Bus + Log func() logrus.FieldLogger + TimeoutSaga func(*sql.Tx, string) error + Txp gbus.TxProvider + SvcName string + timeoutsTableName string + exit chan bool +} + +func (tm *TimeoutManager) ensureSchema() error { + tblName := tm.timeoutsTableName + tx, e := tm.Txp.New() + if e != nil { + tm.Log().WithError(e).Error("failed to create schema for mysql timeout manager") + return e + } + + createTableSQL := `CREATE TABLE IF NOT EXISTS ` + tblName + ` ( + rec_id INT PRIMARY KEY AUTO_INCREMENT, + saga_id VARCHAR(255) UNIQUE NOT NULL, + timeout DATETIME NOT NULL, + INDEX (timeout), + INDEX (saga_id) + )` + + if _, e := tx.Exec(createTableSQL); e != nil { + if rbkErr := tx.Rollback(); rbkErr != nil { + tm.Log().Warn("timeout manager failed to rollback transaction") + } + return e + } + return tx.Commit() +} + +func (tm *TimeoutManager) purge() error { + purgeSQL := `DELETE FROM ` + tm.timeoutsTableName + + tx, e := tm.Txp.New() + if e != nil { + tm.Log().WithError(e).Error("failed to purge timeout manager") + return e + } + + if _, execErr := tx.Exec(purgeSQL); execErr != nil { + 
tm.Log().WithError(execErr).Error("failed to purge timeout manager") + return tx.Rollback() + } + return tx.Commit() +} + +//Start starts the timeout manager +func (tm *TimeoutManager) Start() error { + go tm.trackTimeouts() + return nil +} + +//Stop shuts down the timeout manager +func (tm *TimeoutManager) Stop() error { + tm.exit <- true + return nil +} + +func (tm *TimeoutManager) trackTimeouts() { + tick := time.NewTicker(time.Second * 1).C + for { + select { + case <-tick: + tx, txe := tm.Txp.New() + if txe != nil { + tm.Log().WithError(txe).Warn("timeout manager failed to create a transaction") + continue + } + now := time.Now().UTC() + getTimeoutsSQL := `select saga_id from ` + tm.timeoutsTableName + ` where timeout < ? LIMIT 100` + rows, selectErr := tx.Query(getTimeoutsSQL, now) + if selectErr != nil { + tm.Log().WithError(selectErr).Error("timeout manager failed to query for pending timeouts") + rows.Close() + continue + } + + sagaIDs := make([]string, 0) + for rows.Next() { + + var sagaID string + + if err := rows.Scan(&sagaID); err != nil { + tm.Log().WithError(err).Error("failed to scan timeout record") + } + sagaIDs = append(sagaIDs, sagaID) + } + if cmtErr := tx.Commit(); cmtErr != nil { + continue + } + tm.executeTimeout(sagaIDs) + case <-tm.exit: + return + } + } +} + +func (tm *TimeoutManager) lockTimeoutRecord(tx *sql.Tx, sagaID string) error { + + // FOR UPDATE should create a lock on the row that we are processing and skip lock should ensure that we don't wait for a locked row but skip it. + // This is done in order to reduce contention and allow for parallel processing in case of multiple grabbit instances + selectTimeout := `SELECT saga_id FROM ` + tm.timeoutsTableName + ` WHERE saga_id = ? FOR UPDATE SKIP LOCKED` + row := tx.QueryRow(selectTimeout, sagaID) + //scan the row so we can determine if the lock has been successfully acquired + //in case the timeout has been already executed by a different instance of grabbit the scan will + //return an error and the timeout will not get processed + var x string + return row.Scan(&x) +} + +func (tm *TimeoutManager) executeTimeout(sagaIDs []string) { + + for _, sagaID := range sagaIDs { + tx, txe := tm.Txp.New() + if txe != nil { + tm.Log().WithError(txe).Warn("timeout manager failed to create a transaction") + return + } + lckErr := tm.lockTimeoutRecord(tx, sagaID) + if lckErr != nil { + tm.Log().WithField("saga_id", sagaID).Info("failed to obtain lock for saga timeout") + _ = tx.Rollback() + continue + } + callErr := tm.TimeoutSaga(tx, sagaID) + clrErr := tm.ClearTimeout(tx, sagaID) + + if callErr != nil || clrErr != nil { + logEntry := tm.Log() + if callErr != nil { + logEntry = logEntry.WithError(callErr) + } else { + logEntry = logEntry.WithError(clrErr) + } + logEntry.WithField("sagaID", sagaID).Error("timing out a saga failed") + rlbe := tx.Rollback() + if rlbe != nil { + tm.Log().WithError(rlbe).Warn("timeout manager failed to rollback transaction") + } + return + } + + cmte := tx.Commit() + if cmte != nil { + tm.Log().WithError(cmte).Warn("timeout manager failed to commit transaction") + } + } + +} + +//RegisterTimeout requests a timeout from the timeout manager +func (tm *TimeoutManager) RegisterTimeout(tx *sql.Tx, sagaID string, duration time.Duration) error { + + timeoutTime := time.Now().UTC().Add(duration) + + insertSQL := "INSERT INTO " + tm.timeoutsTableName + " (saga_id, timeout) VALUES(?, ?)" + _, insertErr := tx.Exec(insertSQL, sagaID, timeoutTime) + if insertErr == nil { + 
tm.Log().WithField("timeout_duration", duration).Debug("timout inserted into timeout manager") + } + + return insertErr +} + +//ClearTimeout clears a timeout for a specific saga +func (tm *TimeoutManager) ClearTimeout(tx *sql.Tx, sagaID string) error { + + deleteSQL := `delete from ` + tm.timeoutsTableName + ` where saga_id = ?` + _, err := tx.Exec(deleteSQL, sagaID) + return err +} + +//SetTimeoutFunction accepts the timeouting function +func (tm *TimeoutManager) SetTimeoutFunction(timeoutFunc func(tx *sql.Tx, sagaID string) error) { + tm.TimeoutSaga = timeoutFunc +} + +//GetTimeoutsTableName returns the table name in which to store timeouts +func getTimeoutsTableName(svcName string) string { + + var re = regexp.MustCompile(`-|;|\\|`) + sanitized := re.ReplaceAllString(svcName, "") + + return strings.ToLower("grabbit_" + sanitized + "_timeouts") +} + +//NewTimeoutManager creates a new instance of a mysql based TimeoutManager +func NewTimeoutManager(bus gbus.Bus, txp gbus.TxProvider, logger func() logrus.FieldLogger, svcName string, purge bool) *TimeoutManager { + + timeoutsTableName := getTimeoutsTableName(svcName) + tm := &TimeoutManager{ + Log: logger, + Bus: bus, + Txp: txp, + SvcName: svcName, + timeoutsTableName: timeoutsTableName, + exit: make(chan bool)} + + if err := tm.ensureSchema(); err != nil { + panic(err) + } + if purge { + if err := tm.purge(); err != nil { + panic(err) + } + } + return tm + +} diff --git a/gbus/tx/mysql/txoutbox.go b/gbus/tx/mysql/txoutbox.go index 2c49610..2081a36 100644 --- a/gbus/tx/mysql/txoutbox.go +++ b/gbus/tx/mysql/txoutbox.go @@ -195,8 +195,8 @@ func (outbox *TxOutbox) updateAckedRecord(deliveryTag uint64) error { recID := outbox.recordsPendingConfirms[deliveryTag] outbox.gl.Unlock() /* - since the messages get sent to rabbitmq and then the outbox table gets updated with the deilvery tag for teh record - it may be that we recived a acked deliveryTag that is not yet registered in the outbox table. + since the messages get sent to rabbitmq and then the outbox table gets updated with the deilvery tag for the record + it may be that we received a acked deliveryTag that is not yet registered in the outbox table. 
in that case we just place the deliveryTag back in the ack channel so it can be picked up and re processed later we place it in the channel using a new goroutine so to not deadlock if there is only a single goroutine draining the ack channel */ @@ -310,7 +310,7 @@ func (outbox *TxOutbox) sendMessages(recordSelector func(tx *sql.Tx) (*sql.Rows, if cmtErr := tx.Commit(); cmtErr != nil { outbox.log().WithError(cmtErr).Error("Error committing outbox transaction") } else { - //only after the tx has commited successfully add the recordids so they can be picked up by confirms + //only after the tx has committed successfully add the recordids so they can be picked up by confirms outbox.gl.Lock() defer outbox.gl.Unlock() for deliveryTag, recID := range successfulDeliveries { diff --git a/gbus/tx/sagastore.go b/gbus/tx/sagastore.go index c7e1547..617b925 100644 --- a/gbus/tx/sagastore.go +++ b/gbus/tx/sagastore.go @@ -4,6 +4,7 @@ import ( "bytes" "database/sql" "encoding/gob" + "errors" "fmt" "reflect" "regexp" @@ -116,8 +117,16 @@ func (store *SagaStore) RegisterSagaType(saga gbus.Saga) { func (store *SagaStore) DeleteSaga(tx *sql.Tx, instance *saga.Instance) error { tblName := store.GetSagatableName() deleteSQL := `DELETE FROM ` + tblName + ` WHERE saga_id= ?` - _, err := tx.Exec(deleteSQL, instance.ID) - return err + result, err := tx.Exec(deleteSQL, instance.ID) + if err != nil { + return err + } + rowsAffected, e := result.RowsAffected() + if e != nil || rowsAffected == 0 { + return errors.New("couldn't delete saga, saga not found or an error occurred") + } + + return nil } //GetSagaByID implements interface method store.GetSagaByID @@ -133,6 +142,10 @@ func (store *SagaStore) GetSagaByID(tx *sql.Tx, sagaID string) (*saga.Instance, store.log().WithError(err).Error("could not close rows") } }() + + if err != nil && err == sql.ErrNoRows { + return nil, saga.ErrInstanceNotFound + } if err != nil { store.log().WithError(err). WithFields(log.Fields{"saga_id": sagaID, "table_name": store.GetSagatableName()}). 
diff --git a/gbus/worker.go b/gbus/worker.go index 2ca5683..1a16a02 100644 --- a/gbus/worker.go +++ b/gbus/worker.go @@ -6,12 +6,12 @@ import ( "errors" "fmt" "math/rand" - "reflect" - "runtime" "runtime/debug" "sync" "time" + "github.com/wework/grabbit/gbus/metrics" + "github.com/Rican7/retry" "github.com/Rican7/retry/backoff" "github.com/Rican7/retry/jitter" @@ -37,7 +37,6 @@ type worker struct { registrations []*Registration rpcHandlers map[string]MessageHandler deadletterHandler func(tx *sql.Tx, poision amqp.Delivery) error - isTxnl bool b *DefaultBus serializer Serializer txProvider TxProvider @@ -127,8 +126,6 @@ func (worker *worker) consumeMessages() { if shouldProceed { worker.processMessage(delivery, isRPCreply) - } else { - worker.log().WithField("message_id", delivery.MessageId).Warn("no proceed") } } @@ -236,22 +233,30 @@ func (worker *worker) invokeDeadletterHandler(delivery amqp.Delivery) { if txCreateErr != nil { worker.log().WithError(txCreateErr).Error("failed creating new tx") worker.span.LogFields(slog.Error(txCreateErr)) - _ = worker.ack(delivery) + _ = worker.reject(true, delivery) return } - var fn func() error err := worker.deadletterHandler(tx, delivery) + var reject bool if err != nil { worker.log().WithError(err).Error("failed handling deadletter") worker.span.LogFields(slog.Error(err)) - fn = tx.Rollback + err = worker.SafeWithRetries(tx.Rollback, MaxRetryCount) + reject = true } else { - fn = tx.Commit + err = worker.SafeWithRetries(tx.Commit, MaxRetryCount) } - err = worker.SafeWithRetries(fn, MaxRetryCount) + if err != nil { worker.log().WithError(err).Error("Rollback/Commit deadletter handler message") worker.span.LogFields(slog.Error(err)) + reject = true + } + + if reject { + _ = worker.reject(true, delivery) + } else { + _ = worker.ack(delivery) } } @@ -322,6 +327,7 @@ func (worker *worker) processMessage(delivery amqp.Delivery, isRPCreply bool) { _ = worker.ack(delivery) } else { _ = worker.reject(false, delivery) + metrics.ReportRejectedMessage() } } @@ -331,16 +337,13 @@ func (worker *worker) invokeHandlers(sctx context.Context, handlers []MessageHan // each retry should run a new and separate transaction which should end with a commit or rollback action := func(attempt uint) (actionErr error) { - var tx *sql.Tx - var txCreateErr error - if worker.isTxnl { - tx, txCreateErr = worker.txProvider.New() + + tx, txCreateErr := worker.txProvider.New() if txCreateErr != nil { worker.log().WithError(txCreateErr).Error("failed creating new tx") worker.span.LogFields(slog.Error(txCreateErr)) return txCreateErr } - } worker.span, sctx = opentracing.StartSpanFromContext(sctx, "invokeHandlers") worker.span.LogFields(slog.Uint64("attempt", uint64(attempt+1))) @@ -349,12 +352,10 @@ func (worker *worker) invokeHandlers(sctx context.Context, handlers []MessageHan pncMsg := fmt.Sprintf("%v\n%s", p, debug.Stack()) worker.log().WithField("stack", pncMsg).Error("recovered from panic while invoking handler") actionErr = errors.New(pncMsg) - if worker.isTxnl { - rbkErr := tx.Rollback() + rbkErr := tx.Rollback() if rbkErr != nil { worker.log().WithError(rbkErr).Error("failed rolling back transaction when recovering from handler panic") } - } worker.span.LogFields(slog.Error(actionErr)) } worker.span.Finish() @@ -363,7 +364,7 @@ func (worker *worker) invokeHandlers(sctx context.Context, handlers []MessageHan var hspan opentracing.Span var hsctx context.Context for _, handler := range handlers { - hspan, hsctx = opentracing.StartSpanFromContext(sctx, 
runtime.FuncForPC(reflect.ValueOf(handler).Pointer()).Name()) + hspan, hsctx = opentracing.StartSpanFromContext(sctx, handler.Name()) ctx := &defaultInvocationContext{ invocingSvc: delivery.ReplyTo, @@ -378,8 +379,10 @@ func (worker *worker) invokeHandlers(sctx context.Context, handlers []MessageHan MaxRetryCount: MaxRetryCount, }, } - ctx.SetLogger(worker.log().WithField("handler", runtime.FuncForPC(reflect.ValueOf(handler).Pointer()).Name())) - handlerErr = handler(ctx, message) + ctx.SetLogger(worker.log().WithField("handler", handler.Name())) + handlerErr = metrics.RunHandlerWithMetric(func() error { + return handler(ctx, message) + }, handler.Name(), worker.log()) if handlerErr != nil { hspan.LogFields(slog.Error(handlerErr)) break @@ -388,22 +391,18 @@ func (worker *worker) invokeHandlers(sctx context.Context, handlers []MessageHan } if handlerErr != nil { hspan.LogFields(slog.Error(handlerErr)) - if worker.isTxnl { - rbkErr := tx.Rollback() + rbkErr := tx.Rollback() if rbkErr != nil { worker.log().WithError(rbkErr).Error("failed rolling back transaction when recovering from handler error") } - } hspan.Finish() return handlerErr } - if worker.isTxnl { - cmtErr := tx.Commit() + cmtErr := tx.Commit() if cmtErr != nil { - worker.log().WithError(cmtErr).Error("failed commiting transaction after invoking handlers") + worker.log().WithError(cmtErr).Error("failed committing transaction after invoking handlers") return cmtErr } - } return nil } diff --git a/go.mod b/go.mod index 450e785..ddd8cf2 100644 --- a/go.mod +++ b/go.mod @@ -3,27 +3,42 @@ module github.com/wework/grabbit require ( github.com/DataDog/zstd v1.4.0 // indirect github.com/Rican7/retry v0.1.0 - github.com/Shopify/sarama v1.22.1 // indirect + github.com/Shopify/sarama v1.23.0 // indirect github.com/bsm/sarama-cluster v2.1.15+incompatible // indirect github.com/dangkaka/go-kafka-avro v0.0.0-20181108134201-d57aece51a15 + github.com/eapache/go-resiliency v1.2.0 // indirect + github.com/go-kit/kit v0.9.0 // indirect github.com/go-sql-driver/mysql v1.4.1 - github.com/golang/protobuf v1.3.1 + github.com/gogo/protobuf v1.2.1 // indirect + github.com/golang/protobuf v1.3.2 + github.com/jcmturner/gofork v1.0.0 // indirect + github.com/kisielk/errcheck v1.2.0 // indirect github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect github.com/kr/pretty v0.1.0 // indirect + github.com/kr/pty v1.1.8 // indirect github.com/linkedin/goavro v2.1.0+incompatible github.com/onsi/ginkgo v1.8.0 // indirect github.com/onsi/gomega v1.5.0 // indirect github.com/opentracing-contrib/go-amqp v0.0.0-20171102191528-e26701f95620 github.com/opentracing/opentracing-go v1.1.0 github.com/pierrec/lz4 v2.0.5+incompatible // indirect + github.com/pkg/errors v0.8.1 // indirect + github.com/prometheus/client_golang v1.0.0 + github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90 + github.com/prometheus/common v0.6.0 // indirect + github.com/prometheus/procfs v0.0.3 // indirect + github.com/rcrowley/go-metrics v0.0.0-20190706150252-9beb055b7962 // indirect github.com/rs/xid v1.2.1 github.com/sirupsen/logrus v1.4.2 github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94 - golang.org/x/net v0.0.0-20190603091049-60506f45cf65 // indirect - golang.org/x/sys v0.0.0-20190602015325-4c4f7f33c9ed // indirect - golang.org/x/text v0.3.2 // indirect - google.golang.org/appengine v1.6.0 // indirect + github.com/stretchr/objx v0.2.0 // indirect + golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4 // indirect + golang.org/x/net 
v0.0.0-20190628185345-da137c7871d7 // indirect + golang.org/x/sys v0.0.0-20190712062909-fae7ac547cb7 // indirect + golang.org/x/tools v0.0.0-20190712213246-8b927904ee0d // indirect + google.golang.org/appengine v1.6.1 // indirect gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect + gopkg.in/jcmturner/gokrb5.v7 v7.3.0 // indirect gopkg.in/linkedin/goavro.v1 v1.0.5 // indirect gopkg.in/yaml.v2 v2.2.2 // indirect ) diff --git a/go.sum b/go.sum index f96bd08..a8e86ca 100644 --- a/go.sum +++ b/go.sum @@ -5,10 +5,18 @@ github.com/Rican7/retry v0.1.0 h1:FqK94z34ly8Baa6K+G8Mmza9rYWTKOJk+yckIBB5qVk= github.com/Rican7/retry v0.1.0/go.mod h1:FgOROf8P5bebcC1DS0PdOQiqGUridaZvikzUmkFW6gg= github.com/Shopify/sarama v1.22.1 h1:exyEsKLGyCsDiqpV5Lr4slFi8ev2KiM3cP1KZ6vnCQ0= github.com/Shopify/sarama v1.22.1/go.mod h1:FRzlvRpMFO/639zY1SDxUxkqH97Y0ndM5CbGj6oG3As= +github.com/Shopify/sarama v1.23.0 h1:slvlbm7bxyp7sKQbUwha5BQdZTqurhRoI+zbKorVigQ= +github.com/Shopify/sarama v1.23.0/go.mod h1:XLH1GYJnLVE0XCr6KdJGVJRTwY30moWNJ4sERjXX6fs= github.com/Shopify/toxiproxy v2.1.4+incompatible h1:TKdv8HiTLgE5wdJuEML90aBgNWsokNbMijUGhmcoBJc= github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= +github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= +github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= +github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= +github.com/beorn7/perks v1.0.0 h1:HWo1m869IqiPhD389kmkxeTalrjNbbJTC8LXupb+sl0= +github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/bsm/sarama-cluster v2.1.15+incompatible h1:RkV6WiNRnqEEbp81druK8zYhmnIgdOjqSVi0+9Cnl2A= github.com/bsm/sarama-cluster v2.1.15+incompatible/go.mod h1:r7ao+4tTNXvWm+VRpRJchr2kQhqxgmAp2iEX5W96gMM= +github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY= github.com/dangkaka/go-kafka-avro v0.0.0-20181108134201-d57aece51a15 h1:QuKWm+/gc4/EuT8SCBAn1qcTh576rg0KoLfi7a0ArMM= github.com/dangkaka/go-kafka-avro v0.0.0-20181108134201-d57aece51a15/go.mod h1:NBrM4f6cInyw9KSBFONNXzpvPQ/WGige7ON42RICbWM= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -16,31 +24,60 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/eapache/go-resiliency v1.1.0 h1:1NtRmCAqadE2FN4ZcN6g90TP3uk8cg9rn9eNK2197aU= github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs= +github.com/eapache/go-resiliency v1.2.0 h1:v7g92e/KSN71Rq7vSThKaWIq68fL4YHvWyiUKorFR1Q= +github.com/eapache/go-resiliency v1.2.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs= github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw= github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= +github.com/go-kit/kit 
v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= +github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= +github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= +github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-sql-driver/mysql v1.4.1 h1:g24URVg0OFbNUTx9qqY1IRZ9D9z3iPyi5zKhQZpNwpA= github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= +github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= +github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= +github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs= +github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/hashicorp/go-uuid v1.0.1 h1:fv1ep09latC32wFoVwnqcnKJGnMSdBanPczbHAYm1BE= +github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= +github.com/jcmturner/gofork v0.0.0-20190328161633-dc7c13fece03/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o= +github.com/jcmturner/gofork v1.0.0 h1:J7uCkflzTEhUZ64xqKnkDxq3kzc96ajM1Gli5ktUem8= +github.com/jcmturner/gofork v1.0.0/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o= +github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= +github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= +github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= +github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.2 h1:DB17ag19krx9CFsz4o3enTrPXyIXCl+2iCXH/aMAp9s= github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= +github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/pty v1.1.8/go.mod h1:O1sed60cT9XZ5uDucP5qwvh+TE3NnUj51EiZO/lmSfw= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/linkedin/goavro v2.1.0+incompatible 
h1:DV2aUlj2xZiuxQyvag8Dy7zjY69ENjS66bWkSfdpddY= github.com/linkedin/goavro v2.1.0+incompatible/go.mod h1:bBCwI2eGYpUI/4820s67MElg9tdeLbINjLjiM2xZFYM= +github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= +github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.8.0 h1:VkHVNpR4iVnU8XQR6DBm8BqYjN7CRzw+xKUbVVbbW9w= github.com/onsi/ginkgo v1.8.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= @@ -53,49 +90,105 @@ github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFSt github.com/pierrec/lz4 v0.0.0-20190327172049-315a67e90e41/go.mod h1:3/3N9NVKO0jef7pBehbT1qWhCMrIgbYNnFAZCqQ5LRc= github.com/pierrec/lz4 v2.0.5+incompatible h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM8aXeqhl0I= github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= +github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/profile v1.2.1/go.mod h1:hJw3o1OdXxsrSjjVksARp5W95eeEaEfptyVZyv6JUPA= +github.com/pkg/profile v1.3.0/go.mod h1:hJw3o1OdXxsrSjjVksARp5W95eeEaEfptyVZyv6JUPA= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= +github.com/prometheus/client_golang v1.0.0 h1:vrDKnkGzuGvhNAL56c7DBz29ZL+KxnoR0x7enabFceM= +github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo= +github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= +github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90 h1:S/YWwWx/RA8rT8tKFRuGUZhuA90OyIBpPCXkcbwU8DE= +github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/prometheus/common v0.4.1 h1:K0MGApIoQvMw27RTdJkPbr3JZ7DNbtxQNyi5STVM6Kw= +github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= +github.com/prometheus/common v0.6.0 h1:kRhiuYSXR3+uv2IbVbZhUxK5zVD/2pp3Gd2PpvPkpEo= +github.com/prometheus/common v0.6.0/go.mod h1:eBmuwkDJBwy6iBfxCBob6t6dR6ENT/y+J+Zk0j9GMYc= +github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= +github.com/prometheus/procfs v0.0.2 h1:6LJUbpNm42llc4HRCuvApCSWB/WfhuNo9K98Q9sNGfs= +github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= +github.com/prometheus/procfs v0.0.3 h1:CTwfnzjQ+8dS6MhHHu4YswVAD99sL2wjPqP+VkURmKE= +github.com/prometheus/procfs v0.0.3/go.mod h1:4A/X28fw3Fc593LaREMrKMqOKvUAntwMDaekg4FpcdQ= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a h1:9ZKAASQSHhDYGoxY8uLVpewe1GDZ2vu2Tr/vTdVAkFQ= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod 
h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/rcrowley/go-metrics v0.0.0-20190706150252-9beb055b7962 h1:eUm8ma4+yPknhXtkYlWh3tMkE6gBjXZToDned9s2gbQ= +github.com/rcrowley/go-metrics v0.0.0-20190706150252-9beb055b7962/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rs/xid v1.2.1 h1:mhH9Nq+C1fY2l1XIpgxIiUOfNpRBYH1kKcr+qfKgjRc= github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ= +github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94 h1:0ngsPmuP6XIjiFRNFYlvKwSr5zff2v+uPHaffZ6/M4k= github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= +golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190404164418-38d8ce5564a5/go.mod h1:WFFai1msRO1wXaEeE5yQxYXgSfI8pQAWXbQop6sCtWE= +golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4 h1:HuIa8hRrWRSrqYzx1qI49NNxhdi2PrY7gxVSq1JjLDc= +golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190603091049-60506f45cf65 h1:+rhAzEzT3f4JtomfC371qB+0Ola2caSKcY69NUBZrRQ= golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks= +golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20190628185345-da137c7871d7 h1:rTIdg5QFRR7XCaK4LCjBiPbx8j4DQRpdYMnGn/bJUEU= +golang.org/x/net v0.0.0-20190628185345-da137c7871d7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod 
h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190403152447-81d4e9dc473e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190602015325-4c4f7f33c9ed h1:uPxWBzB3+mlnjy9W58qY1j/cjyFjutgw/Vhan2zLy/A= golang.org/x/sys v0.0.0-20190602015325-4c4f7f33c9ed/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190606165138-5da285871e9c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190712062909-fae7ac547cb7 h1:LepdCS8Gf/MVejFIt8lsiexZATdoGVyp5bcyS+rYoUI= +golang.org/x/sys v0.0.0-20190712062909-fae7ac547cb7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= +golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20181030221726-6c7e314b6563/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190606124116-d0a3d012864b/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= +golang.org/x/tools v0.0.0-20190712213246-8b927904ee0d/go.mod h1:jcCCGcm9btYwXyDqrUWc6MKQKKGJCWEQ3AfLSRIbEuI= google.golang.org/appengine v1.6.0 h1:Tfd7cKwKbFRsI8RMAD3oqqw7JPFRrvFlOsfbgVkjOOw= google.golang.org/appengine v1.6.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= +google.golang.org/appengine v1.6.1/go.mod h1:i06prIuMbXzDqacNJfV5OdTW448YApPu5ww/cMBSeb0= +gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= +gopkg.in/jcmturner/aescts.v1 v1.0.1 h1:cVVZBK2b1zY26haWB4vbBiZrfFQnfbTVrE3xZq6hrEw= +gopkg.in/jcmturner/aescts.v1 v1.0.1/go.mod h1:nsR8qBOg+OucoIW+WMhB3GspUQXq9XorLnQb9XtvcOo= +gopkg.in/jcmturner/dnsutils.v1 v1.0.1 h1:cIuC1OLRGZrld+16ZJvvZxVJeKPsvd5eUIvxfoN5hSM= 
+gopkg.in/jcmturner/dnsutils.v1 v1.0.1/go.mod h1:m3v+5svpVOhtFAP/wSz+yzh4Mc0Fg7eRhxkJMWSIz9Q= +gopkg.in/jcmturner/gokrb5.v7 v7.2.3/go.mod h1:l8VISx+WGYp+Fp7KRbsiUuXTTOnxIc3Tuvyavf11/WM= +gopkg.in/jcmturner/gokrb5.v7 v7.3.0 h1:0709Jtq/6QXEuWRfAm260XqlpcwL1vxtO1tUE2qK8Z4= +gopkg.in/jcmturner/gokrb5.v7 v7.3.0/go.mod h1:l8VISx+WGYp+Fp7KRbsiUuXTTOnxIc3Tuvyavf11/WM= +gopkg.in/jcmturner/rpc.v1 v1.1.0 h1:QHIUxTX1ISuAv9dD2wJ9HWQVuWDX/Zc0PfeC2tjc4rU= +gopkg.in/jcmturner/rpc.v1 v1.1.0/go.mod h1:YIdkC4XfD6GXbzje11McwsDuOlZQSb9W4vfLvuNnlv8= gopkg.in/linkedin/goavro.v1 v1.0.5 h1:BJa69CDh0awSsLUmZ9+BowBdokpduDZSM9Zk8oKHfN4= gopkg.in/linkedin/goavro.v1 v1.0.5/go.mod h1:Aw5GdAbizjOEl0kAMHV9iHmA8reZzW/OKuJAl4Hb9F0= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= diff --git a/tests/bus_test.go b/tests/bus_test.go index eb21c5b..953a0a2 100644 --- a/tests/bus_test.go +++ b/tests/bus_test.go @@ -5,6 +5,7 @@ import ( "database/sql" "errors" "fmt" + "github.com/wework/grabbit/gbus/metrics" "reflect" "sync" "testing" @@ -144,6 +145,11 @@ func TestSubscribingOnTopic(t *testing.T) { <-proceed } +var ( + handlerRetryProceed = make(chan bool) + attempts = 0 +) + func TestHandlerRetry(t *testing.T) { c1 := Command1{} @@ -153,34 +159,45 @@ func TestHandlerRetry(t *testing.T) { bus := createBusForTest() - proceed := make(chan bool) cmdHandler := func(invocation gbus.Invocation, message *gbus.BusMessage) error { return invocation.Reply(noopTraceContext(), reply) } - attempts := 0 - replyHandler := func(invocation gbus.Invocation, message *gbus.BusMessage) error { - if attempts == 0 { - attempts++ - return fmt.Errorf("expecting retry on errors") - } else if attempts == 1 { - attempts++ - panic("expecting retry on panics") - } else { - proceed <- true - } - return nil - } - bus.HandleMessage(c1, cmdHandler) - bus.HandleMessage(r1, replyHandler) + bus.HandleMessage(r1, handleRetry) bus.Start() defer bus.Shutdown() bus.Send(noopTraceContext(), testSvc1, cmd) - <-proceed + <-handlerRetryProceed + + hm := metrics.GetHandlerMetrics("handleRetry") + if hm == nil { + t.Error("Metrics for handleRetry should be initiated") + } + f, _ := hm.GetFailureCount() + s, _ := hm.GetSuccessCount() + if f != 2 { + t.Errorf("Failure count should be 2 but was %f", f) + } + if s != 1 { + t.Errorf("Success count should be 1 but was %f", s) + } +} + +func handleRetry(invocation gbus.Invocation, message *gbus.BusMessage) error { + if attempts == 0 { + attempts++ + return fmt.Errorf("expecting retry on errors") + } else if attempts == 1 { + attempts++ + panic("expecting retry on panics") + } else { + handlerRetryProceed <- true + } + return nil } func TestRPC(t *testing.T) { @@ -215,8 +232,8 @@ func TestDeadlettering(t *testing.T) { var waitgroup sync.WaitGroup waitgroup.Add(2) poision := gbus.NewBusMessage(PoisionMessage{}) - service1 := createBusWithOptions(testSvc1, "grabbit-dead", true, true) - deadletterSvc := createBusWithOptions("deadletterSvc", "grabbit-dead", true, true) + service1 := createNamedBusForTest(testSvc1) + deadletterSvc := createNamedBusForTest("deadletterSvc") deadMessageHandler := func(tx *sql.Tx, poision amqp.Delivery) error { waitgroup.Done() @@ -239,6 +256,55 @@ func TestDeadlettering(t *testing.T) { service1.Send(context.Background(), testSvc1, gbus.NewBusMessage(Command1{})) waitgroup.Wait() + count, _ := metrics.GetRejectedMessagesValue() + if count != 1 { + t.Error("Should have one rejected message") + } +} + +func TestReturnDeadToQueue(t *testing.T) { + + var visited 
bool + proceed := make(chan bool, 0) + poision := gbus.NewBusMessage(Command1{}) + + service1 := createBusWithConfig(testSvc1, "grabbit-dead", true, true, + gbus.BusConfiguration{MaxRetryCount: 0, BaseRetryDuration: 0}) + + deadletterSvc := createBusWithConfig("deadletterSvc", "grabbit-dead", true, true, + gbus.BusConfiguration{MaxRetryCount: 0, BaseRetryDuration: 0}) + + deadMessageHandler := func(tx *sql.Tx, poision amqp.Delivery) error { + pub := amqpDeliveryToPublishing(poision) + deadletterSvc.ReturnDeadToQueue(context.Background(), &pub) + return nil + } + + faultyHandler := func(invocation gbus.Invocation, message *gbus.BusMessage) error { + if visited { + proceed <- true + return nil + } + visited = true + return errors.New("fail") + } + + deadletterSvc.HandleDeadletter(deadMessageHandler) + service1.HandleMessage(Command1{}, faultyHandler) + + deadletterSvc.Start() + defer deadletterSvc.Shutdown() + service1.Start() + defer service1.Shutdown() + + service1.Send(context.Background(), testSvc1, poision) + + select { + case <-proceed: + fmt.Println("success") + case <-time.After(2 * time.Second): + t.Fatal("timeout, failed to resend dead message to queue") + } } func TestRegistrationAfterBusStarts(t *testing.T) { @@ -354,6 +420,24 @@ func noopTraceContext() context.Context { // return ctx } +func amqpDeliveryToPublishing(del amqp.Delivery) (pub amqp.Publishing) { + pub.Headers = del.Headers + pub.ContentType = del.ContentType + pub.ContentEncoding = del.ContentEncoding + pub.DeliveryMode = del.DeliveryMode + pub.Priority = del.Priority + pub.CorrelationId = del.CorrelationId + pub.ReplyTo = del.ReplyTo + pub.Expiration = del.Expiration + pub.MessageId = del.MessageId + pub.Timestamp = del.Timestamp + pub.Type = del.Type + pub.UserId = del.UserId + pub.AppId = del.AppId + pub.Body = del.Body + return +} + type panicPolicy struct { } diff --git a/tests/consts.go b/tests/consts.go index ef34b42..51fb49a 100644 --- a/tests/consts.go +++ b/tests/consts.go @@ -20,18 +20,14 @@ func init() { testSvc3 = "testSvc3" } -func createBusForTest() gbus.Bus { - return createNamedBusForTest(testSvc1) -} - -func createBusWithOptions(svcName string, deadletter string, txnl, pos bool) gbus.Bus { +func createBusWithConfig(svcName string, deadletter string, txnl, pos bool, conf gbus.BusConfiguration) gbus.Bus { busBuilder := builder. New(). Bus(connStr). WithPolicies(&policy.Durable{}, &policy.TTL{Duration: time.Second * 3600}). WorkerNum(3, 1). WithConfirms(). 
- WithConfiguration(gbus.BusConfiguration{MaxRetryCount: 4, BaseRetryDuration: 15}) + WithConfiguration(conf) if txnl { busBuilder = busBuilder.Txnl("mysql", "rhinof:rhinof@/rhinof") @@ -46,6 +42,10 @@ func createBusWithOptions(svcName string, deadletter string, txnl, pos bool) gbu return busBuilder.Build(svcName) } +func createBusForTest() gbus.Bus { + return createNamedBusForTest(testSvc1) +} + func createNamedBusForTest(svcName string) gbus.Bus { - return createBusWithOptions(svcName, "dead-grabbit", true, true) + return createBusWithConfig(svcName, "dead-grabbit", true, true, gbus.BusConfiguration{MaxRetryCount: 4, BaseRetryDuration: 15}) } diff --git a/tests/metrics_test.go b/tests/metrics_test.go new file mode 100644 index 0000000..dcfcc75 --- /dev/null +++ b/tests/metrics_test.go @@ -0,0 +1,137 @@ +package tests + +import ( + "errors" + "github.com/sirupsen/logrus" + "github.com/wework/grabbit/gbus/metrics" + "testing" +) + +var ( + logger logrus.FieldLogger + runningTries = 5 +) + +func TestAddHandlerMetrics(t *testing.T) { + name := "handler1" + metrics.AddHandlerMetrics(name) + hm := metrics.GetHandlerMetrics(name) + + if hm == nil { + t.Error("Failed to create handler metrics") + } + + metrics.AddHandlerMetrics(name) + hm1 := metrics.GetHandlerMetrics(name) + + if hm1 == nil { + t.Error("Failed to create handler metrics") + } + + if hm1 != hm { + t.Error("Created two handler metrics instances for the same name") + } + + differentName := "handler2" + metrics.AddHandlerMetrics(differentName) + hm2 := metrics.GetHandlerMetrics(differentName) + + if hm2 == nil { + t.Error("Failed to create handler metrics") + } + + if hm2 == hm { + t.Error("Failed to create a different handler metrics") + } +} + +func TestRunHandlerWithMetric_FailureCounter(t *testing.T) { + logger = logrus.WithField("testCase", "TestRunHandlerWithMetric_FailureCounter") + name := "failure" + metrics.AddHandlerMetrics(name) + hm := metrics.GetHandlerMetrics(name) + + if hm == nil { + t.Errorf("Couldn't find handler with the name %s", name) + } + failure := func() error { + return errors.New("error in running handler") + } + + for i := 1; i < runningTries; i++ { + err := metrics.RunHandlerWithMetric(failure, name, logger) + + if err == nil { + t.Error("Failed handler run should return an error") + } + + count, err := hm.GetFailureCount() + + if err != nil { + t.Errorf("Failed to get counter value: %v", err) + } + if count != float64(i) { + t.Errorf("Expected to get %f as the value of the failure counter, but got %f", float64(i), count) + } + } +} + +func TestRunHandlerWithMetric_SuccessCounter(t *testing.T) { + logger = logrus.WithField("testCase", "TestRunHandlerWithMetric_SuccessCounter") + name := "success" + metrics.AddHandlerMetrics(name) + success := func() error { + return nil + } + hm := metrics.GetHandlerMetrics(name) + + if hm == nil { + t.Errorf("Couldn't find handler with the name %s", name) + } + + for i := 1; i < runningTries; i++ { + err := metrics.RunHandlerWithMetric(success, name, logger) + + if err != nil { + t.Error("Successful handler run shouldn't return an error") + } + + count, err := hm.GetSuccessCount() + + if err != nil { + t.Errorf("Failed to get counter value: %v", err) + } + if count != float64(i) { + t.Errorf("Expected to get %f as the value of the success counter, but got %f", float64(i), count) + } + } +} + +func TestRunHandlerWithMetric_Latency(t *testing.T) { + logger = logrus.WithField("testCase", "TestRunHandlerWithMetric_Latency") + name := "latency" + 
metrics.AddHandlerMetrics(name) + success := func() error { + return nil + } + hm := metrics.GetHandlerMetrics(name) + + if hm == nil { + t.Errorf("Couldn't find handler with the name %s", name) + } + + for i := 1; i < runningTries; i++ { + _ = metrics.RunHandlerWithMetric(success, name, logger) + sc, err := hm.GetLatencySampleCount() + + if err != nil { + t.Errorf("Failed to get latency value: %v", err) + } + if sc == nil { + t.Fatal("Expected latency sample count not to be nil") + } + if *sc != uint64(i) { + t.Errorf("Expected to get %d as the value of the latency sample count, but got %d", uint64(i), *sc) + } + } +} diff --git a/tests/saga_test.go b/tests/saga_test.go index c4064a6..d9b380d 100644 --- a/tests/saga_test.go +++ b/tests/saga_test.go @@ -9,6 +9,7 @@ import ( "time" "github.com/wework/grabbit/gbus" + "github.com/wework/grabbit/gbus/metrics" ) /* @@ -225,6 +226,10 @@ func TestSagaTimeout(t *testing.T) { } <-proceed + timeoutCounter, e := metrics.GetSagaTimeoutCounterValue() + if timeoutCounter != 1 || e != nil { + t.Errorf("saga timeout counter expected to be 1, actual %v", timeoutCounter) + } } func TestSagaSelfMessaging(t *testing.T) {