Fix racy leader pool test
The TestSecondWatcherAcquiresReleasedLease test was racy concerning
which of the two pools would acquire the lease first. As a result,
the test would either fail due to an unexpected event order, or hang
until aborted because the pools wouldn't emit enough events for the
event collection loop to terminate. This behavior was apparently only
observable on Windows.

Fix this by adding appropriate values for AcquireTime, RenewTime and
LeaseDurationSeconds to the pre-created lease, so that it properly
reflects its acquisition by the first pool.

Also remove the namespace and lease creations from the other tests, as
they were unnecessary. Inline the last required lease creation into the
racy test case.

Remove the WithRenewDeadline options from the pools under test, as they
had no impact on the tests, either semantically or in terms of execution
time. Reduce the RetryPeriod to 10 milliseconds to provide even more
immediate test feedback. Rename some local variables and rephrase some
log messages to make them more consistent.

Signed-off-by: Tom Wieczorek <twieczorek@mirantis.com>
(cherry picked from commit 544307d)
(cherry picked from commit b97c0ef)
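For context on why the pre-populated fields matter: client-go's leader election treats a lease as held for as long as the holder's renewal window (roughly RenewTime plus LeaseDurationSeconds; client-go actually measures from the moment it last observed the record change) still lies in the future, and a candidate with a different identity backs off instead of trying to acquire it. The snippet below is only a simplified sketch of that idea, not the actual client-go or k0s code, and leaseStillHeld is a made-up helper name.

package main

import (
	"fmt"
	"time"

	coordinationv1 "k8s.io/api/coordination/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// leaseStillHeld reports whether a lease held by some other identity has
// not yet expired, i.e. whether its renewal window is still open.
// Illustration only; client-go's real check lives in its leaderelection
// package and uses its own observation timestamps.
func leaseStillHeld(spec coordinationv1.LeaseSpec, self string, now time.Time) bool {
	if spec.HolderIdentity == nil || *spec.HolderIdentity == "" || *spec.HolderIdentity == self {
		return false // unheld, or held by ourselves: free to (re)acquire
	}
	if spec.RenewTime == nil || spec.LeaseDurationSeconds == nil {
		return false // without these fields the lease never looks "live"
	}
	expiry := spec.RenewTime.Add(time.Duration(*spec.LeaseDurationSeconds) * time.Second)
	return expiry.After(now)
}

func main() {
	now := time.Now()
	renew := metav1.NewMicroTime(now)
	duration := int32(60 * 60) // one hour, as in the pre-created lease below
	holder := "pool1"
	spec := coordinationv1.LeaseSpec{
		HolderIdentity:       &holder,
		RenewTime:            &renew,
		LeaseDurationSeconds: &duration,
	}
	fmt.Println(leaseStillHeld(spec, "pool2", now)) // true: pool2 waits until pool1 releases
}

Before this fix, the pre-created lease carried only a HolderIdentity, so it looked free to both electors and either pool could grab it first, which appears to be the nondeterminism described above.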
twz123 authored and github-actions[bot] committed May 17, 2023
1 parent b92b1d6 commit bb8d32e
134 changes: 56 additions & 78 deletions pkg/leaderelection/lease_pool_test.go
@@ -26,20 +26,18 @@ import (
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	coordinationv1 "k8s.io/api/coordination/v1"
-	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/client-go/kubernetes/fake"
 	fakecoordinationv1 "k8s.io/client-go/kubernetes/typed/coordination/v1/fake"
 	k8stesting "k8s.io/client-go/testing"
+	"k8s.io/utils/pointer"
 )
 
 func TestLeasePoolWatcherTriggersOnLeaseAcquisition(t *testing.T) {
 	const identity = "test-node"
 
 	fakeClient := fake.NewSimpleClientset()
-	expectCreateNamespace(t, fakeClient)
-	expectCreateLease(t, fakeClient, identity)
 
 	pool, err := NewLeasePool(context.TODO(), fakeClient, "test", WithIdentity(identity), WithNamespace("test"))
 	require.NoError(t, err)
@@ -79,8 +77,6 @@ func TestLeasePoolTriggersLostLeaseWhenCancelled(t *testing.T) {
 	const identity = "test-node"
 
 	fakeClient := fake.NewSimpleClientset()
-	expectCreateNamespace(t, fakeClient)
-	expectCreateLease(t, fakeClient, identity)
 
 	pool, err := NewLeasePool(context.TODO(), fakeClient, "test", WithIdentity(identity), WithNamespace("test"))
 	require.NoError(t, err)
@@ -104,8 +100,6 @@ func TestLeasePoolWatcherReacquiresLostLease(t *testing.T) {
 	const identity = "test-node"
 
 	fakeClient := fake.NewSimpleClientset()
-	expectCreateNamespace(t, fakeClient)
-	expectCreateLease(t, fakeClient, identity)
 
 	givenLeaderElectorError := func() func(err error) {
 		var updateErr atomic.Value
@@ -123,8 +117,7 @@
 
 	pool, err := NewLeasePool(context.TODO(), fakeClient, "test",
 		WithIdentity(identity), WithNamespace("test"),
-		WithRetryPeriod(350*time.Millisecond),
-		WithRenewDeadline(500*time.Millisecond),
+		WithRetryPeriod(10*time.Millisecond),
 	)
 	require.NoError(t, err)
 
@@ -155,99 +148,84 @@ func TestLeasePoolWatcherReacquiresLostLease(t *testing.T) {
 }
 
 func TestSecondWatcherAcquiresReleasedLease(t *testing.T) {
-	const (
-		identity  = "test-node"
-		identity2 = "test-node-2"
-	)
-
 	fakeClient := fake.NewSimpleClientset()
-	expectCreateNamespace(t, fakeClient)
-	expectCreateLease(t, fakeClient, identity)
 
-	pool, err := NewLeasePool(context.TODO(), fakeClient, "test",
-		WithIdentity(identity), WithNamespace("test"),
-		WithRetryPeriod(350*time.Millisecond),
-		WithRenewDeadline(500*time.Millisecond),
+	pool1, err := NewLeasePool(context.TODO(), fakeClient, "test",
+		WithIdentity("pool1"), WithNamespace("test"),
+		WithRetryPeriod(10*time.Millisecond),
 	)
 	require.NoError(t, err)
 
+	pool2, err := NewLeasePool(context.TODO(), fakeClient, "test",
+		WithIdentity("pool2"), WithNamespace("test"),
+		WithRetryPeriod(10*time.Millisecond),
+	)
+	require.NoError(t, err)
+
+	expectedEventOrder := []string{"pool1-acquired", "pool1-lost", "pool2-acquired"}
+
+	// Pre-create the acquired lease for the first identity, so that there are
+	// no races when acquiring the lease by the two competing pools.
+	now := metav1.NewMicroTime(time.Now())
+	_, err = fakeClient.CoordinationV1().Leases("test").Create(context.TODO(), &coordinationv1.Lease{
+		TypeMeta: metav1.TypeMeta{
+			Kind:       "Lease",
+			APIVersion: coordinationv1.SchemeGroupVersion.String(),
+		},
+		ObjectMeta: metav1.ObjectMeta{
+			Name: "test",
+		},
+		Spec: coordinationv1.LeaseSpec{
+			HolderIdentity:       pointer.String("pool1"),
+			AcquireTime:          &now,
+			RenewTime:            &now,
+			LeaseDurationSeconds: pointer.Int32(60 * 60), // block lease for a very long time
+		},
+	}, metav1.CreateOptions{})
+	require.NoError(t, err)
+	t.Log("Pre-created acquired lease for first identity")
+
-	events, cancel, err := pool.Watch(WithOutputChannels(&LeaseEvents{
+	events1, cancel1, err := pool1.Watch(WithOutputChannels(&LeaseEvents{
 		AcquiredLease: make(chan struct{}, 1),
 		LostLease:     make(chan struct{}, 1),
 	}))
 	require.NoError(t, err)
-	defer cancel()
-
-	pool2, err := NewLeasePool(context.TODO(), fakeClient, "test",
-		WithIdentity(identity2),
-		WithNamespace("test"),
-		WithRetryPeriod(350*time.Millisecond),
-		WithRenewDeadline(500*time.Millisecond),
-	)
-	require.NoError(t, err)
+	defer cancel1()
+	t.Log("Started first lease pool")
 
 	events2, cancel2, err := pool2.Watch(WithOutputChannels(&LeaseEvents{
 		AcquiredLease: make(chan struct{}, 1),
 		LostLease:     make(chan struct{}, 1),
 	}))
 	require.NoError(t, err)
 	defer cancel2()
-	t.Log("started second lease holder")
+	t.Log("Started second lease pool, receiving events ...")
 
 	var receivedEvents []string
 
-leaseEventLoop:
 	for {
 		select {
-		case <-events.AcquiredLease:
-			t.Log("lease acquired, cancelling leaser")
-			cancel()
-			receivedEvents = append(receivedEvents, "node1-acquired")
-		case <-events.LostLease:
-			t.Log("context cancelled and node 1 lease successfully lost")
-			receivedEvents = append(receivedEvents, "node1-lost")
+		case <-events1.AcquiredLease:
+			t.Log("First lease acquired, cancelling pool")
+			cancel1()
+			receivedEvents = append(receivedEvents, "pool1-acquired")
+		case <-events1.LostLease:
+			t.Log("First lease lost")
+			receivedEvents = append(receivedEvents, "pool1-lost")
 		case <-events2.AcquiredLease:
-			t.Log("node 2 lease acquired")
-			receivedEvents = append(receivedEvents, "node2-acquired")
-		default:
-			if len(receivedEvents) >= 3 {
-				break leaseEventLoop
-			}
+			t.Log("Second lease acquired")
+			receivedEvents = append(receivedEvents, "pool2-acquired")
+		case <-events2.LostLease:
+			t.Log("Second lease lost")
+			receivedEvents = append(receivedEvents, "pool2-lost")
+		case <-time.After(10 * time.Second):
+			require.Fail(t, "Didn't receive any events for 10 seconds.")
 		}
-	}
 
-	assert.Equal(t, "node1-acquired", receivedEvents[0])
-	assert.Equal(t, "node1-lost", receivedEvents[1])
-	assert.Equal(t, "node2-acquired", receivedEvents[2])
-}
-
-func expectCreateNamespace(t *testing.T, fakeClient *fake.Clientset) {
-	_, err := fakeClient.CoreV1().Namespaces().Create(context.TODO(), &v1.Namespace{
-		TypeMeta: metav1.TypeMeta{
-			Kind:       "Namespace",
-			APIVersion: "v1",
-		},
-		ObjectMeta: metav1.ObjectMeta{
-			Name: "test",
-		},
-		Spec: v1.NamespaceSpec{},
-	}, metav1.CreateOptions{})
-	require.NoError(t, err)
-}
+		if len(receivedEvents) >= 3 {
+			break
+		}
+	}
 
-func expectCreateLease(t *testing.T, fakeClient *fake.Clientset, identity string) {
-	_, err := fakeClient.CoordinationV1().Leases("test").Create(context.TODO(), &coordinationv1.Lease{
-		TypeMeta: metav1.TypeMeta{
-			Kind:       "Lease",
-			APIVersion: "coordination.k8s.io/v1",
-		},
-		ObjectMeta: metav1.ObjectMeta{
-			Name: "test",
-		},
-		Spec: coordinationv1.LeaseSpec{
-			HolderIdentity: &identity,
-		},
-	}, metav1.CreateOptions{})
-	require.NoError(t, err)
+	assert.Equal(t, expectedEventOrder, receivedEvents)
 }
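A quick way to check that the flakiness is gone (not part of this commit, just a suggested verification) is to run the test repeatedly from the repository root with the race detector enabled:

go test ./pkg/leaderelection/ -run TestSecondWatcherAcquiresReleasedLease -race -count=100

With the lease pre-created as acquired by pool1, the event order should be deterministic, so repeated runs should neither fail on ordering nor run into the 10-second timeout.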
