Skip to content

Commit

Permalink
Merge pull request #1004 from nats-io/more_cleanup_on_shutdown
Browse files Browse the repository at this point in the history
[FIXED] Stop subscription redelivery timer and signal handler on shutdown
  • Loading branch information
kozlovic authored Dec 31, 2019
2 parents 01ae38d + 2042f85 commit 5751408
Show file tree
Hide file tree
Showing 10 changed files with 73 additions and 29 deletions.
8 changes: 4 additions & 4 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
language: go
sudo: false
go:
- 1.11.x
- 1.12.x
- 1.13.x
addons:
apt:
packages:
Expand Down Expand Up @@ -30,14 +30,14 @@ script:
- set -e
- mysql -u root -e "CREATE USER 'nss'@'localhost' IDENTIFIED BY 'password'; GRANT ALL PRIVILEGES ON *.* TO 'nss'@'localhost'; CREATE DATABASE test_nats_streaming;"
- go test -i ./...
- if [[ "$TRAVIS_GO_VERSION" =~ 1.12 ]]; then ./scripts/cov.sh TRAVIS; else go test -failfast ./...; fi
- if [[ "$TRAVIS_GO_VERSION" =~ 1.13 ]]; then ./scripts/cov.sh TRAVIS; else go test -v -failfast -p=1 ./...; fi
- set +e

deploy:
provider: script
skip_cleanup: true
skip_cleanup: false
script: curl -sL http://git.io/goreleaser | bash
verbose: true
on:
tags: true
condition: $TRAVIS_GO_VERSION =~ 1.11
condition: $TRAVIS_GO_VERSION =~ 1.12
16 changes: 10 additions & 6 deletions server/clustering_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4599,17 +4599,17 @@ func TestClusteringInstallSnapshotFailure(t *testing.T) {
sc := NewDefaultConnection(t)
defer sc.Close()

follower := followers[0]
for ns := 0; ns < 2; ns++ {
for i := 0; i < 25; i++ {
sc.Publish(fmt.Sprintf("foo.%d", ns*25+i), []byte("hello"))
}
if err := s2.raft.Snapshot().Error(); err != nil {
if err := follower.raft.Snapshot().Error(); err != nil {
t.Fatalf("Error during snapshot: %v", err)
}
}

// Start by shuting down one of the follower
follower := followers[0]
follower.Shutdown()

remaining := followers[1]
Expand Down Expand Up @@ -4670,14 +4670,17 @@ func TestClusteringInstallSnapshotFailure(t *testing.T) {
s5 := restartSrv(follower)
defer s5.Shutdown()

getLeader(t, 10*time.Second, remaining, s4, s5)
newLeader := getLeader(t, 10*time.Second, remaining, s4, s5)

sc = NewDefaultConnection(t)
// explicitly close/shutdown to make test faster.
sc.Close()
s4.Shutdown()
s5.Shutdown()
remaining.Shutdown()
newLeader.Shutdown()
servers = []*StanServer{remaining, s4, s5}
servers = removeServer(servers, newLeader)
for _, s := range servers {
s.Shutdown()
}
}

func TestClusteringSubDontStallDueToMsgExpiration(t *testing.T) {
Expand Down Expand Up @@ -7221,6 +7224,7 @@ func TestClusteringRedeliveryCount(t *testing.T) {
atomic.StoreUint32(&rdlv, 0)
atomic.StoreInt32(&restarted, 1)
s1 = runServerWithOpts(t, s1sOpts, nil)
defer s1.Shutdown()
getLeader(t, 10*time.Second, s1, s2, s3)

select {
Expand Down
2 changes: 1 addition & 1 deletion server/monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1212,7 +1212,7 @@ func TestMonitorClusterRole(t *testing.T) {
t.Fatalf("Got an error unmarshalling the body: %v", err)
}
if sz.Role != test.expectedRole {
t.Fatalf("Expected role to be %v, gt %v", test.expectedRole, sz.Role)
t.Fatalf("Expected role to be %v, got %v", test.expectedRole, sz.Role)
}
})
}
Expand Down
24 changes: 24 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3499,6 +3499,16 @@ func (s *StanServer) performDurableRedelivery(c *channel, sub *subState) {

// Redeliver all outstanding messages that have expired.
func (s *StanServer) performAckExpirationRedelivery(sub *subState, isStartup bool) {
// If server has been shutdown, clear timer and be done (will happen only in tests)
s.mu.RLock()
shutdown := s.shutdown
s.mu.RUnlock()
if shutdown {
sub.Lock()
sub.clearAckTimer()
sub.Unlock()
return
}
// Sort our messages outstanding from acksPending, grab some state and unlock.
sub.Lock()
sortedPendingMsgs := sub.makeSortedPendingMsgs()
Expand Down Expand Up @@ -5520,8 +5530,22 @@ func (s *StanServer) Shutdown() {
if s.partitions != nil {
s.partitions.shutdown()
}
var channels map[string]*channel
if s.channels != nil {
channels = s.channels.getAll()
}
s.mu.Unlock()

// Stop all redelivery timers.
for _, c := range channels {
subs := c.ss.getAllSubs()
for _, sub := range subs {
sub.Lock()
sub.clearAckTimer()
sub.Unlock()
}
}

// Make sure the StoreIOLoop returns before closing the Store
if waitForIOStoreLoop {
s.ioChannelWG.Wait()
Expand Down
5 changes: 4 additions & 1 deletion server/server_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -757,7 +757,10 @@ type queueGroupStalledMsgStore struct {
}

func (s *queueGroupStalledMsgStore) Lookup(seq uint64) (*pb.MsgProto, error) {
s.lookupCh <- struct{}{}
select {
case s.lookupCh <- struct{}{}:
default:
}
return s.MsgStore.Lookup(seq)
}

Expand Down
1 change: 1 addition & 0 deletions server/server_redelivery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -622,6 +622,7 @@ func TestPersistentStoreRedeliveryCbPerSub(t *testing.T) {
if len(sub.acksPending) == 1 {
good++
}
sub.Unlock()
}
return "pending message per sub", good
})
Expand Down
2 changes: 2 additions & 0 deletions server/server_run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1019,6 +1019,7 @@ func TestGhostDurableSubs(t *testing.T) {
waitForNumClients(t, s, 0)

// Change store to simulate no flush on simulated crash
orgStore := s.store
s.store = &storeNoClose{Store: s.store}
s.Shutdown()

Expand All @@ -1027,6 +1028,7 @@ func TestGhostDurableSubs(t *testing.T) {
check(false)

sc.Close()
orgStore.Close()
}

func TestGetNATSOptions(t *testing.T) {
Expand Down
3 changes: 3 additions & 0 deletions server/server_storefailures_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,9 @@ func TestDeleteSubFailures(t *testing.T) {
if err := sc.Publish("foo", []byte("hello")); err != nil {
t.Fatalf("Error on publish: %v", err)
}
if err := Wait(ch); err != nil {
t.Fatal("Did not get our message")
}
// Create 2 more durable queue subs
dqsub2, err := sc.QueueSubscribe("foo", "dqueue", func(_ *stan.Msg) {},
stan.DurableName("dur"))
Expand Down
4 changes: 3 additions & 1 deletion server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1124,10 +1124,12 @@ func TestDontSendEmptyMsgProto(t *testing.T) {
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer nc.Close()
sc, err := stan.Connect(clusterName, clientName, stan.NatsConn(nc))
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer sc.Close()
// Since server is expected to crash, do not attempt to close sc
// because it would delay test by 2 seconds.

Expand All @@ -1148,8 +1150,8 @@ func TestDontSendEmptyMsgProto(t *testing.T) {

m := &pb.MsgProto{}
sub.Lock()
defer sub.Unlock()
s.sendMsgToSub(sub, m, false)
sub.Unlock()
}

func TestMsgsNotSentToSubBeforeSubReqResponse(t *testing.T) {
Expand Down
37 changes: 21 additions & 16 deletions server/signal.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,22 +34,27 @@ func (s *StanServer) handleSignals() {
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM, syscall.SIGUSR1, syscall.SIGHUP)
go func() {
for sig := range c {
// Notify will relay only the signals that we have
// registered, so we don't need a "default" in the
// switch statement.
switch sig {
case syscall.SIGINT:
s.Shutdown()
os.Exit(0)
case syscall.SIGTERM:
s.Shutdown()
os.Exit(143)
case syscall.SIGUSR1:
// File log re-open for rotating file logs.
s.log.ReopenLogFile()
case syscall.SIGHUP:
// Ignore for now
for {
select {
case sig := <-c:
// Notify will relay only the signals that we have
// registered, so we don't need a "default" in the
// switch statement.
switch sig {
case syscall.SIGINT:
s.Shutdown()
os.Exit(0)
case syscall.SIGTERM:
s.Shutdown()
os.Exit(143)
case syscall.SIGUSR1:
// File log re-open for rotating file logs.
s.log.ReopenLogFile()
case syscall.SIGHUP:
// Ignore for now
}
case <-s.shutdownCh:
return
}
}
}()
Expand Down

0 comments on commit 5751408

Please sign in to comment.