Skip to content

Commit

Permalink
liveness: allow registering callbacks after start
Browse files Browse the repository at this point in the history
I discovered[^1] a deadlock scenario when multiple nodes in the cluster restart
with additional stores that need to be bootstrapped. In that case, liveness
must be running when the StoreIDs are allocated, but it is not.

Trying to address this problem, I realized that when an auxiliary Store is bootstrapped,
it will create a new replicateQueue, which will register a new callback into NodeLiveness.

But if liveness must be started at this point to fix cockroachdb#106706, we'll run into the assertion
that checks that we don't register callbacks on a started node liveness.

Something's got to give: we will allow registering callbacks at any given point
in time, and they'll get an initial set of notifications synchronously. I
audited the few users of RegisterCallback and this seems OK with all of them.

[^1]: cockroachdb#106706 (comment)

Epic: None
Release note: None
  • Loading branch information
tbg committed Jul 20, 2023
1 parent 0c9f44f commit 7f9f94c
Showing 1 changed file with 30 additions and 18 deletions.
48 changes: 30 additions & 18 deletions pkg/kv/kvserver/liveness/liveness.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,9 +280,12 @@ type NodeLiveness struct {
nodeDialer *nodedialer.Dialer
engineSyncs *singleflight.Group

// onIsLive is a callback registered by stores prior to starting liveness.
// It fires when a node transitions from not live to live.
onIsLive []IsLiveCallback // see RegisterCallback
// onIsLiveMu holds callback registered by stores.
// They fire when a node transitions from not live to live.
onIsLiveMu struct {
syncutil.Mutex
callbacks []IsLiveCallback
} // see RegisterCallback

// onSelfHeartbeat is invoked after every successful heartbeat
// of the local liveness instance's heartbeat loop.
Expand Down Expand Up @@ -548,13 +551,16 @@ func (nl *NodeLiveness) cacheUpdated(old livenesspb.Liveness, new livenesspb.Liv
// Need to use a different signal to determine if liveness changed.
now := nl.clock.Now()
if !old.IsLive(now) && new.IsLive(now) {
// NB: If we are not started, we don't use the onIsLive callbacks since they
// NB: If we are not started, we don't use the onIsLiveMu callbacks since they
// can still change. This is a bit of a tangled mess since the startup of
// liveness requires the stores to be started, but stores can't start until
// liveness can run. Ideally we could cache all these updates and call
// onIsLive as part of start.
if nl.started.Get() {
for _, fn := range nl.onIsLive {
nl.onIsLiveMu.Lock()
fns := append([]IsLiveCallback(nil), nl.onIsLiveMu.callbacks...)
nl.onIsLiveMu.Unlock()
for _, fn := range fns {
fn(new)
}
}
Expand Down Expand Up @@ -641,13 +647,7 @@ func (nl *NodeLiveness) Start(ctx context.Context) {
nl.started.Set(true)
// We may have received some liveness records from Gossip prior to Start being
// called. We need to go through and notify all the callers of them now.
for _, entry := range nl.ScanNodeVitalityFromCache() {
if entry.IsLive(livenesspb.IsAliveNotification) {
for _, fn := range nl.onIsLive {
fn(entry.GetInternalLiveness())
}
}
}
nl.notifyIsAliveCallbacks()

_ = nl.stopper.RunAsyncTaskEx(ctx, stop.TaskOpts{TaskName: "liveness-hb", SpanOpt: stop.SterileRootSpan}, func(context.Context) {
ambient := nl.ambientCtx
Expand Down Expand Up @@ -746,6 +746,16 @@ func (nl *NodeLiveness) Heartbeat(ctx context.Context, liveness livenesspb.Liven
return nl.heartbeatInternal(ctx, liveness, false /* increment epoch */)
}

func (nl *NodeLiveness) notifyIsAliveCallbacks(fns ...IsLiveCallback) {
for _, entry := range nl.ScanNodeVitalityFromCache() {
if entry.IsLive(livenesspb.IsAliveNotification) {
for _, fn := range fns {
fn(entry.GetInternalLiveness())
}
}
}
}

func (nl *NodeLiveness) heartbeatInternal(
ctx context.Context, oldLiveness livenesspb.Liveness, incrementEpoch bool,
) (err error) {
Expand Down Expand Up @@ -1077,13 +1087,15 @@ func (nl *NodeLiveness) Metrics() Metrics {
return nl.metrics
}

// RegisterCallback registers a callback to be invoked any time a
// node's IsLive() state changes to true. This must be called before Start.
// RegisterCallback registers a callback to be invoked any time a node's
// IsLive() state changes to true. The provided callback will be invoked
// synchronously from RegisterCallback if the node is currently live.
func (nl *NodeLiveness) RegisterCallback(cb IsLiveCallback) {
if nl.started.Get() {
log.Fatalf(context.TODO(), "RegisterCallback called after Start")
}
nl.onIsLive = append(nl.onIsLive, cb)
nl.onIsLiveMu.Lock()
nl.onIsLiveMu.callbacks = append(nl.onIsLiveMu.callbacks, cb)
nl.onIsLiveMu.Unlock()

nl.notifyIsAliveCallbacks(cb)
}

// updateLiveness does a conditional put on the node liveness record for the
Expand Down

0 comments on commit 7f9f94c

Please sign in to comment.