Skip to content

Commit

Permalink
Optimize registration process
Browse files Browse the repository at this point in the history
  • Loading branch information
ibuildthecloud committed Oct 3, 2020
1 parent 623bf4c commit ef891e9
Show file tree
Hide file tree
Showing 7 changed files with 156 additions and 99 deletions.
12 changes: 7 additions & 5 deletions pkg/apis/fleet.cattle.io/v1alpha1/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ import (
)

var (
ClusterConditionReady = "Ready"
ClusterGroupAnnotation = "fleet.cattle.io/cluster-group"
ClusterNamespaceAnnotation = "fleet.cattle.io/cluster-namespace"
ClusterAnnotation = "fleet.cattle.io/cluster"
ManagedLabel = "fleet.cattle.io/managed"
ClusterConditionReady = "Ready"
ClusterGroupAnnotation = "fleet.cattle.io/cluster-group"
ClusterNamespaceAnnotation = "fleet.cattle.io/cluster-namespace"
ClusterAnnotation = "fleet.cattle.io/cluster"
ClusterRegistrationAnnotation = "fleet.cattle.io/cluster-registration"
ClusterRegistrationNamespaceAnnotation = "fleet.cattle.io/cluster-registration-namespace"
ManagedLabel = "fleet.cattle.io/managed"

BootstrapToken = "fleet.cattle.io/bootstrap-token"
)
Expand Down
51 changes: 29 additions & 22 deletions pkg/controllers/cluster/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,23 @@ import (
"github.com/rancher/fleet/pkg/controllers/clusterregistration"
fleetcontrollers "github.com/rancher/fleet/pkg/generated/controllers/fleet.cattle.io/v1alpha1"
"github.com/rancher/fleet/pkg/summary"
"github.com/rancher/wrangler/pkg/apply"
"github.com/rancher/wrangler/pkg/condition"
corecontrollers "github.com/rancher/wrangler/pkg/generated/controllers/core/v1"
"github.com/rancher/wrangler/pkg/generic"
"github.com/rancher/wrangler/pkg/name"
"github.com/rancher/wrangler/pkg/relatedresource"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
)

type handler struct {
apply apply.Apply
clusters fleetcontrollers.ClusterCache
clusterGroups fleetcontrollers.ClusterGroupCache
bundleDeployment fleetcontrollers.BundleDeploymentCache
namespaceCache corecontrollers.NamespaceCache
namespaces corecontrollers.NamespaceClient
gitRepos fleetcontrollers.GitRepoCache
}

Expand All @@ -38,25 +38,22 @@ func Register(ctx context.Context,
clusterGroups fleetcontrollers.ClusterGroupCache,
clusters fleetcontrollers.ClusterController,
gitRepos fleetcontrollers.GitRepoCache,
namespaces corecontrollers.NamespaceController, apply apply.Apply) {
namespaces corecontrollers.NamespaceController) {

h := &handler{
apply: apply,
clusterGroups: clusterGroups,
clusters: clusters.Cache(),
bundleDeployment: bundleDeployment.Cache(),
namespaceCache: namespaces.Cache(),
namespaces: namespaces,
gitRepos: gitRepos,
}

fleetcontrollers.RegisterClusterGeneratingHandler(ctx,
fleetcontrollers.RegisterClusterStatusHandler(ctx,
clusters,
apply,
"Processed",
"managed-cluster",
h.OnClusterChanged,
&generic.GeneratingHandlerOptions{
AllowClusterScoped: true,
})
h.OnClusterChanged)

relatedresource.Watch(ctx, "managed-cluster", h.findClusters(namespaces.Cache()), clusters, bundleDeployment)
}
Expand Down Expand Up @@ -86,14 +83,9 @@ func (h *handler) findClusters(namespaces corecontrollers.NamespaceCache) relate
}
}

func (h *handler) OnClusterChanged(cluster *fleet.Cluster, status fleet.ClusterStatus) ([]runtime.Object, fleet.ClusterStatus, error) {
func (h *handler) OnClusterChanged(cluster *fleet.Cluster, status fleet.ClusterStatus) (fleet.ClusterStatus, error) {
if cluster.DeletionTimestamp != nil {
return nil, status, nil
}

bundleDeployments, err := h.bundleDeployment.List(status.Namespace, labels.Everything())
if err != nil {
return nil, status, err
return status, nil
}

if status.Namespace == "" {
Expand All @@ -104,6 +96,11 @@ func (h *handler) OnClusterChanged(cluster *fleet.Cluster, status fleet.ClusterS
status.Namespace = ns
}

bundleDeployments, err := h.bundleDeployment.List(status.Namespace, labels.Everything())
if err != nil {
return status, err
}

status.DesiredReadyGitRepos = 0
status.ReadyGitRepos = 0
status.ResourceCounts = fleet.GitRepoResourceCounts{}
Expand Down Expand Up @@ -138,8 +135,13 @@ func (h *handler) OnClusterChanged(cluster *fleet.Cluster, status fleet.ClusterS
}

summary.SetReadyConditions(&status, "Bundle", status.Summary)
return []runtime.Object{
&v1.Namespace{
return status, h.createNamespace(cluster, status)
}

func (h *handler) createNamespace(cluster *fleet.Cluster, status fleet.ClusterStatus) error {
_, err := h.namespaceCache.Get(status.Namespace)
if apierrors.IsNotFound(err) {
_, err = h.namespaces.Create(&v1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: status.Namespace,
Labels: map[string]string{
Expand All @@ -150,6 +152,11 @@ func (h *handler) OnClusterChanged(cluster *fleet.Cluster, status fleet.ClusterS
fleet.ClusterAnnotation: cluster.Name,
},
},
},
}, status, nil
})
}

if apierrors.IsAlreadyExists(err) {
return nil
}
return err
}
123 changes: 80 additions & 43 deletions pkg/controllers/clusterregistration/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
rbaccontrollers "github.com/rancher/wrangler/pkg/generated/controllers/rbac/v1"
"github.com/rancher/wrangler/pkg/generic"
"github.com/rancher/wrangler/pkg/name"
"github.com/rancher/wrangler/pkg/relatedresource"
"github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
Expand All @@ -26,8 +27,10 @@ import (
)

const (
AgentCredentialSecretType = "fleet.cattle.io/agent-credential"
clusterByClientID = "clusterByClientID"
AgentCredentialSecretType = "fleet.cattle.io/agent-credential"
clusterByClientID = "clusterByClientID"
clusterRegistrationByClientID = "clusterRegistrationByClientID"
deleteSecretAfter = 40 * time.Minute
)

type handler struct {
Expand All @@ -38,7 +41,7 @@ type handler struct {
clusters fleetcontrollers.ClusterClient
serviceAccountCache corecontrollers.ServiceAccountCache
secretsCache corecontrollers.SecretCache
secrets corecontrollers.SecretClient
secrets corecontrollers.SecretController
}

func Register(ctx context.Context,
Expand All @@ -49,16 +52,13 @@ func Register(ctx context.Context,
secret corecontrollers.SecretController,
role rbaccontrollers.RoleController,
roleBinding rbaccontrollers.RoleBindingController,
clusterRole rbaccontrollers.ClusterRoleController,
clusterRoleBinding rbaccontrollers.ClusterRoleBindingController,
clusterRegistration fleetcontrollers.ClusterRegistrationController,
clusterCache fleetcontrollers.ClusterCache,
clusters fleetcontrollers.ClusterClient) {
clusters fleetcontrollers.ClusterController) {
h := &handler{
systemNamespace: systemNamespace,
systemRegistrationNamespace: systemRegistrationNamespace,
clusterRegistration: clusterRegistration,
clusterCache: clusterCache,
clusterCache: clusters.Cache(),
clusters: clusters,
serviceAccountCache: serviceAccount.Cache(),
secrets: secret,
Expand All @@ -71,9 +71,6 @@ func Register(ctx context.Context,
secret,
role,
roleBinding,
clusterRole,
clusterRoleBinding,
clusterRegistration,
),
"",
"cluster-registration",
Expand All @@ -82,11 +79,54 @@ func Register(ctx context.Context,
AllowClusterScoped: true,
})

clusterCache.AddIndexer(clusterByClientID, func(obj *fleet.Cluster) ([]string, error) {
secret.OnChange(ctx, "registration-expire", h.OnSecretChange)
clusters.OnChange(ctx, "cluster-to-clusterregistration", h.OnCluster)
clusters.Cache().AddIndexer(clusterByClientID, func(obj *fleet.Cluster) ([]string, error) {
return []string{
fmt.Sprintf("%s/%s", obj.Namespace, obj.Spec.ClientID),
}, nil
})
clusterRegistration.Cache().AddIndexer(clusterRegistrationByClientID, func(obj *fleet.ClusterRegistration) ([]string, error) {
return []string{
fmt.Sprintf("%s/%s", obj.Namespace, obj.Spec.ClientID),
}, nil
})
relatedresource.Watch(ctx, "sa-to-cluster-registration", saToClusterRegistration, clusterRegistration, serviceAccount)
}

func saToClusterRegistration(namespace, name string, obj runtime.Object) ([]relatedresource.Key, error) {
if sa, ok := obj.(*v1.ServiceAccount); ok {
ns := sa.Annotations[fleet.ClusterRegistrationNamespaceAnnotation]
name := sa.Annotations[fleet.ClusterRegistrationAnnotation]
if ns != "" && name != "" && len(sa.Secrets) > 0 {
return []relatedresource.Key{{
Namespace: ns,
Name: name,
}}, nil
}
}
return nil, nil
}

func (h *handler) OnCluster(key string, cluster *fleet.Cluster) (*fleet.Cluster, error) {
if cluster == nil || cluster.Status.Namespace == "" {
return cluster, nil
}

crs, err := h.clusterRegistration.Cache().GetByIndex(clusterRegistrationByClientID,
fmt.Sprintf("%s/%s", cluster.Namespace, cluster.Spec.ClientID))
if err != nil {
return nil, err
}
for _, cr := range crs {
if !cr.Status.Granted {
logrus.Infof("Namespace assigned to %s/%s triggering %s/%s", cluster.Namespace, cluster.Name,
cr.Namespace, cr.Name)
h.clusterRegistration.Enqueue(cr.Namespace, cr.Name)
}
}

return cluster, nil
}

func (h *handler) authorizeCluster(sa *v1.ServiceAccount, cluster *fleet.Cluster, req *fleet.ClusterRegistration) (*v1.Secret, error) {
Expand Down Expand Up @@ -123,6 +163,21 @@ func (h *handler) authorizeCluster(sa *v1.ServiceAccount, cluster *fleet.Cluster
}, nil
}

func (h *handler) OnSecretChange(key string, secret *v1.Secret) (*v1.Secret, error) {
if secret == nil || secret.Namespace != h.systemRegistrationNamespace ||
secret.Labels[fleet.ClusterAnnotation] == "" {
return secret, nil
}

if time.Now().Sub(secret.CreationTimestamp.Time) > deleteSecretAfter {
logrus.Infof("Deleting expired registration secret %s/%s", secret.Namespace, secret.Name)
return secret, h.secrets.Delete(secret.Namespace, secret.Name, nil)
}

h.secrets.EnqueueAfter(secret.Namespace, secret.Name, deleteSecretAfter/2)
return secret, nil
}

func (h *handler) OnChange(request *fleet.ClusterRegistration, status fleet.ClusterRegistrationStatus) ([]runtime.Object, fleet.ClusterRegistrationStatus, error) {
var (
objects []runtime.Object
Expand All @@ -139,7 +194,7 @@ func (h *handler) OnChange(request *fleet.ClusterRegistration, status fleet.Clus
}

if cluster.Status.Namespace == "" {
h.clusterRegistration.EnqueueAfter(request.Namespace, request.Name, 2*time.Second)
status.ClusterName = cluster.Name
return nil, status, nil
}

Expand All @@ -154,13 +209,8 @@ func (h *handler) OnChange(request *fleet.ClusterRegistration, status fleet.Clus
}
}

logrus.Infof("Cluster registration %s/%s, secret created [%v], granted [%v]",
request.Namespace, request.Name, len(objects) > 0, status.Granted)

if !status.Granted {
// try again 2 seconds later
h.clusterRegistration.EnqueueAfter(request.Namespace, request.Name, 2*time.Second)
}
logrus.Infof("Cluster registration %s/%s, cluster %s/%s granted [%v]",
request.Namespace, request.Name, cluster.Namespace, cluster.Name, status.Granted)

status.ClusterName = cluster.Name
return append(objects,
Expand All @@ -169,8 +219,10 @@ func (h *handler) OnChange(request *fleet.ClusterRegistration, status fleet.Clus
Name: saName,
Namespace: cluster.Status.Namespace,
Annotations: map[string]string{
fleet.ManagedLabel: "true",
fleet.ClusterAnnotation: cluster.Name,
fleet.ManagedLabel: "true",
fleet.ClusterAnnotation: cluster.Name,
fleet.ClusterRegistrationAnnotation: request.Name,
fleet.ClusterRegistrationNamespaceAnnotation: request.Namespace,
},
},
},
Expand Down Expand Up @@ -232,26 +284,6 @@ func (h *handler) OnChange(request *fleet.ClusterRegistration, status fleet.Clus
Kind: "Role",
Name: request.Name,
},
},
&rbacv1.ClusterRoleBinding{
ObjectMeta: metav1.ObjectMeta{
Name: name.SafeConcatName(request.Namespace, request.Name),
Annotations: map[string]string{
fleet.ManagedLabel: "true",
},
},
Subjects: []rbacv1.Subject{
{
Kind: "ServiceAccount",
Name: saName,
Namespace: cluster.Status.Namespace,
},
},
RoleRef: rbacv1.RoleRef{
APIGroup: rbacv1.GroupName,
Kind: "ClusterRole",
Name: "fleet-content",
},
}), status, nil
}

Expand Down Expand Up @@ -289,7 +321,6 @@ func (h *handler) createOrGetCluster(request *fleet.ClusterRegistration) (*fleet
}
labels[fleet.ClusterAnnotation] = clusterName

logrus.Infof("Creating cluster %s/%s", request.Namespace, clusterName)
cluster, err := h.clusters.Create(&fleet.Cluster{
ObjectMeta: metav1.ObjectMeta{
Name: clusterName,
Expand All @@ -300,5 +331,11 @@ func (h *handler) createOrGetCluster(request *fleet.ClusterRegistration) (*fleet
ClientID: request.Spec.ClientID,
},
})
if apierrors.IsAlreadyExists(err) {
return h.clusters.Get(request.Namespace, clusterName, metav1.GetOptions{})
}
if err == nil {
logrus.Infof("Created cluster %s/%s", request.Namespace, clusterName)
}
return cluster, err
}

0 comments on commit ef891e9

Please sign in to comment.