Skip to content

Commit

Permalink
v1.x rollup to master (#121)
Browse files Browse the repository at this point in the history
* add handler metrics to bus and saga (#101)

* add handler metrics to bus and saga + tests

* fix build

* add 0 to the default buckets to catch fast message handling

* PR correction - changed latency to summary(removed bucket configuration), add registration for saga handlers

* PR correction - getting logger as a param

* PR correction - new line in eof

* PR corrections message handler + sync.map + latency as summary

* add rejected messages metric

* dead letter handler should reject messages on failures and rollbacks and ack on commit success  (#105)

* dead letter handler should reject messages on failures and rollbacks

* dead letter handler should reject messages on failures and rollbacks

* dead letter handler should reject messages on failures and rollbacks

* dead letter handler should reject messages on failures and rollbacks

* return an error from the saga store when deleting a saga if saga can not (#110)

be found

In order to deal with concurrent deletes of the sage saga instance we
would wan't to indicate that deleting the saga failed if the saga is not
stored so callers can take proper action

* Persisted timeouts (#107)

* decouple transaction manager from glue

* moved timeout manager to gbus/tx package

* initial commit in order to support persisted timeouts

* first working version of a mysql persisted timeout manager

* fixing ci lint errors

* refactored ensure schema of timeout manager

* cleanup timeout manager when bs shuts down

* fixing formatting issues

* changed logging level from Info to Debug when inserting a new timeout

* resusing timeouts tablename (PR review)

* renamed AcceptTimeoutFunction to SetTimeoutFunction on the
TimeoutManager interface (PR review)

* refactored glue to implement the Logged inetrface and use the GLogged
helper struct

* locking timeout record before executing timeout

In order to prevent having a timeout beeing executed twice due to two
concurrent grabbit instances running the same service a lock (FOR
UPDATE) has been placed on the timeout record  in the scope of the executing transaction

* Commiting the select transaction when querying for pending timeouts

* feat(timeout:lock): This is done in order to reduce contention and allow for parallel processing in case of multiple grabbit instances

* Enable returning a message back from the dead  to the queue (#112)

* enable sending raw messages

* enable sending raw messages

* enable sending raw messages

* enable sending raw messages

* enable sending raw messages

* enable sending raw messages

* enable sending raw messages

* enable sending raw messages

* enable sending raw messages

* enable sending raw messages

* enable sending raw messages

* enable sending raw messages

* enable sending raw messages

* return to q

* return to q

* return to q

* return to q

* return dead to q

* allow no retries

* test - resend dead to queue

* test - resend dead to queue

* test - resend dead to queue

* test - resend dead to queue - fixes after cr

* test - resend dead to queue - fixes after cr

* test - resend dead to queue - fixes after cr

* added metric report on saga timeout (#114)

1) added reporting saga timeouts to the glue component
2) fixed mysql timeoutmanager error when trying to clear a timeout

* Added documentation for grabbit metrics (#117)

* added initial documentation for grabbit metrics

*  including metrics section in readme.md

* fixing goreportcard issues (#118)

* removed logging a warning when worker message channel returns an error (#116)

* corrected saga metrics name and added to metrics documentation (#119)

* corrected saga metrics name and added documentatio

* corrected saga metric name

* corrected typos

* removed non transactional bus mode (#120)
  • Loading branch information
Guy Baron committed Aug 14, 2019
1 parent 44f71ca commit 31d9fc5
Show file tree
Hide file tree
Showing 31 changed files with 1,120 additions and 401 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ debug

# Test binary, built with `go test -c`
*.test

.vscode
# Output of the go coverage tool, specifically when used with LiteIDE
*.out
.DS_Store
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ A lightweight transactional message bus on top of RabbitMQ supporting:
5) [Reliable messaging](https://github.com/wework/grabbit/blob/master/docs/OUTBOX.md) and local service transactivity via Transaction Outbox pattern
6) Deadlettering
7) [Structured logging](https://github.com/wework/grabbit/blob/master/docs/LOGGING.md)
8) Reporting [metrics](https://github.com/wework/grabbit/blob/master/docs/METRICS.md) via Prometheus

Planned:

Expand Down
10 changes: 10 additions & 0 deletions docs/METRICS.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# Metrics

grabbit exposes and reports the following metrics to Prometheus

| Namespace | Subsystem | Name | Description |
| ------------- | ------------- | ----------------------------------| --------------------------------------------------------------------------- |
| grabbit | handler | [name of message handler]_result | records and counts each succesfull or failed execution of a message handler |
| grabbit | handler | [name of message handler]_latency | records the execution time of each handler |
| grabbit | messages | rejected_messages | increments each time a message gets rejected |
| grabbit | saga | timedout_sagas | counting the number of timedout saga instances |
37 changes: 31 additions & 6 deletions gbus/abstractions.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,20 @@ package gbus
import (
"context"
"database/sql"
"github.com/sirupsen/logrus"
"time"

"github.com/sirupsen/logrus"

"github.com/streadway/amqp"
)

//Semantics reopresents the semantics of a grabbit message
type Semantics string

const (
//CMD represenst a messge with command semantics in grabbit
CMD Semantics = "cmd"
//EVT represenst a messge with event semantics in grabbit
EVT Semantics = "evt"
)

Expand All @@ -25,7 +29,7 @@ type BusConfiguration struct {
//Bus interface provides the majority of functionality to Send, Reply and Publish messages to the Bus
type Bus interface {
HandlerRegister
RegisterDeadletterHandler
Deadlettering
BusSwitch
Messaging
SagaRegister
Expand Down Expand Up @@ -106,9 +110,6 @@ type HandlerRegister interface {
HandleEvent(exchange, topic string, event Message, handler MessageHandler) error
}

//MessageHandler signature for all command handlers
type MessageHandler func(invocation Invocation, message *BusMessage) error

//Saga is the base interface for all Sagas.
type Saga interface {
//StartedBy returns the messages that when received should create a new saga instance
Expand All @@ -127,8 +128,9 @@ type Saga interface {
}

//RegisterDeadletterHandler provides the ability to handle messages that were rejected as poision and arrive to the deadletter queue
type RegisterDeadletterHandler interface {
type Deadlettering interface {
HandleDeadletter(handler func(tx *sql.Tx, poision amqp.Delivery) error)
ReturnDeadToQueue(ctx context.Context, publishing *amqp.Publishing) error
}

//RequestSagaTimeout is the interface a saga needs to implement to get timeout servicess
Expand All @@ -145,6 +147,14 @@ type SagaRegister interface {
RegisterSaga(saga Saga, conf ...SagaConfFn) error
}

//SagaGlue glues together all the parts needed in order to orchistrate saga instances
type SagaGlue interface {
SagaRegister
Logged
Start() error
Stop() error
}

//Builder is the main interface that should be used to create an instance of a Bus
type Builder interface {
PurgeOnStartUp() Builder
Expand Down Expand Up @@ -217,6 +227,21 @@ type TxOutbox interface {
Stop() error
}

//TimeoutManager abstracts the implementation of determining when a saga should be timed out
type TimeoutManager interface {
//RegisterTimeout requests the TimeoutManager to register a timeout for a specific saga instance
RegisterTimeout(tx *sql.Tx, sagaID string, duration time.Duration) error
//ClearTimeout clears a timeout for a specific saga
ClearTimeout(tx *sql.Tx, sagaID string) error
//SetTimeoutFunction accepts the function that the TimeoutManager should invoke once a timeout expires
SetTimeoutFunction(func(tx *sql.Tx, sagaID string) error)
//Start starts the timeout manager
Start() error
//Stop shuts the timeout manager down
Stop() error
}

//Logged represents a grabbit component that can be logged
type Logged interface {
SetLogger(entry logrus.FieldLogger)
Log() logrus.FieldLogger
Expand Down
61 changes: 29 additions & 32 deletions gbus/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ package builder

import (
"fmt"
"github.com/sirupsen/logrus"
"sync"
"time"

"github.com/sirupsen/logrus"

"github.com/wework/grabbit/gbus"
"github.com/wework/grabbit/gbus/saga"
"github.com/wework/grabbit/gbus/saga/stores"
"github.com/wework/grabbit/gbus/serialization"
"github.com/wework/grabbit/gbus/tx/mysql"
)
Expand Down Expand Up @@ -36,17 +36,14 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus {
gb := &gbus.DefaultBus{
AmqpConnStr: builder.connStr,
PrefetchCount: builder.PrefetchCount,
Outgoing: &gbus.AMQPOutbox{
SvcName: svcName,
},

SvcName: svcName,
PurgeOnStartup: builder.purgeOnStartup,
DelayedSubscriptions: [][]string{},
HandlersLock: &sync.Mutex{},
RPCLock: &sync.Mutex{},
SenderLock: &sync.Mutex{},
ConsumerLock: &sync.Mutex{},
IsTxnl: builder.txnl,
Registrations: make([]*gbus.Registration, 0),
RPCHandlers: make(map[string]gbus.MessageHandler),
Serializer: builder.serializer,
Expand All @@ -68,46 +65,46 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus {
gb.WorkerNum = builder.workerNum
}
var (
sagaStore saga.Store
sagaStore saga.Store
timeoutManager gbus.TimeoutManager
)
if builder.txnl {
gb.IsTxnl = true
switch builder.txnlProvider {

case "mysql":
mysqltx, err := mysql.NewTxProvider(builder.txConnStr)
switch builder.txnlProvider {

case "mysql":
mysqltx, err := mysql.NewTxProvider(builder.txConnStr)
if err != nil {
panic(err)
}
gb.TxProvider = mysqltx
//TODO move purge logic into the NewSagaStore factory method
sagaStore = mysql.NewSagaStore(gb.SvcName, mysqltx)
if builder.purgeOnStartup {
err := sagaStore.Purge()
if err != nil {
panic(err)
}
gb.TxProvider = mysqltx
sagaStore = mysql.NewSagaStore(gb.SvcName, mysqltx)
if builder.purgeOnStartup {
err := sagaStore.Purge()
if err != nil {
panic(err)
}
}
gb.Outbox = mysql.NewOutbox(gb.SvcName, mysqltx, builder.purgeOnStartup)

default:
err := fmt.Errorf("no provider found for passed in value %v", builder.txnlProvider)
panic(err)
}
} else {
sagaStore = stores.NewInMemoryStore()
}
gb.Outbox = mysql.NewOutbox(gb.SvcName, mysqltx, builder.purgeOnStartup)
timeoutManager = mysql.NewTimeoutManager(gb, gb.TxProvider, gb.Log, svcName, builder.purgeOnStartup)

default:
err := fmt.Errorf("no provider found for passed in value %v", builder.txnlProvider)
panic(err)
}
if builder.usingPingTimeout {
gb.DbPingTimeout = builder.dbPingTimeout
}

//TODO move this into the NewSagaStore factory methods
if builder.purgeOnStartup {
err := sagaStore.Purge()
if err != nil {
panic(err)
}
}
gb.Glue = saga.NewGlue(gb, sagaStore, svcName, gb.TxProvider)
glue := saga.NewGlue(gb, sagaStore, svcName, gb.TxProvider, gb.Log, timeoutManager)
gb.Glue = glue
return gb
}

Expand Down Expand Up @@ -179,9 +176,9 @@ func (builder *defaultBuilder) ConfigureHealthCheck(timeoutInSeconds time.Durati
}

func (builder *defaultBuilder) WithConfiguration(config gbus.BusConfiguration) gbus.Builder {
if config.MaxRetryCount > 0 {
gbus.MaxRetryCount = config.MaxRetryCount
}

gbus.MaxRetryCount = config.MaxRetryCount

if config.BaseRetryDuration > 0 {
gbus.BaseRetryDuration = time.Millisecond * time.Duration(config.BaseRetryDuration)
}
Expand Down
Loading

0 comments on commit 31d9fc5

Please sign in to comment.