From 71e8928d39e00931582b77bb896c949314742827 Mon Sep 17 00:00:00 2001 From: Vladislav Shub Date: Sun, 21 Apr 2019 22:58:45 +0300 Subject: [PATCH 1/3] added more logs and error handling --- .circleci/config.yml | 5 +++ gbus/builder/builder.go | 20 +++++++---- gbus/bus.go | 72 +++++++++++++++++++++++++++++--------- gbus/outbox.go | 16 ++++++--- gbus/saga/def.go | 2 -- gbus/saga/glue.go | 19 +++++----- gbus/saga/instance.go | 2 +- gbus/saga/invocation.go | 2 +- gbus/serialization/avro.go | 9 ----- gbus/tx/mysql/sagastore.go | 9 +++-- gbus/tx/mysql/sanitize.go | 2 +- gbus/tx/mysql/txoutbox.go | 33 ++++++++++++----- gbus/tx/sagastore.go | 36 ++++++++++++------- go.mod | 1 - go.sum | 2 -- 15 files changed, 151 insertions(+), 79 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index c2ba2b8..9d8e081 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -48,3 +48,8 @@ jobs: command: | go get github.com/mattn/goveralls goveralls -coverprofile=coverage.out -service=circle-ci -repotoken=$COVERALLS_TOKEN + - run: + name: Run linter + command: | + curl -sfL https://install.goreleaser.com/github.com/golangci/golangci-lint.sh | sh -s -- -b $(go env GOPATH)/bin v1.16.0 + golangci-lint run --skip-dirs=tests diff --git a/gbus/builder/builder.go b/gbus/builder/builder.go index c40eb9a..b57ee3c 100644 --- a/gbus/builder/builder.go +++ b/gbus/builder/builder.go @@ -2,7 +2,6 @@ package builder import ( "fmt" - "go/types" "sync" "time" @@ -14,7 +13,6 @@ import ( ) type defaultBuilder struct { - handlers []types.Type PrefetchCount uint connStr string purgeOnStartup bool @@ -34,9 +32,11 @@ type defaultBuilder struct { func (builder *defaultBuilder) Build(svcName string) gbus.Bus { gb := &gbus.DefaultBus{ - AmqpConnStr: builder.connStr, - PrefetchCount: 1, - Outgoing: &gbus.AMQPOutbox{}, + AmqpConnStr: builder.connStr, + PrefetchCount: 1, + Outgoing: &gbus.AMQPOutbox{ + SvcName: svcName, + }, SvcName: svcName, PurgeOnStartup: builder.purgeOnStartup, DelayedSubscriptions: [][]string{}, @@ -74,7 +74,10 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus { gb.TxProvider = mysqltx sagaStore = mysql.NewSagaStore(gb.SvcName, mysqltx) if builder.purgeOnStartup { - sagaStore.Purge() + err := sagaStore.Purge() + if err != nil { + panic(err) + } } gb.Outbox = mysql.NewOutbox(gb.SvcName, mysqltx, builder.purgeOnStartup) @@ -91,7 +94,10 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus { } if builder.purgeOnStartup { - sagaStore.Purge() + err := sagaStore.Purge() + if err != nil { + panic(err) + } } gb.Glue = saga.NewGlue(gb, sagaStore, svcName) return gb diff --git a/gbus/bus.go b/gbus/bus.go index accd63b..0a8fb09 100644 --- a/gbus/bus.go +++ b/gbus/bus.go @@ -39,8 +39,6 @@ type DefaultBus struct { RPCHandlers map[string]MessageHandler deadletterHandler func(tx *sql.Tx, poision amqp.Delivery) error - msgs <-chan amqp.Delivery - rpcMsgs <-chan amqp.Delivery HandlersLock *sync.Mutex RPCLock *sync.Mutex SenderLock *sync.Mutex @@ -118,17 +116,25 @@ func (b *DefaultBus) createServiceQueue() (amqp.Queue, error) { return q, e } -func (b *DefaultBus) bindServiceQueue() { +func (b *DefaultBus) bindServiceQueue() error { if b.deadletterHandler != nil && b.DLX != "" { - b.AMQPChannel.ExchangeDeclare(b.DLX, /*name*/ + err := b.AMQPChannel.ExchangeDeclare(b.DLX, /*name*/ "fanout", /*kind*/ true, /*durable*/ false, /*autoDelete*/ false, /*internal*/ false, /*noWait*/ nil /*args amqp.Table*/) - b.bindQueue("", b.DLX) + if err != nil { + b.log().WithError(err).Error("could not declare exchange") + return err + } + err = b.bindQueue("", b.DLX) + if err != nil { + b.log().WithError(err).Error("could not bind exchange") + return err + } } for _, subscription := range b.DelayedSubscriptions { topic := subscription[0] @@ -142,13 +148,16 @@ func (b *DefaultBus) bindServiceQueue() { nil /*args amqp.Table*/) if e != nil { b.log().WithError(e).WithField("exchange", exchange).Error("failed to declare exchange") + return e } else { e = b.bindQueue(topic, exchange) if e != nil { b.log().WithError(e).WithFields(log.Fields{"topic": topic, "exchange": exchange}).Error("failed to bind topic to exchange") + return e } } } + return nil } func (b *DefaultBus) createAMQPChannel(conn *amqp.Connection) (*amqp.Channel, error) { @@ -184,7 +193,7 @@ func (b *DefaultBus) Start() error { //TODO:Figure out what should be done //init the outbox that sends the messages to the amqp transport and handles publisher confirms - if b.Outgoing.init(b.outAMQPChannel, b.Confirm, true); e != nil { + if e := b.Outgoing.init(b.outAMQPChannel, b.Confirm, true); e != nil { return e } /* @@ -199,8 +208,14 @@ func (b *DefaultBus) Start() error { return e } amqpChan.NotifyClose(b.amqpErrors) - amqpOutbox := &AMQPOutbox{} - amqpOutbox.init(amqpChan, b.Confirm, false) + amqpOutbox := &AMQPOutbox{ + SvcName: b.SvcName, + } + err := amqpOutbox.init(amqpChan, b.Confirm, false) + if err != nil { + b.log().WithError(err).Error("failed initializing amqpOutbox") + return err + } if startErr := b.Outbox.Start(amqpOutbox); startErr != nil { b.log().WithError(startErr).Error("failed to start transactional outbox") return startErr @@ -216,7 +231,11 @@ func (b *DefaultBus) Start() error { b.serviceQueue = q //bind queue - b.bindServiceQueue() + err := b.bindServiceQueue() + if err != nil { + b.log().WithError(err).Error("could not bind service to queue") + return err + } //declare rpc queue @@ -273,7 +292,12 @@ func (b *DefaultBus) createBusWorkers(workerNum uint) ([]*worker, error) { serializer: b.Serializer, b: b, amqpErrors: b.amqpErrors} - go w.Start() + go func() { + err := w.Start() + if err != nil { + log.WithError(err) + } + }() workers = append(workers, w) } @@ -293,7 +317,10 @@ func (b *DefaultBus) Shutdown() (shutdwonErr error) { }() for _, worker := range b.workers { - worker.Stop() + err := worker.Stop() + if err != nil { + b.log().WithError(err).Error("could not stop worker") + } } b.Outgoing.shutdown() @@ -301,7 +328,11 @@ func (b *DefaultBus) Shutdown() (shutdwonErr error) { if b.IsTxnl { b.TxProvider.Dispose() - b.Outbox.Stop() + err := b.Outbox.Stop() + if err != nil { + b.log().WithError(err).Error("could not shutdown outbox") + return err + } } return nil } @@ -366,11 +397,14 @@ func (b *DefaultBus) withTx(action func(tx *sql.Tx) error, ambientTx *sql.Tx) er */ if b.IsTxnl && shouldCommitTx { if actionErr != nil { - activeTx.Rollback() + err := activeTx.Rollback() + if err != nil { + b.log().WithError(err).Error("could not rollback transaction") + } } else { commitErr := activeTx.Commit() if commitErr != nil { - return commitErr + b.log().WithError(commitErr).Error("could not commit transaction") } } } @@ -406,7 +440,11 @@ func (b *DefaultBus) RPC(ctx context.Context, service string, request, reply *Bu rpcID: rpcID} b.Serializer.Register(reply.Payload) - b.sendImpl(ctx, nil, service, b.rpcQueue.Name, "", "", request, rpc) + err := b.sendImpl(ctx, nil, service, b.rpcQueue.Name, "", "", request, rpc) + if err != nil { + b.log().WithError(err).Error("could not send message") + return nil, err + } //wait for reply or timeout select { @@ -549,10 +587,10 @@ func (b *DefaultBus) monitorAMQPErrors() { } } -func (b *DefaultBus) sendImpl(ctx context.Context, tx *sql.Tx, toService, replyTo, exchange, topic string, message *BusMessage, policies ...MessagePolicy) (er error) { +func (b *DefaultBus) sendImpl(sctx context.Context, tx *sql.Tx, toService, replyTo, exchange, topic string, message *BusMessage, policies ...MessagePolicy) (er error) { b.SenderLock.Lock() defer b.SenderLock.Unlock() - span, ctx := opentracing.StartSpanFromContext(ctx, "sendImpl") + span, _ := opentracing.StartSpanFromContext(sctx, "sendImpl") defer func() { if err := recover(); err != nil { errMsg := fmt.Sprintf("panic recovered panicking err:\n%v\n%s", err, debug.Stack()) diff --git a/gbus/outbox.go b/gbus/outbox.go index c709d4e..323a37a 100644 --- a/gbus/outbox.go +++ b/gbus/outbox.go @@ -1,9 +1,9 @@ package gbus import ( - "log" "sync" + log "github.com/sirupsen/logrus" "github.com/streadway/amqp" ) @@ -19,6 +19,7 @@ type AMQPOutbox struct { locker *sync.Mutex pending map[uint64]pendingConfirmation stop chan bool + SvcName string } func (out *AMQPOutbox) init(amqp *amqp.Channel, confirm, resendOnNack bool) error { @@ -96,7 +97,7 @@ func (out *AMQPOutbox) confirmationLoop() { out.locker.Lock() pending := out.pending[ack] if pending.deliveryTag > 0 { - log.Printf("ack received for a pending delivery with tag %v", ack) + out.log().WithField("tag", ack).Debug("ack received for a pending delivery with tag") } delete(out.pending, ack) out.locker.Unlock() @@ -104,7 +105,7 @@ func (out *AMQPOutbox) confirmationLoop() { if nack <= 0 { continue } - log.Printf("nack received for delivery tag %v", nack) + out.log().WithField("tag", nack).Debug("nack received for a pending delivery with tag") out.locker.Lock() pending := out.pending[nack] pending.deliveryTag = nack @@ -112,7 +113,10 @@ func (out *AMQPOutbox) confirmationLoop() { delete(out.pending, nack) out.locker.Unlock() case resend := <-out.resends: - out.Post(resend.exchange, resend.routingKey, resend.amqpMessage) + _, err := out.Post(resend.exchange, resend.routingKey, resend.amqpMessage) + if err != nil { + out.log().WithError(err).Error("could not post message to exchange") + } } } } @@ -131,6 +135,10 @@ func (out *AMQPOutbox) NotifyConfirm(ack, nack chan uint64) { out.channel.NotifyConfirm(ack, nack) } +func (out *AMQPOutbox) log() *log.Entry { + return log.WithField("_service", out.SvcName) +} + type pendingConfirmation struct { deliveryTag uint64 exchange string diff --git a/gbus/saga/def.go b/gbus/saga/def.go index dac8735..b09721f 100644 --- a/gbus/saga/def.go +++ b/gbus/saga/def.go @@ -22,10 +22,8 @@ type Def struct { sagaType reflect.Type startedBy []string lock *sync.Mutex - instances []*Instance sagaConfFns []gbus.SagaConfFn msgToFunc []*MsgToFuncPair - msgHandler gbus.MessageHandler } //HandleMessage implements HandlerRegister interface diff --git a/gbus/saga/glue.go b/gbus/saga/glue.go index 61643ba..b2f6163 100644 --- a/gbus/saga/glue.go +++ b/gbus/saga/glue.go @@ -24,15 +24,14 @@ var _ gbus.SagaRegister = &Glue{} //Glue ties the incoming messages from the Bus with the needed Saga instances type Glue struct { - svcName string - bus gbus.Bus - sagaDefs []*Def - lock *sync.Mutex - alreadyRegistred map[string]bool - msgToDefMap map[string][]*Def - sagaStore Store - timeoutManger TimeoutManager - subscribedOnTimeouts bool + svcName string + bus gbus.Bus + sagaDefs []*Def + lock *sync.Mutex + alreadyRegistred map[string]bool + msgToDefMap map[string][]*Def + sagaStore Store + timeoutManger TimeoutManager } func (imsm *Glue) isSagaAlreadyRegistered(sagaType reflect.Type) bool { @@ -139,7 +138,7 @@ func (imsm *Glue) handler(invocation gbus.Invocation, message *gbus.BusMessage) return e } - if requestsTimeout, duration := newInstance.requestsTimeout(); requestsTimeout == true { + if requestsTimeout, duration := newInstance.requestsTimeout(); requestsTimeout { imsm.log().WithFields(log.Fields{"saga_id": newInstance.ID, "timeout_duration": duration}).Info("new saga requested timeout") imsm.timeoutManger.RequestTimeout(imsm.svcName, newInstance.ID, duration) } diff --git a/gbus/saga/instance.go b/gbus/saga/instance.go index 62fd902..50c8d1c 100644 --- a/gbus/saga/instance.go +++ b/gbus/saga/instance.go @@ -47,7 +47,7 @@ func (si *Instance) invoke(exchange, routingKey string, invocation gbus.Invocati returns := method.Call(params) val := returns[0] - if val.IsNil() == false { + if !val.IsNil() { return val.Interface().(error) } diff --git a/gbus/saga/invocation.go b/gbus/saga/invocation.go index 4740389..3ffa3d8 100644 --- a/gbus/saga/invocation.go +++ b/gbus/saga/invocation.go @@ -20,7 +20,7 @@ func (si *sagaInvocation) setCorrelationIDs(message *gbus.BusMessage, isEvent bo message.CorrelationID = si.inboundMsg.ID - if isEvent == false { + if !isEvent { //support saga-to-saga communication if si.inboundMsg.SagaID != "" { message.SagaCorrelationID = message.SagaID diff --git a/gbus/serialization/avro.go b/gbus/serialization/avro.go index 364a20f..a13ef5e 100644 --- a/gbus/serialization/avro.go +++ b/gbus/serialization/avro.go @@ -193,15 +193,6 @@ func (as *Avro) RegisterAvroMessage(schemaName, namespace, schema string, obj Av return } -//getSchema get schema id from schema-registry service -func (as *Avro) getSchema(id int) (codec *goavro.Codec, err error) { - codec, err = as.schemaRegistryClient.GetSchema(id) - if err != nil { - return nil, err - } - return codec, nil -} - //registerOrGetSchemaID get schema id from schema-registry service func (as *Avro) registerOrGetSchemaID(topic string, avroCodec *goavro.Codec) (schemaID int, err error) { schemaID = 0 diff --git a/gbus/tx/mysql/sagastore.go b/gbus/tx/mysql/sagastore.go index 75e263c..e458041 100644 --- a/gbus/tx/mysql/sagastore.go +++ b/gbus/tx/mysql/sagastore.go @@ -21,7 +21,7 @@ func (store *SagaStore) log() *log.Entry { func (store *SagaStore) ensureSchema() { store.log().Info("ensuring saga schema exists") - if tablesExists := store.sagaTablesExist(); tablesExists == false { + if tablesExists := store.sagaTablesExist(); !tablesExists { store.log().Info("could not find saga schema, attempting to creat schema") @@ -33,7 +33,12 @@ func (store *SagaStore) sagaTablesExist() bool { tblName := store.GetSagatableName() tx := store.NewTx() - defer tx.Commit() + defer func() { + err := tx.Commit() + if err != nil { + store.log().WithError(err).Error("could not commit sagaTablesExist") + } + }() selectSQL := `SELECT 1 FROM ` + tblName + ` LIMIT 1;` diff --git a/gbus/tx/mysql/sanitize.go b/gbus/tx/mysql/sanitize.go index 192a6e7..224b0d1 100644 --- a/gbus/tx/mysql/sanitize.go +++ b/gbus/tx/mysql/sanitize.go @@ -4,7 +4,7 @@ import "regexp" func sanitizeTableName(dirty string) string { - var re = regexp.MustCompile("-|;|\\|") + var re = regexp.MustCompile(`-|;|\\|`) sanitized := re.ReplaceAllString(dirty, "") return sanitized } diff --git a/gbus/tx/mysql/txoutbox.go b/gbus/tx/mysql/txoutbox.go index 21660ba..d881c7c 100644 --- a/gbus/tx/mysql/txoutbox.go +++ b/gbus/tx/mysql/txoutbox.go @@ -16,9 +16,9 @@ import ( ) var ( - pending int - waitingConfirm = 1 - confirmed = 2 + pending int + //waitingConfirm = 1 + confirmed = 2 //TODO:get these values from configuration maxPageSize = 500 maxDeliveryAttempts = 50 @@ -52,13 +52,19 @@ func (outbox *TxOutbox) Start(amqpOut *gbus.AMQPOutbox) error { panic(fmt.Sprintf("passed in transaction provider failed with the following error\n%s", e)) } if ensureErr := outbox.ensureSchema(tx, outbox.svcName); ensureErr != nil { - tx.Rollback() + err := tx.Rollback() + if err != nil { + outbox.log().WithError(err).Error("could not rollback the transaction for creation of schemas") + } return ensureErr } if outbox.purgeOnStartup { if purgeErr := outbox.purge(tx); purgeErr != nil { outbox.log().WithError(purgeErr).Error("failed to purge transactional outbox") - tx.Rollback() + err := tx.Rollback() + if err != nil { + outbox.log().WithError(err).Error("could not rollback the transaction for purge") + } return purgeErr } } @@ -171,7 +177,10 @@ func (outbox *TxOutbox) deleteCompletedRecords() error { if execErr != nil { outbox.log().WithError(execErr).Error("failed to delete processed records") - tx.Rollback() + err := tx.Rollback() + if err != nil { + outbox.log().WithError(err).Error("could not rollback the transaction for deleting completed records") + } return execErr } @@ -198,7 +207,10 @@ func (outbox *TxOutbox) updateAckedRecord(deliveryTag uint64) error { outbox.log().WithError(execErr). WithFields(log.Fields{"delivery_tag": deliveryTag, "relay_id": outbox.ID}). Error("failed to update delivery tag") - tx.Rollback() + err := tx.Rollback() + if err != nil { + outbox.log().WithError(err).Error("could not rollback update in outbox") + } } return tx.Commit() } @@ -230,7 +242,7 @@ func (outbox *TxOutbox) sendMessages(recordSelector func(tx *sql.Tx) (*sql.Rows, return selectErr } - successfulDeliveries := make(map[uint64]int, 0) + successfulDeliveries := make(map[uint64]int) failedDeliveries := make([]int, 0) for rows.Next() { @@ -266,7 +278,10 @@ func (outbox *TxOutbox) sendMessages(recordSelector func(tx *sql.Tx) (*sql.Rows, successfulDeliveries[deliveryTag] = recID } } - rows.Close() + err := rows.Close() + if err != nil { + outbox.log().WithError(err).Error("could not close Rows") + } for deliveryTag, id := range successfulDeliveries { _, updateErr := tx.Exec("UPDATE "+getOutboxName(outbox.svcName)+" SET status=1, delivery_tag=?, relay_id=? WHERE rec_id=?", deliveryTag, outbox.ID, id) diff --git a/gbus/tx/sagastore.go b/gbus/tx/sagastore.go index 78c45e0..07df2c0 100644 --- a/gbus/tx/sagastore.go +++ b/gbus/tx/sagastore.go @@ -65,7 +65,12 @@ func (store *SagaStore) GetSagasByType(tx *sql.Tx, sagaType reflect.Type) (insta selectSQL := "SELECT saga_id, saga_type, saga_data, version FROM " + tblName + " WHERE saga_type=" + store.ParamsMarkers[0] rows, err := tx.Query(selectSQL, sagaType.String()) - defer rows.Close() + defer func() { + err := rows.Close() + if err != nil { + store.log().WithError(err).Error("could not close rows") + } + }() if err != nil { return nil, err @@ -121,18 +126,23 @@ func (store *SagaStore) GetSagaByID(tx *sql.Tx, sagaID string) (*saga.Instance, tblName := store.GetSagatableName() selectSQL := `SELECT saga_id, saga_type, saga_data, version FROM ` + tblName + ` WHERE saga_id=` + store.ParamsMarkers[0] + `` - rows, error := tx.Query(selectSQL, sagaID) - defer rows.Close() - if error != nil { - store.log().WithError(error). + rows, err := tx.Query(selectSQL, sagaID) + defer func() { + err := rows.Close() + if err != nil { + store.log().WithError(err).Error("could not close rows") + } + }() + if err != nil { + store.log().WithError(err). WithFields(log.Fields{"saga_id": sagaID, "table_name": store.GetSagatableName()}). Error("Failed to fetch saga") - return nil, error + return nil, err } - instances, error := store.scanInstances(rows) - if error != nil { - return nil, error + instances, err := store.scanInstances(rows) + if err != nil { + return nil, err } if len(instances) == 0 { return nil, fmt.Errorf("no saga found for saga with saga_id:%v", sagaID) @@ -191,9 +201,9 @@ func (store *SagaStore) serilizeSaga(instance *saga.Instance) ([]byte, error) { //NewTx creates a new transaction from the underlying TxProvider func (store *SagaStore) NewTx() *sql.Tx { - tx, error := store.Tx.New() - if error != nil { - e := fmt.Errorf("can't initialize sage store.\nerror:\n%s", error) + tx, err := store.Tx.New() + if err != nil { + e := fmt.Errorf("can't initialize sage store.\nerror:\n%s", err) panic(e) } @@ -203,7 +213,7 @@ func (store *SagaStore) NewTx() *sql.Tx { //GetSagatableName returns the table name in which to store the Sagas func (store *SagaStore) GetSagatableName() string { - var re = regexp.MustCompile("-|;|\\|") + var re = regexp.MustCompile(`-|;|\\|`) sanitized := re.ReplaceAllString(store.SvcName, "") return strings.ToLower("grabbit_" + sanitized + "_sagas") diff --git a/go.mod b/go.mod index 61a9675..d143cbb 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,6 @@ require ( github.com/golang/protobuf v1.3.1 github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect github.com/kr/pretty v0.1.0 // indirect - github.com/lib/pq v1.1.0 github.com/linkedin/goavro v2.1.0+incompatible github.com/onsi/ginkgo v1.8.0 // indirect github.com/onsi/gomega v1.5.0 // indirect diff --git a/go.sum b/go.sum index 2ea3a46..6078870 100644 --- a/go.sum +++ b/go.sum @@ -42,8 +42,6 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= -github.com/lib/pq v1.1.0 h1:/5u4a+KGJptBRqGzPvYQL9p0d/tPR4S31+Tnzj9lEO4= -github.com/lib/pq v1.1.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/linkedin/goavro v2.1.0+incompatible h1:DV2aUlj2xZiuxQyvag8Dy7zjY69ENjS66bWkSfdpddY= github.com/linkedin/goavro v2.1.0+incompatible/go.mod h1:bBCwI2eGYpUI/4820s67MElg9tdeLbINjLjiM2xZFYM= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= From 7899cf96f96d434199bc56582098330c66496186 Mon Sep 17 00:00:00 2001 From: Vladislav Shub Date: Sun, 21 Apr 2019 23:00:00 +0300 Subject: [PATCH 2/3] removed pg from circle build --- .circleci/config.yml | 7 ------- 1 file changed, 7 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 9d8e081..39e312f 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -9,10 +9,6 @@ jobs: RABBITMQ_DEFAULT_USER: rabbitmq RABBITMQ_DEFAULT_PASS: rabbitmq RABBITMQ_DEFAULT_VHOST: / - - image: postgres - environment: - POSTGRES_PASSWORD: rhinof - POSTGRES_USER: rhinof - image: mysql command: --default-authentication-plugin=mysql_native_password environment: @@ -28,9 +24,6 @@ jobs: command: wget https://github.com/jwilder/dockerize/releases/download/$DOCKERIZE_VERSION/dockerize-linux-amd64-$DOCKERIZE_VERSION.tar.gz && tar -C /usr/local/bin -xzvf dockerize-linux-amd64-$DOCKERIZE_VERSION.tar.gz && rm dockerize-linux-amd64-$DOCKERIZE_VERSION.tar.gz environment: DOCKERIZE_VERSION: v0.6.1 - - run: - name: Wait for pg - command: dockerize -wait tcp://localhost:5432 -timeout 1m - run: name: Wait for mysql command: dockerize -wait tcp://localhost:3306 -timeout 1m From 5aa160e7dc976e59c9b1473f7f72647cdb0cf0d6 Mon Sep 17 00:00:00 2001 From: Vladislav Shub Date: Mon, 22 Apr 2019 11:14:55 +0300 Subject: [PATCH 3/3] code review changes --- gbus/bus.go | 1 + 1 file changed, 1 insertion(+) diff --git a/gbus/bus.go b/gbus/bus.go index 0a8fb09..61864cc 100644 --- a/gbus/bus.go +++ b/gbus/bus.go @@ -405,6 +405,7 @@ func (b *DefaultBus) withTx(action func(tx *sql.Tx) error, ambientTx *sql.Tx) er commitErr := activeTx.Commit() if commitErr != nil { b.log().WithError(commitErr).Error("could not commit transaction") + return commitErr } } }