Skip to content

Commit

Permalink
Merge pull request #1816 from nats-io/fix_gw_hash
Browse files Browse the repository at this point in the history
Fixed gateway reply mapping following changes in JetStream clustering
  • Loading branch information
kozlovic committed Jan 16, 2021
2 parents 1874964 + ef38abe commit de1bf36
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 19 deletions.
5 changes: 5 additions & 0 deletions server/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,11 @@ func (s *Server) startRemoteServerSweepTimer() {
// Length of our system hash used for server targeted messages.
const sysHashLen = 8

// Computes a hash of 8 characters for the name.
func getHash(name string) []byte {
return getHashSize(name, sysHashLen)
}

// This will setup our system wide tracking subs.
// For now we will setup one wildcard subscription to
// monitor all accounts for changes in number of connections.
Expand Down
63 changes: 50 additions & 13 deletions server/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ const (
// hash of origin cluster name and <server> is 6 characters hash of origin server pub key.
gwReplyPrefix = "_GR_."
gwReplyPrefixLen = len(gwReplyPrefix)
gwHashLen = sysHashLen
gwHashLen = 6
gwClusterOffset = gwReplyPrefixLen
gwServerOffset = gwClusterOffset + gwHashLen + 1
gwSubjectOffset = gwServerOffset + gwHashLen + 1
Expand Down Expand Up @@ -161,6 +161,10 @@ type srvGateway struct {
resolver netResolver // Used to resolve host name before calling net.Dial()
sqbsz int // Max buffer size to send queue subs protocol. Used for testing.
recSubExp time.Duration // For how long do we check if there is a subscription match for a message with reply

// These are used for routing of mapped replies.
sIDHash []byte // Server ID hash (6 bytes)
routesIDByHash sync.Map // Route's server ID is hashed (6 bytes) and stored in this map.
}

// Subject interest tally. Also indicates if the key in the map is a
Expand Down Expand Up @@ -273,16 +277,21 @@ func validateGatewayOptions(o *Options) error {
return nil
}

// Computes a hash of 8 characters for the name.
// This will be used for routing of replies.
func getHash(name string) []byte {
// Computes a hash for the given `name`. The result will be `size` characters long.
func getHashSize(name string, size int) []byte {
sha := sha256.New()
sha.Write([]byte(name))
b := sha.Sum(nil)
for i := 0; i < gwHashLen; i++ {
for i := 0; i < size; i++ {
b[i] = digits[int(b[i]%base)]
}
return b[:gwHashLen]
return b[:size]
}

// Computes a hash of 6 characters for the name.
// This will be used for routing of replies.
func getGWHash(name string) []byte {
return getHashSize(name, gwHashLen)
}

func getOldHash(name string) []byte {
Expand Down Expand Up @@ -311,13 +320,13 @@ func (s *Server) newGateway(opts *Options) error {
gateway.Lock()
defer gateway.Unlock()

s.hash = getHash(s.info.ID)
clusterHash := getHash(opts.Gateway.Name)
gateway.sIDHash = getGWHash(s.info.ID)
clusterHash := getGWHash(opts.Gateway.Name)
prefix := make([]byte, 0, gwSubjectOffset)
prefix = append(prefix, gwReplyPrefix...)
prefix = append(prefix, clusterHash...)
prefix = append(prefix, '.')
prefix = append(prefix, s.hash...)
prefix = append(prefix, gateway.sIDHash...)
prefix = append(prefix, '.')
gateway.replyPfx = prefix

Expand All @@ -341,7 +350,7 @@ func (s *Server) newGateway(opts *Options) error {
}
cfg := &gatewayCfg{
RemoteGatewayOpts: rgo.clone(),
hash: getHash(rgo.Name),
hash: getGWHash(rgo.Name),
oldHash: getOldHash(rgo.Name),
urls: make(map[string]*url.URL, len(rgo.URLs)),
}
Expand Down Expand Up @@ -1329,7 +1338,7 @@ func (s *Server) processImplicitGateway(info *Info) {
opts := s.getOpts()
cfg = &gatewayCfg{
RemoteGatewayOpts: &RemoteGatewayOpts{Name: gwName},
hash: getHash(gwName),
hash: getGWHash(gwName),
oldHash: getOldHash(gwName),
urls: make(map[string]*url.URL, len(info.GatewayURLs)),
implicit: true,
Expand Down Expand Up @@ -2650,6 +2659,34 @@ func (s *Server) getRouteByHash(srvHash []byte) *client {
return route
}

// Store this route in map with the key being the remote server's name hash
// and the remote server's ID hash used by gateway replies mapping routing.
func (s *Server) storeRouteByHash(srvNameHash, srvIDHash string, c *client) {
s.routesByHash.Store(srvNameHash, c)
if !s.gateway.enabled {
return
}
s.gateway.routesIDByHash.Store(srvIDHash, c)
}

// Remove the route with the given keys from the map.
func (s *Server) removeRouteByHash(srvNameHash, srvIDHash string) {
s.routesByHash.Delete(srvNameHash)
if !s.gateway.enabled {
return
}
s.gateway.routesIDByHash.Delete(srvIDHash)
}

// Returns the route with given hash or nil if not found.
// This is for gateways only.
func (g *srvGateway) getRouteByHash(hash []byte) *client {
if v, ok := g.routesIDByHash.Load(string(hash)); ok {
return v.(*client)
}
return nil
}

// Returns the subject from the routed reply
func getSubjectFromGWRoutedReply(reply []byte, isOldPrefix bool) []byte {
if isOldPrefix {
Expand Down Expand Up @@ -2705,8 +2742,8 @@ func (c *client) handleGatewayReply(msg []byte) (processed bool) {
var route *client

// If the origin is not this server, get the route this should be sent to.
if c.kind == GATEWAY && srvHash != nil && !bytes.Equal(srvHash, c.srv.hash) {
route = c.srv.getRouteByHash(srvHash)
if c.kind == GATEWAY && srvHash != nil && !bytes.Equal(srvHash, c.srv.gateway.sIDHash) {
route = c.srv.gateway.getRouteByHash(srvHash)
// This will be possibly nil, and in this case we will try to process
// the interest from this server.
}
Expand Down
18 changes: 13 additions & 5 deletions server/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ type route struct {
gatewayURL string
leafnodeURL string
hash string
idHash string
}

type connectInfo struct {
Expand Down Expand Up @@ -611,8 +612,12 @@ func (c *client) processRouteInfo(info *Info) {
if len(info.LeafNodeURLs) == 1 {
c.route.leafnodeURL = info.LeafNodeURLs[0]
}
// Compute the hash of this route based on remoteID
// Compute the hash of this route based on remote server name
c.route.hash = string(getHash(info.Name))
// Same with remote server ID (used for GW mapped replies routing).
// Use getGWHash since we don't use the same hash len for that
// for backward compatibility.
c.route.idHash = string(getGWHash(info.ID))

// Copy over permissions as well.
c.opts.Import = info.Import
Expand Down Expand Up @@ -1457,11 +1462,12 @@ func (s *Server) addRoute(c *client, info *Info) (bool, bool) {
c.route.connectURLs = info.ClientConnectURLs
c.route.wsConnURLs = info.WSConnectURLs
cid := c.cid
hash := string(c.route.hash)
hash := c.route.hash
idHash := c.route.idHash
c.mu.Unlock()

// Store this route using the hash as the key
s.routesByHash.Store(hash, c)
s.storeRouteByHash(hash, idHash, c)

// Now that we have registered the route, we can remove from the temp map.
s.removeFromTempClients(cid)
Expand Down Expand Up @@ -1499,7 +1505,7 @@ func (s *Server) addRoute(c *client, info *Info) (bool, bool) {
// from our list.
c.route.leafnodeURL = _EMPTY_
// Same for the route hash otherwise it would be removed from s.routesByHash.
c.route.hash = _EMPTY_
c.route.hash, c.route.idHash = _EMPTY_, _EMPTY_
c.mu.Unlock()

remote.mu.Lock()
Expand Down Expand Up @@ -2022,13 +2028,15 @@ func (s *Server) removeRoute(c *client) {
var lnURL string
var gwURL string
var hash string
var idHash string
c.mu.Lock()
cid := c.cid
r := c.route
if r != nil {
rID = r.remoteID
lnURL = r.leafnodeURL
hash = r.hash
idHash = r.idHash
gwURL = r.gatewayURL
}
c.mu.Unlock()
Expand All @@ -2050,7 +2058,7 @@ func (s *Server) removeRoute(c *client) {
if lnURL != _EMPTY_ && s.removeLeafNodeURL(lnURL) {
s.sendAsyncLeafNodeInfo()
}
s.routesByHash.Delete(hash)
s.removeRouteByHash(hash, idHash)
}
s.removeFromTempClients(cid)
s.mu.Unlock()
Expand Down
1 change: 0 additions & 1 deletion server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,6 @@ type Server struct {
clients map[uint64]*client
routes map[uint64]*client
routesByHash sync.Map
hash []byte
remotes map[string]*client
leafs map[uint64]*client
users map[string]*User
Expand Down

0 comments on commit de1bf36

Please sign in to comment.