diff --git a/README.md b/README.md index 9123586..6b47b03 100644 --- a/README.md +++ b/README.md @@ -2,6 +2,9 @@ [![CircleCI](https://circleci.com/gh/wework/grabbit.svg?style=svg)](https://circleci.com/gh/wework/grabbit) [![Go Report Card](https://goreportcard.com/badge/github.com/wework/grabbit)](https://goreportcard.com/report/github.com/wework/grabbit) [![Coverage Status](https://coveralls.io/repos/github/wework/grabbit/badge.svg?branch=master)](https://coveralls.io/github/wework/grabbit?branch=master) +![GitHub release](https://img.shields.io/github/release/wework/grabbit.svg) + + # grabbit @@ -141,5 +144,6 @@ if e != nil{ ## Testing +0) ensure that you have the dependencies installed: `go get -v -t -d ./...` 1) make sure to first: `docker-compose up -V -d` 2) then to run the tests: `go test ./...` diff --git a/gbus/abstractions.go b/gbus/abstractions.go index 42f5410..d914271 100644 --- a/gbus/abstractions.go +++ b/gbus/abstractions.go @@ -3,6 +3,7 @@ package gbus import ( "context" "database/sql" + "github.com/sirupsen/logrus" "time" "github.com/streadway/amqp" @@ -17,7 +18,7 @@ const ( //BusConfiguration provides configuration passed to the bus builder type BusConfiguration struct { - MaxRetryCount uint + MaxRetryCount uint BaseRetryDuration int } @@ -29,6 +30,7 @@ type Bus interface { Messaging SagaRegister Health + Logged } //Message a common interface that passes to the serializers to allow decoding and encoding of content @@ -177,10 +179,14 @@ type Builder interface { //Build the bus Build(svcName string) Bus + + //WithLogger set custom logger instance + WithLogger(logger logrus.FieldLogger) Builder } //Invocation context for a specific processed message type Invocation interface { + Logged Reply(ctx context.Context, message *BusMessage) error Bus() Messaging Tx() *sql.Tx @@ -209,3 +215,8 @@ type TxOutbox interface { Start(amqpOut *AMQPOutbox) error Stop() error } + +type Logged interface { + SetLogger(entry logrus.FieldLogger) + Log() logrus.FieldLogger +} diff --git a/gbus/builder/builder.go b/gbus/builder/builder.go index b704015..016b100 100644 --- a/gbus/builder/builder.go +++ b/gbus/builder/builder.go @@ -2,6 +2,7 @@ package builder import ( "fmt" + "github.com/sirupsen/logrus" "sync" "time" @@ -27,13 +28,14 @@ type defaultBuilder struct { confirm bool dbPingTimeout time.Duration usingPingTimeout bool + logger logrus.FieldLogger } func (builder *defaultBuilder) Build(svcName string) gbus.Bus { gb := &gbus.DefaultBus{ AmqpConnStr: builder.connStr, - PrefetchCount: 1, + PrefetchCount: builder.PrefetchCount, Outgoing: &gbus.AMQPOutbox{ SvcName: svcName, }, @@ -50,15 +52,21 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus { Serializer: builder.serializer, DLX: builder.dlx, DefaultPolicies: builder.defaultPolicies, - DbPingTimeout: 3} + DbPingTimeout: 3, + Confirm: builder.confirm, + } + + if builder.logger != nil { + gb.SetLogger(builder.logger) + } else { + gb.SetLogger(logrus.New()) + } - gb.Confirm = builder.confirm if builder.workerNum < 1 { gb.WorkerNum = 1 } else { gb.WorkerNum = builder.workerNum } - gb.PrefetchCount = builder.PrefetchCount var ( sagaStore saga.Store ) @@ -175,11 +183,16 @@ func (builder *defaultBuilder) WithConfiguration(config gbus.BusConfiguration) g gbus.MaxRetryCount = config.MaxRetryCount } if config.BaseRetryDuration > 0 { - gbus.BaseRetryDuration = time.Millisecond*time.Duration(config.BaseRetryDuration) + gbus.BaseRetryDuration = time.Millisecond * time.Duration(config.BaseRetryDuration) } return builder } +func (builder *defaultBuilder) WithLogger(logger logrus.FieldLogger) gbus.Builder { + builder.logger = logger + return builder +} + //New :) func New() Nu { return Nu{} diff --git a/gbus/bus.go b/gbus/bus.go index 19b5b13..e6fe8fc 100644 --- a/gbus/bus.go +++ b/gbus/bus.go @@ -13,15 +13,17 @@ import ( "github.com/opentracing/opentracing-go" slog "github.com/opentracing/opentracing-go/log" "github.com/rs/xid" - log "github.com/sirupsen/logrus" + "github.com/sirupsen/logrus" "github.com/streadway/amqp" ) var _ SagaRegister = &DefaultBus{} +var _ Bus = &DefaultBus{} //DefaultBus implements the Bus interface type DefaultBus struct { *Safety + *Glogged Outgoing *AMQPOutbox Outbox TxOutbox PrefetchCount uint @@ -56,7 +58,7 @@ type DefaultBus struct { DefaultPolicies []MessagePolicy Confirm bool healthChan chan error - backpreasure bool + backpressure bool DbPingTimeout time.Duration amqpConnected bool } @@ -67,7 +69,7 @@ var ( MaxRetryCount uint = 3 //BaseRetryDuration defines the basic milliseconds that the retry algorithm uses //for a random retry time. Default is 10 but it is configurable. - BaseRetryDuration = 10*time.Millisecond + BaseRetryDuration = 10 * time.Millisecond //RpcHeaderName used to define the header in grabbit for RPC RpcHeaderName = "x-grabbit-msg-rpc-id" ) @@ -97,7 +99,7 @@ func (b *DefaultBus) createServiceQueue() (amqp.Queue, error) { if b.PurgeOnStartup { msgsPurged, purgeError := b.AMQPChannel.QueueDelete(qName, false /*ifUnused*/, false /*ifEmpty*/, false /*noWait*/) if purgeError != nil { - b.log().WithError(purgeError).WithField("deleted_messages", msgsPurged).Error("failed to purge queue") + b.Log().WithError(purgeError).WithField("deleted_messages", msgsPurged).Error("failed to purge queue") return q, purgeError } } @@ -113,7 +115,7 @@ func (b *DefaultBus) createServiceQueue() (amqp.Queue, error) { false, /*noWait*/ args /*args*/) if e != nil { - b.log().WithError(e).Error("failed to declare queue") + b.Log().WithError(e).Error("failed to declare queue") } b.serviceQueue = q return q, e @@ -130,12 +132,12 @@ func (b *DefaultBus) bindServiceQueue() error { false, /*noWait*/ nil /*args amqp.Table*/) if err != nil { - b.log().WithError(err).Error("could not declare exchange") + b.Log().WithError(err).Error("could not declare exchange") return err } err = b.bindQueue("", b.DLX) if err != nil { - b.log().WithError(err).Error("could not bind exchange") + b.Log().WithError(err).Error("could not bind exchange") return err } } @@ -150,12 +152,12 @@ func (b *DefaultBus) bindServiceQueue() error { false, /*noWait*/ nil /*args amqp.Table*/) if e != nil { - b.log().WithError(e).WithField("exchange", exchange).Error("failed to declare exchange") + b.Log().WithError(e).WithField("exchange", exchange).Error("failed to declare exchange") return e } e = b.bindQueue(topic, exchange) if e != nil { - b.log().WithError(e).WithFields(log.Fields{"topic": topic, "exchange": exchange}).Error("failed to bind topic to exchange") + b.Log().WithError(e).WithFields(logrus.Fields{"topic": topic, "exchange": exchange}).Error("failed to bind topic to exchange") return e } @@ -207,7 +209,7 @@ func (b *DefaultBus) Start() error { var amqpChan *amqp.Channel if amqpChan, e = b.createAMQPChannel(b.amqpConn); e != nil { - b.log().WithError(e).Error("failed to create amqp channel for transactional outbox") + b.Log().WithError(e).Error("failed to create amqp channel for transactional outbox") return e } amqpChan.NotifyClose(b.amqpErrors) @@ -216,11 +218,11 @@ func (b *DefaultBus) Start() error { } err := amqpOutbox.init(amqpChan, b.Confirm, false) if err != nil { - b.log().WithError(err).Error("failed initializing amqpOutbox") + b.Log().WithError(err).Error("failed initializing amqpOutbox") return err } if startErr := b.Outbox.Start(amqpOutbox); startErr != nil { - b.log().WithError(startErr).Error("failed to start transactional outbox") + b.Log().WithError(startErr).Error("failed to start transactional outbox") return startErr } @@ -236,7 +238,7 @@ func (b *DefaultBus) Start() error { //bind queue err := b.bindServiceQueue() if err != nil { - b.log().WithError(err).Error("could not bind service to queue") + b.Log().WithError(err).Error("could not bind service to queue") return err } @@ -246,11 +248,11 @@ func (b *DefaultBus) Start() error { return e } - b.log().WithField("number_of_workers", b.WorkerNum).Info("initiating workers") + b.Log().WithField("number_of_workers", b.WorkerNum).Info("initiating workers") workers, createWorkersErr := b.createBusWorkers(b.WorkerNum) if createWorkersErr != nil { - b.log().WithError(createWorkersErr).Error("error creating channel for worker") + b.Log().WithError(createWorkersErr).Error("error creating channel for worker") return createWorkersErr } @@ -274,7 +276,7 @@ func (b *DefaultBus) createBusWorkers(workerNum uint) ([]*worker, error) { qosErr := amqpChan.Qos(int(b.PrefetchCount), 0, false) if qosErr != nil { - log.Printf("failed to set worker qos\n %v", qosErr) + b.Log().Printf("failed to set worker qos\n %v", qosErr) } tag := fmt.Sprintf("%s_worker_%d", b.SvcName, i) @@ -298,7 +300,7 @@ func (b *DefaultBus) createBusWorkers(workerNum uint) ([]*worker, error) { err := w.Start() if err != nil { - log.WithError(err).Error("failed to start worker") + b.Log().WithError(err).Error("failed to start worker") } workers = append(workers, w) @@ -309,19 +311,19 @@ func (b *DefaultBus) createBusWorkers(workerNum uint) ([]*worker, error) { //Shutdown implements GBus.Start() func (b *DefaultBus) Shutdown() (shutdwonErr error) { - b.log().Info("Bus shuting down") + b.Log().Info("Bus shuting down") defer func() { if p := recover(); p != nil { pncMsg := fmt.Sprintf("%v\n%s", p, debug.Stack()) shutdwonErr = errors.New(pncMsg) - b.log().WithError(shutdwonErr).Error("error when shutting down bus") + b.Log().WithError(shutdwonErr).Error("error when shutting down bus") } }() for _, worker := range b.workers { err := worker.Stop() if err != nil { - b.log().WithError(err).Error("could not stop worker") + b.Log().WithError(err).Error("could not stop worker") return err } } @@ -332,7 +334,7 @@ func (b *DefaultBus) Shutdown() (shutdwonErr error) { err := b.Outbox.Stop() if err != nil { - b.log().WithError(err).Error("could not shutdown outbox") + b.Log().WithError(err).Error("could not shutdown outbox") return err } b.TxProvider.Dispose() @@ -360,7 +362,7 @@ func (b *DefaultBus) GetHealth() HealthCard { return HealthCard{ DbConnected: dbConnected, - RabbitBackPressure: b.backpreasure, + RabbitBackPressure: b.backpressure, RabbitConnected: b.amqpConnected, } } @@ -383,7 +385,7 @@ func (b *DefaultBus) withTx(action func(tx *sql.Tx) error, ambientTx *sql.Tx) er newTx, newTxErr := b.TxProvider.New() if newTxErr != nil { - b.log().WithError(newTxErr).Error("failed to create transaction when sending a transactional message") + b.Log().WithError(newTxErr).Error("failed to create transaction when sending a transactional message") return newTxErr } activeTx = newTx @@ -404,12 +406,12 @@ func (b *DefaultBus) withTx(action func(tx *sql.Tx) error, ambientTx *sql.Tx) er if actionErr != nil { err := activeTx.Rollback() if err != nil { - b.log().WithError(err).Error("could not rollback transaction") + b.Log().WithError(err).Error("could not rollback transaction") } } else { commitErr := activeTx.Commit() if commitErr != nil { - b.log().WithError(commitErr).Error("could not commit transaction") + b.Log().WithError(commitErr).Error("could not commit transaction") return commitErr } } @@ -448,7 +450,7 @@ func (b *DefaultBus) RPC(ctx context.Context, service string, request, reply *Bu b.Serializer.Register(reply.Payload) err := b.sendImpl(ctx, nil, service, b.rpcQueue.Name, "", "", request, rpc) if err != nil { - b.log().WithError(err).Error("could not send message") + b.Log().WithError(err).Error("could not send message") return nil, err } @@ -559,28 +561,20 @@ func (b *DefaultBus) connect(retryCount uint) (*amqp.Connection, error) { } -// -// func (b *DefaultBus) log(format string, v ...interface{}) { -// log.WithField("Service", b.SvcName).Infof(format, v...) -// } - -func (b *DefaultBus) log() *log.Entry { - return log.WithField("_service", b.SvcName) -} func (b *DefaultBus) monitorAMQPErrors() { for b.started { select { case blocked := <-b.amqpBlocks: if blocked.Active { - b.log().WithField("reason", blocked.Reason).Warn("amqp connection blocked") + b.Log().WithField("reason", blocked.Reason).Warn("amqp connection blocked") } else { - b.log().WithField("reason", blocked.Reason).Info("amqp connection unblocked") + b.Log().WithField("reason", blocked.Reason).Info("amqp connection unblocked") } - b.backpreasure = blocked.Active + b.backpressure = blocked.Active case amqpErr := <-b.amqpErrors: b.amqpConnected = false - b.log().WithField("amqp_error", amqpErr).Error("amqp error") + b.Log().WithField("amqp_error", amqpErr).Error("amqp error") if b.healthChan != nil { b.healthChan <- amqpErr } @@ -604,12 +598,12 @@ func (b *DefaultBus) sendImpl(sctx context.Context, tx *sql.Tx, toService, reply headers := message.GetAMQPHeaders() err := amqptracer.Inject(span, headers) if err != nil { - b.log().WithError(err).Error("could not inject headers") + b.Log().WithError(err).Error("could not inject headers") } buffer, err := b.Serializer.Encode(message.Payload) if err != nil { - b.log().WithError(err).WithField("message", message).Error("failed to send message, encoding of message failed") + b.Log().WithError(err).WithField("message", message).Error("failed to send message, encoding of message failed") return err } @@ -643,16 +637,16 @@ func (b *DefaultBus) sendImpl(sctx context.Context, tx *sql.Tx, toService, reply //send to the transactional outbox if the bus is transactional //otherwise send directly to amqp if b.IsTxnl && tx != nil { - b.log().WithField("message_id", msg.MessageId).Debug("sending message to outbox") + b.Log().WithField("message_id", msg.MessageId).Debug("sending message to outbox") saveErr := b.Outbox.Save(tx, exchange, key, msg) if saveErr != nil { - log.WithError(saveErr).Error("failed to save to transactional outbox") + b.Log().WithError(saveErr).Error("failed to save to transactional outbox") } return saveErr } - //do not attempt to contact the borker if backpreasure is being applied - if b.backpreasure { - return errors.New("can't send message due to backpreasure from amqp broker") + //do not attempt to contact the borker if backpressure is being applied + if b.backpressure { + return errors.New("can't send message due to backpressure from amqp broker") } _, outgoingErr := b.Outgoing.Post(exchange, key, msg) return outgoingErr @@ -663,7 +657,7 @@ func (b *DefaultBus) sendImpl(sctx context.Context, tx *sql.Tx, toService, reply err = b.SafeWithRetries(publish, MaxRetryCount) if err != nil { - log.Printf("failed publishing message.\n error:%v", err) + b.Log().Printf("failed publishing message.\n error:%v", err) return err } return err @@ -697,3 +691,12 @@ type rpcPolicy struct { func (p rpcPolicy) Apply(publishing *amqp.Publishing) { publishing.Headers[RpcHeaderName] = p.rpcID } + +func (b *DefaultBus) Log() logrus.FieldLogger { + if b.Glogged == nil { + b.Glogged = &Glogged{ + log: logrus.WithField("_service", b.SvcName), + } + } + return b.Glogged.Log() +} diff --git a/gbus/invocation.go b/gbus/invocation.go index e854e14..08a631a 100644 --- a/gbus/invocation.go +++ b/gbus/invocation.go @@ -8,7 +8,11 @@ import ( "github.com/sirupsen/logrus" ) +var _ Invocation = &defaultInvocationContext{} +var _ Messaging = &defaultInvocationContext{} + type defaultInvocationContext struct { + *Glogged invocingSvc string bus *DefaultBus inboundMsg *BusMessage @@ -18,6 +22,10 @@ type defaultInvocationContext struct { routingKey string } +func (dfi *defaultInvocationContext) Log() logrus.FieldLogger { + return dfi.Glogged.Log().WithFields(logrus.Fields{"routing_key": dfi.routingKey, "message_id": dfi.inboundMsg.ID}) +} + func (dfi *defaultInvocationContext) Reply(ctx context.Context, replyMessage *BusMessage) error { if dfi.inboundMsg != nil { replyMessage.CorrelationID = dfi.inboundMsg.ID diff --git a/gbus/logged.go b/gbus/logged.go new file mode 100644 index 0000000..1590de8 --- /dev/null +++ b/gbus/logged.go @@ -0,0 +1,31 @@ +// Copyright © 2019 Vladislav Shub +// All rights reserved to the We Company. + +package gbus + +import ( + "github.com/sirupsen/logrus" +) + +var _ Logged = &Glogged{} + +type Glogged struct { + log logrus.FieldLogger +} + +func (gl *Glogged) SetLogger(entry logrus.FieldLogger) { + if gl == nil { + gl = &Glogged{} + } + gl.log = entry +} + +func (gl *Glogged) Log() logrus.FieldLogger { + if gl == nil { + gl = &Glogged{} + } + if gl.log == nil { + gl.log = logrus.New() + } + return gl.log +} diff --git a/gbus/saga/glue.go b/gbus/saga/glue.go index d09b1d5..8367b29 100644 --- a/gbus/saga/glue.go +++ b/gbus/saga/glue.go @@ -7,7 +7,7 @@ import ( "strings" "sync" - log "github.com/sirupsen/logrus" + "github.com/sirupsen/logrus" "github.com/wework/grabbit/gbus" ) @@ -72,7 +72,7 @@ func (imsm *Glue) RegisterSaga(saga gbus.Saga, conf ...gbus.SagaConfFn) error { } imsm.log(). - WithFields(log.Fields{"saga_type": def.sagaType.String(), "handles_messages": len(msgNames)}). + WithFields(logrus.Fields{"saga_type": def.sagaType.String(), "handles_messages": len(msgNames)}). Info("registered saga with messages") return nil @@ -113,7 +113,7 @@ func (imsm *Glue) handler(invocation gbus.Invocation, message *gbus.BusMessage) if startNew { newInstance := def.newInstance() imsm.log(). - WithFields(log.Fields{"saga_def": def.String(), "saga_id": newInstance.ID}). + WithFields(logrus.Fields{"saga_def": def.String(), "saga_id": newInstance.ID}). Info("created new saga") if invkErr := imsm.invokeSagaInstance(newInstance, invocation, message); invkErr != nil { imsm.log().WithError(invkErr).WithField("saga_id", newInstance.ID).Error("failed to invoke saga") @@ -129,7 +129,7 @@ func (imsm *Glue) handler(invocation gbus.Invocation, message *gbus.BusMessage) } if requestsTimeout, duration := newInstance.requestsTimeout(); requestsTimeout { - imsm.log().WithFields(log.Fields{"saga_id": newInstance.ID, "timeout_duration": duration}).Info("new saga requested timeout") + imsm.log().WithFields(logrus.Fields{"saga_id": newInstance.ID, "timeout_duration": duration}).Info("new saga requested timeout") imsm.timeoutManger.RequestTimeout(imsm.svcName, newInstance.ID, duration) } } @@ -158,13 +158,13 @@ func (imsm *Glue) handler(invocation gbus.Invocation, message *gbus.BusMessage) return e } else { - imsm.log().WithFields(log.Fields{"saga_type": def.sagaType, "message": msgName}).Info("fetching saga instances by type") + imsm.log().WithFields(logrus.Fields{"saga_type": def.sagaType, "message": msgName}).Info("fetching saga instances by type") instances, e := imsm.sagaStore.GetSagasByType(invocation.Tx(), def.sagaType) if e != nil { return e } - imsm.log().WithFields(log.Fields{"message": msgName, "instances_fetched": len(instances)}).Info("fetched saga instances") + imsm.log().WithFields(logrus.Fields{"message": msgName, "instances_fetched": len(instances)}).Info("fetched saga instances") for _, instance := range instances { def.configureSaga(instance) @@ -190,7 +190,13 @@ func (imsm *Glue) invokeSagaInstance(instance *Instance, invocation gbus.Invocat inboundMsg: message, sagaID: instance.ID, ctx: invocation.Ctx(), - invokingService: imsm.svcName} + invokingService: imsm.svcName, + } + sginv.SetLogger(imsm.log().WithFields(logrus.Fields{ + "saga_id": instance.ID, + "saga_type": instance.String(), + "message_name": message.PayloadFQN, + })) exchange, routingKey := invocation.Routing() return instance.invoke(exchange, routingKey, sginv, message) @@ -239,8 +245,8 @@ func (imsm *Glue) timeoutSaga(tx *sql.Tx, sagaID string) error { return imsm.completeOrUpdateSaga(tx, saga) } -func (imsm *Glue) log() *log.Entry { - return log.WithField("_service", imsm.svcName) +func (imsm *Glue) log() logrus.FieldLogger { + return imsm.bus.Log().WithField("_service", imsm.svcName) } //NewGlue creates a new Sagamanager diff --git a/gbus/saga/instance.go b/gbus/saga/instance.go index 977eb3b..9513c40 100644 --- a/gbus/saga/instance.go +++ b/gbus/saga/instance.go @@ -3,7 +3,7 @@ package saga import ( "database/sql" "fmt" - "log" + "github.com/sirupsen/logrus" "reflect" "time" @@ -17,6 +17,7 @@ type Instance struct { ConcurrencyCtrl int UnderlyingInstance gbus.Saga MsgToMethodMap []*MsgToFuncPair + Log logrus.FieldLogger } func (si *Instance) invoke(exchange, routingKey string, invocation gbus.Invocation, message *gbus.BusMessage) error { @@ -38,16 +39,21 @@ func (si *Instance) invoke(exchange, routingKey string, invocation gbus.Invocati params := make([]reflect.Value, 0) params = append(params, reflect.ValueOf(invocation), valueOfMessage) method := reflectedVal.MethodByName(methodName) - log.Printf(" invoking method %v on saga instance %v", methodName, si.ID) + if invocation.Log() == nil { + panic("here") + } + invocation.Log().WithFields(logrus.Fields{ + "method_name": methodName, "saga_id": si.ID, + }).Info("invoking method on saga") returns := method.Call(params) val := returns[0] if !val.IsNil() { return val.Interface().(error) } - - log.Printf(" saga instance %v invoked", si.ID) - + invocation.Log().WithFields(logrus.Fields{ + "method_name": methodName, "saga_id": si.ID, + }).Info("saga instance invoked") } return nil @@ -90,7 +96,6 @@ func (si *Instance) timeout(tx *sql.Tx, bus gbus.Messaging) error { func NewInstance(sagaType reflect.Type, msgToMethodMap []*MsgToFuncPair) *Instance { - var newSagaPtr interface{} if sagaType.Kind() == reflect.Ptr { newSagaPtr = reflect.New(sagaType).Elem().Interface() @@ -106,7 +111,8 @@ func NewInstance(sagaType reflect.Type, msgToMethodMap []*MsgToFuncPair) *Instan newInstance := &Instance{ ID: xid.New().String(), UnderlyingInstance: newSaga, - MsgToMethodMap: msgToMethodMap} + MsgToMethodMap: msgToMethodMap, + } return newInstance } diff --git a/gbus/saga/invocation.go b/gbus/saga/invocation.go index f2725af..54aa494 100644 --- a/gbus/saga/invocation.go +++ b/gbus/saga/invocation.go @@ -8,7 +8,10 @@ import ( "github.com/wework/grabbit/gbus" ) +var _ gbus.Invocation = &sagaInvocation{} + type sagaInvocation struct { + *gbus.Glogged decoratedBus gbus.Messaging decoratedInvocation gbus.Invocation inboundMsg *gbus.BusMessage @@ -76,3 +79,7 @@ func (si *sagaInvocation) RPC(ctx context.Context, service string, request, func (si *sagaInvocation) Routing() (exchange, routingKey string) { return si.decoratedInvocation.Routing() } + +//func (si *sagaInvocation) Log() logrus.FieldLogger { +// return si.decoratedInvocation.Log().WithField("saga_id", si.sagaID) +//} diff --git a/gbus/tx/mysql/sagastore.go b/gbus/tx/mysql/sagastore.go index e458041..9217d48 100644 --- a/gbus/tx/mysql/sagastore.go +++ b/gbus/tx/mysql/sagastore.go @@ -10,13 +10,13 @@ import ( "github.com/wework/grabbit/gbus/tx" ) -//SagaStore implements the saga/store interface on top of PostgreSQL +//SagaStore implements the saga/store interface on top of MySQL type SagaStore struct { *tx.SagaStore } func (store *SagaStore) log() *log.Entry { - return log.WithField("_service", store.SvcName) + return log.WithField("saga_store", "mysql") } func (store *SagaStore) ensureSchema() { diff --git a/gbus/tx/mysql/txoutbox.go b/gbus/tx/mysql/txoutbox.go index 7bc8108..2c49610 100644 --- a/gbus/tx/mysql/txoutbox.go +++ b/gbus/tx/mysql/txoutbox.go @@ -44,7 +44,7 @@ type TxOutbox struct { } func (outbox *TxOutbox) log() *log.Entry { - return log.WithField("_service", outbox.svcName) + return log.WithField("tx", "mysql") } //Start starts the transactional outbox that is used to send messages in sync with domain object change @@ -241,7 +241,10 @@ func (outbox *TxOutbox) sendMessages(recordSelector func(tx *sql.Tx) (*sql.Rows, if selectErr != nil { outbox.log().WithError(selectErr).Error("failed fetching messages from outbox") - + err := rows.Close() + if err != nil { + outbox.log().WithError(err).Error("could not close Rows") + } return selectErr } diff --git a/gbus/tx/sagastore.go b/gbus/tx/sagastore.go index a720ec9..c7e1547 100644 --- a/gbus/tx/sagastore.go +++ b/gbus/tx/sagastore.go @@ -220,5 +220,5 @@ func (store *SagaStore) GetSagatableName() string { } func (store *SagaStore) log() *log.Entry { - return log.WithField("_service", store.SvcName) + return log.WithField("store", "mysql") } diff --git a/gbus/worker.go b/gbus/worker.go index ba928f0..fda9db2 100644 --- a/gbus/worker.go +++ b/gbus/worker.go @@ -19,7 +19,7 @@ import ( "github.com/opentracing-contrib/go-amqp/amqptracer" "github.com/opentracing/opentracing-go" slog "github.com/opentracing/opentracing-go/log" - log "github.com/sirupsen/logrus" + "github.com/sirupsen/logrus" "github.com/streadway/amqp" ) @@ -146,7 +146,7 @@ func (worker *worker) extractBusMessage(delivery amqp.Delivery) (*BusMessage, er } if bm.PayloadFQN == "" || bm.Semantics == "" { //TODO: Log poison pill message - worker.log().WithFields(log.Fields{"fqn": bm.PayloadFQN, "semantics": bm.Semantics}).Warn("message received but no headers found...rejecting message") + worker.log().WithFields(logrus.Fields{"fqn": bm.PayloadFQN, "semantics": bm.Semantics}).Warn("message received but no headers found...rejecting message") return nil, errors.New("missing critical headers") } @@ -190,7 +190,7 @@ func (worker *worker) resolveHandlers(isRPCreply bool, bm *BusMessage, delivery } } - worker.log().WithFields(log.Fields{"number_of_handlers": len(handlers)}).Info("found message handlers") + worker.log().WithFields(logrus.Fields{"number_of_handlers": len(handlers)}).Info("found message handlers") return handlers } @@ -212,14 +212,14 @@ func (worker *worker) ack(delivery amqp.Delivery) error { func (worker *worker) reject(requeue bool, delivery amqp.Delivery) error { reject := func(attempts uint) error { return delivery.Reject(requeue /*multiple*/) } - worker.log().WithFields(log.Fields{"message_id": delivery.MessageId, "requeue": requeue}).Info("rejecting message") + worker.log().WithFields(logrus.Fields{"message_id": delivery.MessageId, "requeue": requeue}).Info("rejecting message") err := retry.Retry(reject, strategy.Wait(100*time.Millisecond)) if err != nil { worker.log().WithError(err).Error("could not reject the message") worker.span.LogFields(slog.Error(err)) } - worker.log().WithFields(log.Fields{"message_id": delivery.MessageId, "requeue": requeue}).Info("message rejected") + worker.log().WithFields(logrus.Fields{"message_id": delivery.MessageId, "requeue": requeue}).Info("message rejected") return err } @@ -283,7 +283,7 @@ func (worker *worker) processMessage(delivery amqp.Delivery, isRPCreply bool) { worker.span.Finish() }() - worker.log().WithFields(log.Fields{"worker": worker.consumerTag, "message_id": delivery.MessageId}).Info("GOT MSG") + worker.log().WithFields(logrus.Fields{"worker": worker.consumerTag, "message_id": delivery.MessageId}).Info("GOT MSG") //handle a message that originated from a deadletter exchange if worker.isDead(delivery) { @@ -307,7 +307,7 @@ func (worker *worker) processMessage(delivery amqp.Delivery, isRPCreply bool) { if len(handlers) == 0 { worker.log(). WithFields( - log.Fields{"message-name": bm.PayloadFQN, + logrus.Fields{"message-name": bm.PayloadFQN, "message-type": bm.Semantics}). Warn("Message received but no handlers found") worker.span.LogFields(slog.String("grabbit", "no handlers found")) @@ -359,9 +359,11 @@ func (worker *worker) invokeHandlers(sctx context.Context, handlers []MessageHan } worker.span.Finish() }() + var handlerErr error + var hspan opentracing.Span + var hsctx context.Context for _, handler := range handlers { - hspan, hsctx := opentracing.StartSpanFromContext(sctx, runtime.FuncForPC(reflect.ValueOf(handler).Pointer()).Name()) - hspan.Finish() + hspan, hsctx = opentracing.StartSpanFromContext(sctx, runtime.FuncForPC(reflect.ValueOf(handler).Pointer()).Name()) ctx := &defaultInvocationContext{ invocingSvc: delivery.ReplyTo, @@ -370,18 +372,26 @@ func (worker *worker) invokeHandlers(sctx context.Context, handlers []MessageHan tx: tx, ctx: hsctx, exchange: delivery.Exchange, - routingKey: delivery.RoutingKey} - handlerErr := handler(ctx, message) + routingKey: delivery.RoutingKey, + } + ctx.SetLogger(worker.log().WithField("handler", runtime.FuncForPC(reflect.ValueOf(handler).Pointer()).Name())) + handlerErr = handler(ctx, message) if handlerErr != nil { hspan.LogFields(slog.Error(handlerErr)) - if worker.isTxnl { - rbkErr := tx.Rollback() - if rbkErr != nil { - worker.log().WithError(rbkErr).Error("failed rolling back transaction when recovering from handler error") - } + break + } + hspan.Finish() + } + if handlerErr != nil { + hspan.LogFields(slog.Error(handlerErr)) + if worker.isTxnl { + rbkErr := tx.Rollback() + if rbkErr != nil { + worker.log().WithError(rbkErr).Error("failed rolling back transaction when recovering from handler error") } - return handlerErr } + hspan.Finish() + return handlerErr } if worker.isTxnl { cmtErr := tx.Commit() @@ -404,10 +414,8 @@ func (worker *worker) invokeHandlers(sctx context.Context, handlers []MessageHan )) } -func (worker *worker) log() *log.Entry { - - return log.WithFields(log.Fields{ - "_service": worker.svcName}) +func (worker *worker) log() logrus.FieldLogger { + return worker.b.Log() } func (worker *worker) AddRegistration(registration *Registration) {