diff --git a/pkg/leaderelection/lease_pool_test.go b/pkg/leaderelection/lease_pool_test.go index f316bbba07ef..97eaa255bc87 100644 --- a/pkg/leaderelection/lease_pool_test.go +++ b/pkg/leaderelection/lease_pool_test.go @@ -26,20 +26,18 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" coordinationv1 "k8s.io/api/coordination/v1" - v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes/fake" fakecoordinationv1 "k8s.io/client-go/kubernetes/typed/coordination/v1/fake" k8stesting "k8s.io/client-go/testing" + "k8s.io/utils/pointer" ) func TestLeasePoolWatcherTriggersOnLeaseAcquisition(t *testing.T) { const identity = "test-node" fakeClient := fake.NewSimpleClientset() - expectCreateNamespace(t, fakeClient) - expectCreateLease(t, fakeClient, identity) pool, err := NewLeasePool(context.TODO(), fakeClient, "test", WithIdentity(identity), WithNamespace("test")) require.NoError(t, err) @@ -79,8 +77,6 @@ func TestLeasePoolTriggersLostLeaseWhenCancelled(t *testing.T) { const identity = "test-node" fakeClient := fake.NewSimpleClientset() - expectCreateNamespace(t, fakeClient) - expectCreateLease(t, fakeClient, identity) pool, err := NewLeasePool(context.TODO(), fakeClient, "test", WithIdentity(identity), WithNamespace("test")) require.NoError(t, err) @@ -104,8 +100,6 @@ func TestLeasePoolWatcherReacquiresLostLease(t *testing.T) { const identity = "test-node" fakeClient := fake.NewSimpleClientset() - expectCreateNamespace(t, fakeClient) - expectCreateLease(t, fakeClient, identity) givenLeaderElectorError := func() func(err error) { var updateErr atomic.Value @@ -123,8 +117,7 @@ func TestLeasePoolWatcherReacquiresLostLease(t *testing.T) { pool, err := NewLeasePool(context.TODO(), fakeClient, "test", WithIdentity(identity), WithNamespace("test"), - WithRetryPeriod(350*time.Millisecond), - WithRenewDeadline(500*time.Millisecond), + WithRetryPeriod(10*time.Millisecond), ) require.NoError(t, err) @@ -155,37 +148,50 @@ func TestLeasePoolWatcherReacquiresLostLease(t *testing.T) { } func TestSecondWatcherAcquiresReleasedLease(t *testing.T) { - const ( - identity = "test-node" - identity2 = "test-node-2" - ) - fakeClient := fake.NewSimpleClientset() - expectCreateNamespace(t, fakeClient) - expectCreateLease(t, fakeClient, identity) - pool, err := NewLeasePool(context.TODO(), fakeClient, "test", - WithIdentity(identity), WithNamespace("test"), - WithRetryPeriod(350*time.Millisecond), - WithRenewDeadline(500*time.Millisecond), + pool1, err := NewLeasePool(context.TODO(), fakeClient, "test", + WithIdentity("pool1"), WithNamespace("test"), + WithRetryPeriod(10*time.Millisecond), + ) + require.NoError(t, err) + + pool2, err := NewLeasePool(context.TODO(), fakeClient, "test", + WithIdentity("pool2"), WithNamespace("test"), + WithRetryPeriod(10*time.Millisecond), ) + require.NoError(t, err) + expectedEventOrder := []string{"pool1-acquired", "pool1-lost", "pool2-acquired"} + + // Pre-create the acquired lease for the first identity, so that there are + // no races when acquiring the lease by the two competing pools. + now := metav1.NewMicroTime(time.Now()) + _, err = fakeClient.CoordinationV1().Leases("test").Create(context.TODO(), &coordinationv1.Lease{ + TypeMeta: metav1.TypeMeta{ + Kind: "Lease", + APIVersion: coordinationv1.SchemeGroupVersion.String(), + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + }, + Spec: coordinationv1.LeaseSpec{ + HolderIdentity: pointer.String("pool1"), + AcquireTime: &now, + RenewTime: &now, + LeaseDurationSeconds: pointer.Int32(60 * 60), // block lease for a very long time + }, + }, metav1.CreateOptions{}) require.NoError(t, err) + t.Log("Pre-created acquired lease for first identity") - events, cancel, err := pool.Watch(WithOutputChannels(&LeaseEvents{ + events1, cancel1, err := pool1.Watch(WithOutputChannels(&LeaseEvents{ AcquiredLease: make(chan struct{}, 1), LostLease: make(chan struct{}, 1), })) require.NoError(t, err) - defer cancel() - - pool2, err := NewLeasePool(context.TODO(), fakeClient, "test", - WithIdentity(identity2), - WithNamespace("test"), - WithRetryPeriod(350*time.Millisecond), - WithRenewDeadline(500*time.Millisecond), - ) - require.NoError(t, err) + defer cancel1() + t.Log("Started first lease pool") events2, cancel2, err := pool2.Watch(WithOutputChannels(&LeaseEvents{ AcquiredLease: make(chan struct{}, 1), @@ -193,61 +199,33 @@ func TestSecondWatcherAcquiresReleasedLease(t *testing.T) { })) require.NoError(t, err) defer cancel2() - t.Log("started second lease holder") + t.Log("Started second lease pool, receiving events ...") var receivedEvents []string -leaseEventLoop: for { select { - case <-events.AcquiredLease: - t.Log("lease acquired, cancelling leaser") - cancel() - receivedEvents = append(receivedEvents, "node1-acquired") - case <-events.LostLease: - t.Log("context cancelled and node 1 lease successfully lost") - receivedEvents = append(receivedEvents, "node1-lost") + case <-events1.AcquiredLease: + t.Log("First lease acquired, cancelling pool") + cancel1() + receivedEvents = append(receivedEvents, "pool1-acquired") + case <-events1.LostLease: + t.Log("First lease lost") + receivedEvents = append(receivedEvents, "pool1-lost") case <-events2.AcquiredLease: - t.Log("node 2 lease acquired") - receivedEvents = append(receivedEvents, "node2-acquired") - default: - if len(receivedEvents) >= 3 { - break leaseEventLoop - } + t.Log("Second lease acquired") + receivedEvents = append(receivedEvents, "pool2-acquired") + case <-events2.LostLease: + t.Log("Second lease lost") + receivedEvents = append(receivedEvents, "pool2-lost") + case <-time.After(10 * time.Second): + require.Fail(t, "Didn't receive any events for 10 seconds.") } - } - - assert.Equal(t, "node1-acquired", receivedEvents[0]) - assert.Equal(t, "node1-lost", receivedEvents[1]) - assert.Equal(t, "node2-acquired", receivedEvents[2]) -} -func expectCreateNamespace(t *testing.T, fakeClient *fake.Clientset) { - _, err := fakeClient.CoreV1().Namespaces().Create(context.TODO(), &v1.Namespace{ - TypeMeta: metav1.TypeMeta{ - Kind: "Namespace", - APIVersion: "v1", - }, - ObjectMeta: metav1.ObjectMeta{ - Name: "test", - }, - Spec: v1.NamespaceSpec{}, - }, metav1.CreateOptions{}) - require.NoError(t, err) -} + if len(receivedEvents) >= 3 { + break + } + } -func expectCreateLease(t *testing.T, fakeClient *fake.Clientset, identity string) { - _, err := fakeClient.CoordinationV1().Leases("test").Create(context.TODO(), &coordinationv1.Lease{ - TypeMeta: metav1.TypeMeta{ - Kind: "Lease", - APIVersion: "coordination.k8s.io/v1", - }, - ObjectMeta: metav1.ObjectMeta{ - Name: "test", - }, - Spec: coordinationv1.LeaseSpec{ - HolderIdentity: &identity, - }, - }, metav1.CreateOptions{}) - require.NoError(t, err) + assert.Equal(t, expectedEventOrder, receivedEvents) }