Skip to content

Commit

Permalink
Update AMQP message processing loop
Browse files Browse the repository at this point in the history
Previously, msg.Ack(true) was used, which is mostly used for batch
processing. It mistakenly acknowledged all previously unacknowledged
messages, causing errors from the AMQP server. Now, messages are
acknowledged individually after each one is processed.

Messages that ERs failed to process are now rejected and requeued
for future processing attempts.

The reader is now closed immediately if the message delivery
channel closes. Therefore, it prevents an endless loop by avoiding
continuous consumption from empty or closed channels.

Addresses: cgrates#4146
  • Loading branch information
ionutboangiu committed Oct 16, 2023
1 parent d7aabe2 commit 72a8b45
Showing 1 changed file with 31 additions and 5 deletions.
36 changes: 31 additions & 5 deletions ers/amqp.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,21 +158,47 @@ func (rdr *AMQPER) readLoop(msgChan <-chan amqp.Delivery) {
utils.ERs, rdr.dialURL))
rdr.close()
return
case msg := <-msgChan:
case msg, ok := <-msgChan:
if !ok {
utils.Logger.Warning(
fmt.Sprintf("<%s> lost connection to AMQP server at %s, closing reader...",
utils.ERs, rdr.dialURL))
rdr.close()
return
}
go func(msg amqp.Delivery) {
if err := rdr.processMessage(msg.Body); err != nil {
err := rdr.processMessage(msg.Body)
if err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> processing message %s error: %s",
utils.ERs, msg.MessageId, err.Error()))

err = msg.Reject(true)
if err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error negatively acknowledging message %s: %s",
utils.ERs, msg.MessageId, err.Error()))
}
return
}
if rdr.poster != nil { // post it
if err := ees.ExportWithAttempts(rdr.poster, msg.Body, utils.EmptyString); err != nil {

// Post the message if poster is available.
if rdr.poster != nil {
err = ees.ExportWithAttempts(rdr.poster, msg.Body, utils.EmptyString)
if err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> writing message %s error: %s",
utils.ERs, msg.MessageId, err.Error()))

}
}
msg.Ack(true)

err = msg.Ack(false)
if err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error acknowledging message %s: %s",
utils.ERs, msg.MessageId, err.Error()))
}
if rdr.Config().ConcurrentReqs != -1 {
rdr.cap <- struct{}{}
}
Expand Down

0 comments on commit 72a8b45

Please sign in to comment.