Skip to content

Commit

Permalink
Merge pull request #902 from nats-io/fix_ft_sql_tick_update
Browse files Browse the repository at this point in the history
[FIXED] SQLStore: tick column of StoreLock table may not be updated
  • Loading branch information
kozlovic committed Aug 9, 2019
2 parents fb69826 + e966150 commit 1e3cdc2
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 4 deletions.
21 changes: 17 additions & 4 deletions stores/sqlstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,13 +460,26 @@ func (s *SQLStore) GetExclusiveLock() (bool, error) {
for i := 0; i < sqlLockLostCount; i++ {
time.Sleep(time.Duration(1.5 * float64(sqlLockUpdateInterval)))
hasLock, id, tick, err = s.acquireDBLock(false)
if hasLock || err != nil || id != prevID || tick != prevTick {
return hasLock, err
// If the current lock owner is closed, the lockID is being
// cleaned from the entry in the table, which could allow the
// call above to acquired the lock even though the "steal"
// boolean is false. If we got the lock, ensure we start the
// "tick" update process.
if hasLock {
break
}
// If we got an error or ID and/or tick has changed, simply
// return that we don't have the lock.
if err != nil || id != prevID || tick != prevTick {
return false, err
}
prevTick = tick
}
// Try to steal.
hasLock, _, _, err = s.acquireDBLock(true)
if !hasLock {
// Still did not get the lock but there was no update to the
// lock table, so try to steal.
hasLock, _, _, err = s.acquireDBLock(true)
}
}
if hasLock {
// Success. Keep track that we own the lock so we can clear
Expand Down
65 changes: 65 additions & 0 deletions stores/sqlstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1856,3 +1856,68 @@ func TestSQLMsgCacheAutoFlush(t *testing.T) {
t.Fatalf("Expected some messages, got none")
}
}

func TestSQLGetExclusiveLockRace(t *testing.T) {
if !doSQL {
t.SkipNow()
}

cleanupSQLDatastore(t)
defer cleanupSQLDatastore(t)

sqlLockUpdateInterval = 250 * time.Millisecond
sqlLockLostCount = 100
sqlNoPanic = true
defer func() {
sqlLockUpdateInterval = sqlDefaultLockUpdateInterval
sqlLockLostCount = sqlDefaultLockLostCount
sqlNoPanic = false
}()

s1 := createDefaultSQLStore(t)
defer s1.Close()

hasLock, err := s1.GetExclusiveLock()
if !hasLock || err != nil {
t.Fatalf("Expected lock to be acquired, got %v, %v", hasLock, err)
}

dl := &captureErrAndFatalLogger{}
s2, err := NewSQLStore(dl, testSQLDriver, testSQLSource, nil, SQLNoCaching(true))
if err != nil {
t.Fatalf("Error creating store: %v", err)
}
defer s2.Close()

wg := sync.WaitGroup{}
wg.Add(1)
go func() {
s2.GetExclusiveLock()
wg.Done()
}()

time.Sleep(200 * time.Millisecond)
s1.Close()

wg.Wait()

db := getDBConnection(t)
defer db.Close()

// Now check that s2's is updating the lock table
prevTick := uint64(0)
for i := 0; i < 2; i++ {
r := db.QueryRow("SELECT tick FROM StoreLock")
tick := uint64(0)
if err := r.Scan(&tick); err != nil {
t.Fatalf("Error on Scan: %v", err)
}
if prevTick > 0 && tick == prevTick {
t.Fatalf("Tick was not updated (%v)", prevTick)
}
prevTick = tick
if i == 0 {
time.Sleep(3 * sqlLockUpdateInterval)
}
}
}

0 comments on commit 1e3cdc2

Please sign in to comment.