Skip to content

Commit

Permalink
[IMPROVED] Server pool is updated based on server notifications
Browse files Browse the repository at this point in the history
The server pool was only growing when new servers were discovered.
Now, the client library updates its server pool based on server's
INFO protocols (true for server 1.0.6+).
The DiscoveredServersCB is still invoked only when new servers
are added (as in never seen as opposed to added back after leaving
the cluster).

The code should work ok with older servers but will take advantage
of changes in the server (nats-io/nats-server#626)
  • Loading branch information
kozlovic committed Feb 27, 2018
1 parent 45807e5 commit 82fff71
Show file tree
Hide file tree
Showing 5 changed files with 314 additions and 107 deletions.
4 changes: 2 additions & 2 deletions .travis.yml
@@ -1,9 +1,9 @@
language: go
sudo: false
go:
- 1.10.x
- 1.9.x
- 1.8.x
- 1.7.x
install:
- go get -t ./...
- go get github.com/nats-io/gnatsd
Expand All @@ -18,4 +18,4 @@ before_script:
- megacheck -ignore "$(cat staticcheck.ignore)" ./...
script:
- go test -i -race ./...
- if [[ "$TRAVIS_GO_VERSION" == 1.7.* ]]; then ./scripts/cov.sh TRAVIS; else go test -v -race ./...; fi
- if [[ "$TRAVIS_GO_VERSION" == 1.9.* ]]; then ./scripts/cov.sh TRAVIS; else go test -v -race ./...; fi
63 changes: 44 additions & 19 deletions nats.go
Expand Up @@ -1849,29 +1849,54 @@ func (nc *Conn) processInfo(info string) error {
if err := json.Unmarshal([]byte(info), &nc.info); err != nil {
return err
}
// Note about pool randomization: when the pool was first created,
// it was randomized (if allowed). We keep the order the same (removing
// implicit servers that are no longer sent to us). New URLs are sent
// to us in no specific order so don't need extra randomization.
hasNew := false
// This is what we got from the server we are connected to.
urls := nc.info.ConnectURLs
if len(urls) > 0 {
added := false
// If randomization is allowed, shuffle the received array, not the
// entire pool. We want to preserve the pool's order up to this point
// (this would otherwise be problematic for the (re)connect loop).
if !nc.Opts.NoRandomize {
for i := range urls {
j := rand.Intn(i + 1)
urls[i], urls[j] = urls[j], urls[i]
}
// Transform that to a map for easy lookups
tmp := make(map[string]struct{}, len(urls))
for _, curl := range urls {
tmp[curl] = struct{}{}
}
// Walk the pool and removed the implicit servers that are no longer in the
// given array/map
sp := nc.srvPool
for i := 0; i < len(sp); i++ {
srv := sp[i]
curl := srv.url.Host
// Check if this URL is in the INFO protocol
_, inInfo := tmp[curl]
// Remove from the temp map so that at the end we are left with only
// new (or restarted) servers that need to be added to the pool.
delete(tmp, curl)
// Keep the implicit one if we are currently connected to it.
if !srv.isImplicit || srv.url == nc.url {
delete(tmp, curl)
continue
}
for _, curl := range urls {
if _, present := nc.urls[curl]; !present {
if err := nc.addURLToPool(fmt.Sprintf("nats://%s", curl), true); err != nil {
continue
}
added = true
}
if !inInfo {
// Remove from server pool. Keep current order.
copy(sp[i:], sp[i+1:])
nc.srvPool = sp[:len(sp)-1]
sp = nc.srvPool
i--
}
if added && !nc.initc && nc.Opts.DiscoveredServersCB != nil {
nc.ach <- func() { nc.Opts.DiscoveredServersCB(nc) }
}
// If there are any left in the tmp map, these are new (or restarted) servers
// and need to be added to the pool.
for curl := range tmp {
// Before adding, check if this is a new (as in never seen) URL.
// This is used to figure out if we invoke the DiscoveredServersCB
if _, present := nc.urls[curl]; !present {
hasNew = true
}
nc.addURLToPool(fmt.Sprintf("nats://%s", curl), true)
}
if hasNew && !nc.initc && nc.Opts.DiscoveredServersCB != nil {
nc.ach <- func() { nc.Opts.DiscoveredServersCB(nc) }
}
return nil
}
Expand Down
96 changes: 10 additions & 86 deletions nats_test.go
Expand Up @@ -40,7 +40,7 @@ func stackFatalf(t *testing.T, f string, args ...interface{}) {
lines = append(lines, msg)

// Generate the Stack of callers: Skip us and verify* frames.
for i := 2; true; i++ {
for i := 1; true; i++ {
_, file, line, ok := runtime.Caller(i)
if !ok {
break
Expand Down Expand Up @@ -935,128 +935,52 @@ func TestAsyncINFO(t *testing.T) {
}
}

checkPool := func(inThatOrder bool, urls ...string) {
checkPool := func(urls ...string) {
// Check both pool and urls map
if len(c.srvPool) != len(urls) {
stackFatalf(t, "Pool should have %d elements, has %d", len(urls), len(c.srvPool))
}
if len(c.urls) != len(urls) {
stackFatalf(t, "Map should have %d elements, has %d", len(urls), len(c.urls))
}
for i, url := range urls {
if inThatOrder {
if c.srvPool[i].url.Host != url {
stackFatalf(t, "Pool should have %q at index %q, has %q", url, i, c.srvPool[i].url.Host)
}
} else {
if _, present := c.urls[url]; !present {
stackFatalf(t, "Pool should have %q", url)
}
for _, url := range urls {
if _, present := c.urls[url]; !present {
stackFatalf(t, "Pool should have %q", url)
}
}
}

// Now test the decoding of "connect_urls"

// No randomize for now
c.Opts.NoRandomize = true
// Reset the pool
c.setupServerPool()
// Reinitialize the parser
c.ps = &parseState{}

info = []byte("INFO {\"connect_urls\":[\"localhost:5222\"]}\r\n")
info = []byte("INFO {\"connect_urls\":[\"localhost:4222\", \"localhost:5222\"]}\r\n")
err = c.parse(info)
if err != nil || c.ps.state != OP_START {
t.Fatalf("Unexpected: %d : %v\n", c.ps.state, err)
}
// Pool now should contain localhost:4222 (the default URL) and localhost:5222
checkPool(true, "localhost:4222", "localhost:5222")
checkPool("localhost:4222", "localhost:5222")

// Make sure that if client receives the same, it is not added again.
err = c.parse(info)
if err != nil || c.ps.state != OP_START {
t.Fatalf("Unexpected: %d : %v\n", c.ps.state, err)
}
// Pool should still contain localhost:4222 (the default URL) and localhost:5222
checkPool(true, "localhost:4222", "localhost:5222")
checkPool("localhost:4222", "localhost:5222")

// Receive a new URL
info = []byte("INFO {\"connect_urls\":[\"localhost:6222\"]}\r\n")
info = []byte("INFO {\"connect_urls\":[\"localhost:4222\", \"localhost:5222\", \"localhost:6222\"]}\r\n")
err = c.parse(info)
if err != nil || c.ps.state != OP_START {
t.Fatalf("Unexpected: %d : %v\n", c.ps.state, err)
}
// Pool now should contain localhost:4222 (the default URL) localhost:5222 and localhost:6222
checkPool(true, "localhost:4222", "localhost:5222", "localhost:6222")

// Receive more than 1 URL at once
info = []byte("INFO {\"connect_urls\":[\"localhost:7222\", \"localhost:8222\"]}\r\n")
err = c.parse(info)
if err != nil || c.ps.state != OP_START {
t.Fatalf("Unexpected: %d : %v\n", c.ps.state, err)
}
// Pool now should contain localhost:4222 (the default URL) localhost:5222, localhost:6222
// localhost:7222 and localhost:8222
checkPool(true, "localhost:4222", "localhost:5222", "localhost:6222", "localhost:7222", "localhost:8222")

// Test with pool randomization now. Note that with randominzation,
// the initial pool is randomize, then each array of urls that the
// client gets from the INFO protocol is randomized, but added to
// the end of the pool.
c.Opts.NoRandomize = false
c.setupServerPool()

info = []byte("INFO {\"connect_urls\":[\"localhost:5222\"]}\r\n")
err = c.parse(info)
if err != nil || c.ps.state != OP_START {
t.Fatalf("Unexpected: %d : %v\n", c.ps.state, err)
}
// Pool now should contain localhost:4222 (the default URL) and localhost:5222
checkPool(true, "localhost:4222", "localhost:5222")

// Make sure that if client receives the same, it is not added again.
err = c.parse(info)
if err != nil || c.ps.state != OP_START {
t.Fatalf("Unexpected: %d : %v\n", c.ps.state, err)
}
// Pool should still contain localhost:4222 (the default URL) and localhost:5222
checkPool(true, "localhost:4222", "localhost:5222")

// Receive a new URL
info = []byte("INFO {\"connect_urls\":[\"localhost:6222\"]}\r\n")
err = c.parse(info)
if err != nil || c.ps.state != OP_START {
t.Fatalf("Unexpected: %d : %v\n", c.ps.state, err)
}
// Pool now should contain localhost:4222 (the default URL) localhost:5222 and localhost:6222
checkPool(true, "localhost:4222", "localhost:5222", "localhost:6222")

// Receive more than 1 URL at once. Add more than 2 to increase the chance of
// the array being shuffled.
info = []byte("INFO {\"connect_urls\":[\"localhost:7222\", \"localhost:8222\", " +
"\"localhost:9222\", \"localhost:10222\", \"localhost:11222\"]}\r\n")
err = c.parse(info)
if err != nil || c.ps.state != OP_START {
t.Fatalf("Unexpected: %d : %v\n", c.ps.state, err)
}
// Pool now should contain localhost:4222 (the default URL) localhost:5222, localhost:6222
// localhost:7222, localhost:8222, localhost:9222, localhost:10222 and localhost:11222
checkPool(false, "localhost:4222", "localhost:5222", "localhost:6222", "localhost:7222", "localhost:8222",
"localhost:9222", "localhost:10222", "localhost:11222")

// Finally, check that (part of) the pool should be randomized.
allUrls := []string{"localhost:4222", "localhost:5222", "localhost:6222", "localhost:7222", "localhost:8222",
"localhost:9222", "localhost:10222", "localhost:11222"}
same := 0
for i, url := range c.srvPool {
if url.url.Host == allUrls[i] {
same++
}
}
if same == len(allUrls) {
t.Fatal("Pool does not seem to be randomized")
}
checkPool("localhost:4222", "localhost:5222", "localhost:6222")

// Check that pool may be randomized on setup, but new URLs are always
// added at end of pool.
Expand Down

0 comments on commit 82fff71

Please sign in to comment.