diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go
index 2cbfc8a943..52a20f516c 100644
--- a/server/jetstream_cluster.go
+++ b/server/jetstream_cluster.go
@@ -956,6 +956,42 @@ type recoveryUpdates struct {
 	updateConsumers map[string]*consumerAssignment
 }
 
+// Called after recovery of the cluster on startup to check for any orphans.
+// Streams and consumers are recovered from disk, and the meta layer's mappings
+// should clean them up, but under crash scenarios there could be orphans.
+func (js *jetStream) checkForOrphans() {
+	js.mu.Lock()
+	defer js.mu.Unlock()
+
+	consumerName := func(o *consumer) string {
+		o.mu.RLock()
+		defer o.mu.RUnlock()
+		return o.name
+	}
+
+	s, cc := js.srv, js.cluster
+	s.Debugf("JetStream cluster checking for orphans")
+
+	for accName, jsa := range js.accounts {
+		asa := cc.streams[accName]
+		for stream, mset := range jsa.streams {
+			if sa := asa[stream]; sa == nil {
+				s.Warnf("Detected orphaned stream '%s > %s', will cleanup", accName, stream)
+				mset.delete()
+			} else {
+				// This one is good, check consumers now.
+				for _, o := range mset.getConsumers() {
+					consumer := consumerName(o)
+					if sa.consumers[consumer] == nil {
+						s.Warnf("Detected orphaned consumer '%s > %s > %s', will cleanup", accName, stream, consumer)
+						o.delete()
+					}
+				}
+			}
+		}
+	}
+}
+
 func (js *jetStream) monitorCluster() {
 	s, n := js.server(), js.getMetaGroup()
 	qch, rqch, lch, aq := js.clusterQuitC(), n.QuitC(), n.LeadChangeC(), n.ApplyQ()
@@ -1001,7 +1037,7 @@ func (js *jetStream) monitorCluster() {
 		if hash := highwayhash.Sum(snap, key); !bytes.Equal(hash[:], lastSnap) {
 			if err := n.InstallSnapshot(snap); err == nil {
 				lastSnap, lastSnapTime = hash[:], time.Now()
-			} else if err != errNoSnapAvailable {
+			} else if err != errNoSnapAvailable && err != errNodeClosed {
 				s.Warnf("Error snapshotting JetStream cluster state: %v", err)
 			}
 		}
@@ -1022,7 +1058,7 @@ func (js *jetStream) monitorCluster() {
 			return
 		case <-qch:
 			// Clean signal from shutdown routine so do best effort attempt to snapshot meta layer.
-			n.InstallSnapshot(js.metaSnapshot())
+			doSnapshot()
 			// Return the signal back since shutdown will be waiting.
 			close(qch)
 			return
@@ -1050,15 +1086,18 @@ func (js *jetStream) monitorCluster() {
 				// Clear.
 				ru = nil
 				s.Debugf("Recovered JetStream cluster metadata")
+				js.checkForOrphans()
 				continue
 			}
 			// FIXME(dlc) - Deal with errors.
-			if didSnap, didStreamRemoval, err := js.applyMetaEntries(ce.Entries, ru); err == nil {
+			if didSnap, didStreamRemoval, didConsumerRemoval, err := js.applyMetaEntries(ce.Entries, ru); err == nil {
 				_, nb := n.Applied(ce.Index)
 				if js.hasPeerEntries(ce.Entries) || didSnap || didStreamRemoval {
 					// Since we received one make sure we have our own since we do not store
 					// our meta state outside of raft.
 					doSnapshot()
+				} else if didConsumerRemoval && time.Since(lastSnapTime) > minSnapDelta/2 {
+					doSnapshot()
 				} else if lls := len(lastSnap); nb > uint64(lls*8) && lls > 0 && time.Since(lastSnapTime) > minSnapDelta {
 					doSnapshot()
 				}
@@ -1070,9 +1109,9 @@ func (js *jetStream) monitorCluster() {
 			if isLeader {
 				s.sendInternalMsgLocked(serverStatsPingReqSubj, _EMPTY_, nil, nil)
-				// Optionally install a snapshot as we become leader.
-				doSnapshot()
+				// Install a snapshot as we become leader.
 				js.checkClusterSize()
+				doSnapshot()
 			}
 
 		case <-t.C:
@@ -1274,7 +1313,6 @@ func (js *jetStream) applyMetaSnapshot(buf []byte, ru *recoveryUpdates, isRecove
 	// Do removals first.
 	for _, sa := range saDel {
 		js.setStreamAssignmentRecovering(sa)
-
 		if isRecovering {
 			key := sa.recoveryKey()
 			ru.removeStreams[key] = sa
@@ -1539,8 +1577,8 @@ func (ca *consumerAssignment) recoveryKey() string {
 	return ca.Client.serviceAccount() + ksep + ca.Stream + ksep + ca.Name
 }
 
-func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bool, bool, error) {
-	var didSnap, didRemove bool
+func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bool, bool, bool, error) {
+	var didSnap, didRemoveStream, didRemoveConsumer bool
 	isRecovering := js.isMetaRecovering()
 
 	for _, e := range entries {
@@ -1562,20 +1600,20 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo
 			sa, err := decodeStreamAssignment(buf[1:])
 			if err != nil {
 				js.srv.Errorf("JetStream cluster failed to decode stream assignment: %q", buf[1:])
-				return didSnap, didRemove, err
+				return didSnap, didRemoveStream, didRemoveConsumer, err
 			}
 			if isRecovering {
 				js.setStreamAssignmentRecovering(sa)
 				delete(ru.removeStreams, sa.recoveryKey())
 			}
 			if js.processStreamAssignment(sa) {
-				didRemove = true
+				didRemoveStream = true
 			}
 		case removeStreamOp:
 			sa, err := decodeStreamAssignment(buf[1:])
 			if err != nil {
 				js.srv.Errorf("JetStream cluster failed to decode stream assignment: %q", buf[1:])
-				return didSnap, didRemove, err
+				return didSnap, didRemoveStream, didRemoveConsumer, err
 			}
 			if isRecovering {
 				js.setStreamAssignmentRecovering(sa)
@@ -1584,13 +1622,13 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo
 				delete(ru.updateStreams, key)
 			} else {
 				js.processStreamRemoval(sa)
-				didRemove = true
+				didRemoveStream = true
 			}
 		case assignConsumerOp:
 			ca, err := decodeConsumerAssignment(buf[1:])
 			if err != nil {
 				js.srv.Errorf("JetStream cluster failed to decode consumer assignment: %q", buf[1:])
-				return didSnap, didRemove, err
+				return didSnap, didRemoveStream, didRemoveConsumer, err
 			}
 			if isRecovering {
 				js.setConsumerAssignmentRecovering(ca)
@@ -1604,7 +1642,7 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo
 			ca, err := decodeConsumerAssignmentCompressed(buf[1:])
 			if err != nil {
 				js.srv.Errorf("JetStream cluster failed to decode compressed consumer assignment: %q", buf[1:])
-				return didSnap, didRemove, err
+				return didSnap, didRemoveStream, didRemoveConsumer, err
 			}
 			if isRecovering {
 				js.setConsumerAssignmentRecovering(ca)
@@ -1618,7 +1656,7 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo
 			ca, err := decodeConsumerAssignment(buf[1:])
 			if err != nil {
 				js.srv.Errorf("JetStream cluster failed to decode consumer assignment: %q", buf[1:])
-				return didSnap, didRemove, err
+				return didSnap, didRemoveStream, didRemoveConsumer, err
 			}
 			if isRecovering {
 				js.setConsumerAssignmentRecovering(ca)
@@ -1627,13 +1665,13 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo
 				delete(ru.updateConsumers, key)
 			} else {
 				js.processConsumerRemoval(ca)
-				didRemove = true
+				didRemoveConsumer = true
 			}
 		case updateStreamOp:
 			sa, err := decodeStreamAssignment(buf[1:])
 			if err != nil {
 				js.srv.Errorf("JetStream cluster failed to decode stream assignment: %q", buf[1:])
-				return didSnap, didRemove, err
+				return didSnap, didRemoveStream, didRemoveConsumer, err
 			}
 			if isRecovering {
 				js.setStreamAssignmentRecovering(sa)
@@ -1648,7 +1686,7 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo
 			}
 		}
 	}
-	return didSnap, didRemove, nil
+	return didSnap, didRemoveStream, didRemoveConsumer, nil
 }
 
 func (rg *raftGroup) isMember(id string) bool {
@@ -1896,7 +1934,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
 		if !bytes.Equal(hash[:], lastSnap) || ne >= compactNumMin || nb >= compactSizeMin {
 			if err := n.InstallSnapshot(snap); err == nil {
 				lastSnap, lastSnapTime = hash[:], time.Now()
-			} else if err != errNoSnapAvailable {
+			} else if err != errNoSnapAvailable && err != errNodeClosed {
 				s.Warnf("Failed to install snapshot for '%s > %s' [%s]: %v", mset.acc.Name, mset.name(), n.Group(), err)
 			}
 		}
@@ -2987,6 +3025,18 @@ func (js *jetStream) processClusterUpdateStream(acc *Account, osa, sa *streamAss
 	mset, err := acc.lookupStream(cfg.Name)
 	if err == nil && mset != nil {
+		// Make sure we have not had a new group assigned to us.
+		if osa.Group.Name != sa.Group.Name {
+			s.Warnf("JetStream cluster detected stream remapping for '%s > %s' from %q to %q",
+				acc, cfg.Name, osa.Group.Name, sa.Group.Name)
+			mset.removeNode()
+			alreadyRunning, needsNode = false, true
+			// Make sure to clear from original.
+			js.mu.Lock()
+			osa.Group.node = nil
+			js.mu.Unlock()
+		}
+
 		var needsSetLeader bool
 		if !alreadyRunning && numReplicas > 1 {
 			if needsNode {
@@ -4062,7 +4112,7 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
 		if !bytes.Equal(hash[:], lastSnap) || ne >= compactNumMin || nb >= compactSizeMin {
 			if err := n.InstallSnapshot(snap); err == nil {
 				lastSnap, lastSnapTime = hash[:], time.Now()
-			} else if err != errNoSnapAvailable {
+			} else if err != errNoSnapAvailable && err != errNodeClosed {
 				s.Warnf("Failed to install snapshot for '%s > %s > %s' [%s]: %v", o.acc.Name, ca.Stream, ca.Name, n.Group(), err)
 			}
 		}
@@ -7607,7 +7657,6 @@ func (mset *stream) handleClusterStreamInfoRequest(_ *subscription, c *client, _
 func (mset *stream) processClusterStreamInfoRequest(reply string) {
 	mset.mu.RLock()
 	sysc, js, sa, config := mset.sysc, mset.srv.js, mset.sa, mset.cfg
-	stype := mset.cfg.Storage
 	isLeader := mset.isLeader()
 	mset.mu.RUnlock()
 
@@ -7617,9 +7666,9 @@ func (mset *stream) processClusterStreamInfoRequest(reply string) {
 		return
 	}
 
-	// If we are here we are in a compromised state due to server limits let someone else answer if they can.
-	if !isLeader && js.limitsExceeded(stype) {
-		time.Sleep(100 * time.Millisecond)
+	// If we are not the leader let someone else possibly respond first.
+	if !isLeader {
+		time.Sleep(200 * time.Millisecond)
 	}
 
 	si := &StreamInfo{
diff --git a/server/raft.go b/server/raft.go
index b98db1976b..6a4ed96b78 100644
--- a/server/raft.go
+++ b/server/raft.go
@@ -1481,6 +1481,7 @@ func (n *raft) shutdown(shouldDelete bool) {
 		n.Unlock()
 		return
 	}
+	close(n.quit)
 	if c := n.c; c != nil {
 		var subs []*subscription
@@ -2346,7 +2347,7 @@ func (n *raft) catchupFollower(ar *appendEntryResponse) {
 			n.Unlock()
 			return
 		}
-		n.debug("Snapshot sent, reset first entry to %d", lastIndex)
+		n.debug("Snapshot sent, reset first catchup entry to %d", lastIndex)
 	}
 }
 
@@ -2646,14 +2647,13 @@ func (n *raft) runAsCandidate() {
 				// if we observe a bigger term, we should start over again or risk forming a quorum fully knowing
 				// someone with a better term exists. This is even the right thing to do if won == true.
 				n.Lock()
+				n.debug("Stepping down from candidate, detected higher term: %d vs %d", vresp.term, n.term)
 				n.term = vresp.term
 				n.vote = noVote
 				n.writeTermVote()
-				n.debug("Stepping down from candidate, detected higher term: %d vs %d", vresp.term, n.term)
 				n.stepdown.push(noLeader)
 				n.lxfer = false
 				n.Unlock()
-				return
 			}
 		case <-n.reqs.ch:
 			// Because of drain() it is possible that we get nil from popOne().
@@ -2746,6 +2746,13 @@ func (n *raft) truncateWAL(term, index uint64) {
 			os.Remove(n.snapfile)
 			n.snapfile = _EMPTY_
 		}
+		// Make sure to reset commit and applied if they are above the new pindex.
+		if n.commit > n.pindex {
+			n.commit = n.pindex
+		}
+		if n.applied > n.commit {
+			n.applied = n.commit
+		}
 	}()
 
 	if err := n.wal.Truncate(index); err != nil {
@@ -2763,10 +2770,10 @@ func (n *raft) truncateWAL(term, index uint64) {
 	// Set after we know we have truncated properly.
 	n.pterm, n.pindex = term, index
-
 }
 
 // Reset our WAL.
+// Lock should be held.
 func (n *raft) resetWAL() {
 	n.truncateWAL(0, 0)
 }
@@ -2782,7 +2789,6 @@ func (n *raft) updateLeader(newLeader string) {
 // processAppendEntry will process an appendEntry.
 func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
 	n.Lock()
-
 	// Don't reset here if we have been asked to assume leader position.
 	if !n.lxfer {
 		n.resetElectionTimeout()
@@ -2912,22 +2918,17 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
 		var ar *appendEntryResponse
 		var success bool
 
-		eae, err := n.loadEntry(ae.pindex)
-		// If terms mismatched, or we got an error loading, delete that entry and all others past it.
-		if eae != nil && ae.pterm > eae.pterm || err != nil {
-			// Truncate will reset our pterm and pindex. Only do so if we have an entry.
-			if eae != nil {
-				n.truncateWAL(ae.pterm, ae.pindex)
-			}
-			// Make sure to cancel any catchups in progress.
-			if catchingUp {
-				n.cancelCatchup()
-			}
+		if eae, _ := n.loadEntry(ae.pindex); eae == nil {
+			n.resetWAL()
 		} else {
-			// Inherit regardless.
-			n.pterm = ae.pterm
-			success = true
+			// If terms mismatched, or we got an error loading, delete that entry and all others past it.
+			// Make sure to cancel any catchups in progress.
+			// Truncate will reset our pterm and pindex. Only do so if we have an entry.
+			n.truncateWAL(ae.pterm, ae.pindex)
 		}
+		// Cancel regardless.
+		n.cancelCatchup()
+
 		// Create response.
 		ar = &appendEntryResponse{ae.pterm, ae.pindex, n.id, success, _EMPTY_}
 		n.Unlock()
@@ -3384,7 +3385,12 @@ func (n *raft) setWriteErrLocked(err error) {
 	}
 	// Ignore non-write errors.
 	if err != nil {
-		if err == ErrStoreClosed || err == ErrStoreEOF || err == ErrInvalidSequence || err == ErrStoreMsgNotFound || err == errNoPending {
+		if err == ErrStoreClosed ||
+			err == ErrStoreEOF ||
+			err == ErrInvalidSequence ||
+			err == ErrStoreMsgNotFound ||
+			err == errNoPending ||
+			err == errPartialCache {
 			return
 		}
 		// If this is a not found report but do not disable.
@@ -3560,10 +3566,15 @@ func (n *raft) processVoteRequest(vr *voteRequest) error {
 	// Only way we get to yes is through here.
 	voteOk := n.vote == noVote || n.vote == vr.candidate
-	if voteOk && vr.lastTerm >= n.pterm && vr.lastIndex >= n.pindex {
+	if voteOk && (vr.lastTerm > n.pterm || vr.lastTerm == n.pterm && vr.lastIndex >= n.pindex) {
 		vresp.granted = true
 		n.vote = vr.candidate
 		n.writeTermVote()
+	} else {
+		if vr.term >= n.term && n.vote == noVote {
+			n.term = vr.term
+			n.resetElect(randCampaignTimeout())
+		}
 	}
 	n.Unlock()
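Reviewer note (not part of the patch): the final raft.go hunk replaces the independent term/index comparison with the usual Raft "log up to date" rule, where the candidate's last log term dominates and the last index only breaks ties. A minimal standalone Go sketch of that comparison; the names here (logUpToDate and its parameters) are illustrative and not from the codebase.

package main

import "fmt"

// logUpToDate mirrors the new vote-granting comparison: a higher last log term
// wins outright, and only on equal terms does the last index decide.
func logUpToDate(candLastTerm, candLastIndex, ourLastTerm, ourLastIndex uint64) bool {
	return candLastTerm > ourLastTerm ||
		(candLastTerm == ourLastTerm && candLastIndex >= ourLastIndex)
}

func main() {
	// The old check (lastTerm >= && lastIndex >=) would refuse this candidate even
	// though its log is strictly newer by term; the new rule grants the vote.
	fmt.Println(logUpToDate(5, 10, 4, 20)) // true: higher last term wins despite lower index
	fmt.Println(logUpToDate(4, 19, 4, 20)) // false: same term, shorter log
}

The practical effect, as sketched above, is that a voter with a long but stale log can no longer block a candidate whose log ends in a newer term.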