Skip to content

Commit

Permalink
Add support for expiring workload cluster kubeconfigs
Browse files Browse the repository at this point in the history
  • Loading branch information
Nuckal777 committed Mar 15, 2024
1 parent ce2629b commit 36a5c54
Show file tree
Hide file tree
Showing 8 changed files with 444 additions and 95 deletions.
14 changes: 14 additions & 0 deletions constants/constants.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,17 @@
// Copyright 2024 SAP SE
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package constants

const (
Expand Down
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ module github.com/sapcc/runtime-extension-maintenance-controller
go 1.22

require (
github.com/evanphx/json-patch/v5 v5.8.0
github.com/evanphx/json-patch v5.6.0+incompatible
github.com/go-logr/logr v1.4.1
github.com/onsi/ginkgo/v2 v2.16.0
github.com/onsi/gomega v1.31.1
go.uber.org/zap v1.26.0
k8s.io/api v0.29.0
k8s.io/apimachinery v0.29.0
k8s.io/client-go v0.29.0
Expand All @@ -19,6 +20,7 @@ require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/emicklei/go-restful/v3 v3.11.0 // indirect
github.com/evanphx/json-patch/v5 v5.8.0 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/go-logr/zapr v1.3.0 // indirect
github.com/go-openapi/jsonpointer v0.19.6 // indirect
Expand Down Expand Up @@ -48,7 +50,6 @@ require (
github.com/prometheus/procfs v0.12.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.26.0 // indirect
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect
golang.org/x/net v0.20.0 // indirect
golang.org/x/oauth2 v0.14.0 // indirect
Expand Down
104 changes: 104 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Copyright 2024 SAP SE
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"context"
"flag"
"os"

"github.com/sapcc/runtime-extension-maintenance-controller/management"
"github.com/sapcc/runtime-extension-maintenance-controller/workload"
"go.uber.org/zap/zapcore"
"k8s.io/apimachinery/pkg/runtime"

utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
clusterv1beta1 "sigs.k8s.io/cluster-api/api/v1beta1"
ctrl "sigs.k8s.io/controller-runtime"
ctrlconfig "sigs.k8s.io/controller-runtime/pkg/client/config"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
)

var (
	// scheme collects the API types (core Kubernetes and Cluster API)
	// that the manager's clients can encode and decode.
	scheme = runtime.NewScheme()
	// setupLog is the logger for process startup, used before the
	// manager and its controllers are running.
	setupLog = ctrl.Log.WithName("setup")
)

func init() {
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
utilruntime.Must(clusterv1beta1.AddToScheme(scheme))
}

// main wires up and runs the maintenance runtime extension: it parses
// flags, loads a kubeconfig, starts a controller-runtime manager with
// leader election, and registers the machine reconciler that propagates
// maintenance state into workload clusters.
func main() {
	var kubecontext string
	// Development mode and ISO8601 timestamps for readable logs; the
	// remaining zap options are exposed as command-line flags.
	opts := zap.Options{
		Development: true,
		TimeEncoder: zapcore.ISO8601TimeEncoder,
	}
	flag.StringVar(&kubecontext, "kubecontext", "", "The context to use from the kubeconfig (defaults to current-context)")
	opts.BindFlags(flag.CommandLine)
	flag.Parse()

	ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts)))
	restConfig := getKubeconfigOrDie(kubecontext)
	setupLog.Info("loaded kubeconfig", "context", kubecontext, "host", restConfig.Host)

	mgr, err := ctrl.NewManager(restConfig, ctrl.Options{
		Scheme:         scheme,
		LeaderElection: true,
		// controller-runtime requires a non-empty LeaderElectionID
		// whenever LeaderElection is enabled; without it NewManager
		// fails and the process never starts.
		LeaderElectionID: "runtime-extension-maintenance-controller",
	})
	if err != nil {
		setupLog.Error(err, "unable to start manager")
		os.Exit(1)
	}

	// Long-running context cancelled on SIGTERM/SIGINT; it outlives the
	// short-lived contexts handed to individual Reconcile invocations.
	ctx := ctrl.SetupSignalHandler()
	connections := workload.NewClusterConnections(mgr.GetClient(), func() context.Context { return ctx })
	err = (&management.MachineReconciler{
		Client:                  mgr.GetClient(),
		Log:                     ctrl.Log.WithName("management"),
		WorkloadNodeControllers: map[string]*workload.NodeController{},
		CancelFuncs:             map[string]context.CancelFunc{},
		// Workload node controllers need an interruptable long-running
		// context, so they fetch it from this factory instead of using
		// the per-reconcile context.
		WorkloadContextFunc: func() context.Context {
			return ctx
		},
		ClusterConnections: connections,
	}).SetupWithManager(mgr)
	if err != nil {
		setupLog.Error(err, "unable to create machine controller")
		os.Exit(1)
	}

	setupLog.Info("starting manager")
	if err := mgr.Start(ctx); err != nil {
		setupLog.Error(err, "problem running manager")
		os.Exit(1)
	}
	setupLog.Info("received SIGTERM or SIGINT. See you later.")
}

// getKubeconfigOrDie loads the *rest.Config for the given kubeconfig
// context, falling back to the KUBECONTEXT environment variable when the
// argument is empty. On failure the error is logged and the process
// exits with status 1.
func getKubeconfigOrDie(kubecontext string) *rest.Config {
	contextName := kubecontext
	if contextName == "" {
		contextName = os.Getenv("KUBECONTEXT")
	}
	restConfig, loadErr := ctrlconfig.GetConfigWithContext(contextName)
	if loadErr != nil {
		setupLog.Error(loadErr, "Failed to load kubeconfig")
		os.Exit(1)
	}
	return restConfig
}
98 changes: 67 additions & 31 deletions management/machine_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,17 @@ package management

import (
"context"
"encoding/json"
"fmt"

jsonpatch "github.com/evanphx/json-patch"
"github.com/go-logr/logr"
"github.com/sapcc/runtime-extension-maintenance-controller/constants"
"github.com/sapcc/runtime-extension-maintenance-controller/workload"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
corev1_informers "k8s.io/client-go/informers/core/v1"
clusterv1beta1 "sigs.k8s.io/cluster-api/api/v1beta1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -44,8 +44,8 @@ const (
type MachineReconciler struct {
client.Client
Log logr.Logger
Scheme *runtime.Scheme
WorkloadNodeControllers map[string]*workload.NodeController
ClusterConnections *workload.ClusterConnections
// A WorkloadNodeController needs an interruptable long-running context.
// Reconcile may get a short context, so the long-running context is
// fetched from a factory function.
Expand All @@ -63,14 +63,14 @@ func (r *MachineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
if !ok || isMaintenanceController != MaintenanceControllerLabelValue {
return ctrl.Result{}, r.cleanupMachine(ctx, &machine, clusterName)
}
workloadController, ok := r.WorkloadNodeControllers[clusterName]
_, ok = r.WorkloadNodeControllers[clusterName]
clusterKey := types.NamespacedName{Namespace: machine.Namespace, Name: clusterName}
if !ok {
clusterKey := types.NamespacedName{Namespace: machine.Namespace, Name: clusterName}
var err error
workloadController, err = makeNodeCtrl(ctx, NodeControllerParamaters{
workloadController, err := makeNodeCtrl(ctx, NodeControllerParamaters{
cluster: clusterKey,
managementClient: r.Client,
log: r.Log,
log: ctrl.Log.WithName("workload"),
connections: r.ClusterConnections,
})
if err != nil {
return ctrl.Result{}, err
Expand All @@ -85,15 +85,15 @@ func (r *MachineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
return ctrl.Result{}, nil
}
nodeName := machine.Status.NodeRef.Name
node, err := workloadController.Lister().Get(nodeName)
node, err := r.ClusterConnections.GetNode(ctx, workload.GetNodeParams{Cluster: clusterKey, Name: nodeName})
if err != nil {
return ctrl.Result{}, fmt.Errorf("failed to get node %s in workload cluster %s node: %w", nodeName, clusterName, err)
}
originalNode := node.DeepCopy()
originalMachine := machine.DeepCopy()
r.propagateState(&machine, node)

if err := workloadController.PatchNode(ctx, node, originalNode); err != nil {
if err := r.patchNode(ctx, patchNodeParams{cluster: clusterKey, current: node, original: originalNode}); err != nil {
return ctrl.Result{}, err
}
r.Log.Info("patched node", "node", node.Name)
Expand Down Expand Up @@ -125,50 +125,52 @@ func (r *MachineReconciler) cleanupMachine(ctx context.Context, machine *cluster
cancel()
delete(r.CancelFuncs, cluster)
delete(r.WorkloadNodeControllers, cluster)
r.ClusterConnections.DeleteConn(types.NamespacedName{
Namespace: machine.Namespace,
Name: cluster,
})
r.Log.Info("stopped workload node reconciler, no machines enabled", "cluster", cluster)
}
return nil
}

// NodeControllerParamaters carries the dependencies for constructing a
// workload-cluster node controller for a single Cluster API cluster.
// NOTE(review): "Paramaters" is a typo for "Parameters"; renaming would
// touch callers outside this block, so it is only flagged here.
type NodeControllerParamaters struct {
	// cluster identifies the workload cluster (namespace/name of the CAPI Cluster).
	cluster types.NamespacedName
	// connections is the registry of workload cluster connections the
	// new controller is added to.
	connections *workload.ClusterConnections
	// managementClient accesses objects in the management cluster.
	managementClient client.Client
	// log is the logger handed to the node controller.
	log logr.Logger
}

// RBAC-Limited kubeconfigs are currently not possible: https://github.com/kubernetes-sigs/cluster-api/issues/5553
// and https://github.com/kubernetes-sigs/cluster-api/issues/3661
func makeNodeCtrl(ctx context.Context, params NodeControllerParamaters) (*workload.NodeController, error) {
// name string, ns string
secretKey := types.NamespacedName{Namespace: params.cluster.Namespace, Name: params.cluster.Name + "-kubeconfig"}
var kubeConfigSecret corev1.Secret
if err := params.managementClient.Get(ctx, secretKey, &kubeConfigSecret); err != nil {
return nil, err
}
kubeconfig, ok := kubeConfigSecret.Data["value"]
if !ok {
return nil, fmt.Errorf("secret %s has no value key", params.cluster.String())
}
restConfig, err := clientcmd.RESTConfigFromKubeConfig(kubeconfig)
if err != nil {
return nil, fmt.Errorf("failed to load workload cluster %s kubeconfig: %w", params.cluster.String(), err)
}
clientset, err := kubernetes.NewForConfig(restConfig)
if err != nil {
return nil, fmt.Errorf("failed to create clientset workload cluster %s: %w", params.cluster.String(), err)
}
controller, err := workload.NewNodeController(workload.NodeControllerOptions{
Log: params.log.WithName(params.cluster.Name),
Log: params.log,
ManagementClient: params.managementClient,
WorkloadClient: clientset,
Connections: params.connections,
Cluster: params.cluster,
})
if err != nil {
return nil, err
}
workloadClient, err := workload.MakeClient(ctx, params.managementClient, params.cluster)
if err != nil {
return nil, fmt.Errorf(
"failed to initialize node controller for workload cluster %s: %w",
params.cluster.String(),
err,
)
}
conn := workload.NewConnection(
workloadClient,
func(ni corev1_informers.NodeInformer) {
err := controller.AttachTo(ni)
if err != nil {
params.log.Error(err, "failed to attach workload node controller to informer", "cluster", params.cluster.String())
}
},
)
params.connections.AddConn(ctx, params.cluster, conn)
return &controller, nil
}

Expand Down Expand Up @@ -216,6 +218,40 @@ func (r *MachineReconciler) patchMachine(ctx context.Context, current, original
return nil
}

// patchNodeParams bundles the arguments for MachineReconciler.patchNode:
// the workload cluster the node lives in, the mutated node, and the
// unmodified copy the merge patch is computed against.
type patchNodeParams struct {
	cluster  types.NamespacedName
	current  *corev1.Node
	original *corev1.Node
}

// patchNode computes a JSON merge patch between params.original and
// params.current and applies it to the node through the workload cluster
// connection. It is a no-op when both versions are semantically equal.
func (r *MachineReconciler) patchNode(ctx context.Context, params patchNodeParams) error {
	// Nothing to send when the desired state already matches the original.
	if equality.Semantic.DeepEqual(params.current, params.original) {
		return nil
	}
	beforeJSON, err := json.Marshal(params.original)
	if err != nil {
		return err
	}
	afterJSON, err := json.Marshal(params.current)
	if err != nil {
		return err
	}
	mergePatch, err := jsonpatch.CreateMergePatch(beforeJSON, afterJSON)
	if err != nil {
		return err
	}
	err = r.ClusterConnections.PatchNode(ctx, workload.PatchNodeParams{
		Cluster:    params.cluster,
		Name:       params.current.Name,
		MergePatch: mergePatch,
	})
	if err != nil {
		return err
	}
	r.Log.Info("patched node", "node", params.current.Name)
	return nil
}

func (r *MachineReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
WithOptions(controller.Options{
Expand Down
3 changes: 2 additions & 1 deletion management/management_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,15 +132,16 @@ var _ = BeforeSuite(func() {
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
stopController = cancel

connections := workload.NewClusterConnections(managementClient, func() context.Context { return ctx })
err = (&management.MachineReconciler{
Client: managementClient,
Log: GinkgoLogr,
Scheme: scheme.Scheme,
WorkloadNodeControllers: map[string]*workload.NodeController{},
CancelFuncs: map[string]context.CancelFunc{},
WorkloadContextFunc: func() context.Context {
return ctx
},
ClusterConnections: connections,
}).SetupWithManager(k8sManager)
Expect(err).To(Succeed())

Expand Down
Loading

0 comments on commit 36a5c54

Please sign in to comment.