Fix leaking goroutines in multiple integration tests #110264

Merged: 9 commits, Jun 2, 2022
test/integration/daemonset/daemonset_test.go (98 changes: 22 additions & 76 deletions)
@@ -51,7 +51,7 @@ import (

var zero = int64(0)

-func setup(t *testing.T) (kubeapiservertesting.TearDownFunc, *daemon.DaemonSetsController, informers.SharedInformerFactory, clientset.Interface) {
+func setup(t *testing.T) (context.Context, kubeapiservertesting.TearDownFunc, *daemon.DaemonSetsController, informers.SharedInformerFactory, clientset.Interface) {
// Disable ServiceAccount admission plugin as we don't have serviceaccount controller running.
server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins=ServiceAccount,TaintNodesByCondition"}, framework.SharedEtcd())

@@ -74,22 +74,15 @@ func setup(t *testing.T) (kubeapiservertesting.TearDownFunc, *daemon.DaemonSetsC
t.Fatalf("error creating DaemonSets controller: %v", err)
}

-return server.TearDownFn, dc, informers, clientSet
-}
+ctx, cancel := context.WithCancel(context.Background())

-func setupScheduler(
-ctx context.Context,
-t *testing.T,
-cs clientset.Interface,
-informerFactory informers.SharedInformerFactory,
-) {
eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{
-Interface: cs.EventsV1(),
+Interface: clientSet.EventsV1(),
})

sched, err := scheduler.New(
-cs,
-informerFactory,
+clientSet,
+informers,
nil,
profile.NewRecorderFactory(eventBroadcaster),
ctx.Done(),
@@ -99,8 +92,15 @@
}

eventBroadcaster.StartRecordingToSink(ctx.Done())

go sched.Run(ctx)
Review comment (Member):
This will run without the informers started; is that something we should be worried about?
It seems all the Run and Start calls are done later, in the test bodies.

Review comment (Member, in reply):
So basically we are moving the

  // Start Scheduler
  setupScheduler(ctx, t, clientset, informers)

to setup, but it seems the informers weren't started before this change either, 🤷 technically it should be the same.
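
For readers skimming the diff, the fix boils down to one pattern: setup() owns a single cancellable context, ties every background goroutine to it, and returns one teardown func, so no test can forget to stop a goroutine. Here is a minimal stand-alone Go sketch of that pattern; fakeServer and the printed messages are illustrative stand-ins, not code from this PR.

  package main

  import (
      "context"
      "fmt"
      "time"
  )

  // fakeServer stands in for the kubeapiservertesting test server; only its
  // TearDownFn matters for this sketch.
  type fakeServer struct {
      TearDownFn func()
  }

  // setup starts a background worker bound to a cancellable context and
  // returns that context plus a single teardown func that stops everything.
  func setup() (context.Context, func()) {
      ctx, cancel := context.WithCancel(context.Background())
      server := fakeServer{TearDownFn: func() { fmt.Println("server torn down") }}

      // Stand-in for "go sched.Run(ctx)": exits as soon as ctx is cancelled,
      // so it cannot leak past the end of the test.
      go func() {
          <-ctx.Done()
          fmt.Println("scheduler goroutine stopped")
      }()

      tearDownFn := func() {
          cancel()            // stop every ctx-bound goroutine first
          server.TearDownFn() // then tear down the test apiserver
          // the real setup also calls eventBroadcaster.Shutdown() here
      }
      return ctx, tearDownFn
  }

  func main() {
      _, tearDown := setup()
      tearDown()
      time.Sleep(10 * time.Millisecond) // let the goroutine print before exit
  }

Cancelling the context before tearing down the server mirrors the order in the PR: ctx-bound goroutines stop first, before the apiserver they talk to goes away.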


+tearDownFn := func() {
+cancel()
+server.TearDownFn()
+eventBroadcaster.Shutdown()
+}
+
+return ctx, tearDownFn, dc, informers, clientSet
}

func testLabels() map[string]string {
@@ -421,7 +421,7 @@ func forEachStrategy(t *testing.T, tf func(t *testing.T, strategy *apps.DaemonSe

func TestOneNodeDaemonLaunchesPod(t *testing.T) {
forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) {
-closeFn, dc, informers, clientset := setup(t)
+ctx, closeFn, dc, informers, clientset := setup(t)
defer closeFn()
ns := framework.CreateNamespaceOrDie(clientset, "one-node-daemonset-test", t)
defer framework.DeleteNamespaceOrDie(clientset, ns, t)
@@ -431,12 +431,6 @@ func TestOneNodeDaemonLaunchesPod(t *testing.T) {
nodeClient := clientset.CoreV1().Nodes()
podInformer := informers.Core().V1().Pods().Informer()

-ctx, cancel := context.WithCancel(context.Background())
-defer cancel()
-
-// Start Scheduler
-setupScheduler(ctx, t, clientset, informers)
-
informers.Start(ctx.Done())
go dc.Run(ctx, 2)

@@ -460,7 +454,7 @@

func TestSimpleDaemonSetLaunchesPods(t *testing.T) {
forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) {
-closeFn, dc, informers, clientset := setup(t)
+ctx, closeFn, dc, informers, clientset := setup(t)
defer closeFn()
ns := framework.CreateNamespaceOrDie(clientset, "simple-daemonset-test", t)
defer framework.DeleteNamespaceOrDie(clientset, ns, t)
@@ -470,15 +464,9 @@ func TestSimpleDaemonSetLaunchesPods(t *testing.T) {
nodeClient := clientset.CoreV1().Nodes()
podInformer := informers.Core().V1().Pods().Informer()

-ctx, cancel := context.WithCancel(context.Background())
-defer cancel()
-
informers.Start(ctx.Done())
go dc.Run(ctx, 2)

-// Start Scheduler
-setupScheduler(ctx, t, clientset, informers)
-
ds := newDaemonSet("foo", ns.Name)
ds.Spec.UpdateStrategy = *strategy
_, err := dsClient.Create(context.TODO(), ds, metav1.CreateOptions{})
@@ -496,7 +484,7 @@ func TestSimpleDaemonSetLaunchesPods(t *testing.T) {

func TestDaemonSetWithNodeSelectorLaunchesPods(t *testing.T) {
forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) {
-closeFn, dc, informers, clientset := setup(t)
+ctx, closeFn, dc, informers, clientset := setup(t)
defer closeFn()
ns := framework.CreateNamespaceOrDie(clientset, "simple-daemonset-test", t)
defer framework.DeleteNamespaceOrDie(clientset, ns, t)
@@ -506,15 +494,9 @@ func TestDaemonSetWithNodeSelectorLaunchesPods(t *testing.T) {
nodeClient := clientset.CoreV1().Nodes()
podInformer := informers.Core().V1().Pods().Informer()

-ctx, cancel := context.WithCancel(context.Background())
-defer cancel()
-
informers.Start(ctx.Done())
go dc.Run(ctx, 2)

-// Start Scheduler
-setupScheduler(ctx, t, clientset, informers)
-
ds := newDaemonSet("foo", ns.Name)
ds.Spec.UpdateStrategy = *strategy

@@ -565,7 +547,7 @@ func TestDaemonSetWithNodeSelectorLaunchesPods(t *testing.T) {

func TestNotReadyNodeDaemonDoesLaunchPod(t *testing.T) {
forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) {
-closeFn, dc, informers, clientset := setup(t)
+ctx, closeFn, dc, informers, clientset := setup(t)
defer closeFn()
ns := framework.CreateNamespaceOrDie(clientset, "simple-daemonset-test", t)
defer framework.DeleteNamespaceOrDie(clientset, ns, t)
@@ -575,15 +557,9 @@ func TestNotReadyNodeDaemonDoesLaunchPod(t *testing.T) {
nodeClient := clientset.CoreV1().Nodes()
podInformer := informers.Core().V1().Pods().Informer()

-ctx, cancel := context.WithCancel(context.Background())
-defer cancel()
-
informers.Start(ctx.Done())
go dc.Run(ctx, 2)

-// Start Scheduler
-setupScheduler(ctx, t, clientset, informers)
-
ds := newDaemonSet("foo", ns.Name)
ds.Spec.UpdateStrategy = *strategy
_, err := dsClient.Create(context.TODO(), ds, metav1.CreateOptions{})
@@ -612,7 +588,7 @@ func TestNotReadyNodeDaemonDoesLaunchPod(t *testing.T) {
// not schedule Pods onto the nodes with insufficient resource.
func TestInsufficientCapacityNode(t *testing.T) {
forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) {
-closeFn, dc, informers, clientset := setup(t)
+ctx, closeFn, dc, informers, clientset := setup(t)
defer closeFn()
ns := framework.CreateNamespaceOrDie(clientset, "insufficient-capacity", t)
defer framework.DeleteNamespaceOrDie(clientset, ns, t)
@@ -622,15 +598,9 @@ func TestInsufficientCapacityNode(t *testing.T) {
podInformer := informers.Core().V1().Pods().Informer()
nodeClient := clientset.CoreV1().Nodes()

-ctx, cancel := context.WithCancel(context.Background())
-defer cancel()
-
informers.Start(ctx.Done())
go dc.Run(ctx, 2)

-// Start Scheduler
-setupScheduler(ctx, t, clientset, informers)
-
ds := newDaemonSet("foo", ns.Name)
ds.Spec.Template.Spec = resourcePodSpec("", "120M", "75m")
ds.Spec.UpdateStrategy = *strategy
@@ -676,7 +646,7 @@ func TestInsufficientCapacityNode(t *testing.T) {
// TestLaunchWithHashCollision tests that a DaemonSet can be updated even if there is a
// hash collision with an existing ControllerRevision
func TestLaunchWithHashCollision(t *testing.T) {
-closeFn, dc, informers, clientset := setup(t)
+ctx, closeFn, dc, informers, clientset := setup(t)
defer closeFn()
ns := framework.CreateNamespaceOrDie(clientset, "one-node-daemonset-test", t)
defer framework.DeleteNamespaceOrDie(clientset, ns, t)
@@ -685,15 +655,9 @@ func TestLaunchWithHashCollision(t *testing.T) {
podInformer := informers.Core().V1().Pods().Informer()
nodeClient := clientset.CoreV1().Nodes()

-ctx, cancel := context.WithCancel(context.Background())
-defer cancel()
-
informers.Start(ctx.Done())
go dc.Run(ctx, 2)

-// Start Scheduler
-setupScheduler(ctx, t, clientset, informers)
-
// Create single node
_, err := nodeClient.Create(context.TODO(), newNode("single-node", nil), metav1.CreateOptions{})
if err != nil {
@@ -787,7 +751,7 @@ func TestLaunchWithHashCollision(t *testing.T) {
// 2. Add a node to ensure the controller sync
// 3. The dsc is expected to "PATCH" the existing pod label with new hash and deletes the old controllerrevision once finishes the update
func TestDSCUpdatesPodLabelAfterDedupCurHistories(t *testing.T) {
-closeFn, dc, informers, clientset := setup(t)
+ctx, closeFn, dc, informers, clientset := setup(t)
defer closeFn()
ns := framework.CreateNamespaceOrDie(clientset, "one-node-daemonset-test", t)
defer framework.DeleteNamespaceOrDie(clientset, ns, t)
@@ -796,15 +760,9 @@ func TestDSCUpdatesPodLabelAfterDedupCurHistories(t *testing.T) {
podInformer := informers.Core().V1().Pods().Informer()
nodeClient := clientset.CoreV1().Nodes()

-ctx, cancel := context.WithCancel(context.Background())
-defer cancel()
-
informers.Start(ctx.Done())
go dc.Run(ctx, 2)

-// Start Scheduler
-setupScheduler(ctx, t, clientset, informers)
-
// Create single node
_, err := nodeClient.Create(context.TODO(), newNode("single-node", nil), metav1.CreateOptions{})
if err != nil {
@@ -915,7 +873,7 @@ func TestDSCUpdatesPodLabelAfterDedupCurHistories(t *testing.T) {
// TestTaintedNode tests tainted node isn't expected to have pod scheduled
func TestTaintedNode(t *testing.T) {
forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) {
-closeFn, dc, informers, clientset := setup(t)
+ctx, closeFn, dc, informers, clientset := setup(t)
defer closeFn()
ns := framework.CreateNamespaceOrDie(clientset, "tainted-node", t)
defer framework.DeleteNamespaceOrDie(clientset, ns, t)
@@ -925,15 +883,9 @@ func TestTaintedNode(t *testing.T) {
podInformer := informers.Core().V1().Pods().Informer()
nodeClient := clientset.CoreV1().Nodes()

-ctx, cancel := context.WithCancel(context.Background())
-defer cancel()
-
informers.Start(ctx.Done())
go dc.Run(ctx, 2)

-// Start Scheduler
-setupScheduler(ctx, t, clientset, informers)
-
ds := newDaemonSet("foo", ns.Name)
ds.Spec.UpdateStrategy = *strategy
ds, err := dsClient.Create(context.TODO(), ds, metav1.CreateOptions{})
@@ -980,7 +932,7 @@ func TestTaintedNode(t *testing.T) {
// to the Unschedulable nodes.
func TestUnschedulableNodeDaemonDoesLaunchPod(t *testing.T) {
forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) {
-closeFn, dc, informers, clientset := setup(t)
+ctx, closeFn, dc, informers, clientset := setup(t)
defer closeFn()
ns := framework.CreateNamespaceOrDie(clientset, "daemonset-unschedulable-test", t)
defer framework.DeleteNamespaceOrDie(clientset, ns, t)
@@ -990,15 +942,9 @@ func TestUnschedulableNodeDaemonDoesLaunchPod(t *testing.T) {
nodeClient := clientset.CoreV1().Nodes()
podInformer := informers.Core().V1().Pods().Informer()

-ctx, cancel := context.WithCancel(context.Background())
-defer cancel()
-
informers.Start(ctx.Done())
go dc.Run(ctx, 2)

-// Start Scheduler
-setupScheduler(ctx, t, clientset, informers)
-
ds := newDaemonSet("foo", ns.Name)
ds.Spec.UpdateStrategy = *strategy
ds.Spec.Template.Spec.HostNetwork = true