Skip to content

Commit

Permalink
chore: modernize machined/pkg/controllers/k8s
Browse files Browse the repository at this point in the history
This is going to be multipart effort to finally use safe.* wrappers in the production code.
Quick regexp search shows that there are around 150 direct type assertions on resources (excluding the ones in this commit).

Also - migrate from `interface{}` to `any` and use `slices.Sort*` instead of `sort.*` where possible.

Signed-off-by: Dmitriy Matrenichev <dmitry.matrenichev@siderolabs.com>
  • Loading branch information
DmitriyMV committed Dec 15, 2023
1 parent 760f793 commit 59b6239
Show file tree
Hide file tree
Showing 22 changed files with 122 additions and 120 deletions.
13 changes: 4 additions & 9 deletions internal/app/machined/pkg/controllers/k8s/address_filter.go
Expand Up @@ -11,7 +11,6 @@ import (
"slices"

"github.com/cosi-project/runtime/pkg/controller"
"github.com/cosi-project/runtime/pkg/resource"
"github.com/cosi-project/runtime/pkg/safe"
"github.com/cosi-project/runtime/pkg/state"
"github.com/siderolabs/gen/optional"
Expand Down Expand Up @@ -97,20 +96,16 @@ func (ctrl *AddressFilterController) Run(ctx context.Context, r controller.Runti
serviceCIDRs = append(serviceCIDRs, ipPrefix)
}

if err = r.Modify(ctx, network.NewNodeAddressFilter(network.NamespaceName, k8s.NodeAddressFilterNoK8s), func(r resource.Resource) error {
spec := r.(*network.NodeAddressFilter).TypedSpec()

spec.ExcludeSubnets = append(slices.Clone(podCIDRs), serviceCIDRs...)
if err = safe.WriterModify(ctx, r, network.NewNodeAddressFilter(network.NamespaceName, k8s.NodeAddressFilterNoK8s), func(r *network.NodeAddressFilter) error {
r.TypedSpec().ExcludeSubnets = append(slices.Clone(podCIDRs), serviceCIDRs...)

return nil
}); err != nil {
return fmt.Errorf("error updating output resource: %w", err)
}

if err = r.Modify(ctx, network.NewNodeAddressFilter(network.NamespaceName, k8s.NodeAddressFilterOnlyK8s), func(r resource.Resource) error {
spec := r.(*network.NodeAddressFilter).TypedSpec()

spec.IncludeSubnets = append(slices.Clone(podCIDRs), serviceCIDRs...)
if err = safe.WriterModify(ctx, r, network.NewNodeAddressFilter(network.NamespaceName, k8s.NodeAddressFilterOnlyK8s), func(r *network.NodeAddressFilter) error {
r.TypedSpec().IncludeSubnets = append(slices.Clone(podCIDRs), serviceCIDRs...)

return nil
}); err != nil {
Expand Down
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/cosi-project/runtime/pkg/controller"
"github.com/cosi-project/runtime/pkg/resource"
"github.com/cosi-project/runtime/pkg/safe"
"github.com/cosi-project/runtime/pkg/state"
"github.com/siderolabs/gen/maps"
"github.com/siderolabs/gen/optional"
Expand Down Expand Up @@ -107,7 +108,7 @@ func (ctrl *ControlPlaneStaticPodController) Run(ctx context.Context, r controll
}

// wait for etcd to be healthy as kube-apiserver is using local etcd instance
etcdResource, err := r.Get(ctx, resource.NewMetadata(v1alpha1.NamespaceName, v1alpha1.ServiceType, "etcd", resource.VersionUndefined))
etcdResource, err := safe.ReaderGetByID[*v1alpha1.Service](ctx, r, "etcd")
if err != nil {
if state.IsNotFoundError(err) {
if err = ctrl.teardownAll(ctx, r); err != nil {
Expand All @@ -120,11 +121,11 @@ func (ctrl *ControlPlaneStaticPodController) Run(ctx context.Context, r controll
return err
}

if !etcdResource.(*v1alpha1.Service).TypedSpec().Healthy {
if !etcdResource.TypedSpec().Healthy {
continue
}

secretsStatusResource, err := r.Get(ctx, resource.NewMetadata(k8s.ControlPlaneNamespaceName, k8s.SecretsStatusType, k8s.StaticPodSecretsStaticPodID, resource.VersionUndefined))
secretsStatusResource, err := safe.ReaderGetByID[*k8s.SecretsStatus](ctx, r, k8s.StaticPodSecretsStaticPodID)
if err != nil {
if state.IsNotFoundError(err) {
if err = ctrl.teardownAll(ctx, r); err != nil {
Expand All @@ -137,9 +138,9 @@ func (ctrl *ControlPlaneStaticPodController) Run(ctx context.Context, r controll
return err
}

secretsVersion := secretsStatusResource.(*k8s.SecretsStatus).TypedSpec().Version
secretsVersion := secretsStatusResource.TypedSpec().Version

configStatusResource, err := r.Get(ctx, resource.NewMetadata(k8s.ControlPlaneNamespaceName, k8s.ConfigStatusType, k8s.ConfigStatusStaticPodID, resource.VersionUndefined))
configStatusResource, err := safe.ReaderGetByID[*k8s.ConfigStatus](ctx, r, k8s.ConfigStatusStaticPodID)
if err != nil {
if state.IsNotFoundError(err) {
if err = ctrl.teardownAll(ctx, r); err != nil {
Expand All @@ -152,7 +153,7 @@ func (ctrl *ControlPlaneStaticPodController) Run(ctx context.Context, r controll
return err
}

configVersion := configStatusResource.(*k8s.ConfigStatus).TypedSpec().Version
configVersion := configStatusResource.TypedSpec().Version

touchedIDs := map[string]struct{}{}

Expand Down Expand Up @@ -339,7 +340,7 @@ func goGCEnvFromResources(resources v1.ResourceRequirements) (envVar v1.EnvVar)
return envVar
}

func (ctrl *ControlPlaneStaticPodController) manageAPIServer(ctx context.Context, r controller.Runtime, logger *zap.Logger,
func (ctrl *ControlPlaneStaticPodController) manageAPIServer(ctx context.Context, r controller.Runtime, _ *zap.Logger,
configResource resource.Resource, secretsVersion, configVersion string,
) (string, error) {
cfg := configResource.(*k8s.APIServerConfig).TypedSpec()
Expand Down Expand Up @@ -445,8 +446,8 @@ func (ctrl *ControlPlaneStaticPodController) manageAPIServer(ctx context.Context
env = append(env, goGCEnv)
}

return k8s.APIServerID, r.Modify(ctx, k8s.NewStaticPod(k8s.NamespaceName, k8s.APIServerID), func(r resource.Resource) error {
return k8sadapter.StaticPod(r.(*k8s.StaticPod)).SetPod(&v1.Pod{
return k8s.APIServerID, safe.WriterModify(ctx, r, k8s.NewStaticPod(k8s.NamespaceName, k8s.APIServerID), func(r *k8s.StaticPod) error {
return k8sadapter.StaticPod(r).SetPod(&v1.Pod{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Kind: "Pod",
Expand Down Expand Up @@ -555,7 +556,7 @@ func (ctrl *ControlPlaneStaticPodController) manageAPIServer(ctx context.Context
}

func (ctrl *ControlPlaneStaticPodController) manageControllerManager(ctx context.Context, r controller.Runtime,
logger *zap.Logger, configResource resource.Resource, secretsVersion, configVersion string,
_ *zap.Logger, configResource resource.Resource, secretsVersion, _ string,
) (string, error) {
cfg := configResource.(*k8s.ControllerManagerConfig).TypedSpec()

Expand Down Expand Up @@ -619,8 +620,8 @@ func (ctrl *ControlPlaneStaticPodController) manageControllerManager(ctx context
env = append(env, goGCEnv)
}

return k8s.ControllerManagerID, r.Modify(ctx, k8s.NewStaticPod(k8s.NamespaceName, k8s.ControllerManagerID), func(r resource.Resource) error {
return k8sadapter.StaticPod(r.(*k8s.StaticPod)).SetPod(&v1.Pod{
return k8s.ControllerManagerID, safe.WriterModify(ctx, r, k8s.NewStaticPod(k8s.NamespaceName, k8s.ControllerManagerID), func(r *k8s.StaticPod) error {
return k8sadapter.StaticPod(r).SetPod(&v1.Pod{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Kind: "Pod",
Expand Down Expand Up @@ -712,7 +713,7 @@ func (ctrl *ControlPlaneStaticPodController) manageControllerManager(ctx context
}

func (ctrl *ControlPlaneStaticPodController) manageScheduler(ctx context.Context, r controller.Runtime,
logger *zap.Logger, configResource resource.Resource, secretsVersion, configVersion string,
_ *zap.Logger, configResource resource.Resource, secretsVersion, _ string,
) (string, error) {
cfg := configResource.(*k8s.SchedulerConfig).TypedSpec()

Expand Down Expand Up @@ -758,8 +759,8 @@ func (ctrl *ControlPlaneStaticPodController) manageScheduler(ctx context.Context
env = append(env, goGCEnv)
}

return k8s.SchedulerID, r.Modify(ctx, k8s.NewStaticPod(k8s.NamespaceName, k8s.SchedulerID), func(r resource.Resource) error {
return k8sadapter.StaticPod(r.(*k8s.StaticPod)).SetPod(&v1.Pod{
return k8s.SchedulerID, safe.WriterModify(ctx, r, k8s.NewStaticPod(k8s.NamespaceName, k8s.SchedulerID), func(r *k8s.StaticPod) error {
return k8sadapter.StaticPod(r).SetPod(&v1.Pod{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Kind: "Pod",
Expand Down
12 changes: 6 additions & 6 deletions internal/app/machined/pkg/controllers/k8s/control_plane_test.go
Expand Up @@ -319,13 +319,13 @@ func (suite *K8sControlPlaneSuite) TestReconcileResources() {
APIServerConfig: &v1alpha1.APIServerConfig{
ResourcesConfig: &v1alpha1.ResourcesConfig{
Requests: v1alpha1.Unstructured{
Object: map[string]interface{}{
Object: map[string]any{
"cpu": "100m",
"memory": "1Gi",
},
},
Limits: v1alpha1.Unstructured{
Object: map[string]interface{}{
Object: map[string]any{
"cpu": 2,
"memory": "1500Mi",
},
Expand All @@ -335,13 +335,13 @@ func (suite *K8sControlPlaneSuite) TestReconcileResources() {
ControllerManagerConfig: &v1alpha1.ControllerManagerConfig{
ResourcesConfig: &v1alpha1.ResourcesConfig{
Requests: v1alpha1.Unstructured{
Object: map[string]interface{}{
Object: map[string]any{
"cpu": "50m",
"memory": "500Mi",
},
},
Limits: v1alpha1.Unstructured{
Object: map[string]interface{}{
Object: map[string]any{
"cpu": 1,
"memory": "1000Mi",
},
Expand All @@ -351,13 +351,13 @@ func (suite *K8sControlPlaneSuite) TestReconcileResources() {
SchedulerConfig: &v1alpha1.SchedulerConfig{
ResourcesConfig: &v1alpha1.ResourcesConfig{
Requests: v1alpha1.Unstructured{
Object: map[string]interface{}{
Object: map[string]any{
"cpu": "150m",
"memory": "2Gi",
},
},
Limits: v1alpha1.Unstructured{
Object: map[string]interface{}{
Object: map[string]any{
"cpu": 3,
"memory": "2000Mi",
},
Expand Down
29 changes: 15 additions & 14 deletions internal/app/machined/pkg/controllers/k8s/endpoint.go
Expand Up @@ -9,11 +9,11 @@ import (
"fmt"
"net/netip"
"reflect"
"sort"
"slices"
"time"

"github.com/cosi-project/runtime/pkg/controller"
"github.com/cosi-project/runtime/pkg/resource"
"github.com/cosi-project/runtime/pkg/safe"
"github.com/cosi-project/runtime/pkg/state"
"github.com/siderolabs/gen/optional"
"go.uber.org/zap"
Expand Down Expand Up @@ -80,7 +80,7 @@ func (ctrl *EndpointController) Run(ctx context.Context, r controller.Runtime, l
case <-r.EventCh():
}

machineTypeRes, err := r.Get(ctx, resource.NewMetadata(config.NamespaceName, config.MachineTypeType, config.MachineTypeID, resource.VersionUndefined))
machineTypeRes, err := safe.ReaderGetByID[*config.MachineType](ctx, r, config.MachineTypeID)
if err != nil {
if state.IsNotFoundError(err) {
continue
Expand All @@ -89,7 +89,7 @@ func (ctrl *EndpointController) Run(ctx context.Context, r controller.Runtime, l
return fmt.Errorf("error getting machine type: %w", err)
}

machineType := machineTypeRes.(*config.MachineType).MachineType()
machineType := machineTypeRes.MachineType()

switch machineType { //nolint:exhaustive
case machine.TypeWorker:
Expand Down Expand Up @@ -170,7 +170,7 @@ func (ctrl *EndpointController) watchEndpointsOnControlPlane(ctx context.Context
return nil
}

secretsResources, err := r.Get(ctx, resource.NewMetadata(secrets.NamespaceName, secrets.KubernetesType, secrets.KubernetesID, resource.VersionUndefined))
secretsResources, err := safe.ReaderGetByID[*secrets.Kubernetes](ctx, r, secrets.KubernetesID)
if err != nil {
if state.IsNotFoundError(err) {
return nil
Expand All @@ -179,7 +179,7 @@ func (ctrl *EndpointController) watchEndpointsOnControlPlane(ctx context.Context
return err
}

secrets := secretsResources.(*secrets.Kubernetes).TypedSpec()
secrets := secretsResources.TypedSpec()

kubeconfig, err := clientcmd.BuildConfigFromKubeconfigGetter("", func() (*clientcmdapi.Config, error) {
// using here kubeconfig with cluster control plane endpoint, as endpoint discovery should work before local API server is ready
Expand Down Expand Up @@ -207,16 +207,17 @@ func (ctrl *EndpointController) updateEndpointsResource(ctx context.Context, r c
}
}

sort.Slice(addrs, func(i, j int) bool { return addrs[i].Compare(addrs[j]) < 0 })
slices.SortFunc(addrs, func(a, b netip.Addr) int { return a.Compare(b) })

if err := r.Modify(ctx,
if err := safe.WriterModify(ctx,
r,
k8s.NewEndpoint(k8s.ControlPlaneNamespaceName, k8s.ControlPlaneAPIServerEndpointsID),
func(r resource.Resource) error {
if !reflect.DeepEqual(r.(*k8s.Endpoint).TypedSpec().Addresses, addrs) {
func(r *k8s.Endpoint) error {
if !reflect.DeepEqual(r.TypedSpec().Addresses, addrs) {
logger.Debug("updated controlplane endpoints", zap.Any("endpoints", addrs))
}

r.(*k8s.Endpoint).TypedSpec().Addresses = addrs
r.TypedSpec().Addresses = addrs

return nil
},
Expand Down Expand Up @@ -287,9 +288,9 @@ func kubernetesEndpointWatcher(ctx context.Context, logger *zap.Logger, client *
}

if _, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { notifyCh <- obj.(*corev1.Endpoints) },
DeleteFunc: func(_ interface{}) { notifyCh <- &corev1.Endpoints{} },
UpdateFunc: func(_, obj interface{}) { notifyCh <- obj.(*corev1.Endpoints) },
AddFunc: func(obj any) { notifyCh <- obj.(*corev1.Endpoints) },
DeleteFunc: func(_ any) { notifyCh <- &corev1.Endpoints{} },
UpdateFunc: func(_, obj any) { notifyCh <- obj.(*corev1.Endpoints) },
}); err != nil {
return nil, nil, fmt.Errorf("error adding watch event handler: %w", err)
}
Expand Down
22 changes: 12 additions & 10 deletions internal/app/machined/pkg/controllers/k8s/extra_manifest.go
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/cosi-project/runtime/pkg/controller"
"github.com/cosi-project/runtime/pkg/resource"
"github.com/cosi-project/runtime/pkg/safe"
"github.com/cosi-project/runtime/pkg/state"
"github.com/hashicorp/go-cleanhttp"
"github.com/hashicorp/go-getter/v2"
Expand Down Expand Up @@ -73,7 +74,7 @@ func (ctrl *ExtraManifestController) Run(ctx context.Context, r controller.Runti
}

// wait for network to be ready as networking is required to download extra manifests
networkResource, err := r.Get(ctx, resource.NewMetadata(network.NamespaceName, network.StatusType, network.StatusID, resource.VersionUndefined))
networkResource, err := safe.ReaderGetByID[*network.Status](ctx, r, network.StatusID)
if err != nil {
if state.IsNotFoundError(err) {
continue
Expand All @@ -82,13 +83,13 @@ func (ctrl *ExtraManifestController) Run(ctx context.Context, r controller.Runti
return err
}

networkStatus := networkResource.(*network.Status).TypedSpec()
networkStatus := networkResource.TypedSpec()

if !(networkStatus.AddressReady && networkStatus.ConnectivityReady) {
continue
}

configResource, err := r.Get(ctx, k8s.NewExtraManifestsConfig().Metadata())
configResource, err := safe.ReaderGetByID[*k8s.ExtraManifestsConfig](ctx, r, k8s.ExtraManifestsConfigID)
if err != nil {
if state.IsNotFoundError(err) {
if err = ctrl.teardownAll(ctx, r); err != nil {
Expand All @@ -101,7 +102,7 @@ func (ctrl *ExtraManifestController) Run(ctx context.Context, r controller.Runti
return err
}

config := *configResource.(*k8s.ExtraManifestsConfig).TypedSpec()
config := *configResource.TypedSpec()

var multiErr *multierror.Error

Expand Down Expand Up @@ -211,9 +212,9 @@ func (ctrl *ExtraManifestController) processURL(ctx context.Context, r controlle
return
}

if err = r.Modify(ctx, k8s.NewManifest(k8s.ControlPlaneNamespaceName, id),
func(r resource.Resource) error {
return k8sadapter.Manifest(r.(*k8s.Manifest)).SetYAML(contents)
if err = safe.WriterModify(ctx, r, k8s.NewManifest(k8s.ControlPlaneNamespaceName, id),
func(r *k8s.Manifest) error {
return k8sadapter.Manifest(r).SetYAML(contents)
}); err != nil {
err = fmt.Errorf("error updating manifests: %w", err)

Expand All @@ -224,11 +225,12 @@ func (ctrl *ExtraManifestController) processURL(ctx context.Context, r controlle
}

func (ctrl *ExtraManifestController) processInline(ctx context.Context, r controller.Runtime, manifest k8s.ExtraManifest, id resource.ID) error {
err := r.Modify(
err := safe.WriterModify(
ctx,
r,
k8s.NewManifest(k8s.ControlPlaneNamespaceName, id),
func(r resource.Resource) error {
return k8sadapter.Manifest(r.(*k8s.Manifest)).SetYAML([]byte(manifest.InlineManifest))
func(r *k8s.Manifest) error {
return k8sadapter.Manifest(r).SetYAML([]byte(manifest.InlineManifest))
},
)
if err != nil {
Expand Down
Expand Up @@ -60,7 +60,7 @@ func (r *NodeWatcher) Watch(ctx context.Context, logger *zap.Logger) (<-chan str

notifyCh := make(chan struct{}, 1)

notify := func(_ interface{}) {
notify := func(_ any) {
select {
case notifyCh <- struct{}{}:
default:
Expand All @@ -78,7 +78,7 @@ func (r *NodeWatcher) Watch(ctx context.Context, logger *zap.Logger) (<-chan str
if _, err := r.nodes.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: notify,
DeleteFunc: notify,
UpdateFunc: func(_, _ interface{}) { notify(nil) },
UpdateFunc: func(_, _ any) { notify(nil) },
}); err != nil {
return nil, nil, fmt.Errorf("failed to add event handler: %w", err)
}
Expand Down
Expand Up @@ -102,7 +102,7 @@ func (suite *KubeletConfigSuite) TestReconcile() {
},
},
KubeletExtraConfig: v1alpha1.Unstructured{
Object: map[string]interface{}{
Object: map[string]any{
"serverTLSBootstrap": true,
},
},
Expand Down Expand Up @@ -170,7 +170,7 @@ func (suite *KubeletConfigSuite) TestReconcile() {
spec.ExtraMounts,
)
suite.Assert().Equal(
map[string]interface{}{
map[string]any{
"serverTLSBootstrap": true,
},
spec.ExtraConfig,
Expand Down
2 changes: 1 addition & 1 deletion internal/app/machined/pkg/controllers/k8s/kubelet_spec.go
Expand Up @@ -213,7 +213,7 @@ func (ctrl *KubeletSpecController) Run(ctx context.Context, r controller.Runtime
}
}

func prepareExtraConfig(extraConfig map[string]interface{}) (*kubeletconfig.KubeletConfiguration, error) {
func prepareExtraConfig(extraConfig map[string]any) (*kubeletconfig.KubeletConfiguration, error) {
// check for fields that can't be overridden via extraConfig
var multiErr *multierror.Error

Expand Down

0 comments on commit 59b6239

Please sign in to comment.