Skip to content

Commit

Permalink
fix: ignore MachineStatus events timestamps as they're not reliable
Browse files Browse the repository at this point in the history
The timestamp can jump back and forths if the Talos node time is out of
sync, so we might ignore proper events.

So instead of reading the timestamp from the event, use Omni server time
at the point when the event comes to the system.

In the worst case we'll see the node going through several phases
quickly.

Signed-off-by: Artem Chernyshev <artem.chernyshev@talos-systems.com>
  • Loading branch information
Unix4ever committed May 28, 2024
1 parent ccca5b5 commit 9bce82a
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 18 deletions.
27 changes: 16 additions & 11 deletions internal/pkg/machinestatus/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/cosi-project/runtime/pkg/resource"
"github.com/cosi-project/runtime/pkg/safe"
"github.com/cosi-project/runtime/pkg/state"
"github.com/rs/xid"
"github.com/siderolabs/siderolink/pkg/events"
machineapi "github.com/siderolabs/talos/pkg/machinery/api/machine"
"go.uber.org/zap"
Expand All @@ -25,6 +24,17 @@ import (
"github.com/siderolabs/omni/internal/pkg/auth/actor"
)

// Clock is here to be able to mock time.Now() in the tests.
type Clock interface {
Now() time.Time
}

type clock struct{}

func (c *clock) Now() time.Time {
return time.Now()
}

type eventInfo struct {
timestamp time.Time
fromSideroLink bool
Expand All @@ -34,6 +44,7 @@ type eventInfo struct {
type Handler struct {
logger *zap.Logger
state state.State
Clock Clock

machineToLastEventInfo map[resource.ID]eventInfo

Expand All @@ -46,6 +57,7 @@ func NewHandler(state state.State, logger *zap.Logger) *Handler {
state: state,
logger: logger,
machineToLastEventInfo: make(map[resource.ID]eventInfo),
Clock: &clock{},
}
}

Expand Down Expand Up @@ -97,7 +109,7 @@ func (handler *Handler) handleMachineStatusResourceUpdate(ctx context.Context, m
}

if err := handler.handleMachineStatusEvent(ctx, talosMachineStatus.Status, machineStatus.Metadata().ID(),
talosMachineStatus.UpdatedAt.AsTime(), false); err != nil {
handler.Clock.Now(), false); err != nil {
handler.logger.Error("error handling machine status machineStatus", zap.Error(err))
}
}
Expand Down Expand Up @@ -140,14 +152,9 @@ func (handler *Handler) HandleEvent(ctx context.Context, event events.Event) err
return fmt.Errorf("no machines found for address %s", ip)
}

eventXID, err := xid.FromString(event.ID)
if err != nil {
return fmt.Errorf("error parsing event ID: %w", err)
}

switch event := event.Payload.(type) {
case *machineapi.MachineStatusEvent:
return handler.handleMachineStatusEvent(ctx, event, machines.Get(0).Metadata().ID(), eventXID.Time(), true)
return handler.handleMachineStatusEvent(ctx, event, machines.Get(0).Metadata().ID(), handler.Clock.Now(), true)
default: // nothing, we ignore other events
}

Expand Down Expand Up @@ -213,10 +220,8 @@ func (handler *Handler) acceptEvent(machineID resource.ID, timestamp time.Time,
// Both the source of the current event and the last one are from SideroLink
case lastEventInfo.fromSideroLink && currentEventInfo.fromSideroLink:
accept = true
// If the source of the current event differs from the last one and
// if the current event is newer than the last one, we accept the current one
case lastEventInfo.fromSideroLink != currentEventInfo.fromSideroLink &&
currentEventInfo.timestamp.After(lastEventInfo.timestamp):
case currentEventInfo.timestamp.After(lastEventInfo.timestamp):
accept = true
}

Expand Down
19 changes: 12 additions & 7 deletions internal/pkg/machinestatus/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/cosi-project/runtime/pkg/state/impl/inmem"
"github.com/cosi-project/runtime/pkg/state/impl/namespaced"
"github.com/cosi-project/runtime/pkg/state/registry"
"github.com/jonboulle/clockwork"
"github.com/rs/xid"
"github.com/siderolabs/siderolink/pkg/events"
machineapi "github.com/siderolabs/talos/pkg/machinery/api/machine"
Expand Down Expand Up @@ -76,7 +77,7 @@ func TestHandler(t *testing.T) {
})

// send an event over siderolink & assert that it is stored
timestamp := time.Now()
timestamp := handler.Clock.Now()

sendEvent(ctx, t, handler, machineapi.MachineStatusEvent_BOOTING, timestamp)
assertStage(ctx, t, st, machineapi.MachineStatusEvent_BOOTING)
Expand All @@ -86,21 +87,21 @@ func TestHandler(t *testing.T) {
assertStage(ctx, t, st, machineapi.MachineStatusEvent_INSTALLING)

// send a machine status in the past - it should be ignored
sendMachineStatus(ctx, t, st, machineapi.MachineStatusEvent_BOOTING, timestamp.Add(-time.Second))
sendMachineStatus(ctx, t, handler, st, machineapi.MachineStatusEvent_BOOTING, timestamp.Add(-time.Second))
time.Sleep(100 * time.Millisecond)
assertStage(ctx, t, st, machineapi.MachineStatusEvent_INSTALLING)

// send a machine status in the future - it should be stored
sendMachineStatus(ctx, t, st, machineapi.MachineStatusEvent_REBOOTING, timestamp.Add(time.Second))
sendMachineStatus(ctx, t, handler, st, machineapi.MachineStatusEvent_REBOOTING, timestamp.Add(time.Second))
assertStage(ctx, t, st, machineapi.MachineStatusEvent_REBOOTING)

// send an event in the past - it should be ignored
sendEvent(ctx, t, handler, machineapi.MachineStatusEvent_INSTALLING, timestamp.Add(-time.Second))
assertStage(ctx, t, st, machineapi.MachineStatusEvent_REBOOTING)

// send an event in the future - it should be stored
sendEvent(ctx, t, handler, machineapi.MachineStatusEvent_REBOOTING, timestamp.Add(time.Second))
assertStage(ctx, t, st, machineapi.MachineStatusEvent_REBOOTING)
sendEvent(ctx, t, handler, machineapi.MachineStatusEvent_MAINTENANCE, timestamp.Add(time.Second*2))
assertStage(ctx, t, st, machineapi.MachineStatusEvent_MAINTENANCE)

// destroy and recreate the machine
require.NoError(t, st.Destroy(ctx, machine.Metadata()))
Expand All @@ -109,15 +110,17 @@ func TestHandler(t *testing.T) {
time.Sleep(100 * time.Millisecond)

// send a machine status in the past - it should be stored despite being in the past, as the state should be cleared on machine destroy
sendMachineStatus(ctx, t, st, machineapi.MachineStatusEvent_RESETTING, timestamp.Add(-time.Second))
sendMachineStatus(ctx, t, handler, st, machineapi.MachineStatusEvent_RESETTING, timestamp.Add(-time.Second))
assertStage(ctx, t, st, machineapi.MachineStatusEvent_RESETTING)

cancel()

require.NoError(t, eg.Wait())
}

func sendMachineStatus(ctx context.Context, t *testing.T, st state.State, stage machineapi.MachineStatusEvent_MachineStage, timestamp time.Time) {
func sendMachineStatus(ctx context.Context, t *testing.T, handler *machinestatus.Handler, st state.State, stage machineapi.MachineStatusEvent_MachineStage, timestamp time.Time) {
handler.Clock = clockwork.NewFakeClockAt(timestamp)

machineStatus := omni.NewMachineStatus(resources.DefaultNamespace, machineID)

status := &specs.MachineStatusSpec_TalosMachineStatus{
Expand Down Expand Up @@ -146,6 +149,8 @@ func sendMachineStatus(ctx context.Context, t *testing.T, st state.State, stage
}

func sendEvent(ctx context.Context, t *testing.T, handler *machinestatus.Handler, stage machineapi.MachineStatusEvent_MachineStage, timestamp time.Time) {
handler.Clock = clockwork.NewFakeClockAt(timestamp)

err := handler.HandleEvent(ctx, events.Event{
Payload: &machineapi.MachineStatusEvent{
Stage: stage,
Expand Down

0 comments on commit 9bce82a

Please sign in to comment.