[FIXED] Clustering: unable to complete leadership actions
In some cases, a follower could become leader but be unable to
complete all the actions required to fully assume leadership.
The underlying issue was that if snapshots had to be sent to the
followers, the followers could not restore them because the new
leader was not yet listening for snapshot restore requests.
There was also a risk that a channel included in a snapshot sent
to the followers would be deleted while the followers were trying
to install that snapshot.

Resolves #762

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
kozlovic committed Mar 11, 2019
1 parent 8b28e84 commit 71358f4
Showing 2 changed files with 171 additions and 80 deletions.
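
The core of the fix is an ordering change in leadershipAcquired: the new leader now registers a single server-wide subscription for snapshot restore requests before it raises the Raft barrier, so a follower that is asked to install a snapshot can immediately fetch the channel messages that snapshot references. The sketch below shows only that ordering, written as it might sit in the server package; apart from the two calls that appear in the diff, everything is simplified or omitted.

// Minimal sketch of the new ordering in leadershipAcquired (simplified;
// unrelated leadership steps omitted).
func (s *StanServer) leadershipAcquiredSketch() error {
	// Listen for snapshot restore requests first, so followers that must
	// install a snapshot from this new leader can fetch messages right away.
	if err := s.subToSnapshotRestoreRequests(); err != nil {
		return err
	}
	// Only then use a barrier to ensure all preceding operations are
	// applied to the FSM before finishing the leadership transition.
	if err := s.raft.Barrier(0).Error(); err != nil {
		return err
	}
	// ... rest of the leadership setup (channels, subscriptions, etc.)
	return nil
}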
171 changes: 128 additions & 43 deletions server/clustering_test.go
@@ -82,6 +82,7 @@ func getTestDefaultOptsForClustering(id string, bootstrap bool) *Options {
opts.SQLStoreOpts.Source = testSQLSource + suffix
}
opts.Clustering.Clustered = true
opts.Clustering.NodeID = id
opts.Clustering.Bootstrap = bootstrap
opts.Clustering.RaftLogPath = filepath.Join(defaultRaftLog, id)
opts.Clustering.LogCacheSize = DefaultLogCacheSize
@@ -4051,49 +4052,6 @@ func TestClusteringWithCryptoStore(t *testing.T) {
check(t, "s2", fname2)
}

func TestClusteringDeleteChannelLeaksSnapshotSubs(t *testing.T) {
cleanupDatastore(t)
defer cleanupDatastore(t)
cleanupRaftLog(t)
defer cleanupRaftLog(t)

// For this test, use a central NATS server.
ns := natsdTest.RunDefaultServer()
defer ns.Shutdown()

maxInactivity := 10 * time.Millisecond

// Configure first server
s1sOpts := getTestDefaultOptsForClustering("a", true)
s1sOpts.MaxInactivity = maxInactivity
s1 := runServerWithOpts(t, s1sOpts, nil)
defer s1.Shutdown()

// Configure second server.
s2sOpts := getTestDefaultOptsForClustering("b", false)
s2sOpts.MaxInactivity = maxInactivity
s2 := runServerWithOpts(t, s2sOpts, nil)
defer s2.Shutdown()

servers := []*StanServer{s1, s2}
// Wait for leader to be elected.
leader := getLeader(t, 10*time.Second, servers...)

sc := NewDefaultConnection(t)
defer sc.Close()

sc.Publish("foo", []byte("hello"))
c := leader.channels.get("foo")
time.Sleep(2 * maxInactivity)

leader.channels.RLock()
snapshotSubExists := c.snapshotSub != nil
leader.channels.RUnlock()
if snapshotSubExists {
t.Fatalf("Snapshot subscription for channel still exists")
}
}

func TestClusteringDeadlockOnChannelDelete(t *testing.T) {
cleanupDatastore(t)
defer cleanupDatastore(t)
@@ -4446,3 +4404,130 @@ func TestClusteringNoPanicOnChannelDelete(t *testing.T) {
close(done)
wg.Wait()
}

func TestClusteringInstallSnapshotFailure(t *testing.T) {
if persistentStoreType != stores.TypeFile {
t.Skip("Test written for FILE stores only...")
}
cleanupDatastore(t)
defer cleanupDatastore(t)
cleanupRaftLog(t)
defer cleanupRaftLog(t)

// For this test, use a central NATS server.
ns := natsdTest.RunDefaultServer()
defer ns.Shutdown()

barLimits := &stores.ChannelLimits{MaxInactivity: 50 * time.Millisecond}

// Configure first server
s1sOpts := getTestDefaultOptsForClustering("a", false)
s1sOpts.AddPerChannel("bar.*", barLimits)
s1sOpts.Clustering.Peers = []string{"a", "b", "c"}
s1sOpts.FileStoreOpts.FileDescriptorsLimit = 5
s1 := runServerWithOpts(t, s1sOpts, nil)
defer s1.Shutdown()

// Configure second server.
s2sOpts := getTestDefaultOptsForClustering("b", false)
s2sOpts.AddPerChannel("bar.*", barLimits)
s2sOpts.Clustering.Peers = []string{"a", "b", "c"}
s2sOpts.FileStoreOpts.FileDescriptorsLimit = 5
s2 := runServerWithOpts(t, s2sOpts, nil)
defer s2.Shutdown()

// Configure third server.
s3sOpts := getTestDefaultOptsForClustering("c", false)
s3sOpts.AddPerChannel("bar.*", barLimits)
s3sOpts.Clustering.Peers = []string{"a", "b", "c"}
s3sOpts.FileStoreOpts.FileDescriptorsLimit = 5
s3 := runServerWithOpts(t, s3sOpts, nil)
defer s3.Shutdown()

servers := []*StanServer{s1, s2, s3}
leader := getLeader(t, 10*time.Second, s1, s2, s3)
followers := removeServer(servers, leader)

sc := NewDefaultConnection(t)
defer sc.Close()

for ns := 0; ns < 2; ns++ {
for i := 0; i < 25; i++ {
sc.Publish(fmt.Sprintf("foo.%d", ns*25+i), []byte("hello"))
}
if err := s2.raft.Snapshot().Error(); err != nil {
t.Fatalf("Error during snapshot: %v", err)
}
}

	// Start by shutting down one of the followers
follower := followers[0]
follower.Shutdown()

remaining := followers[1]

// Produce more data
for ns := 0; ns < 2; ns++ {
for i := 0; i < 25; i++ {
sc.Publish(fmt.Sprintf("bar.%d", ns*25+i), []byte("hello"))
}
if err := remaining.raft.Snapshot().Error(); err != nil {
t.Fatalf("Error during snapshot: %v", err)
}
}
sc.Close()

time.Sleep(100 * time.Millisecond)

// Now shutdown the leader...
leader.Shutdown()

// Remove their state
removeState := func(s *StanServer) {
var nodeID string
switch s {
case s1:
nodeID = "a"
case s2:
nodeID = "b"
case s3:
nodeID = "c"
}
os.RemoveAll(filepath.Join(defaultDataStore, nodeID))
os.RemoveAll(filepath.Join(defaultRaftLog, nodeID))
}
removeState(leader)
removeState(follower)

time.Sleep(500 * time.Millisecond)

// Restart the 2 previously stopped servers.
restartSrv := func(s *StanServer) *StanServer {
var opts *Options
switch s {
case s1:
opts = s1sOpts
case s2:
opts = s2sOpts
case s3:
opts = s3sOpts
}
return runServerWithOpts(t, opts, nil)
}
s4 := restartSrv(leader)
defer s4.Shutdown()

time.Sleep(500 * time.Millisecond)

s5 := restartSrv(follower)
defer s5.Shutdown()

getLeader(t, 10*time.Second, remaining, s4, s5)

sc = NewDefaultConnection(t)
// explicitly close/shutdown to make test faster.
sc.Close()
s4.Shutdown()
s5.Shutdown()
remaining.Shutdown()
}
80 changes: 43 additions & 37 deletions server/server.go
@@ -285,6 +285,17 @@ func (cs *channelStore) get(name string) *channel {
return c
}

func (cs *channelStore) getIfNotAboutToBeDeleted(name string) *channel {
cs.RLock()
c := cs.channels[name]
if c != nil && c.activity != nil && c.activity.deleteInProgress {
cs.RUnlock()
return nil
}
cs.RUnlock()
return c
}

func (cs *channelStore) createChannel(s *StanServer, name string) (*channel, error) {
cs.Lock()
c, err := cs.createChannelLocked(s, name)
@@ -313,15 +324,8 @@ func (cs *channelStore) createChannelLocked(s *StanServer, name string) (*channe
return nil, err
}
isStandaloneOrLeader := true
if s.isClustered {
if s.isLeader() {
if err := c.subToSnapshotRestoreRequests(); err != nil {
delete(cs.channels, name)
return nil, err
}
} else {
isStandaloneOrLeader = false
}
if s.isClustered && !s.isLeader() {
isStandaloneOrLeader = false
}
if isStandaloneOrLeader && c.activity != nil {
c.startDeleteTimer()
@@ -423,7 +427,6 @@ type channel struct {
store *stores.Channel
ss *subStore
lTimestamp int64
snapshotSub *nats.Subscription
stan *StanServer
activity *channelActivity
}
@@ -459,7 +462,7 @@ func (c *channel) stopDeleteTimer() {

// Resets the delete timer to the given duration.
// If the timer was not created, this call will create it.
// The channelStore's delMu mutex must be held on entry.
// The channelStore's mutex must be held on entry.
func (c *channel) resetDeleteTimer(newDuration time.Duration) {
a := c.activity
if a.timer == nil {
@@ -493,15 +496,22 @@ func (c *channel) pubMsgToMsgProto(pm *pb.PubMsg, seq uint64) *pb.MsgProto {
}

// Sets a subscription that will handle snapshot restore requests from followers.
func (c *channel) subToSnapshotRestoreRequests() error {
func (s *StanServer) subToSnapshotRestoreRequests() error {
var (
msgBuf []byte
buf []byte
snapshotRestoreSubj = fmt.Sprintf("%s.%s.%s", defaultSnapshotPrefix, c.stan.info.ClusterID, c.name)
msgBuf []byte
buf []byte
snapshotRestorePrefix = fmt.Sprintf("%s.%s.", defaultSnapshotPrefix, s.info.ClusterID)
prefixLen = len(snapshotRestorePrefix)
)
sub, err := c.stan.ncsr.Subscribe(snapshotRestoreSubj, func(m *nats.Msg) {
sub, err := s.ncsr.Subscribe(snapshotRestorePrefix+">", func(m *nats.Msg) {
if len(m.Data) != 16 {
c.stan.log.Errorf("Invalid snapshot request, data len=%v", len(m.Data))
s.log.Errorf("Invalid snapshot request, data len=%v", len(m.Data))
return
}
cname := m.Subject[prefixLen:]
c := s.channels.getIfNotAboutToBeDeleted(cname)
if c == nil {
s.ncsr.Publish(m.Reply, nil)
return
}
start := util.ByteOrder.Uint64(m.Data[:8])
@@ -510,7 +520,7 @@ func (c *channel) subToSnapshotRestoreRequests() error {
for seq := start; seq <= end; seq++ {
msg, err := c.store.Msgs.Lookup(seq)
if err != nil {
c.stan.log.Errorf("Snapshot restore request error for channel %q, error looking up message %v: %v", c.name, seq, err)
s.log.Errorf("Snapshot restore request error for channel %q, error looking up message %v: %v", c.name, seq, err)
return
}
if msg == nil {
@@ -525,14 +535,14 @@
}
buf = msgBuf[:n]
}
if err := c.stan.ncsr.Publish(m.Reply, buf); err != nil {
c.stan.log.Errorf("Snapshot restore request error for channel %q, unable to send response for seq %v: %v", c.name, seq, err)
if err := s.ncsr.Publish(m.Reply, buf); err != nil {
s.log.Errorf("Snapshot restore request error for channel %q, unable to send response for seq %v: %v", c.name, seq, err)
}
if buf == nil {
return
}
select {
case <-c.stan.shutdownCh:
case <-s.shutdownCh:
return
default:
}
@@ -541,8 +551,8 @@
if err != nil {
return err
}
c.snapshotSub = sub
c.snapshotSub.SetPendingLimits(-1, -1)
sub.SetPendingLimits(-1, -1)
s.snapReqSub = sub
return nil
}
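
For reference, the request format served by this handler: a follower publishes on "<defaultSnapshotPrefix>.<clusterID>.<channel>" with a 16-byte payload holding the first and last sequence to restore, and the leader streams one reply per sequence, sending an empty reply when the channel no longer exists or a message is missing. The follower-side sketch below is illustrative only; the subject prefix value, the byte order, and the synchronous inbox loop are assumptions, not the server's actual restore code.

package main

import (
	"encoding/binary"
	"fmt"
	"time"

	nats "github.com/nats-io/go-nats" // client import path of that era; adjust for newer releases
)

// fetchSnapshotRange is a hypothetical helper that shows the request format only.
func fetchSnapshotRange(nc *nats.Conn, clusterID, channel string, first, last uint64) error {
	// Subject layout taken from the diff; the actual prefix constant is not shown there.
	subj := fmt.Sprintf("%s.%s.%s", "snapshot.restore.prefix", clusterID, channel)

	// 16-byte payload: first and last sequence to restore.
	req := make([]byte, 16)
	binary.LittleEndian.PutUint64(req[:8], first) // byte order assumed to match util.ByteOrder
	binary.LittleEndian.PutUint64(req[8:], last)

	inbox := nats.NewInbox()
	sub, err := nc.SubscribeSync(inbox)
	if err != nil {
		return err
	}
	defer sub.Unsubscribe()
	if err := nc.PublishRequest(subj, inbox, req); err != nil {
		return err
	}

	// One reply per sequence; an empty reply means the channel is gone
	// (or a message is missing), so stop restoring this channel.
	for seq := first; seq <= last; seq++ {
		m, err := sub.NextMsg(2 * time.Second)
		if err != nil {
			return err
		}
		if len(m.Data) == 0 {
			return nil
		}
		// m.Data is a marshaled pb.MsgProto for seq; unmarshal and store it here.
	}
	return nil
}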

@@ -638,6 +648,7 @@ type StanServer struct {
raftLogging bool
isClustered bool
lazyRepl *lazyReplication
snapReqSub *nats.Subscription

// Our internal subscriptions
connectSub *nats.Subscription
@@ -1937,6 +1948,11 @@ func (s *StanServer) leadershipAcquired() error {
	// Then, we will notify it back to unlock it when we are done here.
defer close(sdc)

// Start listening to snapshot restore requests here...
if err := s.subToSnapshotRestoreRequests(); err != nil {
return err
}

// Use a barrier to ensure all preceding operations are applied to the FSM
if err := s.raft.Barrier(0).Error(); err != nil {
return err
@@ -1975,10 +1991,6 @@

var allSubs []*subState
for _, c := range channels {
// Subscribe to channel snapshot restore subject
if err := c.subToSnapshotRestoreRequests(); err != nil {
return err
}
subs := c.ss.getAllSubs()
if len(subs) > 0 {
allSubs = append(allSubs, subs...)
@@ -2030,10 +2042,6 @@ func (s *StanServer) leadershipLost() {
// Unsubscribe to the snapshot request per channel since we are no longer
// leader.
for _, c := range s.channels.getAll() {
if c.snapshotSub != nil {
c.snapshotSub.Unsubscribe()
c.snapshotSub = nil
}
if c.activity != nil {
s.channels.stopDeleteTimer(c)
}
@@ -2512,6 +2520,10 @@ func (s *StanServer) unsubscribeInternalSubs() {
s.cliPingSub.Unsubscribe()
s.cliPingSub = nil
}
if s.snapReqSub != nil {
s.snapReqSub.Unsubscribe()
s.snapReqSub = nil
}
}

func (s *StanServer) createSub(subj string, f nats.MsgHandler, errTxt string) (*nats.Subscription, error) {
@@ -2735,12 +2747,6 @@ func (s *StanServer) processDeleteChannel(channel string) {
}
return
}
// If there was a subscription for snapshots requests,
// we need to unsubscribe.
if c.snapshotSub != nil {
c.snapshotSub.Unsubscribe()
c.snapshotSub = nil
}
delete(s.channels.channels, channel)
s.log.Noticef("Channel %q has been deleted", channel)
}
