Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIXED] LeafNode reject duplicate remote #1738

Merged
merged 1 commit into from
Nov 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ const (
MsgHeaderViolation
NoRespondersRequiresHeaders
ClusterNameConflict
DuplicateRemoteLeafnodeConnection
)

// Some flags passed to processMsgResults
Expand Down
88 changes: 44 additions & 44 deletions server/leafnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -836,7 +836,7 @@ func (s *Server) createLeafNode(conn net.Conn, remote *leafNodeCfg) *client {
if solicited {
// Make sure we register with the account here.
c.registerWithAccount(acc)
s.addLeafNodeConnection(c)
s.addLeafNodeConnection(c, _EMPTY_, false)
s.initLeafNodeSmapAndSendSubs(c)
if sendSysConnectEvent {
s.sendLeafNodeConnect(acc)
Expand Down Expand Up @@ -1006,14 +1006,53 @@ func (s *Server) setLeafNodeInfoHostPortAndIP() error {
return nil
}

func (s *Server) addLeafNodeConnection(c *client) {
// addLeafNodeConnection stores the leafnode connection `c` in the server's
// map of leafnodes, keyed by its cid, and then calls removeFromTempClients
// for that cid.
//
// When `checkForDup` is true (the case when a leafnode is accepted), the
// existing leafnodes are first scanned for a connection that has the same
// server name `srvName` and is bound to the same account as `c`.
// Such a duplicate can exist because the remote is reconnecting while the
// accepting side has not yet detected the previous connection as broken.
// It can also be the result of a misconfiguration where the remote creates
// two (or more) connections that bind to the same account on the accept side.
//
// When a duplicate is found, the new connection is kept and the old one is
// closed (which takes care of the stale connection situation). Before being
// closed, the old connection is sent a DuplicateRemoteLeafnodeConnection
// error, to help the remote detect the misconfiguration when that is the
// cause of the duplicate.
func (s *Server) addLeafNodeConnection(c *client, srvName string, checkForDup bool) {
	// Grab what we need from the new connection under its own lock.
	var accName string
	c.mu.Lock()
	cid := c.cid
	if c.acc != nil {
		accName = c.acc.Name
	}
	c.mu.Unlock()

	var stale *client
	s.mu.Lock()
	if checkForDup {
		for _, existing := range s.leafs {
			existing.mu.Lock()
			// An empty server name never matches: some tests may send an
			// empty CONNECT{}.
			isDup := srvName != _EMPTY_ && existing.opts.Name == srvName && existing.acc.Name == accName
			existing.mu.Unlock()
			if isDup {
				stale = existing
				break
			}
		}
	}
	// Register the new connection while still holding the server lock.
	s.leafs[cid] = c
	s.mu.Unlock()
	s.removeFromTempClients(cid)

	// Nothing to evict.
	if stale == nil {
		return
	}
	// Notify and close the old connection, outside of the server lock.
	stale.sendErrAndErr(DuplicateRemoteLeafnodeConnection.String())
	stale.closeConnection(DuplicateRemoteLeafnodeConnection)
	c.Warnf("Replacing connection from same server")
}

func (s *Server) removeLeafNodeConnection(c *client) {
Expand Down Expand Up @@ -1073,9 +1112,6 @@ func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) erro
return ErrWrongGateway
}

// Check for stale connection from same server/account
c.replaceOldLeafNodeConnIfNeeded(s, proto)

// Leaf Nodes do not do echo or verbose or pedantic.
c.opts.Verbose = false
c.opts.Echo = false
Expand All @@ -1091,6 +1127,9 @@ func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) erro
c.leaf.remoteCluster = proto.Cluster
}

// Add in the leafnode here since we passed through auth at this point.
s.addLeafNodeConnection(c, proto.Name, true)

// If we have permissions bound to this leafnode we need to send them back to the
// origin server for local enforcement.
s.sendPermsInfo(c)
Expand All @@ -1099,52 +1138,13 @@ func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) erro
// This will send all registered subs too.
s.initLeafNodeSmapAndSendSubs(c)

// Add in the leafnode here since we passed through auth at this point.
s.addLeafNodeConnection(c)

// Announce the account connect event for a leaf node.
// This will no-op as needed.
s.sendLeafNodeConnect(c.acc)

return nil
}

// replaceOldLeafNodeConnIfNeeded is invoked by a server accepting a leafnode
// connection. It scans the server's leafnodes for an existing connection that
// has the same server name as `connInfo` and is bound to the same account as
// `c`. If one is found, it is closed so that the new connection is accepted
// and not reported as forming a cycle.
//
// This must be invoked for LEAF client types, and on the server accepting the
// connection.
//
// No server nor client lock held on entry.
func (c *client) replaceOldLeafNodeConnIfNeeded(s *Server, connInfo *leafConnectInfo) {
	// Account name of the incoming connection, read under its own lock.
	var accName string
	c.mu.Lock()
	if c.acc != nil {
		accName = c.acc.Name
	}
	c.mu.Unlock()

	var stale *client
	s.mu.Lock()
	for _, existing := range s.leafs {
		existing.mu.Lock()
		// An empty name never matches: some tests may send an empty CONNECT{}.
		isDup := existing.opts.Name == connInfo.Name && connInfo.Name != _EMPTY_ && existing.acc.Name == accName
		existing.mu.Unlock()
		if isDup {
			stale = existing
			break
		}
	}
	s.mu.Unlock()

	// No previous connection from that server/account.
	if stale == nil {
		return
	}
	stale.Warnf("Replacing connection from same server")
	stale.closeConnection(ReadError)
}

// Returns the remote cluster name. This is set only once so does not require a lock.
func (c *client) remoteCluster() string {
if c.leaf == nil {
Expand Down
60 changes: 60 additions & 0 deletions server/leafnode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1803,6 +1803,66 @@ func TestLeafNodeLoopDetectedDueToReconnect(t *testing.T) {
checkLeafNodeConnected(t, sl)
}

// TestLeafNodeTwoRemotesBindToSameAccount starts a server accepting leafnode
// connections, then a second server configured with two remotes (for local
// accounts "a" and "b") that both point at the first server's leafnode port.
// The test passes if the second server logs at least one error within two
// seconds, and fails otherwise.
func TestLeafNodeTwoRemotesBindToSameAccount(t *testing.T) {
	// Server accepting the leafnode connections, on a random port.
	opts := DefaultOptions()
	opts.LeafNode.Host = "127.0.0.1"
	opts.LeafNode.Port = -1
	s := RunServer(opts)
	defer s.Shutdown()

	// Both remotes use the same URL; the two %d verbs are filled with the
	// accepting server's leafnode port.
	conf := `
listen: 127.0.0.1:-1
cluster { name: ln22, listen: 127.0.0.1:-1 }
accounts {
a { users [ {user: a, password: a} ]}
b { users [ {user: b, password: b} ]}
}
leafnodes {
remotes = [
{
url: nats-leaf://127.0.0.1:%d
account: a
}
{
url: nats-leaf://127.0.0.1:%d
account: b
}
]
}
`
	lconf := createConfFile(t, []byte(fmt.Sprintf(conf, opts.LeafNode.Port, opts.LeafNode.Port)))
	defer os.Remove(lconf)

	lopts, err := ProcessConfigFile(lconf)
	if err != nil {
		t.Fatalf("Error loading config file: %v", err)
	}
	lopts.NoLog = false
	ln, err := NewServer(lopts)
	if err != nil {
		t.Fatalf("Error creating server: %v", err)
	}
	defer ln.Shutdown()
	// Capture the errors logged by the server with the two remotes.
	l := &captureErrorLogger{errCh: make(chan string, 10)}
	ln.SetLogger(l, false, false)

	// Start the server in its own goroutine; the WaitGroup lets the test
	// wait for Start() to return after the explicit Shutdown() below.
	wg := sync.WaitGroup{}
	wg.Add(1)
	go func() {
		defer wg.Done()
		ln.Start()
	}()

	select {
	case err := <-l.errCh:
		// Report through the test log rather than printing to stdout.
		t.Logf("Captured error log: %q", err)
	case <-time.After(2 * time.Second):
		t.Fatal("Did not get any error")
	}
	ln.Shutdown()
	wg.Wait()
}

func TestLeafNodeNoDuplicateWithinCluster(t *testing.T) {
// This sets the cluster name to "abc"
oSrv1 := DefaultOptions()
Expand Down
2 changes: 2 additions & 0 deletions server/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1913,6 +1913,8 @@ func (reason ClosedState) String() string {
return "No Responders Requires Headers"
case ClusterNameConflict:
return "Cluster Name Conflict"
case DuplicateRemoteLeafnodeConnection:
return "Duplicate Remote LeafNode Connection"
}

return "Unknown State"
Expand Down