Skip to content

Commit

Permalink
More logging for raft/processInternalRaftRequest (#2389)
Browse files Browse the repository at this point in the history
Signed-off-by: Anshul Pundir <anshul.pundir@docker.com>
  • Loading branch information
anshulpundir authored Sep 29, 2017
1 parent 941a018 commit 144ddc5
Showing 1 changed file with 20 additions and 3 deletions.
23 changes: 20 additions & 3 deletions manager/state/raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -612,6 +612,8 @@ func (n *Node) Run(ctx context.Context) error {
if rd.SoftState != nil {
if wasLeader && rd.SoftState.RaftState != raft.StateLeader {
wasLeader = false
log.G(ctx).Infof("soft state changed for node %x to not longer a leader, resetting and cancelling all waits", n.opts.ID)

if atomic.LoadUint32(&n.signalledLeadership) == 1 {
atomic.StoreUint32(&n.signalledLeadership, 0)
n.leadershipBroadcast.Publish(IsFollower)
Expand Down Expand Up @@ -1679,6 +1681,7 @@ func (n *Node) processInternalRaftRequest(ctx context.Context, r *api.InternalRa

// Do this check after calling register to avoid a race.
if atomic.LoadUint32(&n.signalledLeadership) != 1 {
log.G(ctx).Errorf("node %x is no longer leader, aborting propose", n.opts.ID)
n.wait.cancel(r.ID)
return nil, ErrLostLeadership
}
Expand All @@ -1703,14 +1706,23 @@ func (n *Node) processInternalRaftRequest(ctx context.Context, r *api.InternalRa
select {
case x, ok := <-ch:
if !ok {
// Wait notification channel was closed. This should only happen if the wait was cancelled.
log.G(ctx).Errorf("wait cancelled, likely because node %x lost leader position", n.opts.ID)
if atomic.LoadUint32(&n.signalledLeadership) == 1 {
log.G(ctx).Errorf("wait cancelled but node %x is still a leader.", n.opts.ID)
}
return nil, ErrLostLeadership
}
return x.(proto.Message), nil
case <-waitCtx.Done():
n.wait.cancel(r.ID)
// if channel is closed, wait item was canceled, otherwise it was triggered
// If we can read from the channel, wait item was triggered. Otherwise it was cancelled.
x, ok := <-ch
if !ok {
log.G(ctx).WithError(waitCtx.Err()).Errorf("wait context cancelled, likeyly because node %x lost leader position", n.opts.ID)
if atomic.LoadUint32(&n.signalledLeadership) == 1 {
log.G(ctx).Errorf("wait context cancelled but node %x is still a leader", n.opts.ID)
}
return nil, ErrLostLeadership
}
return x.(proto.Message), nil
Expand Down Expand Up @@ -1779,21 +1791,26 @@ func (n *Node) processEntry(ctx context.Context, entry raftpb.Entry) error {
}

if !n.wait.trigger(r.ID, r) {
log.G(ctx).Errorf("wait not found for raft id %x", r.ID)

// There was no wait on this ID, meaning we don't have a
// transaction in progress that would be committed to the
// memory store by the "trigger" call. Either a different node
// wrote this to raft, or we wrote it before losing the leader
// position and cancelling the transaction. Create a new
// transaction to commit the data.
// position and cancelling the transaction. This entry still needs
// to be committed since other nodes have already committed it.
// Create a new transaction to commit this entry.

// It should not be possible for processInternalRaftRequest
// to be running in this situation, but out of caution we
// cancel any current invocations to avoid a deadlock.
// TODO(anshul) This call is likely redundant, remove after consideration.
n.wait.cancelAll()

err := n.memoryStore.ApplyStoreActions(r.Action)
if err != nil {
log.G(ctx).WithError(err).Error("failed to apply actions from raft")
// TODO(anshul) return err here ?
}
}
return nil
Expand Down

0 comments on commit 144ddc5

Please sign in to comment.