From e3f27d5c414640077da2ae7e3b59dcaf147975e0 Mon Sep 17 00:00:00 2001 From: Hugo Haas Date: Thu, 30 May 2019 07:44:19 -0700 Subject: [PATCH 01/10] Fixed typo in backpressure variable name --- gbus/bus.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/gbus/bus.go b/gbus/bus.go index 54a2368..7154e07 100644 --- a/gbus/bus.go +++ b/gbus/bus.go @@ -56,7 +56,7 @@ type DefaultBus struct { DefaultPolicies []MessagePolicy Confirm bool healthChan chan error - backpreasure bool + backpressure bool rabbitFailure bool DbPingTimeout time.Duration } @@ -358,7 +358,7 @@ func (b *DefaultBus) GetHealth() HealthCard { return HealthCard{ DbConnected: dbConnected, - RabbitBackPressure: b.backpreasure, + RabbitBackPressure: b.backpressure, RabbitConnected: !b.rabbitFailure, } } @@ -575,7 +575,7 @@ func (b *DefaultBus) monitorAMQPErrors() { } else { b.log().WithField("reason", blocked.Reason).Info("amqp connection unblocked") } - b.backpreasure = blocked.Active + b.backpressure = blocked.Active case amqpErr := <-b.amqpErrors: b.rabbitFailure = true b.log().WithField("amqp_error", amqpErr).Error("amqp error") @@ -648,9 +648,9 @@ func (b *DefaultBus) sendImpl(sctx context.Context, tx *sql.Tx, toService, reply } return saveErr } - //do not attempt to contact the borker if backpreasure is being applied - if b.backpreasure { - return errors.New("can't send message due to backpreasure from amqp broker") + //do not attempt to contact the borker if backpressure is being applied + if b.backpressure { + return errors.New("can't send message due to backpressure from amqp broker") } _, outgoingErr := b.Outgoing.Post(exchange, key, msg) return outgoingErr From 592852b807f9ead623e5c56193fe949bed4c4283 Mon Sep 17 00:00:00 2001 From: Hugo Haas Date: Thu, 30 May 2019 07:45:59 -0700 Subject: [PATCH 02/10] Added instructions about dependencies installation --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index 9123586..2367aee 100644 --- a/README.md +++ b/README.md @@ -141,5 +141,6 @@ if e != nil{ ## Testing +0) ensure that you have the dependencies installed: `go get -v -t -d ./...` 1) make sure to first: `docker-compose up -V -d` 2) then to run the tests: `go test ./...` From c12d674d82dd4a7017f8c1b435abc6448bd32272 Mon Sep 17 00:00:00 2001 From: Vladislav Shub <1196390+vladshub@users.noreply.github.com> Date: Tue, 11 Jun 2019 14:12:48 +0300 Subject: [PATCH 03/10] feat(logger) added logger to be passed through the sagas --- gbus/abstractions.go | 10 ++++- gbus/builder/builder.go | 23 +++++++++--- gbus/bus.go | 82 +++++++++++++++++++++-------------------- gbus/invocation.go | 8 ++++ gbus/saga/glue.go | 4 +- gbus/saga/instance.go | 1 - gbus/saga/invocation.go | 8 ++++ gbus/worker.go | 50 ++++++++++++++----------- 8 files changed, 117 insertions(+), 69 deletions(-) diff --git a/gbus/abstractions.go b/gbus/abstractions.go index 42f5410..a44e8e4 100644 --- a/gbus/abstractions.go +++ b/gbus/abstractions.go @@ -3,6 +3,7 @@ package gbus import ( "context" "database/sql" + "github.com/sirupsen/logrus" "time" "github.com/streadway/amqp" @@ -17,7 +18,7 @@ const ( //BusConfiguration provides configuration passed to the bus builder type BusConfiguration struct { - MaxRetryCount uint + MaxRetryCount uint BaseRetryDuration int } @@ -29,6 +30,7 @@ type Bus interface { Messaging SagaRegister Health + Logged } //Message a common interface that passes to the serializers to allow decoding and encoding of content @@ -185,6 +187,7 @@ type Invocation interface { Bus() Messaging Tx() *sql.Tx Ctx() context.Context + Logger() logrus.FieldLogger Routing() (exchange, routingKey string) } @@ -209,3 +212,8 @@ type TxOutbox interface { Start(amqpOut *AMQPOutbox) error Stop() error } + +type Logged interface { + SetLogger(entry logrus.FieldLogger) + Logger() logrus.FieldLogger +} diff --git a/gbus/builder/builder.go b/gbus/builder/builder.go index b704015..9d622a8 100644 --- a/gbus/builder/builder.go +++ b/gbus/builder/builder.go @@ -2,6 +2,7 @@ package builder import ( "fmt" + "github.com/sirupsen/logrus" "sync" "time" @@ -27,13 +28,14 @@ type defaultBuilder struct { confirm bool dbPingTimeout time.Duration usingPingTimeout bool + logger logrus.FieldLogger } func (builder *defaultBuilder) Build(svcName string) gbus.Bus { gb := &gbus.DefaultBus{ AmqpConnStr: builder.connStr, - PrefetchCount: 1, + PrefetchCount: builder.PrefetchCount, Outgoing: &gbus.AMQPOutbox{ SvcName: svcName, }, @@ -50,15 +52,21 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus { Serializer: builder.serializer, DLX: builder.dlx, DefaultPolicies: builder.defaultPolicies, - DbPingTimeout: 3} + DbPingTimeout: 3, + Confirm: builder.confirm, + } + + if builder.logger != nil { + gb.SetLogger(builder.logger.WithField("_service", svcName)) + } else { + gb.SetLogger(logrus.WithField("_service", svcName)) + } - gb.Confirm = builder.confirm if builder.workerNum < 1 { gb.WorkerNum = 1 } else { gb.WorkerNum = builder.workerNum } - gb.PrefetchCount = builder.PrefetchCount var ( sagaStore saga.Store ) @@ -175,11 +183,16 @@ func (builder *defaultBuilder) WithConfiguration(config gbus.BusConfiguration) g gbus.MaxRetryCount = config.MaxRetryCount } if config.BaseRetryDuration > 0 { - gbus.BaseRetryDuration = time.Millisecond*time.Duration(config.BaseRetryDuration) + gbus.BaseRetryDuration = time.Millisecond * time.Duration(config.BaseRetryDuration) } return builder } +func (builder *defaultBuilder) WithLogger(logger logrus.FieldLogger) gbus.Builder { + builder.logger = logger + return builder +} + //New :) func New() Nu { return Nu{} diff --git a/gbus/bus.go b/gbus/bus.go index 8c65c98..8032fc8 100644 --- a/gbus/bus.go +++ b/gbus/bus.go @@ -13,11 +13,12 @@ import ( "github.com/opentracing/opentracing-go" slog "github.com/opentracing/opentracing-go/log" "github.com/rs/xid" - log "github.com/sirupsen/logrus" + "github.com/sirupsen/logrus" "github.com/streadway/amqp" ) var _ SagaRegister = &DefaultBus{} +var _ Bus = &DefaultBus{} //DefaultBus implements the Bus interface type DefaultBus struct { @@ -36,6 +37,7 @@ type DefaultBus struct { amqpErrors chan *amqp.Error amqpBlocks chan amqp.Blocking Registrations []*Registration + logger logrus.FieldLogger RPCHandlers map[string]MessageHandler deadletterHandler func(tx *sql.Tx, poision amqp.Delivery) error @@ -67,7 +69,7 @@ var ( MaxRetryCount uint = 3 //BaseRetryDuration defines the basic milliseconds that the retry algorithm uses //for a random retry time. Default is 10 but it is configurable. - BaseRetryDuration = 10*time.Millisecond + BaseRetryDuration = 10 * time.Millisecond //RpcHeaderName used to define the header in grabbit for RPC RpcHeaderName = "x-grabbit-msg-rpc-id" ) @@ -97,7 +99,7 @@ func (b *DefaultBus) createServiceQueue() (amqp.Queue, error) { if b.PurgeOnStartup { msgsPurged, purgeError := b.AMQPChannel.QueueDelete(qName, false /*ifUnused*/, false /*ifEmpty*/, false /*noWait*/) if purgeError != nil { - b.log().WithError(purgeError).WithField("deleted_messages", msgsPurged).Error("failed to purge queue") + b.Logger().WithError(purgeError).WithField("deleted_messages", msgsPurged).Error("failed to purge queue") return q, purgeError } } @@ -113,7 +115,7 @@ func (b *DefaultBus) createServiceQueue() (amqp.Queue, error) { false, /*noWait*/ args /*args*/) if e != nil { - b.log().WithError(e).Error("failed to declare queue") + b.Logger().WithError(e).Error("failed to declare queue") } b.serviceQueue = q return q, e @@ -130,12 +132,12 @@ func (b *DefaultBus) bindServiceQueue() error { false, /*noWait*/ nil /*args amqp.Table*/) if err != nil { - b.log().WithError(err).Error("could not declare exchange") + b.Logger().WithError(err).Error("could not declare exchange") return err } err = b.bindQueue("", b.DLX) if err != nil { - b.log().WithError(err).Error("could not bind exchange") + b.Logger().WithError(err).Error("could not bind exchange") return err } } @@ -150,12 +152,12 @@ func (b *DefaultBus) bindServiceQueue() error { false, /*noWait*/ nil /*args amqp.Table*/) if e != nil { - b.log().WithError(e).WithField("exchange", exchange).Error("failed to declare exchange") + b.Logger().WithError(e).WithField("exchange", exchange).Error("failed to declare exchange") return e } e = b.bindQueue(topic, exchange) if e != nil { - b.log().WithError(e).WithFields(log.Fields{"topic": topic, "exchange": exchange}).Error("failed to bind topic to exchange") + b.Logger().WithError(e).WithFields(logrus.Fields{"topic": topic, "exchange": exchange}).Error("failed to bind topic to exchange") return e } @@ -207,7 +209,7 @@ func (b *DefaultBus) Start() error { var amqpChan *amqp.Channel if amqpChan, e = b.createAMQPChannel(b.amqpConn); e != nil { - b.log().WithError(e).Error("failed to create amqp channel for transactional outbox") + b.Logger().WithError(e).Error("failed to create amqp channel for transactional outbox") return e } amqpChan.NotifyClose(b.amqpErrors) @@ -216,11 +218,11 @@ func (b *DefaultBus) Start() error { } err := amqpOutbox.init(amqpChan, b.Confirm, false) if err != nil { - b.log().WithError(err).Error("failed initializing amqpOutbox") + b.Logger().WithError(err).Error("failed initializing amqpOutbox") return err } if startErr := b.Outbox.Start(amqpOutbox); startErr != nil { - b.log().WithError(startErr).Error("failed to start transactional outbox") + b.Logger().WithError(startErr).Error("failed to start transactional outbox") return startErr } @@ -236,7 +238,7 @@ func (b *DefaultBus) Start() error { //bind queue err := b.bindServiceQueue() if err != nil { - b.log().WithError(err).Error("could not bind service to queue") + b.Logger().WithError(err).Error("could not bind service to queue") return err } @@ -246,11 +248,11 @@ func (b *DefaultBus) Start() error { return e } - b.log().WithField("number_of_workers", b.WorkerNum).Info("initiating workers") + b.Logger().WithField("number_of_workers", b.WorkerNum).Info("initiating workers") workers, createWorkersErr := b.createBusWorkers(b.WorkerNum) if createWorkersErr != nil { - b.log().WithError(createWorkersErr).Error("error creating channel for worker") + b.Logger().WithError(createWorkersErr).Error("error creating channel for worker") return createWorkersErr } @@ -274,7 +276,7 @@ func (b *DefaultBus) createBusWorkers(workerNum uint) ([]*worker, error) { qosErr := amqpChan.Qos(int(b.PrefetchCount), 0, false) if qosErr != nil { - log.Printf("failed to set worker qos\n %v", qosErr) + b.Logger().Printf("failed to set worker qos\n %v", qosErr) } tag := fmt.Sprintf("%s_worker_%d", b.SvcName, i) @@ -298,7 +300,7 @@ func (b *DefaultBus) createBusWorkers(workerNum uint) ([]*worker, error) { err := w.Start() if err != nil { - log.WithError(err).Error("failed to start worker") + b.Logger().WithError(err).Error("failed to start worker") } workers = append(workers, w) @@ -309,19 +311,19 @@ func (b *DefaultBus) createBusWorkers(workerNum uint) ([]*worker, error) { //Shutdown implements GBus.Start() func (b *DefaultBus) Shutdown() (shutdwonErr error) { - b.log().Info("Bus shuting down") + b.Logger().Info("Bus shuting down") defer func() { if p := recover(); p != nil { pncMsg := fmt.Sprintf("%v\n%s", p, debug.Stack()) shutdwonErr = errors.New(pncMsg) - b.log().WithError(shutdwonErr).Error("error when shutting down bus") + b.Logger().WithError(shutdwonErr).Error("error when shutting down bus") } }() for _, worker := range b.workers { err := worker.Stop() if err != nil { - b.log().WithError(err).Error("could not stop worker") + b.Logger().WithError(err).Error("could not stop worker") return err } } @@ -332,7 +334,7 @@ func (b *DefaultBus) Shutdown() (shutdwonErr error) { err := b.Outbox.Stop() if err != nil { - b.log().WithError(err).Error("could not shutdown outbox") + b.Logger().WithError(err).Error("could not shutdown outbox") return err } b.TxProvider.Dispose() @@ -384,7 +386,7 @@ func (b *DefaultBus) withTx(action func(tx *sql.Tx) error, ambientTx *sql.Tx) er newTx, newTxErr := b.TxProvider.New() if newTxErr != nil { - b.log().WithError(newTxErr).Error("failed to create transaction when sending a transactional message") + b.Logger().WithError(newTxErr).Error("failed to create transaction when sending a transactional message") return newTxErr } activeTx = newTx @@ -405,12 +407,12 @@ func (b *DefaultBus) withTx(action func(tx *sql.Tx) error, ambientTx *sql.Tx) er if actionErr != nil { err := activeTx.Rollback() if err != nil { - b.log().WithError(err).Error("could not rollback transaction") + b.Logger().WithError(err).Error("could not rollback transaction") } } else { commitErr := activeTx.Commit() if commitErr != nil { - b.log().WithError(commitErr).Error("could not commit transaction") + b.Logger().WithError(commitErr).Error("could not commit transaction") return commitErr } } @@ -449,7 +451,7 @@ func (b *DefaultBus) RPC(ctx context.Context, service string, request, reply *Bu b.Serializer.Register(reply.Payload) err := b.sendImpl(ctx, nil, service, b.rpcQueue.Name, "", "", request, rpc) if err != nil { - b.log().WithError(err).Error("could not send message") + b.Logger().WithError(err).Error("could not send message") return nil, err } @@ -560,28 +562,20 @@ func (b *DefaultBus) connect(retryCount uint) (*amqp.Connection, error) { } -// -// func (b *DefaultBus) log(format string, v ...interface{}) { -// log.WithField("Service", b.SvcName).Infof(format, v...) -// } - -func (b *DefaultBus) log() *log.Entry { - return log.WithField("_service", b.SvcName) -} func (b *DefaultBus) monitorAMQPErrors() { for b.started { select { case blocked := <-b.amqpBlocks: if blocked.Active { - b.log().WithField("reason", blocked.Reason).Warn("amqp connection blocked") + b.Logger().WithField("reason", blocked.Reason).Warn("amqp connection blocked") } else { - b.log().WithField("reason", blocked.Reason).Info("amqp connection unblocked") + b.Logger().WithField("reason", blocked.Reason).Info("amqp connection unblocked") } b.backpressure = blocked.Active case amqpErr := <-b.amqpErrors: b.amqpConnected = false - b.log().WithField("amqp_error", amqpErr).Error("amqp error") + b.Logger().WithField("amqp_error", amqpErr).Error("amqp error") if b.healthChan != nil { b.healthChan <- amqpErr } @@ -605,12 +599,12 @@ func (b *DefaultBus) sendImpl(sctx context.Context, tx *sql.Tx, toService, reply headers := message.GetAMQPHeaders() err := amqptracer.Inject(span, headers) if err != nil { - b.log().WithError(err).Error("could not inject headers") + b.Logger().WithError(err).Error("could not inject headers") } buffer, err := b.Serializer.Encode(message.Payload) if err != nil { - b.log().WithError(err).WithField("message", message).Error("failed to send message, encoding of message failed") + b.Logger().WithError(err).WithField("message", message).Error("failed to send message, encoding of message failed") return err } @@ -644,10 +638,10 @@ func (b *DefaultBus) sendImpl(sctx context.Context, tx *sql.Tx, toService, reply //send to the transactional outbox if the bus is transactional //otherwise send directly to amqp if b.IsTxnl && tx != nil { - b.log().WithField("message_id", msg.MessageId).Debug("sending message to outbox") + b.Logger().WithField("message_id", msg.MessageId).Debug("sending message to outbox") saveErr := b.Outbox.Save(tx, exchange, key, msg) if saveErr != nil { - log.WithError(saveErr).Error("failed to save to transactional outbox") + b.Logger().WithError(saveErr).Error("failed to save to transactional outbox") } return saveErr } @@ -664,7 +658,7 @@ func (b *DefaultBus) sendImpl(sctx context.Context, tx *sql.Tx, toService, reply err = b.SafeWithRetries(publish, MaxRetryCount) if err != nil { - log.Printf("failed publishing message.\n error:%v", err) + b.Logger().Printf("failed publishing message.\n error:%v", err) return err } return err @@ -698,3 +692,11 @@ type rpcPolicy struct { func (p rpcPolicy) Apply(publishing *amqp.Publishing) { publishing.Headers[RpcHeaderName] = p.rpcID } + +func (b *DefaultBus) SetLogger(entry logrus.FieldLogger) { + b.logger = entry +} + +func (b *DefaultBus) Logger() logrus.FieldLogger { + return b.logger +} diff --git a/gbus/invocation.go b/gbus/invocation.go index e854e14..c39a2d4 100644 --- a/gbus/invocation.go +++ b/gbus/invocation.go @@ -8,6 +8,9 @@ import ( "github.com/sirupsen/logrus" ) +var _ Invocation = &defaultInvocationContext{} +var _ Messaging = &defaultInvocationContext{} + type defaultInvocationContext struct { invocingSvc string bus *DefaultBus @@ -16,6 +19,11 @@ type defaultInvocationContext struct { ctx context.Context exchange string routingKey string + logger logrus.FieldLogger +} + +func (dfi *defaultInvocationContext) Logger() logrus.FieldLogger { + return dfi.logger } func (dfi *defaultInvocationContext) Reply(ctx context.Context, replyMessage *BusMessage) error { diff --git a/gbus/saga/glue.go b/gbus/saga/glue.go index d09b1d5..f3e8a11 100644 --- a/gbus/saga/glue.go +++ b/gbus/saga/glue.go @@ -190,7 +190,9 @@ func (imsm *Glue) invokeSagaInstance(instance *Instance, invocation gbus.Invocat inboundMsg: message, sagaID: instance.ID, ctx: invocation.Ctx(), - invokingService: imsm.svcName} + invokingService: imsm.svcName, + logger: imsm.log().WithField("saga_id", instance.ID), + } exchange, routingKey := invocation.Routing() return instance.invoke(exchange, routingKey, sginv, message) diff --git a/gbus/saga/instance.go b/gbus/saga/instance.go index 977eb3b..0dee955 100644 --- a/gbus/saga/instance.go +++ b/gbus/saga/instance.go @@ -90,7 +90,6 @@ func (si *Instance) timeout(tx *sql.Tx, bus gbus.Messaging) error { func NewInstance(sagaType reflect.Type, msgToMethodMap []*MsgToFuncPair) *Instance { - var newSagaPtr interface{} if sagaType.Kind() == reflect.Ptr { newSagaPtr = reflect.New(sagaType).Elem().Interface() diff --git a/gbus/saga/invocation.go b/gbus/saga/invocation.go index f2725af..36e3eec 100644 --- a/gbus/saga/invocation.go +++ b/gbus/saga/invocation.go @@ -3,11 +3,14 @@ package saga import ( "context" "database/sql" + "github.com/sirupsen/logrus" "time" "github.com/wework/grabbit/gbus" ) +var _ gbus.Invocation = &sagaInvocation{} + type sagaInvocation struct { decoratedBus gbus.Messaging decoratedInvocation gbus.Invocation @@ -15,6 +18,7 @@ type sagaInvocation struct { sagaID string ctx context.Context invokingService string + logger logrus.FieldLogger } func (si *sagaInvocation) setCorrelationIDs(message *gbus.BusMessage, isEvent bool) { @@ -76,3 +80,7 @@ func (si *sagaInvocation) RPC(ctx context.Context, service string, request, func (si *sagaInvocation) Routing() (exchange, routingKey string) { return si.decoratedInvocation.Routing() } + +func (si *sagaInvocation) Logger() logrus.FieldLogger { + panic("implement me") +} diff --git a/gbus/worker.go b/gbus/worker.go index ba928f0..362c5ae 100644 --- a/gbus/worker.go +++ b/gbus/worker.go @@ -19,7 +19,7 @@ import ( "github.com/opentracing-contrib/go-amqp/amqptracer" "github.com/opentracing/opentracing-go" slog "github.com/opentracing/opentracing-go/log" - log "github.com/sirupsen/logrus" + "github.com/sirupsen/logrus" "github.com/streadway/amqp" ) @@ -146,7 +146,7 @@ func (worker *worker) extractBusMessage(delivery amqp.Delivery) (*BusMessage, er } if bm.PayloadFQN == "" || bm.Semantics == "" { //TODO: Log poison pill message - worker.log().WithFields(log.Fields{"fqn": bm.PayloadFQN, "semantics": bm.Semantics}).Warn("message received but no headers found...rejecting message") + worker.log().WithFields(logrus.Fields{"fqn": bm.PayloadFQN, "semantics": bm.Semantics}).Warn("message received but no headers found...rejecting message") return nil, errors.New("missing critical headers") } @@ -190,7 +190,7 @@ func (worker *worker) resolveHandlers(isRPCreply bool, bm *BusMessage, delivery } } - worker.log().WithFields(log.Fields{"number_of_handlers": len(handlers)}).Info("found message handlers") + worker.log().WithFields(logrus.Fields{"number_of_handlers": len(handlers)}).Info("found message handlers") return handlers } @@ -212,14 +212,14 @@ func (worker *worker) ack(delivery amqp.Delivery) error { func (worker *worker) reject(requeue bool, delivery amqp.Delivery) error { reject := func(attempts uint) error { return delivery.Reject(requeue /*multiple*/) } - worker.log().WithFields(log.Fields{"message_id": delivery.MessageId, "requeue": requeue}).Info("rejecting message") + worker.log().WithFields(logrus.Fields{"message_id": delivery.MessageId, "requeue": requeue}).Info("rejecting message") err := retry.Retry(reject, strategy.Wait(100*time.Millisecond)) if err != nil { worker.log().WithError(err).Error("could not reject the message") worker.span.LogFields(slog.Error(err)) } - worker.log().WithFields(log.Fields{"message_id": delivery.MessageId, "requeue": requeue}).Info("message rejected") + worker.log().WithFields(logrus.Fields{"message_id": delivery.MessageId, "requeue": requeue}).Info("message rejected") return err } @@ -283,7 +283,7 @@ func (worker *worker) processMessage(delivery amqp.Delivery, isRPCreply bool) { worker.span.Finish() }() - worker.log().WithFields(log.Fields{"worker": worker.consumerTag, "message_id": delivery.MessageId}).Info("GOT MSG") + worker.log().WithFields(logrus.Fields{"worker": worker.consumerTag, "message_id": delivery.MessageId}).Info("GOT MSG") //handle a message that originated from a deadletter exchange if worker.isDead(delivery) { @@ -307,7 +307,7 @@ func (worker *worker) processMessage(delivery amqp.Delivery, isRPCreply bool) { if len(handlers) == 0 { worker.log(). WithFields( - log.Fields{"message-name": bm.PayloadFQN, + logrus.Fields{"message-name": bm.PayloadFQN, "message-type": bm.Semantics}). Warn("Message received but no handlers found") worker.span.LogFields(slog.String("grabbit", "no handlers found")) @@ -359,9 +359,11 @@ func (worker *worker) invokeHandlers(sctx context.Context, handlers []MessageHan } worker.span.Finish() }() + var handlerErr error + var hspan opentracing.Span + var hsctx context.Context for _, handler := range handlers { - hspan, hsctx := opentracing.StartSpanFromContext(sctx, runtime.FuncForPC(reflect.ValueOf(handler).Pointer()).Name()) - hspan.Finish() + hspan, hsctx = opentracing.StartSpanFromContext(sctx, runtime.FuncForPC(reflect.ValueOf(handler).Pointer()).Name()) ctx := &defaultInvocationContext{ invocingSvc: delivery.ReplyTo, @@ -370,18 +372,26 @@ func (worker *worker) invokeHandlers(sctx context.Context, handlers []MessageHan tx: tx, ctx: hsctx, exchange: delivery.Exchange, - routingKey: delivery.RoutingKey} - handlerErr := handler(ctx, message) + routingKey: delivery.RoutingKey, + logger: worker.log().WithField("handler", runtime.FuncForPC(reflect.ValueOf(handler).Pointer()).Name()), + } + handlerErr = handler(ctx, message) if handlerErr != nil { hspan.LogFields(slog.Error(handlerErr)) - if worker.isTxnl { - rbkErr := tx.Rollback() - if rbkErr != nil { - worker.log().WithError(rbkErr).Error("failed rolling back transaction when recovering from handler error") - } + break + } + hspan.Finish() + } + if handlerErr != nil { + hspan.LogFields(slog.Error(handlerErr)) + if worker.isTxnl { + rbkErr := tx.Rollback() + if rbkErr != nil { + worker.log().WithError(rbkErr).Error("failed rolling back transaction when recovering from handler error") } - return handlerErr } + hspan.Finish() + return handlerErr } if worker.isTxnl { cmtErr := tx.Commit() @@ -404,10 +414,8 @@ func (worker *worker) invokeHandlers(sctx context.Context, handlers []MessageHan )) } -func (worker *worker) log() *log.Entry { - - return log.WithFields(log.Fields{ - "_service": worker.svcName}) +func (worker *worker) log() logrus.FieldLogger { + return worker.b.Logger() } func (worker *worker) AddRegistration(registration *Registration) { From ddc9ba20b9264a56bb9d9701c985984075c3c5be Mon Sep 17 00:00:00 2001 From: Vladislav Shub <1196390+vladshub@users.noreply.github.com> Date: Tue, 11 Jun 2019 17:41:21 +0300 Subject: [PATCH 04/10] fix(code:review) some small changes and added more fields to the default logger --- gbus/abstractions.go | 8 +++-- gbus/builder/builder.go | 4 +-- gbus/bus.go | 70 ++++++++++++++++++------------------- gbus/invocation.go | 4 +-- gbus/saga/glue.go | 22 +++++++----- gbus/saga/instance.go | 3 +- gbus/saga/invocation.go | 7 ++-- gbus/serialization/proto.go | 5 ++- gbus/worker.go | 4 +-- 9 files changed, 66 insertions(+), 61 deletions(-) diff --git a/gbus/abstractions.go b/gbus/abstractions.go index a44e8e4..4647af8 100644 --- a/gbus/abstractions.go +++ b/gbus/abstractions.go @@ -187,7 +187,7 @@ type Invocation interface { Bus() Messaging Tx() *sql.Tx Ctx() context.Context - Logger() logrus.FieldLogger + Log() FieldLogger Routing() (exchange, routingKey string) } @@ -213,7 +213,9 @@ type TxOutbox interface { Stop() error } +type FieldLogger logrus.FieldLogger + type Logged interface { - SetLogger(entry logrus.FieldLogger) - Logger() logrus.FieldLogger + SetLogger(entry FieldLogger) + Log() FieldLogger } diff --git a/gbus/builder/builder.go b/gbus/builder/builder.go index 9d622a8..fa0e50b 100644 --- a/gbus/builder/builder.go +++ b/gbus/builder/builder.go @@ -28,7 +28,7 @@ type defaultBuilder struct { confirm bool dbPingTimeout time.Duration usingPingTimeout bool - logger logrus.FieldLogger + logger gbus.FieldLogger } func (builder *defaultBuilder) Build(svcName string) gbus.Bus { @@ -188,7 +188,7 @@ func (builder *defaultBuilder) WithConfiguration(config gbus.BusConfiguration) g return builder } -func (builder *defaultBuilder) WithLogger(logger logrus.FieldLogger) gbus.Builder { +func (builder *defaultBuilder) WithLogger(logger gbus.FieldLogger) gbus.Builder { builder.logger = logger return builder } diff --git a/gbus/bus.go b/gbus/bus.go index 8032fc8..4ba97df 100644 --- a/gbus/bus.go +++ b/gbus/bus.go @@ -37,7 +37,7 @@ type DefaultBus struct { amqpErrors chan *amqp.Error amqpBlocks chan amqp.Blocking Registrations []*Registration - logger logrus.FieldLogger + log FieldLogger RPCHandlers map[string]MessageHandler deadletterHandler func(tx *sql.Tx, poision amqp.Delivery) error @@ -99,7 +99,7 @@ func (b *DefaultBus) createServiceQueue() (amqp.Queue, error) { if b.PurgeOnStartup { msgsPurged, purgeError := b.AMQPChannel.QueueDelete(qName, false /*ifUnused*/, false /*ifEmpty*/, false /*noWait*/) if purgeError != nil { - b.Logger().WithError(purgeError).WithField("deleted_messages", msgsPurged).Error("failed to purge queue") + b.Log().WithError(purgeError).WithField("deleted_messages", msgsPurged).Error("failed to purge queue") return q, purgeError } } @@ -115,7 +115,7 @@ func (b *DefaultBus) createServiceQueue() (amqp.Queue, error) { false, /*noWait*/ args /*args*/) if e != nil { - b.Logger().WithError(e).Error("failed to declare queue") + b.Log().WithError(e).Error("failed to declare queue") } b.serviceQueue = q return q, e @@ -132,12 +132,12 @@ func (b *DefaultBus) bindServiceQueue() error { false, /*noWait*/ nil /*args amqp.Table*/) if err != nil { - b.Logger().WithError(err).Error("could not declare exchange") + b.Log().WithError(err).Error("could not declare exchange") return err } err = b.bindQueue("", b.DLX) if err != nil { - b.Logger().WithError(err).Error("could not bind exchange") + b.Log().WithError(err).Error("could not bind exchange") return err } } @@ -152,12 +152,12 @@ func (b *DefaultBus) bindServiceQueue() error { false, /*noWait*/ nil /*args amqp.Table*/) if e != nil { - b.Logger().WithError(e).WithField("exchange", exchange).Error("failed to declare exchange") + b.Log().WithError(e).WithField("exchange", exchange).Error("failed to declare exchange") return e } e = b.bindQueue(topic, exchange) if e != nil { - b.Logger().WithError(e).WithFields(logrus.Fields{"topic": topic, "exchange": exchange}).Error("failed to bind topic to exchange") + b.Log().WithError(e).WithFields(logrus.Fields{"topic": topic, "exchange": exchange}).Error("failed to bind topic to exchange") return e } @@ -209,7 +209,7 @@ func (b *DefaultBus) Start() error { var amqpChan *amqp.Channel if amqpChan, e = b.createAMQPChannel(b.amqpConn); e != nil { - b.Logger().WithError(e).Error("failed to create amqp channel for transactional outbox") + b.Log().WithError(e).Error("failed to create amqp channel for transactional outbox") return e } amqpChan.NotifyClose(b.amqpErrors) @@ -218,11 +218,11 @@ func (b *DefaultBus) Start() error { } err := amqpOutbox.init(amqpChan, b.Confirm, false) if err != nil { - b.Logger().WithError(err).Error("failed initializing amqpOutbox") + b.Log().WithError(err).Error("failed initializing amqpOutbox") return err } if startErr := b.Outbox.Start(amqpOutbox); startErr != nil { - b.Logger().WithError(startErr).Error("failed to start transactional outbox") + b.Log().WithError(startErr).Error("failed to start transactional outbox") return startErr } @@ -238,7 +238,7 @@ func (b *DefaultBus) Start() error { //bind queue err := b.bindServiceQueue() if err != nil { - b.Logger().WithError(err).Error("could not bind service to queue") + b.Log().WithError(err).Error("could not bind service to queue") return err } @@ -248,11 +248,11 @@ func (b *DefaultBus) Start() error { return e } - b.Logger().WithField("number_of_workers", b.WorkerNum).Info("initiating workers") + b.Log().WithField("number_of_workers", b.WorkerNum).Info("initiating workers") workers, createWorkersErr := b.createBusWorkers(b.WorkerNum) if createWorkersErr != nil { - b.Logger().WithError(createWorkersErr).Error("error creating channel for worker") + b.Log().WithError(createWorkersErr).Error("error creating channel for worker") return createWorkersErr } @@ -276,7 +276,7 @@ func (b *DefaultBus) createBusWorkers(workerNum uint) ([]*worker, error) { qosErr := amqpChan.Qos(int(b.PrefetchCount), 0, false) if qosErr != nil { - b.Logger().Printf("failed to set worker qos\n %v", qosErr) + b.Log().Printf("failed to set worker qos\n %v", qosErr) } tag := fmt.Sprintf("%s_worker_%d", b.SvcName, i) @@ -300,7 +300,7 @@ func (b *DefaultBus) createBusWorkers(workerNum uint) ([]*worker, error) { err := w.Start() if err != nil { - b.Logger().WithError(err).Error("failed to start worker") + b.Log().WithError(err).Error("failed to start worker") } workers = append(workers, w) @@ -311,19 +311,19 @@ func (b *DefaultBus) createBusWorkers(workerNum uint) ([]*worker, error) { //Shutdown implements GBus.Start() func (b *DefaultBus) Shutdown() (shutdwonErr error) { - b.Logger().Info("Bus shuting down") + b.Log().Info("Bus shuting down") defer func() { if p := recover(); p != nil { pncMsg := fmt.Sprintf("%v\n%s", p, debug.Stack()) shutdwonErr = errors.New(pncMsg) - b.Logger().WithError(shutdwonErr).Error("error when shutting down bus") + b.Log().WithError(shutdwonErr).Error("error when shutting down bus") } }() for _, worker := range b.workers { err := worker.Stop() if err != nil { - b.Logger().WithError(err).Error("could not stop worker") + b.Log().WithError(err).Error("could not stop worker") return err } } @@ -334,7 +334,7 @@ func (b *DefaultBus) Shutdown() (shutdwonErr error) { err := b.Outbox.Stop() if err != nil { - b.Logger().WithError(err).Error("could not shutdown outbox") + b.Log().WithError(err).Error("could not shutdown outbox") return err } b.TxProvider.Dispose() @@ -386,7 +386,7 @@ func (b *DefaultBus) withTx(action func(tx *sql.Tx) error, ambientTx *sql.Tx) er newTx, newTxErr := b.TxProvider.New() if newTxErr != nil { - b.Logger().WithError(newTxErr).Error("failed to create transaction when sending a transactional message") + b.Log().WithError(newTxErr).Error("failed to create transaction when sending a transactional message") return newTxErr } activeTx = newTx @@ -407,12 +407,12 @@ func (b *DefaultBus) withTx(action func(tx *sql.Tx) error, ambientTx *sql.Tx) er if actionErr != nil { err := activeTx.Rollback() if err != nil { - b.Logger().WithError(err).Error("could not rollback transaction") + b.Log().WithError(err).Error("could not rollback transaction") } } else { commitErr := activeTx.Commit() if commitErr != nil { - b.Logger().WithError(commitErr).Error("could not commit transaction") + b.Log().WithError(commitErr).Error("could not commit transaction") return commitErr } } @@ -451,7 +451,7 @@ func (b *DefaultBus) RPC(ctx context.Context, service string, request, reply *Bu b.Serializer.Register(reply.Payload) err := b.sendImpl(ctx, nil, service, b.rpcQueue.Name, "", "", request, rpc) if err != nil { - b.Logger().WithError(err).Error("could not send message") + b.Log().WithError(err).Error("could not send message") return nil, err } @@ -568,14 +568,14 @@ func (b *DefaultBus) monitorAMQPErrors() { select { case blocked := <-b.amqpBlocks: if blocked.Active { - b.Logger().WithField("reason", blocked.Reason).Warn("amqp connection blocked") + b.Log().WithField("reason", blocked.Reason).Warn("amqp connection blocked") } else { - b.Logger().WithField("reason", blocked.Reason).Info("amqp connection unblocked") + b.Log().WithField("reason", blocked.Reason).Info("amqp connection unblocked") } b.backpressure = blocked.Active case amqpErr := <-b.amqpErrors: b.amqpConnected = false - b.Logger().WithField("amqp_error", amqpErr).Error("amqp error") + b.Log().WithField("amqp_error", amqpErr).Error("amqp error") if b.healthChan != nil { b.healthChan <- amqpErr } @@ -599,12 +599,12 @@ func (b *DefaultBus) sendImpl(sctx context.Context, tx *sql.Tx, toService, reply headers := message.GetAMQPHeaders() err := amqptracer.Inject(span, headers) if err != nil { - b.Logger().WithError(err).Error("could not inject headers") + b.Log().WithError(err).Error("could not inject headers") } buffer, err := b.Serializer.Encode(message.Payload) if err != nil { - b.Logger().WithError(err).WithField("message", message).Error("failed to send message, encoding of message failed") + b.Log().WithError(err).WithField("message", message).Error("failed to send message, encoding of message failed") return err } @@ -638,10 +638,10 @@ func (b *DefaultBus) sendImpl(sctx context.Context, tx *sql.Tx, toService, reply //send to the transactional outbox if the bus is transactional //otherwise send directly to amqp if b.IsTxnl && tx != nil { - b.Logger().WithField("message_id", msg.MessageId).Debug("sending message to outbox") + b.Log().WithField("message_id", msg.MessageId).Debug("sending message to outbox") saveErr := b.Outbox.Save(tx, exchange, key, msg) if saveErr != nil { - b.Logger().WithError(saveErr).Error("failed to save to transactional outbox") + b.Log().WithError(saveErr).Error("failed to save to transactional outbox") } return saveErr } @@ -658,7 +658,7 @@ func (b *DefaultBus) sendImpl(sctx context.Context, tx *sql.Tx, toService, reply err = b.SafeWithRetries(publish, MaxRetryCount) if err != nil { - b.Logger().Printf("failed publishing message.\n error:%v", err) + b.Log().Printf("failed publishing message.\n error:%v", err) return err } return err @@ -693,10 +693,10 @@ func (p rpcPolicy) Apply(publishing *amqp.Publishing) { publishing.Headers[RpcHeaderName] = p.rpcID } -func (b *DefaultBus) SetLogger(entry logrus.FieldLogger) { - b.logger = entry +func (b *DefaultBus) SetLogger(entry FieldLogger) { + b.log = entry } -func (b *DefaultBus) Logger() logrus.FieldLogger { - return b.logger +func (b *DefaultBus) Log() FieldLogger { + return b.log } diff --git a/gbus/invocation.go b/gbus/invocation.go index c39a2d4..e0412a9 100644 --- a/gbus/invocation.go +++ b/gbus/invocation.go @@ -19,10 +19,10 @@ type defaultInvocationContext struct { ctx context.Context exchange string routingKey string - logger logrus.FieldLogger + logger FieldLogger } -func (dfi *defaultInvocationContext) Logger() logrus.FieldLogger { +func (dfi *defaultInvocationContext) Log() FieldLogger { return dfi.logger } diff --git a/gbus/saga/glue.go b/gbus/saga/glue.go index f3e8a11..4279af2 100644 --- a/gbus/saga/glue.go +++ b/gbus/saga/glue.go @@ -7,7 +7,7 @@ import ( "strings" "sync" - log "github.com/sirupsen/logrus" + "github.com/sirupsen/logrus" "github.com/wework/grabbit/gbus" ) @@ -72,7 +72,7 @@ func (imsm *Glue) RegisterSaga(saga gbus.Saga, conf ...gbus.SagaConfFn) error { } imsm.log(). - WithFields(log.Fields{"saga_type": def.sagaType.String(), "handles_messages": len(msgNames)}). + WithFields(logrus.Fields{"saga_type": def.sagaType.String(), "handles_messages": len(msgNames)}). Info("registered saga with messages") return nil @@ -113,7 +113,7 @@ func (imsm *Glue) handler(invocation gbus.Invocation, message *gbus.BusMessage) if startNew { newInstance := def.newInstance() imsm.log(). - WithFields(log.Fields{"saga_def": def.String(), "saga_id": newInstance.ID}). + WithFields(logrus.Fields{"saga_def": def.String(), "saga_id": newInstance.ID}). Info("created new saga") if invkErr := imsm.invokeSagaInstance(newInstance, invocation, message); invkErr != nil { imsm.log().WithError(invkErr).WithField("saga_id", newInstance.ID).Error("failed to invoke saga") @@ -129,7 +129,7 @@ func (imsm *Glue) handler(invocation gbus.Invocation, message *gbus.BusMessage) } if requestsTimeout, duration := newInstance.requestsTimeout(); requestsTimeout { - imsm.log().WithFields(log.Fields{"saga_id": newInstance.ID, "timeout_duration": duration}).Info("new saga requested timeout") + imsm.log().WithFields(logrus.Fields{"saga_id": newInstance.ID, "timeout_duration": duration}).Info("new saga requested timeout") imsm.timeoutManger.RequestTimeout(imsm.svcName, newInstance.ID, duration) } } @@ -158,13 +158,13 @@ func (imsm *Glue) handler(invocation gbus.Invocation, message *gbus.BusMessage) return e } else { - imsm.log().WithFields(log.Fields{"saga_type": def.sagaType, "message": msgName}).Info("fetching saga instances by type") + imsm.log().WithFields(logrus.Fields{"saga_type": def.sagaType, "message": msgName}).Info("fetching saga instances by type") instances, e := imsm.sagaStore.GetSagasByType(invocation.Tx(), def.sagaType) if e != nil { return e } - imsm.log().WithFields(log.Fields{"message": msgName, "instances_fetched": len(instances)}).Info("fetched saga instances") + imsm.log().WithFields(logrus.Fields{"message": msgName, "instances_fetched": len(instances)}).Info("fetched saga instances") for _, instance := range instances { def.configureSaga(instance) @@ -191,7 +191,11 @@ func (imsm *Glue) invokeSagaInstance(instance *Instance, invocation gbus.Invocat sagaID: instance.ID, ctx: invocation.Ctx(), invokingService: imsm.svcName, - logger: imsm.log().WithField("saga_id", instance.ID), + log: imsm.log().WithFields(logrus.Fields{ + "saga_id": instance.ID, + "saga_type": instance.String(), + "message_name": message.PayloadFQN, + }), } exchange, routingKey := invocation.Routing() @@ -241,8 +245,8 @@ func (imsm *Glue) timeoutSaga(tx *sql.Tx, sagaID string) error { return imsm.completeOrUpdateSaga(tx, saga) } -func (imsm *Glue) log() *log.Entry { - return log.WithField("_service", imsm.svcName) +func (imsm *Glue) log() gbus.FieldLogger { + return imsm.bus.Log().WithField("_service", imsm.svcName) } //NewGlue creates a new Sagamanager diff --git a/gbus/saga/instance.go b/gbus/saga/instance.go index 0dee955..4619937 100644 --- a/gbus/saga/instance.go +++ b/gbus/saga/instance.go @@ -105,7 +105,8 @@ func NewInstance(sagaType reflect.Type, msgToMethodMap []*MsgToFuncPair) *Instan newInstance := &Instance{ ID: xid.New().String(), UnderlyingInstance: newSaga, - MsgToMethodMap: msgToMethodMap} + MsgToMethodMap: msgToMethodMap, + } return newInstance } diff --git a/gbus/saga/invocation.go b/gbus/saga/invocation.go index 36e3eec..be10abf 100644 --- a/gbus/saga/invocation.go +++ b/gbus/saga/invocation.go @@ -3,7 +3,6 @@ package saga import ( "context" "database/sql" - "github.com/sirupsen/logrus" "time" "github.com/wework/grabbit/gbus" @@ -18,7 +17,7 @@ type sagaInvocation struct { sagaID string ctx context.Context invokingService string - logger logrus.FieldLogger + log gbus.FieldLogger } func (si *sagaInvocation) setCorrelationIDs(message *gbus.BusMessage, isEvent bool) { @@ -81,6 +80,6 @@ func (si *sagaInvocation) Routing() (exchange, routingKey string) { return si.decoratedInvocation.Routing() } -func (si *sagaInvocation) Logger() logrus.FieldLogger { - panic("implement me") +func (si *sagaInvocation) Log() gbus.FieldLogger { + return si.log } diff --git a/gbus/serialization/proto.go b/gbus/serialization/proto.go index bff127a..8700b15 100644 --- a/gbus/serialization/proto.go +++ b/gbus/serialization/proto.go @@ -9,7 +9,6 @@ import ( "sync" "github.com/golang/protobuf/proto" - "github.com/sirupsen/logrus" "github.com/wework/grabbit/gbus" ) @@ -19,11 +18,11 @@ var _ gbus.Serializer = &Proto{} type Proto struct { lock *sync.Mutex registeredSchemas map[string]reflect.Type - logger logrus.FieldLogger + logger gbus.FieldLogger } //NewProtoSerializer creates a new instance of Proto and returns it -func NewProtoSerializer(logger logrus.FieldLogger) gbus.Serializer { +func NewProtoSerializer(logger gbus.FieldLogger) gbus.Serializer { return &Proto{ registeredSchemas: make(map[string]reflect.Type), lock: &sync.Mutex{}, diff --git a/gbus/worker.go b/gbus/worker.go index 362c5ae..5881ffd 100644 --- a/gbus/worker.go +++ b/gbus/worker.go @@ -414,8 +414,8 @@ func (worker *worker) invokeHandlers(sctx context.Context, handlers []MessageHan )) } -func (worker *worker) log() logrus.FieldLogger { - return worker.b.Logger() +func (worker *worker) log() FieldLogger { + return worker.b.Log() } func (worker *worker) AddRegistration(registration *Registration) { From 8963173d1c1935dd312d93533ba651379c2be82e Mon Sep 17 00:00:00 2001 From: Vladislav Shub <1196390+vladshub@users.noreply.github.com> Date: Tue, 11 Jun 2019 19:40:17 +0300 Subject: [PATCH 05/10] fix(logger:instance) now removing logging to stdout --- gbus/bus.go | 6 ++++-- gbus/invocation.go | 5 ++++- gbus/saga/instance.go | 16 +++++++++++----- gbus/saga/instance_test.go | 5 ++++- gbus/saga/invocation.go | 6 +++++- 5 files changed, 28 insertions(+), 10 deletions(-) diff --git a/gbus/bus.go b/gbus/bus.go index 4ba97df..9e94a83 100644 --- a/gbus/bus.go +++ b/gbus/bus.go @@ -364,7 +364,6 @@ func (b *DefaultBus) GetHealth() HealthCard { DbConnected: dbConnected, RabbitBackPressure: b.backpressure, RabbitConnected: b.amqpConnected, - } } @@ -698,5 +697,8 @@ func (b *DefaultBus) SetLogger(entry FieldLogger) { } func (b *DefaultBus) Log() FieldLogger { - return b.log + if b.log != nil { + return b.log + } + return logrus.WithField("log", "nil") } diff --git a/gbus/invocation.go b/gbus/invocation.go index e0412a9..f111089 100644 --- a/gbus/invocation.go +++ b/gbus/invocation.go @@ -23,7 +23,10 @@ type defaultInvocationContext struct { } func (dfi *defaultInvocationContext) Log() FieldLogger { - return dfi.logger + if dfi.logger != nil { + return dfi.logger + } + return logrus.WithField("log", "nil") } func (dfi *defaultInvocationContext) Reply(ctx context.Context, replyMessage *BusMessage) error { diff --git a/gbus/saga/instance.go b/gbus/saga/instance.go index 4619937..207a876 100644 --- a/gbus/saga/instance.go +++ b/gbus/saga/instance.go @@ -3,7 +3,7 @@ package saga import ( "database/sql" "fmt" - "log" + "github.com/sirupsen/logrus" "reflect" "time" @@ -17,6 +17,7 @@ type Instance struct { ConcurrencyCtrl int UnderlyingInstance gbus.Saga MsgToMethodMap []*MsgToFuncPair + Log gbus.FieldLogger } func (si *Instance) invoke(exchange, routingKey string, invocation gbus.Invocation, message *gbus.BusMessage) error { @@ -38,16 +39,21 @@ func (si *Instance) invoke(exchange, routingKey string, invocation gbus.Invocati params := make([]reflect.Value, 0) params = append(params, reflect.ValueOf(invocation), valueOfMessage) method := reflectedVal.MethodByName(methodName) - log.Printf(" invoking method %v on saga instance %v", methodName, si.ID) + if invocation.Log() == nil { + panic("here") + } + invocation.Log().WithFields(logrus.Fields{ + "method_name": methodName, "saga_id": si.ID, + }).Info("invoking method on saga") returns := method.Call(params) val := returns[0] if !val.IsNil() { return val.Interface().(error) } - - log.Printf(" saga instance %v invoked", si.ID) - + invocation.Log().WithFields(logrus.Fields{ + "method_name": methodName, "saga_id": si.ID, + }).Info("saga instance invoked") } return nil diff --git a/gbus/saga/instance_test.go b/gbus/saga/instance_test.go index 4d828dd..9b9ce77 100644 --- a/gbus/saga/instance_test.go +++ b/gbus/saga/instance_test.go @@ -2,6 +2,7 @@ package saga import ( "errors" + "github.com/sirupsen/logrus" "reflect" "testing" @@ -19,7 +20,9 @@ func TestInstanceInvocationReturnsErrors(t *testing.T) { m2 := TestMsg2{} exchange, routingKey := "", "kong" - invocationStub := &sagaInvocation{} + invocationStub := &sagaInvocation{ + log: logrus.New(), + } failName := getFunNameFromHandler(s.Fail) failFilter := gbus.NewMessageFilter(exchange, routingKey, m1) diff --git a/gbus/saga/invocation.go b/gbus/saga/invocation.go index be10abf..f086683 100644 --- a/gbus/saga/invocation.go +++ b/gbus/saga/invocation.go @@ -3,6 +3,7 @@ package saga import ( "context" "database/sql" + "github.com/sirupsen/logrus" "time" "github.com/wework/grabbit/gbus" @@ -81,5 +82,8 @@ func (si *sagaInvocation) Routing() (exchange, routingKey string) { } func (si *sagaInvocation) Log() gbus.FieldLogger { - return si.log + if si.log != nil { + return si.log + } + return logrus.WithField("log", "nil") } From cb46b6590eaa9b7edea3d5ed7a0b3515cad4c760 Mon Sep 17 00:00:00 2001 From: Vladislav Shub <1196390+vladshub@users.noreply.github.com> Date: Wed, 12 Jun 2019 18:31:34 +0300 Subject: [PATCH 06/10] fix #88 - ensuring creatinon of new logrus entries each time the log function is called --- gbus/builder/builder.go | 4 ++-- gbus/bus.go | 4 ++-- gbus/invocation.go | 4 ++-- gbus/saga/invocation.go | 4 ++-- gbus/tx/mysql/sagastore.go | 4 ++-- gbus/tx/mysql/txoutbox.go | 2 +- gbus/tx/sagastore.go | 2 +- 7 files changed, 12 insertions(+), 12 deletions(-) diff --git a/gbus/builder/builder.go b/gbus/builder/builder.go index fa0e50b..619a55b 100644 --- a/gbus/builder/builder.go +++ b/gbus/builder/builder.go @@ -57,9 +57,9 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus { } if builder.logger != nil { - gb.SetLogger(builder.logger.WithField("_service", svcName)) + gb.SetLogger(builder.logger) } else { - gb.SetLogger(logrus.WithField("_service", svcName)) + gb.SetLogger(logrus.New()) } if builder.workerNum < 1 { diff --git a/gbus/bus.go b/gbus/bus.go index 9e94a83..c7d3f31 100644 --- a/gbus/bus.go +++ b/gbus/bus.go @@ -698,7 +698,7 @@ func (b *DefaultBus) SetLogger(entry FieldLogger) { func (b *DefaultBus) Log() FieldLogger { if b.log != nil { - return b.log + return b.log.WithField("_service", b.SvcName) } - return logrus.WithField("log", "nil") + return logrus.WithField("_service", b.SvcName) } diff --git a/gbus/invocation.go b/gbus/invocation.go index f111089..b079f35 100644 --- a/gbus/invocation.go +++ b/gbus/invocation.go @@ -24,9 +24,9 @@ type defaultInvocationContext struct { func (dfi *defaultInvocationContext) Log() FieldLogger { if dfi.logger != nil { - return dfi.logger + return dfi.logger.WithField("routing_key", dfi.routingKey) } - return logrus.WithField("log", "nil") + return logrus.WithField("routing_key", dfi.routingKey) } func (dfi *defaultInvocationContext) Reply(ctx context.Context, replyMessage *BusMessage) error { diff --git a/gbus/saga/invocation.go b/gbus/saga/invocation.go index f086683..d2b60dd 100644 --- a/gbus/saga/invocation.go +++ b/gbus/saga/invocation.go @@ -83,7 +83,7 @@ func (si *sagaInvocation) Routing() (exchange, routingKey string) { func (si *sagaInvocation) Log() gbus.FieldLogger { if si.log != nil { - return si.log + return si.log.WithField("saga_id", si.sagaID) } - return logrus.WithField("log", "nil") + return logrus.WithField("saga_id", si.sagaID) } diff --git a/gbus/tx/mysql/sagastore.go b/gbus/tx/mysql/sagastore.go index e458041..9217d48 100644 --- a/gbus/tx/mysql/sagastore.go +++ b/gbus/tx/mysql/sagastore.go @@ -10,13 +10,13 @@ import ( "github.com/wework/grabbit/gbus/tx" ) -//SagaStore implements the saga/store interface on top of PostgreSQL +//SagaStore implements the saga/store interface on top of MySQL type SagaStore struct { *tx.SagaStore } func (store *SagaStore) log() *log.Entry { - return log.WithField("_service", store.SvcName) + return log.WithField("saga_store", "mysql") } func (store *SagaStore) ensureSchema() { diff --git a/gbus/tx/mysql/txoutbox.go b/gbus/tx/mysql/txoutbox.go index 7bc8108..0a5fa7e 100644 --- a/gbus/tx/mysql/txoutbox.go +++ b/gbus/tx/mysql/txoutbox.go @@ -44,7 +44,7 @@ type TxOutbox struct { } func (outbox *TxOutbox) log() *log.Entry { - return log.WithField("_service", outbox.svcName) + return log.WithField("tx", "mysql") } //Start starts the transactional outbox that is used to send messages in sync with domain object change diff --git a/gbus/tx/sagastore.go b/gbus/tx/sagastore.go index a720ec9..c7e1547 100644 --- a/gbus/tx/sagastore.go +++ b/gbus/tx/sagastore.go @@ -220,5 +220,5 @@ func (store *SagaStore) GetSagatableName() string { } func (store *SagaStore) log() *log.Entry { - return log.WithField("_service", store.SvcName) + return log.WithField("store", "mysql") } From 2577826b562e9bcb56b527643eaec42f93499d78 Mon Sep 17 00:00:00 2001 From: Vladislav Shub <1196390+vladshub@users.noreply.github.com> Date: Sun, 16 Jun 2019 16:01:50 +0300 Subject: [PATCH 07/10] code review fixes --- gbus/abstractions.go | 8 +++----- gbus/builder/builder.go | 4 ++-- gbus/bus.go | 6 +++--- gbus/invocation.go | 11 +++++++---- gbus/saga/glue.go | 2 +- gbus/saga/instance.go | 2 +- gbus/saga/invocation.go | 9 +++------ gbus/serialization/proto.go | 5 +++-- gbus/worker.go | 2 +- 9 files changed, 24 insertions(+), 25 deletions(-) diff --git a/gbus/abstractions.go b/gbus/abstractions.go index 4647af8..2c0fcdc 100644 --- a/gbus/abstractions.go +++ b/gbus/abstractions.go @@ -187,7 +187,7 @@ type Invocation interface { Bus() Messaging Tx() *sql.Tx Ctx() context.Context - Log() FieldLogger + Log() logrus.FieldLogger Routing() (exchange, routingKey string) } @@ -213,9 +213,7 @@ type TxOutbox interface { Stop() error } -type FieldLogger logrus.FieldLogger - type Logged interface { - SetLogger(entry FieldLogger) - Log() FieldLogger + SetLogger(entry logrus.FieldLogger) + Log() logrus.FieldLogger } diff --git a/gbus/builder/builder.go b/gbus/builder/builder.go index 619a55b..016b100 100644 --- a/gbus/builder/builder.go +++ b/gbus/builder/builder.go @@ -28,7 +28,7 @@ type defaultBuilder struct { confirm bool dbPingTimeout time.Duration usingPingTimeout bool - logger gbus.FieldLogger + logger logrus.FieldLogger } func (builder *defaultBuilder) Build(svcName string) gbus.Bus { @@ -188,7 +188,7 @@ func (builder *defaultBuilder) WithConfiguration(config gbus.BusConfiguration) g return builder } -func (builder *defaultBuilder) WithLogger(logger gbus.FieldLogger) gbus.Builder { +func (builder *defaultBuilder) WithLogger(logger logrus.FieldLogger) gbus.Builder { builder.logger = logger return builder } diff --git a/gbus/bus.go b/gbus/bus.go index c7d3f31..9fa45ce 100644 --- a/gbus/bus.go +++ b/gbus/bus.go @@ -37,7 +37,7 @@ type DefaultBus struct { amqpErrors chan *amqp.Error amqpBlocks chan amqp.Blocking Registrations []*Registration - log FieldLogger + log logrus.FieldLogger RPCHandlers map[string]MessageHandler deadletterHandler func(tx *sql.Tx, poision amqp.Delivery) error @@ -692,11 +692,11 @@ func (p rpcPolicy) Apply(publishing *amqp.Publishing) { publishing.Headers[RpcHeaderName] = p.rpcID } -func (b *DefaultBus) SetLogger(entry FieldLogger) { +func (b *DefaultBus) SetLogger(entry logrus.FieldLogger) { b.log = entry } -func (b *DefaultBus) Log() FieldLogger { +func (b *DefaultBus) Log() logrus.FieldLogger { if b.log != nil { return b.log.WithField("_service", b.SvcName) } diff --git a/gbus/invocation.go b/gbus/invocation.go index b079f35..f05d859 100644 --- a/gbus/invocation.go +++ b/gbus/invocation.go @@ -19,14 +19,17 @@ type defaultInvocationContext struct { ctx context.Context exchange string routingKey string - logger FieldLogger + logger logrus.FieldLogger } -func (dfi *defaultInvocationContext) Log() FieldLogger { +func (dfi *defaultInvocationContext) Log() logrus.FieldLogger { + var l logrus.FieldLogger if dfi.logger != nil { - return dfi.logger.WithField("routing_key", dfi.routingKey) + l = dfi.logger + } else { + l = logrus.New() } - return logrus.WithField("routing_key", dfi.routingKey) + return l.WithFields(logrus.Fields{"routing_key": dfi.routingKey, "message_id": dfi.inboundMsg.ID}) } func (dfi *defaultInvocationContext) Reply(ctx context.Context, replyMessage *BusMessage) error { diff --git a/gbus/saga/glue.go b/gbus/saga/glue.go index 4279af2..b3c336e 100644 --- a/gbus/saga/glue.go +++ b/gbus/saga/glue.go @@ -245,7 +245,7 @@ func (imsm *Glue) timeoutSaga(tx *sql.Tx, sagaID string) error { return imsm.completeOrUpdateSaga(tx, saga) } -func (imsm *Glue) log() gbus.FieldLogger { +func (imsm *Glue) log() logrus.FieldLogger { return imsm.bus.Log().WithField("_service", imsm.svcName) } diff --git a/gbus/saga/instance.go b/gbus/saga/instance.go index 207a876..9513c40 100644 --- a/gbus/saga/instance.go +++ b/gbus/saga/instance.go @@ -17,7 +17,7 @@ type Instance struct { ConcurrencyCtrl int UnderlyingInstance gbus.Saga MsgToMethodMap []*MsgToFuncPair - Log gbus.FieldLogger + Log logrus.FieldLogger } func (si *Instance) invoke(exchange, routingKey string, invocation gbus.Invocation, message *gbus.BusMessage) error { diff --git a/gbus/saga/invocation.go b/gbus/saga/invocation.go index d2b60dd..44c0dd6 100644 --- a/gbus/saga/invocation.go +++ b/gbus/saga/invocation.go @@ -18,7 +18,7 @@ type sagaInvocation struct { sagaID string ctx context.Context invokingService string - log gbus.FieldLogger + log logrus.FieldLogger } func (si *sagaInvocation) setCorrelationIDs(message *gbus.BusMessage, isEvent bool) { @@ -81,9 +81,6 @@ func (si *sagaInvocation) Routing() (exchange, routingKey string) { return si.decoratedInvocation.Routing() } -func (si *sagaInvocation) Log() gbus.FieldLogger { - if si.log != nil { - return si.log.WithField("saga_id", si.sagaID) - } - return logrus.WithField("saga_id", si.sagaID) +func (si *sagaInvocation) Log() logrus.FieldLogger { + return si.decoratedInvocation.Log().WithField("saga_id", si.sagaID) } diff --git a/gbus/serialization/proto.go b/gbus/serialization/proto.go index 8700b15..56591ee 100644 --- a/gbus/serialization/proto.go +++ b/gbus/serialization/proto.go @@ -8,6 +8,7 @@ import ( "reflect" "sync" + "github.com/sirupsen/logrus" "github.com/golang/protobuf/proto" "github.com/wework/grabbit/gbus" ) @@ -18,11 +19,11 @@ var _ gbus.Serializer = &Proto{} type Proto struct { lock *sync.Mutex registeredSchemas map[string]reflect.Type - logger gbus.FieldLogger + logger logrus.FieldLogger } //NewProtoSerializer creates a new instance of Proto and returns it -func NewProtoSerializer(logger gbus.FieldLogger) gbus.Serializer { +func NewProtoSerializer(logger logrus.FieldLogger) gbus.Serializer { return &Proto{ registeredSchemas: make(map[string]reflect.Type), lock: &sync.Mutex{}, diff --git a/gbus/worker.go b/gbus/worker.go index 5881ffd..60074c9 100644 --- a/gbus/worker.go +++ b/gbus/worker.go @@ -414,7 +414,7 @@ func (worker *worker) invokeHandlers(sctx context.Context, handlers []MessageHan )) } -func (worker *worker) log() FieldLogger { +func (worker *worker) log() logrus.FieldLogger { return worker.b.Log() } From 58e1426eca69301cfa9304c1ed7f5bea51b4a69a Mon Sep 17 00:00:00 2001 From: Guy Baron Date: Sat, 22 Jun 2019 12:03:37 +0300 Subject: [PATCH 08/10] closing rows when querying transactional outbox fails When failing to query the outbox tables there was no call to rows.Close potentially causing a connection leack. --- gbus/tx/mysql/txoutbox.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/gbus/tx/mysql/txoutbox.go b/gbus/tx/mysql/txoutbox.go index 7bc8108..683450f 100644 --- a/gbus/tx/mysql/txoutbox.go +++ b/gbus/tx/mysql/txoutbox.go @@ -241,7 +241,10 @@ func (outbox *TxOutbox) sendMessages(recordSelector func(tx *sql.Tx) (*sql.Rows, if selectErr != nil { outbox.log().WithError(selectErr).Error("failed fetching messages from outbox") - + err := rows.Close() + if err != nil { + outbox.log().WithError(err).Error("could not close Rows") + } return selectErr } From 4234b1e7884d4469b5f3c6f56bec2daf84fe6a71 Mon Sep 17 00:00:00 2001 From: Vladislav Shub <1196390+vladshub@users.noreply.github.com> Date: Thu, 20 Jun 2019 08:05:11 +0300 Subject: [PATCH 09/10] fixed tests --- gbus/abstractions.go | 5 ++++- gbus/bus.go | 14 ++++++-------- gbus/invocation.go | 10 ++-------- gbus/logged.go | 31 +++++++++++++++++++++++++++++++ gbus/saga/glue.go | 10 +++++----- gbus/saga/instance_test.go | 5 +---- gbus/saga/invocation.go | 9 ++++----- gbus/serialization/proto.go | 2 +- gbus/worker.go | 2 +- 9 files changed, 55 insertions(+), 33 deletions(-) create mode 100644 gbus/logged.go diff --git a/gbus/abstractions.go b/gbus/abstractions.go index 2c0fcdc..d914271 100644 --- a/gbus/abstractions.go +++ b/gbus/abstractions.go @@ -179,15 +179,18 @@ type Builder interface { //Build the bus Build(svcName string) Bus + + //WithLogger set custom logger instance + WithLogger(logger logrus.FieldLogger) Builder } //Invocation context for a specific processed message type Invocation interface { + Logged Reply(ctx context.Context, message *BusMessage) error Bus() Messaging Tx() *sql.Tx Ctx() context.Context - Log() logrus.FieldLogger Routing() (exchange, routingKey string) } diff --git a/gbus/bus.go b/gbus/bus.go index 9fa45ce..e6fe8fc 100644 --- a/gbus/bus.go +++ b/gbus/bus.go @@ -23,6 +23,7 @@ var _ Bus = &DefaultBus{} //DefaultBus implements the Bus interface type DefaultBus struct { *Safety + *Glogged Outgoing *AMQPOutbox Outbox TxOutbox PrefetchCount uint @@ -37,7 +38,6 @@ type DefaultBus struct { amqpErrors chan *amqp.Error amqpBlocks chan amqp.Blocking Registrations []*Registration - log logrus.FieldLogger RPCHandlers map[string]MessageHandler deadletterHandler func(tx *sql.Tx, poision amqp.Delivery) error @@ -692,13 +692,11 @@ func (p rpcPolicy) Apply(publishing *amqp.Publishing) { publishing.Headers[RpcHeaderName] = p.rpcID } -func (b *DefaultBus) SetLogger(entry logrus.FieldLogger) { - b.log = entry -} - func (b *DefaultBus) Log() logrus.FieldLogger { - if b.log != nil { - return b.log.WithField("_service", b.SvcName) + if b.Glogged == nil { + b.Glogged = &Glogged{ + log: logrus.WithField("_service", b.SvcName), + } } - return logrus.WithField("_service", b.SvcName) + return b.Glogged.Log() } diff --git a/gbus/invocation.go b/gbus/invocation.go index f05d859..08a631a 100644 --- a/gbus/invocation.go +++ b/gbus/invocation.go @@ -12,6 +12,7 @@ var _ Invocation = &defaultInvocationContext{} var _ Messaging = &defaultInvocationContext{} type defaultInvocationContext struct { + *Glogged invocingSvc string bus *DefaultBus inboundMsg *BusMessage @@ -19,17 +20,10 @@ type defaultInvocationContext struct { ctx context.Context exchange string routingKey string - logger logrus.FieldLogger } func (dfi *defaultInvocationContext) Log() logrus.FieldLogger { - var l logrus.FieldLogger - if dfi.logger != nil { - l = dfi.logger - } else { - l = logrus.New() - } - return l.WithFields(logrus.Fields{"routing_key": dfi.routingKey, "message_id": dfi.inboundMsg.ID}) + return dfi.Glogged.Log().WithFields(logrus.Fields{"routing_key": dfi.routingKey, "message_id": dfi.inboundMsg.ID}) } func (dfi *defaultInvocationContext) Reply(ctx context.Context, replyMessage *BusMessage) error { diff --git a/gbus/logged.go b/gbus/logged.go new file mode 100644 index 0000000..1590de8 --- /dev/null +++ b/gbus/logged.go @@ -0,0 +1,31 @@ +// Copyright © 2019 Vladislav Shub +// All rights reserved to the We Company. + +package gbus + +import ( + "github.com/sirupsen/logrus" +) + +var _ Logged = &Glogged{} + +type Glogged struct { + log logrus.FieldLogger +} + +func (gl *Glogged) SetLogger(entry logrus.FieldLogger) { + if gl == nil { + gl = &Glogged{} + } + gl.log = entry +} + +func (gl *Glogged) Log() logrus.FieldLogger { + if gl == nil { + gl = &Glogged{} + } + if gl.log == nil { + gl.log = logrus.New() + } + return gl.log +} diff --git a/gbus/saga/glue.go b/gbus/saga/glue.go index b3c336e..8367b29 100644 --- a/gbus/saga/glue.go +++ b/gbus/saga/glue.go @@ -191,12 +191,12 @@ func (imsm *Glue) invokeSagaInstance(instance *Instance, invocation gbus.Invocat sagaID: instance.ID, ctx: invocation.Ctx(), invokingService: imsm.svcName, - log: imsm.log().WithFields(logrus.Fields{ - "saga_id": instance.ID, - "saga_type": instance.String(), - "message_name": message.PayloadFQN, - }), } + sginv.SetLogger(imsm.log().WithFields(logrus.Fields{ + "saga_id": instance.ID, + "saga_type": instance.String(), + "message_name": message.PayloadFQN, + })) exchange, routingKey := invocation.Routing() return instance.invoke(exchange, routingKey, sginv, message) diff --git a/gbus/saga/instance_test.go b/gbus/saga/instance_test.go index 9b9ce77..4d828dd 100644 --- a/gbus/saga/instance_test.go +++ b/gbus/saga/instance_test.go @@ -2,7 +2,6 @@ package saga import ( "errors" - "github.com/sirupsen/logrus" "reflect" "testing" @@ -20,9 +19,7 @@ func TestInstanceInvocationReturnsErrors(t *testing.T) { m2 := TestMsg2{} exchange, routingKey := "", "kong" - invocationStub := &sagaInvocation{ - log: logrus.New(), - } + invocationStub := &sagaInvocation{} failName := getFunNameFromHandler(s.Fail) failFilter := gbus.NewMessageFilter(exchange, routingKey, m1) diff --git a/gbus/saga/invocation.go b/gbus/saga/invocation.go index 44c0dd6..54aa494 100644 --- a/gbus/saga/invocation.go +++ b/gbus/saga/invocation.go @@ -3,7 +3,6 @@ package saga import ( "context" "database/sql" - "github.com/sirupsen/logrus" "time" "github.com/wework/grabbit/gbus" @@ -12,13 +11,13 @@ import ( var _ gbus.Invocation = &sagaInvocation{} type sagaInvocation struct { + *gbus.Glogged decoratedBus gbus.Messaging decoratedInvocation gbus.Invocation inboundMsg *gbus.BusMessage sagaID string ctx context.Context invokingService string - log logrus.FieldLogger } func (si *sagaInvocation) setCorrelationIDs(message *gbus.BusMessage, isEvent bool) { @@ -81,6 +80,6 @@ func (si *sagaInvocation) Routing() (exchange, routingKey string) { return si.decoratedInvocation.Routing() } -func (si *sagaInvocation) Log() logrus.FieldLogger { - return si.decoratedInvocation.Log().WithField("saga_id", si.sagaID) -} +//func (si *sagaInvocation) Log() logrus.FieldLogger { +// return si.decoratedInvocation.Log().WithField("saga_id", si.sagaID) +//} diff --git a/gbus/serialization/proto.go b/gbus/serialization/proto.go index 56591ee..bff127a 100644 --- a/gbus/serialization/proto.go +++ b/gbus/serialization/proto.go @@ -8,8 +8,8 @@ import ( "reflect" "sync" - "github.com/sirupsen/logrus" "github.com/golang/protobuf/proto" + "github.com/sirupsen/logrus" "github.com/wework/grabbit/gbus" ) diff --git a/gbus/worker.go b/gbus/worker.go index 60074c9..fda9db2 100644 --- a/gbus/worker.go +++ b/gbus/worker.go @@ -373,8 +373,8 @@ func (worker *worker) invokeHandlers(sctx context.Context, handlers []MessageHan ctx: hsctx, exchange: delivery.Exchange, routingKey: delivery.RoutingKey, - logger: worker.log().WithField("handler", runtime.FuncForPC(reflect.ValueOf(handler).Pointer()).Name()), } + ctx.SetLogger(worker.log().WithField("handler", runtime.FuncForPC(reflect.ValueOf(handler).Pointer()).Name())) handlerErr = handler(ctx, message) if handlerErr != nil { hspan.LogFields(slog.Error(handlerErr)) From 3c2b60c47136b19f87f459e491d1bec7620c6a8d Mon Sep 17 00:00:00 2001 From: Guy Baron Date: Tue, 25 Jun 2019 11:24:01 +0300 Subject: [PATCH 10/10] updated README.md to include a release badge (#92) --- README.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/README.md b/README.md index 2367aee..6b47b03 100644 --- a/README.md +++ b/README.md @@ -2,6 +2,9 @@ [![CircleCI](https://circleci.com/gh/wework/grabbit.svg?style=svg)](https://circleci.com/gh/wework/grabbit) [![Go Report Card](https://goreportcard.com/badge/github.com/wework/grabbit)](https://goreportcard.com/report/github.com/wework/grabbit) [![Coverage Status](https://coveralls.io/repos/github/wework/grabbit/badge.svg?branch=master)](https://coveralls.io/github/wework/grabbit?branch=master) +![GitHub release](https://img.shields.io/github/release/wework/grabbit.svg) + + # grabbit