Skip to content

Commit

Permalink
Fix sporadic duplicate key errors in mysql queue implementation (#2802)
Browse files Browse the repository at this point in the history
* More informative failures in queuePersistenceTest

* Fix sporadic duplicate key errors in mysql queue

* Fix lint
  • Loading branch information
nagl-temporal committed May 9, 2022
1 parent aa2c98e commit 9c9b4c9
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 11 deletions.
18 changes: 10 additions & 8 deletions common/persistence/persistence-tests/queuePersistenceTest.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (s *QueuePersistenceSuite) TestNamespaceReplicationQueue() {
numMessages := 100
concurrentSenders := 10

messageChan := make(chan interface{})
messageChan := make(chan *replicationspb.ReplicationTask)

taskType := enumsspb.REPLICATION_TASK_TYPE_NAMESPACE_TASK
go func() {
Expand All @@ -96,13 +96,14 @@ func (s *QueuePersistenceSuite) TestNamespaceReplicationQueue() {
wg.Add(concurrentSenders)

for i := 0; i < concurrentSenders; i++ {
go func() {
go func(senderNum int) {
defer wg.Done()
for message := range messageChan {
err := s.Publish(s.ctx, message)
s.Nil(err, "Enqueue message failed.")
id := message.Attributes.(*replicationspb.ReplicationTask_NamespaceTaskAttributes).NamespaceTaskAttributes.Id
s.Nil(err, "Enqueue message failed when sender %d tried to send %s", senderNum, id)
}
}()
}(i)
}

wg.Wait()
Expand Down Expand Up @@ -151,7 +152,7 @@ func (s *QueuePersistenceSuite) TestNamespaceReplicationDLQ() {
numMessages := 100
concurrentSenders := 10

messageChan := make(chan interface{})
messageChan := make(chan *replicationspb.ReplicationTask)

taskType := enumsspb.REPLICATION_TASK_TYPE_NAMESPACE_TASK
go func() {
Expand All @@ -172,13 +173,14 @@ func (s *QueuePersistenceSuite) TestNamespaceReplicationDLQ() {
wg.Add(concurrentSenders)

for i := 0; i < concurrentSenders; i++ {
go func() {
go func(senderNum int) {
defer wg.Done()
for message := range messageChan {
err := s.PublishToNamespaceDLQ(s.ctx, message)
s.Nil(err, "Enqueue message failed.")
id := message.Attributes.(*replicationspb.ReplicationTask_NamespaceTaskAttributes).NamespaceTaskAttributes.Id
s.Nil(err, "Enqueue message failed when sender %d tried to send %s", senderNum, id)
}
}()
}(i)
}

wg.Wait()
Expand Down
14 changes: 11 additions & 3 deletions common/persistence/sql/sqlplugin/mysql/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ const (
templateDeleteMessageQuery = `DELETE FROM queue WHERE queue_type = ? and message_id = ?`
templateRangeDeleteMessagesQuery = `DELETE FROM queue WHERE queue_type = ? and message_id > ? and message_id <= ?`

templateGetLastMessageIDQuery = `SELECT message_id FROM queue WHERE message_id >= (SELECT message_id FROM queue WHERE queue_type=? ORDER BY message_id DESC LIMIT 1) FOR UPDATE`
// Note that even though this query takes a range lock that serializes all writes, it will return multiple rows
// whenever more than one enqueue-er blocks. This is why we max().
templateGetLastMessageIDQuery = `SELECT MAX(message_id) FROM queue WHERE message_id >= (SELECT message_id FROM queue WHERE queue_type=? ORDER BY message_id DESC LIMIT 1) FOR UPDATE`

templateCreateQueueMetadataQuery = `INSERT INTO queue_metadata (queue_type, data, data_encoding, version) VALUES(:queue_type, :data, :data_encoding, :version)`
templateUpdateQueueMetadataQuery = `UPDATE queue_metadata SET data = :data, data_encoding = :data_encoding, version= :version+1 WHERE queue_type = :queue_type and version = :version`
Expand Down Expand Up @@ -118,13 +120,19 @@ func (mdb *db) GetLastEnqueuedMessageIDForUpdate(
ctx context.Context,
queueType persistence.QueueType,
) (int64, error) {
var lastMessageID int64
var lastMessageID *int64
err := mdb.conn.GetContext(ctx,
&lastMessageID,
templateGetLastMessageIDQuery,
queueType,
)
return lastMessageID, err
if lastMessageID == nil {
// The layer of code above us expects ErrNoRows when the queue is empty. MAX() yields
// null when the queue is empty, so we need to turn that into the correct error.
return 0, sql.ErrNoRows
} else {
return *lastMessageID, err
}
}

func (mdb *db) InsertIntoQueueMetadata(
Expand Down

0 comments on commit 9c9b4c9

Please sign in to comment.