tests: Clean up dsync package (#14415)
Add non-constant timeouts to dsync package.

Reduce test runtime by minutes. Hopefully not too aggressive.
klauspost committed Mar 1, 2022
1 parent cc46a99 commit b030ef1
Showing 5 changed files with 154 additions and 102 deletions.
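The change threads a Timeouts value through the Dsync handle instead of relying on package-level constants, which is what lets tests shrink the waits. A minimal sketch of the new call pattern (newTestDsync, its lockers argument, and the shortened durations are made-up placeholders; production code passes dsync.DefaultTimeouts, as the cmd/namespace-lock.go hunk below shows):

// newTestDsync is a hypothetical helper: it builds a Dsync handle with
// deliberately short timeouts so lock tests fail fast instead of waiting
// on the 1s/5s/30s package defaults.
func newTestDsync(lockers func() ([]dsync.NetLocker, string)) *dsync.Dsync {
	return &dsync.Dsync{
		GetLockers: lockers,
		Timeouts: dsync.Timeouts{
			Acquire:         250 * time.Millisecond, // placeholder test values,
			RefreshCall:     time.Second,            // not the package defaults
			UnlockCall:      time.Second,
			ForceUnlockCall: time.Second,
		},
	}
}

A mutex built from such a handle, dsync.NewDRWMutex(newTestDsync(lockers), "resource"), then uses those timeouts for its acquire, refresh, unlock, and force-unlock calls.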
1 change: 1 addition & 0 deletions cmd/namespace-lock.go
@@ -225,6 +225,7 @@ func (n *nsLockMap) NewNSLock(lockers func() ([]dsync.NetLocker, string), volume
if n.isDistErasure {
drwmutex := dsync.NewDRWMutex(&dsync.Dsync{
GetLockers: lockers,
Timeouts: dsync.DefaultTimeouts,
}, pathsJoinPrefix(volume, paths...)...)
return &distLockInstance{drwmutex, opsID}
}
99 changes: 63 additions & 36 deletions internal/dsync/drwmutex.go
@@ -43,32 +43,61 @@ func log(format string, data ...interface{}) {
}
}

// dRWMutexAcquireTimeout - tolerance limit to wait for lock acquisition before.
const drwMutexAcquireTimeout = 1 * time.Second // 1 second.

// dRWMutexRefreshTimeout - timeout for the refresh call
const drwMutexRefreshCallTimeout = 5 * time.Second

// dRWMutexUnlockTimeout - timeout for the unlock call
const drwMutexUnlockCallTimeout = 30 * time.Second

// dRWMutexForceUnlockTimeout - timeout for the unlock call
const drwMutexForceUnlockCallTimeout = 30 * time.Second

// dRWMutexRefreshInterval - the interval between two refresh calls
const drwMutexRefreshInterval = 10 * time.Second

const drwMutexInfinite = 1<<63 - 1
const (
	// dRWMutexAcquireTimeout - default tolerance limit to wait for lock acquisition.
	drwMutexAcquireTimeout = 1 * time.Second // 1 second.

	// dRWMutexRefreshCallTimeout - default timeout for the refresh call
	drwMutexRefreshCallTimeout = 5 * time.Second

	// dRWMutexUnlockCallTimeout - default timeout for the unlock call
	drwMutexUnlockCallTimeout = 30 * time.Second

	// dRWMutexForceUnlockCallTimeout - default timeout for the force unlock call
	drwMutexForceUnlockCallTimeout = 30 * time.Second

	// dRWMutexRefreshInterval - default interval between two refresh calls
	drwMutexRefreshInterval = 10 * time.Second

	lockRetryInterval = 1 * time.Second

	drwMutexInfinite = 1<<63 - 1
)

// Timeouts are timeouts for specific operations.
type Timeouts struct {
	// Acquire - tolerance limit to wait for lock acquisition.
	Acquire time.Duration

	// RefreshCall - timeout for the refresh call
	RefreshCall time.Duration

	// UnlockCall - timeout for the unlock call
	UnlockCall time.Duration

	// ForceUnlockCall - timeout for the force unlock call
	ForceUnlockCall time.Duration
}

// DefaultTimeouts contains default timeouts.
var DefaultTimeouts = Timeouts{
	Acquire:         drwMutexAcquireTimeout,
	RefreshCall:     drwMutexRefreshCallTimeout,
	UnlockCall:      drwMutexUnlockCallTimeout,
	ForceUnlockCall: drwMutexForceUnlockCallTimeout,
}

// A DRWMutex is a distributed mutual exclusion lock.
type DRWMutex struct {
Names []string
writeLocks []string // Array of nodes that granted a write lock
readLocks []string // Array of array of nodes that granted reader locks
rng *rand.Rand
m sync.Mutex // Mutex to prevent multiple simultaneous locks from this node
clnt *Dsync
cancelRefresh context.CancelFunc
Names []string
writeLocks []string // Array of nodes that granted a write lock
readLocks []string // Array of array of nodes that granted reader locks
rng *rand.Rand
m sync.Mutex // Mutex to prevent multiple simultaneous locks from this node
clnt *Dsync
cancelRefresh context.CancelFunc
refreshInterval time.Duration
lockRetryInterval time.Duration
}

// Granted - represents a structure of a granted lock.
@@ -90,11 +119,13 @@ func NewDRWMutex(clnt *Dsync, names ...string) *DRWMutex {
restClnts, _ := clnt.GetLockers()
sort.Strings(names)
return &DRWMutex{
writeLocks: make([]string, len(restClnts)),
readLocks: make([]string, len(restClnts)),
Names: names,
clnt: clnt,
rng: rand.New(&lockedRandSource{src: rand.NewSource(time.Now().UTC().UnixNano())}),
writeLocks: make([]string, len(restClnts)),
readLocks: make([]string, len(restClnts)),
Names: names,
clnt: clnt,
rng: rand.New(&lockedRandSource{src: rand.NewSource(time.Now().UTC().UnixNano())}),
refreshInterval: drwMutexRefreshInterval,
lockRetryInterval: lockRetryInterval,
}
}

@@ -146,10 +177,6 @@ func (dm *DRWMutex) GetRLock(ctx context.Context, cancel context.CancelFunc, id,
return dm.lockBlocking(ctx, cancel, id, source, isReadLock, opts)
}

const (
lockRetryInterval = 1 * time.Second
)

// lockBlocking will try to acquire either a read or a write lock
//
// The function will loop using a built-in timing randomized back-off
@@ -209,7 +236,7 @@ func (dm *DRWMutex) lockBlocking(ctx context.Context, lockLossCallback func(), i
return locked
}

time.Sleep(time.Duration(dm.rng.Float64() * float64(lockRetryInterval)))
time.Sleep(time.Duration(dm.rng.Float64() * float64(dm.lockRetryInterval)))
}
}
}
@@ -224,15 +251,15 @@ func (dm *DRWMutex) startContinousLockRefresh(lockLossCallback func(), id, sourc
go func() {
defer cancel()

refreshTimer := time.NewTimer(drwMutexRefreshInterval)
refreshTimer := time.NewTimer(dm.refreshInterval)
defer refreshTimer.Stop()

for {
select {
case <-ctx.Done():
return
case <-refreshTimer.C:
refreshTimer.Reset(drwMutexRefreshInterval)
refreshTimer.Reset(dm.refreshInterval)

noQuorum, err := refreshLock(ctx, dm.clnt, id, source, quorum)
if err == nil && noQuorum {
@@ -250,7 +277,7 @@
}

func forceUnlock(ctx context.Context, ds *Dsync, id string) {
ctx, cancel := context.WithTimeout(ctx, drwMutexForceUnlockCallTimeout)
ctx, cancel := context.WithTimeout(ctx, ds.Timeouts.ForceUnlockCall)
defer cancel()

restClnts, _ := ds.GetLockers()
@@ -300,7 +327,7 @@ func refreshLock(ctx context.Context, ds *Dsync, id, source string, quorum int)
return
}

ctx, cancel := context.WithTimeout(ctx, drwMutexRefreshCallTimeout)
ctx, cancel := context.WithTimeout(ctx, ds.Timeouts.RefreshCall)
defer cancel()

refreshed, err := c.Refresh(ctx, args)
@@ -379,7 +406,7 @@ func lock(ctx context.Context, ds *Dsync, locks *[]string, id, source string, is
}

// Combined timeout for the lock attempt.
ctx, cancel := context.WithTimeout(ctx, drwMutexAcquireTimeout)
ctx, cancel := context.WithTimeout(ctx, ds.Timeouts.Acquire)
defer cancel()
for index, c := range restClnts {
wg.Add(1)
@@ -573,7 +600,7 @@ func (dm *DRWMutex) Unlock() {

isReadLock := false
for !releaseAll(dm.clnt, tolerance, owner, &locks, isReadLock, restClnts, dm.Names...) {
time.Sleep(time.Duration(dm.rng.Float64() * float64(lockRetryInterval)))
time.Sleep(time.Duration(dm.rng.Float64() * float64(dm.lockRetryInterval)))
}
}

@@ -614,7 +641,7 @@ func (dm *DRWMutex) RUnlock() {

isReadLock := true
for !releaseAll(dm.clnt, tolerance, owner, &locks, isReadLock, restClnts, dm.Names...) {
time.Sleep(time.Duration(dm.rng.Float64() * float64(lockRetryInterval)))
time.Sleep(time.Duration(dm.rng.Float64() * float64(dm.lockRetryInterval)))
}
}

@@ -635,7 +662,7 @@ func sendRelease(ds *Dsync, c NetLocker, owner string, uid string, isReadLock bo
Resources: names,
}

ctx, cancel := context.WithTimeout(context.Background(), drwMutexUnlockCallTimeout)
ctx, cancel := context.WithTimeout(context.Background(), ds.Timeouts.UnlockCall)
defer cancel()

if isReadLock {
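The refresh and retry intervals also moved from constants onto the DRWMutex itself (refreshInterval and lockRetryInterval, set to the 10s and 1s defaults in NewDRWMutex). They are unexported, so only code inside the dsync package, such as its tests, can shorten them; a hypothetical in-package test helper, as a sketch:

// newFastMutex is a made-up helper for a dsync *_test.go file: it shrinks the
// per-mutex refresh and lock-retry intervals so background loops run quickly.
// The field names come from the DRWMutex struct above; the durations are arbitrary.
func newFastMutex(ds *Dsync, name string) *DRWMutex {
	dm := NewDRWMutex(ds, name)
	dm.refreshInterval = 50 * time.Millisecond   // default: drwMutexRefreshInterval (10s)
	dm.lockRetryInterval = 10 * time.Millisecond // default: lockRetryInterval (1s)
	return dm
}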
84 changes: 44 additions & 40 deletions internal/dsync/drwmutex_test.go
@@ -47,13 +47,13 @@ func testSimpleWriteLock(t *testing.T, duration time.Duration) (locked bool) {
// fmt.Println("2nd read lock acquired, waiting...")

go func() {
time.Sleep(2 * time.Second)
time.Sleep(2 * testDrwMutexAcquireTimeout)
drwm.RUnlock()
// fmt.Println("1st read lock released, waiting...")
}()

go func() {
time.Sleep(3 * time.Second)
time.Sleep(3 * testDrwMutexAcquireTimeout)
drwm.RUnlock()
// fmt.Println("2nd read lock released, waiting...")
}()
@@ -63,7 +63,7 @@ func testSimpleWriteLock(t *testing.T, duration time.Duration) (locked bool) {
locked = drwm.GetLock(ctx3, cancel3, id, source, Options{Timeout: duration})
if locked {
// fmt.Println("Write lock acquired, waiting...")
time.Sleep(time.Second)
time.Sleep(testDrwMutexAcquireTimeout)

drwm.Unlock()
}
@@ -72,7 +72,7 @@ }
}

func TestSimpleWriteLockAcquired(t *testing.T) {
locked := testSimpleWriteLock(t, 5*time.Second)
locked := testSimpleWriteLock(t, 10*testDrwMutexAcquireTimeout)

expected := true
if locked != expected {
@@ -81,7 +81,7 @@ func TestSimpleWriteLockAcquired(t *testing.T) {
}

func TestSimpleWriteLockTimedOut(t *testing.T) {
locked := testSimpleWriteLock(t, time.Second)
locked := testSimpleWriteLock(t, testDrwMutexAcquireTimeout)

expected := false
if locked != expected {
@@ -99,7 +99,7 @@ func testDualWriteLock(t *testing.T, duration time.Duration) (locked bool) {
}

go func() {
time.Sleep(2 * time.Second)
time.Sleep(3 * testDrwMutexAcquireTimeout)
drwm.Unlock()
// fmt.Println("Initial write lock released, waiting...")
}()
@@ -109,7 +109,7 @@ func testDualWriteLock(t *testing.T, duration time.Duration) (locked bool) {
locked = drwm.GetLock(ctx2, cancel2, id, source, Options{Timeout: duration})
if locked {
// fmt.Println("2nd write lock acquired, waiting...")
time.Sleep(time.Second)
time.Sleep(testDrwMutexAcquireTimeout)

drwm.Unlock()
}
@@ -118,7 +118,7 @@ }
}

func TestDualWriteLockAcquired(t *testing.T) {
locked := testDualWriteLock(t, 5*time.Second)
locked := testDualWriteLock(t, 10*testDrwMutexAcquireTimeout)

expected := true
if locked != expected {
@@ -127,7 +127,7 @@ func TestDualWriteLockAcquired(t *testing.T) {
}

func TestDualWriteLockTimedOut(t *testing.T) {
locked := testDualWriteLock(t, time.Second)
locked := testDualWriteLock(t, testDrwMutexAcquireTimeout)

expected := false
if locked != expected {
@@ -214,25 +214,27 @@ func writer(rwm *DRWMutex, numIterations int, activity *int32, cdone chan bool)
}

// Borrowed from rwmutex_test.go
func HammerRWMutex(gomaxprocs, numReaders, numIterations int) {
runtime.GOMAXPROCS(gomaxprocs)
// Number of active readers + 10000 * number of active writers.
var activity int32
rwm := NewDRWMutex(ds, "test")
cdone := make(chan bool)
go writer(rwm, numIterations, &activity, cdone)
var i int
for i = 0; i < numReaders/2; i++ {
go reader(rwm, numIterations, &activity, cdone)
}
go writer(rwm, numIterations, &activity, cdone)
for ; i < numReaders; i++ {
go reader(rwm, numIterations, &activity, cdone)
}
// Wait for the 2 writers and all readers to finish.
for i := 0; i < 2+numReaders; i++ {
<-cdone
}
func hammerRWMutex(t *testing.T, gomaxprocs, numReaders, numIterations int) {
t.Run(fmt.Sprintf("%d-%d-%d", gomaxprocs, numReaders, numIterations), func(t *testing.T) {
runtime.GOMAXPROCS(gomaxprocs)
// Number of active readers + 10000 * number of active writers.
var activity int32
rwm := NewDRWMutex(ds, "test")
cdone := make(chan bool)
go writer(rwm, numIterations, &activity, cdone)
var i int
for i = 0; i < numReaders/2; i++ {
go reader(rwm, numIterations, &activity, cdone)
}
go writer(rwm, numIterations, &activity, cdone)
for ; i < numReaders; i++ {
go reader(rwm, numIterations, &activity, cdone)
}
// Wait for the 2 writers and all readers to finish.
for i := 0; i < 2+numReaders; i++ {
<-cdone
}
})
}

// Borrowed from rwmutex_test.go
@@ -242,16 +244,16 @@ func TestRWMutex(t *testing.T) {
if testing.Short() {
n = 5
}
HammerRWMutex(1, 1, n)
HammerRWMutex(1, 3, n)
HammerRWMutex(1, 10, n)
HammerRWMutex(4, 1, n)
HammerRWMutex(4, 3, n)
HammerRWMutex(4, 10, n)
HammerRWMutex(10, 1, n)
HammerRWMutex(10, 3, n)
HammerRWMutex(10, 10, n)
HammerRWMutex(10, 5, n)
hammerRWMutex(t, 1, 1, n)
hammerRWMutex(t, 1, 3, n)
hammerRWMutex(t, 1, 10, n)
hammerRWMutex(t, 4, 1, n)
hammerRWMutex(t, 4, 3, n)
hammerRWMutex(t, 4, 10, n)
hammerRWMutex(t, 10, 1, n)
hammerRWMutex(t, 10, 3, n)
hammerRWMutex(t, 10, 10, n)
hammerRWMutex(t, 10, 5, n)
}

// Borrowed from rwmutex_test.go
@@ -267,12 +269,13 @@ func TestUnlockPanic(t *testing.T) {

// Borrowed from rwmutex_test.go
func TestUnlockPanic2(t *testing.T) {
mu := NewDRWMutex(ds, "test-unlock-panic-2")
defer func() {
if recover() == nil {
t.Fatalf("unlock of unlocked RWMutex did not panic")
}
mu.RUnlock() // Unlock, so -test.count > 1 works
}()
mu := NewDRWMutex(ds, "test-unlock-panic-2")
mu.RLock(id, source)
mu.Unlock()
}
@@ -290,12 +293,13 @@ func TestRUnlockPanic(t *testing.T) {

// Borrowed from rwmutex_test.go
func TestRUnlockPanic2(t *testing.T) {
mu := NewDRWMutex(ds, "test-runlock-panic-2")
defer func() {
if recover() == nil {
t.Fatalf("read unlock of unlocked RWMutex did not panic")
}
mu.Unlock() // Unlock, so -test.count > 1 works
}()
mu := NewDRWMutex(ds, "test-runlock-panic-2")
mu.Lock(id, source)
mu.RUnlock()
}
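Because hammerRWMutex now wraps each run in t.Run with a "gomaxprocs-readers-iterations" name, a single combination can be rerun in isolation from the command line; for example, using the short-mode iteration count of 5 shown above (the package path is assumed from the file list):

go test -short -run 'TestRWMutex/10-10-5' ./internal/dsync/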
3 changes: 3 additions & 0 deletions internal/dsync/dsync.go
@@ -22,4 +22,7 @@ package dsync
type Dsync struct {
// List of rest client objects, one per lock server.
GetLockers func() ([]NetLocker, string)

// Timeouts to apply.
Timeouts Timeouts
}
