Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: adds controller to allow addon to reconcile on changes #46

Merged
merged 16 commits into from
Jun 4, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion deploy/resources/cluster-management-addon.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ spec:
supportedConfigs:
# Describes the general addon configuration applicable for all managed clusters. It includes:
# - Default subscription channel name for install the `Red Hat OpenShift Logging` operator on each managed cluster.
# - Default subscription channel name for install the `Red Hat OpenShift distributed tracing data collection` operator on each managed cluster.
# - Default subscription channel name for install the `Red Hat OpenShift distributed tracing platform` operator on each managed cluster.
iblancasa marked this conversation as resolved.
Show resolved Hide resolved
- group: addon.open-cluster-management.io
resource: addondeploymentconfigs
defaultConfig:
Expand Down
6 changes: 4 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ require (
sigs.k8s.io/controller-runtime v0.17.0
)

require gopkg.in/yaml.v3 v3.0.1
require (
github.com/go-logr/logr v1.4.1
gopkg.in/yaml.v3 v3.0.1
)

require (
github.com/BurntSushi/toml v1.2.1 // indirect
Expand All @@ -51,7 +54,6 @@ require (
github.com/fatih/structs v1.1.0 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-openapi/jsonpointer v0.20.0 // indirect
github.com/go-openapi/jsonreference v0.20.2 // indirect
Expand Down
173 changes: 173 additions & 0 deletions internal/controllers/watcher/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
package watcher

import (
"context"
"errors"
"fmt"

"github.com/go-logr/logr"
otelv1alpha1 "github.com/open-telemetry/opentelemetry-operator/apis/v1alpha1"
loggingv1 "github.com/openshift/cluster-logging-operator/apis/logging/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"open-cluster-management.io/addon-framework/pkg/addonmanager"
addonapiv1alpha1 "open-cluster-management.io/api/addon/v1alpha1"
workapiv1 "open-cluster-management.io/api/work/v1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

var noReconcilePred = builder.WithPredicates(predicate.Funcs{
UpdateFunc: func(ue event.UpdateEvent) bool { return false },
CreateFunc: func(e event.CreateEvent) bool { return false },
DeleteFunc: func(e event.DeleteEvent) bool { return false },
GenericFunc: func(e event.GenericEvent) bool { return false },
})

type WatcherManager struct {
mgr *ctrl.Manager
logger logr.Logger
}

func NewWatcherManager(logger logr.Logger, scheme *runtime.Scheme, addonManager *addonmanager.AddonManager) (*WatcherManager, error) {
l := logger.WithName("mcoa-watcher")

ctrl.SetLogger(l)

mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{Scheme: scheme})
if err != nil {
return nil, fmt.Errorf("unable to start manager: %w", err)
}

if err = (&WatcherReconciler{
Client: mgr.GetClient(),
Log: l.WithName("controllers").WithName("mcoa-watcher"),
Scheme: mgr.GetScheme(),
addonnManager: addonManager,
}).SetupWithManager(mgr); err != nil {
return nil, fmt.Errorf("unable to create mcoa-watcher controller: %w", err)
}

if err = mgr.AddHealthzCheck("health", healthz.Ping); err != nil {
return nil, fmt.Errorf("unable to set up health check: %w", err)
}
if err = mgr.AddReadyzCheck("check", healthz.Ping); err != nil {
return nil, fmt.Errorf("unable to set up ready check: %w", err)
}

wm := WatcherManager{
mgr: &mgr,
logger: l,
}

return &wm, nil
}

func (wm *WatcherManager) Start(ctx context.Context) {
wm.logger.Info("Starting watcher manager")
go func() {
err := (*wm.mgr).Start(ctx)
if err != nil {
wm.logger.Error(err, "there was an error while running the reconciliation watcher")
}
}()
periklis marked this conversation as resolved.
Show resolved Hide resolved
}

// WatcherReconciler triggers reconciliation in the AddonManager when something changes in an upstream resource
type WatcherReconciler struct {
client.Client
Log logr.Logger
Scheme *runtime.Scheme
addonnManager *addonmanager.AddonManager
iblancasa marked this conversation as resolved.
Show resolved Hide resolved
}

var (
noNameErr = errors.New("no name for reconciliation request")
noNamespaceErr = errors.New("no namespace for reconciliation request")
)

// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.11.0/pkg/reconcile
func (r *WatcherReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
if req.Name == "" {
return ctrl.Result{}, noNameErr
}
if req.Namespace == "" {
return ctrl.Result{}, noNamespaceErr
}
iblancasa marked this conversation as resolved.
Show resolved Hide resolved
(*r.addonnManager).Trigger(req.Namespace, req.Name)

r.Log.V(2).Info("reconciliation triggered", "cluster", req.Namespace)

return ctrl.Result{}, nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *WatcherReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&addonapiv1alpha1.ManagedClusterAddOn{}, noReconcilePred).
Watches(&corev1.Secret{}, r.enqueueForClusterSpecificResource(), builder.OnlyMetadata).
Watches(&loggingv1.ClusterLogForwarder{}, r.enqueueForClusterSpecificResource(), builder.OnlyMetadata).
Watches(&otelv1alpha1.OpenTelemetryCollector{}, r.enqueueForClusterSpecificResource(), builder.OnlyMetadata).
iblancasa marked this conversation as resolved.
Show resolved Hide resolved
Complete(r)
}

func (r *WatcherReconciler) enqueueForClusterSpecificResource() handler.EventHandler {
iblancasa marked this conversation as resolved.
Show resolved Hide resolved
return handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, obj client.Object) []reconcile.Request {
key := client.ObjectKey{Name: "multicluster-observability-addon", Namespace: obj.GetNamespace()}
addon := &addonapiv1alpha1.ManagedClusterAddOn{}
if err := r.Client.Get(ctx, key, addon); err != nil {
if apierrors.IsNotFound(err) {
switch obj.(type) {
case *corev1.Secret:
return r.getSecretReconcileRequests(ctx, obj, addon)
}
return nil
iblancasa marked this conversation as resolved.
Show resolved Hide resolved
}
r.Log.Error(err, "Error getting managedclusteraddon resources in event handler")
return nil
}

return []reconcile.Request{
{
NamespacedName: types.NamespacedName{
Name: addon.Name,
Namespace: addon.Namespace,
},
},
}
})
}

// getSecretReconcileRequests gets reconcile.Request for secrets referenced in ManifestWorks.
func (r *WatcherReconciler) getSecretReconcileRequests(ctx context.Context, obj client.Object, addon *addonapiv1alpha1.ManagedClusterAddOn) []reconcile.Request {
rqs := []reconcile.Request{}
mws := &workapiv1.ManifestWorkList{}
if err := r.Client.List(ctx, mws); err != nil {
iblancasa marked this conversation as resolved.
Show resolved Hide resolved
for _, mw := range mws.Items {
for _, m := range mw.Spec.Workload.Manifests {
if equality.Semantic.DeepEqual(m.Object, obj) {
rqs = append(rqs,
// Trigger a reconcile request for the addon in the ManifestWork namespace
reconcile.Request{
NamespacedName: types.NamespacedName{
Name: addon.Name,
Namespace: mw.Namespace,
iblancasa marked this conversation as resolved.
Show resolved Hide resolved
},
},
)
}
}
}
}
return rqs
}
85 changes: 42 additions & 43 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"os"
"time"

"github.com/ViaQ/logerr/v2/log"
certmanagerv1 "github.com/cert-manager/cert-manager/pkg/apis/certmanager/v1"
otelv1alpha1 "github.com/open-telemetry/opentelemetry-operator/apis/v1alpha1"
routev1 "github.com/openshift/api/route/v1"
Expand All @@ -16,11 +17,14 @@ import (
operatorsv1alpha1 "github.com/operator-framework/api/pkg/operators/v1alpha1"
"github.com/rhobs/multicluster-observability-addon/internal/addon"
addonhelm "github.com/rhobs/multicluster-observability-addon/internal/addon/helm"
"github.com/rhobs/multicluster-observability-addon/internal/controllers/watcher"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
utilrand "k8s.io/apimachinery/pkg/util/rand"
"k8s.io/client-go/kubernetes/scheme"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
utilflag "k8s.io/component-base/cli/flag"
logs "k8s.io/component-base/logs/api/v1"
Expand All @@ -32,10 +36,27 @@ import (
"open-cluster-management.io/addon-framework/pkg/version"
addonapiv1alpha1 "open-cluster-management.io/api/addon/v1alpha1"
addonv1alpha1client "open-cluster-management.io/api/client/addon/clientset/versioned"
workv1 "open-cluster-management.io/api/work/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
)

var scheme = runtime.NewScheme()

func init() {
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
utilruntime.Must(addonapiv1alpha1.AddToScheme(scheme))
utilruntime.Must(workv1.AddToScheme(scheme))
utilruntime.Must(loggingapis.AddToScheme(scheme))
utilruntime.Must(otelv1alpha1.AddToScheme(scheme))
utilruntime.Must(operatorsv1.AddToScheme(scheme))
utilruntime.Must(operatorsv1alpha1.AddToScheme(scheme))
utilruntime.Must(routev1.Install(scheme))
utilruntime.Must(certmanagerv1.AddToScheme(scheme))

// +kubebuilder:scaffold:scheme
}

func main() {
rand.Seed(time.Now().UTC().UnixNano()) // nolint:staticcheck

Expand Down Expand Up @@ -85,6 +106,8 @@ func newControllerCommand() *cobra.Command {
}

func runController(ctx context.Context, kubeConfig *rest.Config) error {
logger := log.NewLogger("mcoa")

addonClient, err := addonv1alpha1client.NewForConfig(kubeConfig)
if err != nil {
return err
Expand All @@ -98,43 +121,6 @@ func runController(ctx context.Context, kubeConfig *rest.Config) error {

registrationOption := addon.NewRegistrationOption(utilrand.String(5))

// Necessary to reconcile ClusterLogging and ClusterLogForwarder
err = loggingapis.AddToScheme(scheme.Scheme)
if err != nil {
return err
}
// Necessary to reconcile OpenTelemetryCollectors
err = otelv1alpha1.AddToScheme(scheme.Scheme)
if err != nil {
return err
}
// Necessary to reconcile OperatorGroups
err = operatorsv1.AddToScheme(scheme.Scheme)
if err != nil {
return err
}
// Necessary to reconcile Subscriptions
err = operatorsv1alpha1.AddToScheme(scheme.Scheme)
if err != nil {
return err
}
// Necessary for metrics to get Routes hosts
if err = routev1.Install(scheme.Scheme); err != nil {
return err
}

// Necessary to reconcile cert-manager resources
err = certmanagerv1.AddToScheme(scheme.Scheme)
if err != nil {
return err
}

// Reconcile AddOnDeploymentConfig
err = addonapiv1alpha1.AddToScheme(scheme.Scheme)
if err != nil {
return err
}

httpClient, err := rest.HTTPClientFor(kubeConfig)
if err != nil {
return err
Expand All @@ -146,7 +132,7 @@ func runController(ctx context.Context, kubeConfig *rest.Config) error {
}

opts := client.Options{
Scheme: scheme.Scheme,
Scheme: scheme,
Mapper: mapper,
HTTPClient: httpClient,
}
Expand All @@ -171,21 +157,34 @@ func runController(ctx context.Context, kubeConfig *rest.Config) error {
).
WithGetValuesFuncs(addonConfigValuesFn, addonhelm.GetValuesFunc(k8sClient)).
WithAgentRegistrationOption(registrationOption).
WithScheme(scheme.Scheme).
WithScheme(scheme).
BuildHelmAgentAddon()
if err != nil {
klog.Errorf("failed to build agent %v", err)
logger.Error(err, "failed to build agent")
return err
}

err = mgr.AddAgent(mcoaAgentAddon)
if err != nil {
klog.Fatal(err)
logger.Error(err, "unable to add mcoa agent addon")
return err
}

disableReconciliation := os.Getenv("DISABLE_WATCHER_CONTROLLER")
if disableReconciliation == "" {
var wm *watcher.WatcherManager
wm, err = watcher.NewWatcherManager(logger, scheme, &mgr)
if err != nil {
logger.Error(err, "unable to create watcher manager")
return err
}

wm.Start(ctx)
}

err = mgr.Start(ctx)
if err != nil {
klog.Fatal(err)
return err
}
<-ctx.Done()

Expand Down
Loading