/
dynamic_controller.go
131 lines (109 loc) · 4.4 KB
/
dynamic_controller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package crds
import (
"context"
"sync"
"time"
"github.com/go-logr/logr"
"github.com/samber/lo"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime/schema"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
"github.com/kong/kubernetes-ingress-controller/v2/internal/controllers/utils"
"github.com/kong/kubernetes-ingress-controller/v2/internal/util"
)
// +kubebuilder:rbac:groups="apiextensions.k8s.io",resources=customresourcedefinitions,verbs=list;watch
type Controller interface {
SetupWithManager(mgr ctrl.Manager) error
}
// DynamicCRDController ensures that RequiredCRDs are installed in the cluster and only then sets up its Controller
// that depends on them.
// In case the CRDs are not installed at start-up time, DynamicCRDController will set up a watch for CustomResourceDefinition
// and will dynamically set up its Controller once it detects that all RequiredCRDs are already in place.
type DynamicCRDController struct {
Log logr.Logger
Manager ctrl.Manager
CacheSyncTimeout time.Duration
Controller Controller
RequiredCRDs []schema.GroupVersionResource
// startControllerOnce ensures that the controller is started only once.
startControllerOnce sync.Once
}
func (r *DynamicCRDController) SetupWithManager(mgr ctrl.Manager) error {
if r.allRequiredCRDsInstalled() {
r.Log.V(util.DebugLevel).Info("All required CustomResourceDefinitions are installed, skipping DynamicCRDController set up")
return r.setupController(mgr)
}
r.Log.Info("Required CustomResourceDefinitions are not installed, setting up a watch for them in case they are installed afterward")
c, err := controller.New("DynamicCRDController", mgr, controller.Options{
Reconciler: r,
LogConstructor: func(_ *reconcile.Request) logr.Logger {
return r.Log
},
CacheSyncTimeout: r.CacheSyncTimeout,
})
if err != nil {
return err
}
return c.Watch(
source.Kind(mgr.GetCache(), &apiextensionsv1.CustomResourceDefinition{}),
&handler.EnqueueRequestForObject{},
predicate.NewPredicateFuncs(r.isOneOfRequiredCRDs),
)
}
func (r *DynamicCRDController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := r.Log.WithValues("CustomResourceDefinition", req.NamespacedName)
crd := new(apiextensionsv1.CustomResourceDefinition)
if err := r.Manager.GetClient().Get(ctx, req.NamespacedName, crd); err != nil {
if apierrors.IsNotFound(err) {
log.V(util.DebugLevel).Info("Object enqueued no longer exists, skipping", "name", req.Name)
return ctrl.Result{}, nil
}
return ctrl.Result{}, err
}
log.V(util.DebugLevel).Info("Processing CustomResourceDefinition", "name", req.Name)
if !r.allRequiredCRDsInstalled() {
log.V(util.DebugLevel).Info("Still not all required CustomResourceDefinitions are installed, waiting")
return ctrl.Result{}, nil
}
var startControllerErr error
r.startControllerOnce.Do(func() {
log.V(util.InfoLevel).Info("All required CustomResourceDefinitions are installed, setting up the controller")
startControllerErr = r.setupController(r.Manager)
})
if startControllerErr != nil {
return ctrl.Result{}, startControllerErr
}
return ctrl.Result{}, nil
}
func (r *DynamicCRDController) SetLogger(logger logr.Logger) {
r.Log = logger
}
func (r *DynamicCRDController) allRequiredCRDsInstalled() bool {
return lo.EveryBy(r.RequiredCRDs, func(gvr schema.GroupVersionResource) bool {
return utils.CRDExists(r.Manager.GetClient().RESTMapper(), gvr)
})
}
func (r *DynamicCRDController) isOneOfRequiredCRDs(obj client.Object) bool {
crd, ok := obj.(*apiextensionsv1.CustomResourceDefinition)
if !ok {
return false
}
return lo.ContainsBy(r.RequiredCRDs, func(gvr schema.GroupVersionResource) bool {
versionMatches := lo.ContainsBy(crd.Spec.Versions, func(crdv apiextensionsv1.CustomResourceDefinitionVersion) bool {
return crdv.Name == gvr.Version
})
return crd.Spec.Group == gvr.Group &&
crd.Status.AcceptedNames.Plural == gvr.Resource &&
versionMatches
})
}
func (r *DynamicCRDController) setupController(mgr ctrl.Manager) error {
return r.Controller.SetupWithManager(mgr)
}