Skip to content

Commit

Permalink
Merge branch 'v1.x' into metrics_on_dead_letter_handler
Browse files Browse the repository at this point in the history
  • Loading branch information
yuvmendel committed Aug 18, 2019
2 parents 8440cf1 + 6c2a9e6 commit 5b8039e
Show file tree
Hide file tree
Showing 10 changed files with 53 additions and 5 deletions.
17 changes: 17 additions & 0 deletions .github/stale.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Number of days of inactivity before an issue becomes stale
daysUntilStale: 7
# Number of days of inactivity before a stale issue is closed
daysUntilClose: 7
# Issues with these labels will never be considered stale
exemptLabels:
- bug
- enhancment
# Label to use when marking an issue as stale
staleLabel: wontfix
# Comment to post when marking an issue as stale. Set to `false` to disable
markComment: >
This issue has been automatically marked as stale because it has not had
recent activity. It will be closed if no further activity occurs. Thank you
for your contributions.
# Comment to post when closing a stale issue. Set to `false` to disable
closeComment: false
2 changes: 1 addition & 1 deletion gbus/abstractions.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ type Saga interface {
New() Saga
}

//RegisterDeadletterHandler provides the ability to handle messages that were rejected as poison and arrive to the deadletter queue
//Deadlettering provides the ability to handle messages that were rejected as poision and arrive to the deadletter queue
type Deadlettering interface {
HandleDeadletter(handler DeadLetterMessageHandler)
ReturnDeadToQueue(ctx context.Context, publishing *amqp.Publishing) error
Expand Down
2 changes: 2 additions & 0 deletions gbus/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,9 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus {
panic(err)
}
gb.TxProvider = mysqltx

mysql.EnsureSchema(mysqltx.Database, gb.SvcName)

//TODO move purge logic into the NewSagaStore factory method
sagaStore = mysql.NewSagaStore(gb.SvcName, mysqltx)
if builder.purgeOnStartup {
Expand Down
1 change: 1 addition & 0 deletions gbus/invocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type defaultInvocationContext struct {
deliveryInfo DeliveryInfo
}

//DeliveryInfo provdes information as to the attempted deilvery of the invocation
type DeliveryInfo struct {
Attempt uint
MaxRetryCount uint
Expand Down
6 changes: 6 additions & 0 deletions gbus/metrics/handler_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type handlerMetrics struct {
latency prometheus.Summary
}

//AddHandlerMetrics adds a handlere to be tracked with metrics
func AddHandlerMetrics(handlerName string) {
handlerMetrics := newHandlerMetrics(handlerName)
_, exists := handlerMetricsByHandlerName.LoadOrStore(handlerName, handlerMetrics)
Expand All @@ -35,6 +36,7 @@ func AddHandlerMetrics(handlerName string) {
}
}

//RunHandlerWithMetric runs a specific handler with metrics being collected and reported to prometheus
func RunHandlerWithMetric(handleMessage func() error, handlerName string, logger logrus.FieldLogger) error {
handlerMetrics := GetHandlerMetrics(handlerName)
defer func() {
Expand Down Expand Up @@ -63,6 +65,7 @@ func RunHandlerWithMetric(handleMessage func() error, handlerName string, logger
return err
}

//GetHandlerMetrics gets the metrics handler associated with the handlerName
func GetHandlerMetrics(handlerName string) *handlerMetrics {
entry, ok := handlerMetricsByHandlerName.Load(handlerName)
if ok {
Expand Down Expand Up @@ -99,14 +102,17 @@ func trackTime(functionToTrack func() error, observer prometheus.Observer) error
return functionToTrack()
}

//GetSuccessCount gets the value of the handlers success value
func (hm *handlerMetrics) GetSuccessCount() (float64, error) {
return hm.getLabeledCounterValue(success)
}

//GetFailureCount gets the value of the handlers failure value
func (hm *handlerMetrics) GetFailureCount() (float64, error) {
return hm.getLabeledCounterValue(failure)
}

//GetLatencySampleCount gets the value of the handlers latency value
func (hm *handlerMetrics) GetLatencySampleCount() (*uint64, error) {
m := &io_prometheus_client.Metric{}
err := hm.latency.Write(m)
Expand Down
4 changes: 3 additions & 1 deletion gbus/metrics/message_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,19 @@ package metrics
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_model/go"
io_prometheus_client "github.com/prometheus/client_model/go"
)

var (
rejectedMessages = newRejectedMessagesCounter()
)

//ReportRejectedMessage reports a message being rejected to the metrics counter
func ReportRejectedMessage() {
rejectedMessages.Inc()
}

//GetRejectedMessagesValue gets the value of the rejected message counter
func GetRejectedMessagesValue() (float64, error) {
m := &io_prometheus_client.Metric{}
err := rejectedMessages.Write(m)
Expand Down
1 change: 0 additions & 1 deletion gbus/outbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ func (out *AMQPOutbox) init(amqp *amqp.Channel, confirm, resendOnNack bool) erro
//Shutdown stops the outbox
func (out *AMQPOutbox) Shutdown() {
close(out.stop)

}

//Post implements Outbox.Send
Expand Down
12 changes: 10 additions & 2 deletions gbus/tx/mysql/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ package mysql

import (
"database/sql"
"regexp"
"strings"

"fmt"
"github.com/lopezator/migrator"
"github.com/wework/grabbit/gbus/tx"
)
Expand Down Expand Up @@ -86,7 +87,7 @@ func TimoutTableMigration(svcName string) *migrator.Migration {

//EnsureSchema implements Grabbit's migrations strategy
func EnsureSchema(db *sql.DB, svcName string) {
migrationsTable := fmt.Sprintf("grabbitMigrations_%s", svcName)
migrationsTable := sanitizedMigrationsTable(svcName)

migrate, err := migrator.New(migrator.TableName(migrationsTable), migrator.Migrations(
OutboxMigrations(svcName),
Expand All @@ -101,3 +102,10 @@ func EnsureSchema(db *sql.DB, svcName string) {
panic(err)
}
}

func sanitizedMigrationsTable(svcName string) string {
var re = regexp.MustCompile(`-|;|\\|`)
sanitized := re.ReplaceAllString(svcName, "")

return strings.ToLower("grabbitMigrations_" + sanitized)
}
11 changes: 11 additions & 0 deletions tests/bus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,17 @@ func TestHealthCheck(t *testing.T) {
}
}

func TestSanitizingSvcName(t *testing.T) {
svc4 := createNamedBusForTest(testSvc4)
err := svc4.Start()
if err != nil {
t.Error(err.Error())
}
defer svc4.Shutdown()

fmt.Println("succeeded sanitizing service name")
}

func noopTraceContext() context.Context {
return context.Background()
// tracer := opentracing.NoopTracer{}
Expand Down
2 changes: 2 additions & 0 deletions tests/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@ var connStr string
var testSvc1 string
var testSvc2 string
var testSvc3 string
var testSvc4 string

func init() {
connStr = "amqp://rabbitmq:rabbitmq@localhost"
testSvc1 = "testSvc1"
testSvc2 = "testSvc2"
testSvc3 = "testSvc3"
testSvc4 = "test-svc4"
}

func createBusWithConfig(svcName string, deadletter string, txnl, pos bool, conf gbus.BusConfiguration) gbus.Bus {
Expand Down

0 comments on commit 5b8039e

Please sign in to comment.