Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bug fix - when a deadletterhandler panics grabbit fails to reject the… #136

Merged
merged 6 commits into from
Aug 26, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions gbus/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,22 @@ func (worker *worker) isDead(delivery amqp.Delivery) bool {
}

func (worker *worker) invokeDeadletterHandler(delivery amqp.Delivery) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be better to reject the message in the recover function in the process message and not duplicate another recover function within the invokeDeadletterHandler.
In addition, it would probably be better to have the call to metrics.ReportRejected in the worker.reject function (and once there need to delete the call to metrics.ReportRejected from the processMessage function

defer func() {
if r := recover(); r != nil {
logEntry := worker.log().WithField("worker", worker.consumerTag)
if err, ok := r.(error); ok {
worker.span.LogFields(slog.Error(err))
logEntry = logEntry.WithError(err)
} else {
logEntry = logEntry.WithField("panic", r)
}
worker.span.LogFields(slog.String("panic", "failed to process message"))
logEntry.Error("failed to process message")
_ = worker.reject(false, delivery)
metrics.ReportRejectedMessage()
}
worker.span.Finish()
}()
tx, txCreateErr := worker.txProvider.New()
if txCreateErr != nil {
worker.log().WithError(txCreateErr).Error("failed creating new tx")
Expand Down
58 changes: 56 additions & 2 deletions tests/bus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,10 @@ func TestRPC(t *testing.T) {
}

func TestDeadlettering(t *testing.T) {

rejectedMessages, err := metrics.GetRejectedMessagesValue()
if err != nil {
t.Error("failed to get rejected messages value")
}
var waitgroup sync.WaitGroup
waitgroup.Add(2)
poison := gbus.NewBusMessage(PoisonMessage{})
Expand Down Expand Up @@ -257,7 +260,7 @@ func TestDeadlettering(t *testing.T) {

waitgroup.Wait()
count, _ := metrics.GetRejectedMessagesValue()
if count != 1 {
if count != rejectedMessages+1 {
t.Error("Should have one rejected message")
}

Expand Down Expand Up @@ -325,6 +328,57 @@ func TestReturnDeadToQueue(t *testing.T) {
}
}

func TestDeadLetterHandlerPanic(t *testing.T) {
proceed := make(chan bool)
rejectedMessages, err := metrics.GetRejectedMessagesValue()
if err != nil {
t.Error("failed to get rejected messages value")
}

poison := gbus.NewBusMessage(Command1{})
service1 := createBusWithConfig(testSvc1, "grabbit-dead", true, true,
gbus.BusConfiguration{MaxRetryCount: 0, BaseRetryDuration: 0})

deadletterSvc := createBusWithConfig("deadletterSvc", "grabbit-dead", true, true,
gbus.BusConfiguration{MaxRetryCount: 0, BaseRetryDuration: 0})
visited := false
deadMessageHandler := func(tx *sql.Tx, poison amqp.Delivery) error {
if !visited {
visited = true
panic("PANIC DEAD HANDLER aaahhh!!!!!!")
}
proceed <- true
return nil
}

faultyHandler := func(invocation gbus.Invocation, message *gbus.BusMessage) error {
return errors.New("fail")
}

deadletterSvc.HandleDeadletter(deadMessageHandler)
err = service1.HandleMessage(Command1{}, faultyHandler)
if err != nil {
t.Error("failed to register faultyhandler")
}

deadletterSvc.Start()
defer deadletterSvc.Shutdown()
service1.Start()
defer service1.Shutdown()

service1.Send(context.Background(), testSvc1, poison)
select {
case <-proceed:
count, _ := metrics.GetRejectedMessagesValue()
if count != rejectedMessages+2 {
t.Error("Should have 2 rejected messages")
}
case <-time.After(2 * time.Second):
t.Fatal("timeout, dlq failed to reject message after handler panicked")
}

}

func TestRegistrationAfterBusStarts(t *testing.T) {
event := Event1{}
b := createBusForTest()
Expand Down