diff --git a/multicluster/service-mirror/cluster_watcher.go b/multicluster/service-mirror/cluster_watcher.go index dc41cd4059b43..d55200f1d6eaf 100644 --- a/multicluster/service-mirror/cluster_watcher.go +++ b/multicluster/service-mirror/cluster_watcher.go @@ -43,6 +43,7 @@ type ( remoteAPIClient *k8s.API localAPIClient *k8s.API stopper chan struct{} + eventBroadcaster record.EventBroadcaster recorder record.EventRecorder log *logging.Entry eventsQueue workqueue.RateLimitingInterface @@ -51,6 +52,14 @@ type ( gatewayAlive bool liveness chan bool headlessServicesEnabled bool + + informerHandlers + } + + informerHandlers struct { + svcHandler cache.ResourceEventHandlerRegistration + epHandler cache.ResourceEventHandlerRegistration + nsHandler cache.ResourceEventHandlerRegistration } // RemoteServiceCreated is generated whenever a remote service is created Observing @@ -184,6 +193,7 @@ func NewRemoteClusterServiceWatcher( remoteAPIClient: remoteAPI, localAPIClient: localAPI, stopper: stopper, + eventBroadcaster: eventBroadcaster, recorder: recorder, log: logging.WithFields(logging.Fields{ "cluster": clusterName, @@ -781,7 +791,8 @@ func (rcsw *RemoteClusterServiceWatcher) processEvents(ctx context.Context) { func (rcsw *RemoteClusterServiceWatcher) Start(ctx context.Context) error { rcsw.remoteAPIClient.Sync(rcsw.stopper) rcsw.eventsQueue.Add(&OrphanedServicesGcTriggered{}) - _, err := rcsw.remoteAPIClient.Svc().Informer().AddEventHandler( + var err error + rcsw.svcHandler, err = rcsw.remoteAPIClient.Svc().Informer().AddEventHandler( cache.ResourceEventHandlerFuncs{ AddFunc: func(svc interface{}) { rcsw.eventsQueue.Add(&OnAddCalled{svc.(*corev1.Service)}) @@ -811,7 +822,7 @@ func (rcsw *RemoteClusterServiceWatcher) Start(ctx context.Context) error { return err } - _, err = rcsw.remoteAPIClient.Endpoint().Informer().AddEventHandler( + rcsw.epHandler, err = rcsw.remoteAPIClient.Endpoint().Informer().AddEventHandler( cache.ResourceEventHandlerFuncs{ // AddFunc only relevant for exported headless endpoints AddFunc: func(obj interface{}) { @@ -851,7 +862,7 @@ func (rcsw *RemoteClusterServiceWatcher) Start(ctx context.Context) error { return err } - _, err = rcsw.localAPIClient.NS().Informer().AddEventHandler( + rcsw.nsHandler, err = rcsw.localAPIClient.NS().Informer().AddEventHandler( cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { rcsw.eventsQueue.Add(&OnLocalNamespaceAdded{obj.(*corev1.Namespace)}) @@ -897,6 +908,23 @@ func (rcsw *RemoteClusterServiceWatcher) Stop(cleanupState bool) { rcsw.eventsQueue.Add(&ClusterUnregistered{}) } rcsw.eventsQueue.ShutDown() + rcsw.eventBroadcaster.Shutdown() + + if rcsw.svcHandler != nil { + if err := rcsw.remoteAPIClient.Svc().Informer().RemoveEventHandler(rcsw.svcHandler); err != nil { + rcsw.log.Warnf("error removing service informer handler: %s", err) + } + } + if rcsw.epHandler != nil { + if err := rcsw.remoteAPIClient.Endpoint().Informer().RemoveEventHandler(rcsw.epHandler); err != nil { + rcsw.log.Warnf("error removing service informer handler: %s", err) + } + } + if rcsw.nsHandler != nil { + if err := rcsw.localAPIClient.NS().Informer().RemoveEventHandler(rcsw.nsHandler); err != nil { + rcsw.log.Warnf("error removing service informer handler: %s", err) + } + } } func (rcsw *RemoteClusterServiceWatcher) resolveGatewayAddress() ([]corev1.EndpointAddress, error) { diff --git a/multicluster/service-mirror/probe_worker.go b/multicluster/service-mirror/probe_worker.go index 9992eaf216262..297c60cdeffd0 100644 --- a/multicluster/service-mirror/probe_worker.go +++ b/multicluster/service-mirror/probe_worker.go @@ -66,6 +66,7 @@ func (pw *ProbeWorker) run() { probeTickerPeriod := pw.probeSpec.Period maxJitter := pw.probeSpec.Period / 10 // max jitter is 10% of period probeTicker := NewTicker(probeTickerPeriod, maxJitter) + defer probeTicker.Stop() probeLoop: for {