Fix leaking goroutines in multiple integration tests #110362

Merged · 6 commits · Jun 7, 2022
7 changes: 0 additions & 7 deletions pkg/controller/certificates/certificate_controller.go
@@ -31,10 +31,8 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
certificatesinformers "k8s.io/client-go/informers/certificates/v1"
clientset "k8s.io/client-go/kubernetes"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
certificateslisters "k8s.io/client-go/listers/certificates/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/controller"
@@ -60,11 +58,6 @@ func NewCertificateController(
csrInformer certificatesinformers.CertificateSigningRequestInformer,
handler func(context.Context, *certificates.CertificateSigningRequest) error,
) *CertificateController {
// Send events to the apiserver
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartStructuredLogging(0)
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
Comment on lines -63 to -66

Member: ?

Member Author: This just wasn't used anywhere :D

cc := &CertificateController{
name: name,
kubeClient: kubeClient,
17 changes: 11 additions & 6 deletions pkg/controller/disruption/disruption.go
@@ -358,7 +358,18 @@ func verifyGroupKind(controllerRef *metav1.OwnerReference, expectedKind string,

func (dc *DisruptionController) Run(ctx context.Context) {
defer utilruntime.HandleCrash()

// Start events processing pipeline.
if dc.kubeClient != nil {
klog.Infof("Sending events to api server.")
dc.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: dc.kubeClient.CoreV1().Events("")})
} else {
klog.Infof("No api server defined - no events will be sent to API server.")
}
defer dc.broadcaster.Shutdown()
Member: do you defer dc.broadcaster.Shutdown() always, or only if it "startedRecordingToSink"?

Member Author: we defer it always (it's unrelated to whether we record to sink or not); see the sketch after this diff.

defer dc.queue.ShutDown()
defer dc.recheckQueue.ShutDown()

klog.Infof("Starting disruption controller")
defer klog.Infof("Shutting down disruption controller")
@@ -367,12 +378,6 @@ func (dc *DisruptionController) Run(ctx context.Context) {
return
}

if dc.kubeClient != nil {
klog.Infof("Sending events to api server.")
dc.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: dc.kubeClient.CoreV1().Events("")})
} else {
klog.Infof("No api server defined - no events will be sent to API server.")
}
go wait.UntilWithContext(ctx, dc.worker, time.Second)
go wait.Until(dc.recheckWorker, time.Second, ctx.Done())

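The same pattern recurs across the controllers touched by this PR: the event broadcaster is created in the constructor (which starts no goroutines), the recording pipeline is started inside Run, and Shutdown is always deferred. A minimal sketch of that shape; the controller type and field names here are illustrative stand-ins, not the real DisruptionController:

```go
package main

import (
	"context"
	"time"

	clientset "k8s.io/client-go/kubernetes"
	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
	"k8s.io/client-go/tools/record"
	"k8s.io/klog/v2"
)

// controller is a stand-in for the controllers changed in this PR.
type controller struct {
	kubeClient  clientset.Interface
	broadcaster record.EventBroadcaster
}

func newController(kubeClient clientset.Interface) *controller {
	// Creating the broadcaster does not start any goroutines yet.
	return &controller{
		kubeClient:  kubeClient,
		broadcaster: record.NewBroadcaster(),
	}
}

func (c *controller) Run(ctx context.Context) {
	// Start the events processing pipeline only once the controller runs.
	if c.kubeClient != nil {
		klog.Infof("Sending events to api server.")
		c.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: c.kubeClient.CoreV1().Events("")})
	}
	// Always stop the broadcaster on the way out, whether or not recording to
	// the sink was started; Shutdown stops the broadcaster's own goroutines.
	defer c.broadcaster.Shutdown()

	<-ctx.Done() // stand-in for the controller's worker loops
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	newController(nil).Run(ctx) // nil client: nothing is recorded, Shutdown still runs
}
```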
28 changes: 16 additions & 12 deletions pkg/controller/nodelifecycle/node_lifecycle_controller.go
@@ -300,7 +300,8 @@ type Controller struct {

getPodsAssignedToNode func(nodeName string) ([]*v1.Pod, error)

recorder record.EventRecorder
broadcaster record.EventBroadcaster
recorder record.EventRecorder

// Value controlling Controller monitoring period, i.e. how often does Controller
// check node health signal posted from kubelet. This value should be lower than
@@ -372,13 +373,6 @@ func NewNodeLifecycleController(

eventBroadcaster := record.NewBroadcaster()
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "node-controller"})
eventBroadcaster.StartStructuredLogging(0)

klog.Infof("Sending events to api server.")
eventBroadcaster.StartRecordingToSink(
&v1core.EventSinkImpl{
Interface: v1core.New(kubeClient.CoreV1().RESTClient()).Events(""),
})

if kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
ratelimiter.RegisterMetricAndTrackRateLimiterUsage("node_lifecycle_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter())
@@ -390,6 +384,7 @@
knownNodeSet: make(map[string]*v1.Node),
nodeHealthMap: newNodeHealthMap(),
nodeEvictionMap: newNodeEvictionMap(),
broadcaster: eventBroadcaster,
recorder: recorder,
nodeMonitorPeriod: nodeMonitorPeriod,
nodeStartupGracePeriod: nodeStartupGracePeriod,
@@ -536,6 +531,19 @@ func NewNodeLifecycleController(
func (nc *Controller) Run(ctx context.Context) {
defer utilruntime.HandleCrash()

// Start events processing pipeline.
nc.broadcaster.StartStructuredLogging(0)
klog.Infof("Sending events to api server.")
nc.broadcaster.StartRecordingToSink(
&v1core.EventSinkImpl{
Interface: v1core.New(nc.kubeClient.CoreV1().RESTClient()).Events(""),
})
defer nc.broadcaster.Shutdown()

// Close node update queue to cleanup go routine.
defer nc.nodeUpdateQueue.ShutDown()
defer nc.podUpdateQueue.ShutDown()

klog.Infof("Starting node controller")
defer klog.Infof("Shutting down node controller")

@@ -547,10 +555,6 @@ func (nc *Controller) Run(ctx context.Context) {
go nc.taintManager.Run(ctx)
}

// Close node update queue to cleanup go routine.
defer nc.nodeUpdateQueue.ShutDown()
defer nc.podUpdateQueue.ShutDown()

// Start workers to reconcile labels and/or update NoSchedule taint for nodes.
for i := 0; i < scheduler.UpdateWorkerSize; i++ {
// Thanks to "workqueue", each worker just need to get item from queue, because
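The deferred nodeUpdateQueue.ShutDown() and podUpdateQueue.ShutDown() calls are what let the worker goroutines terminate: a typical client-go worker loops on queue.Get(), and Get reports shutdown once ShutDown() has been called and the remaining items are drained. A small self-contained sketch of that mechanism (not the actual node-lifecycle workers):

```go
package main

import (
	"fmt"
	"sync"

	"k8s.io/client-go/util/workqueue"
)

// worker mimics a controller worker goroutine driven by a workqueue.
func worker(queue workqueue.Interface, wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		item, shutdown := queue.Get()
		if shutdown {
			// ShutDown() was called: exit instead of leaking the goroutine.
			return
		}
		fmt.Println("processing", item)
		queue.Done(item)
	}
}

func main() {
	queue := workqueue.New()
	var wg sync.WaitGroup

	wg.Add(1)
	go worker(queue, &wg)

	queue.Add("node-1")
	queue.ShutDown() // existing items are still handed out, then Get returns shutdown=true
	wg.Wait()        // the worker returns; nothing is left running
}
```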
24 changes: 17 additions & 7 deletions pkg/controller/nodelifecycle/scheduler/taint_manager.go
@@ -82,6 +82,7 @@ type GetPodsByNodeNameFunc func(nodeName string) ([]*v1.Pod, error)
// from Nodes tainted with NoExecute Taints.
type NoExecuteTaintManager struct {
client clientset.Interface
broadcaster record.EventBroadcaster
recorder record.EventRecorder
getPod GetPodFunc
getNode GetNodeFunc
@@ -158,16 +159,10 @@ func getMinTolerationTime(tolerations []v1.Toleration) time.Duration {
func NewNoExecuteTaintManager(ctx context.Context, c clientset.Interface, getPod GetPodFunc, getNode GetNodeFunc, getPodsAssignedToNode GetPodsByNodeNameFunc) *NoExecuteTaintManager {
eventBroadcaster := record.NewBroadcaster()
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "taint-controller"})
eventBroadcaster.StartStructuredLogging(0)
if c != nil {
klog.InfoS("Sending events to api server")
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: c.CoreV1().Events("")})
} else {
klog.Fatalf("kubeClient is nil when starting NodeController")
}

tm := &NoExecuteTaintManager{
client: c,
broadcaster: eventBroadcaster,
recorder: recorder,
getPod: getPod,
getNode: getNode,
@@ -184,8 +179,23 @@ func NewNoExecuteTaintManager(ctx context.Context, c clientset.Interface, getPod

// Run starts NoExecuteTaintManager which will run in loop until `stopCh` is closed.
func (tc *NoExecuteTaintManager) Run(ctx context.Context) {
defer utilruntime.HandleCrash()

klog.InfoS("Starting NoExecuteTaintManager")

// Start events processing pipeline.
tc.broadcaster.StartStructuredLogging(0)
if tc.client != nil {
klog.InfoS("Sending events to api server")
tc.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: tc.client.CoreV1().Events("")})
} else {
klog.Fatalf("kubeClient is nil when starting NodeController")
}
defer tc.broadcaster.Shutdown()

defer tc.nodeUpdateQueue.ShutDown()
defer tc.podUpdateQueue.ShutDown()

for i := 0; i < UpdateWorkerSize; i++ {
tc.nodeUpdateChannels = append(tc.nodeUpdateChannels, make(chan nodeUpdateItem, NodeUpdateChannelSize))
tc.podUpdateChannels = append(tc.podUpdateChannels, make(chan podUpdateItem, podUpdateChannelSize))
5 changes: 2 additions & 3 deletions pkg/controlplane/instance.go
@@ -58,7 +58,6 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/endpoints/discovery"
apiserverfeatures "k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/registry/generic"
@@ -477,7 +476,7 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
time.Duration(c.ExtraConfig.IdentityLeaseRenewIntervalSeconds)*time.Second,
metav1.NamespaceSystem,
labelAPIServerHeartbeat)
go controller.Run(wait.NeverStop)
go controller.Run(hookContext.StopCh)
return nil
})
m.GenericAPIServer.AddPostStartHookOrDie("start-kube-apiserver-identity-lease-garbage-collector", func(hookContext genericapiserver.PostStartHookContext) error {
@@ -490,7 +489,7 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
time.Duration(c.ExtraConfig.IdentityLeaseDurationSeconds)*time.Second,
metav1.NamespaceSystem,
KubeAPIServerIdentityLeaseLabelSelector,
).Run(wait.NeverStop)
).Run(hookContext.StopCh)
return nil
})
}
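For the apiserver identity-lease controllers, the fix is to drive Run with the post-start hook's StopCh instead of wait.NeverStop, so the goroutines end when the (test) server shuts down rather than running forever. A rough sketch of that difference, with a stand-in loop in place of the real lease controller:

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

// runLoop stands in for a controller's Run(stopCh <-chan struct{}) method.
func runLoop(name string, stopCh <-chan struct{}) {
	wait.Until(func() { fmt.Println(name, "tick") }, 100*time.Millisecond, stopCh)
	fmt.Println(name, "stopped")
}

func main() {
	// stopCh plays the role of hookContext.StopCh: it is closed when the
	// apiserver (here: the test) tears down.
	stopCh := make(chan struct{})

	go runLoop("lease-controller", stopCh)
	// go runLoop("lease-controller", wait.NeverStop) // would never return: a leaked goroutine

	time.Sleep(300 * time.Millisecond)
	close(stopCh) // server shutdown
	time.Sleep(200 * time.Millisecond)
}
```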
6 changes: 3 additions & 3 deletions test/integration/certificates/duration_test.go
@@ -51,12 +51,12 @@ import (
func TestCSRDuration(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
t.Cleanup(cancel)

s := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd())
t.Cleanup(s.TearDownFn)

ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
t.Cleanup(cancel)
Member: isn't it hard to follow with so many t.Cleanup() functions?

Member Author: somewhat - I was just trying to make minimal changes... (see the sketch on t.Cleanup ordering after this diff)

// assert that the metrics we collect during the test run match expectations
// we have 7 valid test cases below that request a duration of which 6 should have their duration honored
wantMetricStrings := []string{
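On the ordering question above: t.Cleanup callbacks run in last-registered, first-called order, so moving the context creation above the test server changes which teardown runs first. A tiny illustration of that ordering (not the actual test):

```go
package ordering

import "testing"

// TestCleanupOrder shows that t.Cleanup functions run last-in, first-out,
// which is what makes the registration order in duration_test.go matter.
func TestCleanupOrder(t *testing.T) {
	t.Cleanup(func() { t.Log("cancel context") })   // registered first -> runs last
	t.Cleanup(func() { t.Log("tear down server") }) // registered last  -> runs first
	// After the test body returns, the log order is:
	//   tear down server
	//   cancel context
}
```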
41 changes: 25 additions & 16 deletions test/integration/disruption/disruption_test.go
@@ -101,7 +101,9 @@ func setup(t *testing.T) (*kubeapiservertesting.TestServer, *disruption.Disrupti
func TestPDBWithScaleSubresource(t *testing.T) {
s, pdbc, informers, clientSet, apiExtensionClient, dynamicClient := setup(t)
defer s.TearDownFn()
ctx := context.TODO()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
nsName := "pdb-scale-subresource"
createNs(ctx, t, nsName, clientSet)

@@ -187,16 +189,14 @@ func TestPDBWithScaleSubresource(t *testing.T) {
}

func TestEmptySelector(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
testcases := []struct {
name string
createPDBFunc func(clientSet clientset.Interface, name, nsName string, minAvailable intstr.IntOrString) error
createPDBFunc func(ctx context.Context, clientSet clientset.Interface, name, nsName string, minAvailable intstr.IntOrString) error
expectedCurrentHealthy int32
}{
{
name: "v1beta1 should not target any pods",
createPDBFunc: func(clientSet clientset.Interface, name, nsName string, minAvailable intstr.IntOrString) error {
createPDBFunc: func(ctx context.Context, clientSet clientset.Interface, name, nsName string, minAvailable intstr.IntOrString) error {
pdb := &v1beta1.PodDisruptionBudget{
ObjectMeta: metav1.ObjectMeta{
Name: name,
@@ -213,7 +213,7 @@
},
{
name: "v1 should target all pods",
createPDBFunc: func(clientSet clientset.Interface, name, nsName string, minAvailable intstr.IntOrString) error {
createPDBFunc: func(ctx context.Context, clientSet clientset.Interface, name, nsName string, minAvailable intstr.IntOrString) error {
pdb := &policyv1.PodDisruptionBudget{
ObjectMeta: metav1.ObjectMeta{
Name: name,
@@ -235,6 +235,9 @@
s, pdbc, informers, clientSet, _, _ := setup(t)
defer s.TearDownFn()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

nsName := fmt.Sprintf("pdb-empty-selector-%d", i)
createNs(ctx, t, nsName, clientSet)

@@ -252,7 +255,7 @@
waitToObservePods(t, informers.Core().V1().Pods().Informer(), 4, v1.PodRunning)

pdbName := "test-pdb"
if err := tc.createPDBFunc(clientSet, pdbName, nsName, minAvailable); err != nil {
if err := tc.createPDBFunc(ctx, clientSet, pdbName, nsName, minAvailable); err != nil {
t.Errorf("Error creating PodDisruptionBudget: %v", err)
}

@@ -271,16 +274,14 @@
}

func TestSelectorsForPodsWithoutLabels(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
testcases := []struct {
name string
createPDBFunc func(clientSet clientset.Interface, name, nsName string, minAvailable intstr.IntOrString) error
createPDBFunc func(ctx context.Context, clientSet clientset.Interface, name, nsName string, minAvailable intstr.IntOrString) error
expectedCurrentHealthy int32
}{
{
name: "pods with no labels can be targeted by v1 PDBs with empty selector",
createPDBFunc: func(clientSet clientset.Interface, name, nsName string, minAvailable intstr.IntOrString) error {
createPDBFunc: func(ctx context.Context, clientSet clientset.Interface, name, nsName string, minAvailable intstr.IntOrString) error {
pdb := &policyv1.PodDisruptionBudget{
ObjectMeta: metav1.ObjectMeta{
Name: name,
@@ -297,7 +298,7 @@
},
{
name: "pods with no labels can be targeted by v1 PDBs with DoesNotExist selector",
createPDBFunc: func(clientSet clientset.Interface, name, nsName string, minAvailable intstr.IntOrString) error {
createPDBFunc: func(ctx context.Context, clientSet clientset.Interface, name, nsName string, minAvailable intstr.IntOrString) error {
pdb := &policyv1.PodDisruptionBudget{
ObjectMeta: metav1.ObjectMeta{
Name: name,
@@ -321,7 +322,7 @@
},
{
name: "pods with no labels can be targeted by v1beta1 PDBs with DoesNotExist selector",
createPDBFunc: func(clientSet clientset.Interface, name, nsName string, minAvailable intstr.IntOrString) error {
createPDBFunc: func(ctx context.Context, clientSet clientset.Interface, name, nsName string, minAvailable intstr.IntOrString) error {
pdb := &v1beta1.PodDisruptionBudget{
ObjectMeta: metav1.ObjectMeta{
Name: name,
@@ -350,6 +351,9 @@
s, pdbc, informers, clientSet, _, _ := setup(t)
defer s.TearDownFn()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

nsName := fmt.Sprintf("pdb-selectors-%d", i)
createNs(ctx, t, nsName, clientSet)

@@ -360,7 +364,7 @@

// Create the PDB first and wait for it to settle.
pdbName := "test-pdb"
if err := tc.createPDBFunc(clientSet, pdbName, nsName, minAvailable); err != nil {
if err := tc.createPDBFunc(ctx, clientSet, pdbName, nsName, minAvailable); err != nil {
t.Errorf("Error creating PodDisruptionBudget: %v", err)
}
waitPDBStable(ctx, t, clientSet, 0, nsName, pdbName)
@@ -498,9 +502,15 @@ func waitToObservePods(t *testing.T, podInformer cache.SharedIndexInformer, podN
}

func TestPatchCompatibility(t *testing.T) {
s, _, _, clientSet, _, _ := setup(t)
s, pdbc, _, clientSet, _, _ := setup(t)
Member: don't you need to start the informers too?

Member Author: we don't - just creating informers doesn't start any goroutines - so we don't need to touch them (see the informer sketch after this diff)

defer s.TearDownFn()

// Even though pdbc isn't used in this test, its creation is already
// spawning some goroutines. So we need to run it to ensure they won't leak.
ctx, cancel := context.WithCancel(context.Background())
cancel()
pdbc.Run(ctx)

testcases := []struct {
name string
version string
@@ -634,5 +644,4 @@ func TestPatchCompatibility(t *testing.T) {
}
})
}

}
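On the informer question above: constructing informers from a shared factory only registers them; the reflector/watch goroutines are spawned when the factory (or a controller) is started. An illustrative sketch of that distinction using a fake client (the integration test itself wires informers through its own setup helper):

```go
package main

import (
	"fmt"
	"runtime"
	"time"

	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes/fake"
)

func main() {
	client := fake.NewSimpleClientset()
	factory := informers.NewSharedInformerFactory(client, 0)

	// Creating the informer only registers it with the factory.
	_ = factory.Core().V1().Pods().Informer()
	fmt.Println("goroutines before Start:", runtime.NumGoroutine())

	stopCh := make(chan struct{})
	factory.Start(stopCh) // this is what actually launches goroutines
	time.Sleep(100 * time.Millisecond)
	fmt.Println("goroutines after Start:", runtime.NumGoroutine())

	close(stopCh) // stop them again so nothing leaks from this sketch
	time.Sleep(100 * time.Millisecond)
}
```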
4 changes: 4 additions & 0 deletions test/integration/events/events_test.go
@@ -58,13 +58,17 @@ func TestEventCompatibility(t *testing.T) {
if err != nil {
t.Fatal(err)
}

stopCh := make(chan struct{})
defer close(stopCh)
oldBroadcaster := record.NewBroadcaster()
defer oldBroadcaster.Shutdown()
oldRecorder := oldBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "integration"})
oldBroadcaster.StartRecordingToSink(&typedv1.EventSinkImpl{Interface: client.CoreV1().Events("")})
oldRecorder.Eventf(regarding, v1.EventTypeNormal, "started", "note")

newBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()})
defer newBroadcaster.Shutdown()
newRecorder := newBroadcaster.NewRecorder(scheme.Scheme, "k8s.io/kube-scheduler")
newBroadcaster.StartRecordingToSink(stopCh)
newRecorder.Eventf(regarding, related, v1.EventTypeNormal, "memoryPressure", "killed", "memory pressure")
Expand Down