Skip to content

Commit

Permalink
Fix memory leak in service mirror (#10833)
Browse files Browse the repository at this point in the history
Fixes #10746

## Repro

Just by leaving two linked clusters by themselves and monitoring the service mirror pod's RSS (via the `container_memory_working_set_bytes` metric) and goroutines count (via the `go_goroutines` metric) one can see the small but consistent memory increase accompanied by periodic increases of goroutines in batches of 5.

![Screenshot from 2023-04-27 09-28-48](https://user-images.githubusercontent.com/554287/235007199-604dd517-47ae-4f95-a863-21ae16eca898.png)

## Diagnosis

Those goroutines count increases can be correlated in the controller's log with the probe worker and cluster watcher recycling:

```
time="2023-04-27T10:06:14Z" level=info msg="Stopping probe worker" probe-key=target
time="2023-04-27T10:06:14Z" level=info msg="Starting probe worker" probe-key=target
time="2023-04-27T10:06:14Z" level=info msg="Received: Stop" apiAddress="https://192.168.32.4:6443" cluster=remote
```

Enabling pprof, the beginning of the goroutine stack dump look like this:

```
goroutine profile: total 94
10 @ 0x43c976 0x406d3b 0x406878 0x175ee77 0xb2aefe 0xb2adb6 0xb2aca9 0x175edeb 0x175ed95 0xb2cfda 0x46de41
#	0x175ee76	k8s.io/client-go/tools/cache.(*processorListener).run.func1+0x56	/go/pkg/mod/k8s.io/client-go@v0.27.1/tools/cache/shared_informer.go:968
#	0xb2aefd	k8s.io/apimachinery/pkg/util/wait.BackoffUntil.func1+0x3d		/go/pkg/mod/k8s.io/apimachinery@v0.27.1/pkg/util/wait/backoff.go:226
#	0xb2adb5	k8s.io/apimachinery/pkg/util/wait.BackoffUntil+0xb5			/go/pkg/mod/k8s.io/apimachinery@v0.27.1/pkg/util/wait/backoff.go:227
#	0xb2aca8	k8s.io/apimachinery/pkg/util/wait.JitterUntil+0x88			/go/pkg/mod/k8s.io/apimachinery@v0.27.1/pkg/util/wait/backoff.go:204
#	0x175edea	k8s.io/apimachinery/pkg/util/wait.Until+0x6a				/go/pkg/mod/k8s.io/apimachinery@v0.27.1/pkg/util/wait/backoff.go:161
#	0x175ed94	k8s.io/client-go/tools/cache.(*processorListener).run+0x14		/go/pkg/mod/k8s.io/client-go@v0.27.1/tools/cache/shared_informer.go:967
#	0xb2cfd9	k8s.io/apimachinery/pkg/util/wait.(*Group).Start.func1+0x59		/go/pkg/mod/k8s.io/apimachinery@v0.27.1/pkg/util/wait/wait.go:72

10 @ 0x43c976 0x44c81c 0x175eb99 0xb2cfda 0x46de41
#	0x175eb98	k8s.io/client-go/tools/cache.(*processorListener).pop+0x118	/go/pkg/mod/k8s.io/client-go@v0.27.1/tools/cache/shared_informer.go:938
#	0xb2cfd9	k8s.io/apimachinery/pkg/util/wait.(*Group).Start.func1+0x59	/go/pkg/mod/k8s.io/apimachinery@v0.27.1/pkg/util/wait/wait.go:72

8 @ 0x43c976 0x406d3b 0x406878 0x19f35d3 0x46de41
#	0x19f35d2	k8s.io/client-go/tools/record.(*eventBroadcasterImpl).StartEventWatcher.func1+0x72	/go/pkg/mod/k8s.io/client-go@v0.27.1/tools/record/event.go:320

8 @ 0x43c976 0x406d3b 0x406878 0xa88825 0x46de41
#	0xa88824	k8s.io/apimachinery/pkg/watch.(*Broadcaster).loop+0x64	/go/pkg/mod/k8s.io/apimachinery@v0.27.1/pkg/watch/mux.go:268

8 @ 0x43c976 0x46abb5 0x1a2a7a5 0x46de41
#	0x46abb4	time.Sleep+0x134								/usr/local/go/src/runtime/time.go:195
#	0x1a2a7a4	github.com/linkerd/linkerd2/multicluster/service-mirror.(*Ticker).loop+0x64	/linkerd-build/multicluster/service-mirror/jittered_ticker.go:55
```

The number of goroutines shown here is almost the same as the number of goroutines count jumps we've seen in the chart when this was taken, which suggests these are indeed the leaks.

The service mirror main function contains a loop that restarts the cluster watcher (and with it its probe worker) whenever the main watch following the Link resources terminates (I don't know why that happens). And some resources aren't getting cleaned up upon the restart, producing the leak. It would be interesting to see (as a followup) why that restart is required here, but not in the other controllers.

## Resolution

We can map each one of those entries in the dump to these leaks:

### jitterred_ticker.go

Each probe worker starts a jittered ticker, whose `Stop()` method was never called. Fixed that through a `defer` statement.

### event.go

With every watcher restart a new k8s event recorder was instantiated but it wasn't cleaned. Added a `Shutdown()` call to the event's broadcaster in the cluster watcher's `Stop()` method.

### mux.go, shared_informer.go and friends

The cluster watcher attached event handlers to informers for Services, Endpoints and Namespaces. Added `RemoveEventHandler()` to them on the cluster watcher's `Stop()` method.

## Result

After the fix, we can see the goroutines count remains stable.

![image](https://user-images.githubusercontent.com/554287/235007727-3b3b6f31-901d-4c05-aedb-9fa3d3178ee1.png)
  • Loading branch information
alpeb authored and adleong committed Jun 20, 2023
1 parent dc18965 commit 76cba42
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 3 deletions.
34 changes: 31 additions & 3 deletions multicluster/service-mirror/cluster_watcher.go
Expand Up @@ -43,6 +43,7 @@ type (
remoteAPIClient *k8s.API
localAPIClient *k8s.API
stopper chan struct{}
eventBroadcaster record.EventBroadcaster
recorder record.EventRecorder
log *logging.Entry
eventsQueue workqueue.RateLimitingInterface
Expand All @@ -51,6 +52,14 @@ type (
gatewayAlive bool
liveness chan bool
headlessServicesEnabled bool

informerHandlers
}

informerHandlers struct {
svcHandler cache.ResourceEventHandlerRegistration
epHandler cache.ResourceEventHandlerRegistration
nsHandler cache.ResourceEventHandlerRegistration
}

// RemoteServiceCreated is generated whenever a remote service is created Observing
Expand Down Expand Up @@ -184,6 +193,7 @@ func NewRemoteClusterServiceWatcher(
remoteAPIClient: remoteAPI,
localAPIClient: localAPI,
stopper: stopper,
eventBroadcaster: eventBroadcaster,
recorder: recorder,
log: logging.WithFields(logging.Fields{
"cluster": clusterName,
Expand Down Expand Up @@ -781,7 +791,8 @@ func (rcsw *RemoteClusterServiceWatcher) processEvents(ctx context.Context) {
func (rcsw *RemoteClusterServiceWatcher) Start(ctx context.Context) error {
rcsw.remoteAPIClient.Sync(rcsw.stopper)
rcsw.eventsQueue.Add(&OrphanedServicesGcTriggered{})
_, err := rcsw.remoteAPIClient.Svc().Informer().AddEventHandler(
var err error
rcsw.svcHandler, err = rcsw.remoteAPIClient.Svc().Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(svc interface{}) {
rcsw.eventsQueue.Add(&OnAddCalled{svc.(*corev1.Service)})
Expand Down Expand Up @@ -811,7 +822,7 @@ func (rcsw *RemoteClusterServiceWatcher) Start(ctx context.Context) error {
return err
}

_, err = rcsw.remoteAPIClient.Endpoint().Informer().AddEventHandler(
rcsw.epHandler, err = rcsw.remoteAPIClient.Endpoint().Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
// AddFunc only relevant for exported headless endpoints
AddFunc: func(obj interface{}) {
Expand Down Expand Up @@ -851,7 +862,7 @@ func (rcsw *RemoteClusterServiceWatcher) Start(ctx context.Context) error {
return err
}

_, err = rcsw.localAPIClient.NS().Informer().AddEventHandler(
rcsw.nsHandler, err = rcsw.localAPIClient.NS().Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
rcsw.eventsQueue.Add(&OnLocalNamespaceAdded{obj.(*corev1.Namespace)})
Expand Down Expand Up @@ -897,6 +908,23 @@ func (rcsw *RemoteClusterServiceWatcher) Stop(cleanupState bool) {
rcsw.eventsQueue.Add(&ClusterUnregistered{})
}
rcsw.eventsQueue.ShutDown()
rcsw.eventBroadcaster.Shutdown()

if rcsw.svcHandler != nil {
if err := rcsw.remoteAPIClient.Svc().Informer().RemoveEventHandler(rcsw.svcHandler); err != nil {
rcsw.log.Warnf("error removing service informer handler: %s", err)
}
}
if rcsw.epHandler != nil {
if err := rcsw.remoteAPIClient.Endpoint().Informer().RemoveEventHandler(rcsw.epHandler); err != nil {
rcsw.log.Warnf("error removing service informer handler: %s", err)
}
}
if rcsw.nsHandler != nil {
if err := rcsw.localAPIClient.NS().Informer().RemoveEventHandler(rcsw.nsHandler); err != nil {
rcsw.log.Warnf("error removing service informer handler: %s", err)
}
}
}

func (rcsw *RemoteClusterServiceWatcher) resolveGatewayAddress() ([]corev1.EndpointAddress, error) {
Expand Down
1 change: 1 addition & 0 deletions multicluster/service-mirror/probe_worker.go
Expand Up @@ -66,6 +66,7 @@ func (pw *ProbeWorker) run() {
probeTickerPeriod := pw.probeSpec.Period
maxJitter := pw.probeSpec.Period / 10 // max jitter is 10% of period
probeTicker := NewTicker(probeTickerPeriod, maxJitter)
defer probeTicker.Stop()

probeLoop:
for {
Expand Down

0 comments on commit 76cba42

Please sign in to comment.