Skip to content

Commit

Permalink
Add cluster webhook
Browse files Browse the repository at this point in the history
Signed-off-by: sumeng <zssumeng@gmail.com>
  • Loading branch information
sumengzs committed Apr 9, 2024
1 parent 035a96a commit d47133c
Show file tree
Hide file tree
Showing 6 changed files with 214 additions and 10 deletions.
52 changes: 46 additions & 6 deletions controllers/cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,24 @@ package controllers

import (
"context"
"github.com/sumengzs/multi-cluster/pkg/cluster"
"github.com/sumengzs/multi-cluster/pkg/pool"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"

"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"

sumengzscnv1beta1 "github.com/sumengzs/multi-cluster/api/v1beta1"
"github.com/sumengzs/multi-cluster/api/v1beta1"
)

// ClusterReconciler reconciles a Cluster object
type ClusterReconciler struct {
// ClusterController reconciles a Cluster object
type ClusterController struct {
client.Client
Pool pool.Interface
Scheme *runtime.Scheme
}

Expand All @@ -46,7 +52,7 @@ type ClusterReconciler struct {
//
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.11.0/pkg/reconcile
func (r *ClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
func (r *ClusterController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
_ = log.FromContext(ctx)

// TODO(user): your logic here
Expand All @@ -55,8 +61,42 @@ func (r *ClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
}

// SetupWithManager sets up the controller with the Manager.
func (r *ClusterReconciler) SetupWithManager(mgr ctrl.Manager) error {
func (r *ClusterController) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&sumengzscnv1beta1.Cluster{}).
For(&v1beta1.Cluster{}).
WithEventFilter(r.Predicate()).
Complete(r)
}

func (r *ClusterController) Predicate() predicate.Predicate {
return predicate.Funcs{
DeleteFunc: func(event event.DeleteEvent) bool {
clu := event.Object.(*v1beta1.Cluster)
r.Pool.Remove(clu.Name)
return false
},
UpdateFunc: func(event event.UpdateEvent) bool {
return false
},
CreateFunc: func(event event.CreateEvent) bool {
clu := event.Object.(*v1beta1.Cluster)
cc, err := cluster.
By(r.Client).
WithScheme(r.Scheme).
Named(clu.Name).
WithOptions().
Complete()
if err != nil {
klog.Errorf("error creating cluster %s: %v", clu.Name, err)
return false
}
err = r.Pool.Add(cc)
if err != nil {
klog.Errorf("error add cluster to pool %s: %v", cc.Name(), err)
r.Pool.Remove(cc.Name())
return false
}
return false
},
}
}
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func main() {
os.Exit(1)
}

if err = (&controllers.ClusterReconciler{
if err = (&controllers.ClusterController{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
}).SetupWithManager(mgr); err != nil {
Expand Down
8 changes: 7 additions & 1 deletion pkg/cluster/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type Builder struct {
clusterName string
master client.Client
scheme *runtime.Scheme
options []InitOptions
}

func By(master client.Client) *Builder {
Expand All @@ -30,6 +31,11 @@ func (b *Builder) WithScheme(scheme *runtime.Scheme) *Builder {
return b
}

func (b *Builder) WithOptions(opts ...InitOptions) *Builder {
b.options = opts
return b
}

func (b *Builder) Named(clusterName string) *Builder {
b.clusterName = clusterName
return b
Expand All @@ -47,7 +53,7 @@ func (b *Builder) Complete() (Interface, error) {
if err != nil {
return nil, fmt.Errorf("failed to load client rest config: %s", err)
}
cluster, err := New(config, b.scheme)
cluster, err := New(config, b.scheme, b.options...)
if err != nil {
return nil, fmt.Errorf("failed to create cluster: %s", err)
}
Expand Down
13 changes: 11 additions & 2 deletions pkg/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ import (

var _ Interface = &cluster{}

type InitOptions func(Interface) error

type cluster struct {
name string
ctx context.Context
Expand All @@ -50,7 +52,7 @@ type cluster struct {

// New returns a new cluster or error
// default status code is Stopped
func New(config *rest.Config, scheme *runtime.Scheme) (Interface, error) {
func New(config *rest.Config, scheme *runtime.Scheme, options ...InitOptions) (Interface, error) {
var clu *cluster
var err error

Expand All @@ -74,6 +76,7 @@ func New(config *rest.Config, scheme *runtime.Scheme) (Interface, error) {
if clu.extensions, err = clientset.NewForConfig(config); err != nil {
return nil, fmt.Errorf("failed to create api-extensions client: %s", err)
}
clu.extensions.ApiextensionsV1beta1().CustomResourceDefinitions()

if clu.discovery, err = discovery.NewDiscoveryClientForConfig(config); err != nil {
return nil, fmt.Errorf("failed to create discovery client: %s", err)
Expand All @@ -82,7 +85,13 @@ func New(config *rest.Config, scheme *runtime.Scheme) (Interface, error) {
clu.status = Stopped
clu.scheme = scheme

return &cluster{}, nil
for _, option := range options {
if err = option(clu); err != nil {
return nil, fmt.Errorf("failed to initialize options: %s", err)
}
}

return clu, nil
}

func (c *cluster) Start(ctx context.Context) error {
Expand Down
119 changes: 119 additions & 0 deletions pkg/webhook/cluster/validating/cluster_create_update_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
Copyright 2023 The Multi Cluster Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package validating

import (
"context"
"github.com/sumengzs/multi-cluster/api/v1beta1"
"net/http"

admissionv1 "k8s.io/api/admission/v1"
"k8s.io/apimachinery/pkg/util/validation/field"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/runtime/inject"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
)

var _ admission.Handler = &ClusterCreateUpdateHandler{}
var _ admission.DecoderInjector = &ClusterCreateUpdateHandler{}
var _ inject.Client = &ClusterCreateUpdateHandler{}

// ClusterCreateUpdateHandler handles Cluster
type ClusterCreateUpdateHandler struct {
// To use the client, you need to do the following:
// - uncomment it
// - import sigs.k8s.io/controller-runtime/pkg/client
// - uncomment the InjectClient method at the bottom of this file.
Client client.Client

// Decoder decodes objects
Decoder *admission.Decoder
}

// Handle handles admission requests.
func (h *ClusterCreateUpdateHandler) Handle(ctx context.Context, req admission.Request) admission.Response {
switch req.AdmissionRequest.Operation {
case admissionv1.Create:
obj := &v1beta1.Cluster{}
if err := h.Decoder.Decode(req, obj); err != nil {
return admission.Errored(http.StatusBadRequest, err)
}
errList := h.validateCluster(obj)
if len(errList) != 0 {
klog.ErrorS(errList.ToAggregate(), "Invalid cluster error")
return admission.Errored(http.StatusUnprocessableEntity, errList.ToAggregate())
}
case admissionv1.Update:
obj := &v1beta1.Cluster{}
if err := h.Decoder.Decode(req, obj); err != nil {
return admission.Errored(http.StatusBadRequest, err)
}
oldObj := &v1beta1.Cluster{}
if err := h.Decoder.DecodeRaw(req.AdmissionRequest.OldObject, oldObj); err != nil {
return admission.Errored(http.StatusBadRequest, err)
}
errList := h.validateClusterUpdate(oldObj, obj)
if len(errList) != 0 {
klog.ErrorS(errList.ToAggregate(), "Invalid cluster error")
return admission.Errored(http.StatusUnprocessableEntity, errList.ToAggregate())
}
}
klog.Infof("handle cluster create update request successfully")
return admission.ValidationResponse(true, "")
}

func (h *ClusterCreateUpdateHandler) validateClusterUpdate(oldObj, newObj *v1beta1.Cluster) field.ErrorList {
latestObject := &v1beta1.Cluster{}
key := client.ObjectKeyFromObject(newObj)
err := h.Client.Get(context.TODO(), key, latestObject)
if err != nil {
return field.ErrorList{field.InternalError(field.NewPath("cluster"), err)}
}
if errorList := h.validateCluster(newObj); errorList != nil {
return errorList
}

return nil
}

func (h *ClusterCreateUpdateHandler) validateCluster(obj *v1beta1.Cluster) field.ErrorList {
return h.validateClusterSpec(&obj.Spec, field.NewPath("Spec"))
}

func (h *ClusterCreateUpdateHandler) validateClusterSpec(spec *v1beta1.ClusterSpec, path *field.Path) field.ErrorList {
return h.validateSpecConnectConfig(spec.Connect, path.Child("ConnectConfig"))
}

func (h *ClusterCreateUpdateHandler) validateSpecConnectConfig(config v1beta1.ConnectConfig, path *field.Path) field.ErrorList {
if config.Secret == nil && config.Config == nil && config.Token == nil {
return field.ErrorList{field.Invalid(path, "", "Secret, Config and Token cannot be empty as the same time")}
}
return nil
}

// InjectClient injects the client into the ClusterCreateUpdateHandler
func (h *ClusterCreateUpdateHandler) InjectClient(c client.Client) error {
h.Client = c
return nil
}

// InjectDecoder injects the decoder into the ClusterCreateUpdateHandler
func (h *ClusterCreateUpdateHandler) InjectDecoder(d *admission.Decoder) error {
h.Decoder = d
return nil
}
30 changes: 30 additions & 0 deletions pkg/webhook/cluster/validating/webhooks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
Copyright 2023 The Multi Cluster Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package validating

import (
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
)

// +kubebuilder:webhook:path=/vcluster,mutating=false,failurePolicy=fail,sideEffects=None,admissionReviewVersions=v1;v1beta1,groups=sumengzs.cn,resources=clusters,verbs=create;update,versions=v1beta1,name=vcluster.kb.io

var (
// HandlerMap contains admission webhook handlers
HandlerMap = map[string]admission.Handler{
"vcluster": &ClusterCreateUpdateHandler{},
}
)

0 comments on commit d47133c

Please sign in to comment.