From 74b2a5dbf80ca7d10a9d462f05916b50e93d3b12 Mon Sep 17 00:00:00 2001 From: Vladislav Shub Date: Sat, 20 Apr 2019 15:08:46 +0300 Subject: [PATCH 1/7] added tracing --- gbus/bus.go | 35 +++++++++------ gbus/saga/timeout.go | 6 --- gbus/tx/mysql/txoutbox.go | 10 +++++ gbus/worker.go | 92 +++++++++++++++++++++++++++++---------- go.mod | 9 ++-- go.sum | 18 +++++--- tests/bus_test.go | 48 ++++++++++++++++++++ 7 files changed, 165 insertions(+), 53 deletions(-) diff --git a/gbus/bus.go b/gbus/bus.go index 94c7b5b..de14d8c 100644 --- a/gbus/bus.go +++ b/gbus/bus.go @@ -5,7 +5,9 @@ import ( "database/sql" "errors" "fmt" - + "github.com/opentracing-contrib/go-amqp/amqptracer" + "github.com/opentracing/opentracing-go" + slog "github.com/opentracing/opentracing-go/log" "runtime/debug" "sync" "time" @@ -549,17 +551,23 @@ func (b *DefaultBus) monitorAMQPErrors() { func (b *DefaultBus) sendImpl(ctx context.Context, tx *sql.Tx, toService, replyTo, exchange, topic string, message *BusMessage, policies ...MessagePolicy) (er error) { b.SenderLock.Lock() - defer b.SenderLock.Unlock() - + span, ctx := opentracing.StartSpanFromContext(ctx, "sendImpl") defer func() { if err := recover(); err != nil { errMsg := fmt.Sprintf("panic recovered panicking err:\n%v\n%s", err, debug.Stack()) er = errors.New(errMsg) + span.LogFields(slog.Error(er)) } + b.SenderLock.Unlock() + span.Finish() }() headers := message.GetAMQPHeaders() + err := amqptracer.Inject(span, headers) + if err != nil { + b.log().WithError(err).Error("could not inject headers") + } buffer, err := b.Serializer.Encode(message.Payload) if err != nil { @@ -575,17 +583,16 @@ func (b *DefaultBus) sendImpl(ctx context.Context, tx *sql.Tx, toService, replyT ContentEncoding: b.Serializer.Name(), Headers: headers, } + span.LogFields( + slog.String("message", message.PayloadFQN), + slog.String("ID", message.ID), + slog.String("SagaID", message.SagaID), + slog.String("CorrelationID", message.CorrelationID), + slog.String("SagaCorrelationID", message.SagaCorrelationID), + slog.String("Semantics", message.Semantics), + slog.String("replyTo", replyTo), + ) - /* TODO:FIX Opentracing context - sp := opentracing.SpanFromContext(ctx) - if sp != nil { - defer sp.Finish() - } - // Inject the span context into the AMQP header. - if err := amqptracer.Inject(sp, msg.Headers); err != nil { - return err - } - */ for _, defaultPolicy := range b.DefaultPolicies { defaultPolicy.Apply(&msg) } @@ -609,7 +616,7 @@ func (b *DefaultBus) sendImpl(ctx context.Context, tx *sql.Tx, toService, replyT b.log().WithField("message_id", msg.MessageId).Info("sending message to outbox") saveErr := b.Outbox.Save(tx, exchange, key, msg) if saveErr != nil { - log.Printf("fialed to save to transactional outbox\n%v", saveErr) + log.WithError(saveErr).Error("failed to save to transactional outbox") } return saveErr } diff --git a/gbus/saga/timeout.go b/gbus/saga/timeout.go index e19649a..1f8fe8e 100644 --- a/gbus/saga/timeout.go +++ b/gbus/saga/timeout.go @@ -25,12 +25,6 @@ func (tm *TimeoutManager) RequestTimeout(svcName, sagaID string, duration time.D SagaID: sagaID} msg := gbus.NewBusMessage(reuqestTimeout) msg.SagaCorrelationID = sagaID - /* TODO:FIX Opentracing - span := opentracing.GlobalTracer().StartSpan("timeout") - if span != nil { - defer span.Finish() - } - */ if err := tm.bus.Send(context.Background(), svcName, msg); err != nil { //TODO: add logger logrus.WithError(err).Error("could not send timeout to bus") diff --git a/gbus/tx/mysql/txoutbox.go b/gbus/tx/mysql/txoutbox.go index dd4fc73..4e1b420 100644 --- a/gbus/tx/mysql/txoutbox.go +++ b/gbus/tx/mysql/txoutbox.go @@ -354,6 +354,16 @@ func (outbox *TxOutbox) migrate0_9To1_0(tx *sql.Tx, svcName string) error { return nil } +func (outbox *TxOutbox) migrate1_0To1_1(tx *sql.Tx, svcName string) error { + tblName := getOutboxName(svcName) + alter := `ALTER TABLE ` + tblName + ` ADD COLUMN ctx ;` + _, execErr := tx.Exec(alter) + if execErr != nil { + outbox.log().WithField("sql_err", execErr).Info("renaming column") + } + return nil +} + func getOutboxName(svcName string) string { return strings.ToLower("grabbit_" + sanitizeTableName(svcName) + "_outbox") diff --git a/gbus/worker.go b/gbus/worker.go index 2b080bd..0cffe1f 100644 --- a/gbus/worker.go +++ b/gbus/worker.go @@ -5,7 +5,11 @@ import ( "database/sql" "errors" "fmt" - + "github.com/opentracing-contrib/go-amqp/amqptracer" + "github.com/opentracing/opentracing-go" + slog "github.com/opentracing/opentracing-go/log" + "reflect" + "runtime" "runtime/debug" "sync" "time" @@ -219,37 +223,39 @@ func (worker *worker) invokeDeadletterHandler(delivery amqp.Delivery) { } func (worker *worker) processMessage(delivery amqp.Delivery, isRPCreply bool) { + var span opentracing.Span + var ctx context.Context + var spanOptions []opentracing.StartSpanOption + + spCtx, err := amqptracer.Extract(delivery.Headers) + if err != nil { + worker.log().WithError(err).Error("could not extract SpanContext from headers") + } else { + spanOptions = append(spanOptions, opentracing.FollowsFrom(spCtx)) + } + span, ctx = opentracing.StartSpanFromContext(context.Background(), "processMessage", spanOptions...) //catch all error handling so goroutine will not crash defer func() { if r := recover(); r != nil { logEntry := worker.log().WithField("worker", worker.consumerTag) if err, ok := r.(error); ok { + span.LogFields(slog.Error(err)) logEntry = logEntry.WithError(err) } else { logEntry = logEntry.WithField("panic", r) } - + span.LogFields(slog.String("panic", "failed to process message")) logEntry.Error("failed to process message") } + span.Finish() }() worker.log().WithFields(log.Fields{"worker": worker.consumerTag, "message_id": delivery.MessageId}).Info("GOT MSG") - // worker.log("%v GOT MSG - Worker %v - MessageId %v", worker.svcName, worker.consumerTag, delivery.MessageId) - /*TODO:FIX Opentracing - spCtx, _ := amqptracer.Extract(delivery.Headers) - sp := opentracing.StartSpan( - "processMessage", - opentracing.FollowsFrom(spCtx), - ) - if sp != nil { - defer sp.Finish() - } - */ - // Update the context with the span for the subsequent reference. //handle a message that originated from a deadletter exchange if worker.isDead(delivery) { + span.LogFields(slog.Error(errors.New("delivery is dead"))) worker.log().Info("invoking deadletter handler") worker.invokeDeadletterHandler(delivery) return @@ -257,10 +263,23 @@ func (worker *worker) processMessage(delivery amqp.Delivery, isRPCreply bool) { bm, err := worker.extractBusMessage(delivery) if err != nil { + span.LogFields(slog.Error(err), slog.String("grabbit", "message is poison")) //reject poison message - worker.Reject(false, delivery) + err = worker.Reject(false, delivery) + if err != nil { + span.LogFields(slog.Error(err)) + } return } + span.LogFields( + slog.String("message", bm.PayloadFQN), + slog.String("ID", bm.ID), + slog.String("SagaID", bm.SagaID), + slog.String("CorrelationID", bm.CorrelationID), + slog.String("SagaCorrelationID", bm.SagaCorrelationID), + slog.String("Semantics", bm.Semantics), + slog.Bool("isRPCreply", isRPCreply), + ) //TODO:Dedup message handlers := worker.resolveHandlers(isRPCreply, bm, delivery) if len(handlers) == 0 { @@ -269,9 +288,13 @@ func (worker *worker) processMessage(delivery amqp.Delivery, isRPCreply bool) { log.Fields{"message-name": bm.PayloadFQN, "message-type": bm.Semantics}). Warn("Message received but no handlers found") + span.LogFields(slog.String("grabbit", "no handlers found")) // worker.log("Message received but no handlers found\nMessage name:%v\nMessage Type:%v\nRejecting message", bm.PayloadFQN, bm.Semantics) //remove the message by acking it and not rejecting it so it will not be routed to a deadletter queue - worker.Ack(delivery) + err = worker.Ack(delivery) + if err != nil { + span.LogFields(slog.Error(err)) + } return } @@ -281,13 +304,17 @@ func (worker *worker) processMessage(delivery amqp.Delivery, isRPCreply bool) { tx, txErr = worker.txProvider.New() if txErr != nil { worker.log().WithError(txErr).Error("failed to create transaction") + span.LogFields(slog.Error(txErr)) //reject the message but requeue it so it gets redelivered until we can create transactions - worker.Reject(true, delivery) + err = worker.Reject(true, delivery) + if err != nil { + span.LogFields(slog.Error(err)) + } return } } var ackErr, commitErr, rollbackErr, rejectErr error - invkErr := worker.invokeHandlers(context.Background(), handlers, bm, &delivery, tx) + invkErr := worker.invokeHandlers(ctx, handlers, bm, &delivery, tx) // if all handlers executed with out errors then commit the transactional if the bus is transactional // if the tranaction committed successfully then ack the message. @@ -303,22 +330,31 @@ func (worker *worker) processMessage(delivery amqp.Delivery, isRPCreply bool) { if ackErr != nil { // if this fails then the message will be eventually redeilvered by RabbitMQ //sp handlers should be idempotent. - worker.log().WithField("ack_error", ackErr).Warn("failed to send ack to the broker") + span.LogFields(slog.Error(ackErr)) + worker.log().WithError(ackErr).Warn("failed to send ack to the broker") } } else { + span.LogFields(slog.Error(commitErr)) worker.log().WithError(commitErr).Error("failed committing transaction") //if the commit failed we will reject the message - worker.Reject(false, delivery) + err = worker.Reject(false, delivery) + if err != nil { + span.LogFields(slog.Error(err)) + worker.log().WithError(err).Warn("failed to send reject to the broker") + + } } } else { /*if the bus in not transactional just try acking the message*/ ackErr = worker.Ack(delivery) if ackErr != nil { - worker.log().WithField("ack_error", ackErr).Warn("failed to send ack to the broker") + span.LogFields(slog.Error(ackErr)) + worker.log().WithError(ackErr).Warn("failed to send ack to the broker") } } //else there was an error in the invokation then try rollingback the transaction and reject the message } else { + span.LogFields(slog.Error(invkErr)) worker.log().WithError(invkErr).WithFields(log.Fields{"message_name": bm.PayloadFQN, "semantics": bm.Semantics}).Error("Failed to consume message due to failure of one or more handlers.\n Message rejected as poison") if worker.isTxnl { @@ -326,12 +362,14 @@ func (worker *worker) processMessage(delivery amqp.Delivery, isRPCreply bool) { rollbackErr = worker.SafeWithRetries(tx.Rollback, MaxRetryCount) if rollbackErr != nil { + span.LogFields(slog.Error(rollbackErr)) worker.log().WithError(rollbackErr).Error("failed to rollback transaction") } } rejectErr = worker.Reject(false, delivery) if rejectErr != nil { + span.LogFields(slog.Error(rejectErr)) worker.log().WithError(rejectErr).Error("failed to reject message") } } @@ -340,26 +378,32 @@ func (worker *worker) processMessage(delivery amqp.Delivery, isRPCreply bool) { func (worker *worker) invokeHandlers(sctx context.Context, handlers []MessageHandler, message *BusMessage, delivery *amqp.Delivery, tx *sql.Tx) (err error) { action := func(attempts uint) (actionErr error) { + span, sctx := opentracing.StartSpanFromContext(sctx, "invokeHandlers") + span.LogFields(slog.Uint64("attempt", uint64(attempts+1))) defer func() { if p := recover(); p != nil { - pncMsg := fmt.Sprintf("%v\n%s", p, debug.Stack()) worker.log().WithField("stack", pncMsg).Error("recovered from panic while invoking handler") actionErr = errors.New(pncMsg) + span.LogFields(slog.Error(actionErr)) } + span.Finish() }() for _, handler := range handlers { + hspan, hsctx := opentracing.StartSpanFromContext(sctx, runtime.FuncForPC(reflect.ValueOf(handler).Pointer()).Name()) + hspan.Finish() + ctx := &defaultInvocationContext{ invocingSvc: delivery.ReplyTo, bus: worker.b, inboundMsg: message, tx: tx, - ctx: sctx, + ctx: hsctx, exchange: delivery.Exchange, routingKey: delivery.RoutingKey} - e := handler(ctx, message) if e != nil { + hspan.LogFields(slog.Error(e)) return e } } diff --git a/go.mod b/go.mod index 4019b56..61a9675 100644 --- a/go.mod +++ b/go.mod @@ -1,7 +1,7 @@ module github.com/wework/grabbit require ( - github.com/DataDog/zstd v1.3.8 // indirect + github.com/DataDog/zstd v1.4.0 // indirect github.com/Rican7/retry v0.1.0 github.com/Shopify/sarama v1.22.0 // indirect github.com/bsm/sarama-cluster v2.1.15+incompatible // indirect @@ -10,15 +10,18 @@ require ( github.com/golang/protobuf v1.3.1 github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect github.com/kr/pretty v0.1.0 // indirect - github.com/lib/pq v1.0.0 + github.com/lib/pq v1.1.0 github.com/linkedin/goavro v2.1.0+incompatible github.com/onsi/ginkgo v1.8.0 // indirect github.com/onsi/gomega v1.5.0 // indirect + github.com/opentracing-contrib/go-amqp v0.0.0-20171102191528-e26701f95620 + github.com/opentracing/opentracing-go v1.1.0 github.com/pierrec/lz4 v2.0.5+incompatible // indirect github.com/rs/xid v1.2.1 github.com/sirupsen/logrus v1.4.1 github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94 - golang.org/x/sys v0.0.0-20190411185658-b44545bcd369 // indirect + golang.org/x/net v0.0.0-20190420063019-afa5a82059c6 // indirect + golang.org/x/sys v0.0.0-20190419153524-e8e3143a4f4a // indirect google.golang.org/appengine v1.5.0 // indirect gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect gopkg.in/linkedin/goavro.v1 v1.0.5 // indirect diff --git a/go.sum b/go.sum index 30c1d9e..2ea3a46 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,7 @@ github.com/DataDog/zstd v1.3.5 h1:DtpNbljikUepEPD16hD4LvIcmhnhdLTiW/5pHgbmp14= github.com/DataDog/zstd v1.3.5/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo= -github.com/DataDog/zstd v1.3.8 h1:wMrT3Ulre3EsZQi6lPUYWFoA/+fPTW2hYc+GxtXjQEg= -github.com/DataDog/zstd v1.3.8/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo= +github.com/DataDog/zstd v1.4.0 h1:vhoV+DUHnRZdKW1i5UMjAk2G4JY8wN4ayRfYDNdEhwo= +github.com/DataDog/zstd v1.4.0/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo= github.com/Rican7/retry v0.1.0 h1:FqK94z34ly8Baa6K+G8Mmza9rYWTKOJk+yckIBB5qVk= github.com/Rican7/retry v0.1.0/go.mod h1:FgOROf8P5bebcC1DS0PdOQiqGUridaZvikzUmkFW6gg= github.com/Shopify/sarama v1.22.0 h1:rtiODsvY4jW6nUV6n3K+0gx/8WlAwVt+Ixt6RIvpYyo= @@ -42,8 +42,8 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= -github.com/lib/pq v1.0.0 h1:X5PMW56eZitiTeO7tKzZxFCSpbFZJtkMMooicw2us9A= -github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= +github.com/lib/pq v1.1.0 h1:/5u4a+KGJptBRqGzPvYQL9p0d/tPR4S31+Tnzj9lEO4= +github.com/lib/pq v1.1.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/linkedin/goavro v2.1.0+incompatible h1:DV2aUlj2xZiuxQyvag8Dy7zjY69ENjS66bWkSfdpddY= github.com/linkedin/goavro v2.1.0+incompatible/go.mod h1:bBCwI2eGYpUI/4820s67MElg9tdeLbINjLjiM2xZFYM= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= @@ -51,6 +51,10 @@ github.com/onsi/ginkgo v1.8.0 h1:VkHVNpR4iVnU8XQR6DBm8BqYjN7CRzw+xKUbVVbbW9w= github.com/onsi/ginkgo v1.8.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/gomega v1.5.0 h1:izbySO9zDPmjJ8rDjLvkA2zJHIo+HkYXHnf7eN7SSyo= github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= +github.com/opentracing-contrib/go-amqp v0.0.0-20171102191528-e26701f95620 h1:OhtbNVqXz6DuVXGvwPYXnNwQy1n2rI+2mID9CQOok9U= +github.com/opentracing-contrib/go-amqp v0.0.0-20171102191528-e26701f95620/go.mod h1:UTAgTV5+tXpWiYqczgUb2kCslN9sqcshFQdmHSTyzlU= +github.com/opentracing/opentracing-go v1.1.0 h1:pWlfV3Bxv7k65HYwkikxat0+s3pV4bsqf19k25Ur8rU= +github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/pierrec/lz4 v0.0.0-20190327172049-315a67e90e41/go.mod h1:3/3N9NVKO0jef7pBehbT1qWhCMrIgbYNnFAZCqQ5LRc= github.com/pierrec/lz4 v2.0.5+incompatible h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM8aXeqhl0I= github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= @@ -80,6 +84,8 @@ golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73r golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3 h1:0GoQqolDA55aaLxZyTzK/Y2ePZzZTUrRacwib7cNsYQ= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190420063019-afa5a82059c6 h1:HdqqaWmYAUI7/dmByKKEw+yxDksGSo+9GjkUc9Zp34E= +golang.org/x/net v0.0.0-20190420063019-afa5a82059c6/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f h1:wMNYb4v58l5UBM7MYRLPG6ZhfOqbKu7X5eyFl8ZhKvA= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33 h1:I6FyU15t786LL7oL/hn43zqTuEGr4PN7F4XJ1p4E3Y8= @@ -87,8 +93,8 @@ golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190403152447-81d4e9dc473e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20190411185658-b44545bcd369 h1:aBlRBZoCuZNRDClvfkDoklQqdLzBaA3uViASg2z2p24= -golang.org/x/sys v0.0.0-20190411185658-b44545bcd369/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190419153524-e8e3143a4f4a h1:XCr/YX7O0uxRkLq2k1ApNQMims9eCioF9UpzIPBDmuo= +golang.org/x/sys v0.0.0-20190419153524-e8e3143a4f4a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= google.golang.org/appengine v1.5.0 h1:KxkO13IPW4Lslp2bz+KHP2E3gtFlrIGNThxkZQ3g+4c= diff --git a/tests/bus_test.go b/tests/bus_test.go index 8941ddb..ad1bda9 100644 --- a/tests/bus_test.go +++ b/tests/bus_test.go @@ -4,6 +4,9 @@ import ( "context" "database/sql" "fmt" + "github.com/opentracing/opentracing-go" + olog "github.com/opentracing/opentracing-go/log" + "github.com/opentracing/opentracing-go/mocktracer" "reflect" "testing" @@ -252,6 +255,51 @@ func TestRegistrationAfterBusStarts(t *testing.T) { } +func TestOpenTracingReporting(t *testing.T) { + event := Event1{} + b := createBusForTest() + mockTracer := mocktracer.New() + opentracing.SetGlobalTracer(mockTracer) + + span, ctx := opentracing.StartSpanFromContext(context.Background(), "test_trace") + + span.LogFields(olog.String("event", "TestOpenTracingReporting")) + + proceed := make(chan bool) + eventHandler := func(invocation gbus.Invocation, message *gbus.BusMessage) error { + proceed <- true + return nil + } + + err := b.HandleEvent("test_exchange", "test_topic", event, eventHandler) + if err != nil { + t.Fatal(err) + } + + err = b.Start() + if err != nil { + t.Fatal(err) + } + defer func() { + err := b.Shutdown() + if err != nil { + t.Fatal(err) + } + }() + err = b.Publish(ctx, "test_exchange", "test_topic", gbus.NewBusMessage(event)) + if err != nil { + t.Fatal(err) + } + + <-proceed + time.Sleep(2 * time.Second) + span.Finish() + spans := mockTracer.FinishedSpans() + if len(spans) < 2 { + t.Fatal("didn't send any traces in the code") + } +} + func noopTraceContext() context.Context { return context.Background() // tracer := opentracing.NoopTracer{} From f8869abeaf2c0eba4c74c2f5a461e8eecc13a7cd Mon Sep 17 00:00:00 2001 From: Vladislav Shub Date: Sat, 20 Apr 2019 15:17:42 +0300 Subject: [PATCH 2/7] go imports... --- gbus/bus.go | 6 +++--- gbus/saga/glue.go | 1 - gbus/saga/timeout.go | 1 - gbus/serialization/proto.go | 1 - gbus/tx/mysql/sagastore.go | 2 -- gbus/tx/mysql/txoutbox.go | 3 +-- gbus/worker.go | 6 +++--- tests/bus_test.go | 7 +++---- 8 files changed, 10 insertions(+), 17 deletions(-) diff --git a/gbus/bus.go b/gbus/bus.go index de14d8c..52ce2bc 100644 --- a/gbus/bus.go +++ b/gbus/bus.go @@ -5,13 +5,13 @@ import ( "database/sql" "errors" "fmt" - "github.com/opentracing-contrib/go-amqp/amqptracer" - "github.com/opentracing/opentracing-go" - slog "github.com/opentracing/opentracing-go/log" "runtime/debug" "sync" "time" + "github.com/opentracing-contrib/go-amqp/amqptracer" + "github.com/opentracing/opentracing-go" + slog "github.com/opentracing/opentracing-go/log" "github.com/rs/xid" log "github.com/sirupsen/logrus" "github.com/streadway/amqp" diff --git a/gbus/saga/glue.go b/gbus/saga/glue.go index 882e1dd..61643ba 100644 --- a/gbus/saga/glue.go +++ b/gbus/saga/glue.go @@ -3,7 +3,6 @@ package saga import ( "database/sql" "fmt" - "reflect" "strings" "sync" diff --git a/gbus/saga/timeout.go b/gbus/saga/timeout.go index 1f8fe8e..4592c35 100644 --- a/gbus/saga/timeout.go +++ b/gbus/saga/timeout.go @@ -5,7 +5,6 @@ import ( "time" "github.com/sirupsen/logrus" - "github.com/wework/grabbit/gbus" ) diff --git a/gbus/serialization/proto.go b/gbus/serialization/proto.go index b627d2d..bff127a 100644 --- a/gbus/serialization/proto.go +++ b/gbus/serialization/proto.go @@ -10,7 +10,6 @@ import ( "github.com/golang/protobuf/proto" "github.com/sirupsen/logrus" - "github.com/wework/grabbit/gbus" ) diff --git a/gbus/tx/mysql/sagastore.go b/gbus/tx/mysql/sagastore.go index dc31a23..75e263c 100644 --- a/gbus/tx/mysql/sagastore.go +++ b/gbus/tx/mysql/sagastore.go @@ -5,9 +5,7 @@ import ( "fmt" log "github.com/sirupsen/logrus" - "github.com/wework/grabbit/gbus" - "github.com/wework/grabbit/gbus/saga" "github.com/wework/grabbit/gbus/tx" ) diff --git a/gbus/tx/mysql/txoutbox.go b/gbus/tx/mysql/txoutbox.go index 4e1b420..39fe32b 100644 --- a/gbus/tx/mysql/txoutbox.go +++ b/gbus/tx/mysql/txoutbox.go @@ -9,9 +9,8 @@ import ( "strings" "time" - log "github.com/sirupsen/logrus" - "github.com/rs/xid" + log "github.com/sirupsen/logrus" "github.com/streadway/amqp" "github.com/wework/grabbit/gbus" ) diff --git a/gbus/worker.go b/gbus/worker.go index 0cffe1f..756d2b5 100644 --- a/gbus/worker.go +++ b/gbus/worker.go @@ -5,9 +5,6 @@ import ( "database/sql" "errors" "fmt" - "github.com/opentracing-contrib/go-amqp/amqptracer" - "github.com/opentracing/opentracing-go" - slog "github.com/opentracing/opentracing-go/log" "reflect" "runtime" "runtime/debug" @@ -17,6 +14,9 @@ import ( "github.com/Rican7/retry" "github.com/Rican7/retry/backoff" "github.com/Rican7/retry/strategy" + "github.com/opentracing-contrib/go-amqp/amqptracer" + "github.com/opentracing/opentracing-go" + slog "github.com/opentracing/opentracing-go/log" log "github.com/sirupsen/logrus" "github.com/streadway/amqp" ) diff --git a/tests/bus_test.go b/tests/bus_test.go index ad1bda9..049aef3 100644 --- a/tests/bus_test.go +++ b/tests/bus_test.go @@ -4,14 +4,13 @@ import ( "context" "database/sql" "fmt" - "github.com/opentracing/opentracing-go" - olog "github.com/opentracing/opentracing-go/log" - "github.com/opentracing/opentracing-go/mocktracer" - "reflect" "testing" "time" + "github.com/opentracing/opentracing-go" + olog "github.com/opentracing/opentracing-go/log" + "github.com/opentracing/opentracing-go/mocktracer" log "github.com/sirupsen/logrus" "github.com/streadway/amqp" "github.com/wework/grabbit/gbus" From 4a3e7cf4245cf0f1c62f762cf3e0d3a4c23427bb Mon Sep 17 00:00:00 2001 From: Vladislav Shub Date: Sat, 20 Apr 2019 15:21:50 +0300 Subject: [PATCH 3/7] added some more logs --- gbus/worker.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/gbus/worker.go b/gbus/worker.go index 756d2b5..65cb241 100644 --- a/gbus/worker.go +++ b/gbus/worker.go @@ -268,6 +268,7 @@ func (worker *worker) processMessage(delivery amqp.Delivery, isRPCreply bool) { err = worker.Reject(false, delivery) if err != nil { span.LogFields(slog.Error(err)) + worker.log().WithError(err).Error("could not reject message") } return } @@ -293,6 +294,7 @@ func (worker *worker) processMessage(delivery amqp.Delivery, isRPCreply bool) { //remove the message by acking it and not rejecting it so it will not be routed to a deadletter queue err = worker.Ack(delivery) if err != nil { + worker.log().WithError(err).Error("could not ack the message") span.LogFields(slog.Error(err)) } return @@ -308,6 +310,7 @@ func (worker *worker) processMessage(delivery amqp.Delivery, isRPCreply bool) { //reject the message but requeue it so it gets redelivered until we can create transactions err = worker.Reject(true, delivery) if err != nil { + worker.log().WithError(err).Error("failed to reject message") span.LogFields(slog.Error(err)) } return From ec1143fefb8de86491c95067e2ce5c5499cb1529 Mon Sep 17 00:00:00 2001 From: Vladislav Shub Date: Sat, 20 Apr 2019 15:22:55 +0300 Subject: [PATCH 4/7] removed some usless code --- gbus/tx/mysql/txoutbox.go | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/gbus/tx/mysql/txoutbox.go b/gbus/tx/mysql/txoutbox.go index 39fe32b..21660ba 100644 --- a/gbus/tx/mysql/txoutbox.go +++ b/gbus/tx/mysql/txoutbox.go @@ -353,16 +353,6 @@ func (outbox *TxOutbox) migrate0_9To1_0(tx *sql.Tx, svcName string) error { return nil } -func (outbox *TxOutbox) migrate1_0To1_1(tx *sql.Tx, svcName string) error { - tblName := getOutboxName(svcName) - alter := `ALTER TABLE ` + tblName + ` ADD COLUMN ctx ;` - _, execErr := tx.Exec(alter) - if execErr != nil { - outbox.log().WithField("sql_err", execErr).Info("renaming column") - } - return nil -} - func getOutboxName(svcName string) string { return strings.ToLower("grabbit_" + sanitizeTableName(svcName) + "_outbox") From 31b73a1cfd6cddbdf3b07d6917937498f6828d6f Mon Sep 17 00:00:00 2001 From: Vladislav Shub Date: Sun, 21 Apr 2019 17:55:32 +0300 Subject: [PATCH 5/7] code review changes and some refactoring --- gbus/bus.go | 10 +--- gbus/messages.go | 13 +++++ gbus/worker.go | 148 ++++++++++++++++++++--------------------------- 3 files changed, 77 insertions(+), 94 deletions(-) diff --git a/gbus/bus.go b/gbus/bus.go index 52ce2bc..c1a8cd2 100644 --- a/gbus/bus.go +++ b/gbus/bus.go @@ -583,15 +583,7 @@ func (b *DefaultBus) sendImpl(ctx context.Context, tx *sql.Tx, toService, replyT ContentEncoding: b.Serializer.Name(), Headers: headers, } - span.LogFields( - slog.String("message", message.PayloadFQN), - slog.String("ID", message.ID), - slog.String("SagaID", message.SagaID), - slog.String("CorrelationID", message.CorrelationID), - slog.String("SagaCorrelationID", message.SagaCorrelationID), - slog.String("Semantics", message.Semantics), - slog.String("replyTo", replyTo), - ) + span.LogFields(message.GetTraceLog()...) for _, defaultPolicy := range b.DefaultPolicies { defaultPolicy.Apply(&msg) diff --git a/gbus/messages.go b/gbus/messages.go index f5076ff..8234a1f 100644 --- a/gbus/messages.go +++ b/gbus/messages.go @@ -1,6 +1,7 @@ package gbus import ( + "github.com/opentracing/opentracing-go/log" "github.com/rs/xid" "github.com/streadway/amqp" ) @@ -60,6 +61,18 @@ func (bm *BusMessage) SetPayload(payload Message) { bm.Payload = payload } +func (bm *BusMessage) GetTraceLog() (fields []log.Field) { + return []log.Field{ + log.String("message", bm.PayloadFQN), + log.String("ID", bm.ID), + log.String("SagaID", bm.SagaID), + log.String("CorrelationID", bm.CorrelationID), + log.String("SagaCorrelationID", bm.SagaCorrelationID), + log.String("Semantics", bm.Semantics), + log.String("RPCID", bm.RPCID), + } +} + func castToString(i interface{}) string { v, ok := i.(string) if !ok { diff --git a/gbus/worker.go b/gbus/worker.go index 65cb241..5019e26 100644 --- a/gbus/worker.go +++ b/gbus/worker.go @@ -41,6 +41,7 @@ type worker struct { txProvider TxProvider amqpErrors chan *amqp.Error stop chan bool + span opentracing.Span } func (worker *worker) Start() error { @@ -189,14 +190,24 @@ func (worker *worker) resolveHandlers(isRPCreply bool, bm *BusMessage, delivery return handlers } -func (worker *worker) Ack(delivery amqp.Delivery) error { +func (worker *worker) ack(delivery amqp.Delivery) error { ack := func() error { return delivery.Ack(false /*multiple*/) } - return worker.SafeWithRetries(ack, MaxRetryCount) + err := worker.SafeWithRetries(ack, MaxRetryCount) + if err != nil { + worker.log().WithError(err).Error("could not ack the message") + worker.span.LogFields(slog.Error(err)) + } + return err } -func (worker *worker) Reject(requeue bool, delivery amqp.Delivery) error { +func (worker *worker) reject(requeue bool, delivery amqp.Delivery) error { reject := func() error { return delivery.Reject(requeue /*multiple*/) } - return worker.SafeWithRetries(reject, MaxRetryCount) + err := worker.SafeWithRetries(reject, MaxRetryCount) + if err != nil { + worker.log().WithError(err).Error("could not reject the message") + worker.span.LogFields(slog.Error(err)) + } + return err } func (worker *worker) isDead(delivery amqp.Delivery) bool { @@ -210,20 +221,28 @@ func (worker *worker) isDead(delivery amqp.Delivery) bool { func (worker *worker) invokeDeadletterHandler(delivery amqp.Delivery) { tx, txCreateErr := worker.txProvider.New() if txCreateErr != nil { - worker.Ack(delivery) + worker.log().WithError(txCreateErr).Error("failed creating new tx") + worker.span.LogFields(slog.Error(txCreateErr)) + _ = worker.ack(delivery) return } - deadErr := worker.deadletterHandler(tx, delivery) - if deadErr != nil { - worker.SafeWithRetries(tx.Rollback, MaxRetryCount) - + var fn func() error + err := worker.deadletterHandler(tx, delivery) + if err != nil { + worker.log().WithError(err).Error("failed handling deadletter") + worker.span.LogFields(slog.Error(err)) + fn = tx.Rollback } else { - worker.SafeWithRetries(tx.Commit, MaxRetryCount) + fn = tx.Commit + } + err = worker.SafeWithRetries(fn, MaxRetryCount) + if err != nil { + worker.log().WithError(err).Error("Rollback/Commit deadletter handler message") + worker.span.LogFields(slog.Error(err)) } } func (worker *worker) processMessage(delivery amqp.Delivery, isRPCreply bool) { - var span opentracing.Span var ctx context.Context var spanOptions []opentracing.StartSpanOption @@ -233,29 +252,29 @@ func (worker *worker) processMessage(delivery amqp.Delivery, isRPCreply bool) { } else { spanOptions = append(spanOptions, opentracing.FollowsFrom(spCtx)) } - span, ctx = opentracing.StartSpanFromContext(context.Background(), "processMessage", spanOptions...) + worker.span, ctx = opentracing.StartSpanFromContext(context.Background(), "processMessage", spanOptions...) //catch all error handling so goroutine will not crash defer func() { if r := recover(); r != nil { logEntry := worker.log().WithField("worker", worker.consumerTag) if err, ok := r.(error); ok { - span.LogFields(slog.Error(err)) + worker.span.LogFields(slog.Error(err)) logEntry = logEntry.WithError(err) } else { logEntry = logEntry.WithField("panic", r) } - span.LogFields(slog.String("panic", "failed to process message")) + worker.span.LogFields(slog.String("panic", "failed to process message")) logEntry.Error("failed to process message") } - span.Finish() + worker.span.Finish() }() worker.log().WithFields(log.Fields{"worker": worker.consumerTag, "message_id": delivery.MessageId}).Info("GOT MSG") //handle a message that originated from a deadletter exchange if worker.isDead(delivery) { - span.LogFields(slog.Error(errors.New("delivery is dead"))) + worker.span.LogFields(slog.Error(errors.New("handling dead-letter delivery"))) worker.log().Info("invoking deadletter handler") worker.invokeDeadletterHandler(delivery) return @@ -263,24 +282,13 @@ func (worker *worker) processMessage(delivery amqp.Delivery, isRPCreply bool) { bm, err := worker.extractBusMessage(delivery) if err != nil { - span.LogFields(slog.Error(err), slog.String("grabbit", "message is poison")) + worker.span.LogFields(slog.Error(err), slog.String("grabbit", "message is poison")) //reject poison message - err = worker.Reject(false, delivery) - if err != nil { - span.LogFields(slog.Error(err)) - worker.log().WithError(err).Error("could not reject message") - } + _ = worker.reject(false, delivery) return } - span.LogFields( - slog.String("message", bm.PayloadFQN), - slog.String("ID", bm.ID), - slog.String("SagaID", bm.SagaID), - slog.String("CorrelationID", bm.CorrelationID), - slog.String("SagaCorrelationID", bm.SagaCorrelationID), - slog.String("Semantics", bm.Semantics), - slog.Bool("isRPCreply", isRPCreply), - ) + worker.span.LogFields(bm.GetTraceLog()...) + //TODO:Dedup message handlers := worker.resolveHandlers(isRPCreply, bm, delivery) if len(handlers) == 0 { @@ -289,14 +297,10 @@ func (worker *worker) processMessage(delivery amqp.Delivery, isRPCreply bool) { log.Fields{"message-name": bm.PayloadFQN, "message-type": bm.Semantics}). Warn("Message received but no handlers found") - span.LogFields(slog.String("grabbit", "no handlers found")) + worker.span.LogFields(slog.String("grabbit", "no handlers found")) // worker.log("Message received but no handlers found\nMessage name:%v\nMessage Type:%v\nRejecting message", bm.PayloadFQN, bm.Semantics) //remove the message by acking it and not rejecting it so it will not be routed to a deadletter queue - err = worker.Ack(delivery) - if err != nil { - worker.log().WithError(err).Error("could not ack the message") - span.LogFields(slog.Error(err)) - } + _ = worker.ack(delivery) return } @@ -306,91 +310,65 @@ func (worker *worker) processMessage(delivery amqp.Delivery, isRPCreply bool) { tx, txErr = worker.txProvider.New() if txErr != nil { worker.log().WithError(txErr).Error("failed to create transaction") - span.LogFields(slog.Error(txErr)) + worker.span.LogFields(slog.Error(txErr)) //reject the message but requeue it so it gets redelivered until we can create transactions - err = worker.Reject(true, delivery) - if err != nil { - worker.log().WithError(err).Error("failed to reject message") - span.LogFields(slog.Error(err)) - } + _ = worker.reject(true, delivery) return } } - var ackErr, commitErr, rollbackErr, rejectErr error - invkErr := worker.invokeHandlers(ctx, handlers, bm, &delivery, tx) + err = worker.invokeHandlers(ctx, handlers, bm, &delivery, tx) // if all handlers executed with out errors then commit the transactional if the bus is transactional // if the tranaction committed successfully then ack the message. // if the bus is not transactional then just ack the message - if invkErr == nil { - + if err == nil { if worker.isTxnl { - commitErr = worker.SafeWithRetries(tx.Commit, MaxRetryCount) - if commitErr == nil { + err = worker.SafeWithRetries(tx.Commit, MaxRetryCount) + if err == nil { worker.log().Info("bus transaction committed successfully ") //ack the message - ackErr = worker.Ack(delivery) - if ackErr != nil { - // if this fails then the message will be eventually redeilvered by RabbitMQ - //sp handlers should be idempotent. - span.LogFields(slog.Error(ackErr)) - worker.log().WithError(ackErr).Warn("failed to send ack to the broker") - } - + _ = worker.ack(delivery) } else { - span.LogFields(slog.Error(commitErr)) - worker.log().WithError(commitErr).Error("failed committing transaction") + worker.span.LogFields(slog.Error(err)) + worker.log().WithError(err).Error("failed committing transaction") //if the commit failed we will reject the message - err = worker.Reject(false, delivery) - if err != nil { - span.LogFields(slog.Error(err)) - worker.log().WithError(err).Warn("failed to send reject to the broker") - - } + _ = worker.reject(false, delivery) } } else { /*if the bus in not transactional just try acking the message*/ - ackErr = worker.Ack(delivery) - if ackErr != nil { - span.LogFields(slog.Error(ackErr)) - worker.log().WithError(ackErr).Warn("failed to send ack to the broker") - } + _ = worker.ack(delivery) } //else there was an error in the invokation then try rollingback the transaction and reject the message } else { - span.LogFields(slog.Error(invkErr)) - worker.log().WithError(invkErr).WithFields(log.Fields{"message_name": bm.PayloadFQN, "semantics": bm.Semantics}).Error("Failed to consume message due to failure of one or more handlers.\n Message rejected as poison") + worker.span.LogFields(slog.Error(err)) + worker.log().WithError(err).WithFields(log.Fields{"message_name": bm.PayloadFQN, "semantics": bm.Semantics}).Error("Failed to consume message due to failure of one or more handlers.\n Message rejected as poison") if worker.isTxnl { worker.log().Warn("rolling back transaction") - rollbackErr = worker.SafeWithRetries(tx.Rollback, MaxRetryCount) + err = worker.SafeWithRetries(tx.Rollback, MaxRetryCount) - if rollbackErr != nil { - span.LogFields(slog.Error(rollbackErr)) - worker.log().WithError(rollbackErr).Error("failed to rollback transaction") + if err != nil { + worker.span.LogFields(slog.Error(err)) + worker.log().WithError(err).Error("failed to rollback transaction") } } - rejectErr = worker.Reject(false, delivery) - if rejectErr != nil { - span.LogFields(slog.Error(rejectErr)) - worker.log().WithError(rejectErr).Error("failed to reject message") - } + _ = worker.reject(false, delivery) } } func (worker *worker) invokeHandlers(sctx context.Context, handlers []MessageHandler, message *BusMessage, delivery *amqp.Delivery, tx *sql.Tx) (err error) { action := func(attempts uint) (actionErr error) { - span, sctx := opentracing.StartSpanFromContext(sctx, "invokeHandlers") - span.LogFields(slog.Uint64("attempt", uint64(attempts+1))) + worker.span, sctx = opentracing.StartSpanFromContext(sctx, "invokeHandlers") + worker.span.LogFields(slog.Uint64("attempt", uint64(attempts+1))) defer func() { if p := recover(); p != nil { pncMsg := fmt.Sprintf("%v\n%s", p, debug.Stack()) worker.log().WithField("stack", pncMsg).Error("recovered from panic while invoking handler") actionErr = errors.New(pncMsg) - span.LogFields(slog.Error(actionErr)) + worker.span.LogFields(slog.Error(actionErr)) } - span.Finish() + worker.span.Finish() }() for _, handler := range handlers { hspan, hsctx := opentracing.StartSpanFromContext(sctx, runtime.FuncForPC(reflect.ValueOf(handler).Pointer()).Name()) From ed2e7da7bf9e1696cb5a076476bb0cd812638f6d Mon Sep 17 00:00:00 2001 From: Vladislav Shub Date: Sun, 21 Apr 2019 17:57:33 +0300 Subject: [PATCH 6/7] fixed code review change moved b.SenderLock.Unlock() into its own defer --- gbus/bus.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/gbus/bus.go b/gbus/bus.go index c1a8cd2..accd63b 100644 --- a/gbus/bus.go +++ b/gbus/bus.go @@ -551,15 +551,14 @@ func (b *DefaultBus) monitorAMQPErrors() { func (b *DefaultBus) sendImpl(ctx context.Context, tx *sql.Tx, toService, replyTo, exchange, topic string, message *BusMessage, policies ...MessagePolicy) (er error) { b.SenderLock.Lock() + defer b.SenderLock.Unlock() span, ctx := opentracing.StartSpanFromContext(ctx, "sendImpl") defer func() { if err := recover(); err != nil { - errMsg := fmt.Sprintf("panic recovered panicking err:\n%v\n%s", err, debug.Stack()) er = errors.New(errMsg) span.LogFields(slog.Error(er)) } - b.SenderLock.Unlock() span.Finish() }() From e1a58afa2f38d47ded3200dd9e87f39b3b01044f Mon Sep 17 00:00:00 2001 From: Vladislav Shub Date: Sun, 21 Apr 2019 18:17:15 +0300 Subject: [PATCH 7/7] added tests for panic --- tests/bus_test.go | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/tests/bus_test.go b/tests/bus_test.go index 049aef3..ff0fb51 100644 --- a/tests/bus_test.go +++ b/tests/bus_test.go @@ -299,6 +299,30 @@ func TestOpenTracingReporting(t *testing.T) { } } +func TestSendingPanic(t *testing.T) { + event := Event1{} + b := createBusForTest() + err := b.Start() + if err != nil { + t.Fatal(err) + } + defer func() { + err := b.Shutdown() + if err != nil { + t.Fatal(err) + } + }() + defer func() { + if p := recover(); p != nil { + t.Fatal("expected not to have to recover this should be handled in grabbit", p) + } + }() + err = b.Publish(context.Background(), "test_exchange", "test_topic", gbus.NewBusMessage(event), &panicPolicy{}) + if err == nil { + t.Fatal("Expected to panic and return an error but not crash") + } +} + func noopTraceContext() context.Context { return context.Background() // tracer := opentracing.NoopTracer{} @@ -306,3 +330,10 @@ func noopTraceContext() context.Context { // ctx := opentracing.ContextWithSpan(context.Background(), span) // return ctx } + +type panicPolicy struct { +} + +func (p panicPolicy) Apply(publishing *amqp.Publishing) { + panic("vlad") +}