Skip to content

Commit

Permalink
[FIXED] Cluster toplogy change possibly not sent to clients
Browse files Browse the repository at this point in the history
When a server accepts a route, it will keep track of that server
`connectURLs` array. However, if the server was creating a route
to that other server at the same time, it will promote the route
as a solicited one. The content of that array was not transfered,
which means that on a disconnect, it was possible that the cluster
topology change was not properly sent to clients.
  • Loading branch information
kozlovic committed Mar 4, 2018
1 parent 02dd205 commit aeca31c
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 3 deletions.
3 changes: 3 additions & 0 deletions server/route.go
Expand Up @@ -564,6 +564,9 @@ func (s *Server) addRoute(c *client, info *Info) (bool, bool) {
remote.mu.Lock()
// r will be not nil if c.route.didSolicit was true
if r != nil {
// If we upgrade to solicited, we still want to keep the remote's
// connectURLs. So transfer those.
r.connectURLs = remote.route.connectURLs
remote.route = r
}
// This is to mitigate the issue where both sides add the route
Expand Down
55 changes: 55 additions & 0 deletions server/routes_test.go
Expand Up @@ -728,3 +728,58 @@ func TestRoutesToEachOther(t *testing.T) {
t.Log("Was not able to get duplicate route this time!")
}
}

func TestConnectULRsWithRoutesToEachOther(t *testing.T) {
optsA := DefaultOptions()
optsA.Host = "127.0.0.1"
optsA.Cluster.Port = 7246
optsA.Routes = RoutesFromStr("nats://127.0.0.1:7247")

optsB := DefaultOptions()
optsB.Host = "127.0.0.1"
optsB.Cluster.Port = 7247
optsB.Routes = RoutesFromStr("nats://127.0.0.1:7246")

// Start servers with go routines to increase change of
// each server connecting to each other at the same time.
srvA := New(optsA)
defer srvA.Shutdown()

srvB := New(optsB)
defer srvB.Shutdown()

go srvA.Start()
go srvB.Start()

// Wait for cluster to be formed
checkClusterFormed(t, srvA, srvB)

// Connect to serverB
url := fmt.Sprintf("nats://%s", srvB.Addr().String())
nc, err := nats.Connect(url)
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer nc.Close()
ds := nc.Servers()
if len(ds) != 2 {
t.Fatalf("Expected 2 servers, got %v", ds)
}

// Shutdown server A and make sure that we are notfied
// that server A is no longer running.
srvA.Shutdown()
timeout := time.Now().Add(5 * time.Second)
ok := false
for time.Now().Before(timeout) {
ds = nc.Servers()
if len(ds) == 1 {
ok = true
break
}
time.Sleep(50 * time.Millisecond)
}
if !ok {
t.Fatalf("List of servers should be only 1, got %v", ds)
}
}
7 changes: 4 additions & 3 deletions server/server.go
Expand Up @@ -255,10 +255,11 @@ func (s *Server) logPid() error {
func (s *Server) Start() {
s.Noticef("Starting nats-server version %s", VERSION)
s.Debugf("Go build version %s", s.info.GoVersion)
if gitCommit == "" {
gitCommit = "not set"
gc := gitCommit
if gc == "" {
gc = "not set"
}
s.Noticef("Git commit [%s]", gitCommit)
s.Noticef("Git commit [%s]", gc)

// Avoid RACE between Start() and Shutdown()
s.mu.Lock()
Expand Down

0 comments on commit aeca31c

Please sign in to comment.