[WIP] [do-not-merge] 🌱 Generics testing #10464

Open · wants to merge 2 commits into base: main
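What this draft exercises: controller setup moves from controller-runtime's untyped watch plumbing (For/Owns/Watches plus one controller-wide WithEventFilter, with map functions taking client.Object) to typed, per-watch registration, where each watch carries its own handler and predicates bound to a concrete object type. The builder.For/builder.Owns/builder.Watches helpers used below belong to this WIP, not to settled controller-runtime API. A condensed before/after sketch of the pattern, mirroring the first diff below (log stands in for ctrl.LoggerFrom(ctx)):

// Before: untyped watch; predicates filter events for the whole controller.
b := ctrl.NewControllerManagedBy(mgr).
	For(&bootstrapv1.KubeadmConfig{}).
	Watches(
		&clusterv1.Machine{},
		handler.EnqueueRequestsFromMapFunc(r.MachineToBootstrapMapFunc), // func(context.Context, client.Object) []ctrl.Request
	).
	WithEventFilter(predicates.ResourceNotPausedAndHasFilterLabel(log, r.WatchFilterValue))

// After: typed watch; each source carries its own handler and predicates.
b := ctrl.NewControllerManagedBy(mgr).
	Named("kubeadmConfig").
	Add(builder.For(mgr,
		&bootstrapv1.KubeadmConfig{},
		predicates.ResourceNotPausedAndHasFilterLabel(log, r.WatchFilterValue, &bootstrapv1.KubeadmConfig{}),
	)).
	Add(builder.Watches(mgr,
		&clusterv1.Machine{},
		handler.EnqueueRequestsFromTypedMapFunc(r.MachineToBootstrapMapFunc), // func(context.Context, *clusterv1.Machine) []ctrl.Request
		predicates.ResourceNotPausedAndHasFilterLabel(log, r.WatchFilterValue, &clusterv1.Machine{}),
	))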
56 changes: 21 additions & 35 deletions bootstrap/kubeadm/internal/controllers/kubeadmconfig_controller.go
@@ -108,30 +108,32 @@ func (r *KubeadmConfigReconciler) SetupWithManager(ctx context.Context, mgr ctrl
 	}
 
 	b := ctrl.NewControllerManagedBy(mgr).
-		For(&bootstrapv1.KubeadmConfig{}).
+		Named("kubeadmConfig").
+		Add(builder.For(mgr,
+			&bootstrapv1.KubeadmConfig{},
+			predicates.ResourceNotPausedAndHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue, &bootstrapv1.KubeadmConfig{}),
+		)).
 		WithOptions(options).
-		Watches(
+		Add(builder.Watches(mgr,
 			&clusterv1.Machine{},
-			handler.EnqueueRequestsFromMapFunc(r.MachineToBootstrapMapFunc),
-		).WithEventFilter(predicates.ResourceNotPausedAndHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue))
+			handler.EnqueueRequestsFromTypedMapFunc(r.MachineToBootstrapMapFunc),
+			predicates.ResourceNotPausedAndHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue, &clusterv1.Machine{}),
+		))
 
 	if feature.Gates.Enabled(feature.MachinePool) {
-		b = b.Watches(
+		b = b.Add(builder.Watches(mgr,
 			&expv1.MachinePool{},
-			handler.EnqueueRequestsFromMapFunc(r.MachinePoolToBootstrapMapFunc),
-		)
+			handler.EnqueueRequestsFromTypedMapFunc(r.MachinePoolToBootstrapMapFunc),
+			predicates.ResourceNotPausedAndHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue, &expv1.MachinePool{}),
+		))
 	}
 
-	b = b.Watches(
+	b = b.Add(builder.Watches(mgr,
 		&clusterv1.Cluster{},
-		handler.EnqueueRequestsFromMapFunc(r.ClusterToKubeadmConfigs),
-		builder.WithPredicates(
-			predicates.All(ctrl.LoggerFrom(ctx),
-				predicates.ClusterUnpausedAndInfrastructureReady(ctrl.LoggerFrom(ctx)),
-				predicates.ResourceHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue),
-			),
-		),
-	)
+		handler.EnqueueRequestsFromTypedMapFunc(r.ClusterToKubeadmConfigs),
+		predicates.ResourceNotPausedAndHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue, &clusterv1.Cluster{}),
+		predicates.ClusterUnpausedAndInfrastructureReady(ctrl.LoggerFrom(ctx)),
+	))
 
 	if err := b.Complete(r); err != nil {
 		return errors.Wrap(err, "failed setting up with a controller manager")
@@ -875,14 +877,8 @@ func (r *KubeadmConfigReconciler) tokenCheckRefreshOrRotationInterval() time.Dur
 
 // ClusterToKubeadmConfigs is a handler.ToRequestsFunc to be used to enqueue
 // requests for reconciliation of KubeadmConfigs.
-func (r *KubeadmConfigReconciler) ClusterToKubeadmConfigs(ctx context.Context, o client.Object) []ctrl.Request {
+func (r *KubeadmConfigReconciler) ClusterToKubeadmConfigs(ctx context.Context, c *clusterv1.Cluster) []ctrl.Request {
 	result := []ctrl.Request{}
-
-	c, ok := o.(*clusterv1.Cluster)
-	if !ok {
-		panic(fmt.Sprintf("Expected a Cluster but got a %T", o))
-	}
-
 	selectors := []client.ListOption{
 		client.InNamespace(c.Namespace),
 		client.MatchingLabels{
@@ -923,12 +919,7 @@ func (r *KubeadmConfigReconciler) ClusterToKubeadmConfigs(ctx context.Context, o
 
 // MachineToBootstrapMapFunc is a handler.ToRequestsFunc to be used to enqueue
 // request for reconciliation of KubeadmConfig.
-func (r *KubeadmConfigReconciler) MachineToBootstrapMapFunc(_ context.Context, o client.Object) []ctrl.Request {
-	m, ok := o.(*clusterv1.Machine)
-	if !ok {
-		panic(fmt.Sprintf("Expected a Machine but got a %T", o))
-	}
-
+func (r *KubeadmConfigReconciler) MachineToBootstrapMapFunc(_ context.Context, m *clusterv1.Machine) []ctrl.Request {
 	result := []ctrl.Request{}
 	if m.Spec.Bootstrap.ConfigRef != nil && m.Spec.Bootstrap.ConfigRef.GroupVersionKind() == bootstrapv1.GroupVersion.WithKind("KubeadmConfig") {
 		name := client.ObjectKey{Namespace: m.Namespace, Name: m.Spec.Bootstrap.ConfigRef.Name}
@@ -939,12 +930,7 @@ func (r *KubeadmConfigReconciler) MachineToBootstrapMapFunc(_ context.Context, o
 
 // MachinePoolToBootstrapMapFunc is a handler.ToRequestsFunc to be used to enqueue
 // request for reconciliation of KubeadmConfig.
-func (r *KubeadmConfigReconciler) MachinePoolToBootstrapMapFunc(_ context.Context, o client.Object) []ctrl.Request {
-	m, ok := o.(*expv1.MachinePool)
-	if !ok {
-		panic(fmt.Sprintf("Expected a MachinePool but got a %T", o))
-	}
-
+func (r *KubeadmConfigReconciler) MachinePoolToBootstrapMapFunc(_ context.Context, m *expv1.MachinePool) []ctrl.Request {
 	result := []ctrl.Request{}
 	configRef := m.Spec.Template.Spec.Bootstrap.ConfigRef
 	if configRef != nil && configRef.GroupVersionKind().GroupKind() == bootstrapv1.GroupVersion.WithKind("KubeadmConfig").GroupKind() {
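With typed map functions the compiler pins the event object's type, so every mapper above drops its runtime type assertion and panic path. A minimal sketch of the difference (requestsFor is a hypothetical helper, not from this PR):

// Untyped mapper: must re-assert the concrete type on every event.
func machineMapper(_ context.Context, o client.Object) []ctrl.Request {
	m, ok := o.(*clusterv1.Machine)
	if !ok {
		panic(fmt.Sprintf("Expected a Machine but got a %T", o))
	}
	return requestsFor(m)
}

// Typed mapper: the signature carries the type; no assertion, no panic.
func typedMachineMapper(_ context.Context, m *clusterv1.Machine) []ctrl.Request {
	return requestsFor(m)
}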
@@ -57,7 +57,7 @@ func TestKubeadmConfigReconciler_MachineToBootstrapMapFuncReturn(t *testing.T) {
 	g := NewWithT(t)
 	cluster := builder.Cluster("my-cluster", metav1.NamespaceDefault).Build()
 	objs := []client.Object{cluster}
-	machineObjs := []client.Object{}
+	machineObjs := []*clusterv1.Machine{}
 	var expectedConfigName string
 	for i := 0; i < 3; i++ {
 		configName := fmt.Sprintf("my-config-%d", i)
12 changes: 3 additions & 9 deletions controllers/external/tracker.go
@@ -22,9 +22,8 @@ import (
 
 	"github.com/go-logr/logr"
 	"github.com/pkg/errors"
-	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
-	"k8s.io/apimachinery/pkg/runtime"
 	"sigs.k8s.io/controller-runtime/pkg/cache"
+	"sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/controller"
 	"sigs.k8s.io/controller-runtime/pkg/handler"
 	"sigs.k8s.io/controller-runtime/pkg/predicate"
@@ -42,7 +41,7 @@ type ObjectTracker struct {
 }
 
 // Watch uses the controller to issue a Watch only if the object hasn't been seen before.
-func (o *ObjectTracker) Watch(log logr.Logger, obj runtime.Object, handler handler.EventHandler, p ...predicate.Predicate) error {
+func (o *ObjectTracker) Watch(log logr.Logger, obj client.Object, handler handler.EventHandler, p ...predicate.Predicate) error {
 	// Consider this a no-op if the controller isn't present.
 	if o.Controller == nil {
 		return nil
@@ -54,14 +53,9 @@ func (o *ObjectTracker) Watch(log logr.Logger, obj runtime.Object, handler handl
 		return nil
 	}
 
-	u := &unstructured.Unstructured{}
-	u.SetGroupVersionKind(gvk)
-
 	log.Info(fmt.Sprintf("Adding watch on external object %q", gvk.String()))
 	err := o.Controller.Watch(
-		source.Kind(o.Cache, u),
-		handler,
-		append(p, predicates.ResourceNotPaused(log))...,
+		source.Kind(o.Cache, obj, handler, append(p, predicates.ResourceNotPaused(log, obj))...),
 	)
 	if err != nil {
 		o.m.Delete(key)
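The tracker change follows the new source.Kind shape in this controller-runtime version: the event handler and predicates become arguments of the source itself, and Controller.Watch shrinks to a single source argument. That also lets the tracker watch the typed object directly instead of building an unstructured.Unstructured for the GVK. The resulting call shape, pulled out of the hunk above:

src := source.Kind(o.Cache, obj, handler, append(p, predicates.ResourceNotPaused(log, obj))...)
err := o.Controller.Watch(src)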
4 changes: 1 addition & 3 deletions controllers/external/tracker_test.go
@@ -24,9 +24,7 @@ import (
 	"github.com/pkg/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"sigs.k8s.io/controller-runtime/pkg/controller"
-	"sigs.k8s.io/controller-runtime/pkg/handler"
 	"sigs.k8s.io/controller-runtime/pkg/log"
-	"sigs.k8s.io/controller-runtime/pkg/predicate"
 	"sigs.k8s.io/controller-runtime/pkg/source"
 
 	clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
@@ -55,7 +53,7 @@ func newWatchCountController(raiseError bool) *watchCountController {
 	}
 }
 
-func (c *watchCountController) Watch(_ source.Source, _ handler.EventHandler, _ ...predicate.Predicate) error {
+func (c *watchCountController) Watch(_ source.Source) error {
 	c.count++
 	if c.raiseError {
 		return errors.New("injected failure")
6 changes: 4 additions & 2 deletions controllers/remote/cluster_cache_reconciler.go
@@ -22,6 +22,7 @@ import (
 	"github.com/pkg/errors"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	ctrl "sigs.k8s.io/controller-runtime"
+	"sigs.k8s.io/controller-runtime/pkg/builder"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/controller"
 	"sigs.k8s.io/controller-runtime/pkg/reconcile"
@@ -43,9 +44,10 @@ type ClusterCacheReconciler struct {
 func (r *ClusterCacheReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, options controller.Options) error {
 	err := ctrl.NewControllerManagedBy(mgr).
 		Named("remote/clustercache").
-		For(&clusterv1.Cluster{}).
+		Add(builder.For(mgr, &clusterv1.Cluster{},
+			predicates.ResourceHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue, &clusterv1.Cluster{}),
+		)).
 		WithOptions(options).
-		WithEventFilter(predicates.ResourceHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue)).
 		Complete(r)
 
 	if err != nil {
4 changes: 2 additions & 2 deletions controllers/remote/cluster_cache_tracker.go
@@ -536,7 +536,7 @@ func (t *ClusterCacheTracker) deleteAccessor(_ context.Context, cluster client.O
 // Watcher is a scoped-down interface from Controller that only knows how to watch.
 type Watcher interface {
 	// Watch watches src for changes, sending events to eventHandler if they pass predicates.
-	Watch(src source.Source, eventHandler handler.EventHandler, predicates ...predicate.Predicate) error
+	Watch(src source.Source) error
 }
 
 // WatchInput specifies the parameters used to establish a new watch for a remote cluster.
@@ -585,7 +585,7 @@ func (t *ClusterCacheTracker) Watch(ctx context.Context, input WatchInput) error
 	}
 
 	// Need to create the watch
-	if err := input.Watcher.Watch(source.Kind(accessor.cache, input.Kind), input.EventHandler, input.Predicates...); err != nil {
+	if err := input.Watcher.Watch(source.Kind(accessor.cache, input.Kind, input.EventHandler, input.Predicates...)); err != nil {
 		return errors.Wrapf(err, "failed to add %s watch on cluster %s: failed to create watch", input.Kind, klog.KRef(input.Cluster.Namespace, input.Cluster.Name))
 	}
 
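Narrowing Watcher to a single-argument Watch keeps the real controller.Controller conforming while making test doubles trivial, as the watchCountController change in tracker_test.go above shows. A minimal conforming fake (illustrative):

type fakeWatcher struct{ count int }

func (f *fakeWatcher) Watch(_ source.Source) error {
	f.count++ // record the watch; the source already bundles handler and predicates
	return nil
}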
4 changes: 3 additions & 1 deletion controllers/remote/cluster_cache_tracker_test.go
@@ -84,8 +84,10 @@ func TestClusterCacheTracker(t *testing.T) {
 		c = &testController{
 			ch: make(chan string),
 		}
-		w, err = ctrl.NewControllerManagedBy(mgr).For(&clusterv1.MachineDeployment{}).Build(c)
+
+		watch, err := ctrl.NewControllerManagedBy(mgr).For(&clusterv1.MachineDeployment{}).Build(c)
 		g.Expect(err).ToNot(HaveOccurred())
+		w = watch
 
 		mgrContext, mgrCancel = context.WithCancel(ctx)
 		t.Log("Starting the manager")
34 changes: 16 additions & 18 deletions controlplane/kubeadm/internal/controllers/controller.go
@@ -91,20 +91,23 @@ type KubeadmControlPlaneReconciler struct {
 
 func (r *KubeadmControlPlaneReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, options controller.Options) error {
 	c, err := ctrl.NewControllerManagedBy(mgr).
-		For(&controlplanev1.KubeadmControlPlane{}).
-		Owns(&clusterv1.Machine{}).
+		Named("kubeadmControlPlane").
+		Add(builder.For(mgr,
+			&controlplanev1.KubeadmControlPlane{},
+			predicates.ResourceNotPausedAndHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue, &controlplanev1.KubeadmControlPlane{}),
+		)).
+		Add(builder.Owns(mgr,
+			&controlplanev1.KubeadmControlPlane{},
+			&clusterv1.Machine{},
+			predicates.ResourceNotPausedAndHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue, &clusterv1.Machine{}),
+		)).
 		WithOptions(options).
-		WithEventFilter(predicates.ResourceNotPausedAndHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue)).
-		Watches(
+		Add(builder.Watches(mgr,
 			&clusterv1.Cluster{},
-			handler.EnqueueRequestsFromMapFunc(r.ClusterToKubeadmControlPlane),
-			builder.WithPredicates(
-				predicates.All(ctrl.LoggerFrom(ctx),
-					predicates.ResourceHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue),
-					predicates.ClusterUnpausedAndInfrastructureReady(ctrl.LoggerFrom(ctx)),
-				),
-			),
-		).Build(r)
+			handler.EnqueueRequestsFromTypedMapFunc(r.ClusterToKubeadmControlPlane),
+			predicates.ResourceNotPausedAndHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue, &clusterv1.Cluster{}),
+			predicates.ClusterUnpausedAndInfrastructureReady(ctrl.LoggerFrom(ctx)),
+		)).Build(r)
 	if err != nil {
 		return errors.Wrap(err, "failed setting up with a controller manager")
 	}
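Note that the WIP builder.Owns helper takes both the owner type and the owned type explicitly; reading this diff, the typed owner-mapping can no longer be inferred from the builder's For type, so both are named (shape copied from the hunk above):

Add(builder.Owns(mgr,
	&controlplanev1.KubeadmControlPlane{}, // owner type
	&clusterv1.Machine{},                  // owned type being watched
	predicates.ResourceNotPausedAndHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue, &clusterv1.Machine{}),
))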
@@ -574,12 +577,7 @@ func (r *KubeadmControlPlaneReconciler) reconcileDelete(ctx context.Context, con
 
 // ClusterToKubeadmControlPlane is a handler.ToRequestsFunc to be used to enqueue requests for reconciliation
 // for KubeadmControlPlane based on updates to a Cluster.
-func (r *KubeadmControlPlaneReconciler) ClusterToKubeadmControlPlane(_ context.Context, o client.Object) []ctrl.Request {
-	c, ok := o.(*clusterv1.Cluster)
-	if !ok {
-		panic(fmt.Sprintf("Expected a Cluster but got a %T", o))
-	}
-
+func (r *KubeadmControlPlaneReconciler) ClusterToKubeadmControlPlane(_ context.Context, c *clusterv1.Cluster) []ctrl.Request {
 	controlPlaneRef := c.Spec.ControlPlaneRef
 	if controlPlaneRef != nil && controlPlaneRef.Kind == kubeadmControlPlaneKind {
 		return []ctrl.Request{{NamespacedName: client.ObjectKey{Namespace: controlPlaneRef.Namespace, Name: controlPlaneRef.Name}}}
2 changes: 1 addition & 1 deletion controlplane/kubeadm/internal/webhooks/scale.go
@@ -45,7 +45,7 @@ func (v *ScaleValidator) SetupWebhookWithManager(mgr ctrl.Manager) error {
 // ScaleValidator validates KCP for replicas.
 type ScaleValidator struct {
 	Client  client.Reader
-	decoder *admission.Decoder
+	decoder admission.Decoder
 }
 
 // Handle will validate for number of replicas.
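The decoder field drops its pointer because admission.Decoder is an interface rather than a struct in newer controller-runtime, so the value is stored and passed directly. A hedged wiring sketch, assuming the usual admission.NewDecoder constructor:

// Populate the interface-typed decoder once at webhook setup.
v.decoder = admission.NewDecoder(mgr.GetScheme())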
22 changes: 9 additions & 13 deletions exp/addons/internal/controllers/clusterresourceset_controller.go
@@ -18,7 +18,6 @@ package controllers
 
 import (
 	"context"
-	"fmt"
 	"time"
 
 	"github.com/pkg/errors"
@@ -67,11 +66,15 @@ type ClusterResourceSetReconciler struct {
 
 func (r *ClusterResourceSetReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, options controller.Options) error {
 	err := ctrl.NewControllerManagedBy(mgr).
-		For(&addonsv1.ClusterResourceSet{}).
-		Watches(
+		Named("clusterResourceSet").
+		Add(builder.For(mgr, &addonsv1.ClusterResourceSet{},
+			predicates.ResourceNotPausedAndHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue, &addonsv1.ClusterResourceSet{}),
+		)).
+		Add(builder.Watches(mgr,
 			&clusterv1.Cluster{},
-			handler.EnqueueRequestsFromMapFunc(r.clusterToClusterResourceSet),
-		).
+			handler.EnqueueRequestsFromTypedMapFunc(r.clusterToClusterResourceSet),
+			predicates.ResourceNotPausedAndHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue, &clusterv1.Cluster{}),
+		)).
 		WatchesMetadata(
 			&corev1.ConfigMap{},
 			handler.EnqueueRequestsFromMapFunc(r.resourceToClusterResourceSet),
@@ -87,7 +90,6 @@ func (r *ClusterResourceSetReconciler) SetupWithManager(ctx context.Context, mgr
 			),
 		).
 		WithOptions(options).
-		WithEventFilter(predicates.ResourceNotPausedAndHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue)).
 		Complete(r)
 	if err != nil {
 		return errors.Wrap(err, "failed setting up with a controller manager")
@@ -434,14 +436,8 @@ func (r *ClusterResourceSetReconciler) ensureResourceOwnerRef(ctx context.Contex
 }
 
 // clusterToClusterResourceSet is mapper function that maps clusters to ClusterResourceSet.
-func (r *ClusterResourceSetReconciler) clusterToClusterResourceSet(ctx context.Context, o client.Object) []ctrl.Request {
+func (r *ClusterResourceSetReconciler) clusterToClusterResourceSet(ctx context.Context, cluster *clusterv1.Cluster) []ctrl.Request {
 	result := []ctrl.Request{}
-
-	cluster, ok := o.(*clusterv1.Cluster)
-	if !ok {
-		panic(fmt.Sprintf("Expected a Cluster but got a %T", o))
-	}
-
 	resourceList := &addonsv1.ClusterResourceSetList{}
 	if err := r.Client.List(ctx, resourceList, client.InNamespace(cluster.Namespace)); err != nil {
 		return nil
@@ -23,6 +23,7 @@ import (
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	ctrl "sigs.k8s.io/controller-runtime"
+	"sigs.k8s.io/controller-runtime/pkg/builder"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/controller"
 	"sigs.k8s.io/controller-runtime/pkg/handler"
@@ -49,13 +50,14 @@ type ClusterResourceSetBindingReconciler struct {
 
 func (r *ClusterResourceSetBindingReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, options controller.Options) error {
 	err := ctrl.NewControllerManagedBy(mgr).
+		Named("clusterResourceSetBinding").
 		For(&addonsv1.ClusterResourceSetBinding{}).
-		Watches(
+		Add(builder.Watches(mgr,
 			&clusterv1.Cluster{},
-			handler.EnqueueRequestsFromMapFunc(r.clusterToClusterResourceSetBinding),
-		).
+			handler.EnqueueRequestsFromTypedMapFunc(r.clusterToClusterResourceSetBinding),
+			predicates.ResourceNotPausedAndHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue, &clusterv1.Cluster{}),
+		)).
 		WithOptions(options).
-		WithEventFilter(predicates.ResourceNotPausedAndHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue)).
 		Complete(r)
 	if err != nil {
 		return errors.Wrap(err, "failed setting up with a controller manager")
@@ -106,7 +108,7 @@ func (r *ClusterResourceSetBindingReconciler) Reconcile(ctx context.Context, req
 }
 
 // clusterToClusterResourceSetBinding is mapper function that maps clusters to ClusterResourceSetBinding.
-func (r *ClusterResourceSetBindingReconciler) clusterToClusterResourceSetBinding(_ context.Context, o client.Object) []ctrl.Request {
+func (r *ClusterResourceSetBindingReconciler) clusterToClusterResourceSetBinding(_ context.Context, o *clusterv1.Cluster) []ctrl.Request {
 	return []reconcile.Request{
 		{
 			NamespacedName: client.ObjectKey{
20 changes: 9 additions & 11 deletions exp/internal/controllers/machinepool_controller.go
@@ -88,20 +88,18 @@ func (r *MachinePoolReconciler) SetupWithManager(ctx context.Context, mgr ctrl.M
 	}
 
 	c, err := ctrl.NewControllerManagedBy(mgr).
-		For(&expv1.MachinePool{}).
+		Named("machinepool").
+		Add(builder.For(mgr,
+			&expv1.MachinePool{},
+			predicates.ResourceNotPausedAndHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue, &expv1.MachinePool{}))).
 		WithOptions(options).
-		WithEventFilter(predicates.ResourceNotPausedAndHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue)).
-		Watches(
+		Add(builder.Watches(mgr,
 			&clusterv1.Cluster{},
-			handler.EnqueueRequestsFromMapFunc(clusterToMachinePools),
+			handler.EnqueueRequestsFromTypedMapFunc(clusterToMachinePools),
 			// TODO: should this wait for Cluster.Status.InfrastructureReady similar to Infra Machine resources?
-			builder.WithPredicates(
-				predicates.All(ctrl.LoggerFrom(ctx),
-					predicates.ClusterUnpaused(ctrl.LoggerFrom(ctx)),
-					predicates.ResourceHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue),
-				),
-			),
-		).
+			predicates.ClusterUnpaused(ctrl.LoggerFrom(ctx)),
+			predicates.ResourceHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue, &clusterv1.Cluster{}),
+		)).
 		Build(r)
 	if err != nil {
 		return errors.Wrap(err, "failed setting up with a controller manager")
2 changes: 1 addition & 1 deletion exp/internal/webhooks/machinepool.go
@@ -55,7 +55,7 @@ func (webhook *MachinePool) SetupWebhookWithManager(mgr ctrl.Manager) error {
 
 // MachinePool implements a validation and defaulting webhook for MachinePool.
 type MachinePool struct {
-	decoder *admission.Decoder
+	decoder admission.Decoder
 }
 
 var _ webhook.CustomValidator = &MachinePool{}