tests: Clean up dsync package #14415

Merged: 3 commits, Mar 1, 2022
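This PR makes the dsync package's operation timeouts configurable: the hard-coded drwMutex* constants become defaults collected in a new Timeouts struct on Dsync, DRWMutex gains per-instance refresh and retry intervals, and the tests now derive their sleeps from a shared test constant and run the hammer cases as named subtests.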
1 change: 1 addition & 0 deletions cmd/namespace-lock.go
@@ -225,6 +225,7 @@ func (n *nsLockMap) NewNSLock(lockers func() ([]dsync.NetLocker, string), volume
    if n.isDistErasure {
        drwmutex := dsync.NewDRWMutex(&dsync.Dsync{
            GetLockers: lockers,
            Timeouts:   dsync.DefaultTimeouts,
        }, pathsJoinPrefix(volume, paths...)...)
        return &distLockInstance{drwmutex, opsID}
    }
99 changes: 63 additions & 36 deletions internal/dsync/drwmutex.go
@@ -43,32 +43,61 @@ func log(format string, data ...interface{}) {
    }
}

const (
    // drwMutexAcquireTimeout - default tolerance limit to wait for lock acquisition before giving up.
    drwMutexAcquireTimeout = 1 * time.Second // 1 second.

    // drwMutexRefreshCallTimeout - default timeout for the refresh call
    drwMutexRefreshCallTimeout = 5 * time.Second

    // drwMutexUnlockCallTimeout - default timeout for the unlock call
    drwMutexUnlockCallTimeout = 30 * time.Second

    // drwMutexForceUnlockCallTimeout - default timeout for the force unlock call
    drwMutexForceUnlockCallTimeout = 30 * time.Second

    // drwMutexRefreshInterval - default interval between two refresh calls
    drwMutexRefreshInterval = 10 * time.Second

    lockRetryInterval = 1 * time.Second

    drwMutexInfinite = 1<<63 - 1
)

// Timeouts are timeouts for specific operations.
type Timeouts struct {
    // Acquire - tolerance limit to wait for lock acquisition before giving up.
    Acquire time.Duration

    // RefreshCall - timeout for the refresh call
    RefreshCall time.Duration

    // UnlockCall - timeout for the unlock call
    UnlockCall time.Duration

    // ForceUnlockCall - timeout for the force unlock call
    ForceUnlockCall time.Duration
}

// DefaultTimeouts contains default timeouts.
var DefaultTimeouts = Timeouts{
    Acquire:         drwMutexAcquireTimeout,
    RefreshCall:     drwMutexRefreshCallTimeout,
    UnlockCall:      drwMutexUnlockCallTimeout,
    ForceUnlockCall: drwMutexForceUnlockCallTimeout,
}
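Because DefaultTimeouts is a plain value, a caller can copy it and override individual fields before wiring up a Dsync. A minimal sketch, assuming use from within the MinIO module (internal packages are not importable from outside it); the helper name and the shortened durations are illustrative, not part of this PR:

package example

import (
    "time"

    "github.com/minio/minio/internal/dsync"
)

// newFastDsync returns a Dsync whose timeouts are shortened, e.g. for tests.
func newFastDsync(lockers func() ([]dsync.NetLocker, string)) *dsync.Dsync {
    timeouts := dsync.DefaultTimeouts
    timeouts.Acquire = 100 * time.Millisecond // give up on slow lockers sooner
    timeouts.RefreshCall = 500 * time.Millisecond
    return &dsync.Dsync{
        GetLockers: lockers,
        Timeouts:   timeouts,
    }
}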

// A DRWMutex is a distributed mutual exclusion lock.
type DRWMutex struct {
    Names             []string
    writeLocks        []string // Array of nodes that granted a write lock
    readLocks         []string // Array of nodes that granted reader locks
    rng               *rand.Rand
    m                 sync.Mutex // Mutex to prevent multiple simultaneous locks from this node
    clnt              *Dsync
    cancelRefresh     context.CancelFunc
    refreshInterval   time.Duration
    lockRetryInterval time.Duration
}

// Granted - represents a structure of a granted lock.
@@ -90,11 +119,13 @@ func NewDRWMutex(clnt *Dsync, names ...string) *DRWMutex {
    restClnts, _ := clnt.GetLockers()
    sort.Strings(names)
    return &DRWMutex{
        writeLocks:        make([]string, len(restClnts)),
        readLocks:         make([]string, len(restClnts)),
        Names:             names,
        clnt:              clnt,
        rng:               rand.New(&lockedRandSource{src: rand.NewSource(time.Now().UTC().UnixNano())}),
        refreshInterval:   drwMutexRefreshInterval,
        lockRetryInterval: lockRetryInterval,
    }
}
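The two unexported interval fields added above are what let tests inside the package tighten the refresh and retry cadence instead of sleeping on the production constants. A minimal sketch of such tuning from a _test.go file in package dsync; the mutex name and durations are illustrative:

dm := NewDRWMutex(ds, "test-tuned")
dm.refreshInterval = 100 * time.Millisecond  // default: drwMutexRefreshInterval (10s)
dm.lockRetryInterval = 10 * time.Millisecond // default: lockRetryInterval (1s)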

@@ -146,10 +177,6 @@ func (dm *DRWMutex) GetRLock(ctx context.Context, cancel context.CancelFunc, id,
    return dm.lockBlocking(ctx, cancel, id, source, isReadLock, opts)
}

// lockBlocking will try to acquire either a read or a write lock
//
// The function will loop using a built-in timing randomized back-off
@@ -209,7 +236,7 @@ func (dm *DRWMutex) lockBlocking(ctx context.Context, lockLossCallback func(), i
                return locked
            }

            time.Sleep(time.Duration(dm.rng.Float64() * float64(dm.lockRetryInterval)))
        }
    }
}
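Sleeping for a uniformly random fraction of dm.lockRetryInterval (rng.Float64() returns a value in [0, 1)) keeps competing lockers from retrying in lockstep after a failed acquisition.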
@@ -224,15 +251,15 @@ func (dm *DRWMutex) startContinousLockRefresh(lockLossCallback func(), id, sourc
    go func() {
        defer cancel()

        refreshTimer := time.NewTimer(dm.refreshInterval)
        defer refreshTimer.Stop()

        for {
            select {
            case <-ctx.Done():
                return
            case <-refreshTimer.C:
                refreshTimer.Reset(dm.refreshInterval)

                noQuorum, err := refreshLock(ctx, dm.clnt, id, source, quorum)
                if err == nil && noQuorum {
@@ -250,7 +277,7 @@ func (dm *DRWMutex) startContinousLockRefresh(lockLossCallback func(), id, sourc
}

func forceUnlock(ctx context.Context, ds *Dsync, id string) {
    ctx, cancel := context.WithTimeout(ctx, ds.Timeouts.ForceUnlockCall)
    defer cancel()

    restClnts, _ := ds.GetLockers()
@@ -300,7 +327,7 @@ func refreshLock(ctx context.Context, ds *Dsync, id, source string, quorum int)
                return
            }

            ctx, cancel := context.WithTimeout(ctx, ds.Timeouts.RefreshCall)
            defer cancel()

            refreshed, err := c.Refresh(ctx, args)
@@ -379,7 +406,7 @@ func lock(ctx context.Context, ds *Dsync, locks *[]string, id, source string, is
    }

    // Combined timeout for the lock attempt.
    ctx, cancel := context.WithTimeout(ctx, ds.Timeouts.Acquire)
    defer cancel()
    for index, c := range restClnts {
        wg.Add(1)
@@ -573,7 +600,7 @@ func (dm *DRWMutex) Unlock() {

    isReadLock := false
    for !releaseAll(dm.clnt, tolerance, owner, &locks, isReadLock, restClnts, dm.Names...) {
        time.Sleep(time.Duration(dm.rng.Float64() * float64(dm.lockRetryInterval)))
    }
}

@@ -614,7 +641,7 @@ func (dm *DRWMutex) RUnlock() {

    isReadLock := true
    for !releaseAll(dm.clnt, tolerance, owner, &locks, isReadLock, restClnts, dm.Names...) {
        time.Sleep(time.Duration(dm.rng.Float64() * float64(dm.lockRetryInterval)))
    }
}

@@ -635,7 +662,7 @@ func sendRelease(ds *Dsync, c NetLocker, owner string, uid string, isReadLock bo
        Resources: names,
    }

    ctx, cancel := context.WithTimeout(context.Background(), ds.Timeouts.UnlockCall)
    defer cancel()

    if isReadLock {
84 changes: 44 additions & 40 deletions internal/dsync/drwmutex_test.go
@@ -47,13 +47,13 @@ func testSimpleWriteLock(t *testing.T, duration time.Duration) (locked bool) {
    // fmt.Println("2nd read lock acquired, waiting...")

    go func() {
        time.Sleep(2 * testDrwMutexAcquireTimeout)
        drwm.RUnlock()
        // fmt.Println("1st read lock released, waiting...")
    }()

    go func() {
        time.Sleep(3 * testDrwMutexAcquireTimeout)
        drwm.RUnlock()
        // fmt.Println("2nd read lock released, waiting...")
    }()
@@ -63,7 +63,7 @@ func testSimpleWriteLock(t *testing.T, duration time.Duration) (locked bool) {
    locked = drwm.GetLock(ctx3, cancel3, id, source, Options{Timeout: duration})
    if locked {
        // fmt.Println("Write lock acquired, waiting...")
        time.Sleep(testDrwMutexAcquireTimeout)

        drwm.Unlock()
    }
@@ -72,7 +72,7 @@ func testSimpleWriteLock(t *testing.T, duration time.Duration) (locked bool) {
}

func TestSimpleWriteLockAcquired(t *testing.T) {
    locked := testSimpleWriteLock(t, 10*testDrwMutexAcquireTimeout)

    expected := true
    if locked != expected {
@@ -81,7 +81,7 @@ func TestSimpleWriteLockAcquired(t *testing.T) {
    }
}

func TestSimpleWriteLockTimedOut(t *testing.T) {
    locked := testSimpleWriteLock(t, testDrwMutexAcquireTimeout)

    expected := false
    if locked != expected {
}

go func() {
time.Sleep(2 * time.Second)
time.Sleep(3 * testDrwMutexAcquireTimeout)
drwm.Unlock()
// fmt.Println("Initial write lock released, waiting...")
}()
Expand All @@ -109,7 +109,7 @@ func testDualWriteLock(t *testing.T, duration time.Duration) (locked bool) {
locked = drwm.GetLock(ctx2, cancel2, id, source, Options{Timeout: duration})
if locked {
// fmt.Println("2nd write lock acquired, waiting...")
time.Sleep(time.Second)
time.Sleep(testDrwMutexAcquireTimeout)

drwm.Unlock()
}
Expand All @@ -118,7 +118,7 @@ func testDualWriteLock(t *testing.T, duration time.Duration) (locked bool) {
}

func TestDualWriteLockAcquired(t *testing.T) {
locked := testDualWriteLock(t, 5*time.Second)
locked := testDualWriteLock(t, 10*testDrwMutexAcquireTimeout)

expected := true
if locked != expected {
@@ -127,7 +127,7 @@ func TestDualWriteLockAcquired(t *testing.T) {
}

func TestDualWriteLockTimedOut(t *testing.T) {
    locked := testDualWriteLock(t, testDrwMutexAcquireTimeout)

    expected := false
    if locked != expected {
@@ -214,25 +214,27 @@ func writer(rwm *DRWMutex, numIterations int, activity *int32, cdone chan bool)
}

// Borrowed from rwmutex_test.go
func hammerRWMutex(t *testing.T, gomaxprocs, numReaders, numIterations int) {
    t.Run(fmt.Sprintf("%d-%d-%d", gomaxprocs, numReaders, numIterations), func(t *testing.T) {
        runtime.GOMAXPROCS(gomaxprocs)
        // Number of active readers + 10000 * number of active writers.
        var activity int32
        rwm := NewDRWMutex(ds, "test")
        cdone := make(chan bool)
        go writer(rwm, numIterations, &activity, cdone)
        var i int
        for i = 0; i < numReaders/2; i++ {
            go reader(rwm, numIterations, &activity, cdone)
        }
        go writer(rwm, numIterations, &activity, cdone)
        for ; i < numReaders; i++ {
            go reader(rwm, numIterations, &activity, cdone)
        }
        // Wait for the 2 writers and all readers to finish.
        for i := 0; i < 2+numReaders; i++ {
            <-cdone
        }
    })
}
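Wrapping each parameter combination in t.Run turns it into a named subtest, so a failure reports the exact gomaxprocs-readers-iterations triple, and a single case can be rerun with, for example, go test -run 'TestRWMutex/10-10'.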

// Borrowed from rwmutex_test.go
@@ -242,16 +244,16 @@ func TestRWMutex(t *testing.T) {
    if testing.Short() {
        n = 5
    }
    hammerRWMutex(t, 1, 1, n)
    hammerRWMutex(t, 1, 3, n)
    hammerRWMutex(t, 1, 10, n)
    hammerRWMutex(t, 4, 1, n)
    hammerRWMutex(t, 4, 3, n)
    hammerRWMutex(t, 4, 10, n)
    hammerRWMutex(t, 10, 1, n)
    hammerRWMutex(t, 10, 3, n)
    hammerRWMutex(t, 10, 10, n)
    hammerRWMutex(t, 10, 5, n)
}

// Borrowed from rwmutex_test.go
@@ -267,12 +269,13 @@ func TestUnlockPanic(t *testing.T) {

// Borrowed from rwmutex_test.go
func TestUnlockPanic2(t *testing.T) {
    mu := NewDRWMutex(ds, "test-unlock-panic-2")
    defer func() {
        if recover() == nil {
            t.Fatalf("unlock of unlocked RWMutex did not panic")
        }
        mu.RUnlock() // Unlock, so -test.count > 1 works
    }()
    mu.RLock(id, source)
    mu.Unlock()
}
@@ -290,12 +293,13 @@ func TestRUnlockPanic(t *testing.T) {

// Borrowed from rwmutex_test.go
func TestRUnlockPanic2(t *testing.T) {
    mu := NewDRWMutex(ds, "test-runlock-panic-2")
    defer func() {
        if recover() == nil {
            t.Fatalf("read unlock of unlocked RWMutex did not panic")
        }
        mu.Unlock() // Unlock, so -test.count > 1 works
    }()
    mu.Lock(id, source)
    mu.RUnlock()
}
3 changes: 3 additions & 0 deletions internal/dsync/dsync.go
@@ -22,4 +22,7 @@ package dsync
type Dsync struct {
    // List of rest client objects, one per lock server.
    GetLockers func() ([]NetLocker, string)

    // Timeouts to apply.
    Timeouts Timeouts
}
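Note that the hunks shown here add no fallback for a zero-value Timeouts: a zero duration handed to context.WithTimeout yields an already-expired deadline, which is why cmd/namespace-lock.go above passes dsync.DefaultTimeouts explicitly.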