From 0eda828ffad4188fb9639163c188867fdabb93c8 Mon Sep 17 00:00:00 2001 From: Guy Baron Date: Tue, 23 Apr 2019 14:04:14 +0300 Subject: [PATCH 1/8] updating akc by recID to avoid a full table scan --- gbus/tx/mysql/txoutbox.go | 46 +++++++++++++++++++++++---------------- 1 file changed, 27 insertions(+), 19 deletions(-) diff --git a/gbus/tx/mysql/txoutbox.go b/gbus/tx/mysql/txoutbox.go index d881c7c..5be1a36 100644 --- a/gbus/tx/mysql/txoutbox.go +++ b/gbus/tx/mysql/txoutbox.go @@ -7,6 +7,7 @@ import ( "fmt" "strconv" "strings" + "sync" "time" "github.com/rs/xid" @@ -29,15 +30,16 @@ var ( //TxOutbox is a mysql based transactional outbox type TxOutbox struct { - svcName string - txProv gbus.TxProvider - purgeOnStartup bool - ID string - amqpOutbox *gbus.AMQPOutbox - - ack chan uint64 - nack chan uint64 - exit chan bool + svcName string + txProv gbus.TxProvider + purgeOnStartup bool + ID string + amqpOutbox *gbus.AMQPOutbox + recordsPendingConfirms map[uint64]int + ack chan uint64 + nack chan uint64 + exit chan bool + gl *sync.Mutex } func (outbox *TxOutbox) log() *log.Entry { @@ -46,7 +48,8 @@ func (outbox *TxOutbox) log() *log.Entry { //Start starts the transactional outbox that is used to send messages in sync with domain object change func (outbox *TxOutbox) Start(amqpOut *gbus.AMQPOutbox) error { - + outbox.gl = &sync.Mutex{} + outbox.recordsPendingConfirms = make(map[uint64]int) tx, e := outbox.txProv.New() if e != nil { panic(fmt.Sprintf("passed in transaction provider failed with the following error\n%s", e)) @@ -133,7 +136,7 @@ func NewOutbox(svcName string, txProv gbus.TxProvider, purgeOnStartup bool) *TxO func (outbox *TxOutbox) processOutbox() { send := time.NewTicker(sendInterval).C - cleanUp := time.NewTicker(cleanupInterval).C + // cleanUp := time.NewTicker(cleanupInterval).C scavenge := time.NewTicker(scavengeInterval).C for { @@ -146,11 +149,6 @@ func (outbox *TxOutbox) processOutbox() { if err != nil { outbox.log().WithError(err).Error("failed to send messages from outbox") } - case <-cleanUp: - err := outbox.deleteCompletedRecords() - if err != nil { - outbox.log().WithError(err).Error("failed to delete completed records") - } case <-scavenge: err := outbox.sendMessages(outbox.scavengeOrphanedRecords) if err != nil { @@ -201,8 +199,15 @@ func (outbox *TxOutbox) updateAckedRecord(deliveryTag uint64) error { } outbox.log().WithField("delivery_tag", deliveryTag).Info("ack received for delivery tag") - updateSQL := "UPDATE " + getOutboxName(outbox.svcName) + " SET status=? WHERE delivery_tag=? AND relay_id=?" - _, execErr := tx.Exec(updateSQL, confirmed, deliveryTag, outbox.ID) + outbox.gl.Lock() + recID := outbox.recordsPendingConfirms[deliveryTag] + outbox.gl.Unlock() + if recID == 0 { + go func() { outbox.ack <- deliveryTag }() + } + // updateSQL := "UPDATE " + getOutboxName(outbox.svcName) + " SET status=? WHERE delivery_tag=? AND relay_id=?" + updateSQL := "DELETE FROM " + getOutboxName(outbox.svcName) + " WHERE rec_id=?" + _, execErr := tx.Exec(updateSQL, recID) if execErr != nil { outbox.log().WithError(execErr). WithFields(log.Fields{"delivery_tag": deliveryTag, "relay_id": outbox.ID}). @@ -244,7 +249,8 @@ func (outbox *TxOutbox) sendMessages(recordSelector func(tx *sql.Tx) (*sql.Rows, successfulDeliveries := make(map[uint64]int) failedDeliveries := make([]int, 0) - + outbox.gl.Lock() + defer outbox.gl.Unlock() for rows.Next() { var ( recID int @@ -276,6 +282,8 @@ func (outbox *TxOutbox) sendMessages(recordSelector func(tx *sql.Tx) (*sql.Rows, failedDeliveries = append(failedDeliveries, recID) } else { successfulDeliveries[deliveryTag] = recID + outbox.recordsPendingConfirms[deliveryTag] = recID + } } err := rows.Close() From f84d0729a061ed2706204a41fea794d82ea236dd Mon Sep 17 00:00:00 2001 From: Guy Baron Date: Tue, 23 Apr 2019 14:21:55 +0300 Subject: [PATCH 2/8] fixing go lint errors --- gbus/tx/mysql/txoutbox.go | 35 +++++------------------------------ 1 file changed, 5 insertions(+), 30 deletions(-) diff --git a/gbus/tx/mysql/txoutbox.go b/gbus/tx/mysql/txoutbox.go index 5be1a36..59783ec 100644 --- a/gbus/tx/mysql/txoutbox.go +++ b/gbus/tx/mysql/txoutbox.go @@ -19,13 +19,13 @@ import ( var ( pending int //waitingConfirm = 1 - confirmed = 2 + //TODO:get these values from configuration maxPageSize = 500 maxDeliveryAttempts = 50 sendInterval = time.Second - cleanupInterval = time.Second * 30 - scavengeInterval = time.Second * 60 + + scavengeInterval = time.Second * 60 ) //TxOutbox is a mysql based transactional outbox @@ -164,33 +164,6 @@ func (outbox *TxOutbox) processOutbox() { } } -func (outbox *TxOutbox) deleteCompletedRecords() error { - - tx, txErr := outbox.txProv.New() - if txErr != nil { - return txErr - } - deleteSQL := "DELETE FROM " + getOutboxName(outbox.svcName) + " WHERE status=?" - result, execErr := tx.Exec(deleteSQL, confirmed) - if execErr != nil { - outbox.log().WithError(execErr).Error("failed to delete processed records") - - err := tx.Rollback() - if err != nil { - outbox.log().WithError(err).Error("could not rollback the transaction for deleting completed records") - } - return execErr - } - - commitErr := tx.Commit() - records, ree := result.RowsAffected() - if commitErr == nil && ree == nil && records > 0 { - outbox.log().WithField("records", records).Info("cleaned records from outbox") - } - - return commitErr -} - func (outbox *TxOutbox) updateAckedRecord(deliveryTag uint64) error { tx, txErr := outbox.txProv.New() if txErr != nil { @@ -202,6 +175,8 @@ func (outbox *TxOutbox) updateAckedRecord(deliveryTag uint64) error { outbox.gl.Lock() recID := outbox.recordsPendingConfirms[deliveryTag] outbox.gl.Unlock() + //since the message gets sent to rabbitmq and then the tx completes we may get an ack for a record that is + // if recID == 0 { go func() { outbox.ack <- deliveryTag }() } From fca00e4081d650e4c56412bf473a0b8b9cd4d354 Mon Sep 17 00:00:00 2001 From: Guy Baron Date: Tue, 23 Apr 2019 19:23:05 +0300 Subject: [PATCH 3/8] added query hint to txoutbox query --- gbus/tx/mysql/txoutbox.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gbus/tx/mysql/txoutbox.go b/gbus/tx/mysql/txoutbox.go index 59783ec..f822625 100644 --- a/gbus/tx/mysql/txoutbox.go +++ b/gbus/tx/mysql/txoutbox.go @@ -196,7 +196,7 @@ func (outbox *TxOutbox) updateAckedRecord(deliveryTag uint64) error { } func (outbox *TxOutbox) getMessageRecords(tx *sql.Tx) (*sql.Rows, error) { - selectSQL := "SELECT rec_id, exchange, routing_key, publishing FROM " + getOutboxName(outbox.svcName) + " WHERE status = 0 AND delivery_attempts < " + strconv.Itoa(maxDeliveryAttempts) + " ORDER BY rec_id ASC LIMIT " + strconv.Itoa(maxPageSize) + " FOR UPDATE SKIP LOCKED" + selectSQL := "SELECT rec_id, exchange, routing_key, publishing FROM " + getOutboxName(outbox.svcName) + " USE INDEX (status_delivery) WHERE status = 0 AND delivery_attempts < " + strconv.Itoa(maxDeliveryAttempts) + " ORDER BY rec_id ASC LIMIT " + strconv.Itoa(maxPageSize) + " FOR UPDATE SKIP LOCKED" return tx.Query(selectSQL) } From bbb408a990e8ecf22f5514a2467ba6f91c2302d9 Mon Sep 17 00:00:00 2001 From: Guy Baron Date: Tue, 23 Apr 2019 22:53:01 +0300 Subject: [PATCH 4/8] inceased size of ack/nack chan and draining from multiple goroutines to prevent streadway/amqp to deadlock --- gbus/tx/mysql/txoutbox.go | 48 ++++++++++++++++++++++++++++++--------- 1 file changed, 37 insertions(+), 11 deletions(-) diff --git a/gbus/tx/mysql/txoutbox.go b/gbus/tx/mysql/txoutbox.go index f822625..7d2fe46 100644 --- a/gbus/tx/mysql/txoutbox.go +++ b/gbus/tx/mysql/txoutbox.go @@ -26,6 +26,7 @@ var ( sendInterval = time.Second scavengeInterval = time.Second * 60 + ackers = 10 ) //TxOutbox is a mysql based transactional outbox @@ -78,6 +79,10 @@ func (outbox *TxOutbox) Start(amqpOut *gbus.AMQPOutbox) error { outbox.amqpOutbox.NotifyConfirm(outbox.ack, outbox.nack) go outbox.processOutbox() + for i := 0; i < ackers; i++ { + go outbox.ackRec() + } + return nil } @@ -127,12 +132,30 @@ func NewOutbox(svcName string, txProv gbus.TxProvider, purgeOnStartup bool) *TxO txProv: txProv, purgeOnStartup: purgeOnStartup, ID: xid.New().String(), - ack: make(chan uint64, 10000), - nack: make(chan uint64, 10000), + ack: make(chan uint64, 1000000), + nack: make(chan uint64, 1000000), exit: make(chan bool)} return txo } +func (outbox *TxOutbox) ackRec() { + for { + select { + case <-outbox.exit: + return + case ack := <-outbox.ack: + outbox.log().WithField("channel_len", len(outbox.ack)).Debug("length of ack channel") + if err := outbox.updateAckedRecord(ack); err != nil { + outbox.log().WithError(err).WithField("delivery_tag", ack).Error("failed to update delivery tag") + } + case nack := <-outbox.nack: + outbox.log().WithField("deliver_tag", nack).Info("nack received for delivery tag") + outbox.log().WithField("channel_len", len(outbox.nack)).Debug("length of nack channel") + + } + } +} + func (outbox *TxOutbox) processOutbox() { send := time.NewTicker(sendInterval).C @@ -142,24 +165,21 @@ func (outbox *TxOutbox) processOutbox() { for { select { case <-outbox.exit: + outbox.log().Info("on the moo again...") return //TODO:get time duration from configuration case <-send: + err := outbox.sendMessages(outbox.getMessageRecords) if err != nil { outbox.log().WithError(err).Error("failed to send messages from outbox") } + case <-scavenge: err := outbox.sendMessages(outbox.scavengeOrphanedRecords) if err != nil { outbox.log().WithError(err).Error("failed to scavenge records") } - case ack := <-outbox.ack: - if err := outbox.updateAckedRecord(ack); err != nil { - outbox.log().WithError(err).WithField("delivery_tag", ack).Error("failed to update delivery tag") - } - case nack := <-outbox.nack: - outbox.log().WithField("deliver_tag", nack).Info("nack received for delivery tag") } } } @@ -196,7 +216,7 @@ func (outbox *TxOutbox) updateAckedRecord(deliveryTag uint64) error { } func (outbox *TxOutbox) getMessageRecords(tx *sql.Tx) (*sql.Rows, error) { - selectSQL := "SELECT rec_id, exchange, routing_key, publishing FROM " + getOutboxName(outbox.svcName) + " USE INDEX (status_delivery) WHERE status = 0 AND delivery_attempts < " + strconv.Itoa(maxDeliveryAttempts) + " ORDER BY rec_id ASC LIMIT " + strconv.Itoa(maxPageSize) + " FOR UPDATE SKIP LOCKED" + selectSQL := "SELECT rec_id, exchange, routing_key, publishing FROM " + getOutboxName(outbox.svcName) + " WHERE status = 0 AND delivery_attempts < " + strconv.Itoa(maxDeliveryAttempts) + " ORDER BY rec_id ASC LIMIT " + strconv.Itoa(maxPageSize) + " FOR UPDATE SKIP LOCKED" return tx.Query(selectSQL) } @@ -315,7 +335,8 @@ func (outbox *TxOutbox) ensureSchema(tx *sql.Tx, svcName string) error { delivery_tag bigint(20) NOT NULL, delivery_attempts int NOT NULL DEFAULT 0, insert_date timestamp DEFAULT CURRENT_TIMESTAMP, - PRIMARY KEY(rec_id))` + PRIMARY KEY(rec_id), + INDEX status_delivery (rec_id, status, delivery_attempts))` _, createErr := tx.Exec(createTablesSQL) @@ -346,7 +367,12 @@ func (outbox *TxOutbox) migrate0_9To1_0(tx *sql.Tx, svcName string) error { alter := `ALTER TABLE ` + tblName + ` CHANGE COLUMN delivery_attemtps delivery_attempts int NOT NULL DEFAULT 0;` _, execErr := tx.Exec(alter) if execErr != nil { - outbox.log().WithField("sql_err", execErr).Info("renaming column") + outbox.log().WithField("sql_err", execErr).Info("migration:renaming column") + } + addIndex := `ALTER TABLE ` + tblName + ` ADD INDEX status_delivery (rec_id, status, delivery_attempts);` + _, indexErr := tx.Exec(addIndex) + if indexErr != nil { + outbox.log().WithField("sql_err", execErr).Info("migration:adding index column") } return nil } From b226b0fab166347eb4dec951f80d747f23719502 Mon Sep 17 00:00:00 2001 From: Guy Baron Date: Tue, 23 Apr 2019 23:08:49 +0300 Subject: [PATCH 5/8] cleaned up code and added comments --- gbus/tx/mysql/txoutbox.go | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/gbus/tx/mysql/txoutbox.go b/gbus/tx/mysql/txoutbox.go index 7d2fe46..eaa9bf7 100644 --- a/gbus/tx/mysql/txoutbox.go +++ b/gbus/tx/mysql/txoutbox.go @@ -165,7 +165,6 @@ func (outbox *TxOutbox) processOutbox() { for { select { case <-outbox.exit: - outbox.log().Info("on the moo again...") return //TODO:get time duration from configuration case <-send: @@ -195,14 +194,18 @@ func (outbox *TxOutbox) updateAckedRecord(deliveryTag uint64) error { outbox.gl.Lock() recID := outbox.recordsPendingConfirms[deliveryTag] outbox.gl.Unlock() - //since the message gets sent to rabbitmq and then the tx completes we may get an ack for a record that is - // + /* + since the messages get sent to rabbitmq and then the outbox table gets updated with the deilvery tag for teh record + it may be that we recived a acked deliveryTag that is not yet registered in the outbox table. + in that case we just place the deliveryTag back in the ack channel so it can be picked up and re processed later + we place it in the channel using a new goroutine so to not deadlock if there is only a single goroutine draining the ack channel + */ if recID == 0 { go func() { outbox.ack <- deliveryTag }() } - // updateSQL := "UPDATE " + getOutboxName(outbox.svcName) + " SET status=? WHERE delivery_tag=? AND relay_id=?" - updateSQL := "DELETE FROM " + getOutboxName(outbox.svcName) + " WHERE rec_id=?" - _, execErr := tx.Exec(updateSQL, recID) + + deleteSQL := "DELETE FROM " + getOutboxName(outbox.svcName) + " WHERE rec_id=?" + _, execErr := tx.Exec(deleteSQL, recID) if execErr != nil { outbox.log().WithError(execErr). WithFields(log.Fields{"delivery_tag": deliveryTag, "relay_id": outbox.ID}). From 52dbb22e78de5c555912ee3c678b8bdf459cb699 Mon Sep 17 00:00:00 2001 From: Guy Baron Date: Tue, 23 Apr 2019 23:18:07 +0300 Subject: [PATCH 6/8] reenabled index hint when fetching for records --- gbus/tx/mysql/txoutbox.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gbus/tx/mysql/txoutbox.go b/gbus/tx/mysql/txoutbox.go index eaa9bf7..b7c5cf5 100644 --- a/gbus/tx/mysql/txoutbox.go +++ b/gbus/tx/mysql/txoutbox.go @@ -219,7 +219,7 @@ func (outbox *TxOutbox) updateAckedRecord(deliveryTag uint64) error { } func (outbox *TxOutbox) getMessageRecords(tx *sql.Tx) (*sql.Rows, error) { - selectSQL := "SELECT rec_id, exchange, routing_key, publishing FROM " + getOutboxName(outbox.svcName) + " WHERE status = 0 AND delivery_attempts < " + strconv.Itoa(maxDeliveryAttempts) + " ORDER BY rec_id ASC LIMIT " + strconv.Itoa(maxPageSize) + " FOR UPDATE SKIP LOCKED" + selectSQL := "SELECT rec_id, exchange, routing_key, publishing FROM " + getOutboxName(outbox.svcName) + " USE INDEX (status_delivery) WHERE status = 0 AND delivery_attempts < " + strconv.Itoa(maxDeliveryAttempts) + " ORDER BY rec_id ASC LIMIT " + strconv.Itoa(maxPageSize) + " FOR UPDATE SKIP LOCKED" return tx.Query(selectSQL) } From 76caed4adacee2b99db7589ef619bc55ace4c74b Mon Sep 17 00:00:00 2001 From: Guy Baron Date: Wed, 24 Apr 2019 11:41:36 +0300 Subject: [PATCH 7/8] fixing the way delivery tags get tracked so we can properly delete records from outbox --- gbus/tx/mysql/txoutbox.go | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/gbus/tx/mysql/txoutbox.go b/gbus/tx/mysql/txoutbox.go index b7c5cf5..7c39bfb 100644 --- a/gbus/tx/mysql/txoutbox.go +++ b/gbus/tx/mysql/txoutbox.go @@ -247,8 +247,7 @@ func (outbox *TxOutbox) sendMessages(recordSelector func(tx *sql.Tx) (*sql.Rows, successfulDeliveries := make(map[uint64]int) failedDeliveries := make([]int, 0) - outbox.gl.Lock() - defer outbox.gl.Unlock() + for rows.Next() { var ( recID int @@ -280,8 +279,6 @@ func (outbox *TxOutbox) sendMessages(recordSelector func(tx *sql.Tx) (*sql.Rows, failedDeliveries = append(failedDeliveries, recID) } else { successfulDeliveries[deliveryTag] = recID - outbox.recordsPendingConfirms[deliveryTag] = recID - } } err := rows.Close() @@ -307,7 +304,15 @@ func (outbox *TxOutbox) sendMessages(recordSelector func(tx *sql.Tx) (*sql.Rows, } if cmtErr := tx.Commit(); cmtErr != nil { outbox.log().WithError(cmtErr).Error("Error committing outbox transaction") + } else { + //only after the tx has commited successfully add the recordids so they can be picked up by confirms + outbox.gl.Lock() + defer outbox.gl.Unlock() + for deliveryTag, recID := range successfulDeliveries { + outbox.recordsPendingConfirms[deliveryTag] = recID + } } + return nil } From 3dc88d3b63c960724b16ae8ac895933547fa3883 Mon Sep 17 00:00:00 2001 From: Guy Baron Date: Wed, 24 Apr 2019 12:26:01 +0300 Subject: [PATCH 8/8] changed the log level when extracting opentrace headers from message fails When the extraction fails it prints out an error however this is due to the fact that the passed in message deaders do not contain the needed opentrace headers for extraction to work. This is expected as message producers do not necessarily pass in these headers so log level changed from error to debug in order not to litter the log --- gbus/worker.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gbus/worker.go b/gbus/worker.go index a13aceb..4c74667 100644 --- a/gbus/worker.go +++ b/gbus/worker.go @@ -257,7 +257,7 @@ func (worker *worker) processMessage(delivery amqp.Delivery, isRPCreply bool) { spCtx, err := amqptracer.Extract(delivery.Headers) if err != nil { - worker.log().WithError(err).Error("could not extract SpanContext from headers") + worker.log().WithError(err).Debug("could not extract SpanContext from headers") } else { spanOptions = append(spanOptions, opentracing.FollowsFrom(spCtx)) }