Skip to content

Commit

Permalink
add subs zombie test
Browse files Browse the repository at this point in the history
Fix subscriptions zombie at autounsubscribe on cluster
  • Loading branch information
ingosus committed Jul 20, 2017
1 parent 4ccbd2e commit d0817bb
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 1 deletion.
2 changes: 1 addition & 1 deletion server/client.go
Expand Up @@ -1344,7 +1344,7 @@ func (c *client) closeConnection() {
// Forward on unsubscribes if we are not
// a router ourselves.
if c.typ != ROUTER {
srv.broadcastUnSubscribe(sub)
srv.broadcastForceUnSubscribe(sub)
}
}
}
Expand Down
7 changes: 7 additions & 0 deletions server/route.go
Expand Up @@ -610,6 +610,13 @@ func (s *Server) broadcastUnSubscribe(sub *subscription) {
s.broadcastInterestToRoutes(proto)
}

func (s *Server) broadcastForceUnSubscribe(sub *subscription) {
sub.client.mu.Lock()
sub.max = 0
sub.client.mu.Unlock()
s.broadcastUnSubscribe(sub)
}

func (s *Server) routeAcceptLoop(ch chan struct{}) {
// Snapshot server options.
opts := s.getOpts()
Expand Down
100 changes: 100 additions & 0 deletions server/routes_test.go
Expand Up @@ -53,6 +53,55 @@ func TestRouteConfig(t *testing.T) {
}
}

func TestClientRemoveSubsOnDisconnectFromCluster(t *testing.T) {
optsA, _ := ProcessConfigFile("./configs/srv_a.conf")
optsB, _ := ProcessConfigFile("./configs/srv_b.conf")

optsA.NoSigs, optsA.NoLog = true, true
optsB.NoSigs, optsB.NoLog = true, true

srvA := RunServer(optsA)
defer srvA.Shutdown()

srvB := RunServer(optsB)
defer srvB.Shutdown()

cluster := []*Server{srvA, srvB}

// Wait for route to form.
checkClusterFor(t, routesCheck(len(cluster)-1), 10*time.Second, cluster...)

urlA := fmt.Sprintf("nats://%s:%d/", optsA.Host, optsA.Port)

// Connect client to A
nc, err := nats.Connect(urlA)
if err != nil {
t.Fatalf("error creating client: %v\n", err)
}
defer nc.Close()

// No subscriptions. Ready to test.
checkClusterFor(t, subsCheck(0), 5*time.Second, cluster...)

sub, err := nc.Subscribe("foo", func(msg *nats.Msg) {})
if err != nil {
t.Fatalf("error subscribing: %v\n", err)
}

sub.AutoUnsubscribe(1)

// Waiting cluster subs propagation
checkClusterFor(t, subsCheck(1), 5*time.Second, cluster...)

nc.Close()

// Waiting for client exit
checkClusterFor(t, clntsCheck(0), 5*time.Second, cluster...)

// No subs should be on the cluster when all clients is disconnected
checkClusterFor(t, subsCheck(0), 5*time.Second, cluster...)
}

func TestServerRoutesWithClients(t *testing.T) {
optsA, _ := ProcessConfigFile("./configs/srv_a.conf")
optsB, _ := ProcessConfigFile("./configs/srv_b.conf")
Expand Down Expand Up @@ -145,6 +194,57 @@ func TestServerRoutesWithAuthAndBCrypt(t *testing.T) {
}
}

func subsCheck(expected uint32) func(s *Server) string {
return func(s *Server) string {
if subs := s.NumSubscriptions(); subs != expected {
return fmt.Sprintf("Expected %d subscriptions for server %q, got %d", expected, s.ID(), subs)
}
return ""
}
}

func clntsCheck(expected int) func(s *Server) string {
return func(s *Server) string {
if clnts := s.NumClients(); clnts != expected {
return fmt.Sprintf("Expected %d clients for server %q, got %d", expected, s.ID(), clnts)
}
return ""
}
}

func routesCheck(expected int) func(s *Server) string {
return func(s *Server) string {
if numRoutes := s.NumRoutes(); numRoutes != expected {
return fmt.Sprintf("Expected %d routes for server %q, got %d", expected, s.ID(), numRoutes)
}
return ""
}
}

func checkClusterFor(t *testing.T, check func(s *Server) string, limit time.Duration, servers ...*Server) {

var err string

maxTime := time.Now().Add(limit)

for time.Now().Before(maxTime) {
err = ""
for _, s := range servers {
if err = check(s); err != "" {
break
}
}
if err != "" {
time.Sleep(100 * time.Millisecond)
} else {
break
}
}
if err != "" {
stackFatalf(t, "%s", err)
}
}

// Helper function to check that a cluster is formed
func checkClusterFormed(t *testing.T, servers ...*Server) {
// Wait for the cluster to form
Expand Down

0 comments on commit d0817bb

Please sign in to comment.