From 4eb05559e3eef012eb6f597e99f7c291b4fe3b23 Mon Sep 17 00:00:00 2001 From: Alejandro Pedraza Date: Thu, 27 Apr 2023 17:43:06 -0500 Subject: [PATCH] Fix memory leak in service mirror Fixes #10746 ## Repro Just by leaving two linked clusters by themselves and monitoring the service mirror pod's RSS (via the `container_memory_working_set_bytes` metric) and goroutines count (via the `go_goroutines` metric) one can see the small but consistent memory increase accompanied by periodic increases of goroutines in batches of 5 (see PR for chart). ## Diagnosis Those goroutines count increases can be correlated in the controller's log with the probe worker and cluster watcher recycling: ``` time="2023-04-27T10:06:14Z" level=info msg="Stopping probe worker" probe-key=target time="2023-04-27T10:06:14Z" level=info msg="Starting probe worker" probe-key=target time="2023-04-27T10:06:14Z" level=info msg="Received: Stop" apiAddress="https://192.168.32.4:6443" cluster=remote ``` Enabling pprof, the beginning of the goroutine stack dump look like this: ``` goroutine profile: total 94 10 @ 0x43c976 0x406d3b 0x406878 0x175ee77 0xb2aefe 0xb2adb6 0xb2aca9 0x175edeb 0x175ed95 0xb2cfda 0x46de41 # 0x175ee76 k8s.io/client-go/tools/cache.(*processorListener).run.func1+0x56 /go/pkg/mod/k8s.io/client-go@v0.27.1/tools/cache/shared_informer.go:968 # 0xb2aefd k8s.io/apimachinery/pkg/util/wait.BackoffUntil.func1+0x3d /go/pkg/mod/k8s.io/apimachinery@v0.27.1/pkg/util/wait/backoff.go:226 # 0xb2adb5 k8s.io/apimachinery/pkg/util/wait.BackoffUntil+0xb5 /go/pkg/mod/k8s.io/apimachinery@v0.27.1/pkg/util/wait/backoff.go:227 # 0xb2aca8 k8s.io/apimachinery/pkg/util/wait.JitterUntil+0x88 /go/pkg/mod/k8s.io/apimachinery@v0.27.1/pkg/util/wait/backoff.go:204 # 0x175edea k8s.io/apimachinery/pkg/util/wait.Until+0x6a /go/pkg/mod/k8s.io/apimachinery@v0.27.1/pkg/util/wait/backoff.go:161 # 0x175ed94 k8s.io/client-go/tools/cache.(*processorListener).run+0x14 /go/pkg/mod/k8s.io/client-go@v0.27.1/tools/cache/shared_informer.go:967 # 0xb2cfd9 k8s.io/apimachinery/pkg/util/wait.(*Group).Start.func1+0x59 /go/pkg/mod/k8s.io/apimachinery@v0.27.1/pkg/util/wait/wait.go:72 10 @ 0x43c976 0x44c81c 0x175eb99 0xb2cfda 0x46de41 # 0x175eb98 k8s.io/client-go/tools/cache.(*processorListener).pop+0x118 /go/pkg/mod/k8s.io/client-go@v0.27.1/tools/cache/shared_informer.go:938 # 0xb2cfd9 k8s.io/apimachinery/pkg/util/wait.(*Group).Start.func1+0x59 /go/pkg/mod/k8s.io/apimachinery@v0.27.1/pkg/util/wait/wait.go:72 8 @ 0x43c976 0x406d3b 0x406878 0x19f35d3 0x46de41 # 0x19f35d2 k8s.io/client-go/tools/record.(*eventBroadcasterImpl).StartEventWatcher.func1+0x72 /go/pkg/mod/k8s.io/client-go@v0.27.1/tools/record/event.go:320 8 @ 0x43c976 0x406d3b 0x406878 0xa88825 0x46de41 # 0xa88824 k8s.io/apimachinery/pkg/watch.(*Broadcaster).loop+0x64 /go/pkg/mod/k8s.io/apimachinery@v0.27.1/pkg/watch/mux.go:268 8 @ 0x43c976 0x46abb5 0x1a2a7a5 0x46de41 # 0x46abb4 time.Sleep+0x134 /usr/local/go/src/runtime/time.go:195 # 0x1a2a7a4 github.com/linkerd/linkerd2/multicluster/service-mirror.(*Ticker).loop+0x64 /linkerd-build/multicluster/service-mirror/jittered_ticker.go:55 ``` The number of goroutines shown here is almost the same as the number of goroutines count jumps we've seen in the chart when this was taken, which suggests these are indeed the leaks. The service mirror main function contains a loop that restarts the cluster watcher (and with it its probe worker) whenever the main watch following the Link resources terminates (I don't know why that happens). And some resources aren't getting cleaned up upon the restart, producint the leak. It would be interesting to see (as a followup) why that restart is required here, but not in the other controllers. ## Resolution We can map each one of those entries in the dump to these leaks: ### jitterred_ticker.go Each probe worker starts a jittered ticker, whose `Stop()` method was never called. Fixed that through a `defer` statement. ### event.go With every watcher restart a new k8s event recorder was instantiated but it wasn't cleaned. Added a `Shutdown()` call to the event's broadcaster in the cluster watcher's `Stop()` method. ### mux.go, shared_informer.go and friends The cluster watcher attached event handlers to informers for Services, Endpoints and Namespaces. Added `RemoveEventHandler()` to them on the cluster watcher's `Stop()` method. ## Result After the fix, we can see the goroutines count remains stable (see PR for chart). --- .../service-mirror/cluster_watcher.go | 34 +++++++++++++++++-- multicluster/service-mirror/probe_worker.go | 1 + 2 files changed, 32 insertions(+), 3 deletions(-) diff --git a/multicluster/service-mirror/cluster_watcher.go b/multicluster/service-mirror/cluster_watcher.go index dc41cd4059b43..d55200f1d6eaf 100644 --- a/multicluster/service-mirror/cluster_watcher.go +++ b/multicluster/service-mirror/cluster_watcher.go @@ -43,6 +43,7 @@ type ( remoteAPIClient *k8s.API localAPIClient *k8s.API stopper chan struct{} + eventBroadcaster record.EventBroadcaster recorder record.EventRecorder log *logging.Entry eventsQueue workqueue.RateLimitingInterface @@ -51,6 +52,14 @@ type ( gatewayAlive bool liveness chan bool headlessServicesEnabled bool + + informerHandlers + } + + informerHandlers struct { + svcHandler cache.ResourceEventHandlerRegistration + epHandler cache.ResourceEventHandlerRegistration + nsHandler cache.ResourceEventHandlerRegistration } // RemoteServiceCreated is generated whenever a remote service is created Observing @@ -184,6 +193,7 @@ func NewRemoteClusterServiceWatcher( remoteAPIClient: remoteAPI, localAPIClient: localAPI, stopper: stopper, + eventBroadcaster: eventBroadcaster, recorder: recorder, log: logging.WithFields(logging.Fields{ "cluster": clusterName, @@ -781,7 +791,8 @@ func (rcsw *RemoteClusterServiceWatcher) processEvents(ctx context.Context) { func (rcsw *RemoteClusterServiceWatcher) Start(ctx context.Context) error { rcsw.remoteAPIClient.Sync(rcsw.stopper) rcsw.eventsQueue.Add(&OrphanedServicesGcTriggered{}) - _, err := rcsw.remoteAPIClient.Svc().Informer().AddEventHandler( + var err error + rcsw.svcHandler, err = rcsw.remoteAPIClient.Svc().Informer().AddEventHandler( cache.ResourceEventHandlerFuncs{ AddFunc: func(svc interface{}) { rcsw.eventsQueue.Add(&OnAddCalled{svc.(*corev1.Service)}) @@ -811,7 +822,7 @@ func (rcsw *RemoteClusterServiceWatcher) Start(ctx context.Context) error { return err } - _, err = rcsw.remoteAPIClient.Endpoint().Informer().AddEventHandler( + rcsw.epHandler, err = rcsw.remoteAPIClient.Endpoint().Informer().AddEventHandler( cache.ResourceEventHandlerFuncs{ // AddFunc only relevant for exported headless endpoints AddFunc: func(obj interface{}) { @@ -851,7 +862,7 @@ func (rcsw *RemoteClusterServiceWatcher) Start(ctx context.Context) error { return err } - _, err = rcsw.localAPIClient.NS().Informer().AddEventHandler( + rcsw.nsHandler, err = rcsw.localAPIClient.NS().Informer().AddEventHandler( cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { rcsw.eventsQueue.Add(&OnLocalNamespaceAdded{obj.(*corev1.Namespace)}) @@ -897,6 +908,23 @@ func (rcsw *RemoteClusterServiceWatcher) Stop(cleanupState bool) { rcsw.eventsQueue.Add(&ClusterUnregistered{}) } rcsw.eventsQueue.ShutDown() + rcsw.eventBroadcaster.Shutdown() + + if rcsw.svcHandler != nil { + if err := rcsw.remoteAPIClient.Svc().Informer().RemoveEventHandler(rcsw.svcHandler); err != nil { + rcsw.log.Warnf("error removing service informer handler: %s", err) + } + } + if rcsw.epHandler != nil { + if err := rcsw.remoteAPIClient.Endpoint().Informer().RemoveEventHandler(rcsw.epHandler); err != nil { + rcsw.log.Warnf("error removing service informer handler: %s", err) + } + } + if rcsw.nsHandler != nil { + if err := rcsw.localAPIClient.NS().Informer().RemoveEventHandler(rcsw.nsHandler); err != nil { + rcsw.log.Warnf("error removing service informer handler: %s", err) + } + } } func (rcsw *RemoteClusterServiceWatcher) resolveGatewayAddress() ([]corev1.EndpointAddress, error) { diff --git a/multicluster/service-mirror/probe_worker.go b/multicluster/service-mirror/probe_worker.go index 9992eaf216262..297c60cdeffd0 100644 --- a/multicluster/service-mirror/probe_worker.go +++ b/multicluster/service-mirror/probe_worker.go @@ -66,6 +66,7 @@ func (pw *ProbeWorker) run() { probeTickerPeriod := pw.probeSpec.Period maxJitter := pw.probeSpec.Period / 10 // max jitter is 10% of period probeTicker := NewTicker(probeTickerPeriod, maxJitter) + defer probeTicker.Stop() probeLoop: for {