[FIXED] Clustering: channel first/last sequence may fall to zero
This could happen if the leader took a snapshot while messages had not
yet expired, and a node was then started without state and tried to
restore from that snapshot. If the messages had expired by then, no
messages would be stored. If that node later took a snapshot itself,
it would persist first/last as zero. If no messages were published and
this node became leader, it would start storing messages at the wrong
sequence and would also send the bad snapshot to other nodes.
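
To make the failure concrete, here is a minimal, self-contained Go sketch. It is illustrative only and not code from this repository; it just shows why reporting 0/0 for a channel whose messages have all expired makes a newly elected leader reuse sequence numbers.

package main

import "fmt"

// nextSeq is how a leader picks the sequence of the next published message:
// one past the channel's last sequence.
func nextSeq(last uint64) uint64 { return last + 1 }

func main() {
    // The snapshot said first=1, last=100, but every message expired before
    // the restoring node could fetch them, so its store ended up empty.
    const snapshotLast = 100

    // With the fix, the empty channel still reports first=101, last=100,
    // so the next message is stored at 101.
    fmt.Println("fixed node stores next message at", nextSeq(snapshotLast))

    // Before the fix, the channel reported first=0, last=0, so the next
    // message would be stored at 1, colliding with already-delivered ones.
    fmt.Println("buggy node stores next message at", nextSeq(0))
}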

Resolves #833

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
kozlovic committed May 18, 2019
1 parent 7b9c7fe commit 13b493b
Showing 5 changed files with 384 additions and 106 deletions.
55 changes: 46 additions & 9 deletions server/clustering.go
@@ -20,6 +20,7 @@ import (
"os"
"path/filepath"
"sync"
"sync/atomic"
"time"

"github.com/hashicorp/raft"
@@ -451,6 +452,32 @@ func (s *StanServer) getClusteringPeerAddr(raftName, nodeID string) string {
return fmt.Sprintf("%s.%s.%s", s.opts.ID, nodeID, raftName)
}

+ // Returns the message store first and last sequence.
+ // When in clustered mode, if the first and last are 0, returns the value of
+ // the last sequence that we possibly got from the last snapshot. If a node
+ // restores a snapshot that has, say, first=1 and last=100, but the leader
+ // does not send these messages back because they have all expired, the node
+ // will not store anything. If we relied only on the store's first/last, this
+ // node would use and report 0 for the channel's first and last, whereas when
+ // all messages have expired it should report last+1/last.
+ func (s *StanServer) getChannelFirstAndlLastSeq(c *channel) (uint64, uint64, error) {
+ first, last, err := c.store.Msgs.FirstAndLastSequence()
+ if !s.isClustered {
+ return first, last, err
+ }
+ if err != nil {
+ return 0, 0, err
+ }
+ if first == 0 && last == 0 {
+ if fseq := atomic.LoadUint64(&c.firstSeq); fseq != 0 {
+ first = fseq
+ last = fseq - 1
+ }
+ }
+ return first, last, nil
+ }
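
The helper above reads c.firstSeq through sync/atomic because that field is written from the snapshot-restore path, which is not part of this hunk. Below is a small sketch of that pattern under stated assumptions; the names (firstSeqHolder, setFromSnapshot) are illustrative and do not exist in the server.

package atomicsketch

import "sync/atomic"

// firstSeqHolder stands in for the channel's firstSeq field: one goroutine
// records where the channel should resume after an expired snapshot restore,
// while others read it, so both sides go through sync/atomic.
type firstSeqHolder struct {
    firstSeq uint64
}

// setFromSnapshot is called (hypothetically) when a restored snapshot's
// messages turn out to have expired: remember last+1 as the resume point.
func (h *firstSeqHolder) setFromSnapshot(snapshotLast uint64) {
    atomic.StoreUint64(&h.firstSeq, snapshotLast+1)
}

// firstAndLast mirrors the empty-store case of the helper above:
// report last+1/last instead of 0/0 when a resume point is known.
func (h *firstSeqHolder) firstAndLast() (uint64, uint64) {
    if fseq := atomic.LoadUint64(&h.firstSeq); fseq != 0 {
        return fseq, fseq - 1
    }
    return 0, 0
}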

// Apply log is invoked once a log entry is committed.
// It returns a value which will be made available in the
// ApplyFuture returned by Raft.Apply method if that
@@ -461,13 +488,16 @@ func (r *raftFSM) Apply(l *raft.Log) interface{} {
if err := op.Unmarshal(l.Data); err != nil {
panic(err)
}
+ // We don't want snapshot Persist() and Apply() to execute concurrently,
+ // so use common lock.
+ r.Lock()
+ defer r.Unlock()
switch op.OpType {
case spb.RaftOperation_Publish:
// Message replication.
var (
- c *channel
- err error
- lastSeq uint64
+ c *channel
+ err error
)
for _, msg := range op.PublishBatch.Messages {
// This is a batch for a given channel, so lookup channel once.
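
The r.Lock()/defer r.Unlock() added at the top of Apply() above makes log application and snapshot Persist() mutually exclusive on the raftFSM's lock. A minimal sketch of that pattern follows; the types and fields are illustrative, not the server's actual FSM.

package fsmsketch

import "sync"

// fsm shares one mutex between Apply and Persist so a snapshot never
// observes state while a batch of messages is being applied.
type fsm struct {
    sync.Mutex
    lastApplied map[string]uint64 // e.g. channel name -> last applied sequence
}

func newFSM() *fsm {
    return &fsm{lastApplied: make(map[string]uint64)}
}

func (f *fsm) Apply(channel string, seq uint64) {
    f.Lock()
    defer f.Unlock()
    f.lastApplied[channel] = seq
}

func (f *fsm) Persist() map[string]uint64 {
    f.Lock()
    defer f.Unlock()
    // Copy under the lock so the snapshot is a consistent view.
    out := make(map[string]uint64, len(f.lastApplied))
    for ch, seq := range f.lastApplied {
        out[ch] = seq
    }
    return out
}
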
@@ -477,18 +507,25 @@ func (r *raftFSM) Apply(l *raft.Log) interface{} {
// just bail out.
if err == ErrChanDelInProgress {
return nil
+ } else if err == nil && !c.lSeqChecked {
+ // If msg.Sequence is > 1, then make sure we have no gap.
+ if msg.Sequence > 1 {
+ // We pass `1` for the `first` sequence. The function we call
+ // will do the right thing when it comes to restoring possibly
+ // missing messages.
+ err = s.raft.fsm.restoreMsgsFromSnapshot(c, 1, msg.Sequence-1, true)
+ }
+ if err == nil {
+ c.lSeqChecked = true
+ }
+ }
- lastSeq, err = c.store.Msgs.LastSequence()
}
- if err == nil && lastSeq < msg.Sequence-1 {
- err = s.raft.fsm.restoreMsgsFromSnapshot(c, lastSeq+1, msg.Sequence-1)
- }
if err == nil {
_, err = c.store.Msgs.Store(msg)
}
if err != nil {
- return fmt.Errorf("failed to store replicated message %d on channel %s: %v",
- msg.Sequence, msg.Subject, err)
+ panic(fmt.Errorf("failed to store replicated message %d on channel %s: %v",
+ msg.Sequence, msg.Subject, err))
}
}
return nil
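
The new c.lSeqChecked branch is a one-time gap check: the first time a replicated message is applied for a channel on this node, sequences 1 through msg.Sequence-1 are handed to the snapshot-restore path, which fetches only what is genuinely missing (messages that already exist locally or have expired are simply not returned). A rough sketch of that idea, with hypothetical names (applyReplicated, restoreRange), might look like this:

package gapsketch

// channelState stands in for the per-channel bookkeeping used above.
type channelState struct {
    lSeqChecked bool
}

// applyReplicated sketches the first-message gap check. restoreRange plays
// the role of restoreMsgsFromSnapshot: ask the leader for [first,last] and
// let it decide what actually needs to be sent. store persists one message.
func applyReplicated(c *channelState, seq uint64,
    restoreRange func(first, last uint64) error,
    store func(seq uint64) error) error {
    if !c.lSeqChecked {
        if seq > 1 {
            // Pass 1 as the first sequence; already-stored or expired
            // messages are skipped by the restore path.
            if err := restoreRange(1, seq-1); err != nil {
                return err
            }
        }
        c.lSeqChecked = true
    }
    return store(seq)
}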
