Skip to content

Commit

Permalink
Merge 562f52e into 7b9c7fe
Browse files Browse the repository at this point in the history
  • Loading branch information
kozlovic committed May 18, 2019
2 parents 7b9c7fe + 562f52e commit 1a3f407
Show file tree
Hide file tree
Showing 5 changed files with 385 additions and 107 deletions.
55 changes: 46 additions & 9 deletions server/clustering.go
Expand Up @@ -20,6 +20,7 @@ import (
"os"
"path/filepath"
"sync"
"sync/atomic"
"time"

"github.com/hashicorp/raft"
Expand Down Expand Up @@ -451,6 +452,32 @@ func (s *StanServer) getClusteringPeerAddr(raftName, nodeID string) string {
return fmt.Sprintf("%s.%s.%s", s.opts.ID, nodeID, raftName)
}

// Returns the message store first and last sequence.
// When in clustered mode, if the first and last are 0, returns the value of
// the last sequence that we possibly got from the last snapshot. If a node
// restores a snapshot that let's say has first=1 and last=100, but when it
// tries to get these messages from the leader, the leader does not send them
// back because they have all expired, the node will not store anything.
// If we just rely on store's first/last, this node would use and report 0
// for channel's first and last while when all messages have expired, it should
// be last+1/last.
func (s *StanServer) getChannelFirstAndlLastSeq(c *channel) (uint64, uint64, error) {
first, last, err := c.store.Msgs.FirstAndLastSequence()
if !s.isClustered {
return first, last, err
}
if err != nil {
return 0, 0, err
}
if first == 0 && last == 0 {
if fseq := atomic.LoadUint64(&c.firstSeq); fseq != 0 {
first = fseq
last = fseq - 1
}
}
return first, last, nil
}

// Apply log is invoked once a log entry is committed.
// It returns a value which will be made available in the
// ApplyFuture returned by Raft.Apply method if that
Expand All @@ -461,13 +488,16 @@ func (r *raftFSM) Apply(l *raft.Log) interface{} {
if err := op.Unmarshal(l.Data); err != nil {
panic(err)
}
// We don't want snapshot Persist() and Apply() to execute concurrently,
// so use common lock.
r.Lock()
defer r.Unlock()
switch op.OpType {
case spb.RaftOperation_Publish:
// Message replication.
var (
c *channel
err error
lastSeq uint64
c *channel
err error
)
for _, msg := range op.PublishBatch.Messages {
// This is a batch for a given channel, so lookup channel once.
Expand All @@ -477,18 +507,25 @@ func (r *raftFSM) Apply(l *raft.Log) interface{} {
// just bail out.
if err == ErrChanDelInProgress {
return nil
} else if err == nil && !c.lSeqChecked {
// If msg.Sequence is > 1, then make sure we have no gap.
if msg.Sequence > 1 {
// We pass `1` for the `first` sequence. The function we call
// will do the right thing when it comes to restore possible
// missing messages.
err = s.raft.fsm.restoreMsgsFromSnapshot(c, 1, msg.Sequence-1, true)
}
if err == nil {
c.lSeqChecked = true
}
}
lastSeq, err = c.store.Msgs.LastSequence()
}
if err == nil && lastSeq < msg.Sequence-1 {
err = s.raft.fsm.restoreMsgsFromSnapshot(c, lastSeq+1, msg.Sequence-1)
}
if err == nil {
_, err = c.store.Msgs.Store(msg)
}
if err != nil {
return fmt.Errorf("failed to store replicated message %d on channel %s: %v",
msg.Sequence, msg.Subject, err)
panic(fmt.Errorf("failed to store replicated message %d on channel %s: %v",
msg.Sequence, msg.Subject, err))
}
}
return nil
Expand Down

0 comments on commit 1a3f407

Please sign in to comment.