Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: adds controller to allow addon to reconcile on changes #46

Merged
merged 16 commits into from
Jun 4, 2024
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion deploy/resources/cluster-management-addon.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ spec:
supportedConfigs:
# Describes the general addon configuration applicable for all managed clusters. It includes:
# - Default subscription channel name for install the `Red Hat OpenShift Logging` operator on each managed cluster.
# - Default subscription channel name for install the `Red Hat OpenShift distributed tracing data collection` operator on each managed cluster.
# - Default subscription channel name for install the `Red Hat OpenShift distributed tracing platform` operator on each managed cluster.
iblancasa marked this conversation as resolved.
Show resolved Hide resolved
- group: addon.open-cluster-management.io
resource: addondeploymentconfigs
defaultConfig:
Expand Down
6 changes: 4 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ require (
sigs.k8s.io/controller-runtime v0.17.0
)

require gopkg.in/yaml.v3 v3.0.1
require (
github.com/go-logr/logr v1.4.1
gopkg.in/yaml.v3 v3.0.1
)

require (
github.com/BurntSushi/toml v1.2.1 // indirect
Expand All @@ -51,7 +54,6 @@ require (
github.com/fatih/structs v1.1.0 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-openapi/jsonpointer v0.20.0 // indirect
github.com/go-openapi/jsonreference v0.20.2 // indirect
Expand Down
198 changes: 198 additions & 0 deletions internal/controllers/watcher/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
package watcher

import (
"context"
"errors"
"fmt"

"github.com/go-logr/logr"
otelv1alpha1 "github.com/open-telemetry/opentelemetry-operator/apis/v1alpha1"
loggingv1 "github.com/openshift/cluster-logging-operator/apis/logging/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"open-cluster-management.io/addon-framework/pkg/addonmanager"
addonapiv1alpha1 "open-cluster-management.io/api/addon/v1alpha1"
clusterv1 "open-cluster-management.io/api/cluster/v1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

var noReconcilePred = builder.WithPredicates(predicate.Funcs{
UpdateFunc: func(ue event.UpdateEvent) bool { return false },
CreateFunc: func(e event.CreateEvent) bool { return false },
DeleteFunc: func(e event.DeleteEvent) bool { return false },
GenericFunc: func(e event.GenericEvent) bool { return false },
})

type WatcherManager struct {
mgr *ctrl.Manager
logger logr.Logger
}

func NewWatcherManager(logger logr.Logger, scheme *runtime.Scheme, addonManager *addonmanager.AddonManager) (*WatcherManager, error) {
l := logger.WithName("mcoa-watcher")

ctrl.SetLogger(l)

mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{Scheme: scheme})
if err != nil {
return nil, fmt.Errorf("unable to start manager: %w", err)
}

if err = (&WatcherReconciler{
Client: mgr.GetClient(),
Log: l.WithName("controllers").WithName("mcoa-watcher"),
Scheme: mgr.GetScheme(),
addonnManager: addonManager,
}).SetupWithManager(mgr); err != nil {
return nil, fmt.Errorf("unable to create mcoa-watcher controller: %w", err)
}

if err = mgr.AddHealthzCheck("health", healthz.Ping); err != nil {
return nil, fmt.Errorf("unable to set up health check: %w", err)
}
if err = mgr.AddReadyzCheck("check", healthz.Ping); err != nil {
return nil, fmt.Errorf("unable to set up ready check: %w", err)
}

wm := WatcherManager{
mgr: &mgr,
logger: l,
}

return &wm, nil
}

func (wm *WatcherManager) Start(ctx context.Context) {
wm.logger.Info("Starting watcher manager")
go func() {
err := (*wm.mgr).Start(ctx)
if err != nil {
wm.logger.Error(err, "there was an error while running the reconciliation watcher")
}
}()
periklis marked this conversation as resolved.
Show resolved Hide resolved
}

// WatcherReconciler reconciles the ManagedClusterAddon to annotate the ManiestWorks resource
iblancasa marked this conversation as resolved.
Show resolved Hide resolved
type WatcherReconciler struct {
client.Client
Log logr.Logger
Scheme *runtime.Scheme
addonnManager *addonmanager.AddonManager
iblancasa marked this conversation as resolved.
Show resolved Hide resolved
}

var (
noNameErr = errors.New("no name for reconciliation request")
noNamespaceErr = errors.New("no namespace for reconciliation request")
)

// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.11.0/pkg/reconcile
func (r *WatcherReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
if req.Name == "" {
return ctrl.Result{}, noNameErr
}
if req.Namespace == "" {
return ctrl.Result{}, noNamespaceErr
}
iblancasa marked this conversation as resolved.
Show resolved Hide resolved
(*r.addonnManager).Trigger(req.Namespace, req.Name)

r.Log.V(2).Info("reconciliation triggered", "cluster", req.Namespace)

return ctrl.Result{}, nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *WatcherReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&addonapiv1alpha1.ManagedClusterAddOn{}, noReconcilePred).
Watches(&corev1.ConfigMap{}, r.enqueueForClusterSpecificResource(), builder.OnlyMetadata).
iblancasa marked this conversation as resolved.
Show resolved Hide resolved
Watches(&corev1.Secret{}, r.enqueueForClusterSpecificResource(), builder.OnlyMetadata).
Watches(&loggingv1.ClusterLogForwarder{}, r.enqueueForClusterWideResource(), builder.OnlyMetadata).
Watches(&otelv1alpha1.OpenTelemetryCollector{}, r.enqueueForClusterWideResource(), builder.OnlyMetadata).
Complete(r)
}

func (r *WatcherReconciler) enqueueForClusterSpecificResource() handler.EventHandler {
iblancasa marked this conversation as resolved.
Show resolved Hide resolved
return handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, obj client.Object) []reconcile.Request {
key := client.ObjectKey{Name: "multicluster-observability-addon", Namespace: obj.GetNamespace()}
addon := &addonapiv1alpha1.ManagedClusterAddOn{}
if err := r.Client.Get(ctx, key, addon); err != nil {
if apierrors.IsNotFound(err) {
return nil
}
r.Log.Error(err, "Error getting managedclusteraddon resources in event handler")
return nil
}
return []reconcile.Request{
{
NamespacedName: types.NamespacedName{
Name: addon.Name,
Namespace: addon.Namespace,
},
},
}
})
}

func (r *WatcherReconciler) enqueueForClusterWideResource() handler.EventHandler {
return handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, obj client.Object) []reconcile.Request {
addonList := &addonapiv1alpha1.ManagedClusterAddOnList{}
if err := r.Client.List(ctx, addonList, &client.ListOptions{
LabelSelector: labels.SelectorFromSet(labels.Set{
"open-cluster-management.io/addon-name": "multicluster-observability-addon",
}),
}); err != nil {
r.Log.Error(err, "Error listing managedclusteraddon resources in event handler")
return nil
}

if len(addonList.Items) == 0 {
r.Log.V(2).Info("no managedclusteraddon found")
return nil
}

clustersetValue, clustersetExists := obj.GetAnnotations()["cluster.open-cluster-management.io/clusterset"]
var clustersInClusterSet map[string]struct{}
if clustersetExists {
clusterList := &clusterv1.ManagedClusterList{}
if err := r.Client.List(ctx, clusterList, &client.ListOptions{
LabelSelector: labels.SelectorFromSet(labels.Set{
"cluster.open-cluster-management.io/clusterset": clustersetValue,
}),
}); err != nil {
r.Log.Error(err, "Error listing managedcluster resources in event handler")
return nil
}
clustersInClusterSet = make(map[string]struct{}, len(clusterList.Items))
for _, cluster := range clusterList.Items {
clustersInClusterSet[cluster.Name] = struct{}{}
}
}

var requests []reconcile.Request
for _, addon := range addonList.Items {
_, installed := clustersInClusterSet[addon.Namespace]
if clustersetExists && !installed {
continue
}

requests = append(requests, reconcile.Request{
NamespacedName: types.NamespacedName{
Namespace: addon.Namespace,
Name: addon.Name,
},
})
}
return requests
})
}
iblancasa marked this conversation as resolved.
Show resolved Hide resolved
85 changes: 42 additions & 43 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"os"
"time"

"github.com/ViaQ/logerr/v2/log"
certmanagerv1 "github.com/cert-manager/cert-manager/pkg/apis/certmanager/v1"
otelv1alpha1 "github.com/open-telemetry/opentelemetry-operator/apis/v1alpha1"
routev1 "github.com/openshift/api/route/v1"
Expand All @@ -16,11 +17,14 @@ import (
operatorsv1alpha1 "github.com/operator-framework/api/pkg/operators/v1alpha1"
"github.com/rhobs/multicluster-observability-addon/internal/addon"
addonhelm "github.com/rhobs/multicluster-observability-addon/internal/addon/helm"
"github.com/rhobs/multicluster-observability-addon/internal/controllers/watcher"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
utilrand "k8s.io/apimachinery/pkg/util/rand"
"k8s.io/client-go/kubernetes/scheme"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
utilflag "k8s.io/component-base/cli/flag"
logs "k8s.io/component-base/logs/api/v1"
Expand All @@ -32,10 +36,27 @@ import (
"open-cluster-management.io/addon-framework/pkg/version"
addonapiv1alpha1 "open-cluster-management.io/api/addon/v1alpha1"
addonv1alpha1client "open-cluster-management.io/api/client/addon/clientset/versioned"
workv1 "open-cluster-management.io/api/work/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
)

var scheme = runtime.NewScheme()

func init() {
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
utilruntime.Must(addonapiv1alpha1.AddToScheme(scheme))
utilruntime.Must(workv1.AddToScheme(scheme))
utilruntime.Must(loggingapis.AddToScheme(scheme))
utilruntime.Must(otelv1alpha1.AddToScheme(scheme))
utilruntime.Must(operatorsv1.AddToScheme(scheme))
utilruntime.Must(operatorsv1alpha1.AddToScheme(scheme))
utilruntime.Must(routev1.Install(scheme))
utilruntime.Must(certmanagerv1.AddToScheme(scheme))

// +kubebuilder:scaffold:scheme
}

func main() {
rand.Seed(time.Now().UTC().UnixNano()) // nolint:staticcheck

Expand Down Expand Up @@ -85,6 +106,8 @@ func newControllerCommand() *cobra.Command {
}

func runController(ctx context.Context, kubeConfig *rest.Config) error {
logger := log.NewLogger("mcoa")

addonClient, err := addonv1alpha1client.NewForConfig(kubeConfig)
if err != nil {
return err
Expand All @@ -98,43 +121,6 @@ func runController(ctx context.Context, kubeConfig *rest.Config) error {

registrationOption := addon.NewRegistrationOption(utilrand.String(5))

// Necessary to reconcile ClusterLogging and ClusterLogForwarder
err = loggingapis.AddToScheme(scheme.Scheme)
if err != nil {
return err
}
// Necessary to reconcile OpenTelemetryCollectors
err = otelv1alpha1.AddToScheme(scheme.Scheme)
if err != nil {
return err
}
// Necessary to reconcile OperatorGroups
err = operatorsv1.AddToScheme(scheme.Scheme)
if err != nil {
return err
}
// Necessary to reconcile Subscriptions
err = operatorsv1alpha1.AddToScheme(scheme.Scheme)
if err != nil {
return err
}
// Necessary for metrics to get Routes hosts
if err = routev1.Install(scheme.Scheme); err != nil {
return err
}

// Necessary to reconcile cert-manager resources
err = certmanagerv1.AddToScheme(scheme.Scheme)
if err != nil {
return err
}

// Reconcile AddOnDeploymentConfig
err = addonapiv1alpha1.AddToScheme(scheme.Scheme)
if err != nil {
return err
}

httpClient, err := rest.HTTPClientFor(kubeConfig)
if err != nil {
return err
Expand All @@ -146,7 +132,7 @@ func runController(ctx context.Context, kubeConfig *rest.Config) error {
}

opts := client.Options{
Scheme: scheme.Scheme,
Scheme: scheme,
Mapper: mapper,
HTTPClient: httpClient,
}
Expand All @@ -171,21 +157,34 @@ func runController(ctx context.Context, kubeConfig *rest.Config) error {
).
WithGetValuesFuncs(addonConfigValuesFn, addonhelm.GetValuesFunc(k8sClient)).
WithAgentRegistrationOption(registrationOption).
WithScheme(scheme.Scheme).
WithScheme(scheme).
BuildHelmAgentAddon()
if err != nil {
klog.Errorf("failed to build agent %v", err)
logger.Error(err, "failed to build agent")
return err
}

err = mgr.AddAgent(mcoaAgentAddon)
if err != nil {
klog.Fatal(err)
logger.Error(err, "unable to add mcoa agent addon")
return err
}

disableReconciliation := os.Getenv("DISABLE_WATCHER_CONTROLLER")
if disableReconciliation == "" {
var wm *watcher.WatcherManager
wm, err = watcher.NewWatcherManager(logger, scheme, &mgr)
if err != nil {
logger.Error(err, "unable to create watcher manager")
return err
}

wm.Start(ctx)
}

err = mgr.Start(ctx)
if err != nil {
klog.Fatal(err)
return err
}
<-ctx.Done()

Expand Down
Loading