Skip to content

Commit

Permalink
Merge pull request #3126 from k0sproject/backport-3039-to-release-1.26
Browse files Browse the repository at this point in the history
[Backport release-1.26] Fix racy leader pool test
  • Loading branch information
juanluisvaladas committed May 18, 2023
2 parents 13c3bb0 + bb8d32e commit 8c55c8e
Showing 1 changed file with 56 additions and 78 deletions.
134 changes: 56 additions & 78 deletions pkg/leaderelection/lease_pool_test.go
Expand Up @@ -26,20 +26,18 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
coordinationv1 "k8s.io/api/coordination/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes/fake"
fakecoordinationv1 "k8s.io/client-go/kubernetes/typed/coordination/v1/fake"
k8stesting "k8s.io/client-go/testing"
"k8s.io/utils/pointer"
)

func TestLeasePoolWatcherTriggersOnLeaseAcquisition(t *testing.T) {
const identity = "test-node"

fakeClient := fake.NewSimpleClientset()
expectCreateNamespace(t, fakeClient)
expectCreateLease(t, fakeClient, identity)

pool, err := NewLeasePool(context.TODO(), fakeClient, "test", WithIdentity(identity), WithNamespace("test"))
require.NoError(t, err)
Expand Down Expand Up @@ -79,8 +77,6 @@ func TestLeasePoolTriggersLostLeaseWhenCancelled(t *testing.T) {
const identity = "test-node"

fakeClient := fake.NewSimpleClientset()
expectCreateNamespace(t, fakeClient)
expectCreateLease(t, fakeClient, identity)

pool, err := NewLeasePool(context.TODO(), fakeClient, "test", WithIdentity(identity), WithNamespace("test"))
require.NoError(t, err)
Expand All @@ -104,8 +100,6 @@ func TestLeasePoolWatcherReacquiresLostLease(t *testing.T) {
const identity = "test-node"

fakeClient := fake.NewSimpleClientset()
expectCreateNamespace(t, fakeClient)
expectCreateLease(t, fakeClient, identity)

givenLeaderElectorError := func() func(err error) {
var updateErr atomic.Value
Expand All @@ -123,8 +117,7 @@ func TestLeasePoolWatcherReacquiresLostLease(t *testing.T) {

pool, err := NewLeasePool(context.TODO(), fakeClient, "test",
WithIdentity(identity), WithNamespace("test"),
WithRetryPeriod(350*time.Millisecond),
WithRenewDeadline(500*time.Millisecond),
WithRetryPeriod(10*time.Millisecond),
)
require.NoError(t, err)

Expand Down Expand Up @@ -155,99 +148,84 @@ func TestLeasePoolWatcherReacquiresLostLease(t *testing.T) {
}

func TestSecondWatcherAcquiresReleasedLease(t *testing.T) {
const (
identity = "test-node"
identity2 = "test-node-2"
)

fakeClient := fake.NewSimpleClientset()
expectCreateNamespace(t, fakeClient)
expectCreateLease(t, fakeClient, identity)

pool, err := NewLeasePool(context.TODO(), fakeClient, "test",
WithIdentity(identity), WithNamespace("test"),
WithRetryPeriod(350*time.Millisecond),
WithRenewDeadline(500*time.Millisecond),
pool1, err := NewLeasePool(context.TODO(), fakeClient, "test",
WithIdentity("pool1"), WithNamespace("test"),
WithRetryPeriod(10*time.Millisecond),
)
require.NoError(t, err)

pool2, err := NewLeasePool(context.TODO(), fakeClient, "test",
WithIdentity("pool2"), WithNamespace("test"),
WithRetryPeriod(10*time.Millisecond),
)
require.NoError(t, err)

expectedEventOrder := []string{"pool1-acquired", "pool1-lost", "pool2-acquired"}

// Pre-create the acquired lease for the first identity, so that there are
// no races when acquiring the lease by the two competing pools.
now := metav1.NewMicroTime(time.Now())
_, err = fakeClient.CoordinationV1().Leases("test").Create(context.TODO(), &coordinationv1.Lease{
TypeMeta: metav1.TypeMeta{
Kind: "Lease",
APIVersion: coordinationv1.SchemeGroupVersion.String(),
},
ObjectMeta: metav1.ObjectMeta{
Name: "test",
},
Spec: coordinationv1.LeaseSpec{
HolderIdentity: pointer.String("pool1"),
AcquireTime: &now,
RenewTime: &now,
LeaseDurationSeconds: pointer.Int32(60 * 60), // block lease for a very long time
},
}, metav1.CreateOptions{})
require.NoError(t, err)
t.Log("Pre-created acquired lease for first identity")

events, cancel, err := pool.Watch(WithOutputChannels(&LeaseEvents{
events1, cancel1, err := pool1.Watch(WithOutputChannels(&LeaseEvents{
AcquiredLease: make(chan struct{}, 1),
LostLease: make(chan struct{}, 1),
}))
require.NoError(t, err)
defer cancel()

pool2, err := NewLeasePool(context.TODO(), fakeClient, "test",
WithIdentity(identity2),
WithNamespace("test"),
WithRetryPeriod(350*time.Millisecond),
WithRenewDeadline(500*time.Millisecond),
)
require.NoError(t, err)
defer cancel1()
t.Log("Started first lease pool")

events2, cancel2, err := pool2.Watch(WithOutputChannels(&LeaseEvents{
AcquiredLease: make(chan struct{}, 1),
LostLease: make(chan struct{}, 1),
}))
require.NoError(t, err)
defer cancel2()
t.Log("started second lease holder")
t.Log("Started second lease pool, receiving events ...")

var receivedEvents []string

leaseEventLoop:
for {
select {
case <-events.AcquiredLease:
t.Log("lease acquired, cancelling leaser")
cancel()
receivedEvents = append(receivedEvents, "node1-acquired")
case <-events.LostLease:
t.Log("context cancelled and node 1 lease successfully lost")
receivedEvents = append(receivedEvents, "node1-lost")
case <-events1.AcquiredLease:
t.Log("First lease acquired, cancelling pool")
cancel1()
receivedEvents = append(receivedEvents, "pool1-acquired")
case <-events1.LostLease:
t.Log("First lease lost")
receivedEvents = append(receivedEvents, "pool1-lost")
case <-events2.AcquiredLease:
t.Log("node 2 lease acquired")
receivedEvents = append(receivedEvents, "node2-acquired")
default:
if len(receivedEvents) >= 3 {
break leaseEventLoop
}
t.Log("Second lease acquired")
receivedEvents = append(receivedEvents, "pool2-acquired")
case <-events2.LostLease:
t.Log("Second lease lost")
receivedEvents = append(receivedEvents, "pool2-lost")
case <-time.After(10 * time.Second):
require.Fail(t, "Didn't receive any events for 10 seconds.")
}
}

assert.Equal(t, "node1-acquired", receivedEvents[0])
assert.Equal(t, "node1-lost", receivedEvents[1])
assert.Equal(t, "node2-acquired", receivedEvents[2])
}

func expectCreateNamespace(t *testing.T, fakeClient *fake.Clientset) {
_, err := fakeClient.CoreV1().Namespaces().Create(context.TODO(), &v1.Namespace{
TypeMeta: metav1.TypeMeta{
Kind: "Namespace",
APIVersion: "v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "test",
},
Spec: v1.NamespaceSpec{},
}, metav1.CreateOptions{})
require.NoError(t, err)
}
if len(receivedEvents) >= 3 {
break
}
}

func expectCreateLease(t *testing.T, fakeClient *fake.Clientset, identity string) {
_, err := fakeClient.CoordinationV1().Leases("test").Create(context.TODO(), &coordinationv1.Lease{
TypeMeta: metav1.TypeMeta{
Kind: "Lease",
APIVersion: "coordination.k8s.io/v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "test",
},
Spec: coordinationv1.LeaseSpec{
HolderIdentity: &identity,
},
}, metav1.CreateOptions{})
require.NoError(t, err)
assert.Equal(t, expectedEventOrder, receivedEvents)
}

0 comments on commit 8c55c8e

Please sign in to comment.