fix: use MachineStatus resource to check for boot done
The previous implementation used the old events API, which had several
issues:

* buffer overruns and weird checks
* a big timeout even if all nodes are already booted

Replace that with a direct read of the `MachineStatus` resource, which
has been available since Talos 1.2.0.

Signed-off-by: Andrey Smirnov <andrey.smirnov@siderolabs.com>
smira committed Feb 15, 2024
1 parent 15e8bca commit 559308e
Showing 2 changed files with 93 additions and 59 deletions.
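
For orientation (not part of this commit): a minimal sketch of reading a node's MachineStatus stage once through the machinery client's COSI state. The helper name and error handling are illustrative assumptions; the resource constructor, client calls, and stage values mirror the ones used in the diff below.

package example

import (
	"context"
	"fmt"

	"github.com/siderolabs/talos/pkg/machinery/client"
	"github.com/siderolabs/talos/pkg/machinery/resources/runtime"
)

// machineStage is a hypothetical helper: it does a one-shot Get of the
// MachineStatus resource on a single node and returns its boot stage.
func machineStage(ctx context.Context, cli *client.Client, nodeIP string) (runtime.MachineStage, error) {
	// Target the request at one node, as the watch loop in events.go does.
	nodeCtx := client.WithNode(ctx, nodeIP)

	// MachineStatus is exposed as a COSI resource since Talos 1.2.0.
	res, err := cli.COSI.Get(nodeCtx, runtime.NewMachineStatus().Metadata())
	if err != nil {
		return runtime.MachineStageUnknown, fmt.Errorf("reading MachineStatus from %s: %w", nodeIP, err)
	}

	machineStatus, ok := res.(*runtime.MachineStatus)
	if !ok {
		return runtime.MachineStageUnknown, fmt.Errorf("unexpected resource type: %T", res)
	}

	return machineStatus.TypedSpec().Stage, nil
}

A caller could poll such a helper until every node reports runtime.MachineStageRunning, which is essentially what the rewritten assertion in this commit does with watches instead of polling.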
pkg/cluster/check/default.go (14 changes: 7 additions & 7 deletions)
@@ -15,13 +15,6 @@ import (
 // DefaultClusterChecks returns a set of default Talos cluster readiness checks.
 func DefaultClusterChecks() []ClusterCheck {
 	return append(PreBootSequenceChecks(), []ClusterCheck{
-		// wait for all nodes to finish booting
-		func(cluster ClusterInfo) conditions.Condition {
-			return conditions.PollingCondition("all nodes to finish boot sequence", func(ctx context.Context) error {
-				return AllNodesBootedAssertion(ctx, cluster)
-			}, 5*time.Minute, 5*time.Second)
-		},
-
 		// wait for all the nodes to report in at k8s level
 		func(cluster ClusterInfo) conditions.Condition {
 			return conditions.PollingCondition("all k8s nodes to report", func(ctx context.Context) error {
@@ -149,5 +142,12 @@ func PreBootSequenceChecks() []ClusterCheck {
 				return ServiceHealthAssertion(ctx, cluster, "kubelet", WithNodeTypes(machine.TypeInit, machine.TypeControlPlane))
 			}, 5*time.Minute, 5*time.Second)
 		},
+
+		// wait for all nodes to finish booting
+		func(cluster ClusterInfo) conditions.Condition {
+			return conditions.PollingCondition("all nodes to finish boot sequence", func(ctx context.Context) error {
+				return AllNodesBootedAssertion(ctx, cluster)
+			}, 5*time.Minute, 5*time.Second)
+		},
 	}
 }
pkg/cluster/check/events.go (138 changes: 86 additions & 52 deletions)
@@ -6,15 +6,17 @@ package check

 import (
 	"context"
-	"errors"
 	"fmt"
-	"sort"
+	"slices"
 	"strings"

-	"google.golang.org/grpc/codes"
-	"google.golang.org/grpc/status"
+	"github.com/cosi-project/runtime/pkg/state"
+	"github.com/siderolabs/gen/channel"
+	"github.com/siderolabs/gen/maps"
+	"github.com/siderolabs/gen/xslices"

-	machineapi "github.com/siderolabs/talos/pkg/machinery/api/machine"
 	"github.com/siderolabs/talos/pkg/machinery/client"
+	"github.com/siderolabs/talos/pkg/machinery/resources/runtime"
 )

 // AllNodesBootedAssertion checks whether nodes reached end of 'Boot' sequence.
@@ -27,69 +29,101 @@ func AllNodesBootedAssertion(ctx context.Context, cluster ClusterInfo) error {
 	}

 	nodes := cluster.Nodes()

 	nodeInternalIPs := mapIPsToStrings(mapNodeInfosToInternalIPs(nodes))

 	ctx, cancel := context.WithCancel(ctx)
-	nodesCtx := client.WithNodes(ctx, nodeInternalIPs...)
-
-	nodesBootStarted := map[string]struct{}{}
-	nodesBootStopped := map[string]struct{}{}
-
-	err = cli.EventsWatch(nodesCtx, func(ch <-chan client.Event) {
-		defer cancel()
-
-		for event := range ch {
-			if msg, ok := event.Payload.(*machineapi.SequenceEvent); ok {
-				if msg.GetSequence() == "boot" { // can't use runtime constants as they're in `internal/`
-					switch msg.GetAction() { //nolint:exhaustive
-					case machineapi.SequenceEvent_START:
-						nodesBootStarted[event.Node] = struct{}{}
-					case machineapi.SequenceEvent_STOP:
-						nodesBootStopped[event.Node] = struct{}{}
-					}
-				}
-			}
-		}
-	}, client.WithTailEvents(-1))
-	if err != nil {
-		unwrappedErr := err
-
-		for {
-			if s, ok := status.FromError(unwrappedErr); ok && s.Code() == codes.DeadlineExceeded {
-				// ignore deadline exceeded as we've just exhausted events list
-				err = nil
-
-				break
-			}
-
-			unwrappedErr = errors.Unwrap(unwrappedErr)
-			if unwrappedErr == nil {
-				break
-			}
-		}
-	}
-
-	if err != nil {
-		return err
-	}
-
-	nodesNotFinishedBooting := []string{}
-
-	// check for nodes which have Boot/Start event, but no Boot/Stop even
-	// if the node is up long enough, Boot/Start even might get out of the window,
-	// so we can't check such nodes reliably
-	for node := range nodesBootStarted {
-		if _, ok := nodesBootStopped[node]; !ok {
-			nodesNotFinishedBooting = append(nodesNotFinishedBooting, node)
-		}
-	}
-
-	sort.Strings(nodesNotFinishedBooting)
-
-	if len(nodesNotFinishedBooting) > 0 {
-		return fmt.Errorf("nodes %q are still in boot sequence", nodesNotFinishedBooting)
-	}
-
-	return nil
+	defer cancel()
+
+	type eventWithNode struct {
+		node  string
+		event state.Event
+	}
+
+	eventCh := make(chan eventWithNode)
+
+	for _, nodeIP := range nodeInternalIPs {
+		nodeEventCh := make(chan state.Event)
+
+		if err = cli.COSI.Watch(client.WithNode(ctx, nodeIP), runtime.NewMachineStatus().Metadata(), nodeEventCh); err != nil {
+			return err
+		}
+
+		go func(nodeIP string) {
+			for {
+				select {
+				case <-ctx.Done():
+					return
+				case ev := <-nodeEventCh:
+					channel.SendWithContext(ctx, eventCh, eventWithNode{node: nodeIP, event: ev})
+				}
+			}
+		}(nodeIP)
+	}
+
+	nodeStages := make(map[string]runtime.MachineStage, len(nodeInternalIPs))
+
+	for _, nodeIP := range nodeInternalIPs {
+		nodeStages[nodeIP] = runtime.MachineStageUnknown
+	}
+
+	for {
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case ev := <-eventCh:
+			switch ev.event.Type {
+			case state.Created, state.Updated:
+				machineStatus, ok := ev.event.Resource.(*runtime.MachineStatus)
+				if !ok {
+					return fmt.Errorf("unexpected resource type: %T", ev.event.Resource)
+				}
+
+				nodeStages[ev.node] = machineStatus.TypedSpec().Stage
+			case state.Destroyed, state.Bootstrapped:
+				// nothing
+			case state.Errored:
+				return fmt.Errorf("error watching machine %s status: %w", ev.node, ev.event.Error)
+			}
+		}
+
+		allNodesRunning := true
+		allNodesReported := true
+		stageWithNodes := map[runtime.MachineStage][]string{}
+
+		for nodeIP, stage := range nodeStages {
+			if stage != runtime.MachineStageRunning {
+				allNodesRunning = false
+			}
+
+			if stage == runtime.MachineStageUnknown {
+				allNodesReported = false
+			}
+
+			stageWithNodes[stage] = append(stageWithNodes[stage], nodeIP)
+		}
+
+		if !allNodesReported {
+			// keep waiting for data from all nodes
+			continue
+		}
+
+		if allNodesRunning {
+			return nil
+		}
+
+		// if we're here, not all nodes are running
+		delete(stageWithNodes, runtime.MachineStageRunning)
+
+		stages := maps.Keys(stageWithNodes)
+		slices.Sort(stages)
+
+		message := xslices.Map(stages, func(stage runtime.MachineStage) string {
+			nodeIPs := stageWithNodes[stage]
+			slices.Sort(nodeIPs)
+
+			return fmt.Sprintf("%s: %v", stage, nodeIPs)
+		})
+
+		return fmt.Errorf("nodes are not running: %s", strings.Join(message, ", "))
+	}
 }
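
As the default.go hunk above shows, the assertion is meant to be wrapped in a polling condition. A hedged usage sketch follows; it assumes the conditions.Condition returned by conditions.PollingCondition exposes a Wait(ctx) method, and the function name is purely illustrative:

package example

import (
	"context"
	"time"

	"github.com/siderolabs/talos/pkg/cluster/check"
	"github.com/siderolabs/talos/pkg/conditions"
)

// waitForBoot runs only the boot-sequence check against a cluster, mirroring
// how PreBootSequenceChecks wires AllNodesBootedAssertion after this commit.
// The Wait method on conditions.Condition is assumed, not shown in the diff.
func waitForBoot(ctx context.Context, cluster check.ClusterInfo) error {
	cond := conditions.PollingCondition("all nodes to finish boot sequence", func(ctx context.Context) error {
		return check.AllNodesBootedAssertion(ctx, cluster)
	}, 5*time.Minute, 5*time.Second)

	return cond.Wait(ctx)
}

When the check fails, the "nodes are not running: ..." message built above is what a caller would typically see, grouping the stuck nodes by their current MachineStatus stage.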
