Skip to content

Commit

Permalink
Merge pull request #980 from nats-io/leafupdates
Browse files Browse the repository at this point in the history
Leafnode updates
  • Loading branch information
derekcollison committed May 2, 2019
2 parents 74c71f9 + 90211e5 commit dacf0a4
Show file tree
Hide file tree
Showing 8 changed files with 104 additions and 26 deletions.
3 changes: 3 additions & 0 deletions server/auth.go
Expand Up @@ -643,6 +643,9 @@ func (s *Server) isLeafNodeAuthorized(c *client) bool {
return true
}

// For now this means we are binding the leafnode to the global account.
c.registerWithAccount(s.globalAccount())

// Snapshot server options.
opts := s.getOpts()

Expand Down
4 changes: 2 additions & 2 deletions server/client.go
Expand Up @@ -141,6 +141,7 @@ const (
ServerShutdown
AuthenticationExpired
WrongGateway
MissingAccount
)

// Some flags passed to processMsgResultsEx
Expand Down Expand Up @@ -549,9 +550,7 @@ func (c *client) RegisterNkeyUser(user *NkeyUser) error {
}

c.mu.Lock()
defer c.mu.Unlock()
c.user = user

// Assign permissions.
if user.Permissions == nil {
// Reset perms to nil in case client previously had them.
Expand All @@ -560,6 +559,7 @@ func (c *client) RegisterNkeyUser(user *NkeyUser) error {
} else {
c.setPermissions(user.Permissions)
}
c.mu.Unlock()
return nil
}

Expand Down
14 changes: 11 additions & 3 deletions server/events.go
Expand Up @@ -773,7 +773,7 @@ func (s *Server) accountConnectEvent(c *client) {
Start: c.start,
Host: c.host,
ID: c.cid,
Account: c.acc.Name,
Account: accForClient(c),
User: nameForClient(c),
Name: c.opts.Name,
Lang: c.opts.Lang,
Expand Down Expand Up @@ -812,7 +812,7 @@ func (s *Server) accountDisconnectEvent(c *client, now time.Time, reason string)
Stop: &now,
Host: c.host,
ID: c.cid,
Account: c.acc.Name,
Account: accForClient(c),
User: nameForClient(c),
Name: c.opts.Name,
Lang: c.opts.Lang,
Expand Down Expand Up @@ -853,7 +853,7 @@ func (s *Server) sendAuthErrorEvent(c *client) {
Stop: &now,
Host: c.host,
ID: c.cid,
Account: c.acc.Name,
Account: accForClient(c),
User: nameForClient(c),
Name: c.opts.Name,
Lang: c.opts.Lang,
Expand Down Expand Up @@ -941,6 +941,14 @@ func nameForClient(c *client) string {
return "N/A"
}

// Helper to grab account name for a client.
func accForClient(c *client) string {
if c.acc != nil {
return c.acc.Name
}
return "N/A"
}

// Helper to clear timers.
func clearTimer(tp **time.Timer) {
if t := *tp; t != nil {
Expand Down
56 changes: 38 additions & 18 deletions server/leafnode.go
Expand Up @@ -73,7 +73,7 @@ func (s *Server) remoteLeafNodeStillValid(remote *leafNodeCfg) bool {
return false
}

// Ensure that gateway is properly configured.
// Ensure that leafnode is properly configured.
func validateLeafNode(o *Options) error {
if o.LeafNode.Port == 0 {
return nil
Expand Down Expand Up @@ -443,9 +443,13 @@ func (s *Server) createLeafNode(conn net.Conn, remote *leafNodeCfg) *client {
remote.LocalAccount = globalAccountName
}
// FIXME(dlc) - Make this resolve at startup.
c.acc, _ = s.LookupAccount(remote.LocalAccount)
// Make sure we register with the account here.
c.registerWithAccount(c.acc)
acc, err := s.LookupAccount(remote.LocalAccount)
if err != nil {
c.Debugf("Can not locate local account %q for leafnode", remote.LocalAccount)
c.closeConnection(MissingAccount)
return nil
}
c.acc = acc
c.leaf.remote = remote
}

Expand Down Expand Up @@ -541,6 +545,7 @@ func (s *Server) createLeafNode(conn net.Conn, remote *leafNodeCfg) *client {

c.sendLeafConnect(tlsRequired)
c.Debugf("Remote leaf node connect msg sent")

} else {
// Send our info to the other side.
// Remember the nonce we sent here for signatures, etc.
Expand Down Expand Up @@ -600,8 +605,12 @@ func (s *Server) createLeafNode(conn net.Conn, remote *leafNodeCfg) *client {
c.mu.Unlock()

// Update server's accounting here if we solicited.
// Also send our local subs.
if solicited {
// Make sure we register with the account here.
c.registerWithAccount(c.acc)
s.addLeafNodeConnection(c)
c.sendAllAccountSubs()
}

return c
Expand Down Expand Up @@ -762,7 +771,7 @@ func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) erro
c.opts.Pedantic = false

// Create and initialize the smap since we know our bound account now.
c.initLeafNodeSmap()
s.initLeafNodeSmap(c)

// We are good to go, send over all the bound account subscriptions.
s.startGoRoutine(func() {
Expand All @@ -782,17 +791,18 @@ func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) erro

// Snapshot the current subscriptions from the sublist into our smap which
// we will keep updated from now on.
func (c *client) initLeafNodeSmap() {
func (s *Server) initLeafNodeSmap(c *client) {
acc := c.acc
if acc == nil {
c.Debugf("Leaf node does not have an account bound")
return
}
// Collect all subs here.
// Collect all account subs here.
_subs := [32]*subscription{}
subs := _subs[:0]
ims := []string{}
acc.mu.RLock()
accName := acc.Name
acc.sl.All(&subs)
// Since leaf nodes only send on interest, if the bound
// account has import services we need to send those over.
Expand All @@ -801,6 +811,24 @@ func (c *client) initLeafNodeSmap() {
}
acc.mu.RUnlock()

// Now check for gateway interest. Leafnodes will put this into
// the proper mode to propagate, but they are not held in the account.
gwsa := [16]*client{}
gws := gwsa[:0]
s.getOutboundGatewayConnections(&gws)
for _, cgw := range gws {
cgw.mu.Lock()
gw := cgw.gw
cgw.mu.Unlock()
if gw != nil {
if ei, _ := gw.outsim.Load(accName); ei != nil {
if e := ei.(*outsie); e != nil && e.sl != nil {
e.sl.All(&subs)
}
}
}
}

// Now walk the results and add them to our smap
c.mu.Lock()
for _, sub := range subs {
Expand Down Expand Up @@ -900,23 +928,21 @@ func keyFromSub(sub *subscription) string {
// Send all subscriptions for this account that include local
// and all subscriptions besides our own.
func (c *client) sendAllAccountSubs() {
c.mu.Lock()
defer c.mu.Unlock()

// Hold all at once for now.
var b bytes.Buffer

c.mu.Lock()
for key, n := range c.leaf.smap {
c.writeLeafSub(&b, key, n)
}

// We will make sure we don't overflow here due to an max_pending.
chunks := protoChunks(b.Bytes(), MAX_PAYLOAD_SIZE)

for _, chunk := range chunks {
c.queueOutbound(chunk)
c.flushOutbound()
}
c.mu.Unlock()
}

func (c *client) writeLeafSub(w *bytes.Buffer, key string, n int32) {
Expand Down Expand Up @@ -1075,13 +1101,7 @@ func (c *client) processLeafUnsub(arg []byte) error {
updateGWs = srv.gateway.enabled
}

// Treat leaf node subscriptions similar to a client subscription, meaning we
// send them to both routes and gateways and other leaf nodes. We also do
// the shadow subscriptions.
if err := c.addShadowSubscriptions(acc, sub); err != nil {
c.Errorf(err.Error())
}
// If we are routing add to the route map for the associated account.
// If we are routing subtract from the route map for the associated account.
srv.updateRouteSubscriptionMap(acc, sub, -1)
// Gateways
if updateGWs {
Expand Down
3 changes: 3 additions & 0 deletions server/monitor.go
Expand Up @@ -939,6 +939,7 @@ func (s *Server) Varz(varzOpts *VarzOptions) (*Varz, error) {
v.SlowConsumers = atomic.LoadInt64(&s.slowConsumers)
v.MaxPending = opts.MaxPending
v.WriteDeadline = opts.WriteDeadline
// FIXME(dlc) - make this multi-account aware.
v.Subscriptions = s.gacc.sl.Count()
v.ConfigLoadTime = s.configTime
// Need a copy here since s.httpReqStats can change while doing
Expand Down Expand Up @@ -1045,6 +1046,8 @@ func (reason ClosedState) String() string {
return "Authentication Expired"
case WrongGateway:
return "Wrong Gateway"
case MissingAccount:
return "Missing Account"
}
return "Unknown State"
}
1 change: 1 addition & 0 deletions server/opts.go
Expand Up @@ -227,6 +227,7 @@ func (o *Options) Clone() *Options {
clone.Gateway.Gateways[i] = g.clone()
}
}
// FIXME(dlc) - clone leaf node stuff.
return clone
}

Expand Down
2 changes: 1 addition & 1 deletion server/server.go
Expand Up @@ -975,7 +975,7 @@ func (s *Server) Start() {
<-ch
}

// Solict remote servers for leaf node connections.
// Solicit remote servers for leaf node connections.
if len(opts.LeafNode.Remotes) > 0 {
s.solicitLeafNodeRemotes(opts.LeafNode.Remotes)
}
Expand Down
47 changes: 45 additions & 2 deletions test/leafnode_test.go
Expand Up @@ -654,6 +654,51 @@ func TestLeafNodeGatewaySendsSystemEvent(t *testing.T) {
}
}

func TestLeafNodeGatewayInterestPropagation(t *testing.T) {
server.SetGatewaysSolicitDelay(10 * time.Millisecond)
defer server.ResetGatewaysSolicitDelay()

ca := createClusterWithName(t, "A", 3)
defer shutdownCluster(ca)
cb := createClusterWithName(t, "B", 3, ca)
defer shutdownCluster(cb)

sl1, sl1Opts := runSolicitLeafServer(ca.opts[1])
defer sl1.Shutdown()

c := createClientConn(t, sl1Opts.Host, sl1Opts.Port)
defer c.Close()

send, expect := setupConn(t, c)
send("SUB foo 1\r\n")
send("PING\r\n")
expect(pongRe)

// Now we will create a new leaf node on cluster B, expect to get the
// interest for "foo".
opts := cb.opts[0]
lc := createLeafConn(t, opts.LeafNode.Host, opts.LeafNode.Port)
defer lc.Close()
_, leafExpect := setupConn(t, lc)
buf := leafExpect(lsubRe)
if !strings.Contains(string(buf), "foo") {
t.Fatalf("Expected interest for 'foo' as 'LS+ foo\\r\\n', got %q", buf)
}
}

func TestLeafNodeAuthSystemEventNoCrash(t *testing.T) {
ca := createClusterWithName(t, "A", 1)
defer shutdownCluster(ca)

opts := ca.opts[0]
lc := createLeafConn(t, opts.LeafNode.Host, opts.LeafNode.Port)
defer lc.Close()

leafSend := sendCommand(t, lc)
leafSend("LS+ foo\r\n")
checkInfoMsg(t, lc)
}

func TestLeafNodeWithRouteAndGateway(t *testing.T) {
server.SetGatewaysSolicitDelay(50 * time.Millisecond)
defer server.ResetGatewaysSolicitDelay()
Expand Down Expand Up @@ -702,7 +747,6 @@ func TestLeafNodeWithRouteAndGateway(t *testing.T) {
expect(pongRe)
leafExpect(lsubRe)

//leafSend("LMSG foo + myreply bar 2\r\nOK\r\n")
leafSend("LMSG foo 2\r\nOK\r\n")
expectNothing(t, lc)

Expand Down Expand Up @@ -1641,7 +1685,6 @@ func TestLeafNodeConnectionLimitsSingleServer(t *testing.T) {
defer s4.Shutdown()

if nln := acc.NumLeafNodes(); nln != 2 {
fmt.Printf("Acc is %q\n", acc.Name)
t.Fatalf("Expected 2 leaf nodes, got %d", nln)
}

Expand Down

0 comments on commit dacf0a4

Please sign in to comment.