Skip to content

Commit

Permalink
Fixed panic on follower shutdown while replication is in progress
Browse files Browse the repository at this point in the history
  • Loading branch information
kozlovic committed Aug 30, 2017
1 parent 6ab2f38 commit 9fe9509
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 17 deletions.
72 changes: 66 additions & 6 deletions server/clustering_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ import (
"bytes"
"fmt"
"io/ioutil"
"math/rand"
"os"
"path/filepath"
"strconv"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -61,9 +63,7 @@ func getChannelLeader(t *testing.T, channel string, timeout time.Duration, serve
if s.state == Shutdown {
continue
}
s.channels.Lock()
c := s.channels.channels[channel]
s.channels.Unlock()
c := s.channels.get(channel)
if c == nil || c.raft == nil {
continue
}
Expand Down Expand Up @@ -299,9 +299,7 @@ func TestClusteringBasic(t *testing.T) {

// Verify the server stores are consistent.
for _, server := range servers {
server.channels.Lock()
store := server.channels.channels[channel].store.Msgs
server.channels.Unlock()
store := server.channels.get(channel).store.Msgs
first, last, err := store.FirstAndLastSequence()
if err != nil {
t.Fatalf("Error getting sequence numbers: %v", err)
Expand All @@ -321,3 +319,65 @@ func TestClusteringBasic(t *testing.T) {
}
}
}

// TestClusteringNoPanicOnShutdown starts two clustered servers against a
// central NATS server, keeps publishing to channel "foo" from a background
// go-routine, and shuts down the server that is not the channel leader while
// those publishes are in flight. The shutdown is expected not to panic.
func TestClusteringNoPanicOnShutdown(t *testing.T) {
	cleanupDatastore(t)
	defer cleanupDatastore(t)
	cleanupRaftLog(t)
	defer cleanupRaftLog(t)

	// A single central NATS server is used by both streaming servers.
	natsSrv := natsdTest.RunDefaultServer()
	defer natsSrv.Shutdown()

	// First streaming server.
	// NOTE(review): arguments look like (node ID, peer IDs) — confirm against helper.
	optsA := getTestDefaultOptsForClustering("a", []string{"b"})
	s1 := runServerWithOpts(t, optsA, nil)
	defer s1.Shutdown()

	// Second streaming server, mirroring the first one.
	optsB := getTestDefaultOptsForClustering("b", []string{"a"})
	s2 := runServerWithOpts(t, optsB, nil)
	defer s2.Shutdown()

	sc, err := stan.Connect(clusterName, clientName, stan.PubAckWait(time.Second))
	if err != nil {
		t.Fatalf("Error on connect: %v", err)
	}
	defer sc.Close()

	// Subscribe on "foo" before looking up the channel leader.
	sub, err := sc.Subscribe("foo", func(_ *stan.Msg) {})
	if err != nil {
		t.Fatalf("Unexpected error on subscribe: %v", err)
	}

	leader := getChannelLeader(t, "foo", 5*time.Second, []*StanServer{s1, s2}...)

	// The subscription itself is not what is being tested, so drop it.
	sub.Unsubscribe()

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		// Publish back-to-back; the first publish error ends the go-routine.
		for sc.Publish("foo", []byte("msg")) == nil {
		}
	}()

	// Sleep a random 100-599ms so the publisher is in the middle of
	// sending messages when the shutdown happens.
	pause := time.Duration(rand.Intn(500)+100) * time.Millisecond
	time.Sleep(pause)

	// Pick whichever server is not the leader and shut it down; this
	// should not panic.
	follower := s1
	if s1 == leader {
		follower = s2
	}
	follower.Shutdown()

	wg.Wait()
}
27 changes: 16 additions & 11 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4129,24 +4129,19 @@ func (s *StanServer) Shutdown() {
if s.partitions != nil {
s.partitions.shutdown()
}
// Capture this under lock
channelStore := s.channels
s.mu.Unlock()

// Make sure the StoreIOLoop returns before closing the Store
if waitForIOStoreLoop {
s.ioChannelWG.Wait()
}

// Close/Shutdown resources. Note that unless one instantiates StanServer
// directly (instead of calling RunServer() and the like), these should
// not be nil.
if store != nil {
store.Close()
}
if ncs != nil {
ncs.Close()
}
if s.channels != nil {
for _, channel := range s.channels.channels {
// Close channels RAFT related resources before closing store.
if channelStore != nil {
channels := channelStore.getAll()
for _, channel := range channels {
if channel.raft != nil {
if err := channel.raft.Shutdown().Error(); err != nil {
s.log.Errorf("Failed to stop Raft node for channel %s: %v", channel.name, err)
Expand All @@ -4159,6 +4154,16 @@ func (s *StanServer) Shutdown() {
}
}
}

// Close/Shutdown resources. Note that unless one instantiates StanServer
// directly (instead of calling RunServer() and the like), these should
// not be nil.
if store != nil {
store.Close()
}
if ncs != nil {
ncs.Close()
}
if ncr != nil {
ncr.Close()
}
Expand Down

0 comments on commit 9fe9509

Please sign in to comment.