From f881755d4897695b28b4759ec4253c423ba67e56 Mon Sep 17 00:00:00 2001 From: Andy Goldstein Date: Wed, 16 Sep 2020 13:19:46 -0400 Subject: [PATCH] Eliminate possible deadlocks in ClusterCacheTracker Switch to a single lock in ClusterCacheTracker to eliminate possible deadlocks when simultaneously deleted & recreating a client/cache for the same cluster. Signed-off-by: Andy Goldstein --- controllers/machinehealthcheck_controller.go | 1 + controllers/remote/cluster_cache.go | 489 ++++++------------ .../remote/cluster_cache_healthcheck_test.go | 44 +- .../remote/cluster_cache_reconciler.go | 77 +++ .../remote/cluster_cache_reconciler_test.go | 267 ++-------- .../remote/cluster_cache_tracker_test.go | 394 ++++---------- controllers/remote/stoppable_cache.go | 45 ++ 7 files changed, 406 insertions(+), 911 deletions(-) create mode 100644 controllers/remote/cluster_cache_reconciler.go create mode 100644 controllers/remote/stoppable_cache.go diff --git a/controllers/machinehealthcheck_controller.go b/controllers/machinehealthcheck_controller.go index df980198829..52a3388e65e 100644 --- a/controllers/machinehealthcheck_controller.go +++ b/controllers/machinehealthcheck_controller.go @@ -404,6 +404,7 @@ func (r *MachineHealthCheckReconciler) watchClusterNodes(ctx context.Context, cl } if err := r.Tracker.Watch(ctx, remote.WatchInput{ + Name: "machinehealthcheck-watchClusterNodes", Cluster: util.ObjectKey(cluster), Watcher: r.controller, Kind: &corev1.Node{}, diff --git a/controllers/remote/cluster_cache.go b/controllers/remote/cluster_cache.go index 0e8cfefc191..9bc7d4fa209 100644 --- a/controllers/remote/cluster_cache.go +++ b/controllers/remote/cluster_cache.go @@ -18,19 +18,15 @@ package remote import ( "context" - "fmt" - "reflect" "sync" "time" "github.com/go-logr/logr" "github.com/pkg/errors" apierrors "k8s.io/apimachinery/pkg/api/errors" - kerrors "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/serializer" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" @@ -39,10 +35,8 @@ import ( "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/apiutil" - "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/predicate" - "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" ) @@ -54,319 +48,206 @@ const ( healthCheckUnhealthyThreshold = 10 ) -// clusterCache embeds cache.Cache and combines it with a stop channel. -type clusterCache struct { - cache.Cache - - lock sync.Mutex - mapper meta.RESTMapper - stopped bool - stop chan struct{} -} - -// Stop closes the cache.Cache's stop channel if it has not already been stopped. -func (cc *clusterCache) Stop() { - cc.lock.Lock() - defer cc.lock.Unlock() - - if cc.stopped { - return - } - - cc.stopped = true - close(cc.stop) -} - // ClusterCacheTracker manages client caches for workload clusters. 
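A note on the rationale above: the pre-patch tracker guarded three separate maps (delegating clients, cluster caches, watches) with three separate locks, which leaves room for the classic ABBA ordering hazard sketched below when a create path and a teardown path run concurrently. This is an illustrative sketch only, not code from either version of the tracker; the lock and function names are placeholders.

// Illustrative sketch (not part of this patch): two goroutines taking two locks in
// opposite orders can each end up holding one lock and waiting on the other forever.
// Collapsing the tracker to a single lock removes this possibility by construction.
package main

import "sync"

// Stand-ins for the old per-map locks (delegatingClientsLock, clusterCachesLock, ...).
var clientsLock, cachesLock sync.RWMutex

// createPath mimics a code path that takes the locks in one order.
func createPath() {
	clientsLock.Lock()
	defer clientsLock.Unlock()
	cachesLock.Lock() // blocks forever if deletePath already holds cachesLock
	defer cachesLock.Unlock()
}

// deletePath mimics a concurrent teardown path that takes them in the opposite order.
func deletePath() {
	cachesLock.Lock()
	defer cachesLock.Unlock()
	clientsLock.Lock() // blocks forever if createPath already holds clientsLock
	defer clientsLock.Unlock()
}

func main() {
	// With unlucky timing, each goroutine acquires its first lock and then blocks on the
	// other goroutine's lock: a deadlock that no amount of waiting resolves.
	go createPath()
	go deletePath()
}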
type ClusterCacheTracker struct { log logr.Logger client client.Client scheme *runtime.Scheme - delegatingClientsLock sync.RWMutex - delegatingClients map[client.ObjectKey]*client.DelegatingClient - - clusterCachesLock sync.RWMutex - clusterCaches map[client.ObjectKey]*clusterCache - - watchesLock sync.RWMutex - watches map[client.ObjectKey]map[watchInfo]struct{} + lock sync.RWMutex + clusterAccessors map[client.ObjectKey]*clusterAccessor } // NewClusterCacheTracker creates a new ClusterCacheTracker. func NewClusterCacheTracker(log logr.Logger, manager ctrl.Manager) (*ClusterCacheTracker, error) { - m := &ClusterCacheTracker{ - log: log, - client: manager.GetClient(), - scheme: manager.GetScheme(), - delegatingClients: make(map[client.ObjectKey]*client.DelegatingClient), - clusterCaches: make(map[client.ObjectKey]*clusterCache), - watches: make(map[client.ObjectKey]map[watchInfo]struct{}), - } - - return m, nil + return &ClusterCacheTracker{ + log: log, + client: manager.GetClient(), + scheme: manager.GetScheme(), + clusterAccessors: make(map[client.ObjectKey]*clusterAccessor), + }, nil } -// Watcher is a scoped-down interface from Controller that only knows how to watch. -type Watcher interface { - // Watch watches src for changes, sending events to eventHandler if they pass predicates. - Watch(src source.Source, eventHandler handler.EventHandler, predicates ...predicate.Predicate) error -} +// GetClient returns a client for the given cluster. +func (t *ClusterCacheTracker) GetClient(ctx context.Context, cluster client.ObjectKey) (client.Client, error) { + t.lock.Lock() + defer t.lock.Unlock() -// watchInfo is used as a map key to uniquely identify a watch. Because predicates is a slice, it cannot be included. -type watchInfo struct { - watcher Watcher - gvk schema.GroupVersionKind - - // Comparing the eventHandler as an interface doesn't work because reflect.DeepEqual - // will assert functions are false if they are non-nil. - // Use a signature string representation instead as this can be compared. - // The signature function is expected to produce a unique output for each unique handler - // function that is passed to it. - // In combination with the watcher, this should be enough to identify unique watches. - eventHandlerSignature string -} + accessor, err := t.getClusterAccessorLH(ctx, cluster) + if err != nil { + return nil, err + } -// eventHandlerSignature generates a unique identifier for the given eventHandler by -// printing it to a string using "%#v". -// Eg "&handler.EnqueueRequestsFromMapFunc{ToRequests:(handler.ToRequestsFunc)(0x271afb0)}" -func eventHandlerSignature(h handler.EventHandler) string { - return fmt.Sprintf("%#v", h) + return accessor.client, nil } -// watchExists returns true if watch has already been established. This does NOT hold any lock. -func (m *ClusterCacheTracker) watchExists(cluster client.ObjectKey, watch watchInfo) bool { - watchesForCluster, clusterFound := m.watches[cluster] - if !clusterFound { - return false - } - - for w := range watchesForCluster { - if reflect.DeepEqual(w, watch) { - return true - } - } - return false +// clusterAccessor represents the combination of a client, cache, and watches for a remote cluster. +type clusterAccessor struct { + cache *stoppableCache + client *client.DelegatingClient + watches sets.String } -// deleteWatchesForCluster removes the watches for cluster from the tracker. 
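A minimal consumer-side sketch of GetClient, assuming the cluster-api module path sigs.k8s.io/cluster-api/controllers/remote and a hypothetical NodeLister type; the accessor (client plus cache) is created lazily under the tracker's single lock on first use.

// Illustrative sketch (not part of this patch): any controller holding the shared
// tracker can ask for a client to a workload cluster and use it like a normal
// controller-runtime client. NodeLister is a placeholder type for the example.
package controllers

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	"sigs.k8s.io/cluster-api/controllers/remote"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

type NodeLister struct {
	Tracker *remote.ClusterCacheTracker
}

// WorkloadNodes lists the Nodes of the workload cluster identified by key.
func (l *NodeLister) WorkloadNodes(ctx context.Context, key client.ObjectKey) (*corev1.NodeList, error) {
	remoteClient, err := l.Tracker.GetClient(ctx, key)
	if err != nil {
		return nil, err
	}

	nodes := &corev1.NodeList{}
	// Reads go through the accessor's cache; writes would go straight to the API server.
	if err := remoteClient.List(ctx, nodes); err != nil {
		return nil, err
	}
	return nodes, nil
}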
-func (m *ClusterCacheTracker) deleteWatchesForCluster(cluster client.ObjectKey) { - m.watchesLock.Lock() - defer m.watchesLock.Unlock() +// clusterAccessorExists returns true if a clusterAccessor exists for cluster. +func (t *ClusterCacheTracker) clusterAccessorExists(cluster client.ObjectKey) bool { + t.lock.RLock() + defer t.lock.RUnlock() - delete(m.watches, cluster) + _, exists := t.clusterAccessors[cluster] + return exists } -// WatchInput specifies the parameters used to establish a new watch for a remote cluster. -type WatchInput struct { - // Cluster is the key for the remote cluster. - Cluster client.ObjectKey - - // Watcher is the watcher (controller) whose Reconcile() function will be called for events. - Watcher Watcher +// getClusterAccessorLH first tries to return an already-created clusterAccessor for cluster, falling back to creating a +// new clusterAccessor if needed. Note, this method requires t.lock to already be held (LH=lock held). +func (t *ClusterCacheTracker) getClusterAccessorLH(ctx context.Context, cluster client.ObjectKey) (*clusterAccessor, error) { + a := t.clusterAccessors[cluster] + if a != nil { + return a, nil + } - // Kind is the type of resource to watch. - Kind runtime.Object + a, err := t.newClusterAccessor(ctx, cluster) + if err != nil { + return nil, errors.Wrap(err, "error creating client and cache for remote cluster") + } - // EventHandler contains the event handlers to invoke for resource events. - EventHandler handler.EventHandler + t.clusterAccessors[cluster] = a - // Predicates is used to filter resource events. - Predicates []predicate.Predicate + return a, nil } -// Watch watches a remote cluster for resource events. If the watch already exists based on cluster, watcher, -// kind, and eventHandler, then this is a no-op. -func (m *ClusterCacheTracker) Watch(ctx context.Context, input WatchInput) error { - gvk, err := apiutil.GVKForObject(input.Kind, m.scheme) +// newClusterAccessor creates a new clusterAccessor. 
+func (t *ClusterCacheTracker) newClusterAccessor(ctx context.Context, cluster client.ObjectKey) (*clusterAccessor, error) { + // Get a rest config for the remote cluster + config, err := RESTConfig(ctx, t.client, cluster) if err != nil { - return err + return nil, errors.Wrapf(err, "error fetching REST client config for remote cluster %q", cluster.String()) } + config.Timeout = defaultClientTimeout - wi := watchInfo{ - watcher: input.Watcher, - gvk: gvk, - eventHandlerSignature: eventHandlerSignature(input.EventHandler), - } - - // First, check if the watch already exists - var exists bool - m.watchesLock.RLock() - exists = m.watchExists(input.Cluster, wi) - m.watchesLock.RUnlock() - - if exists { - m.log.V(4).Info("Watch already exists", "namespace", input.Cluster.Namespace, "cluster", input.Cluster.Name, "kind", fmt.Sprintf("%T", input.Kind)) - return nil + // Create a mapper for it + mapper, err := apiutil.NewDynamicRESTMapper(config) + if err != nil { + return nil, errors.Wrapf(err, "error creating dynamic rest mapper for remote cluster %q", cluster.String()) } - // Doesn't exist - grab the write lock - m.watchesLock.Lock() - defer m.watchesLock.Unlock() - - // Need to check if another goroutine created the watch while this one was waiting for the lock - if m.watchExists(input.Cluster, wi) { - m.log.V(4).Info("Watch already exists", "namespace", input.Cluster.Namespace, "cluster", input.Cluster.Name, "kind", fmt.Sprintf("%T", input.Kind)) - return nil + // Create the client for the remote cluster + c, err := client.New(config, client.Options{Scheme: t.scheme, Mapper: mapper}) + if err != nil { + return nil, errors.Wrapf(err, "error creating client for remote cluster %q", cluster.String()) } - // Need to create the watch - watchesForCluster, found := m.watches[input.Cluster] - if !found { - watchesForCluster = make(map[watchInfo]struct{}) - m.watches[input.Cluster] = watchesForCluster + // Create the cache for the remote cluster + cacheOptions := cache.Options{ + Scheme: t.scheme, + Mapper: mapper, } - - cache, err := m.getOrCreateClusterCache(ctx, input.Cluster) + remoteCache, err := cache.New(config, cacheOptions) if err != nil { - return err + return nil, errors.Wrapf(err, "error creating cache for remote cluster %q", cluster.String()) } - if err := input.Watcher.Watch(source.NewKindWithCache(input.Kind, cache), input.EventHandler, input.Predicates...); err != nil { - return errors.Wrap(err, "error creating watch") + // We need to be able to stop the cache's shared informers, so wrap this in a stoppableCache. + cache := &stoppableCache{ + Cache: remoteCache, + stop: make(chan struct{}), } - watchesForCluster[wi] = struct{}{} - - return nil -} + // Start the cache!!! + go cache.Start(cache.stop) -// GetClient returns a client for the given cluster. -func (m *ClusterCacheTracker) GetClient(ctx context.Context, cluster client.ObjectKey) (client.Client, error) { - return m.getOrCreateDelegatingClient(ctx, cluster) -} + // Start cluster healthcheck!!! + go t.healthCheckCluster(&healthCheckInput{ + stop: cache.stop, + cluster: cluster, + cfg: config, + }) -// getOrCreateClusterClient returns a delegating client for the specified cluster, creating a new one if needed. 
-func (m *ClusterCacheTracker) getOrCreateDelegatingClient(ctx context.Context, cluster client.ObjectKey) (client.Client, error) { - c := m.getDelegatingClient(cluster) - if c != nil { - return c, nil + delegatingClient := &client.DelegatingClient{ + Reader: cache, + Writer: c, + StatusClient: c, } - return m.newDelegatingClient(ctx, cluster) + return &clusterAccessor{ + cache: cache, + client: delegatingClient, + watches: sets.NewString(), + }, nil } -// getClusterCache returns the clusterCache for cluster, or nil if it does not exist. -func (m *ClusterCacheTracker) getDelegatingClient(cluster client.ObjectKey) *client.DelegatingClient { - m.delegatingClientsLock.RLock() - defer m.delegatingClientsLock.RUnlock() +// deleteAccessor stops a clusterAccessor's cache and removes the clusterAccessor from the tracker. +func (t *ClusterCacheTracker) deleteAccessor(cluster client.ObjectKey) { + t.lock.Lock() + defer t.lock.Unlock() - return m.delegatingClients[cluster] -} + a, exists := t.clusterAccessors[cluster] + if !exists { + return + } -// newDelegatingClient creates a new delegating client. -func (m *ClusterCacheTracker) newDelegatingClient(ctx context.Context, cluster client.ObjectKey) (*client.DelegatingClient, error) { - m.delegatingClientsLock.Lock() - defer m.delegatingClientsLock.Unlock() + t.log.V(2).Info("Deleting clusterAccessor", "cluster", cluster.String()) - // If another goroutine created the client while this one was waiting to acquire the write lock, return that - // instead of overwriting it. - if delegatingClient, exists := m.delegatingClients[cluster]; exists { - return delegatingClient, nil - } + t.log.V(4).Info("Stopping cache", "cluster", cluster.String()) + a.cache.Stop() + t.log.V(4).Info("Cache stopped", "cluster", cluster.String()) - cache, err := m.getOrCreateClusterCache(ctx, cluster) - if err != nil { - return nil, err - } - config, err := RESTConfig(ctx, m.client, cluster) - if err != nil { - return nil, errors.Wrap(err, "error fetching REST client config for remote cluster") - } - config.Timeout = defaultClientTimeout - c, err := client.New(config, client.Options{Scheme: m.scheme, Mapper: cache.mapper}) - if err != nil { - return nil, err - } - delegatingClient := &client.DelegatingClient{ - Reader: cache, - Writer: c, - StatusClient: c, - } - m.delegatingClients[cluster] = delegatingClient - return delegatingClient, nil + delete(t.clusterAccessors, cluster) } -func (m *ClusterCacheTracker) deleteDelegatingClient(cluster client.ObjectKey) { - m.delegatingClientsLock.Lock() - defer m.delegatingClientsLock.Unlock() - - delete(m.delegatingClients, cluster) +// Watcher is a scoped-down interface from Controller that only knows how to watch. +type Watcher interface { + // Watch watches src for changes, sending events to eventHandler if they pass predicates. + Watch(src source.Source, eventHandler handler.EventHandler, predicates ...predicate.Predicate) error } -// getOrCreateClusterCache returns the clusterCache for cluster, creating a new ClusterCache if needed. -func (m *ClusterCacheTracker) getOrCreateClusterCache(ctx context.Context, cluster client.ObjectKey) (*clusterCache, error) { - cache := m.getClusterCache(cluster) - if cache != nil { - return cache, nil - } +// WatchInput specifies the parameters used to establish a new watch for a remote cluster. +type WatchInput struct { + // Name represents a unique watch request for the specified Cluster. + Name string - return m.newClusterCache(ctx, cluster) -} + // Cluster is the key for the remote cluster. 
+ Cluster client.ObjectKey -// getClusterCache returns the clusterCache for cluster, or nil if it does not exist. -func (m *ClusterCacheTracker) getClusterCache(cluster client.ObjectKey) *clusterCache { - m.clusterCachesLock.RLock() - defer m.clusterCachesLock.RUnlock() + // Watcher is the watcher (controller) whose Reconcile() function will be called for events. + Watcher Watcher - return m.clusterCaches[cluster] -} + // Kind is the type of resource to watch. + Kind runtime.Object -// newClusterCache creates and starts a new clusterCache for cluster. -func (m *ClusterCacheTracker) newClusterCache(ctx context.Context, cluster client.ObjectKey) (*clusterCache, error) { - m.clusterCachesLock.Lock() - defer m.clusterCachesLock.Unlock() + // EventHandler contains the event handlers to invoke for resource events. + EventHandler handler.EventHandler - // If another goroutine created the cache while this one was waiting to acquire the write lock, return that - // instead of overwriting it. - if c, exists := m.clusterCaches[cluster]; exists { - return c, nil - } + // Predicates is used to filter resource events. + Predicates []predicate.Predicate +} - config, err := RESTConfig(ctx, m.client, cluster) - if err != nil { - return nil, errors.Wrap(err, "error fetching REST client config for remote cluster") +// Watch watches a remote cluster for resource events. If the watch already exists based on input.Name, this is a no-op. +func (t *ClusterCacheTracker) Watch(ctx context.Context, input WatchInput) error { + if input.Name == "" { + return errors.New("input.Name is required") } - mapper, err := apiutil.NewDynamicRESTMapper(config) - if err != nil { - return nil, errors.Wrap(err, "error creating dynamic rest mapper for remote cluster") - } + t.lock.Lock() + defer t.lock.Unlock() - cacheOptions := cache.Options{ - Scheme: m.scheme, - Mapper: mapper, - } - remoteCache, err := cache.New(config, cacheOptions) + a, err := t.getClusterAccessorLH(ctx, input.Cluster) if err != nil { - return nil, errors.Wrap(err, "error creating cache for remote cluster") + return err } - stop := make(chan struct{}) - cc := &clusterCache{ - Cache: remoteCache, - stop: stop, - mapper: mapper, + if a.watches.Has(input.Name) { + t.log.V(4).Info("Watch already exists", "namespace", input.Cluster.Namespace, "cluster", input.Cluster.Name, "name", input.Name) + return nil } - m.clusterCaches[cluster] = cc - // Start the cache!!! - go remoteCache.Start(cc.stop) - // Start cluster healthcheck!!! 
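A minimal sketch of how a controller registers a named watch against a workload cluster under the new deduplication scheme, where repeated calls with the same Name for the same Cluster are no-ops; the helper function, watch name, and mapping func are assumptions for the example.

// Illustrative sketch (not part of this patch): the controller passed in as the Watcher
// will have its Reconcile called for Node events in the workload cluster, with requests
// produced by the caller-supplied map function.
package controllers

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	clusterv1 "sigs.k8s.io/cluster-api/api/v1alpha3"
	"sigs.k8s.io/cluster-api/controllers/remote"
	"sigs.k8s.io/cluster-api/util"
	"sigs.k8s.io/controller-runtime/pkg/controller"
	"sigs.k8s.io/controller-runtime/pkg/handler"
)

func watchWorkloadNodes(ctx context.Context, tracker *remote.ClusterCacheTracker, c controller.Controller, cluster *clusterv1.Cluster, mapToRequests handler.ToRequestsFunc) error {
	return tracker.Watch(ctx, remote.WatchInput{
		Name:         "example-watchNodes", // unique per logical watch; duplicates are skipped
		Cluster:      util.ObjectKey(cluster),
		Watcher:      c,
		Kind:         &corev1.Node{},
		EventHandler: &handler.EnqueueRequestsFromMapFunc{ToRequests: mapToRequests},
	})
}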
- go m.healthCheckCluster(&healthCheckInput{ - stop: cc.stop, - cluster: cluster, - cfg: config, - }) - - return cc, nil -} + // Need to create the watch + if err := input.Watcher.Watch(source.NewKindWithCache(input.Kind, a.cache), input.EventHandler, input.Predicates...); err != nil { + return errors.Wrap(err, "error creating watch") + } -func (m *ClusterCacheTracker) deleteClusterCache(cluster client.ObjectKey) { - m.clusterCachesLock.Lock() - defer m.clusterCachesLock.Unlock() + a.watches.Insert(input.Name) - delete(m.clusterCaches, cluster) + return nil } // healthCheckInput provides the input for the healthCheckCluster method @@ -380,8 +261,8 @@ type healthCheckInput struct { path string } -// validate sets default values if optional parameters are not set -func (h *healthCheckInput) validate() { +// setDefaults sets default values if optional parameters are not set +func (h *healthCheckInput) setDefaults() { if h.interval == 0 { h.interval = healthCheckPollInterval } @@ -399,16 +280,17 @@ func (h *healthCheckInput) validate() { // healthCheckCluster will poll the cluster's API at the path given and, if there are // `unhealthyThreshold` consecutive failures, will deem the cluster unhealthy. // Once the cluster is deemed unhealthy, the cluster's cache is stopped and removed. -func (m *ClusterCacheTracker) healthCheckCluster(in *healthCheckInput) { +func (t *ClusterCacheTracker) healthCheckCluster(in *healthCheckInput) { // populate optional params for healthCheckInput - in.validate() + in.setDefaults() unhealthyCount := 0 + // This gets us a client that can make raw http(s) calls to the remote apiserver. We only need to create it once + // and we can reuse it inside the polling loop. codec := runtime.NoopEncoder{Decoder: scheme.Codecs.UniversalDecoder()} cfg := rest.CopyConfig(in.cfg) cfg.NegotiatedSerializer = serializer.NegotiatedSerializerWrapper(runtime.SerializerInfo{Serializer: codec}) - restClient, restClientErr := rest.UnversionedRESTClientFor(cfg) runHealthCheckWithThreshold := func() (bool, error) { @@ -417,7 +299,7 @@ func (m *ClusterCacheTracker) healthCheckCluster(in *healthCheckInput) { } cluster := &clusterv1.Cluster{} - if err := m.client.Get(context.TODO(), in.cluster, cluster); err != nil { + if err := t.client.Get(context.TODO(), in.cluster, cluster); err != nil { if apierrors.IsNotFound(err) { // If the cluster can't be found, we should delete the cache. return false, err @@ -425,22 +307,21 @@ func (m *ClusterCacheTracker) healthCheckCluster(in *healthCheckInput) { // Otherwise, requeue. return false, nil } + if !cluster.Status.InfrastructureReady || !cluster.Status.ControlPlaneInitialized { // If the infrastructure or control plane aren't marked as ready, we should requeue and wait. return false, nil } - remoteCache := m.getClusterCache(in.cluster) - if remoteCache == nil { + if !t.clusterAccessorExists(in.cluster) { // Cache for this cluster has already been cleaned up. // Nothing to do, so return true. return true, nil } - // healthCheckPath returning an error is considered a failed health check - // (Either an issue was encountered connecting or the API returned an error). - // If no error occurs, reset the unhealthy coutner. - err := healthCheckPath(restClient, in.requestTimeout, in.path) + // An error here means there was either an issue connecting or the API returned an error. + // If no error occurs, reset the unhealthy counter. 
+ _, err := restClient.Get().AbsPath(in.path).Timeout(in.requestTimeout).DoRaw() if err != nil { unhealthyCount++ } else { @@ -448,9 +329,7 @@ func (m *ClusterCacheTracker) healthCheckCluster(in *healthCheckInput) { } if unhealthyCount >= in.unhealthyThreshold { - // `healthCheckUnhealthyThreshold` (or more) consecutive failures. // Cluster is now considered unhealthy. - // return last error from `doHealthCheck` return false, err } @@ -461,83 +340,7 @@ func (m *ClusterCacheTracker) healthCheckCluster(in *healthCheckInput) { // An error returned implies the health check has failed a sufficient number of // times for the cluster to be considered unhealthy if err != nil { - c := m.getClusterCache(in.cluster) - if c == nil { - return - } - - m.log.Error(err, "Error health checking cluster", "cluster", in.cluster.String()) - - // Stop the cache and clean up - c.Stop() - m.deleteClusterCache(in.cluster) - m.deleteDelegatingClient(in.cluster) - m.deleteWatchesForCluster(in.cluster) + t.log.Error(err, "Error health checking cluster", "cluster", in.cluster.String()) + t.deleteAccessor(in.cluster) } } - -// healthCheckPath attempts to request a given absolute path from the API server -// defined in the rest.Config and returns any errors that occurred during the request. -func healthCheckPath(restClient *rest.RESTClient, requestTimeout time.Duration, path string) error { - _, err := restClient.Get().AbsPath(path).Timeout(requestTimeout).DoRaw() - return err -} - -// ClusterCacheReconciler is responsible for stopping remote cluster caches when -// the cluster for the remote cache is being deleted. -type ClusterCacheReconciler struct { - Log logr.Logger - Client client.Client - Tracker *ClusterCacheTracker -} - -func (r *ClusterCacheReconciler) SetupWithManager(mgr ctrl.Manager, options controller.Options) error { - _, err := ctrl.NewControllerManagedBy(mgr). - For(&clusterv1.Cluster{}). - WithOptions(options). - Build(r) - - if err != nil { - return errors.Wrap(err, "failed setting up with a controller manager") - } - return nil -} - -// Reconcile reconciles Clusters and removes ClusterCaches for any Cluster that cannot be retrieved from the -// management cluster. 
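For readers tracing how runHealthCheckWithThreshold is driven: the surrounding code (unchanged by this patch) feeds it to wait.PollImmediateUntil, which keeps invoking the condition until it reports done, returns an error, or the stop channel closes. The sketch below reproduces that consecutive-failure pattern in isolation; the HTTP probe, package, and names are assumptions, not the tracker's code.

// Illustrative sketch (not part of this patch): poll a URL on an interval and give up
// only after `unhealthyThreshold` consecutive failures; any single success resets the streak.
package healthcheck

import (
	"fmt"
	"net/http"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func healthCheckLoop(url string, interval time.Duration, unhealthyThreshold int, stop <-chan struct{}) error {
	unhealthyCount := 0

	condition := func() (bool, error) {
		resp, err := http.Get(url)
		if err == nil {
			resp.Body.Close()
			unhealthyCount = 0 // a single success resets the failure streak
			return false, nil  // keep polling
		}

		unhealthyCount++
		if unhealthyCount >= unhealthyThreshold {
			// Enough consecutive failures: surface the last error and stop polling.
			return false, fmt.Errorf("health check failed %d consecutive times: %w", unhealthyCount, err)
		}
		return false, nil // still under the threshold: keep polling
	}

	// Blocks until the condition errors (unhealthy), returns true, or stop is closed
	// (in which case wait.ErrWaitTimeout is returned).
	return wait.PollImmediateUntil(interval, condition, stop)
}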
-func (r *ClusterCacheReconciler) Reconcile(req reconcile.Request) (reconcile.Result, error) { - ctx := context.Background() - - log := r.Log.WithValues("namespace", req.Namespace, "name", req.Name) - log.V(4).Info("Reconciling") - - var cluster clusterv1.Cluster - - err := r.Client.Get(ctx, req.NamespacedName, &cluster) - if err == nil { - log.V(4).Info("Cluster still exists") - return reconcile.Result{}, nil - } else if !kerrors.IsNotFound(err) { - log.Error(err, "Error retrieving cluster") - return reconcile.Result{}, err - } - - log.V(4).Info("Cluster no longer exists") - - c := r.Tracker.getClusterCache(req.NamespacedName) - if c == nil { - log.V(4).Info("No current cluster cache exists - nothing to do") - return reconcile.Result{}, nil - } - - log.V(4).Info("Stopping cluster cache") - c.Stop() - - r.Tracker.deleteClusterCache(req.NamespacedName) - r.Tracker.deleteDelegatingClient(req.NamespacedName) - - log.V(4).Info("Deleting watches for cluster cache") - r.Tracker.deleteWatchesForCluster(req.NamespacedName) - - return reconcile.Result{}, nil -} diff --git a/controllers/remote/cluster_cache_healthcheck_test.go b/controllers/remote/cluster_cache_healthcheck_test.go index 7f2847d0d12..8e8e7e1bc0e 100644 --- a/controllers/remote/cluster_cache_healthcheck_test.go +++ b/controllers/remote/cluster_cache_healthcheck_test.go @@ -30,7 +30,6 @@ import ( "k8s.io/klog/klogr" clusterv1 "sigs.k8s.io/cluster-api/api/v1alpha3" "sigs.k8s.io/cluster-api/util" - "sigs.k8s.io/cluster-api/util/kubeconfig" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/manager" ) @@ -44,7 +43,7 @@ var _ = Describe("ClusterCache HealthCheck suite", func() { var testNamespace *corev1.Namespace var testClusterKey client.ObjectKey var cct *ClusterCacheTracker - var cc *clusterCache + var cc *stoppableCache var testPollInterval = 100 * time.Millisecond var testPollTimeout = 50 * time.Millisecond @@ -88,14 +87,14 @@ var _ = Describe("ClusterCache HealthCheck suite", func() { Expect(k8sClient.Status().Update(ctx, testCluster)).To(Succeed()) By("Creating a test cluster kubeconfig") - Expect(kubeconfig.CreateEnvTestSecret(k8sClient, testEnv.Config, testCluster)).To(Succeed()) + Expect(testEnv.CreateKubeconfigSecret(testCluster)).To(Succeed()) testClusterKey = util.ObjectKey(testCluster) - cc = &clusterCache{ + cc = &stoppableCache{ stop: make(chan struct{}), } - cct.clusterCaches[testClusterKey] = cc + cct.clusterAccessors[testClusterKey] = &clusterAccessor{cache: cc} }) AfterEach(func() { @@ -114,35 +113,17 @@ var _ = Describe("ClusterCache HealthCheck suite", func() { go cct.healthCheckCluster(&healthCheckInput{stop, testClusterKey, testEnv.Config, testPollInterval, testPollTimeout, testUnhealthyThreshold, "/"}) // Make sure this passes for at least two seconds, to give the health check goroutine time to run. 
- Consistently(func() *clusterCache { - cct.clusterCachesLock.RLock() - defer cct.clusterCachesLock.RUnlock() - return cct.clusterCaches[testClusterKey] - }, 2*time.Second, 100*time.Millisecond).ShouldNot(BeNil()) - Expect(func() bool { - cc.lock.Lock() - defer cc.lock.Unlock() - return cc.stopped - }()).To(BeFalse()) + Consistently(func() bool { return cct.clusterAccessorExists(testClusterKey) }, 2*time.Second, 100*time.Millisecond).Should(BeTrue()) }) It("with an invalid path", func() { stop := make(chan struct{}) defer close(stop) - go cct.healthCheckCluster(&healthCheckInput{stop, testClusterKey, testEnv.Config, testPollInterval, testPollTimeout, testUnhealthyThreshold, "/foo"}) + go cct.healthCheckCluster(&healthCheckInput{stop, testClusterKey, testEnv.Config, testPollInterval, testPollTimeout, testUnhealthyThreshold, "/clusterAccessor"}) // This should succeed after N consecutive failed requests. - Eventually(func() *clusterCache { - cct.clusterCachesLock.RLock() - defer cct.clusterCachesLock.RUnlock() - return cct.clusterCaches[testClusterKey] - }).Should(BeNil()) - Expect(func() bool { - cc.lock.Lock() - defer cc.lock.Unlock() - return cc.stopped - }()).To(BeTrue()) + Eventually(func() bool { return cct.clusterAccessorExists(testClusterKey) }, 2*time.Second, 100*time.Millisecond).Should(BeFalse()) }) It("with an invalid config", func() { @@ -162,16 +143,7 @@ var _ = Describe("ClusterCache HealthCheck suite", func() { go cct.healthCheckCluster(&healthCheckInput{stop, testClusterKey, config, testPollInterval, testPollTimeout, testUnhealthyThreshold, "/"}) // This should succeed after N consecutive failed requests. - Eventually(func() *clusterCache { - cct.clusterCachesLock.RLock() - defer cct.clusterCachesLock.RUnlock() - return cct.clusterCaches[testClusterKey] - }).Should(BeNil()) - Expect(func() bool { - cc.lock.Lock() - defer cc.lock.Unlock() - return cc.stopped - }()).To(BeTrue()) + Eventually(func() bool { return cct.clusterAccessorExists(testClusterKey) }, 2*time.Second, 100*time.Millisecond).Should(BeFalse()) }) }) }) diff --git a/controllers/remote/cluster_cache_reconciler.go b/controllers/remote/cluster_cache_reconciler.go new file mode 100644 index 00000000000..94547a7f1b7 --- /dev/null +++ b/controllers/remote/cluster_cache_reconciler.go @@ -0,0 +1,77 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package remote + +import ( + "context" + + "github.com/go-logr/logr" + "github.com/pkg/errors" + kerrors "k8s.io/apimachinery/pkg/api/errors" + clusterv1 "sigs.k8s.io/cluster-api/api/v1alpha3" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/reconcile" +) + +// ClusterCacheReconciler is responsible for stopping remote cluster caches when +// the cluster for the remote cache is being deleted. 
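A sketch of how the pieces are expected to be wired together in a manager entry point: one shared tracker, plus the ClusterCacheReconciler that cleans accessors up when Clusters are deleted. The setupTracker helper, scheme setup, and logger names are assumptions for the example, not code from this patch.

// Illustrative sketch (not part of this patch): construct a single ClusterCacheTracker,
// hand it to the controllers that need workload-cluster access, and register the
// ClusterCacheReconciler so accessors are removed when their Cluster goes away.
package main

import (
	"k8s.io/apimachinery/pkg/runtime"
	clientgoscheme "k8s.io/client-go/kubernetes/scheme"

	"github.com/pkg/errors"
	clusterv1 "sigs.k8s.io/cluster-api/api/v1alpha3"
	"sigs.k8s.io/cluster-api/controllers/remote"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/controller"
)

func setupTracker(mgr ctrl.Manager) (*remote.ClusterCacheTracker, error) {
	log := ctrl.Log.WithName("remote").WithName("ClusterCacheTracker")

	// One tracker instance is shared by every controller that needs workload-cluster access.
	tracker, err := remote.NewClusterCacheTracker(log, mgr)
	if err != nil {
		return nil, errors.Wrap(err, "unable to create cluster cache tracker")
	}

	// The reconciler watches Clusters and removes the matching accessor (stopping its cache)
	// once the Cluster can no longer be found on the management cluster.
	if err := (&remote.ClusterCacheReconciler{
		Log:     log.WithName("ClusterCacheReconciler"),
		Client:  mgr.GetClient(),
		Tracker: tracker,
	}).SetupWithManager(mgr, controller.Options{}); err != nil {
		return nil, errors.Wrap(err, "unable to set up cluster cache reconciler")
	}

	return tracker, nil
}

func main() {
	scheme := runtime.NewScheme()
	_ = clientgoscheme.AddToScheme(scheme)
	_ = clusterv1.AddToScheme(scheme)

	mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{Scheme: scheme})
	if err != nil {
		panic(err)
	}
	if _, err := setupTracker(mgr); err != nil {
		panic(err)
	}
	if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
		panic(err)
	}
}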
+type ClusterCacheReconciler struct { + Log logr.Logger + Client client.Client + Tracker *ClusterCacheTracker +} + +func (r *ClusterCacheReconciler) SetupWithManager(mgr ctrl.Manager, options controller.Options) error { + _, err := ctrl.NewControllerManagedBy(mgr). + For(&clusterv1.Cluster{}). + WithOptions(options). + Build(r) + + if err != nil { + return errors.Wrap(err, "failed setting up with a controller manager") + } + return nil +} + +// Reconcile reconciles Clusters and removes ClusterCaches for any Cluster that cannot be retrieved from the +// management cluster. +func (r *ClusterCacheReconciler) Reconcile(req reconcile.Request) (reconcile.Result, error) { + ctx := context.Background() + + log := r.Log.WithValues("namespace", req.Namespace, "name", req.Name) + log.V(4).Info("Reconciling") + + var cluster clusterv1.Cluster + + err := r.Client.Get(ctx, req.NamespacedName, &cluster) + if err == nil { + log.V(4).Info("Cluster still exists") + return reconcile.Result{}, nil + } else if !kerrors.IsNotFound(err) { + log.Error(err, "Error retrieving cluster") + return reconcile.Result{}, err + } + + log.V(2).Info("Cluster no longer exists") + + r.Tracker.deleteAccessor(req.NamespacedName) + + return reconcile.Result{}, nil + +} diff --git a/controllers/remote/cluster_cache_reconciler_test.go b/controllers/remote/cluster_cache_reconciler_test.go index cd51d79ef54..1700edc56f9 100644 --- a/controllers/remote/cluster_cache_reconciler_test.go +++ b/controllers/remote/cluster_cache_reconciler_test.go @@ -20,37 +20,30 @@ import ( "fmt" . "github.com/onsi/ginkgo" - . "github.com/onsi/ginkgo/extensions/table" . "github.com/onsi/gomega" - gtypes "github.com/onsi/gomega/types" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes/scheme" clusterv1 "sigs.k8s.io/cluster-api/api/v1alpha3" "sigs.k8s.io/cluster-api/util" - "sigs.k8s.io/cluster-api/util/kubeconfig" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" - "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/manager" - "sigs.k8s.io/controller-runtime/pkg/predicate" - "sigs.k8s.io/controller-runtime/pkg/reconcile" ) var _ = Describe("ClusterCache Reconciler suite", func() { Context("When running the ClusterCacheReconciler", func() { - var mgr manager.Manager - var doneMgr chan struct{} - var cct *ClusterCacheTracker - var k8sClient client.Client - - var testNamespace *corev1.Namespace - var clusterRequest1, clusterRequest2, clusterRequest3 reconcile.Request - var clusterCache1, clusterCache2, clusterCache3 *clusterCache + var ( + mgr manager.Manager + doneMgr chan struct{} + cct *ClusterCacheTracker + k8sClient client.Client + testNamespace *corev1.Namespace + ) - // createAndWatchCluster creates a new cluster and ensures the clusterCacheTracker is watching the cluster - createAndWatchCluster := func(clusterName string) (reconcile.Request, *clusterCache) { + // createAndWatchCluster creates a new cluster and ensures the clusterCacheTracker has a clusterAccessor for it + createAndWatchCluster := func(clusterName string) { By(fmt.Sprintf("Creating a cluster %q", clusterName)) testCluster := &clusterv1.Cluster{ ObjectMeta: metav1.ObjectMeta{ @@ -60,45 +53,24 @@ var _ = Describe("ClusterCache Reconciler suite", func() { } Expect(k8sClient.Create(ctx, testCluster)).To(Succeed()) - // Check the cluster can be fetch from the API server + // Check the cluster can be fetched from the API server 
testClusterKey := util.ObjectKey(testCluster) Eventually(func() error { return k8sClient.Get(ctx, testClusterKey, &clusterv1.Cluster{}) }, timeout).Should(Succeed()) By("Creating a test cluster kubeconfig") - Expect(kubeconfig.CreateEnvTestSecret(k8sClient, testEnv.Config, testCluster)).To(Succeed()) - // Check the secret can be fetch from the API server + Expect(testEnv.CreateKubeconfigSecret(testCluster)).To(Succeed()) + + // Check the secret can be fetched from the API server secretKey := client.ObjectKey{Namespace: testNamespace.GetName(), Name: fmt.Sprintf("%s-kubeconfig", testCluster.GetName())} Eventually(func() error { return k8sClient.Get(ctx, secretKey, &corev1.Secret{}) }, timeout).Should(Succeed()) - watcher, _ := newTestWatcher() - kind := &corev1.Node{} - eventHandler := &handler.Funcs{} - - By("Calling watch on the test cluster") - // TODO: Make this test not rely on Watch doing its job? - Expect(cct.Watch(ctx, WatchInput{ - Cluster: testClusterKey, - Watcher: watcher, - Kind: kind, - EventHandler: eventHandler, - Predicates: []predicate.Predicate{ - &predicate.ResourceVersionChangedPredicate{}, - &predicate.GenerationChangedPredicate{}, - }, - })).To(Succeed()) - - // Check that a cache was created for the cluster - cc := func() *clusterCache { - cct.clusterCachesLock.RLock() - defer cct.clusterCachesLock.RUnlock() - return cct.clusterCaches[testClusterKey] - }() - Expect(cc).ToNot(BeNil()) - return reconcile.Request{NamespacedName: testClusterKey}, cc + By("Creating a clusterAccessor for the cluster") + _, err := cct.GetClient(ctx, testClusterKey) + Expect(err).To(BeNil()) } BeforeEach(func() { @@ -110,42 +82,34 @@ var _ = Describe("ClusterCache Reconciler suite", func() { }) Expect(err).NotTo(HaveOccurred()) - doneMgr = make(chan struct{}) + By("Setting up a ClusterCacheTracker") + cct, err = NewClusterCacheTracker(log.NullLogger{}, mgr) + Expect(err).NotTo(HaveOccurred()) + + By("Creating the ClusterCacheReconciler") + r := &ClusterCacheReconciler{ + Log: log.NullLogger{}, + Client: mgr.GetClient(), + Tracker: cct, + } + Expect(r.SetupWithManager(mgr, controller.Options{})).To(Succeed()) + By("Starting the manager") + doneMgr = make(chan struct{}) go func() { Expect(mgr.Start(doneMgr)).To(Succeed()) }() k8sClient = mgr.GetClient() - By("Setting up a ClusterCacheTracker") - cct, err = NewClusterCacheTracker(log.NullLogger{}, mgr) - Expect(err).NotTo(HaveOccurred()) - By("Creating a namespace for the test") testNamespace = &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{GenerateName: "cluster-cache-test-"}} Expect(k8sClient.Create(ctx, testNamespace)).To(Succeed()) - By("Starting the ClusterCacheReconciler") - r := &ClusterCacheReconciler{ - Log: &log.NullLogger{}, - Client: mgr.GetClient(), - Tracker: cct, - } - Expect(r.SetupWithManager(mgr, controller.Options{})).To(Succeed()) - By("Creating clusters to test with") - clusterRequest1, clusterCache1 = createAndWatchCluster("cluster-1") - clusterRequest2, clusterCache2 = createAndWatchCluster("cluster-2") - clusterRequest3, clusterCache3 = createAndWatchCluster("cluster-3") - - // Manually call Reconcile to ensure the Reconcile is completed before assertions - _, err = r.Reconcile(clusterRequest1) - Expect(err).ToNot(HaveOccurred()) - _, err = r.Reconcile(clusterRequest2) - Expect(err).ToNot(HaveOccurred()) - _, err = r.Reconcile(clusterRequest3) - Expect(err).ToNot(HaveOccurred()) + createAndWatchCluster("cluster-1") + createAndWatchCluster("cluster-2") + createAndWatchCluster("cluster-3") }) AfterEach(func() { @@ 
-157,171 +121,20 @@ var _ = Describe("ClusterCache Reconciler suite", func() { close(doneMgr) }) - type clusterKeyAndCache struct { - key client.ObjectKey - cache *clusterCache - } - - type clusterDeletedInput struct { - stoppedClusters func() []clusterKeyAndCache - runningClusters func() []clusterKeyAndCache - } - - DescribeTable("when clusters are deleted", func(in *clusterDeletedInput) { - for _, cluster := range in.stoppedClusters() { - By(fmt.Sprintf("Deleting cluster %q", cluster.key.Name)) + It("should remove clusterAccessors when clusters are deleted", func() { + for _, clusterName := range []string{"cluster-1", "cluster-2", "cluster-3"} { + By(fmt.Sprintf("Deleting cluster %q", clusterName)) obj := &clusterv1.Cluster{ ObjectMeta: metav1.ObjectMeta{ - Name: cluster.key.Name, - Namespace: cluster.key.Namespace, + Namespace: testNamespace.Name, + Name: clusterName, }, } Expect(k8sClient.Delete(ctx, obj)).To(Succeed()) - } - - // Check the stopped clustser's caches eventually stop - for _, cluster := range in.stoppedClusters() { - By(fmt.Sprintf("Checking cluster %q's cache is stopped", cluster.key.Name)) - cc := cluster.cache - Eventually(func() bool { - cc.lock.Lock() - defer cc.lock.Unlock() - return cc.stopped - }, timeout).Should(BeTrue()) - } - // Check the running cluster's caches are still running - for _, cluster := range in.runningClusters() { - By(fmt.Sprintf("Checking cluster %q's cache is running", cluster.key.Name)) - cc := cluster.cache - Consistently(func() bool { - cc.lock.Lock() - defer cc.lock.Unlock() - return cc.stopped - }).Should(BeFalse()) + By(fmt.Sprintf("Checking cluster %q's clusterAccessor is removed", clusterName)) + Eventually(func() bool { return cct.clusterAccessorExists(util.ObjectKey(obj)) }, timeout).Should(BeFalse()) } - - By("Checking deleted Cluster's Caches are removed", func() { - matchers := []gtypes.GomegaMatcher{} - for _, cluster := range in.stoppedClusters() { - matchers = append(matchers, Not(HaveKey(cluster.key))) - } - for _, cluster := range in.runningClusters() { - matchers = append(matchers, HaveKeyWithValue(cluster.key, cluster.cache)) - } - matchers = append(matchers, HaveLen(len(in.runningClusters()))) - - Eventually(func() map[client.ObjectKey]*clusterCache { - cct.clusterCachesLock.RLock() - defer cct.clusterCachesLock.RUnlock() - return cct.clusterCaches - }, timeout).Should(SatisfyAll(matchers...)) - }) - - By("Checking deleted Cluster's Watches are removed", func() { - matchers := []gtypes.GomegaMatcher{} - for _, cluster := range in.stoppedClusters() { - matchers = append(matchers, Not(HaveKey(cluster.key))) - } - for _, cluster := range in.runningClusters() { - matchers = append(matchers, HaveKeyWithValue(cluster.key, Not(BeEmpty()))) - } - matchers = append(matchers, HaveLen(len(in.runningClusters()))) - - Eventually(func() map[client.ObjectKey]map[watchInfo]struct{} { - cct.watchesLock.RLock() - defer cct.watchesLock.RUnlock() - return cct.watches - }(), timeout).Should(SatisfyAll(matchers...)) - }) - }, - Entry("when no clusters deleted", &clusterDeletedInput{ - stoppedClusters: func() []clusterKeyAndCache { - return []clusterKeyAndCache{} - }, - runningClusters: func() []clusterKeyAndCache { - return []clusterKeyAndCache{ - { - key: clusterRequest1.NamespacedName, - cache: clusterCache1, - }, - { - key: clusterRequest2.NamespacedName, - cache: clusterCache2, - }, - { - key: clusterRequest3.NamespacedName, - cache: clusterCache3, - }, - } - }, - }), - Entry("when test-cluster-1 is deleted", &clusterDeletedInput{ - 
stoppedClusters: func() []clusterKeyAndCache { - return []clusterKeyAndCache{ - { - key: clusterRequest1.NamespacedName, - cache: clusterCache1, - }, - } - }, - runningClusters: func() []clusterKeyAndCache { - return []clusterKeyAndCache{ - { - key: clusterRequest2.NamespacedName, - cache: clusterCache2, - }, - { - key: clusterRequest3.NamespacedName, - cache: clusterCache3, - }, - } - }, - }), - Entry("when test-cluster-2 is deleted", &clusterDeletedInput{ - stoppedClusters: func() []clusterKeyAndCache { - return []clusterKeyAndCache{ - { - key: clusterRequest2.NamespacedName, - cache: clusterCache2, - }, - } - }, - runningClusters: func() []clusterKeyAndCache { - return []clusterKeyAndCache{ - { - key: clusterRequest1.NamespacedName, - cache: clusterCache1, - }, - { - key: clusterRequest3.NamespacedName, - cache: clusterCache3, - }, - } - }, - }), - Entry("when test-cluster-1 and test-cluster-3 are deleted", &clusterDeletedInput{ - stoppedClusters: func() []clusterKeyAndCache { - return []clusterKeyAndCache{ - { - key: clusterRequest1.NamespacedName, - cache: clusterCache1, - }, - { - key: clusterRequest3.NamespacedName, - cache: clusterCache3, - }, - } - }, - runningClusters: func() []clusterKeyAndCache { - return []clusterKeyAndCache{ - { - key: clusterRequest2.NamespacedName, - cache: clusterCache2, - }, - } - }, - }), - ) + }) }) }) diff --git a/controllers/remote/cluster_cache_tracker_test.go b/controllers/remote/cluster_cache_tracker_test.go index 7a62c7f0adc..a8f79c97919 100644 --- a/controllers/remote/cluster_cache_tracker_test.go +++ b/controllers/remote/cluster_cache_tracker_test.go @@ -18,112 +18,46 @@ package remote import ( "context" - "fmt" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes/scheme" - "k8s.io/client-go/util/workqueue" clusterv1 "sigs.k8s.io/cluster-api/api/v1alpha3" "sigs.k8s.io/cluster-api/util" - "sigs.k8s.io/cluster-api/util/kubeconfig" + ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/manager" - "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" - "sigs.k8s.io/controller-runtime/pkg/source" ) -var _ = Describe("ClusterCache Tracker suite", func() { - Context("When Watch is called for a cluster", func() { - type assertWatchInput struct { - tracker *ClusterCacheTracker - clusterKey client.ObjectKey - watcher Watcher - kind runtime.Object - gvkForKind schema.GroupVersionKind - eventHandler handler.EventHandler - predicates []predicate.Predicate - watcherInfo chan testWatcherInfo - watchCount int - } - - var assertWatch = func(i *assertWatchInput) { - It("should create a running cache for the cluster", func() { - cache, ok := func() (*clusterCache, bool) { - i.tracker.clusterCachesLock.RLock() - defer i.tracker.clusterCachesLock.RUnlock() - cc, ok := i.tracker.clusterCaches[i.clusterKey] - return cc, ok - }() - Expect(ok).To(BeTrue()) - stop := make(chan struct{}) - Expect(func() bool { - cache.lock.Lock() - defer cache.lock.Unlock() - return cache.stopped - }()).To(BeFalse()) - Expect(func() chan struct{} { - cache.lock.Lock() - defer cache.lock.Unlock() - return cache.stop - }()).ToNot(BeNil()) - // 
WaitForCacheSync returns false if Start was not called - Expect(cache.Cache.WaitForCacheSync(stop)).To(BeTrue()) - }) - - It("should add a watchInfo for the watch", func() { - expectedInfo := watchInfo{ - watcher: i.watcher, - gvk: i.gvkForKind, - eventHandlerSignature: eventHandlerSignature(i.eventHandler), - } - Expect(func() map[watchInfo]struct{} { - i.tracker.watchesLock.RLock() - defer i.tracker.watchesLock.RUnlock() - return i.tracker.watches[i.clusterKey] - }()).To(HaveKey(Equal(expectedInfo))) - }) - - It("should call the Watcher with the correct values", func() { - infos := []testWatcherInfo{} - - watcherInfoLen := len(i.watcherInfo) - for j := 0; j < watcherInfoLen; j++ { - infos = append(infos, <-i.watcherInfo) - } - - Expect(infos).To(ContainElement( - Equal(testWatcherInfo{ - handler: i.eventHandler, - predicates: i.predicates, - }), - )) - }) - - It("should call the Watcher the expected number of times", func() { - Expect(i.watcherInfo).Should(HaveLen(i.watchCount)) - }) - } - - var mgr manager.Manager - var doneMgr chan struct{} - var cct *ClusterCacheTracker - var k8sClient client.Client - - var testNamespace *corev1.Namespace - var input assertWatchInput +func mapper(i handler.MapObject) []reconcile.Request { + return []reconcile.Request{ + { + NamespacedName: types.NamespacedName{ + Namespace: i.Meta.GetNamespace(), + Name: "mapped-" + i.Meta.GetName(), + }, + }, + } +} - mapper := func(_ handler.MapObject) []reconcile.Request { - return []reconcile.Request{} - } +var _ = Describe("ClusterCache Tracker suite", func() { + Describe("watching", func() { + var ( + mgr manager.Manager + doneMgr chan struct{} + cct *ClusterCacheTracker + k8sClient client.Client + testNamespace *corev1.Namespace + c *testController + w Watcher + clusterA *clusterv1.Cluster + ) BeforeEach(func() { By("Setting up a new manager") @@ -134,6 +68,12 @@ var _ = Describe("ClusterCache Tracker suite", func() { }) Expect(err).NotTo(HaveOccurred()) + c = &testController{ + ch: make(chan string), + } + w, err = ctrl.NewControllerManagedBy(mgr).For(&clusterv1.MachineDeployment{}).Build(c) + Expect(err).To(BeNil()) + doneMgr = make(chan struct{}) By("Starting the manager") go func() { @@ -151,49 +91,20 @@ var _ = Describe("ClusterCache Tracker suite", func() { Expect(k8sClient.Create(ctx, testNamespace)).To(Succeed()) By("Creating a test cluster") - testCluster := &clusterv1.Cluster{ + clusterA = &clusterv1.Cluster{ ObjectMeta: metav1.ObjectMeta{ - Name: "test-cluster", - Namespace: testNamespace.GetName(), + Namespace: testNamespace.GetName(), + Name: "test-cluster", + Annotations: make(map[string]string), }, } - Expect(k8sClient.Create(ctx, testCluster)).To(Succeed()) - testCluster.Status.ControlPlaneInitialized = true - testCluster.Status.InfrastructureReady = true - Expect(k8sClient.Status().Update(ctx, testCluster)).To(Succeed()) + Expect(k8sClient.Create(ctx, clusterA)).To(Succeed()) + clusterA.Status.ControlPlaneInitialized = true + clusterA.Status.InfrastructureReady = true + Expect(k8sClient.Status().Update(ctx, clusterA)).To(Succeed()) By("Creating a test cluster kubeconfig") - Expect(kubeconfig.CreateEnvTestSecret(k8sClient, testEnv.Config, testCluster)).To(Succeed()) - - testClusterKey := util.ObjectKey(testCluster) - watcher, watcherInfo := newTestWatcher() - kind := &corev1.Node{} - gvkForNode := schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Node"} - eventHandler := &handler.EnqueueRequestsFromMapFunc{ToRequests: handler.ToRequestsFunc(mapper)} - - input = assertWatchInput{ - cct, - 
testClusterKey, - watcher, - kind, - gvkForNode, - eventHandler, - []predicate.Predicate{ - &predicate.ResourceVersionChangedPredicate{}, - &predicate.GenerationChangedPredicate{}, - }, - watcherInfo, - 1, - } - - By("Calling watch on the test cluster") - Expect(cct.Watch(ctx, WatchInput{ - Cluster: input.clusterKey, - Watcher: input.watcher, - Kind: input.kind, - EventHandler: input.eventHandler, - Predicates: input.predicates, - })).To(Succeed()) + Expect(testEnv.CreateKubeconfigSecret(clusterA)).To(Succeed()) }) AfterEach(func() { @@ -205,199 +116,72 @@ var _ = Describe("ClusterCache Tracker suite", func() { close(doneMgr) }) - Context("when watch is called for a cluster", func() { - assertWatch(&input) - }) + It("with the same name should succeed and not have duplicates", func() { + By("Creating the watch") + Expect(cct.Watch(ctx, WatchInput{ + Name: "watch1", + Cluster: util.ObjectKey(clusterA), + Watcher: w, + Kind: &clusterv1.Cluster{}, + EventHandler: &handler.EnqueueRequestsFromMapFunc{ToRequests: handler.ToRequestsFunc(mapper)}, + })).To(Succeed()) - Context("when Watch is called for a second time with the same input", func() { - BeforeEach(func() { - By("Calling watch on the test cluster") + By("Waiting to receive the watch notification") + Expect(<-c.ch).To(Equal("mapped-" + clusterA.Name)) - kind := &corev1.Node{} - eventHandler := &handler.EnqueueRequestsFromMapFunc{ToRequests: handler.ToRequestsFunc(mapper)} + By("Ensuring no additional watch notifications arrive") + Consistently(func() int { + return len(c.ch) + }).Should(Equal(0)) - // Check the second copies match - Expect(kind).To(Equal(input.kind)) - Expect(fmt.Sprintf("%#v", eventHandler)).To(Equal(fmt.Sprintf("%#v", input.eventHandler))) + By("Updating the cluster") + clusterA.Annotations["update1"] = "1" + Expect(k8sClient.Update(ctx, clusterA)).Should(Succeed()) - Expect(cct.Watch(ctx, WatchInput{ - Cluster: input.clusterKey, - Watcher: input.watcher, - Kind: kind, - EventHandler: eventHandler, - Predicates: input.predicates, - })).To(Succeed()) - }) + By("Waiting to receive the watch notification") + Expect(<-c.ch).To(Equal("mapped-" + clusterA.Name)) - assertWatch(&input) - }) - - Context("when watch is called with a different Kind", func() { - BeforeEach(func() { - configMapKind := &corev1.ConfigMap{} - configMapGVK := schema.GroupVersionKind{Group: "", Version: "v1", Kind: "ConfigMap"} - input.kind = configMapKind - input.gvkForKind = configMapGVK - input.watchCount = 2 - - By("Calling watch on the test cluster") - Expect(cct.Watch(ctx, WatchInput{ - Cluster: input.clusterKey, - Watcher: input.watcher, - Kind: input.kind, - EventHandler: input.eventHandler, - Predicates: input.predicates, - })).To(Succeed()) - }) + By("Ensuring no additional watch notifications arrive") + Consistently(func() int { + return len(c.ch) + }).Should(Equal(0)) - assertWatch(&input) - }) - - Context("when watch is called with a different EventHandler", func() { - BeforeEach(func() { - input.eventHandler = &handler.Funcs{ - CreateFunc: func(_ event.CreateEvent, _ workqueue.RateLimitingInterface) {}, - } - input.watchCount = 2 - - By("Calling watch on the test cluster") - Expect(cct.Watch(ctx, WatchInput{ - Cluster: input.clusterKey, - Watcher: input.watcher, - Kind: input.kind, - EventHandler: input.eventHandler, - Predicates: input.predicates, - })).To(Succeed()) - }) - - assertWatch(&input) - }) + By("Creating the same watch a second time") + Expect(cct.Watch(ctx, WatchInput{ + Name: "watch1", + Cluster: util.ObjectKey(clusterA), + 
Watcher: w, + Kind: &clusterv1.Cluster{}, + EventHandler: &handler.EnqueueRequestsFromMapFunc{ToRequests: handler.ToRequestsFunc(mapper)}, + })).To(Succeed()) - Context("when watch is called with different Predicates", func() { - BeforeEach(func() { - input.predicates = []predicate.Predicate{} - input.watchCount = 1 - - By("Calling watch on the test cluster") - Expect(cct.Watch(ctx, WatchInput{ - Cluster: input.clusterKey, - Watcher: input.watcher, - Kind: input.kind, - EventHandler: input.eventHandler, - Predicates: input.predicates, - })).To(Succeed()) - }) + By("Ensuring no additional watch notifications arrive") + Consistently(func() int { + return len(c.ch) + }).Should(Equal(0)) - It("does not call the Watcher a second time", func() { - Expect(input.watcherInfo).Should(HaveLen(1)) - }) - }) + By("Updating the cluster") + clusterA.Annotations["update1"] = "2" + Expect(k8sClient.Update(ctx, clusterA)).Should(Succeed()) - Context("when watch is called with different Cluster", func() { - var differentClusterInput assertWatchInput - - BeforeEach(func() { - // Copy the input so we can check both clusters at once - differentClusterInput = assertWatchInput{ - cct, - input.clusterKey, - input.watcher, - input.kind, - input.gvkForKind, - input.eventHandler, - []predicate.Predicate{ - &predicate.ResourceVersionChangedPredicate{}, - &predicate.GenerationChangedPredicate{}, - }, - input.watcherInfo, - 1, - } - - By("Creating a different cluster") - testCluster := &clusterv1.Cluster{ - ObjectMeta: metav1.ObjectMeta{ - Name: "different-cluster", - Namespace: testNamespace.GetName(), - }, - } - Expect(k8sClient.Create(ctx, testCluster)).To(Succeed()) - - By("Creating a test cluster kubeconfig") - Expect(kubeconfig.CreateEnvTestSecret(k8sClient, testEnv.Config, testCluster)).To(Succeed()) - // Check the secret can be fetch from the API server - secretKey := client.ObjectKey{Namespace: testNamespace.GetName(), Name: fmt.Sprintf("%s-kubeconfig", testCluster.GetName())} - Eventually(func() error { - return k8sClient.Get(ctx, secretKey, &corev1.Secret{}) - }, timeout).Should(Succeed()) - - differentClusterInput.clusterKey = util.ObjectKey(testCluster) - differentClusterInput.watchCount = 2 - - input.watchCount = 2 - - By("Calling watch on the test cluster") - Expect(cct.Watch(ctx, WatchInput{ - Cluster: differentClusterInput.clusterKey, - Watcher: differentClusterInput.watcher, - Kind: differentClusterInput.kind, - EventHandler: differentClusterInput.eventHandler, - Predicates: differentClusterInput.predicates, - })).To(Succeed()) - }) + By("Waiting to receive the watch notification") + Expect(<-c.ch).To(Equal("mapped-" + clusterA.Name)) - // Check conditions for both clusters are still satisfied - assertWatch(&input) - assertWatch(&differentClusterInput) - - It("should have independent caches for each cluster", func() { - testClusterCache, ok := func() (*clusterCache, bool) { - cct.clusterCachesLock.RLock() - defer cct.clusterCachesLock.RUnlock() - cc, ok := cct.clusterCaches[input.clusterKey] - return cc, ok - }() - Expect(ok).To(BeTrue()) - differentClusterCache, ok := func() (*clusterCache, bool) { - cct.clusterCachesLock.RLock() - defer cct.clusterCachesLock.RUnlock() - cc, ok := cct.clusterCaches[differentClusterInput.clusterKey] - return cc, ok - }() - Expect(ok).To(BeTrue()) - // Check stop channels are different, so that they can be stopped independently - Expect(testClusterCache.stop).ToNot(Equal(differentClusterCache.stop)) - // Caches should also be different as they were constructed for 
different clusters - // Check the memory locations to assert independence - Expect(fmt.Sprintf("%p", testClusterCache.Cache)).ToNot(Equal(fmt.Sprintf("%p", differentClusterCache.Cache))) - }) + By("Ensuring no additional watch notifications arrive") + Consistently(func() int { + return len(c.ch) + }).Should(Equal(0)) }) }) }) -type testWatchFunc = func(source.Source, handler.EventHandler, ...predicate.Predicate) error - -type testWatcher struct { - watchInfo chan testWatcherInfo -} - -type testWatcherInfo struct { - handler handler.EventHandler - predicates []predicate.Predicate -} - -func (t *testWatcher) Watch(s source.Source, h handler.EventHandler, ps ...predicate.Predicate) error { - t.watchInfo <- testWatcherInfo{ - handler: h, - predicates: ps, - } - return nil +type testController struct { + ch chan string } -func newTestWatcher() (Watcher, chan testWatcherInfo) { - watchInfo := make(chan testWatcherInfo, 5) - return &testWatcher{ - watchInfo: watchInfo, - }, watchInfo +func (c *testController) Reconcile(req reconcile.Request) (reconcile.Result, error) { + c.ch <- req.Name + return ctrl.Result{}, nil } func cleanupTestSecrets(ctx context.Context, c client.Client) error { diff --git a/controllers/remote/stoppable_cache.go b/controllers/remote/stoppable_cache.go new file mode 100644 index 00000000000..64c98abbb3b --- /dev/null +++ b/controllers/remote/stoppable_cache.go @@ -0,0 +1,45 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package remote + +import ( + "sync" + + "sigs.k8s.io/controller-runtime/pkg/cache" +) + +// stoppableCache embeds cache.Cache and combines it with a stop channel. +type stoppableCache struct { + cache.Cache + + lock sync.Mutex + stopped bool + stop chan struct{} +} + +// Stop closes the cache.Cache's stop channel if it has not already been stopped. +func (cc *stoppableCache) Stop() { + cc.lock.Lock() + defer cc.lock.Unlock() + + if cc.stopped { + return + } + + cc.stopped = true + close(cc.stop) +}
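To round out the new stoppableCache type, here is a minimal regression test one could add in the same package, asserting that Stop is idempotent so that the health check and the ClusterCacheReconciler can both tear an accessor down without coordinating. This test is a sketch and not part of the patch.

// Illustrative sketch (not part of this patch): Stop must tolerate being called twice.
package remote

import "testing"

func TestStoppableCacheStopIsIdempotent(t *testing.T) {
	c := &stoppableCache{stop: make(chan struct{})}

	c.Stop()
	c.Stop() // must not panic with "close of closed channel"

	select {
	case <-c.stop:
		// expected: the stop channel has been closed exactly once
	default:
		t.Fatal("expected stop channel to be closed")
	}
}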