diff --git a/pkg/cluster/check/default.go b/pkg/cluster/check/default.go
index fd5a63d2ac..5f6478410b 100644
--- a/pkg/cluster/check/default.go
+++ b/pkg/cluster/check/default.go
@@ -15,13 +15,6 @@ import (
 // DefaultClusterChecks returns a set of default Talos cluster readiness checks.
 func DefaultClusterChecks() []ClusterCheck {
 	return append(PreBootSequenceChecks(), []ClusterCheck{
-		// wait for all nodes to finish booting
-		func(cluster ClusterInfo) conditions.Condition {
-			return conditions.PollingCondition("all nodes to finish boot sequence", func(ctx context.Context) error {
-				return AllNodesBootedAssertion(ctx, cluster)
-			}, 5*time.Minute, 5*time.Second)
-		},
-
 		// wait for all the nodes to report in at k8s level
 		func(cluster ClusterInfo) conditions.Condition {
 			return conditions.PollingCondition("all k8s nodes to report", func(ctx context.Context) error {
@@ -149,5 +142,12 @@ func PreBootSequenceChecks() []ClusterCheck {
 				return ServiceHealthAssertion(ctx, cluster, "kubelet", WithNodeTypes(machine.TypeInit, machine.TypeControlPlane))
 			}, 5*time.Minute, 5*time.Second)
 		},
+
+		// wait for all nodes to finish booting
+		func(cluster ClusterInfo) conditions.Condition {
+			return conditions.PollingCondition("all nodes to finish boot sequence", func(ctx context.Context) error {
+				return AllNodesBootedAssertion(ctx, cluster)
+			}, 5*time.Minute, 5*time.Second)
+		},
 	}
 }
diff --git a/pkg/cluster/check/events.go b/pkg/cluster/check/events.go
index 3e57fb66cf..d713de1aef 100644
--- a/pkg/cluster/check/events.go
+++ b/pkg/cluster/check/events.go
@@ -6,15 +6,17 @@ package check
 
 import (
 	"context"
-	"errors"
 	"fmt"
-	"sort"
+	"slices"
+	"strings"
 
-	"google.golang.org/grpc/codes"
-	"google.golang.org/grpc/status"
+	"github.com/cosi-project/runtime/pkg/state"
+	"github.com/siderolabs/gen/channel"
+	"github.com/siderolabs/gen/maps"
+	"github.com/siderolabs/gen/xslices"
 
-	machineapi "github.com/siderolabs/talos/pkg/machinery/api/machine"
 	"github.com/siderolabs/talos/pkg/machinery/client"
+	"github.com/siderolabs/talos/pkg/machinery/resources/runtime"
 )
 
 // AllNodesBootedAssertion checks whether nodes reached end of 'Boot' sequence.
@@ -27,69 +29,101 @@ func AllNodesBootedAssertion(ctx context.Context, cluster ClusterInfo) error {
 	}
 
 	nodes := cluster.Nodes()
-
 	nodeInternalIPs := mapIPsToStrings(mapNodeInfosToInternalIPs(nodes))
 
 	ctx, cancel := context.WithCancel(ctx)
-	nodesCtx := client.WithNodes(ctx, nodeInternalIPs...)
-
-	nodesBootStarted := map[string]struct{}{}
-	nodesBootStopped := map[string]struct{}{}
-
-	err = cli.EventsWatch(nodesCtx, func(ch <-chan client.Event) {
-		defer cancel()
-
-		for event := range ch {
-			if msg, ok := event.Payload.(*machineapi.SequenceEvent); ok {
-				if msg.GetSequence() == "boot" { // can't use runtime constants as they're in `internal/`
-					switch msg.GetAction() { //nolint:exhaustive
-					case machineapi.SequenceEvent_START:
-						nodesBootStarted[event.Node] = struct{}{}
-					case machineapi.SequenceEvent_STOP:
-						nodesBootStopped[event.Node] = struct{}{}
-					}
+	defer cancel()
+
+	type eventWithNode struct {
+		node  string
+		event state.Event
+	}
+
+	eventCh := make(chan eventWithNode)
+
+	for _, nodeIP := range nodeInternalIPs {
+		nodeEventCh := make(chan state.Event)
+
+		if err = cli.COSI.Watch(client.WithNode(ctx, nodeIP), runtime.NewMachineStatus().Metadata(), nodeEventCh); err != nil {
+			return err
+		}
+
+		go func(nodeIP string) {
+			for {
+				select {
+				case <-ctx.Done():
+					return
+				case ev := <-nodeEventCh:
+					channel.SendWithContext(ctx, eventCh, eventWithNode{node: nodeIP, event: ev})
 				}
 			}
+		}(nodeIP)
+	}
+
+	nodeStages := make(map[string]runtime.MachineStage, len(nodeInternalIPs))
+
+	for _, nodeIP := range nodeInternalIPs {
+		nodeStages[nodeIP] = runtime.MachineStageUnknown
+	}
+
+	for {
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case ev := <-eventCh:
+			switch ev.event.Type {
+			case state.Created, state.Updated:
+				machineStatus, ok := ev.event.Resource.(*runtime.MachineStatus)
+				if !ok {
+					return fmt.Errorf("unexpected resource type: %T", ev.event.Resource)
+				}
+
+				nodeStages[ev.node] = machineStatus.TypedSpec().Stage
+			case state.Destroyed, state.Bootstrapped:
+				// nothing
+			case state.Errored:
+				return fmt.Errorf("error watching machine %s status: %w", ev.node, ev.event.Error)
+			}
 		}
-	}, client.WithTailEvents(-1))
-	if err != nil {
-		unwrappedErr := err
 
-		for {
-			if s, ok := status.FromError(unwrappedErr); ok && s.Code() == codes.DeadlineExceeded {
-				// ignore deadline exceeded as we've just exhausted events list
-				err = nil
+		allNodesRunning := true
+		allNodesReported := true
+		stageWithNodes := map[runtime.MachineStage][]string{}
 
-				break
+		for nodeIP, stage := range nodeStages {
+			if stage != runtime.MachineStageRunning {
+				allNodesRunning = false
 			}
 
-			unwrappedErr = errors.Unwrap(unwrappedErr)
-			if unwrappedErr == nil {
-				break
+			if stage == runtime.MachineStageUnknown {
+				allNodesReported = false
 			}
-		}
-	}
 
-	if err != nil {
-		return err
-	}
+			stageWithNodes[stage] = append(stageWithNodes[stage], nodeIP)
+		}
 
-	nodesNotFinishedBooting := []string{}
+		if !allNodesReported {
+			// keep waiting for data from all nodes
+			continue
+		}
 
-	// check for nodes which have Boot/Start event, but no Boot/Stop even
-	// if the node is up long enough, Boot/Start even might get out of the window,
-	// so we can't check such nodes reliably
-	for node := range nodesBootStarted {
-		if _, ok := nodesBootStopped[node]; !ok {
-			nodesNotFinishedBooting = append(nodesNotFinishedBooting, node)
+		if allNodesRunning {
+			return nil
 		}
-	}
 
-	sort.Strings(nodesNotFinishedBooting)
+		// if we're here, not all nodes are running
+		delete(stageWithNodes, runtime.MachineStageRunning)
 
-	if len(nodesNotFinishedBooting) > 0 {
-		return fmt.Errorf("nodes %q are still in boot sequence", nodesNotFinishedBooting)
-	}
+		stages := maps.Keys(stageWithNodes)
+		slices.Sort(stages)
 
-	return nil
+		message := xslices.Map(stages, func(stage runtime.MachineStage) string {
+			nodeIPs := stageWithNodes[stage]
+			slices.Sort(nodeIPs)
+
+			return fmt.Sprintf("%s: %v", stage, nodeIPs)
+		})
+
+		return fmt.Errorf("nodes are not running: %s", strings.Join(message, ", "))
+	}
 }
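Reviewer note (not part of the patch): a minimal sketch of how the reordered checks are typically driven through the pkg/cluster/check API. It assumes a check.ClusterInfo implementation is already available from the provisioning/access layer, and that check.Wait and check.StderrReporter keep their current signatures; the waitForCluster helper name and the 20-minute budget are made up for illustration only.

package example

import (
	"context"
	"fmt"
	"time"

	"github.com/siderolabs/talos/pkg/cluster/check"
)

// waitForCluster runs the default readiness checks against an already
// constructed cluster accessor. With this change, PreBootSequenceChecks()
// ends with the "all nodes to finish boot sequence" check, so it runs
// before any Kubernetes-level assertions in DefaultClusterChecks().
func waitForCluster(ctx context.Context, cluster check.ClusterInfo) error {
	// Bound the overall wait; individual checks also carry their own
	// 5-minute polling timeouts (hypothetical budget, adjust as needed).
	ctx, cancel := context.WithTimeout(ctx, 20*time.Minute)
	defer cancel()

	if err := check.Wait(ctx, cluster, check.DefaultClusterChecks(), check.StderrReporter()); err != nil {
		return fmt.Errorf("cluster readiness checks failed: %w", err)
	}

	return nil
}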