Skip to content

Commit

Permalink
Merge pull request #1517 from nats-io/fix_1515
Browse files Browse the repository at this point in the history
[FIXED] Handling of gossiped URLs
  • Loading branch information
kozlovic committed Jul 16, 2020
2 parents 8312f3f + b9764db commit 4a4a36f
Show file tree
Hide file tree
Showing 9 changed files with 239 additions and 65 deletions.
31 changes: 9 additions & 22 deletions server/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ type srvGateway struct {
outo []*client // outbound gateways maintained in an order suitable for sending msgs (currently based on RTT)
in map[uint64]*client // inbound gateways
remotes map[string]*gatewayCfg // Config of remote gateways
URLs map[string]struct{} // Set of all Gateway URLs in the cluster
URLs refCountedUrlSet // Set of all Gateway URLs in the cluster
URL string // This server gateway URL (after possible random port is resolved)
info *Info // Gateway Info protocol
infoJSON []byte // Marshal'ed Info protocol
Expand Down Expand Up @@ -298,7 +298,7 @@ func (s *Server) newGateway(opts *Options) error {
outo: make([]*client, 0, 4),
in: make(map[uint64]*client),
remotes: make(map[string]*gatewayCfg),
URLs: make(map[string]struct{}),
URLs: make(refCountedUrlSet),
resolver: opts.Gateway.resolver,
runknown: opts.Gateway.RejectUnknown,
oldHash: getOldHash(opts.Gateway.Name),
Expand Down Expand Up @@ -373,17 +373,6 @@ func (g *srvGateway) getName() string {
return n
}

// Returns the Gateway URLs of all servers in the local cluster.
// This is used to send to other cluster this server connects to.
// The gateway read-lock is held on entry
func (g *srvGateway) getURLs() []string {
a := make([]string, 0, len(g.URLs))
for u := range g.URLs {
a = append(a, u)
}
return a
}

// Returns if this server rejects connections from gateways that are not
// explicitly configured.
func (g *srvGateway) rejectUnknown() bool {
Expand Down Expand Up @@ -499,7 +488,7 @@ func (s *Server) setGatewayInfoHostPort(info *Info, o *Options) error {
gw := s.gateway
gw.Lock()
defer gw.Unlock()
delete(gw.URLs, gw.URL)
gw.URLs.removeUrl(gw.URL)
if o.Gateway.Advertise != "" {
advHost, advPort, err := parseHostPort(o.Gateway.Advertise, o.Gateway.Port)
if err != nil {
Expand Down Expand Up @@ -539,7 +528,7 @@ func (s *Server) setGatewayInfoHostPort(info *Info, o *Options) error {
} else {
s.Noticef("Address for gateway %q is %s", gw.name, gw.URL)
}
gw.URLs[gw.URL] = struct{}{}
gw.URLs[gw.URL]++
gw.info = info
info.GatewayURL = gw.URL
// (re)generate the gatewayInfoJSON byte array
Expand All @@ -556,7 +545,7 @@ func (g *srvGateway) generateInfoJSON() {
if !g.enabled {
return
}
g.info.GatewayURLs = g.getURLs()
g.info.GatewayURLs = g.URLs.getAsStringSlice()
b, err := json.Marshal(g.info)
if err != nil {
panic(err)
Expand Down Expand Up @@ -1466,13 +1455,12 @@ func (g *gatewayCfg) addURLs(infoURLs []string) {
// Server lock held on entry
func (s *Server) addGatewayURL(urlStr string) bool {
s.gateway.Lock()
_, present := s.gateway.URLs[urlStr]
if !present {
s.gateway.URLs[urlStr] = struct{}{}
added := s.gateway.URLs.addUrl(urlStr)
if added {
s.gateway.generateInfoJSON()
}
s.gateway.Unlock()
return !present
return added
}

// Removes this URL from the set of gateway URLs.
Expand All @@ -1483,9 +1471,8 @@ func (s *Server) removeGatewayURL(urlStr string) bool {
return false
}
s.gateway.Lock()
_, removed := s.gateway.URLs[urlStr]
removed := s.gateway.URLs.removeUrl(urlStr)
if removed {
delete(s.gateway.URLs, urlStr)
s.gateway.generateInfoJSON()
}
s.gateway.Unlock()
Expand Down
38 changes: 11 additions & 27 deletions server/leafnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,9 +397,8 @@ func (s *Server) startLeafNodeAcceptLoop() {
s.mu.Unlock()
return
}
// Add our LeafNode URL to the list that we send to servers connecting
// to our LeafNode accept URL. This call also regenerates leafNodeInfoJSON.
s.addLeafNodeURL(s.leafNodeInfo.IP)
s.leafURLsMap[s.leafNodeInfo.IP]++
s.generateLeafNodeInfoJSON()

// Setup state that can enable shutdown
s.leafNodeListener = l
Expand Down Expand Up @@ -506,15 +505,11 @@ func (s *Server) copyLeafNodeInfo() *Info {
// Returns a boolean indicating if the URL was added or not.
// Server lock is held on entry
func (s *Server) addLeafNodeURL(urlStr string) bool {
// Make sure we already don't have it.
for _, url := range s.leafNodeInfo.LeafNodeURLs {
if url == urlStr {
return false
}
if s.leafURLsMap.addUrl(urlStr) {
s.generateLeafNodeInfoJSON()
return true
}
s.leafNodeInfo.LeafNodeURLs = append(s.leafNodeInfo.LeafNodeURLs, urlStr)
s.generateLeafNodeInfoJSON()
return true
return false
}

// Removes a LeafNode URL of the route that is disconnecting from the Info structure.
Expand All @@ -527,27 +522,16 @@ func (s *Server) removeLeafNodeURL(urlStr string) bool {
if s.shutdown {
return false
}
removed := false
urls := s.leafNodeInfo.LeafNodeURLs
for i, url := range urls {
if url == urlStr {
// If not last, move last into the position we remove.
last := len(urls) - 1
if i != last {
urls[i] = urls[last]
}
s.leafNodeInfo.LeafNodeURLs = urls[0:last]
removed = true
break
}
}
if removed {
if s.leafURLsMap.removeUrl(urlStr) {
s.generateLeafNodeInfoJSON()
return true
}
return removed
return false
}

// Server lock is held on entry
func (s *Server) generateLeafNodeInfoJSON() {
s.leafNodeInfo.LeafNodeURLs = s.leafURLsMap.getAsStringSlice()
b, _ := json.Marshal(s.leafNodeInfo)
pcs := [][]byte{[]byte("INFO"), b, []byte(CR_LF)}
s.leafNodeInfoJSON = bytes.Join(pcs, []byte(" "))
Expand Down
1 change: 1 addition & 0 deletions server/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -1460,6 +1460,7 @@ func (s *Server) addRoute(c *client, info *Info) (bool, bool) {
// If we upgrade to solicited, we still want to keep the remote's
// connectURLs. So transfer those.
r.connectURLs = remote.route.connectURLs
r.wsConnURLs = remote.route.wsConnURLs
remote.route = r
}
// This is to mitigate the issue where both sides add the route
Expand Down
27 changes: 12 additions & 15 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ type Server struct {
leafNodeListener net.Listener
leafNodeInfo Info
leafNodeInfoJSON []byte
leafURLsMap refCountedUrlSet
leafNodeOpts struct {
resolver netResolver
dialTimeout time.Duration
Expand Down Expand Up @@ -170,7 +171,7 @@ type Server struct {
clientConnectURLs []string

// Used internally for quick look-ups.
clientConnectURLsMap map[string]struct{}
clientConnectURLsMap refCountedUrlSet

lastCURLsUpdate int64

Expand Down Expand Up @@ -309,14 +310,13 @@ func NewServer(opts *Options) (*Server, error) {
defer s.mu.Unlock()

// Used internally for quick look-ups.
s.websocket.connectURLsMap = make(map[string]struct{})
s.clientConnectURLsMap = make(refCountedUrlSet)
s.websocket.connectURLsMap = make(refCountedUrlSet)
s.leafURLsMap = make(refCountedUrlSet)

// Ensure that non-exported options (used in tests) are properly set.
s.setLeafNodeNonExportedOptions()

// Used internally for quick look-ups.
s.clientConnectURLsMap = make(map[string]struct{})

// Call this even if there is no gateway defined. It will
// initialize the structure so we don't have to check for
// it to be nil or not in various places in the code.
Expand Down Expand Up @@ -2209,26 +2209,23 @@ func (s *Server) updateServerINFOAndSendINFOToClients(curls, wsurls []string, ad
s.mu.Lock()
defer s.mu.Unlock()

// Will be set to true if we alter the server's Info object.
remove := !add
checkMap := func(urls []string, m map[string]struct{}) bool {
// Will return true if we need alter the server's Info object.
updateMap := func(urls []string, m refCountedUrlSet) bool {
wasUpdated := false
for _, url := range urls {
_, present := m[url]
if add && !present {
m[url] = struct{}{}
if add && m.addUrl(url) {
wasUpdated = true
} else if remove && present {
delete(m, url)
} else if remove && m.removeUrl(url) {
wasUpdated = true
}
}
return wasUpdated
}
cliUpdated := checkMap(curls, s.clientConnectURLsMap)
wsUpdated := checkMap(wsurls, s.websocket.connectURLsMap)
cliUpdated := updateMap(curls, s.clientConnectURLsMap)
wsUpdated := updateMap(wsurls, s.websocket.connectURLsMap)

updateInfo := func(infoURLs *[]string, urls []string, m map[string]struct{}) {
updateInfo := func(infoURLs *[]string, urls []string, m refCountedUrlSet) {
// Recreate the info's slice from the map
*infoURLs = (*infoURLs)[:0]
// Add this server client connect ULRs first...
Expand Down
45 changes: 45 additions & 0 deletions server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,51 @@ func TestClientAdvertiseConnectURL(t *testing.T) {
s.Shutdown()
}

func TestClientAdvertiseInCluster(t *testing.T) {
optsA := DefaultOptions()
optsA.ClientAdvertise = "srvA:4222"
srvA := RunServer(optsA)
defer srvA.Shutdown()

nc := natsConnect(t, srvA.ClientURL())
defer nc.Close()

optsB := DefaultOptions()
optsB.ClientAdvertise = "srvBC:4222"
optsB.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", optsA.Cluster.Port))
srvB := RunServer(optsB)
defer srvB.Shutdown()

checkClusterFormed(t, srvA, srvB)

checkURLs := func(expected string) {
t.Helper()
checkFor(t, time.Second, 15*time.Millisecond, func() error {
srvs := nc.DiscoveredServers()
for _, u := range srvs {
if u == expected {
return nil
}
}
return fmt.Errorf("Url %q not found in %q", expected, srvs)
})
}
checkURLs("nats://srvBC:4222")

optsC := DefaultOptions()
optsC.ClientAdvertise = "srvBC:4222"
optsC.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", optsA.Cluster.Port))
srvC := RunServer(optsC)
defer srvC.Shutdown()

checkClusterFormed(t, srvA, srvB, srvC)
checkURLs("nats://srvBC:4222")

srvB.Shutdown()
checkNumRoutes(t, srvA, 1)
checkURLs("nats://srvBC:4222")
}

func TestClientAdvertiseErrorOnStartup(t *testing.T) {
opts := DefaultOptions()
// Set invalid address
Expand Down
38 changes: 38 additions & 0 deletions server/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ import (
"time"
)

// This map is used to store URLs string as the key with a reference count as
// the value. This is used to handle gossiped URLs such as connect_urls, etc..
type refCountedUrlSet map[string]int

// Ascii numbers 0-9
const (
asciiZero = 48
Expand Down Expand Up @@ -150,3 +154,37 @@ func comma(v int64) string {
parts[j] = strconv.Itoa(int(v))
return sign + strings.Join(parts[j:], ",")
}

// Adds urlStr to the given map. If the string was already present, simply
// bumps the reference count.
// Returns true only if it was added for the first time.
func (m refCountedUrlSet) addUrl(urlStr string) bool {
m[urlStr]++
return m[urlStr] == 1
}

// Removes urlStr from the given map. If the string is not present, nothing
// is done and false is returned.
// If the string was present, its reference count is decreased. Returns true
// if this was the last reference, false otherwise.
func (m refCountedUrlSet) removeUrl(urlStr string) bool {
removed := false
if ref, ok := m[urlStr]; ok {
if ref == 1 {
removed = true
delete(m, urlStr)
} else {
m[urlStr]--
}
}
return removed
}

// Returns the unique URLs in this map as a slice
func (m refCountedUrlSet) getAsStringSlice() []string {
a := make([]string, 0, len(m))
for u := range m {
a = append(a, u)
}
return a
}
2 changes: 1 addition & 1 deletion server/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ type srvWebsocket struct {
allowedOrigins map[string]*allowedOrigin // host will be the key
sameOrigin bool
connectURLs []string
connectURLsMap map[string]struct{}
connectURLsMap refCountedUrlSet
users map[string]*User
nkeys map[string]*NkeyUser
authOverride bool // indicate if there is auth override in websocket config
Expand Down

0 comments on commit 4a4a36f

Please sign in to comment.