Skip to content

Commit

Permalink
kv: retry liveness heartbeat on race with insufficient expiration
Browse files Browse the repository at this point in the history
We suspect that this is the cause of the test failure in cockroachdb#124693.

Addresses a TODO added in 1dc18df.

Release note: None
  • Loading branch information
nvanbenschoten committed Jun 10, 2024
1 parent e09f535 commit 90e8aff
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 25 deletions.
15 changes: 3 additions & 12 deletions pkg/kv/kvserver/liveness/liveness.go
Original file line number Diff line number Diff line change
Expand Up @@ -701,7 +701,9 @@ var errNodeAlreadyLive = errors.New("node already live")
//
// If this method returns nil, the node's liveness has been extended,
// relative to the previous value. It may or may not still be alive
// when this method returns.
// when this method returns. It may also not have been extended as far
// as the livenessThreshold, because the caller may have raced with
// another heartbeater.
//
// On failure, this method returns ErrEpochIncremented, although this
// may not necessarily mean that the epoch was actually incremented.
Expand Down Expand Up @@ -839,17 +841,6 @@ func (nl *NodeLiveness) heartbeatInternal(
// expired while in flight, so maybe we don't have to care about
// that and only need to distinguish between same and different
// epochs in our return value.
//
// TODO(nvanbenschoten): Unlike the early return above, this doesn't
// guarantee that the resulting expiration is past minExpiration,
// only that it's different than our oldLiveness. Is that ok? It
// hasn't caused issues so far, but we might want to detect this
// case and retry, at least in the case of the liveness heartbeat
// loop. The downside of this is that a heartbeat that's intending
// to bump the expiration of a record out 9s into the future may
// return a success even if the expiration is only 5 seconds in the
// future. The next heartbeat will then start with only 0.5 seconds
// before expiration.
if actual.IsLive(nl.clock.Now()) && !incrementEpoch {
return errNodeAlreadyLive
}
Expand Down
37 changes: 24 additions & 13 deletions pkg/kv/kvserver/replica_range_lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,20 +538,31 @@ func (p *pendingLeaseRequest) requestLease(
// interval are the same.
expToEpochPromo := extension && status.Lease.Type() == roachpb.LeaseExpiration && reqLease.Type() == roachpb.LeaseEpoch
if expToEpochPromo && reqLeaseLiveness.Expiration.ToTimestamp().Less(status.Lease.GetExpiration()) {
err := p.repl.store.cfg.NodeLiveness.Heartbeat(ctx, reqLeaseLiveness)
if err != nil {
if logFailedHeartbeatOwnLiveness.ShouldLog() {
log.Errorf(ctx, "failed to heartbeat own liveness record: %s", err)
curLiveness := reqLeaseLiveness
for r := retry.StartWithCtx(ctx, base.DefaultRetryOptions()); r.Next(); {
err := p.repl.store.cfg.NodeLiveness.Heartbeat(ctx, curLiveness)
if err != nil {
if logFailedHeartbeatOwnLiveness.ShouldLog() {
log.Errorf(ctx, "failed to heartbeat own liveness record: %s", err)
}
return kvpb.NewNotLeaseHolderError(roachpb.Lease{}, p.repl.store.StoreID(), p.repl.Desc(),
fmt.Sprintf("failed to manipulate liveness record: %s", err))
}
return kvpb.NewNotLeaseHolderError(roachpb.Lease{}, p.repl.store.StoreID(), p.repl.Desc(),
fmt.Sprintf("failed to manipulate liveness record: %s", err))
}
// Assert that the liveness record expiration is now greater than the
// expiration of the lease we're promoting.
l, ok := p.repl.store.cfg.NodeLiveness.GetLiveness(reqLeaseLiveness.NodeID)
if !ok || l.Expiration.ToTimestamp().Less(status.Lease.GetExpiration()) {
return errors.AssertionFailedf("expiration of liveness record %s is not greater than "+
"expiration of the previous lease %s after liveness heartbeat", l, status.Lease)
// Check whether the liveness record expiration is now greater than the
// expiration of the lease we're promoting. If not, we may have raced with
// another liveness heartbeat which did not extend the liveness expiration
// far enough and we should try again.
l, ok := p.repl.store.cfg.NodeLiveness.GetLiveness(reqLeaseLiveness.NodeID)
if !ok {
return errors.NewAssertionErrorWithWrappedErrf(liveness.ErrRecordCacheMiss, "after heartbeat")
}
if l.Expiration.ToTimestamp().Less(status.Lease.GetExpiration()) {
log.Infof(ctx, "expiration of liveness record %s is not greater than "+
"expiration of the previous lease %s after liveness heartbeat, retrying...", l, status.Lease)
curLiveness = l.Liveness
continue
}
break
}
}

Expand Down

0 comments on commit 90e8aff

Please sign in to comment.