Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 76 additions & 59 deletions gbus/tx/mysql/txoutbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"strconv"
"strings"
"sync"
"time"

"github.com/rs/xid"
Expand All @@ -18,26 +19,28 @@ import (
var (
pending int
//waitingConfirm = 1
confirmed = 2

//TODO:get these values from configuration
maxPageSize = 500
maxDeliveryAttempts = 50
sendInterval = time.Second
cleanupInterval = time.Second * 30
scavengeInterval = time.Second * 60

scavengeInterval = time.Second * 60
ackers = 10
)

//TxOutbox is a mysql based transactional outbox
type TxOutbox struct {
svcName string
txProv gbus.TxProvider
purgeOnStartup bool
ID string
amqpOutbox *gbus.AMQPOutbox

ack chan uint64
nack chan uint64
exit chan bool
svcName string
txProv gbus.TxProvider
purgeOnStartup bool
ID string
amqpOutbox *gbus.AMQPOutbox
recordsPendingConfirms map[uint64]int
ack chan uint64
nack chan uint64
exit chan bool
gl *sync.Mutex
}

func (outbox *TxOutbox) log() *log.Entry {
Expand All @@ -46,7 +49,8 @@ func (outbox *TxOutbox) log() *log.Entry {

//Start starts the transactional outbox that is used to send messages in sync with domain object change
func (outbox *TxOutbox) Start(amqpOut *gbus.AMQPOutbox) error {

outbox.gl = &sync.Mutex{}
outbox.recordsPendingConfirms = make(map[uint64]int)
tx, e := outbox.txProv.New()
if e != nil {
panic(fmt.Sprintf("passed in transaction provider failed with the following error\n%s", e))
Expand Down Expand Up @@ -75,6 +79,10 @@ func (outbox *TxOutbox) Start(amqpOut *gbus.AMQPOutbox) error {
outbox.amqpOutbox.NotifyConfirm(outbox.ack, outbox.nack)

go outbox.processOutbox()
for i := 0; i < ackers; i++ {
go outbox.ackRec()
}

return nil
}

Expand Down Expand Up @@ -124,16 +132,34 @@ func NewOutbox(svcName string, txProv gbus.TxProvider, purgeOnStartup bool) *TxO
txProv: txProv,
purgeOnStartup: purgeOnStartup,
ID: xid.New().String(),
ack: make(chan uint64, 10000),
nack: make(chan uint64, 10000),
ack: make(chan uint64, 1000000),
nack: make(chan uint64, 1000000),
exit: make(chan bool)}
return txo
}

func (outbox *TxOutbox) ackRec() {
for {
select {
case <-outbox.exit:
return
case ack := <-outbox.ack:
outbox.log().WithField("channel_len", len(outbox.ack)).Debug("length of ack channel")
if err := outbox.updateAckedRecord(ack); err != nil {
outbox.log().WithError(err).WithField("delivery_tag", ack).Error("failed to update delivery tag")
}
case nack := <-outbox.nack:
outbox.log().WithField("deliver_tag", nack).Info("nack received for delivery tag")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You don't do Nack anymore?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These nacks are the broker sending a negative confirm for the publication so we do not handle them and let the transactional outbox re publish the message the next time it scans for messages that were sent but not yet confirmed (that is done with the scavenging channel/goroutine)

outbox.log().WithField("channel_len", len(outbox.nack)).Debug("length of nack channel")

}
}
}

func (outbox *TxOutbox) processOutbox() {

send := time.NewTicker(sendInterval).C
cleanUp := time.NewTicker(cleanupInterval).C
// cleanUp := time.NewTicker(cleanupInterval).C
scavenge := time.NewTicker(scavengeInterval).C

for {
Expand All @@ -142,57 +168,21 @@ func (outbox *TxOutbox) processOutbox() {
return
//TODO:get time duration from configuration
case <-send:

err := outbox.sendMessages(outbox.getMessageRecords)
if err != nil {
outbox.log().WithError(err).Error("failed to send messages from outbox")
}
case <-cleanUp:
err := outbox.deleteCompletedRecords()
if err != nil {
outbox.log().WithError(err).Error("failed to delete completed records")
}

case <-scavenge:
err := outbox.sendMessages(outbox.scavengeOrphanedRecords)
if err != nil {
outbox.log().WithError(err).Error("failed to scavenge records")
}
case ack := <-outbox.ack:
if err := outbox.updateAckedRecord(ack); err != nil {
outbox.log().WithError(err).WithField("delivery_tag", ack).Error("failed to update delivery tag")
}
case nack := <-outbox.nack:
outbox.log().WithField("deliver_tag", nack).Info("nack received for delivery tag")
}
}
}

func (outbox *TxOutbox) deleteCompletedRecords() error {

tx, txErr := outbox.txProv.New()
if txErr != nil {
return txErr
}
deleteSQL := "DELETE FROM " + getOutboxName(outbox.svcName) + " WHERE status=?"
result, execErr := tx.Exec(deleteSQL, confirmed)
if execErr != nil {
outbox.log().WithError(execErr).Error("failed to delete processed records")

err := tx.Rollback()
if err != nil {
outbox.log().WithError(err).Error("could not rollback the transaction for deleting completed records")
}
return execErr
}

commitErr := tx.Commit()
records, ree := result.RowsAffected()
if commitErr == nil && ree == nil && records > 0 {
outbox.log().WithField("records", records).Info("cleaned records from outbox")
}

return commitErr
}

func (outbox *TxOutbox) updateAckedRecord(deliveryTag uint64) error {
tx, txErr := outbox.txProv.New()
if txErr != nil {
Expand All @@ -201,8 +191,21 @@ func (outbox *TxOutbox) updateAckedRecord(deliveryTag uint64) error {
}
outbox.log().WithField("delivery_tag", deliveryTag).Info("ack received for delivery tag")

updateSQL := "UPDATE " + getOutboxName(outbox.svcName) + " SET status=? WHERE delivery_tag=? AND relay_id=?"
_, execErr := tx.Exec(updateSQL, confirmed, deliveryTag, outbox.ID)
outbox.gl.Lock()
recID := outbox.recordsPendingConfirms[deliveryTag]
outbox.gl.Unlock()
/*
since the messages get sent to rabbitmq and then the outbox table gets updated with the deilvery tag for teh record
it may be that we recived a acked deliveryTag that is not yet registered in the outbox table.
in that case we just place the deliveryTag back in the ack channel so it can be picked up and re processed later
we place it in the channel using a new goroutine so to not deadlock if there is only a single goroutine draining the ack channel
*/
if recID == 0 {
go func() { outbox.ack <- deliveryTag }()
}

deleteSQL := "DELETE FROM " + getOutboxName(outbox.svcName) + " WHERE rec_id=?"
_, execErr := tx.Exec(deleteSQL, recID)
if execErr != nil {
outbox.log().WithError(execErr).
WithFields(log.Fields{"delivery_tag": deliveryTag, "relay_id": outbox.ID}).
Expand All @@ -216,7 +219,7 @@ func (outbox *TxOutbox) updateAckedRecord(deliveryTag uint64) error {
}

func (outbox *TxOutbox) getMessageRecords(tx *sql.Tx) (*sql.Rows, error) {
selectSQL := "SELECT rec_id, exchange, routing_key, publishing FROM " + getOutboxName(outbox.svcName) + " WHERE status = 0 AND delivery_attempts < " + strconv.Itoa(maxDeliveryAttempts) + " ORDER BY rec_id ASC LIMIT " + strconv.Itoa(maxPageSize) + " FOR UPDATE SKIP LOCKED"
selectSQL := "SELECT rec_id, exchange, routing_key, publishing FROM " + getOutboxName(outbox.svcName) + " USE INDEX (status_delivery) WHERE status = 0 AND delivery_attempts < " + strconv.Itoa(maxDeliveryAttempts) + " ORDER BY rec_id ASC LIMIT " + strconv.Itoa(maxPageSize) + " FOR UPDATE SKIP LOCKED"
return tx.Query(selectSQL)
}

Expand Down Expand Up @@ -301,7 +304,15 @@ func (outbox *TxOutbox) sendMessages(recordSelector func(tx *sql.Tx) (*sql.Rows,
}
if cmtErr := tx.Commit(); cmtErr != nil {
outbox.log().WithError(cmtErr).Error("Error committing outbox transaction")
} else {
//only after the tx has commited successfully add the recordids so they can be picked up by confirms
outbox.gl.Lock()
defer outbox.gl.Unlock()
for deliveryTag, recID := range successfulDeliveries {
outbox.recordsPendingConfirms[deliveryTag] = recID
}
}

return nil
}

Expand Down Expand Up @@ -332,7 +343,8 @@ func (outbox *TxOutbox) ensureSchema(tx *sql.Tx, svcName string) error {
delivery_tag bigint(20) NOT NULL,
delivery_attempts int NOT NULL DEFAULT 0,
insert_date timestamp DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(rec_id))`
PRIMARY KEY(rec_id),
INDEX status_delivery (rec_id, status, delivery_attempts))`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you change the order of the index to

Suggested change
INDEX status_delivery (rec_id, status, delivery_attempts))`
INDEX status_delivery (status, delivery_attempts, rec_id))`

you would not need to git the USE INDEX (status_delivery) hint in the query.


_, createErr := tx.Exec(createTablesSQL)

Expand Down Expand Up @@ -363,7 +375,12 @@ func (outbox *TxOutbox) migrate0_9To1_0(tx *sql.Tx, svcName string) error {
alter := `ALTER TABLE ` + tblName + ` CHANGE COLUMN delivery_attemtps delivery_attempts int NOT NULL DEFAULT 0;`
_, execErr := tx.Exec(alter)
if execErr != nil {
outbox.log().WithField("sql_err", execErr).Info("renaming column")
outbox.log().WithField("sql_err", execErr).Info("migration:renaming column")
}
addIndex := `ALTER TABLE ` + tblName + ` ADD INDEX status_delivery (rec_id, status, delivery_attempts);`
_, indexErr := tx.Exec(addIndex)
if indexErr != nil {
outbox.log().WithField("sql_err", execErr).Info("migration:adding index column")
}
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion gbus/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ func (worker *worker) processMessage(delivery amqp.Delivery, isRPCreply bool) {

spCtx, err := amqptracer.Extract(delivery.Headers)
if err != nil {
worker.log().WithError(err).Error("could not extract SpanContext from headers")
worker.log().WithError(err).Debug("could not extract SpanContext from headers")
} else {
spanOptions = append(spanOptions, opentracing.FollowsFrom(spCtx))
}
Expand Down