Address code review comments
Signed-off-by: Andy Goldstein <goldsteina@vmware.com>
ncdc committed Sep 17, 2020
1 parent b1192a3 commit e0df884
Showing 3 changed files with 40 additions and 40 deletions.
60 changes: 30 additions & 30 deletions controllers/remote/cluster_cache.go
@@ -54,17 +54,17 @@ type ClusterCacheTracker struct {
 	client client.Client
 	scheme *runtime.Scheme
 
-	lock      sync.RWMutex
-	accessors map[client.ObjectKey]*accessor
+	lock             sync.RWMutex
+	clusterAccessors map[client.ObjectKey]*clusterAccessor
 }
 
 // NewClusterCacheTracker creates a new ClusterCacheTracker.
 func NewClusterCacheTracker(log logr.Logger, manager ctrl.Manager) (*ClusterCacheTracker, error) {
 	return &ClusterCacheTracker{
-		log:       log,
-		client:    manager.GetClient(),
-		scheme:    manager.GetScheme(),
-		accessors: make(map[client.ObjectKey]*accessor),
+		log:              log,
+		client:           manager.GetClient(),
+		scheme:           manager.GetScheme(),
+		clusterAccessors: make(map[client.ObjectKey]*clusterAccessor),
 	}, nil
 }
 
@@ -73,50 +73,50 @@ func (t *ClusterCacheTracker) GetClient(ctx context.Context, cluster client.Obje
 	t.lock.Lock()
 	defer t.lock.Unlock()
 
-	accessor, err := t.getAccessorLH(ctx, cluster)
+	accessor, err := t.getClusterAccessorLH(ctx, cluster)
 	if err != nil {
 		return nil, err
 	}
 
 	return accessor.client, nil
 }
 
-// accessor represents the combination of a client, cache, and watches for a remote cluster.
-type accessor struct {
+// clusterAccessor represents the combination of a client, cache, and watches for a remote cluster.
+type clusterAccessor struct {
 	cache   *stoppableCache
 	client  *client.DelegatingClient
 	watches sets.String
 }
 
-// accessorExists returns true if an accessor exists for cluster.
-func (t *ClusterCacheTracker) accessorExists(cluster client.ObjectKey) bool {
+// clusterAccessorExists returns true if a clusterAccessor exists for cluster.
+func (t *ClusterCacheTracker) clusterAccessorExists(cluster client.ObjectKey) bool {
 	t.lock.RLock()
 	defer t.lock.RUnlock()
 
-	_, exists := t.accessors[cluster]
+	_, exists := t.clusterAccessors[cluster]
 	return exists
 }
 
-// getAccessorLH first tries to return an already-created accessor for cluster, falling back to creating an accessor
-// if needed. Note, this method requires t.lock to already be held (LH=lock held).
-func (t *ClusterCacheTracker) getAccessorLH(ctx context.Context, cluster client.ObjectKey) (*accessor, error) {
-	a := t.accessors[cluster]
+// getClusterAccessorLH first tries to return an already-created clusterAccessor for cluster, falling back to creating a
+// new clusterAccessor if needed. Note, this method requires t.lock to already be held (LH=lock held).
+func (t *ClusterCacheTracker) getClusterAccessorLH(ctx context.Context, cluster client.ObjectKey) (*clusterAccessor, error) {
+	a := t.clusterAccessors[cluster]
 	if a != nil {
 		return a, nil
 	}
 
-	a, err := t.newAccessor(ctx, cluster)
+	a, err := t.newClusterAccessor(ctx, cluster)
 	if err != nil {
 		return nil, errors.Wrap(err, "error creating client and cache for remote cluster")
 	}
 
-	t.accessors[cluster] = a
+	t.clusterAccessors[cluster] = a
 
 	return a, nil
 }
 
-// newAccessor creates a new accessor.
-func (t *ClusterCacheTracker) newAccessor(ctx context.Context, cluster client.ObjectKey) (*accessor, error) {
+// newClusterAccessor creates a new clusterAccessor.
+func (t *ClusterCacheTracker) newClusterAccessor(ctx context.Context, cluster client.ObjectKey) (*clusterAccessor, error) {
 	// Get a rest config for the remote cluster
 	config, err := RESTConfig(ctx, t.client, cluster)
 	if err != nil {
@@ -153,7 +153,7 @@ func (t *ClusterCacheTracker) newAccessor(ctx context.Context, cluster client.Ob
 	}
 
 	// Start the cache!!!
-	go remoteCache.Start(cache.stop)
+	go cache.Start(cache.stop)
 
 	// Start cluster healthcheck!!!
 	go t.healthCheckCluster(&healthCheckInput{
@@ -163,35 +163,35 @@ func (t *ClusterCacheTracker) newAccessor(ctx context.Context, cluster client.Ob
 	})
 
 	delegatingClient := &client.DelegatingClient{
-		Reader:       remoteCache,
+		Reader:       cache,
 		Writer:       c,
 		StatusClient: c,
 	}
 
-	return &accessor{
+	return &clusterAccessor{
 		cache:   cache,
 		client:  delegatingClient,
 		watches: sets.NewString(),
 	}, nil
 }
 
-// deleteAccessor stops an accessor's cache and removes the accessor from the tracker.
+// deleteAccessor stops a clusterAccessor's cache and removes the clusterAccessor from the tracker.
 func (t *ClusterCacheTracker) deleteAccessor(cluster client.ObjectKey) {
 	t.lock.Lock()
 	defer t.lock.Unlock()
 
-	a, exists := t.accessors[cluster]
+	a, exists := t.clusterAccessors[cluster]
 	if !exists {
 		return
 	}
 
-	t.log.V(2).Info("Deleting accessor", "cluster", cluster.String())
+	t.log.V(2).Info("Deleting clusterAccessor", "cluster", cluster.String())
 
 	t.log.V(4).Info("Stopping cache", "cluster", cluster.String())
 	a.cache.Stop()
 	t.log.V(4).Info("Cache stopped", "cluster", cluster.String())
 
-	delete(t.accessors, cluster)
+	delete(t.clusterAccessors, cluster)
 }
 
 // Watcher is a scoped-down interface from Controller that only knows how to watch.
@@ -202,7 +202,7 @@ type Watcher interface {
 
 // WatchInput specifies the parameters used to establish a new watch for a remote cluster.
 type WatchInput struct {
-	// Name represents a unique watch request.
+	// Name represents a unique watch request for the specified Cluster.
 	Name string
 
 	// Cluster is the key for the remote cluster.
@@ -230,7 +230,7 @@ func (t *ClusterCacheTracker) Watch(ctx context.Context, input WatchInput) error
 	t.lock.Lock()
 	defer t.lock.Unlock()
 
-	a, err := t.getAccessorLH(ctx, input.Cluster)
+	a, err := t.getClusterAccessorLH(ctx, input.Cluster)
 	if err != nil {
 		return err
 	}
@@ -313,7 +313,7 @@ func (t *ClusterCacheTracker) healthCheckCluster(in *healthCheckInput) {
 			return false, nil
 		}
 
-		if !t.accessorExists(in.cluster) {
+		if !t.clusterAccessorExists(in.cluster) {
 			// Cache for this cluster has already been cleaned up.
 			// Nothing to do, so return true.
 			return true, nil
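
For orientation (not part of this commit), here is a minimal sketch of how a caller outside this package might use the renamed tracker API: GetClient lazily builds or reuses the clusterAccessor for a workload cluster and hands back its cache-backed client. Only ClusterCacheTracker.GetClient and its client.ObjectKey argument come from the diff above; the machineLister type, its tracker field, and the Machine listing are illustrative assumptions.

package example

import (
	"context"

	clusterv1 "sigs.k8s.io/cluster-api/api/v1alpha3"
	"sigs.k8s.io/cluster-api/controllers/remote"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// machineLister is a hypothetical consumer of the tracker; the type and field names are assumptions.
type machineLister struct {
	tracker *remote.ClusterCacheTracker
}

// listRemoteMachines lists Machines on the workload cluster identified by cluster.
func (m *machineLister) listRemoteMachines(ctx context.Context, cluster *clusterv1.Cluster) (*clusterv1.MachineList, error) {
	// GetClient takes the tracker's write lock and calls getClusterAccessorLH,
	// creating the clusterAccessor (cache, client, watches) on first use.
	remoteClient, err := m.tracker.GetClient(ctx, client.ObjectKey{Namespace: cluster.Namespace, Name: cluster.Name})
	if err != nil {
		return nil, err
	}

	// Reads go through the clusterAccessor's cache; writes go straight to the remote API server.
	machines := &clusterv1.MachineList{}
	if err := remoteClient.List(ctx, machines, client.InNamespace(cluster.Namespace)); err != nil {
		return nil, err
	}
	return machines, nil
}

The same lock-then-getClusterAccessorLH path backs Watch, so watches registered through the tracker reuse the per-cluster cache as well.
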
10 changes: 5 additions & 5 deletions controllers/remote/cluster_cache_healthcheck_test.go
@@ -94,7 +94,7 @@ var _ = Describe("ClusterCache HealthCheck suite", func() {
 			cc = &stoppableCache{
 				stop: make(chan struct{}),
 			}
-			cct.accessors[testClusterKey] = &accessor{cache: cc}
+			cct.clusterAccessors[testClusterKey] = &clusterAccessor{cache: cc}
 		})
 
 		AfterEach(func() {
@@ -113,17 +113,17 @@ var _ = Describe("ClusterCache HealthCheck suite", func() {
 			go cct.healthCheckCluster(&healthCheckInput{stop, testClusterKey, testEnv.Config, testPollInterval, testPollTimeout, testUnhealthyThreshold, "/"})
 
 			// Make sure this passes for at least two seconds, to give the health check goroutine time to run.
-			Consistently(func() bool { return cct.accessorExists(testClusterKey) }, 2*time.Second, 100*time.Millisecond).Should(BeTrue())
+			Consistently(func() bool { return cct.clusterAccessorExists(testClusterKey) }, 2*time.Second, 100*time.Millisecond).Should(BeTrue())
 		})
 
 		It("with an invalid path", func() {
 			stop := make(chan struct{})
 			defer close(stop)
 
-			go cct.healthCheckCluster(&healthCheckInput{stop, testClusterKey, testEnv.Config, testPollInterval, testPollTimeout, testUnhealthyThreshold, "/accessor"})
+			go cct.healthCheckCluster(&healthCheckInput{stop, testClusterKey, testEnv.Config, testPollInterval, testPollTimeout, testUnhealthyThreshold, "/clusterAccessor"})
 
 			// This should succeed after N consecutive failed requests.
-			Eventually(func() bool { return cct.accessorExists(testClusterKey) }, 2*time.Second, 100*time.Millisecond).Should(BeFalse())
+			Eventually(func() bool { return cct.clusterAccessorExists(testClusterKey) }, 2*time.Second, 100*time.Millisecond).Should(BeFalse())
 		})
 
 		It("with an invalid config", func() {
@@ -143,7 +143,7 @@ var _ = Describe("ClusterCache HealthCheck suite", func() {
 			go cct.healthCheckCluster(&healthCheckInput{stop, testClusterKey, config, testPollInterval, testPollTimeout, testUnhealthyThreshold, "/"})
 
 			// This should succeed after N consecutive failed requests.
-			Eventually(func() bool { return cct.accessorExists(testClusterKey) }, 2*time.Second, 100*time.Millisecond).Should(BeFalse())
+			Eventually(func() bool { return cct.clusterAccessorExists(testClusterKey) }, 2*time.Second, 100*time.Millisecond).Should(BeFalse())
 		})
 	})
 })
10 changes: 5 additions & 5 deletions controllers/remote/cluster_cache_reconciler_test.go
@@ -42,7 +42,7 @@ var _ = Describe("ClusterCache Reconciler suite", func() {
 		testNamespace *corev1.Namespace
 	)
 
-	// createAndWatchCluster creates a new cluster and ensures the clusterCacheTracker has an accessor for it
+	// createAndWatchCluster creates a new cluster and ensures the clusterCacheTracker has a clusterAccessor for it
 	createAndWatchCluster := func(clusterName string) {
 		By(fmt.Sprintf("Creating a cluster %q", clusterName))
 		testCluster := &clusterv1.Cluster{
@@ -68,7 +68,7 @@ var _ = Describe("ClusterCache Reconciler suite", func() {
 			return k8sClient.Get(ctx, secretKey, &corev1.Secret{})
 		}, timeout).Should(Succeed())
 
-		By("Creating an accessor for the cluster")
+		By("Creating a clusterAccessor for the cluster")
 		_, err := cct.GetClient(ctx, testClusterKey)
 		Expect(err).To(BeNil())
 	}
@@ -121,7 +121,7 @@ var _ = Describe("ClusterCache Reconciler suite", func() {
 		close(doneMgr)
 	})
 
-	It("should remove accessors when clusters are deleted", func() {
+	It("should remove clusterAccessors when clusters are deleted", func() {
 		for _, clusterName := range []string{"cluster-1", "cluster-2", "cluster-3"} {
 			By(fmt.Sprintf("Deleting cluster %q", clusterName))
 			obj := &clusterv1.Cluster{
@@ -132,8 +132,8 @@ var _ = Describe("ClusterCache Reconciler suite", func() {
 			}
 			Expect(k8sClient.Delete(ctx, obj)).To(Succeed())
 
-			By(fmt.Sprintf("Checking cluster %q's accessor is removed", clusterName))
-			Eventually(func() bool { return cct.accessorExists(util.ObjectKey(obj)) }, timeout).Should(BeFalse())
+			By(fmt.Sprintf("Checking cluster %q's clusterAccessor is removed", clusterName))
+			Eventually(func() bool { return cct.clusterAccessorExists(util.ObjectKey(obj)) }, timeout).Should(BeFalse())
 		}
 	})
 })
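
For context, a rough sketch (an assumption, not code from this commit) of the reconcile step the suite above exercises: once a Cluster disappears from the management cluster, the matching clusterAccessor is removed via deleteAccessor, which is why clusterAccessorExists eventually reports false. The sketchReconciler type and its fields are hypothetical stand-ins, placed notionally in the same controllers/remote package so the unexported deleteAccessor is reachable.

package remote

import (
	"context"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	clusterv1 "sigs.k8s.io/cluster-api/api/v1alpha3"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// sketchReconciler is a hypothetical stand-in for the reconciler under test.
type sketchReconciler struct {
	client  client.Client        // management cluster client
	tracker *ClusterCacheTracker // shared tracker holding clusterAccessors
}

func (r *sketchReconciler) reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	var cluster clusterv1.Cluster
	err := r.client.Get(ctx, req.NamespacedName, &cluster)
	if err == nil {
		// The Cluster still exists; leave its clusterAccessor in place.
		return ctrl.Result{}, nil
	}
	if !apierrors.IsNotFound(err) {
		return ctrl.Result{}, err
	}

	// The Cluster is gone: stop the per-cluster cache and drop the clusterAccessor,
	// after which clusterAccessorExists returns false for this key.
	r.tracker.deleteAccessor(req.NamespacedName)
	return ctrl.Result{}, nil
}
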
