Skip to content

Commit

Permalink
Merge pull request #279 from sabre1041/refactor-pruning
Browse files Browse the repository at this point in the history
Refactored pruning and error handling logic
  • Loading branch information
raffaelespazzoli committed Sep 15, 2023
2 parents efd060b + 0656728 commit 83f1000
Showing 1 changed file with 53 additions and 45 deletions.
98 changes: 53 additions & 45 deletions controllers/groupsync_controller.go
Expand Up @@ -18,7 +18,6 @@ package controllers

import (
"context"
"errors"
"fmt"
"time"

Expand All @@ -32,7 +31,8 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
kubeclock "k8s.io/apimachinery/pkg/util/clock"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
kubeclock "k8s.io/utils/clock"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
Expand Down Expand Up @@ -93,6 +93,8 @@ func (r *GroupSyncReconciler) Reconcile(context context.Context, req ctrl.Reques
return r.ManageError(context, instance, err)
}

syncErrors := []error{}

// Execute Each Provider Syncer
for _, groupSyncer := range groupSyncMgr.GroupSyncers {

Expand All @@ -105,22 +107,23 @@ func (r *GroupSyncReconciler) Reconcile(context context.Context, req ctrl.Reques

// Initialize Connection
if err := groupSyncer.Bind(); err != nil {
return r.wrapMetricsErrorWithMetrics(prometheusLabels, context, instance, err)
r.manageSyncError(prometheusLabels, &syncErrors, err)
continue
}

syncStartTime := time.Now().Format(time.RFC3339)
// Perform Sync
groups, err := groupSyncer.Sync()

if err != nil {
logger.Error(err, "Failed to Complete Sync", "Provider", groupSyncer.GetProviderName())
return r.wrapMetricsErrorWithMetrics(prometheusLabels, context, instance, err)
r.manageSyncError(prometheusLabels, &syncErrors, err)
continue
}

updatedGroups := 0
prunedGroups := 0

for _, group := range groups {
for i, group := range groups {

ocpGroup := &userv1.Group{}
err := r.GetClient().Get(context, types.NamespacedName{Name: group.Name, Namespace: ""}, ocpGroup)
Expand All @@ -136,11 +139,12 @@ func (r *GroupSyncReconciler) Reconcile(context context.Context, req ctrl.Reques
ocpGroup.Name = group.Name

} else if err != nil {
return r.wrapMetricsErrorWithMetrics(prometheusLabels, context, instance, err)
r.manageSyncError(prometheusLabels, &syncErrors, err)
continue
} else {
// Verify this group is not managed by another provider
if groupProviderLabel, exists := ocpGroup.Labels[constants.SyncProvider]; !exists || (groupProviderLabel != providerLabel) {
r.Log.Info("Group Provider Label Did Not Match Expected Provider Label", "Group Name", ocpGroup.Name, "Expected Label", providerLabel, "Found Label", groupProviderLabel)
r.Log.Info("Group Provider Label Did Not Match Expected Provider Label", "Provider", groupSyncer.GetProviderName(), "Group Name", ocpGroup.Name, "Expected Label", providerLabel, "Found Label", groupProviderLabel)
continue
}
}
Expand All @@ -163,34 +167,38 @@ func (r *GroupSyncReconciler) Reconcile(context context.Context, req ctrl.Reques
ocpGroup.Labels[constants.SyncProvider] = providerLabel

// Add Gloabl Annotations/Labels
ocpGroup.Annotations[constants.SyncTimestamp] = time.Now().UTC().Format(time.RFC3339)
now := time.Now().UTC().Format(time.RFC3339)
ocpGroup.Annotations[constants.SyncTimestamp] = now

ocpGroup.Users = group.Users

err = r.CreateOrUpdateResource(context, nil, "", ocpGroup)

group.UID = ocpGroup.UID
groups[i] = group

if err != nil {
r.Log.Error(err, "Failed to Create or Update OpenShift Group")
return r.wrapMetricsErrorWithMetrics(prometheusLabels, context, instance, err)
r.Log.Error(err, "Failed to Create or Update OpenShift Group", "Provider", groupSyncer.GetProviderName())
r.manageSyncError(prometheusLabels, &syncErrors, err)
continue
}

updatedGroups++
}

if groupSyncer.GetPrune() {
logger.Info("Start Pruning Groups")
prunedGroups, err = r.pruneGroups(context, instance, providerLabel, syncStartTime, logger)
logger.Info("Start Pruning Groups", "Provider", groupSyncer.GetProviderName())
prunedGroups, err = r.pruneGroups(context, instance, groups, groupSyncer.GetProviderName(), providerLabel, logger)
if err != nil {
r.Log.Error(err, "Failed to Prune Group")
return r.wrapMetricsErrorWithMetrics(prometheusLabels, context, instance, err)
r.Log.Error(err, "Failed to Prune Group", "Provider", groupSyncer.GetProviderName())
r.manageSyncError(prometheusLabels, &syncErrors, err)
}
logger.Info("Pruning Completed")
logger.Info("Pruning Completed", "Provider", groupSyncer.GetProviderName())
}

logger.Info("Sync Completed Successfully", "Provider", groupSyncer.GetProviderName(), "Groups Created or Updated", updatedGroups, "Groups Pruned", prunedGroups)

// Add Metrics

successfulGroupSyncs.With(prometheusLabels).Inc()
groupsSynchronized.With(prometheusLabels).Set(float64(updatedGroups))
groupSyncError.With(prometheusLabels).Set(0)
Expand All @@ -199,6 +207,11 @@ func (r *GroupSyncReconciler) Reconcile(context context.Context, req ctrl.Reques
}
}

// Throw error if error occurred during sync
if len(syncErrors) > 0 {
return r.ManageError(context, instance, utilerrors.NewAggregate(syncErrors))
}

instance.Status.LastSyncSuccessTime = &metav1.Time{Time: clock.Now()}

successResult, err := r.ManageSuccess(context, instance)
Expand All @@ -222,24 +235,18 @@ func (r *GroupSyncReconciler) SetupWithManager(mgr ctrl.Manager) error {
Complete(r)
}

func (r *GroupSyncReconciler) wrapMetricsErrorWithMetrics(prometheusLabels prometheus.Labels, context context.Context, obj client.Object, issue error) (ctrl.Result, error) {
func (r *GroupSyncReconciler) manageSyncError(prometheusLabels prometheus.Labels, syncErrors *[]error, err error) {

unsuccessfulGroupSyncs.With(prometheusLabels).Inc()
groupSyncError.With(prometheusLabels).Set(1)

return r.ManageError(context, obj, issue)
*syncErrors = append(*syncErrors, err)

}

func (r *GroupSyncReconciler) pruneGroups(context context.Context, instance *redhatcopv1alpha1.GroupSync, providerLabel string, syncStartTime string, logger logr.Logger) (int, error) {
func (r *GroupSyncReconciler) pruneGroups(context context.Context, instance *redhatcopv1alpha1.GroupSync, syncedGroups []userv1.Group, providerName, providerLabel string, logger logr.Logger) (int, error) {
prunedGroups := 0

syncStartDatetime, syncStartParseError := time.Parse(time.RFC3339, syncStartTime)

// Should not occur
if syncStartParseError != nil {
return prunedGroups, syncStartParseError
}

ocpGroups := &userv1.GroupList{}
opts := []client.ListOption{
client.InNamespace(""),
Expand All @@ -252,31 +259,32 @@ func (r *GroupSyncReconciler) pruneGroups(context context.Context, instance *red

for _, group := range ocpGroups.Items {

if groupSyncTime, ok := group.Annotations[constants.SyncTimestamp]; ok {

groupSyncDatetime, groupSyncTimeParseErr := time.Parse(time.RFC3339, groupSyncTime)
// Remove group if not found in the list of synchronized groups
groupFound := isGroupFound(group, syncedGroups)

if groupSyncTimeParseErr == nil {
if groupSyncDatetime.Before(syncStartDatetime) {
logger.Info("pruneGroups", "Delete Group", group.Name)
err = r.GetClient().Delete(context, &group)
prunedGroups++
if err != nil {
return prunedGroups, err
}
}
} else {
if groupSyncTimeParseErr != nil {
logger.Error(groupSyncTimeParseErr, "Error parsing group start time annotation", "Group Name", group.Name, "Time", syncStartTime)
}
if !groupFound {
logger.Info("Pruning Group", "Provider", providerName, "Group", group.Name)
err = r.GetClient().Delete(context, &group)
prunedGroups++
if err != nil {
return prunedGroups, err
}
} else {
logger.Error(errors.New("unable to locate sync timestamp annotation"), "Group Name", group.Name)
}
}
return prunedGroups, nil
}

func isGroupFound(canidateGroup userv1.Group, baseGroups []userv1.Group) bool {

for _, baseGroup := range baseGroups {
if baseGroup.UID == canidateGroup.UID {
return true
}
}

return false
}

func mergeMap(m1, m2 map[string]string) map[string]string {

if m1 != nil {
Expand Down

0 comments on commit 83f1000

Please sign in to comment.