From 800762dc150f11be3eb94d8382d1c47dc24b4cf8 Mon Sep 17 00:00:00 2001 From: Artem Chernyshev Date: Fri, 24 May 2024 21:35:26 +0300 Subject: [PATCH] chore: rewrite `MachineStatus` to use `QController` This rewrite should improve performance of the controller drastically. Fixes: https://github.com/siderolabs/omni/issues/281 Signed-off-by: Artem Chernyshev --- cmd/integration-test/pkg/tests/talos.go | 4 + .../omni/internal/task/machine/helpers.go | 4 + .../omni/internal/task/machine/machine.go | 6 - .../controllers/omni/internal/task/task.go | 2 + .../omni/controllers/omni/machine_status.go | 572 ++++++++++-------- .../omni/machine_status_link_test.go | 6 +- .../controllers/omni/machine_status_test.go | 10 +- internal/backend/runtime/omni/omni.go | 4 +- 8 files changed, 334 insertions(+), 274 deletions(-) diff --git a/cmd/integration-test/pkg/tests/talos.go b/cmd/integration-test/pkg/tests/talos.go index 096e9a06..7a00d486 100644 --- a/cmd/integration-test/pkg/tests/talos.go +++ b/cmd/integration-test/pkg/tests/talos.go @@ -60,6 +60,10 @@ func clearConnectionRefused(ctx context.Context, t *testing.T, c *talosclient.Cl return retry.ExpectedError(err) } + if strings.Contains(err.Error(), "connection reset by peer") { + return retry.ExpectedError(err) + } + return err } diff --git a/internal/backend/runtime/omni/controllers/omni/internal/task/machine/helpers.go b/internal/backend/runtime/omni/controllers/omni/internal/task/machine/helpers.go index d0166829..40090d54 100644 --- a/internal/backend/runtime/omni/controllers/omni/internal/task/machine/helpers.go +++ b/internal/backend/runtime/omni/controllers/omni/internal/task/machine/helpers.go @@ -8,6 +8,7 @@ package machine import ( "context" "fmt" + "time" "github.com/cosi-project/runtime/pkg/resource" "github.com/cosi-project/runtime/pkg/resource/meta" @@ -41,6 +42,9 @@ func forEachResource[T resource.Resource]( // QueryRegisteredTypes gets all registered types from the meta namespace. 
func QueryRegisteredTypes(ctx context.Context, st state.State) (map[resource.Type]struct{}, error) { + ctx, cancel := context.WithTimeout(ctx, time.Second*5) + defer cancel() + // query all resources to start watching only resources that are defined for running version of talos resources, err := safe.StateList[*meta.ResourceDefinition](ctx, st, resource.NewMetadata(meta.NamespaceName, meta.ResourceDefinitionType, "", resource.VersionUndefined)) if err != nil { diff --git a/internal/backend/runtime/omni/controllers/omni/internal/task/machine/machine.go b/internal/backend/runtime/omni/controllers/omni/internal/task/machine/machine.go index 57269c91..efe0f511 100644 --- a/internal/backend/runtime/omni/controllers/omni/internal/task/machine/machine.go +++ b/internal/backend/runtime/omni/controllers/omni/internal/task/machine/machine.go @@ -192,12 +192,6 @@ func (spec CollectTaskSpec) RunTask(ctx context.Context, logger *zap.Logger, not registeredTypes, err := QueryRegisteredTypes(ctx, c.COSI) if err != nil { - // this is the first request to the Talos API - // if it fails we handle it and update the machine status with the request error - if !spec.sendInfo(ctx, Info{}, notifyCh, err) { - return nil - } - return err } diff --git a/internal/backend/runtime/omni/controllers/omni/internal/task/task.go b/internal/backend/runtime/omni/controllers/omni/internal/task/task.go index cfcf7843..aa3bc707 100644 --- a/internal/backend/runtime/omni/controllers/omni/internal/task/task.go +++ b/internal/backend/runtime/omni/controllers/omni/internal/task/task.go @@ -73,6 +73,8 @@ func (task *Task[T, S]) runWithRestarts(ctx context.Context) { // finished without an error if err == nil { + task.logger.Info("task finished") + return } diff --git a/internal/backend/runtime/omni/controllers/omni/machine_status.go b/internal/backend/runtime/omni/controllers/omni/machine_status.go index b9510c44..c7ef6985 100644 --- a/internal/backend/runtime/omni/controllers/omni/machine_status.go +++ b/internal/backend/runtime/omni/controllers/omni/machine_status.go @@ -10,10 +10,12 @@ import ( "fmt" "github.com/cosi-project/runtime/pkg/controller" + "github.com/cosi-project/runtime/pkg/controller/generic" "github.com/cosi-project/runtime/pkg/resource" "github.com/cosi-project/runtime/pkg/resource/kvutils" "github.com/cosi-project/runtime/pkg/safe" - cosistate "github.com/cosi-project/runtime/pkg/state" + "github.com/cosi-project/runtime/pkg/state" + "github.com/siderolabs/gen/optional" "github.com/siderolabs/image-factory/pkg/schematic" machineapi "github.com/siderolabs/talos/pkg/machinery/api/machine" "go.uber.org/zap" @@ -25,7 +27,7 @@ import ( "github.com/siderolabs/omni/internal/backend/imagefactory" "github.com/siderolabs/omni/internal/backend/runtime/omni/controllers/helpers" "github.com/siderolabs/omni/internal/backend/runtime/omni/controllers/omni/internal/task" - "github.com/siderolabs/omni/internal/backend/runtime/omni/controllers/omni/internal/task/machine" + machinetask "github.com/siderolabs/omni/internal/backend/runtime/omni/controllers/omni/internal/task/machine" ) // SchematicEnsurer ensures that the given schematic exists in the image factory. @@ -33,325 +35,269 @@ type SchematicEnsurer interface { EnsureSchematic(ctx context.Context, inputSchematic schematic.Schematic) (imagefactory.EnsuredSchematic, error) } +// MachineStatusControllerName is the name of the MachineStatusController. 
+const MachineStatusControllerName = "MachineStatusController" + // MachineStatusController manages omni.MachineStatuses based on information from Talos API. type MachineStatusController struct { - runner *task.Runner[machine.InfoChan, machine.CollectTaskSpec] ImageFactoryClient SchematicEnsurer + runner *task.Runner[machinetask.InfoChan, machinetask.CollectTaskSpec] + notifyCh chan machinetask.Info + generic.NamedController } -// Name implements controller.Controller interface. -func (ctrl *MachineStatusController) Name() string { - return "MachineStatusController" -} - -// Inputs implements controller.Controller interface. -func (ctrl *MachineStatusController) Inputs() []controller.Input { - return []controller.Input{ - { - Namespace: resources.DefaultNamespace, - Type: omni.MachineStatusType, - Kind: controller.InputDestroyReady, - }, - { - Namespace: resources.DefaultNamespace, - Type: omni.MachineType, - Kind: controller.InputStrong, - }, - { - Namespace: resources.DefaultNamespace, - Type: omni.ClusterMachineType, - Kind: controller.InputWeak, - }, - { - Namespace: resources.DefaultNamespace, - Type: omni.TalosConfigType, - Kind: controller.InputWeak, - }, - { - Namespace: resources.DefaultNamespace, - Type: omni.MachineStatusSnapshotType, - Kind: controller.InputWeak, - }, - { - Namespace: resources.DefaultNamespace, - Type: omni.MachineLabelsType, - Kind: controller.InputWeak, - }, - { - Namespace: resources.DefaultNamespace, - Type: siderolink.ConnectionParamsType, - Kind: controller.InputWeak, +// NewMachineStatusController initializes MachineStatusController. +func NewMachineStatusController(imageFactoryClient SchematicEnsurer) *MachineStatusController { + return &MachineStatusController{ + NamedController: generic.NamedController{ + ControllerName: MachineStatusControllerName, }, + notifyCh: make(chan machinetask.Info), + runner: task.NewEqualRunner[machinetask.CollectTaskSpec](), + ImageFactoryClient: imageFactoryClient, } } -// Outputs implements controller.Controller interface. -func (ctrl *MachineStatusController) Outputs() []controller.Output { - return []controller.Output{ - { - Type: omni.MachineStatusType, - Kind: controller.OutputExclusive, +// Settings implements controller.QController interface. +func (ctrl *MachineStatusController) Settings() controller.QSettings { + return controller.QSettings{ + Inputs: []controller.Input{ + { + Namespace: resources.DefaultNamespace, + Type: omni.MachineType, + Kind: controller.InputQPrimary, + }, + { + Namespace: resources.DefaultNamespace, + Type: omni.MachineStatusType, + Kind: controller.InputQMappedDestroyReady, + }, + { + Namespace: resources.DefaultNamespace, + Type: omni.MachineSetNodeType, + Kind: controller.InputQMapped, + }, + { + Namespace: resources.DefaultNamespace, + Type: omni.ClusterMachineStatusType, + Kind: controller.InputQMapped, + }, + { + Namespace: resources.DefaultNamespace, + Type: omni.TalosConfigType, + Kind: controller.InputQMapped, + }, + { + Namespace: resources.DefaultNamespace, + Type: omni.MachineStatusSnapshotType, + Kind: controller.InputQMapped, + }, + { + Namespace: resources.DefaultNamespace, + Type: omni.MachineLabelsType, + Kind: controller.InputQMapped, + }, + { + Namespace: resources.DefaultNamespace, + Type: siderolink.ConnectionParamsType, + Kind: controller.InputQMapped, + }, }, - } -} - -// Run implements controller.Controller interface. 
-func (ctrl *MachineStatusController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error { - ctrl.runner = task.NewEqualRunner[machine.CollectTaskSpec]() - defer ctrl.runner.Stop() - - notifyCh := make(chan machine.Info) - - for { - select { - case <-ctx.Done(): - return nil - case <-r.EventCh(): - if err := ctrl.reconcileCollectors(ctx, r, logger, notifyCh); err != nil { - return err - } - case event := <-notifyCh: - if err := ctrl.handleNotification(ctx, r, event); err != nil { - return err + Outputs: []controller.Output{ + { + Kind: controller.OutputExclusive, + Type: omni.MachineStatusType, + }, + }, + Concurrency: optional.Some[uint](4), + RunHook: func(ctx context.Context, _ *zap.Logger, r controller.QRuntime) error { + for { + select { + case <-ctx.Done(): + return nil + case event := <-ctrl.notifyCh: + if err := ctrl.handleNotification(ctx, r, event); err != nil { + return err + } + } } - } - - r.ResetRestartBackoff() + }, + ShutdownHook: func() { + ctrl.runner.Stop() + }, } } -//nolint:gocognit,gocyclo,cyclop -func (ctrl *MachineStatusController) reconcileCollectors(ctx context.Context, r controller.Runtime, logger *zap.Logger, notifyCh machine.InfoChan) error { - // list machines - list, err := safe.ReaderListAll[*omni.Machine](ctx, r) +// MapInput implements controller.QController interface. +func (ctrl *MachineStatusController) MapInput(ctx context.Context, _ *zap.Logger, + r controller.QRuntime, ptr resource.Pointer, +) ([]resource.Pointer, error) { + _, err := r.Get(ctx, ptr) if err != nil { - return fmt.Errorf("error listing machines: %w", err) - } - - tracker := trackResource(r, resources.DefaultNamespace, omni.MachineStatusType) - - tracker.beforeDestroyCallback = func(res resource.Resource) error { - return r.RemoveFinalizer(ctx, omni.NewMachine(resources.DefaultNamespace, res.Metadata().ID()).Metadata(), ctrl.Name()) + if state.IsNotFoundError(err) { + return nil, nil + } } - // figure out which collectors should run - shouldRun := map[string]machine.CollectTaskSpec{} - machines := map[resource.ID]*omni.Machine{} - machineLabels := map[resource.ID]*omni.MachineLabels{} - reportingEvents := map[string]struct{}{} - - for iter := list.Iterator(); iter.Next(); { - item := iter.Value() - machineSpec := item.TypedSpec().Value - - var labels *omni.MachineLabels - - labels, err = safe.ReaderGet[*omni.MachineLabels](ctx, r, - resource.NewMetadata(resources.DefaultNamespace, omni.MachineLabelsType, item.Metadata().ID(), resource.VersionUndefined)) + switch ptr.Type() { + case omni.MachineStatusType: + fallthrough + case omni.MachineType: + fallthrough + case omni.MachineSetNodeType: + fallthrough + case omni.ClusterMachineStatusType: + fallthrough + case omni.MachineLabelsType: + fallthrough + case omni.MachineStatusSnapshotType: + return []resource.Pointer{ + omni.NewMachineStatus(resources.DefaultNamespace, ptr.ID()).Metadata(), + }, nil + case omni.TalosConfigType: + machines, err := safe.ReaderListAll[*omni.ClusterMachineStatus](ctx, r, state.WithLabelQuery(resource.LabelEqual(omni.LabelCluster, ptr.ID()))) if err != nil { - if !cosistate.IsNotFoundError(err) { - return err - } + return nil, err } - machineLabels[item.Metadata().ID()] = labels + res := make([]resource.Pointer, 0, machines.Len()) - if item.Metadata().Phase() == resource.PhaseTearingDown { - continue - } + machines.ForEach(func(r *omni.ClusterMachineStatus) { + res = append(res, omni.NewMachineStatus(resources.DefaultNamespace, r.Metadata().ID()).Metadata()) + }) - if 
!item.Metadata().Finalizers().Has(ctrl.Name()) { - if err = r.AddFinalizer(ctx, item.Metadata(), ctrl.Name()); err != nil { - return err - } - } - - tracker.keep(item) - - var clusterMachine resource.Resource - - clusterMachine, err = r.Get(ctx, resource.NewMetadata(resources.DefaultNamespace, omni.ClusterMachineType, item.Metadata().ID(), resource.VersionUndefined)) - if err != nil && !cosistate.IsNotFoundError(err) { - return err - } - - var cluster string + return res, nil + case siderolink.ConnectionParamsType: + return nil, nil + } - if clusterMachine != nil { - var ok bool - cluster, ok = clusterMachine.Metadata().Labels().Get(omni.LabelCluster) + return nil, fmt.Errorf("unexpected resource type %q", ptr.Type()) +} - if !ok { - logger.Warn("The cluster machine is created without the cluster") - } +// Reconcile implements controller.QController interface. +func (ctrl *MachineStatusController) Reconcile(ctx context.Context, + logger *zap.Logger, r controller.QRuntime, ptr resource.Pointer, +) error { + machine, err := safe.ReaderGetByID[*omni.Machine](ctx, r, ptr.ID()) + if err != nil { + if state.IsNotFoundError(err) { + return nil } - var talosConfig *omni.TalosConfig - - if cluster != "" { - talosConfig, err = safe.ReaderGet[*omni.TalosConfig](ctx, r, resource.NewMetadata( - resources.DefaultNamespace, omni.TalosConfigType, cluster, resource.VersionUndefined, - )) + return err + } - if err != nil && !cosistate.IsNotFoundError(err) { - return err - } - } + if machine.Metadata().Phase() == resource.PhaseTearingDown { + return ctrl.reconcileTearingDown(ctx, r, logger, machine) + } - var machineStatusSnapshot *omni.MachineStatusSnapshot - machineStatusSnapshot, err = safe.ReaderGet[*omni.MachineStatusSnapshot](ctx, r, resource.NewMetadata( - resources.DefaultNamespace, - omni.MachineStatusSnapshotType, - item.Metadata().ID(), - resource.VersionUndefined, - )) + return ctrl.reconcileRunning(ctx, r, logger, machine) +} - if err != nil && !cosistate.IsNotFoundError(err) { +func (ctrl *MachineStatusController) reconcileRunning(ctx context.Context, r controller.QRuntime, logger *zap.Logger, machine *omni.Machine) error { + if !machine.Metadata().Finalizers().Has(ctrl.Name()) { + if err := r.AddFinalizer(ctx, machine.Metadata(), ctrl.Name()); err != nil { return err } - - var maintenanceStage bool - - if machineStatusSnapshot != nil { - maintenanceStage = machineStatusSnapshot.TypedSpec().Value.MachineStatus.Stage == machineapi.MachineStatusEvent_MAINTENANCE - reportingEvents[item.Metadata().ID()] = struct{}{} - } - - if machineSpec.Connected { - var params *siderolink.ConnectionParams - - params, err = safe.ReaderGetByID[*siderolink.ConnectionParams](ctx, r, siderolink.ConfigID) - if err != nil { - return fmt.Errorf("error reading connection params: %w", err) - } - - shouldRun[item.Metadata().ID()] = machine.CollectTaskSpec{ - Endpoint: machineSpec.ManagementAddress, - TalosConfig: talosConfig, - MaintenanceMode: talosConfig == nil || maintenanceStage, - MachineID: item.Metadata().ID(), - MachineLabels: labels, - DefaultSchematicKernelArgs: siderolink.KernelArgs(params), - } - } - - machines[item.Metadata().ID()] = item } - ctrl.runner.Reconcile(ctx, logger, shouldRun, notifyCh) - - if err = tracker.cleanup(ctx); err != nil { + inputs, err := ctrl.handleInputs(ctx, r, machine) + if err != nil { return err } - for id := range machines { - if err = safe.WriterModify(ctx, r, omni.NewMachineStatus(resources.DefaultNamespace, id), func(m *omni.MachineStatus) error { - spec := m.TypedSpec().Value 
+ var ( + reportingEvents bool + maintenanceMode bool + ) - connected := machines[id].TypedSpec().Value.Connected + if inputs.machineStatusSnapshot != nil { + reportingEvents = true - spec.Connected = connected + maintenanceMode = inputs.machineStatusSnapshot.TypedSpec().Value.MachineStatus.Stage == machineapi.MachineStatusEvent_MAINTENANCE + } - if connected { - m.Metadata().Labels().Set(omni.MachineStatusLabelConnected, "") - m.Metadata().Labels().Delete(omni.MachineStatusLabelDisconnected) - } else { - m.Metadata().Labels().Delete(omni.MachineStatusLabelConnected) - m.Metadata().Labels().Set(omni.MachineStatusLabelDisconnected, "") - } + ctrl.runner.StopTask(logger, machine.Metadata().ID()) - if _, ok := reportingEvents[id]; ok { - m.Metadata().Labels().Set(omni.MachineStatusLabelReportingEvents, "") - } else { - m.Metadata().Labels().Delete(omni.MachineStatusLabelReportingEvents) - } + if machine.TypedSpec().Value.Connected { + var params *siderolink.ConnectionParams - if shouldRun[id].Endpoint != "" { - spec.ManagementAddress = shouldRun[id].Endpoint - } + params, err = safe.ReaderGetByID[*siderolink.ConnectionParams](ctx, r, siderolink.ConfigID) + if err != nil { + return fmt.Errorf("error reading connection params: %w", err) + } - var clusterMachine *omni.ClusterMachine + ctrl.runner.StartTask(ctx, logger, machine.Metadata().ID(), machinetask.CollectTaskSpec{ + Endpoint: machine.TypedSpec().Value.ManagementAddress, + TalosConfig: inputs.talosConfig, + MaintenanceMode: inputs.talosConfig == nil || maintenanceMode, + MachineID: machine.Metadata().ID(), + MachineLabels: inputs.machineLabels, + DefaultSchematicKernelArgs: siderolink.KernelArgs(params), + }, ctrl.notifyCh) + } - clusterMachine, err = safe.ReaderGet[*omni.ClusterMachine](ctx, r, resource.NewMetadata(resources.DefaultNamespace, omni.ClusterMachineType, id, resource.VersionUndefined)) - if err != nil { - if !cosistate.IsNotFoundError(err) { - return err - } - } + return safe.WriterModify(ctx, r, omni.NewMachineStatus(resources.DefaultNamespace, machine.Metadata().ID()), func(m *omni.MachineStatus) error { + spec := m.TypedSpec().Value - helpers.CopyUserLabels(m, ctrl.mergeLabels(m, machineLabels[m.Metadata().ID()])) + connected := machine.TypedSpec().Value.Connected - omni.MachineStatusReconcileLabels(m) + spec.Connected = connected - return ctrl.setClusterRelation(clusterMachine, m) - }); err != nil && !cosistate.IsPhaseConflictError(err) { - return err + if connected { + m.Metadata().Labels().Set(omni.MachineStatusLabelConnected, "") + m.Metadata().Labels().Delete(omni.MachineStatusLabelDisconnected) + } else { + m.Metadata().Labels().Delete(omni.MachineStatusLabelConnected) + m.Metadata().Labels().Set(omni.MachineStatusLabelDisconnected, "") } - } - - return nil -} -func (ctrl *MachineStatusController) mergeLabels(m *omni.MachineStatus, machineLabels *omni.MachineLabels) map[string]string { - labels := map[string]string{} - - if m.TypedSpec().Value.ImageLabels != nil { - for k, v := range m.TypedSpec().Value.ImageLabels { - labels[k] = v + if reportingEvents { + m.Metadata().Labels().Set(omni.MachineStatusLabelReportingEvents, "") + } else { + m.Metadata().Labels().Delete(omni.MachineStatusLabelReportingEvents) } - } - if machineLabels != nil { - for k, v := range machineLabels.Metadata().Labels().Raw() { - labels[k] = v + if machine.TypedSpec().Value.ManagementAddress != "" { + spec.ManagementAddress = machine.TypedSpec().Value.ManagementAddress } - } - return labels -} + helpers.CopyUserLabels(m, ctrl.mergeLabels(m, 
inputs.machineLabels)) -func (ctrl *MachineStatusController) setClusterRelation(clusterMachine *omni.ClusterMachine, machineStatus *omni.MachineStatus) error { - if clusterMachine == nil { - machineStatus.TypedSpec().Value.Cluster = "" - machineStatus.TypedSpec().Value.Role = specs.MachineStatusSpec_NONE + omni.MachineStatusReconcileLabels(m) - machineStatus.Metadata().Labels().Set(omni.MachineStatusLabelAvailable, "") + return ctrl.setClusterRelation(inputs, m) + }) +} - machineStatus.Metadata().Labels().Delete(omni.LabelCluster) +func (ctrl *MachineStatusController) reconcileTearingDown(ctx context.Context, r controller.QRuntime, logger *zap.Logger, machine *omni.Machine) error { + ctrl.runner.StopTask(logger, machine.Metadata().ID()) - return nil + _, err := ctrl.handleInputs(ctx, r, machine) + if err != nil { + return err } - labels := clusterMachine.Metadata().Labels() + md := omni.NewMachineStatus(resources.DefaultNamespace, machine.Metadata().ID()).Metadata() - cluster, clusterOk := labels.Get(omni.LabelCluster) - if !clusterOk { - return fmt.Errorf("malformed ClusterMachine resource: no %q label, cluster ownership unknown", omni.LabelCluster) + ready, err := r.Teardown(ctx, md) + if err != nil { + return err } - machineStatus.TypedSpec().Value.Cluster = cluster - - _, controlPlane := labels.Get(omni.LabelControlPlaneRole) - _, worker := labels.Get(omni.LabelWorkerRole) - - machineStatus.Metadata().Labels().Set(omni.LabelCluster, cluster) - - switch { - case controlPlane: - machineStatus.TypedSpec().Value.Role = specs.MachineStatusSpec_CONTROL_PLANE - case worker: - machineStatus.TypedSpec().Value.Role = specs.MachineStatusSpec_WORKER - default: - return fmt.Errorf("malformed ClusterMachine resource: no %q or %q label, role unknown", omni.LabelControlPlaneRole, omni.LabelWorkerRole) + if !ready { + return nil } - machineStatus.Metadata().Labels().Delete(omni.MachineStatusLabelAvailable) + if err = r.Destroy(ctx, md); err != nil { + return err + } - return nil + return r.RemoveFinalizer(ctx, machine.Metadata(), ctrl.Name()) } -//nolint:gocognit,gocyclo,cyclop -func (ctrl *MachineStatusController) handleNotification(ctx context.Context, r controller.Runtime, event machine.Info) error { +//nolint:gocyclo,cyclop,gocognit +func (ctrl *MachineStatusController) handleNotification(ctx context.Context, r controller.QRuntime, event machinetask.Info) error { if err := safe.WriterModify(ctx, r, omni.NewMachineStatus(resources.DefaultNamespace, event.MachineID), func(m *omni.MachineStatus) error { spec := m.TypedSpec().Value @@ -465,7 +411,7 @@ func (ctrl *MachineStatusController) handleNotification(ctx context.Context, r c omni.MachineStatusReconcileLabels(m) return nil - }); err != nil && !cosistate.IsPhaseConflictError(err) { + }); err != nil && !state.IsPhaseConflictError(err) { return fmt.Errorf("error modifying resource: %w", err) } @@ -485,3 +431,117 @@ func (ctrl *MachineStatusController) handleNotification(ctx context.Context, r c return nil } + +type inputs struct { + clusterMachineStatus *omni.ClusterMachineStatus + machineSetNode *omni.MachineSetNode + machineLabels *omni.MachineLabels + machineStatusSnapshot *omni.MachineStatusSnapshot + talosConfig *omni.TalosConfig +} + +func (ctrl *MachineStatusController) handleInputs(ctx context.Context, r controller.QRuntime, machine *omni.Machine) (inputs, error) { + var ( + in inputs + err error + ) + + in.machineStatusSnapshot, err = helpers.HandleInput[*omni.MachineStatusSnapshot](ctx, r, ctrl.Name(), machine) + if err != nil { + return 
in, err + } + + in.machineLabels, err = helpers.HandleInput[*omni.MachineLabels](ctx, r, ctrl.Name(), machine) + if err != nil { + return in, err + } + + in.clusterMachineStatus, err = helpers.HandleInput[*omni.ClusterMachineStatus](ctx, r, ctrl.Name(), machine) + if err != nil { + return in, err + } + + in.machineSetNode, err = safe.ReaderGetByID[*omni.MachineSetNode](ctx, r, machine.Metadata().ID()) + if err != nil && !state.IsNotFoundError(err) { + return in, err + } + + if in.clusterMachineStatus != nil { + clusterName, ok := in.clusterMachineStatus.Metadata().Labels().Get(omni.LabelCluster) + if ok { + in.talosConfig, err = safe.ReaderGetByID[*omni.TalosConfig](ctx, r, clusterName) + if err != nil && !state.IsNotFoundError(err) { + return in, err + } + } + } + + return in, nil +} + +func (ctrl *MachineStatusController) mergeLabels(m *omni.MachineStatus, machineLabels *omni.MachineLabels) map[string]string { + labels := map[string]string{} + + if m.TypedSpec().Value.ImageLabels != nil { + for k, v := range m.TypedSpec().Value.ImageLabels { + labels[k] = v + } + } + + if machineLabels != nil { + for k, v := range machineLabels.Metadata().Labels().Raw() { + labels[k] = v + } + } + + return labels +} + +func (ctrl *MachineStatusController) setClusterRelation(in inputs, machineStatus *omni.MachineStatus) error { + var md *resource.Metadata + + switch { + case in.clusterMachineStatus != nil: + md = in.clusterMachineStatus.Metadata() + case in.machineSetNode != nil: + md = in.machineSetNode.Metadata() + } + + if md == nil { + machineStatus.TypedSpec().Value.Cluster = "" + machineStatus.TypedSpec().Value.Role = specs.MachineStatusSpec_NONE + + machineStatus.Metadata().Labels().Set(omni.MachineStatusLabelAvailable, "") + + machineStatus.Metadata().Labels().Delete(omni.LabelCluster) + + return nil + } + + labels := md.Labels() + + cluster, clusterOk := labels.Get(omni.LabelCluster) + if !clusterOk { + return fmt.Errorf("malformed ClusterMachine resource: no %q label, cluster ownership unknown", omni.LabelCluster) + } + + machineStatus.TypedSpec().Value.Cluster = cluster + + _, controlPlane := labels.Get(omni.LabelControlPlaneRole) + _, worker := labels.Get(omni.LabelWorkerRole) + + machineStatus.Metadata().Labels().Set(omni.LabelCluster, cluster) + + switch { + case controlPlane: + machineStatus.TypedSpec().Value.Role = specs.MachineStatusSpec_CONTROL_PLANE + case worker: + machineStatus.TypedSpec().Value.Role = specs.MachineStatusSpec_WORKER + default: + return fmt.Errorf("malformed ClusterMachine resource: no %q or %q label, role unknown", omni.LabelControlPlaneRole, omni.LabelWorkerRole) + } + + machineStatus.Metadata().Labels().Delete(omni.MachineStatusLabelAvailable) + + return nil +} diff --git a/internal/backend/runtime/omni/controllers/omni/machine_status_link_test.go b/internal/backend/runtime/omni/controllers/omni/machine_status_link_test.go index 91237eec..a564f265 100644 --- a/internal/backend/runtime/omni/controllers/omni/machine_status_link_test.go +++ b/internal/backend/runtime/omni/controllers/omni/machine_status_link_test.go @@ -37,9 +37,9 @@ func (suite *MachineStatusLinkSuite) SetupTest() { suite.deltaCh = make(chan siderolinkmanager.LinkCounterDeltas) - suite.Require().NoError(suite.runtime.RegisterController(&omnictrl.MachineStatusController{ - ImageFactoryClient: &imageFactoryClientMock{}, - })) + suite.Require().NoError(suite.runtime.RegisterQController(omnictrl.NewMachineStatusController( + &imageFactoryClientMock{}, + ))) 
suite.Require().NoError(suite.runtime.RegisterController(omnictrl.NewMachineStatusLinkController(suite.deltaCh))) } diff --git a/internal/backend/runtime/omni/controllers/omni/machine_status_test.go b/internal/backend/runtime/omni/controllers/omni/machine_status_test.go index 47cb0afb..f84f690b 100644 --- a/internal/backend/runtime/omni/controllers/omni/machine_status_test.go +++ b/internal/backend/runtime/omni/controllers/omni/machine_status_test.go @@ -68,10 +68,6 @@ type MachineStatusSuite struct { func (suite *MachineStatusSuite) setup() { suite.startRuntime() - suite.Require().NoError(suite.runtime.RegisterController(&omnictrl.MachineStatusController{ - ImageFactoryClient: &imageFactoryClientMock{}, - })) - suite.Require().NoError( suite.machineService.state.Create(suite.ctx, runtime.NewSecurityStateSpec(runtime.NamespaceName)), ) @@ -80,6 +76,8 @@ func (suite *MachineStatusSuite) setup() { params.TypedSpec().Value.Args = testSchematicKernelArgs suite.Require().NoError(suite.state.Create(suite.ctx, params)) + + suite.Require().NoError(suite.runtime.RegisterQController(omnictrl.NewMachineStatusController(&imageFactoryClientMock{}))) } const testID = "testID" @@ -147,7 +145,7 @@ func (suite *MachineStatusSuite) TestMachineReportingEvents() { assert.Truef(ok, "reporting-events label not set") }) - suite.Assert().NoError(suite.state.Destroy(context.Background(), resource.NewMetadata(resources.DefaultNamespace, omni.MachineStatusSnapshotType, testID, resource.VersionUndefined))) + rtestutils.Destroy[*omni.MachineStatusSnapshot](suite.ctx, suite.T(), suite.state, []string{testID}) rtestutils.AssertResource(suite.ctx, suite.T(), suite.state, testID, func(status *omni.MachineStatus, assert *assert.Assertions) { _, ok := status.Metadata().Labels().Get(omni.MachineStatusLabelReportingEvents) @@ -235,7 +233,7 @@ func (suite *MachineStatusSuite) TestMachineUserLabels() { // reverts back to initial when the machine labels resource gets removed - suite.Require().NoError(suite.state.Destroy(ctx, machineLabels.Metadata())) + rtestutils.Destroy[*omni.MachineLabels](suite.ctx, suite.T(), suite.state, []string{testID}) rtestutils.AssertResource(ctx, suite.T(), suite.state, testID, func(status *omni.MachineStatus, assert *assert.Assertions) { val, ok := status.Metadata().Labels().Get("label1") diff --git a/internal/backend/runtime/omni/omni.go b/internal/backend/runtime/omni/omni.go index e9a1a5a1..5bbd5778 100644 --- a/internal/backend/runtime/omni/omni.go +++ b/internal/backend/runtime/omni/omni.go @@ -170,9 +170,6 @@ func New(talosClientFactory *talos.ClientFactory, dnsService *dns.Service, workl &omnictrl.LoadBalancerController{}, &omnictrl.MachineSetNodeController{}, &omnictrl.MachineSetDestroyStatusController{}, - &omnictrl.MachineStatusController{ - ImageFactoryClient: imageFactoryClient, - }, omnictrl.NewMachineCleanupController(), omnictrl.NewMachineStatusLinkController(linkCounterDeltaCh), &omnictrl.MachineStatusMetricsController{}, @@ -199,6 +196,7 @@ func New(talosClientFactory *talos.ClientFactory, dnsService *dns.Service, workl omnictrl.NewClusterMachineConfigController(config.Config.DefaultConfigGenOptions), omnictrl.NewClusterMachineTeardownController(discoveryClient), omnictrl.NewMachineConfigGenOptionsController(), + omnictrl.NewMachineStatusController(imageFactoryClient), omnictrl.NewClusterMachineConfigStatusController(), omnictrl.NewClusterMachineEncryptionKeyController(), omnictrl.NewClusterMachineStatusController(),
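The gist of the migration, for review context: the previous implementation ran a single `Run` loop that relisted every `Machine` and its related resources on each event, while the `QController` version receives a queue of resource pointers, maps changed inputs onto the affected `MachineStatus` in `MapInput`, and reconciles one machine per `Reconcile` call, with `Concurrency` allowing several machines to be processed in parallel. Below is a minimal sketch of that shape, built only from APIs that appear in the diff above; the two `resources`/`omni` import paths are assumed from the repository layout, and `ExampleStatusController` is illustrative rather than part of the patch.

package omni

import (
	"context"

	"github.com/cosi-project/runtime/pkg/controller"
	"github.com/cosi-project/runtime/pkg/controller/generic"
	"github.com/cosi-project/runtime/pkg/resource"
	"github.com/cosi-project/runtime/pkg/safe"
	"github.com/cosi-project/runtime/pkg/state"
	"github.com/siderolabs/gen/optional"
	"go.uber.org/zap"

	"github.com/siderolabs/omni/client/pkg/omni/resources"      // assumed import path
	"github.com/siderolabs/omni/client/pkg/omni/resources/omni" // assumed import path
)

// ExampleStatusController mirrors the structure MachineStatusController adopts:
// a named QController with declared inputs/outputs and per-resource reconciliation.
type ExampleStatusController struct {
	generic.NamedController
}

// Settings declares the primary input, the exclusive output and the reconcile concurrency.
func (ctrl *ExampleStatusController) Settings() controller.QSettings {
	return controller.QSettings{
		Inputs: []controller.Input{
			{Namespace: resources.DefaultNamespace, Type: omni.MachineType, Kind: controller.InputQPrimary},
		},
		Outputs: []controller.Output{
			{Type: omni.MachineStatusType, Kind: controller.OutputExclusive},
		},
		Concurrency: optional.Some[uint](4),
	}
}

// MapInput queues the MachineStatus that corresponds to the changed input resource.
func (ctrl *ExampleStatusController) MapInput(_ context.Context, _ *zap.Logger,
	_ controller.QRuntime, ptr resource.Pointer,
) ([]resource.Pointer, error) {
	return []resource.Pointer{
		omni.NewMachineStatus(resources.DefaultNamespace, ptr.ID()).Metadata(),
	}, nil
}

// Reconcile handles exactly one machine per call instead of relisting all of them.
// Finalizer and teardown handling from the real controller is omitted in this sketch.
func (ctrl *ExampleStatusController) Reconcile(ctx context.Context, _ *zap.Logger,
	r controller.QRuntime, ptr resource.Pointer,
) error {
	machine, err := safe.ReaderGetByID[*omni.Machine](ctx, r, ptr.ID())
	if err != nil {
		if state.IsNotFoundError(err) {
			return nil
		}

		return err
	}

	return safe.WriterModify(ctx, r, omni.NewMachineStatus(resources.DefaultNamespace, machine.Metadata().ID()),
		func(m *omni.MachineStatus) error {
			m.TypedSpec().Value.Connected = machine.TypedSpec().Value.Connected

			return nil
		})
}

Such a controller is then registered with `runtime.RegisterQController(...)`, which is exactly the change the updated tests and `internal/backend/runtime/omni/omni.go` make for `NewMachineStatusController(imageFactoryClient)`.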