Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix memory leak in service mirror #10833

Merged
merged 1 commit into from
May 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
34 changes: 31 additions & 3 deletions multicluster/service-mirror/cluster_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type (
remoteAPIClient *k8s.API
localAPIClient *k8s.API
stopper chan struct{}
eventBroadcaster record.EventBroadcaster
recorder record.EventRecorder
log *logging.Entry
eventsQueue workqueue.RateLimitingInterface
Expand All @@ -51,6 +52,14 @@ type (
gatewayAlive bool
liveness chan bool
headlessServicesEnabled bool

informerHandlers
}

informerHandlers struct {
svcHandler cache.ResourceEventHandlerRegistration
epHandler cache.ResourceEventHandlerRegistration
nsHandler cache.ResourceEventHandlerRegistration
}

// RemoteServiceCreated is generated whenever a remote service is created Observing
Expand Down Expand Up @@ -184,6 +193,7 @@ func NewRemoteClusterServiceWatcher(
remoteAPIClient: remoteAPI,
localAPIClient: localAPI,
stopper: stopper,
eventBroadcaster: eventBroadcaster,
recorder: recorder,
log: logging.WithFields(logging.Fields{
"cluster": clusterName,
Expand Down Expand Up @@ -781,7 +791,8 @@ func (rcsw *RemoteClusterServiceWatcher) processEvents(ctx context.Context) {
func (rcsw *RemoteClusterServiceWatcher) Start(ctx context.Context) error {
rcsw.remoteAPIClient.Sync(rcsw.stopper)
rcsw.eventsQueue.Add(&OrphanedServicesGcTriggered{})
_, err := rcsw.remoteAPIClient.Svc().Informer().AddEventHandler(
var err error
rcsw.svcHandler, err = rcsw.remoteAPIClient.Svc().Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(svc interface{}) {
rcsw.eventsQueue.Add(&OnAddCalled{svc.(*corev1.Service)})
Expand Down Expand Up @@ -811,7 +822,7 @@ func (rcsw *RemoteClusterServiceWatcher) Start(ctx context.Context) error {
return err
}

_, err = rcsw.remoteAPIClient.Endpoint().Informer().AddEventHandler(
rcsw.epHandler, err = rcsw.remoteAPIClient.Endpoint().Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
// AddFunc only relevant for exported headless endpoints
AddFunc: func(obj interface{}) {
Expand Down Expand Up @@ -851,7 +862,7 @@ func (rcsw *RemoteClusterServiceWatcher) Start(ctx context.Context) error {
return err
}

_, err = rcsw.localAPIClient.NS().Informer().AddEventHandler(
rcsw.nsHandler, err = rcsw.localAPIClient.NS().Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
rcsw.eventsQueue.Add(&OnLocalNamespaceAdded{obj.(*corev1.Namespace)})
Expand Down Expand Up @@ -897,6 +908,23 @@ func (rcsw *RemoteClusterServiceWatcher) Stop(cleanupState bool) {
rcsw.eventsQueue.Add(&ClusterUnregistered{})
}
rcsw.eventsQueue.ShutDown()
rcsw.eventBroadcaster.Shutdown()

if rcsw.svcHandler != nil {
if err := rcsw.remoteAPIClient.Svc().Informer().RemoveEventHandler(rcsw.svcHandler); err != nil {
rcsw.log.Warnf("error removing service informer handler: %s", err)
}
}
if rcsw.epHandler != nil {
if err := rcsw.remoteAPIClient.Endpoint().Informer().RemoveEventHandler(rcsw.epHandler); err != nil {
rcsw.log.Warnf("error removing service informer handler: %s", err)
}
}
if rcsw.nsHandler != nil {
if err := rcsw.localAPIClient.NS().Informer().RemoveEventHandler(rcsw.nsHandler); err != nil {
rcsw.log.Warnf("error removing service informer handler: %s", err)
}
}
}

func (rcsw *RemoteClusterServiceWatcher) resolveGatewayAddress() ([]corev1.EndpointAddress, error) {
Expand Down
1 change: 1 addition & 0 deletions multicluster/service-mirror/probe_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func (pw *ProbeWorker) run() {
probeTickerPeriod := pw.probeSpec.Period
maxJitter := pw.probeSpec.Period / 10 // max jitter is 10% of period
probeTicker := NewTicker(probeTickerPeriod, maxJitter)
defer probeTicker.Stop()

probeLoop:
for {
Expand Down