Skip to content

Commit

Permalink
Skip processing of INFO.ConnectURLs when the array is empty.
Browse files Browse the repository at this point in the history
Continuation of #344. We don't want to remove discovered servers
from the pool if we get an INFO with empty array. The new servers
will send arrays with at least their own URL, but older servers
or in some situations, the array could be empty (omitted) and we
should not treat this as if there were no server at all in the
cluster.
  • Loading branch information
kozlovic committed Mar 16, 2018
1 parent f90dee2 commit e270c49
Show file tree
Hide file tree
Showing 2 changed files with 124 additions and 1 deletion.
12 changes: 11 additions & 1 deletion nats.go
Expand Up @@ -1874,9 +1874,19 @@ func (nc *Conn) processInfo(info string) error {
if info == _EMPTY_ {
return nil
}
if err := json.Unmarshal([]byte(info), &nc.info); err != nil {
ncInfo := serverInfo{}
if err := json.Unmarshal([]byte(info), &ncInfo); err != nil {
return err
}
// Copy content into connection's info structure.
nc.info = ncInfo
// The array could be empty/not present on initial connect,
// if advertise is disabled on that server, or servers that
// did not include themselves in the async INFO protocol.
// If empty, do not remove the implicit servers from the pool.
if len(ncInfo.ConnectURLs) == 0 {
return nil
}
// Note about pool randomization: when the pool was first created,
// it was randomized (if allowed). We keep the order the same (removing
// implicit servers that are no longer sent to us). New URLs are sent
Expand Down
113 changes: 113 additions & 0 deletions test/conn_test.go
Expand Up @@ -1835,3 +1835,116 @@ func TestReceiveInfoRightAfterFirstPong(t *testing.T) {
t.Fatalf("Unexpected discovered servers: %v", ds)
}
}

func TestReceiveInfoWithEmptyConnectURLs(t *testing.T) {
ready := make(chan bool, 2)
ch := make(chan bool, 1)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()

ports := []int{4222, 4223}
for i := 0; i < 2; i++ {
l, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", ports[i]))
if err != nil {
t.Fatalf("Error on listen: %v", err)
}
tl := l.(*net.TCPListener)
defer tl.Close()

ready <- true

c, err := tl.Accept()
if err != nil {
return
}
defer c.Close()

// Send the initial INFO
c.Write([]byte(fmt.Sprintf("INFO {\"server_id\":\"server%d\"}\r\n", (i + 1))))
buf := make([]byte, 0, 100)
b := make([]byte, 100)
for {
n, err := c.Read(b)
if err != nil {
return
}
buf = append(buf, b[:n]...)
if bytes.Contains(buf, []byte("PING\r\n")) {
break
}
}
if i == 0 {
// Send PONG and following INFO in one go (or at least try).
// The processing of PONG in sendConnect() should leave the
// rest for the readLoop to process.
c.Write([]byte("PONG\r\nINFO {\"server_id\":\"server1\",\"connect_urls\":[\"127.0.0.1:4222\", \"127.0.0.1:4223\", \"127.0.0.1:4224\"]}\r\n"))
// Wait for the notication
<-ch
// Close the connection in our side and go back into accept
c.Close()
} else {
// Send no connect ULRs (as if this was an older server that could in some cases
// send an empty array)
c.Write([]byte(fmt.Sprintf("PONG\r\nINFO {\"server_id\":\"server2\"}\r\n")))
// Wait for client to disconnect
for {
if _, err := c.Read(buf); err != nil {
return
}
}
}
}
}()

// Wait for listener to be up and running
if err := Wait(ready); err != nil {
t.Fatal("Listener not ready")
}

rch := make(chan bool)
nc, err := nats.Connect("nats://127.0.0.1:4222",
nats.ReconnectWait(50*time.Millisecond),
nats.ReconnectHandler(func(_ *nats.Conn) {
rch <- true
}))
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer nc.Close()
var (
ds []string
timeout = time.Now().Add(2 * time.Second)
ok = false
)
for time.Now().Before(timeout) {
ds = nc.DiscoveredServers()
if len(ds) == 2 {
if (ds[0] == "nats://127.0.0.1:4223" && ds[1] == "nats://127.0.0.1:4224") ||
(ds[0] == "nats://127.0.0.1:4224" && ds[1] == "nats://127.0.0.1:4223") {
ok = true
break
}
}
time.Sleep(50 * time.Millisecond)
}
if !ok {
t.Fatalf("Unexpected discovered servers: %v", ds)
}
// Make the server close our connection
ch <- true
// Wait for the reconnect
if err := Wait(rch); err != nil {
t.Fatal("Did not reconnect")
}
// Discovered servers should still contain nats://me:1
ds = nc.DiscoveredServers()
if len(ds) != 2 ||
!((ds[0] == "nats://127.0.0.1:4223" && ds[1] == "nats://127.0.0.1:4224") ||
(ds[0] == "nats://127.0.0.1:4224" && ds[1] == "nats://127.0.0.1:4223")) {
t.Fatalf("Unexpected discovered servers list: %v", ds)
}
nc.Close()
wg.Wait()
}

0 comments on commit e270c49

Please sign in to comment.