Skip to content

Commit

Permalink
Merge pull request #2261 from nats-io/leaf_fixups
Browse files Browse the repository at this point in the history
Changes to leafnodes and JetStream
  • Loading branch information
derekcollison committed Jun 7, 2021
2 parents 3ab2a1e + 30fae4f commit 59b27a7
Show file tree
Hide file tree
Showing 4 changed files with 150 additions and 84 deletions.
3 changes: 3 additions & 0 deletions server/jetstream.go
Expand Up @@ -1273,6 +1273,9 @@ func (js *jetStream) disableJetStream(jsa *jsAccount) error {
// jetStreamConfigured reports whether the account has JetStream configured, regardless of this
// servers JetStream status.
func (a *Account) jetStreamConfigured() bool {
if a == nil {
return false
}
a.mu.RLock()
defer a.mu.RUnlock()
return a.jsLimits != nil
Expand Down
3 changes: 2 additions & 1 deletion server/jetstream_api.go
Expand Up @@ -830,7 +830,7 @@ const badAPIRequestT = "Malformed JetStream API Request: %q"
func (a *Account) checkJetStream() (enabled, shouldError bool) {
a.mu.RLock()
defer a.mu.RUnlock()
return a.js != nil, a.nleafs == 0
return a.js != nil, a.nleafs+a.nrleafs == 0
}

// Request for current usage and limits for this account.
Expand Down Expand Up @@ -876,6 +876,7 @@ func (s *Server) jsAccountInfoRequest(sub *subscription, c *client, subject, rep
if err != nil {
return
}

s.sendAPIResponse(ci, acc, subject, reply, string(msg), string(b))
}

Expand Down
52 changes: 52 additions & 0 deletions server/jetstream_cluster_test.go
Expand Up @@ -6460,6 +6460,34 @@ func TestJetStreamClusterDomainsAndSameNameSources(t *testing.T) {
}
}

// When a leafnode enables JS on an account that is not enabled on the remote cluster account this should
// still work. Early NGS beta testers etc.
func TestJetStreamClusterSingleLeafNodeEnablingJetStream(t *testing.T) {
tmpl := strings.Replace(jsClusterAccountsTempl, "store_dir:", "domain: HUB, store_dir:", 1)
c := createJetStreamCluster(t, tmpl, "HUB", _EMPTY_, 3, 11322, true)
defer c.shutdown()

ln := c.createSingleLeafNodeNoSystemAccountAndEnablesJetStream()
defer ln.Shutdown()

// Check that we have JS in the $G account on the leafnode.
nc, js := jsClientConnect(t, ln)
defer nc.Close()

if _, err := js.AccountInfo(); err != nil {
t.Fatalf("Unexpected error: %v", err)
}

// Connect our client to the "nojs" account in the cluster but make sure JS works since its enabled via the leafnode.
s := c.randomServer()
nc, js = jsClientConnect(t, s, nats.UserInfo("nojs", "p"))
defer nc.Close()

if _, err := js.AccountInfo(); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
}

func TestJetStreamClusterLeafDifferentAccounts(t *testing.T) {
c := createJetStreamCluster(t, jsClusterAccountsTempl, "HUB", _EMPTY_, 2, 33133, false)
defer c.shutdown()
Expand Down Expand Up @@ -7101,6 +7129,7 @@ var jsClusterAccountsTempl = `
accounts {
ONE { users = [ { user: "one", pass: "p" } ]; jetstream: enabled }
TWO { users = [ { user: "two", pass: "p" } ]; jetstream: enabled }
NOJS { users = [ { user: "nojs", pass: "p" } ] }
$SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] }
}
`
Expand Down Expand Up @@ -7508,6 +7537,29 @@ func (c *cluster) createSingleLeafNodeNoSystemAccount() *Server {
return s
}

// This is tied to jsClusterAccountsTempl, so changes there to users needs to be reflected here.
func (c *cluster) createSingleLeafNodeNoSystemAccountAndEnablesJetStream() *Server {
as := c.randomServer()
lno := as.getOpts().LeafNode
ln := fmt.Sprintf("nats://nojs:p@%s:%d", lno.Host, lno.Port)
conf := fmt.Sprintf(jsClusterSingleLeafNodeLikeNGSTempl, createDir(c.t, JetStreamStoreDir), ln)
s, o := RunServerWithConfig(createConfFile(c.t, []byte(conf)))
c.servers = append(c.servers, s)
c.opts = append(c.opts, o)

checkLeafNodeConnectedCount(c.t, as, 1)

return s
}

var jsClusterSingleLeafNodeLikeNGSTempl = `
listen: 127.0.0.1:-1
server_name: LNJS
jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: "%s"}
leaf { remotes [ { urls: [ %s ] } ] }
`

var jsClusterSingleLeafNodeTempl = `
listen: 127.0.0.1:-1
server_name: LNJS
Expand Down
176 changes: 93 additions & 83 deletions server/leafnode.go
Expand Up @@ -112,43 +112,54 @@ func (c *client) isHubLeafNode() bool {
// are sharing the system account and wanting to extend the JS domain.
// r lock should be held.
func (s *Server) addInJSDeny(r *leafNodeCfg) {
var hasDE, hasDI bool
s.addInJSDenyExport(r)
s.addInJSDenyImport(r)
}

// Will add in the deny export for JetStream on solicited connections if we
// detect we have multiple JetStream domains and we know our local account
// is JetStream enabled.
// r lock should be held.
func (s *Server) addInJSDenyExport(r *leafNodeCfg) {
for _, dsubj := range r.DenyExports {
if dsubj == jsAllAPI {
hasDE = true
break
return
}
}

s.Noticef("Adding deny export of %q for leafnode configuration on %q that bridges system account", jsAllAPI, r.LocalAccount)
r.DenyExports = append(r.DenyExports, jsAllAPI)

// We added in some deny clauses here so need to regenerate the permissions etc.
perms := &Permissions{}
perms.Publish = &SubjectPermission{Deny: r.DenyExports}
if len(r.DenyImports) > 0 {
perms.Subscribe = &SubjectPermission{Deny: r.DenyImports}
}
r.perms = perms
}

// Will add in the deny import for JetStream on solicited connections if we
// detect we have multiple JetStream domains and we know our local account
// is JetStream enabled.
// r lock should be held.
func (s *Server) addInJSDenyImport(r *leafNodeCfg) {
for _, dsubj := range r.DenyImports {
if dsubj == jsAllAPI {
hasDI = true
break
return
}
}

var addedDeny bool
if !hasDE {
s.Noticef("Adding deny export of %q for leafnode configuration on %q that bridges system account", jsAllAPI, r.LocalAccount)
r.DenyExports = append(r.DenyExports, jsAllAPI)
addedDeny = true
}
if !hasDI {
s.Noticef("Adding deny import of %q for leafnode configuration on %q that bridges system account", jsAllAPI, r.LocalAccount)
r.DenyImports = append(r.DenyImports, jsAllAPI)
addedDeny = true
}
s.Noticef("Adding deny import of %q for leafnode configuration on %q that bridges system account", jsAllAPI, r.LocalAccount)
r.DenyImports = append(r.DenyImports, jsAllAPI)

// We added in some deny clauses here so need to regenerate the permissions etc.
if addedDeny {
perms := &Permissions{}
if len(r.DenyExports) > 0 {
perms.Publish = &SubjectPermission{Deny: r.DenyExports}
}
if len(r.DenyImports) > 0 {
perms.Subscribe = &SubjectPermission{Deny: r.DenyImports}
}
r.perms = perms
perms := &Permissions{}
perms.Subscribe = &SubjectPermission{Deny: r.DenyImports}
if len(r.DenyExports) > 0 {
perms.Publish = &SubjectPermission{Deny: r.DenyExports}
}
r.perms = perms
}

// Used for $SYS accounts when sharing but using separate JS domains.
Expand Down Expand Up @@ -636,17 +647,18 @@ var credsRe = regexp.MustCompile(`\s*(?:(?:[-]{3,}[^\n]*[-]{3,}\n)(.+)(?:\n\s*[-
func (c *client) sendLeafConnect(clusterName string, tlsRequired, headers bool) error {
// We support basic user/pass and operator based user JWT with signatures.
cinfo := leafConnectInfo{
TLS: tlsRequired,
ID: c.srv.info.ID,
Name: c.srv.info.Name,
Hub: c.leaf.remote.Hub,
Cluster: clusterName,
Headers: headers,
DenyPub: c.leaf.remote.DenyImports,
TLS: tlsRequired,
ID: c.srv.info.ID,
Name: c.srv.info.Name,
Hub: c.leaf.remote.Hub,
Cluster: clusterName,
Headers: headers,
JetStream: c.acc.jetStreamConfigured(),
DenyPub: c.leaf.remote.DenyImports,
}

// Check for credentials first, that will take precedence..
if creds := c.leaf.remote.Credentials; creds != "" {
if creds := c.leaf.remote.Credentials; creds != _EMPTY_ {
c.Debugf("Authenticating with credentials file %q", c.leaf.remote.Credentials)
contents, err := ioutil.ReadFile(creds)
if err != nil {
Expand Down Expand Up @@ -996,28 +1008,34 @@ func (c *client) processLeafnodeInfo(info *Info) {
// This is so that if JetStream is enabled on both sides we can separately address both.
if remote, acc := c.leaf.remote, c.acc; remote != nil {
remote.Lock()

// JetStream checks for mappings and permissions updates.
hasJSDomain := opts.JetStreamDomain != _EMPTY_
if acc != sysAcc {
if hasSysShared {
s.addInJSDeny(remote)
} else {
// Here we want to suppress if this local account has JS enabled.
// This is regardless of whether or not this server is actually running JS.
// We do consider this if the other side is not running JetStream.
if acc != nil && acc.jetStreamConfigured() && info.JetStream {
s.addInJSDeny(remote)
// We only suppress export. But we do send an indication about our JetStream
// status in the connect and the hub side will suppress as well if the remote
// account also has JetStream enabled.
if acc.jetStreamConfigured() {
s.addInJSDenyExport(remote)
}
}
// If we have a specified JetStream domain we will want to add a mapping to
// allow access cross domain for each non-system account.
if opts.JetStreamDomain != _EMPTY_ && acc != sysAcc && acc.jetStreamConfigured() {
if hasJSDomain && acc.jetStreamConfigured() {
src := fmt.Sprintf(jsDomainAPI, opts.JetStreamDomain)
if err := acc.AddMapping(src, jsAllAPI); err != nil {
c.Debugf("Error adding JetStream domain mapping: %v", err)
}
}
} else if opts.JetStreamDomain != _EMPTY_ {
} else if hasJSDomain {
s.addInJSDenyAll(remote)
}

c.setPermissions(remote.perms)
remote.Unlock()
}
Expand All @@ -1039,9 +1057,15 @@ func (c *client) processLeafnodeInfo(info *Info) {
// Check if we have local deny clauses that we need to merge.
if remote := c.leaf.remote; remote != nil {
if len(remote.DenyExports) > 0 {
if perms.Publish == nil {
perms.Publish = &SubjectPermission{}
}
perms.Publish.Deny = append(perms.Publish.Deny, remote.DenyExports...)
}
if len(remote.DenyImports) > 0 {
if perms.Subscribe == nil {
perms.Subscribe = &SubjectPermission{}
}
perms.Subscribe.Deny = append(perms.Subscribe.Deny, remote.DenyImports...)
}
}
Expand Down Expand Up @@ -1228,21 +1252,24 @@ func (s *Server) removeLeafNodeConnection(c *client) {
s.removeFromTempClients(cid)
}

// Connect information for solicited leafnodes.
type leafConnectInfo struct {
JWT string `json:"jwt,omitempty"`
Sig string `json:"sig,omitempty"`
User string `json:"user,omitempty"`
Pass string `json:"pass,omitempty"`
TLS bool `json:"tls_required"`
Comp bool `json:"compression,omitempty"`
ID string `json:"server_id,omitempty"`
Name string `json:"name,omitempty"`
Hub bool `json:"is_hub,omitempty"`
Cluster string `json:"cluster,omitempty"`
Headers bool `json:"headers,omitempty"`
JWT string `json:"jwt,omitempty"`
Sig string `json:"sig,omitempty"`
User string `json:"user,omitempty"`
Pass string `json:"pass,omitempty"`
TLS bool `json:"tls_required"`
Comp bool `json:"compression,omitempty"`
ID string `json:"server_id,omitempty"`
Name string `json:"name,omitempty"`
Hub bool `json:"is_hub,omitempty"`
Cluster string `json:"cluster,omitempty"`
Headers bool `json:"headers,omitempty"`
JetStream bool `json:"jetstream,omitempty"`
DenyPub []string `json:"deny_pub,omitempty"`

// Just used to detect wrong connection attempts.
Gateway string `json:"gateway,omitempty"`
DenyPub []string `json:"deny_pub,omitempty"`
Gateway string `json:"gateway,omitempty"`
}

// processLeafNodeConnect will process the inbound connect args.
Expand All @@ -1265,7 +1292,7 @@ func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) erro

// Reject if this has Gateway which means that it would be from a gateway
// connection that incorrectly connects to the leafnode port.
if proto.Gateway != "" {
if proto.Gateway != _EMPTY_ {
errTxt := fmt.Sprintf("Rejecting connection from gateway %q on the leafnode port", proto.Gateway)
c.Errorf(errTxt)
c.sendErr(errTxt)
Expand Down Expand Up @@ -1303,7 +1330,14 @@ func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) erro
}

// Check for JetStream domain
doDomainMappings := opts.JetStreamDomain != _EMPTY_ && c.acc != sysAcc && c.acc.jetStreamConfigured()
jsConfigured := c.acc.jetStreamConfigured()
doDomainMappings := opts.JetStreamDomain != _EMPTY_ && c.acc != sysAcc && jsConfigured

// If we have JS enabled and the other side does as well we need to add in an import deny clause.
if jsConfigured && proto.JetStream {
// We should never have existing perms here, if that changes this needs to be reworked.
c.setPermissions(&Permissions{Publish: &SubjectPermission{Deny: []string{jsAllAPI}}})
}

// Set the Ping timer
s.setFirstPingTimer(c)
Expand Down Expand Up @@ -2022,25 +2056,7 @@ func (c *client) processInboundLeafMsg(msg []byte) {
c.in.msgs++
c.in.bytes += int32(len(msg) - LEN_CR_LF)

// Check pub permissions
if c.perms != nil && (c.perms.pub.allow != nil || c.perms.pub.deny != nil) {
subject := c.pa.subject
// If this subject was mapped we need to check the original subject, not the new one.
if len(c.pa.mapped) > 0 {
subject = c.pa.mapped
}
if !c.pubAllowed(string(subject)) {
if c.isHubLeafNode() {
c.leafPubPermViolation(subject)
} else {
c.Debugf("Not permitted to receive from %q", subject)
}
return
}
}

srv := c.srv
acc := c.acc
srv, acc, subject := c.srv, c.acc, string(c.pa.subject)

// Mostly under testing scenarios.
if srv == nil || acc == nil {
Expand All @@ -2054,7 +2070,7 @@ func (c *client) processInboundLeafMsg(msg []byte) {

genid := atomic.LoadUint64(&c.acc.sl.genid)
if genid == c.in.genid && c.in.results != nil {
r, ok = c.in.results[string(c.pa.subject)]
r, ok = c.in.results[subject]
} else {
// Reset our L1 completely.
c.in.results = make(map[string]*SublistResult)
Expand All @@ -2063,13 +2079,13 @@ func (c *client) processInboundLeafMsg(msg []byte) {

// Go back to the sublist data structure.
if !ok {
r = c.acc.sl.Match(string(c.pa.subject))
c.in.results[string(c.pa.subject)] = r
r = c.acc.sl.Match(subject)
c.in.results[subject] = r
// Prune the results cache. Keeps us from unbounded growth. Random delete.
if len(c.in.results) > maxResultCacheSize {
n := 0
for subject := range c.in.results {
delete(c.in.results, subject)
for subj := range c.in.results {
delete(c.in.results, subj)
if n++; n > pruneSize {
break
}
Expand Down Expand Up @@ -2101,12 +2117,6 @@ func (c *client) processInboundLeafMsg(msg []byte) {
}
}

// Handles a publish permission violation.
// See leafPermViolation() for details.
func (c *client) leafPubPermViolation(subj []byte) {
c.leafPermViolation(true, subj)
}

// Handles a subscription permission violation.
// See leafPermViolation() for details.
func (c *client) leafSubPermViolation(subj []byte) {
Expand Down

0 comments on commit 59b27a7

Please sign in to comment.