From 11d78416dfd3d90a034beec6c4d71bb0094e3cbf Mon Sep 17 00:00:00 2001 From: Guy Baron Date: Sun, 13 Oct 2019 12:00:41 +0300 Subject: [PATCH] fixing issue with txoutbox failing to deliver a message after 50 failed attempts https://github.com/wework/grabbit/issues/203 --- gbus/tx/mysql/txoutbox.go | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/gbus/tx/mysql/txoutbox.go b/gbus/tx/mysql/txoutbox.go index 0b2edc2..93458a7 100644 --- a/gbus/tx/mysql/txoutbox.go +++ b/gbus/tx/mysql/txoutbox.go @@ -24,12 +24,11 @@ const ( ) var ( - maxPageSize = 500 - maxDeliveryAttempts = 50 - sendInterval = time.Second - scavengeInterval = time.Second * 60 - metricsInterval = time.Second * 15 - ackers = 10 + maxPageSize = 500 + sendInterval = time.Second + scavengeInterval = time.Second * 60 + metricsInterval = time.Second * 15 + ackers = 10 ) //TxOutbox is a mysql based transactional outbox @@ -288,7 +287,7 @@ func (outbox *TxOutbox) updateAckedRecord(deliveryTag uint64) error { } func (outbox *TxOutbox) getMessageRecords(tx *sql.Tx) (*sql.Rows, error) { - selectSQL := "SELECT rec_id, exchange, routing_key, publishing FROM " + getOutboxName(outbox.svcName) + " USE INDEX (status_delivery) WHERE status = " + strconv.Itoa(Pending) + " AND delivery_attempts < " + strconv.Itoa(maxDeliveryAttempts) + " ORDER BY rec_id ASC LIMIT " + strconv.Itoa(maxPageSize) + " FOR UPDATE SKIP LOCKED" + selectSQL := "SELECT rec_id, exchange, routing_key, publishing FROM " + getOutboxName(outbox.svcName) + " USE INDEX (status_delivery) WHERE status = " + strconv.Itoa(Pending) + " ORDER BY rec_id ASC LIMIT " + strconv.Itoa(maxPageSize) + " FOR UPDATE SKIP LOCKED" return tx.Query(selectSQL) }