Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fixing goreportcard issues #118

Merged
merged 1 commit into from
Aug 9, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ debug

# Test binary, built with `go test -c`
*.test

.vscode
# Output of the go coverage tool, specifically when used with LiteIDE
*.out
.DS_Store
1 change: 1 addition & 0 deletions gbus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -736,6 +736,7 @@ func (p rpcPolicy) Apply(publishing *amqp.Publishing) {
publishing.Headers[RpcHeaderName] = p.rpcID
}

//Log returns the default logrus.FieldLogger for the bus via the Glogged helper
func (b *DefaultBus) Log() logrus.FieldLogger {
if b.Glogged == nil {
b.Glogged = &Glogged{
Expand Down
6 changes: 3 additions & 3 deletions gbus/logged.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
// Copyright © 2019 Vladislav Shub <vladislav.shub@wework.com>
// All rights reserved to the We Company.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I must protest... ;)


package gbus

import (
Expand All @@ -9,17 +6,20 @@ import (

var _ Logged = &Glogged{}

//Glogged provides an easy way for structs with in the grabbit package to participate in the general logging schema of the bus
type Glogged struct {
log logrus.FieldLogger
}

//SetLogger sets the default logrus.FieldLogger that should be used when logging a new message
func (gl *Glogged) SetLogger(entry logrus.FieldLogger) {
if gl == nil {
gl = &Glogged{}
}
gl.log = entry
}

//Log returns the set default log or a new instance of a logrus.FieldLogger
func (gl *Glogged) Log() logrus.FieldLogger {
if gl == nil {
gl = &Glogged{}
Expand Down
1 change: 1 addition & 0 deletions gbus/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ func (bm *BusMessage) SetPayload(payload Message) {
bm.Payload = payload
}

//GetTraceLog returns an array of log entires containing all of the message properties
func (bm *BusMessage) GetTraceLog() (fields []log.Field) {
return []log.Field{
log.String("message", bm.PayloadFQN),
Expand Down
6 changes: 6 additions & 0 deletions gbus/registration.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ import (
"strings"
)

//MessageFilter matches rabbitmq topic patterns
type MessageFilter struct {
Exchange string
RoutingKey string
MsgName string
}

//Matches the passed in exchange, routingKey, msgName with the defined filter
func (filter *MessageFilter) Matches(exchange, routingKey, msgName string) bool {

targetExchange := strings.ToLower(exchange)
Expand Down Expand Up @@ -59,22 +61,26 @@ func matchWords(input, pattern []string) bool {
return false
}

//Registration represents a message handler's registration for a given exchange, topic and msg combination
type Registration struct {
info *MessageFilter
Handler MessageHandler
}

//Matches the registration with the given xchange, routingKey, msgName
func (sub *Registration) Matches(exchange, routingKey, msgName string) bool {
return sub.info.Matches(exchange, routingKey, msgName)
}

//NewRegistration creates a new registration
func NewRegistration(exchange, routingKey string, message Message, handler MessageHandler) *Registration {
reg := Registration{
info: NewMessageFilter(exchange, routingKey, message),
Handler: handler}
return &reg
}

//NewMessageFilter creates a new MessageFilter
func NewMessageFilter(exchange, routingKey string, message Message) *MessageFilter {
filter := &MessageFilter{
Exchange: strings.ToLower(exchange),
Expand Down
4 changes: 3 additions & 1 deletion gbus/saga/def.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
package saga

import (
"github.com/wework/grabbit/gbus/metrics"
"reflect"
"sync"

"github.com/wework/grabbit/gbus/metrics"

"github.com/wework/grabbit/gbus"
)

var _ gbus.HandlerRegister = &Def{}

//MsgToFuncPair helper struct
type MsgToFuncPair struct {
Filter *gbus.MessageFilter
SagaFuncName string
Expand Down
6 changes: 4 additions & 2 deletions gbus/saga/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ package saga
import (
"database/sql"
"fmt"
"github.com/sirupsen/logrus"
"github.com/wework/grabbit/gbus/metrics"
"reflect"
"time"

"github.com/sirupsen/logrus"
"github.com/wework/grabbit/gbus/metrics"

"github.com/rs/xid"
"github.com/wework/grabbit/gbus"
)
Expand Down Expand Up @@ -104,6 +105,7 @@ func (si *Instance) timeout(tx *sql.Tx, bus gbus.Messaging) error {
return saga.Timeout(tx, bus)
}

//NewInstance creates a new saga instance
func NewInstance(sagaType reflect.Type, msgToMethodMap []*MsgToFuncPair) *Instance {

var newSagaPtr interface{}
Expand Down
6 changes: 3 additions & 3 deletions gbus/tx/mysql/txoutbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,8 +195,8 @@ func (outbox *TxOutbox) updateAckedRecord(deliveryTag uint64) error {
recID := outbox.recordsPendingConfirms[deliveryTag]
outbox.gl.Unlock()
/*
since the messages get sent to rabbitmq and then the outbox table gets updated with the deilvery tag for teh record
it may be that we recived a acked deliveryTag that is not yet registered in the outbox table.
since the messages get sent to rabbitmq and then the outbox table gets updated with the deilvery tag for the record
it may be that we received a acked deliveryTag that is not yet registered in the outbox table.
in that case we just place the deliveryTag back in the ack channel so it can be picked up and re processed later
we place it in the channel using a new goroutine so to not deadlock if there is only a single goroutine draining the ack channel
*/
Expand Down Expand Up @@ -310,7 +310,7 @@ func (outbox *TxOutbox) sendMessages(recordSelector func(tx *sql.Tx) (*sql.Rows,
if cmtErr := tx.Commit(); cmtErr != nil {
outbox.log().WithError(cmtErr).Error("Error committing outbox transaction")
} else {
//only after the tx has commited successfully add the recordids so they can be picked up by confirms
//only after the tx has committed successfully add the recordids so they can be picked up by confirms
outbox.gl.Lock()
defer outbox.gl.Unlock()
for deliveryTag, recID := range successfulDeliveries {
Expand Down
7 changes: 4 additions & 3 deletions gbus/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ import (
"database/sql"
"errors"
"fmt"
"github.com/wework/grabbit/gbus/metrics"
"math/rand"
"runtime/debug"
"sync"
"time"

"github.com/wework/grabbit/gbus/metrics"

"github.com/Rican7/retry"
"github.com/Rican7/retry/backoff"
"github.com/Rican7/retry/jitter"
Expand Down Expand Up @@ -388,7 +389,7 @@ func (worker *worker) invokeHandlers(sctx context.Context, handlers []MessageHan
}
ctx.SetLogger(worker.log().WithField("handler", handler.Name()))
handlerErr = metrics.RunHandlerWithMetric(func() error {
return handler(ctx, message)
return handler(ctx, message)
}, handler.Name(), worker.log())
if handlerErr != nil {
hspan.LogFields(slog.Error(handlerErr))
Expand All @@ -410,7 +411,7 @@ func (worker *worker) invokeHandlers(sctx context.Context, handlers []MessageHan
if worker.isTxnl {
cmtErr := tx.Commit()
if cmtErr != nil {
worker.log().WithError(cmtErr).Error("failed commiting transaction after invoking handlers")
worker.log().WithError(cmtErr).Error("failed committing transaction after invoking handlers")
return cmtErr
}
}
Expand Down