calling channel.Cancel when worker is stopped #149

Merged
merged 7 commits into from
Aug 28, 2019
Changes from all commits
8 changes: 8 additions & 0 deletions gbus/metrics/message_metrics.go
@@ -10,6 +10,13 @@ var (
rejectedMessages = newRejectedMessagesCounter()
)

//ResetRejectedMessagesCounter resets the counter; intended to be used in tests only
func ResetRejectedMessagesCounter() {

prometheus.Unregister(rejectedMessages)
rejectedMessages = newRejectedMessagesCounter()
}

//ReportRejectedMessage reports a message being rejected to the metrics counter
func ReportRejectedMessage() {
rejectedMessages.Inc()
@@ -28,6 +35,7 @@ func GetRejectedMessagesValue() (float64, error) {
}

func newRejectedMessagesCounter() prometheus.Counter {

return promauto.NewCounter(prometheus.CounterOpts{
Namespace: grabbitPrefix,
Subsystem: "messages",
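For context, the reset helper lets a test start from a zero counter instead of diffing against a previously read value. A minimal usage sketch (the test name, package, and import path below are assumptions for illustration, not part of this PR):

package metrics_test

import (
	"testing"

	"github.com/wework/grabbit/gbus/metrics"
)

func TestRejectedMessagesCounterReset(t *testing.T) {
	//start from a clean counter instead of recording the previous value
	metrics.ResetRejectedMessagesCounter()

	metrics.ReportRejectedMessage()

	count, err := metrics.GetRejectedMessagesValue()
	if err != nil {
		t.Fatalf("failed to read rejected messages counter: %v", err)
	}
	if count != 1 {
		t.Errorf("expected 1 rejected message but was %v", count)
	}
}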
42 changes: 16 additions & 26 deletions gbus/worker.go
@@ -73,7 +73,14 @@ func (worker *worker) Start() error {

func (worker *worker) Stop() error {
worker.log().Info("stopping worker")
close(worker.stop) // worker.stop <- true
e1 := worker.channel.Cancel(worker.consumerTag, false)
e2 := worker.channel.Cancel(worker.consumerTag+"_rpc", false)
if e1 != nil {
return e1
}
if e2 != nil {
return e2
}
return nil
}
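Cancelling the consumer tags is what lets the worker shut down cleanly: streadway/amqp stops sending new deliveries for a cancelled tag and closes the deliveries channel, so a range-based consume loop drains in-flight messages and returns. A minimal sketch of that handshake, with illustrative names rather than grabbit's actual worker code:

package example

import "github.com/streadway/amqp"

//consumeUntilCancelled ranges over deliveries until the consumer tag is cancelled;
//Cancel stops the broker from sending new messages and closes the deliveries channel,
//which makes the loop drain whatever is already in flight and return.
func consumeUntilCancelled(ch *amqp.Channel, queue, consumerTag string, handle func(amqp.Delivery)) error {
	deliveries, err := ch.Consume(queue, consumerTag, false /*autoAck*/, false, false, false, nil)
	if err != nil {
		return err
	}
	for delivery := range deliveries {
		handle(delivery)
	}
	return nil
}

//stopConsumer mirrors the intent of the new worker.Stop: cancel the tag with
//noWait=false so the broker confirms the cancellation before shutdown proceeds.
func stopConsumer(ch *amqp.Channel, consumerTag string) error {
	return ch.Cancel(consumerTag, false)
}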

@@ -192,6 +199,7 @@ func (worker *worker) reject(requeue bool, delivery amqp.Delivery) error {
if !requeue {
metrics.ReportRejectedMessage()
}

worker.log().WithFields(logrus.Fields{"message_id": delivery.MessageId, "requeue": requeue}).Info("message rejected")
return err
}
@@ -205,34 +213,16 @@ func (worker *worker) isDead(delivery amqp.Delivery) bool {
}

func (worker *worker) invokeDeadletterHandler(delivery amqp.Delivery) {
tx, txCreateErr := worker.txProvider.New()
if txCreateErr != nil {
worker.log().WithError(txCreateErr).Error("failed creating new tx")
worker.span.LogFields(slog.Error(txCreateErr))
_ = worker.reject(true, delivery)
return
}
err := metrics.RunHandlerWithMetric(func() error {
return worker.deadletterHandler(tx, &delivery)
}, worker.deadletterHandler.Name(), worker.log())

var reject bool
if err != nil {
worker.log().WithError(err).Error("failed handling deadletter")
worker.span.LogFields(slog.Error(err))
err = worker.SafeWithRetries(tx.Rollback, MaxRetryCount)
reject = true
} else {
err = worker.SafeWithRetries(tx.Commit, MaxRetryCount)
txWrapper := func(tx *sql.Tx) error {
handlerWrapper := func() error {
return worker.deadletterHandler(tx, &delivery)
}
return metrics.RunHandlerWithMetric(handlerWrapper, worker.deadletterHandler.Name(), worker.log())
}

err := worker.withTx(txWrapper)
if err != nil {
worker.log().WithError(err).Error("Rollback/Commit deadletter handler message")
worker.span.LogFields(slog.Error(err))
reject = true
}

if reject {
//we reject the delivery but requeue it so the message is not lost and is returned to the dlq
_ = worker.reject(true, delivery)
} else {
_ = worker.ack(delivery)
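The refactor above routes the deadletter handler and its commit/rollback through worker.withTx, which is not shown in this diff. Based on the code it replaces, an assumed shape for such a helper (a sketch only, not the merged implementation) would be:

//withTx is a sketch of the helper the new invokeDeadletterHandler relies on:
//open a transaction, run the wrapped handler, roll back with retries on error,
//otherwise commit with retries; the returned error drives the reject/ack decision.
func (worker *worker) withTx(handlerWrapper func(tx *sql.Tx) error) error {
	tx, txCreateErr := worker.txProvider.New()
	if txCreateErr != nil {
		worker.log().WithError(txCreateErr).Error("failed creating new tx")
		return txCreateErr
	}
	if handlerErr := handlerWrapper(tx); handlerErr != nil {
		worker.log().WithError(handlerErr).Error("handler returned an error, rolling back transaction")
		if rollbackErr := worker.SafeWithRetries(tx.Rollback, MaxRetryCount); rollbackErr != nil {
			return rollbackErr
		}
		return handlerErr
	}
	return worker.SafeWithRetries(tx.Commit, MaxRetryCount)
}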
32 changes: 19 additions & 13 deletions tests/bus_test.go
@@ -228,10 +228,8 @@ func TestRPC(t *testing.T) {
}

func TestDeadlettering(t *testing.T) {
rejectedMessages, err := metrics.GetRejectedMessagesValue()
if err != nil {
t.Error("failed to get rejected messages value")
}
metrics.ResetRejectedMessagesCounter()

proceed := make(chan bool)
poison := gbus.NewBusMessage(PoisonMessage{})
service1 := createNamedBusForTest(testSvc1)
@@ -259,7 +257,7 @@ func TestDeadlettering(t *testing.T) {

<-proceed
count, _ := metrics.GetRejectedMessagesValue()
if count != rejectedMessages+1 {
if count != 1 {
t.Error("Should have one rejected message")
}

@@ -348,11 +346,7 @@ func TestReturnDeadToQueue(t *testing.T) {

func TestDeadLetterHandlerPanic(t *testing.T) {
proceed := make(chan bool)
rejectedMessages, err := metrics.GetRejectedMessagesValue()
if err != nil {
t.Error("failed to get rejected messages value")
}

metrics.ResetRejectedMessagesCounter()
poison := gbus.NewBusMessage(Command1{})
service1 := createBusWithConfig(testSvc1, "grabbit-dead", true, true,
gbus.BusConfiguration{MaxRetryCount: 0, BaseRetryDuration: 0})
Expand All @@ -361,6 +355,14 @@ func TestDeadLetterHandlerPanic(t *testing.T) {
gbus.BusConfiguration{MaxRetryCount: 0, BaseRetryDuration: 0})
visited := false
deadMessageHandler := func(tx *sql.Tx, poison *amqp.Delivery) error {
/*
this handler will be called more than once: when grabbit rejects
a message from a deadletter queue it rejects it with the requeue option set to
true, which is why the handler runs more than once even though the retry count
is set to 0
*/

if !visited {
visited = true
panic("PANIC DEAD HANDLER aaahhh!!!!!!")
Expand All @@ -374,7 +376,7 @@ func TestDeadLetterHandlerPanic(t *testing.T) {
}

deadletterSvc.HandleDeadletter(deadMessageHandler)
err = service1.HandleMessage(Command1{}, faultyHandler)
err := service1.HandleMessage(Command1{}, faultyHandler)
if err != nil {
t.Error("failed to register faultyhandler")
}
@@ -388,8 +390,12 @@
select {
case <-proceed:
count, _ := metrics.GetRejectedMessagesValue()
if count != rejectedMessages+2 {
t.Error("Should have 2 rejected messages")
//we expect only 1 rejected message from the counter since rejected messages that get
//requeued are not reported to the metric, so the counter won't be incremented when the
//message in the dlq gets rejected with the requeue option set to true
if count != 1 {
t.Errorf("Should have 1 rejected message but was %v", count)
}
case <-time.After(2 * time.Second):
t.Fatal("timeout, dlq failed to reject message after handler panicked")