Skip to content

Commit

Permalink
Merge 1b2a2d6 into 7bb3472
Browse files Browse the repository at this point in the history
  • Loading branch information
kozlovic committed May 6, 2020
2 parents 7bb3472 + 1b2a2d6 commit bf262b0
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 23 deletions.
22 changes: 15 additions & 7 deletions nats.go
Expand Up @@ -1126,7 +1126,7 @@ func (nc *Conn) setupServerPool() error {

// Randomize if allowed to
if !nc.Opts.NoRandomize {
nc.shufflePool()
nc.shufflePool(0)
}

// Normally, if this one is set, Options.Servers should not be,
Expand Down Expand Up @@ -1223,14 +1223,16 @@ func (nc *Conn) addURLToPool(sURL string, implicit, saveTLSName bool) error {
}

// shufflePool swaps randomly elements in the server pool
func (nc *Conn) shufflePool() {
if len(nc.srvPool) <= 1 {
// The `offset` value indicates that the shuffling should start at
// this offset and leave the elements from [0..offset) intact.
func (nc *Conn) shufflePool(offset int) {
if len(nc.srvPool) <= offset+1 {
return
}
source := rand.NewSource(time.Now().UnixNano())
r := rand.New(source)
for i := range nc.srvPool {
j := r.Intn(i + 1)
for i := offset; i < len(nc.srvPool); i++ {
j := offset + r.Intn(i+1-offset)
nc.srvPool[i], nc.srvPool[j] = nc.srvPool[j], nc.srvPool[i]
}
}
Expand Down Expand Up @@ -2431,8 +2433,14 @@ func (nc *Conn) processInfo(info string) error {
}
nc.addURLToPool(fmt.Sprintf("%s://%s", nc.connScheme(), curl), true, saveTLS)
}
if hasNew && !nc.initc && nc.Opts.DiscoveredServersCB != nil {
nc.ach.push(func() { nc.Opts.DiscoveredServersCB(nc) })
if hasNew {
// Randomize the pool if allowed but leave the first URL in place.
if !nc.Opts.NoRandomize {
nc.shufflePool(1)
}
if !nc.initc && nc.Opts.DiscoveredServersCB != nil {
nc.ach.push(func() { nc.Opts.DiscoveredServersCB(nc) })
}
}

return nil
Expand Down
36 changes: 20 additions & 16 deletions nats_test.go
Expand Up @@ -1118,28 +1118,32 @@ func TestAsyncINFO(t *testing.T) {
for _, srv := range c.srvPool {
urlsAfterPoolSetup = append(urlsAfterPoolSetup, srv.url.Host)
}
checkPoolOrderDidNotChange := func() {
checkNewURLsAddedRandomly := func() {
t.Helper()
var ok bool
for i := 0; i < len(urlsAfterPoolSetup); i++ {
if c.srvPool[i].url.Host != urlsAfterPoolSetup[i] {
stackFatalf(t, "Pool should have %q at index %q, has %q", urlsAfterPoolSetup[i], i, c.srvPool[i].url.Host)
ok = true
break
}
}
if !ok {
t.Fatalf("New URLs were not added randmonly: %q", c.Servers())
}
}
// Add new urls
newURLs := []string{
"localhost:6222",
"localhost:7222",
"localhost:8222\", \"localhost:9222",
"localhost:10222\", \"localhost:11222\", \"localhost:12222,",
}
for _, newURL := range newURLs {
info = []byte("INFO {\"connect_urls\":[\"" + newURL + "]}\r\n")
err = c.parse(info)
if err != nil || c.ps.state != OP_START {
t.Fatalf("Unexpected: %d : %v\n", c.ps.state, err)
}
// Check that pool order does not change up to the new addition(s).
checkPoolOrderDidNotChange()
newURLs := "\"impA:4222\", \"impB:4222\", \"impC:4222\", " +
"\"impD:4222\", \"impE:4222\", \"impF:4222\", \"impG:4222\", " +
"\"impH:4222\", \"impI:4222\", \"impJ:4222\""
info = []byte("INFO {\"connect_urls\":[" + newURLs + "]}\r\n")
err = c.parse(info)
if err != nil || c.ps.state != OP_START {
t.Fatalf("Unexpected: %d : %v\n", c.ps.state, err)
}
checkNewURLsAddedRandomly()
// Check that we have not moved the first URL
if u := c.srvPool[0].url.Host; u != urlsAfterPoolSetup[0] {
t.Fatalf("Expected first URL to be %q, got %q", urlsAfterPoolSetup[0], u)
}
}

Expand Down

0 comments on commit bf262b0

Please sign in to comment.