Merge 71358f4 into 8b28e84
kozlovic committed Mar 11, 2019
2 parents 8b28e84 + 71358f4 commit d7774c6
Showing 2 changed files with 171 additions and 80 deletions.
171 changes: 128 additions & 43 deletions server/clustering_test.go
@@ -82,6 +82,7 @@ func getTestDefaultOptsForClustering(id string, bootstrap bool) *Options {
opts.SQLStoreOpts.Source = testSQLSource + suffix
}
opts.Clustering.Clustered = true
opts.Clustering.NodeID = id
opts.Clustering.Bootstrap = bootstrap
opts.Clustering.RaftLogPath = filepath.Join(defaultRaftLog, id)
opts.Clustering.LogCacheSize = DefaultLogCacheSize
@@ -4051,49 +4052,6 @@ func TestClusteringWithCryptoStore(t *testing.T) {
check(t, "s2", fname2)
}

func TestClusteringDeleteChannelLeaksSnapshotSubs(t *testing.T) {
cleanupDatastore(t)
defer cleanupDatastore(t)
cleanupRaftLog(t)
defer cleanupRaftLog(t)

// For this test, use a central NATS server.
ns := natsdTest.RunDefaultServer()
defer ns.Shutdown()

maxInactivity := 10 * time.Millisecond

// Configure first server
s1sOpts := getTestDefaultOptsForClustering("a", true)
s1sOpts.MaxInactivity = maxInactivity
s1 := runServerWithOpts(t, s1sOpts, nil)
defer s1.Shutdown()

// Configure second server.
s2sOpts := getTestDefaultOptsForClustering("b", false)
s2sOpts.MaxInactivity = maxInactivity
s2 := runServerWithOpts(t, s2sOpts, nil)
defer s2.Shutdown()

servers := []*StanServer{s1, s2}
// Wait for leader to be elected.
leader := getLeader(t, 10*time.Second, servers...)

sc := NewDefaultConnection(t)
defer sc.Close()

sc.Publish("foo", []byte("hello"))
c := leader.channels.get("foo")
time.Sleep(2 * maxInactivity)

leader.channels.RLock()
snapshotSubExists := c.snapshotSub != nil
leader.channels.RUnlock()
if snapshotSubExists {
t.Fatalf("Snapshot subscription for channel still exists")
}
}

func TestClusteringDeadlockOnChannelDelete(t *testing.T) {
cleanupDatastore(t)
defer cleanupDatastore(t)
@@ -4446,3 +4404,130 @@ func TestClusteringNoPanicOnChannelDelete(t *testing.T) {
close(done)
wg.Wait()
}

func TestClusteringInstallSnapshotFailure(t *testing.T) {
if persistentStoreType != stores.TypeFile {
t.Skip("Test written for FILE stores only...")
}
cleanupDatastore(t)
defer cleanupDatastore(t)
cleanupRaftLog(t)
defer cleanupRaftLog(t)

// For this test, use a central NATS server.
ns := natsdTest.RunDefaultServer()
defer ns.Shutdown()

barLimits := &stores.ChannelLimits{MaxInactivity: 50 * time.Millisecond}

// Configure first server
s1sOpts := getTestDefaultOptsForClustering("a", false)
s1sOpts.AddPerChannel("bar.*", barLimits)
s1sOpts.Clustering.Peers = []string{"a", "b", "c"}
s1sOpts.FileStoreOpts.FileDescriptorsLimit = 5
s1 := runServerWithOpts(t, s1sOpts, nil)
defer s1.Shutdown()

// Configure second server.
s2sOpts := getTestDefaultOptsForClustering("b", false)
s2sOpts.AddPerChannel("bar.*", barLimits)
s2sOpts.Clustering.Peers = []string{"a", "b", "c"}
s2sOpts.FileStoreOpts.FileDescriptorsLimit = 5
s2 := runServerWithOpts(t, s2sOpts, nil)
defer s2.Shutdown()

// Configure third server.
s3sOpts := getTestDefaultOptsForClustering("c", false)
s3sOpts.AddPerChannel("bar.*", barLimits)
s3sOpts.Clustering.Peers = []string{"a", "b", "c"}
s3sOpts.FileStoreOpts.FileDescriptorsLimit = 5
s3 := runServerWithOpts(t, s3sOpts, nil)
defer s3.Shutdown()

servers := []*StanServer{s1, s2, s3}
leader := getLeader(t, 10*time.Second, s1, s2, s3)
followers := removeServer(servers, leader)

sc := NewDefaultConnection(t)
defer sc.Close()

for ns := 0; ns < 2; ns++ {
for i := 0; i < 25; i++ {
sc.Publish(fmt.Sprintf("foo.%d", ns*25+i), []byte("hello"))
}
if err := s2.raft.Snapshot().Error(); err != nil {
t.Fatalf("Error during snapshot: %v", err)
}
}

	// Start by shutting down one of the followers
follower := followers[0]
follower.Shutdown()

remaining := followers[1]

// Produce more data
for ns := 0; ns < 2; ns++ {
for i := 0; i < 25; i++ {
sc.Publish(fmt.Sprintf("bar.%d", ns*25+i), []byte("hello"))
}
if err := remaining.raft.Snapshot().Error(); err != nil {
t.Fatalf("Error during snapshot: %v", err)
}
}
sc.Close()

time.Sleep(100 * time.Millisecond)

// Now shutdown the leader...
leader.Shutdown()

// Remove their state
removeState := func(s *StanServer) {
var nodeID string
switch s {
case s1:
nodeID = "a"
case s2:
nodeID = "b"
case s3:
nodeID = "c"
}
os.RemoveAll(filepath.Join(defaultDataStore, nodeID))
os.RemoveAll(filepath.Join(defaultRaftLog, nodeID))
}
removeState(leader)
removeState(follower)

time.Sleep(500 * time.Millisecond)

// Restart the 2 previously stopped servers.
restartSrv := func(s *StanServer) *StanServer {
var opts *Options
switch s {
case s1:
opts = s1sOpts
case s2:
opts = s2sOpts
case s3:
opts = s3sOpts
}
return runServerWithOpts(t, opts, nil)
}
s4 := restartSrv(leader)
defer s4.Shutdown()

time.Sleep(500 * time.Millisecond)

s5 := restartSrv(follower)
defer s5.Shutdown()

getLeader(t, 10*time.Second, remaining, s4, s5)

sc = NewDefaultConnection(t)
// explicitly close/shutdown to make test faster.
sc.Close()
s4.Shutdown()
s5.Shutdown()
remaining.Shutdown()
}
80 changes: 43 additions & 37 deletions server/server.go
@@ -285,6 +285,17 @@ func (cs *channelStore) get(name string) *channel {
return c
}

func (cs *channelStore) getIfNotAboutToBeDeleted(name string) *channel {
cs.RLock()
c := cs.channels[name]
if c != nil && c.activity != nil && c.activity.deleteInProgress {
cs.RUnlock()
return nil
}
cs.RUnlock()
return c
}
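
For context, the only consumer of this new accessor is the server-wide snapshot-restore handler added further down in this diff: it derives the channel name from the NATS subject and, if the channel is missing or has a delete in progress, answers with an empty reply, presumably so the requesting follower can stop waiting. A condensed, hypothetical sketch of that call pattern (mirroring the server.go hunk below, not additional code in this commit):

	cname := m.Subject[prefixLen:]
	c := s.channels.getIfNotAboutToBeDeleted(cname)
	if c == nil {
		// Channel gone or delete in progress: reply with an empty message.
		s.ncsr.Publish(m.Reply, nil)
		return
	}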

func (cs *channelStore) createChannel(s *StanServer, name string) (*channel, error) {
cs.Lock()
c, err := cs.createChannelLocked(s, name)
@@ -313,15 +324,8 @@ func (cs *channelStore) createChannelLocked(s *StanServer, name string) (*channe
return nil, err
}
isStandaloneOrLeader := true
if s.isClustered {
if s.isLeader() {
if err := c.subToSnapshotRestoreRequests(); err != nil {
delete(cs.channels, name)
return nil, err
}
} else {
isStandaloneOrLeader = false
}
if s.isClustered && !s.isLeader() {
isStandaloneOrLeader = false
}
if isStandaloneOrLeader && c.activity != nil {
c.startDeleteTimer()
@@ -423,7 +427,6 @@ type channel struct {
store *stores.Channel
ss *subStore
lTimestamp int64
snapshotSub *nats.Subscription
stan *StanServer
activity *channelActivity
}
@@ -459,7 +462,7 @@ func (c *channel) stopDeleteTimer() {

// Resets the delete timer to the given duration.
// If the timer was not created, this call will create it.
// The channelStore's delMu mutex must be held on entry.
// The channelStore's mutex must be held on entry.
func (c *channel) resetDeleteTimer(newDuration time.Duration) {
a := c.activity
if a.timer == nil {
@@ -493,15 +496,22 @@ func (c *channel) pubMsgToMsgProto(pm *pb.PubMsg, seq uint64) *pb.MsgProto {
}

// Sets a subscription that will handle snapshot restore requests from followers.
func (c *channel) subToSnapshotRestoreRequests() error {
func (s *StanServer) subToSnapshotRestoreRequests() error {
var (
msgBuf []byte
buf []byte
snapshotRestoreSubj = fmt.Sprintf("%s.%s.%s", defaultSnapshotPrefix, c.stan.info.ClusterID, c.name)
msgBuf []byte
buf []byte
snapshotRestorePrefix = fmt.Sprintf("%s.%s.", defaultSnapshotPrefix, s.info.ClusterID)
prefixLen = len(snapshotRestorePrefix)
)
sub, err := c.stan.ncsr.Subscribe(snapshotRestoreSubj, func(m *nats.Msg) {
sub, err := s.ncsr.Subscribe(snapshotRestorePrefix+">", func(m *nats.Msg) {
if len(m.Data) != 16 {
c.stan.log.Errorf("Invalid snapshot request, data len=%v", len(m.Data))
s.log.Errorf("Invalid snapshot request, data len=%v", len(m.Data))
return
}
cname := m.Subject[prefixLen:]
c := s.channels.getIfNotAboutToBeDeleted(cname)
if c == nil {
s.ncsr.Publish(m.Reply, nil)
return
}
start := util.ByteOrder.Uint64(m.Data[:8])
@@ -510,7 +520,7 @@ func (c *channel) subToSnapshotRestoreRequests() error {
for seq := start; seq <= end; seq++ {
msg, err := c.store.Msgs.Lookup(seq)
if err != nil {
c.stan.log.Errorf("Snapshot restore request error for channel %q, error looking up message %v: %v", c.name, seq, err)
s.log.Errorf("Snapshot restore request error for channel %q, error looking up message %v: %v", c.name, seq, err)
return
}
if msg == nil {
@@ -525,14 +535,14 @@ func (c *channel) subToSnapshotRestoreRequests() error {
}
buf = msgBuf[:n]
}
if err := c.stan.ncsr.Publish(m.Reply, buf); err != nil {
c.stan.log.Errorf("Snapshot restore request error for channel %q, unable to send response for seq %v: %v", c.name, seq, err)
if err := s.ncsr.Publish(m.Reply, buf); err != nil {
s.log.Errorf("Snapshot restore request error for channel %q, unable to send response for seq %v: %v", c.name, seq, err)
}
if buf == nil {
return
}
select {
case <-c.stan.shutdownCh:
case <-s.shutdownCh:
return
default:
}
@@ -541,8 +551,8 @@ func (c *channel) subToSnapshotRestoreRequests() error {
if err != nil {
return err
}
c.snapshotSub = sub
c.snapshotSub.SetPendingLimits(-1, -1)
sub.SetPendingLimits(-1, -1)
s.snapReqSub = sub
return nil
}
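
Taken together, the handler above implies a simple snapshot-restore wire protocol: a follower publishes a 16-byte request (first and last sequence to fetch, encoded with the server's util.ByteOrder) on <defaultSnapshotPrefix>.<cluster ID>.<channel>, and the leader publishes each stored message to the reply subject, answering with an empty message when the channel is gone or a sequence can no longer be looked up. Below is a minimal follower-side sketch, not the project's actual restore code: it assumes it lives next to the server package (so nats, pb, fmt, time, and defaultSnapshotPrefix are in scope, plus encoding/binary), that util.ByteOrder is little-endian, and it uses a hypothetical helper name requestSnapshotRange.

func requestSnapshotRange(nc *nats.Conn, clusterID, channel string, start, end uint64) error {
	subj := fmt.Sprintf("%s.%s.%s", defaultSnapshotPrefix, clusterID, channel)
	inbox := nats.NewInbox()
	sub, err := nc.SubscribeSync(inbox)
	if err != nil {
		return err
	}
	defer sub.Unsubscribe()

	// 16-byte payload: first and last sequence to restore. The byte order
	// must match the server's util.ByteOrder (little-endian assumed here).
	req := make([]byte, 16)
	binary.LittleEndian.PutUint64(req[:8], start)
	binary.LittleEndian.PutUint64(req[8:16], end)
	if err := nc.PublishRequest(subj, inbox, req); err != nil {
		return err
	}

	for seq := start; seq <= end; seq++ {
		resp, err := sub.NextMsg(2 * time.Second)
		if err != nil {
			return err
		}
		// An empty reply means the channel is gone (or being deleted) or the
		// sequence is no longer stored; nothing more will be sent.
		if len(resp.Data) == 0 {
			return nil
		}
		msg := &pb.MsgProto{}
		if err := msg.Unmarshal(resp.Data); err != nil {
			return err
		}
		// ... apply msg to the local store ...
	}
	return nil
}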

@@ -638,6 +648,7 @@ type StanServer struct {
raftLogging bool
isClustered bool
lazyRepl *lazyReplication
snapReqSub *nats.Subscription

// Our internal subscriptions
connectSub *nats.Subscription
@@ -1937,6 +1948,11 @@ func (s *StanServer) leadershipAcquired() error {
	// Then, we will notify it back to unlock it when we are done here.
defer close(sdc)

// Start listening to snapshot restore requests here...
if err := s.subToSnapshotRestoreRequests(); err != nil {
return err
}

// Use a barrier to ensure all preceding operations are applied to the FSM
if err := s.raft.Barrier(0).Error(); err != nil {
return err
@@ -1975,10 +1991,6 @@

var allSubs []*subState
for _, c := range channels {
// Subscribe to channel snapshot restore subject
if err := c.subToSnapshotRestoreRequests(); err != nil {
return err
}
subs := c.ss.getAllSubs()
if len(subs) > 0 {
allSubs = append(allSubs, subs...)
@@ -2030,10 +2042,6 @@ func (s *StanServer) leadershipLost() {
// Unsubscribe to the snapshot request per channel since we are no longer
// leader.
for _, c := range s.channels.getAll() {
if c.snapshotSub != nil {
c.snapshotSub.Unsubscribe()
c.snapshotSub = nil
}
if c.activity != nil {
s.channels.stopDeleteTimer(c)
}
@@ -2512,6 +2520,10 @@ func (s *StanServer) unsubscribeInternalSubs() {
s.cliPingSub.Unsubscribe()
s.cliPingSub = nil
}
if s.snapReqSub != nil {
s.snapReqSub.Unsubscribe()
s.snapReqSub = nil
}
}

func (s *StanServer) createSub(subj string, f nats.MsgHandler, errTxt string) (*nats.Subscription, error) {
@@ -2735,12 +2747,6 @@ func (s *StanServer) processDeleteChannel(channel string) {
}
return
}
// If there was a subscription for snapshots requests,
// we need to unsubscribe.
if c.snapshotSub != nil {
c.snapshotSub.Unsubscribe()
c.snapshotSub = nil
}
delete(s.channels.channels, channel)
s.log.Noticef("Channel %q has been deleted", channel)
}
