Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle empty body messages #147

Merged
merged 17 commits into from
Aug 31, 2019
17 changes: 11 additions & 6 deletions gbus/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,19 +102,13 @@ func (worker *worker) createMessagesChannel(q amqp.Queue, consumerTag string) (<
func (worker *worker) consumeMessages() {

for msg := range worker.messages {
if msg.Body == nil || len(msg.Body) == 0 {
continue
}
worker.processMessage(msg, false)
}
}

func (worker *worker) consumeRPC() {

for msg := range worker.rpcMessages {
if msg.Body == nil || len(msg.Body) == 0 {
continue
}
worker.processMessage(msg, true)
}
}
Expand Down Expand Up @@ -311,6 +305,17 @@ func (worker *worker) processMessage(delivery amqp.Delivery, isRPCreply bool) {
_ = worker.ack(delivery)
return
}

if delivery.Body == nil || len(delivery.Body) == 0 {
worker.log().
WithFields(
logrus.Fields{"message-name": msgName}).
Warn("body is missing for message. Cannot invoke handlers.")
worker.span.LogFields(slog.String("grabbit", "no body found"))
// if there are handlers registered for this type of message, it's a bug and the message must be rejected.
_ = worker.reject(false, delivery)
return
}
/*
extract the bus message only after we are sure there are registered handlers since
it includes deserializing the amqp payload which we want to avoid if no handlers are found
Expand Down
Loading