Skip to content

Commit

Permalink
[CHANGED] Server notifies clients when server rejoins cluster
Browse files Browse the repository at this point in the history
When the option Cluster.NoAdvertise is false, a server will send
an INFO protocol message to its client when a server has joined
the cluster.

Previously, the protocol would be sent only if the
joining server's "client URLs" (the addresses where clients connect
to) were new. It will now be sent regardless if the server joins
(for the first time) or rejoins the cluster.

Clients are still by default invoking the DiscoveredServersCB callback
only if they themselves detect that new URLs were added. A separate
PR may be filled to client libraries repo to be able to invoke
the callback anytime an async INFO protocol is received.

Based on @madgrenadier PR #597.
  • Loading branch information
kozlovic committed Nov 17, 2017
1 parent 7a4f7bb commit fa86ff9
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 17 deletions.
9 changes: 6 additions & 3 deletions server/route.go
Expand Up @@ -142,6 +142,8 @@ func (c *client) processRouteInfo(info *Info) {
c.Debugf("Registering remote route %q", info.ID)
// Send our local subscriptions to this route.
s.sendLocalSubsToRoute(c)
// sendInfo will be false if the route that we just accepted
// is the only route there is.
if sendInfo {
// Need to get the remote IP address.
c.mu.Lock()
Expand All @@ -156,9 +158,10 @@ func (c *client) processRouteInfo(info *Info) {
// Now let the known servers know about this new route
s.forwardNewRouteInfoToKnownServers(info)
}
// If the server Info did not have these URLs, update and send an INFO
// protocol to all clients that support it (unless the feature is disabled).
if s.updateServerINFO(info.ClientConnectURLs) {
// Unless disabled, possibly update the server's INFO protcol
// and send to clients that know how to handle async INFOs.
if !s.getOpts().Cluster.NoAdvertise {
s.updateServerINFO(info.ClientConnectURLs)
s.sendAsyncInfoToClients()
}
} else {
Expand Down
12 changes: 2 additions & 10 deletions server/server.go
Expand Up @@ -767,18 +767,11 @@ func (s *Server) createClient(conn net.Conn) *client {

// updateServerINFO updates the server's Info object with the given
// array of URLs and re-generate the infoJSON byte array, only if the
// given URLs were not already recorded and if the feature is not
// disabled.
// Returns a boolean indicating if server's Info was updated.
func (s *Server) updateServerINFO(urls []string) bool {
// given URLs were not already recorded.
func (s *Server) updateServerINFO(urls []string) {
s.mu.Lock()
defer s.mu.Unlock()

// Feature disabled, do not update.
if s.getOpts().Cluster.NoAdvertise {
return false
}

// Will be set to true if we alter the server's Info object.
wasUpdated := false
for _, url := range urls {
Expand All @@ -792,7 +785,6 @@ func (s *Server) updateServerINFO(urls []string) bool {
if wasUpdated {
s.generateServerInfoJSON()
}
return wasUpdated
}

// Handle closing down a connection when the handshake has timedout.
Expand Down
8 changes: 4 additions & 4 deletions test/routes_test.go
Expand Up @@ -717,15 +717,15 @@ func TestRouteSendAsyncINFOToClients(t *testing.T) {
rc, routeSend, routeExpect = createRoute()
defer rc.Close()

// Resend the same route INFO json, since there is no new URL,
// no client should receive an INFO
// Resend the same route INFO json. The server will now send
// the INFO even when there is no change.
sendRouteINFO(routeSend, routeExpect, routeConnectURLs)

// Expect nothing for old clients
expectNothing(t, oldClient)

// Expect nothing for new clients as well (no real update)
expectNothing(t, newClient)
// Expect new client to receive an INFO (unless disabled)
checkINFOReceived(newClient, newClientExpect, routeConnectURLs)

// Now stop the route and restart with an additional URL
rc.Close()
Expand Down

0 comments on commit fa86ff9

Please sign in to comment.