diff --git a/gbus/tx/mysql/txoutbox.go b/gbus/tx/mysql/txoutbox.go index d881c7c..7c39bfb 100644 --- a/gbus/tx/mysql/txoutbox.go +++ b/gbus/tx/mysql/txoutbox.go @@ -7,6 +7,7 @@ import ( "fmt" "strconv" "strings" + "sync" "time" "github.com/rs/xid" @@ -18,26 +19,28 @@ import ( var ( pending int //waitingConfirm = 1 - confirmed = 2 + //TODO:get these values from configuration maxPageSize = 500 maxDeliveryAttempts = 50 sendInterval = time.Second - cleanupInterval = time.Second * 30 - scavengeInterval = time.Second * 60 + + scavengeInterval = time.Second * 60 + ackers = 10 ) //TxOutbox is a mysql based transactional outbox type TxOutbox struct { - svcName string - txProv gbus.TxProvider - purgeOnStartup bool - ID string - amqpOutbox *gbus.AMQPOutbox - - ack chan uint64 - nack chan uint64 - exit chan bool + svcName string + txProv gbus.TxProvider + purgeOnStartup bool + ID string + amqpOutbox *gbus.AMQPOutbox + recordsPendingConfirms map[uint64]int + ack chan uint64 + nack chan uint64 + exit chan bool + gl *sync.Mutex } func (outbox *TxOutbox) log() *log.Entry { @@ -46,7 +49,8 @@ func (outbox *TxOutbox) log() *log.Entry { //Start starts the transactional outbox that is used to send messages in sync with domain object change func (outbox *TxOutbox) Start(amqpOut *gbus.AMQPOutbox) error { - + outbox.gl = &sync.Mutex{} + outbox.recordsPendingConfirms = make(map[uint64]int) tx, e := outbox.txProv.New() if e != nil { panic(fmt.Sprintf("passed in transaction provider failed with the following error\n%s", e)) @@ -75,6 +79,10 @@ func (outbox *TxOutbox) Start(amqpOut *gbus.AMQPOutbox) error { outbox.amqpOutbox.NotifyConfirm(outbox.ack, outbox.nack) go outbox.processOutbox() + for i := 0; i < ackers; i++ { + go outbox.ackRec() + } + return nil } @@ -124,16 +132,34 @@ func NewOutbox(svcName string, txProv gbus.TxProvider, purgeOnStartup bool) *TxO txProv: txProv, purgeOnStartup: purgeOnStartup, ID: xid.New().String(), - ack: make(chan uint64, 10000), - nack: make(chan uint64, 
10000), + ack: make(chan uint64, 1000000), + nack: make(chan uint64, 1000000), exit: make(chan bool)} return txo } +func (outbox *TxOutbox) ackRec() { + for { + select { + case <-outbox.exit: + return + case ack := <-outbox.ack: + outbox.log().WithField("channel_len", len(outbox.ack)).Debug("length of ack channel") + if err := outbox.updateAckedRecord(ack); err != nil { + outbox.log().WithError(err).WithField("delivery_tag", ack).Error("failed to update delivery tag") + } + case nack := <-outbox.nack: + outbox.log().WithField("deliver_tag", nack).Info("nack received for delivery tag") + outbox.log().WithField("channel_len", len(outbox.nack)).Debug("length of nack channel") + + } + } +} + func (outbox *TxOutbox) processOutbox() { send := time.NewTicker(sendInterval).C - cleanUp := time.NewTicker(cleanupInterval).C + // cleanUp := time.NewTicker(cleanupInterval).C scavenge := time.NewTicker(scavengeInterval).C for { @@ -142,57 +168,21 @@ func (outbox *TxOutbox) processOutbox() { return //TODO:get time duration from configuration case <-send: + err := outbox.sendMessages(outbox.getMessageRecords) if err != nil { outbox.log().WithError(err).Error("failed to send messages from outbox") } - case <-cleanUp: - err := outbox.deleteCompletedRecords() - if err != nil { - outbox.log().WithError(err).Error("failed to delete completed records") - } + case <-scavenge: err := outbox.sendMessages(outbox.scavengeOrphanedRecords) if err != nil { outbox.log().WithError(err).Error("failed to scavenge records") } - case ack := <-outbox.ack: - if err := outbox.updateAckedRecord(ack); err != nil { - outbox.log().WithError(err).WithField("delivery_tag", ack).Error("failed to update delivery tag") - } - case nack := <-outbox.nack: - outbox.log().WithField("deliver_tag", nack).Info("nack received for delivery tag") } } } -func (outbox *TxOutbox) deleteCompletedRecords() error { - - tx, txErr := outbox.txProv.New() - if txErr != nil { - return txErr - } - deleteSQL := "DELETE FROM " + 
getOutboxName(outbox.svcName) + " WHERE status=?" - result, execErr := tx.Exec(deleteSQL, confirmed) - if execErr != nil { - outbox.log().WithError(execErr).Error("failed to delete processed records") - - err := tx.Rollback() - if err != nil { - outbox.log().WithError(err).Error("could not rollback the transaction for deleting completed records") - } - return execErr - } - - commitErr := tx.Commit() - records, ree := result.RowsAffected() - if commitErr == nil && ree == nil && records > 0 { - outbox.log().WithField("records", records).Info("cleaned records from outbox") - } - - return commitErr -} - func (outbox *TxOutbox) updateAckedRecord(deliveryTag uint64) error { tx, txErr := outbox.txProv.New() if txErr != nil { @@ -201,8 +191,21 @@ func (outbox *TxOutbox) updateAckedRecord(deliveryTag uint64) error { } outbox.log().WithField("delivery_tag", deliveryTag).Info("ack received for delivery tag") - updateSQL := "UPDATE " + getOutboxName(outbox.svcName) + " SET status=? WHERE delivery_tag=? AND relay_id=?" - _, execErr := tx.Exec(updateSQL, confirmed, deliveryTag, outbox.ID) + outbox.gl.Lock() + recID := outbox.recordsPendingConfirms[deliveryTag] + outbox.gl.Unlock() + /* + since the messages get sent to rabbitmq and then the outbox table gets updated with the delivery tag for the record + it may be that we received an acked deliveryTag that is not yet registered in the outbox table. + in that case we just place the deliveryTag back in the ack channel so it can be picked up and reprocessed later, then roll back and return without issuing a DELETE for rec_id 0 + we place it in the channel using a new goroutine so to not deadlock if there is only a single goroutine draining the ack channel + */ + if recID == 0 { + go func() { outbox.ack <- deliveryTag }() + return tx.Rollback() + } + deleteSQL := "DELETE FROM " + getOutboxName(outbox.svcName) + " WHERE rec_id=?" + _, execErr := tx.Exec(deleteSQL, recID) if execErr != nil { outbox.log().WithError(execErr). WithFields(log.Fields{"delivery_tag": deliveryTag, "relay_id": outbox.ID}). 
@@ -216,7 +219,7 @@ func (outbox *TxOutbox) updateAckedRecord(deliveryTag uint64) error { } func (outbox *TxOutbox) getMessageRecords(tx *sql.Tx) (*sql.Rows, error) { - selectSQL := "SELECT rec_id, exchange, routing_key, publishing FROM " + getOutboxName(outbox.svcName) + " WHERE status = 0 AND delivery_attempts < " + strconv.Itoa(maxDeliveryAttempts) + " ORDER BY rec_id ASC LIMIT " + strconv.Itoa(maxPageSize) + " FOR UPDATE SKIP LOCKED" + selectSQL := "SELECT rec_id, exchange, routing_key, publishing FROM " + getOutboxName(outbox.svcName) + " USE INDEX (status_delivery) WHERE status = 0 AND delivery_attempts < " + strconv.Itoa(maxDeliveryAttempts) + " ORDER BY rec_id ASC LIMIT " + strconv.Itoa(maxPageSize) + " FOR UPDATE SKIP LOCKED" return tx.Query(selectSQL) } @@ -301,7 +304,15 @@ func (outbox *TxOutbox) sendMessages(recordSelector func(tx *sql.Tx) (*sql.Rows, } if cmtErr := tx.Commit(); cmtErr != nil { outbox.log().WithError(cmtErr).Error("Error committing outbox transaction") + } else { + //only after the tx has committed successfully add the record ids so they can be picked up by confirms + outbox.gl.Lock() + defer outbox.gl.Unlock() + for deliveryTag, recID := range successfulDeliveries { + outbox.recordsPendingConfirms[deliveryTag] = recID + } } + return nil } @@ -332,7 +343,8 @@ func (outbox *TxOutbox) ensureSchema(tx *sql.Tx, svcName string) error { delivery_tag bigint(20) NOT NULL, delivery_attempts int NOT NULL DEFAULT 0, insert_date timestamp DEFAULT CURRENT_TIMESTAMP, - PRIMARY KEY(rec_id))` + PRIMARY KEY(rec_id), + INDEX status_delivery (rec_id, status, delivery_attempts))` _, createErr := tx.Exec(createTablesSQL) @@ -363,7 +375,12 @@ func (outbox *TxOutbox) migrate0_9To1_0(tx *sql.Tx, svcName string) error { alter := `ALTER TABLE ` + tblName + ` CHANGE COLUMN delivery_attemtps delivery_attempts int NOT NULL DEFAULT 0;` _, execErr := tx.Exec(alter) if execErr != nil { - outbox.log().WithField("sql_err", execErr).Info("renaming column") + 
outbox.log().WithField("sql_err", execErr).Info("migration:renaming column") + } + addIndex := `ALTER TABLE ` + tblName + ` ADD INDEX status_delivery (rec_id, status, delivery_attempts);` + _, indexErr := tx.Exec(addIndex) + if indexErr != nil { + outbox.log().WithField("sql_err", indexErr).Info("migration:adding index column") } return nil } diff --git a/gbus/worker.go b/gbus/worker.go index a13aceb..4c74667 100644 --- a/gbus/worker.go +++ b/gbus/worker.go @@ -257,7 +257,7 @@ func (worker *worker) processMessage(delivery amqp.Delivery, isRPCreply bool) { spCtx, err := amqptracer.Extract(delivery.Headers) if err != nil { - worker.log().WithError(err).Error("could not extract SpanContext from headers") + worker.log().WithError(err).Debug("could not extract SpanContext from headers") } else { spanOptions = append(spanOptions, opentracing.FollowsFrom(spCtx)) }