Skip to content

Commit

Permalink
liveness: only call onSelfHeartbeat on self
Browse files Browse the repository at this point in the history
I think there was a bug here. This method was previously invoked in
`updateLiveness`, but that method is the general workhorse for updating
anyone's liveness. In particular, it is called by `IncrementEpoch`. So we were
invoking `onSelfHeartbeat` when we would increment other nodes' epochs. This
doesn't seem great.

Additionally, the code was trying to avoid invoking this callback before
liveness was officially "started". Heartbeating yourself before liveness is
started is unfortunately a thing due to the tangled start-up initialization
sequence; we may see heartbeats triggered by lease requests.

Avoid both complications by invoking `onSelfCallback` from the actual main
heartbeat loop, whose only job is to heartbeat the own liveness record.

I tried to adopt `TestNodeHeartbeatCallback` to give better coverage, but
it's a yak shave. A deterministic node liveness (i.e. a way to invoke the
main heartbeat loop manually) would make this a lot simpler. I filed an
issue to that effect:

cockroachdb#107452
  • Loading branch information
tbg committed Jul 24, 2023
1 parent f7e72f9 commit 3bf63a6
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 45 deletions.
5 changes: 2 additions & 3 deletions pkg/kv/kvserver/liveness/liveness.go
Original file line number Diff line number Diff line change
Expand Up @@ -687,6 +687,8 @@ func (nl *NodeLiveness) Start(ctx context.Context) {
return nil
}); err != nil {
log.Warningf(ctx, heartbeatFailureLogFormat, err)
} else if nl.onSelfHeartbeat != nil {
nl.onSelfHeartbeat(ctx)
}

nl.heartbeatToken <- struct{}{}
Expand Down Expand Up @@ -1125,9 +1127,6 @@ func (nl *NodeLiveness) updateLiveness(
}
return Record{}, err
}
if nl.started.Get() && nl.onSelfHeartbeat != nil {
nl.onSelfHeartbeat(ctx)
}
return written, nil
}
if err := ctx.Err(); err != nil {
Expand Down
47 changes: 5 additions & 42 deletions pkg/kv/kvserver/node_liveness_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,63 +355,26 @@ func TestNodeHeartbeatCallback(t *testing.T) {
defer log.Scope(t).Close(t)

ctx := context.Background()
manualClock := hlc.NewHybridManualClock()
expected := manualClock.UnixNano()
tc := testcluster.StartTestCluster(t, 3,
tc := testcluster.StartTestCluster(t, 1,
base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
Server: &server.TestingKnobs{
WallClock: manualClock,
},
},
},
})
defer tc.Stopper().Stop(ctx)

// Verify liveness of all nodes for all nodes.
verifyLiveness(t, tc)
pauseNodeLivenessHeartbeatLoops(tc)

// Verify that last update time has been set for all nodes.
verifyUptimes := func() error {
verifyUptimes := func() {
for i := range tc.Servers {
s := tc.GetFirstStoreFromServer(t, i)
uptm, err := s.ReadLastUpTimestamp(context.Background())
if err != nil {
return errors.Wrapf(err, "error reading last up time from store %d", i)
}
if a, e := uptm.WallTime, expected; a < e {
return errors.Errorf("store %d last uptime = %d; wanted %d", i, a, e)
}
require.NoError(t, err)
require.NotZero(t, uptm)
}
return nil
}

if err := verifyUptimes(); err != nil {
t.Fatal(err)
}

// Advance clock past the liveness threshold and force a manual heartbeat on
// all node liveness objects, which should update the last up time for each
// store.
manualClock.Increment(tc.Servers[0].NodeLiveness().(*liveness.NodeLiveness).TestingGetLivenessThreshold().Nanoseconds() + 1)
expected = manualClock.UnixNano()
for _, s := range tc.Servers {
nl := s.NodeLiveness().(*liveness.NodeLiveness)
l, ok := nl.Self()
assert.True(t, ok)
if err := nl.Heartbeat(context.Background(), l); err != nil {
t.Fatal(err)
}
}
// NB: since the heartbeat callback is invoked synchronously in
// `Heartbeat()` which this goroutine invoked, we don't need to wrap this in
// a retry.
if err := verifyUptimes(); err != nil {
t.Fatal(err)
}
verifyUptimes()
}

// TestNodeLivenessEpochIncrement verifies that incrementing the epoch
Expand Down

0 comments on commit 3bf63a6

Please sign in to comment.