From eb638e8c87ebe0283c0a7b23aff18f55e36afe6d Mon Sep 17 00:00:00 2001 From: Matthias Hanel Date: Mon, 18 May 2020 15:29:08 -0400 Subject: [PATCH 1/2] [Adding] support for account_token_position Signed-off-by: Matthias Hanel --- go.sum | 8 +- server/accounts.go | 143 ++++++++++++++++++------------- server/accounts_test.go | 25 ++++++ server/jwt_test.go | 180 ++++++++++++++++++++++++++++++++++++++++ server/opts.go | 14 +++- server/opts_test.go | 118 ++++++++++++++++++++++++++ 6 files changed, 425 insertions(+), 63 deletions(-) diff --git a/go.sum b/go.sum index 5e4793ed44..39ea18ffe9 100644 --- a/go.sum +++ b/go.sum @@ -3,9 +3,11 @@ github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:x github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= +github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0= github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/klauspost/compress v1.11.7 h1:0hzRabrMN4tSTvMfnL3SCv1ZGeAP23ynzodBgaHeMeg= github.com/klauspost/compress v1.11.7/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= @@ -27,10 +29,6 @@ github.com/nats-io/nats.go v1.10.0/go.mod h1:AjGArbfyR50+afOUotNX2Xs5SYHf+CoOa5H github.com/nats-io/nats.go v1.10.1-0.20200531124210-96f2130e4d55/go.mod h1:ARiFsjW9DVxk48WJbO3OSZ2DG8fjkMi7ecLmXoY/n9I= github.com/nats-io/nats.go v1.10.1-0.20200606002146-fc6fed82929a/go.mod h1:8eAIv96Mo9QW6Or40jUHejS7e4VwZ3VRYD6Sf0BTDp4= github.com/nats-io/nats.go v1.10.1-0.20201021145452-94be476ad6e0/go.mod h1:VU2zERjp8xmF+Lw2NH4u2t5qWZxwc7jB3+7HVMWQXPI= -github.com/nats-io/nats.go v1.10.1-0.20210122204956-b8ea7fc17ea6 h1:cpS+9uyfHXvRG/Q+WcDd3KXRgPa9fo9tDbIeDHCxYAg= -github.com/nats-io/nats.go v1.10.1-0.20210122204956-b8ea7fc17ea6/go.mod h1:Sa3kLIonafChP5IF0b55i9uvGR10I3hPETFbi4+9kOI= -github.com/nats-io/nats.go v1.10.1-0.20210123004354-58bf69ad2df8 h1:yxExhj0DStfAEN5lGy6pyL4WJE+J8aKn50xoKt9hFdA= -github.com/nats-io/nats.go v1.10.1-0.20210123004354-58bf69ad2df8/go.mod h1:Sa3kLIonafChP5IF0b55i9uvGR10I3hPETFbi4+9kOI= github.com/nats-io/nats.go v1.10.1-0.20210127212649-5b4924938a9a h1:EjwBk6T/arS7o0ZGdMgdzYrQHeUITT1GHf3cFQFtr3I= github.com/nats-io/nats.go v1.10.1-0.20210127212649-5b4924938a9a/go.mod h1:Sa3kLIonafChP5IF0b55i9uvGR10I3hPETFbi4+9kOI= github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= @@ -54,10 +52,12 @@ golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1 h1:NusfzzA6yGQ+ua51ck7E3omNUX/JuqbFSaRGqU8CcLI= golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= +google.golang.org/protobuf v1.23.0 h1:4MY060fB1DLGMB/7MBTLnwQUY6+F09GEiz6SsrNqyzM= google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= diff --git a/server/accounts.go b/server/accounts.go index 0b9e95d667..6413470919 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -171,8 +171,9 @@ func (rt ServiceRespType) String() string { // exportAuth holds configured approvals or boolean indicating an // auth token is required for import. type exportAuth struct { - tokenReq bool - approved map[string]*Account + tokenReq bool + accountPos uint + approved map[string]*Account } // streamExport @@ -877,13 +878,48 @@ func (a *Account) removeClient(c *client) int { return n } +func setExportAuth(ea *exportAuth, subject string, accounts []*Account, accountPos uint) error { + if accountPos > 0 { + if token := strings.Split(subject, "."); len(token) < int(accountPos) { + return ErrInvalidSubject + } else if token[accountPos-1] != "*" { + return ErrInvalidSubject + } + } + ea.accountPos = accountPos + // empty means auth required but will be import token. + if accounts == nil { + } else if len(accounts) == 0 { + ea.tokenReq = true + } else { + if ea.approved == nil { + ea.approved = make(map[string]*Account, len(accounts)) + } + for _, acc := range accounts { + ea.approved[acc.Name] = acc + } + } + return nil +} + // AddServiceExport will configure the account with the defined export. func (a *Account) AddServiceExport(subject string, accounts []*Account) error { - return a.AddServiceExportWithResponse(subject, Singleton, accounts) + return a.AddServiceExportWithResponseAndAccountPos(subject, Singleton, accounts, 0) +} + +// AddServiceExport will configure the account with the defined export. +func (a *Account) AddServiceExportWithAccountPos(subject string, accounts []*Account, accountPos uint) error { + return a.AddServiceExportWithResponseAndAccountPos(subject, Singleton, accounts, accountPos) } // AddServiceExportWithResponse will configure the account with the defined export and response type. func (a *Account) AddServiceExportWithResponse(subject string, respType ServiceRespType, accounts []*Account) error { + return a.AddServiceExportWithResponseAndAccountPos(subject, respType, accounts, 0) +} + +// AddServiceExportWithresponse will configure the account with the defined export and response type. +func (a *Account) AddServiceExportWithResponseAndAccountPos( + subject string, respType ServiceRespType, accounts []*Account, accountPos uint) error { if a == nil { return ErrMissingAccount } @@ -895,33 +931,28 @@ func (a *Account) AddServiceExportWithResponse(subject string, respType ServiceR a.exports.services = make(map[string]*serviceExport) } - se := a.exports.services[subject] + ea := a.exports.services[subject] // Always create a service export - if se == nil { - se = &serviceExport{} + if ea == nil { + ea = &serviceExport{} } if respType != Singleton { - se.respType = respType + ea.respType = respType } - if accounts != nil { - // empty means auth required but will be import token. - if len(accounts) == 0 { - se.tokenReq = true - } else { - if se.approved == nil { - se.approved = make(map[string]*Account, len(accounts)) - } - for _, acc := range accounts { - se.approved[acc.Name] = acc - } + if accounts != nil || accountPos > 0 { + if ea == nil { + ea = &serviceExport{} + } + if err := setExportAuth(&ea.exportAuth, subject, accounts, accountPos); err != nil { + return err } } lrt := a.lowestServiceExportResponseTime() - se.acc = a - se.respThresh = DEFAULT_SERVICE_EXPORT_RESPONSE_THRESHOLD - a.exports.services[subject] = se + ea.acc = a + ea.respThresh = DEFAULT_SERVICE_EXPORT_RESPONSE_THRESHOLD + a.exports.services[subject] = ea if nlrt := a.lowestServiceExportResponseTime(); nlrt != lrt { a.updateAllClientsServiceExportResponseTime(nlrt) } @@ -2280,8 +2311,16 @@ func (a *Account) AddStreamImport(account *Account, from, prefix string) error { var IsPublicExport = []*Account(nil) // AddStreamExport will add an export to the account. If accounts is nil -// it will signify a public export, meaning anyone can impoort. +// it will signify a public export, meaning anyone can import. func (a *Account) AddStreamExport(subject string, accounts []*Account) error { + return a.AddStreamExportWithAccountPos(subject, accounts, 0) +} + +// AddStreamExport will add an export to the account. If accounts is nil +// it will signify a public export, meaning anyone can import. +// if accountPos is > 0, all imports will be granted where the following holds: +// strings.Split(subject, ".")[accountPos] == account id will be granted. +func (a *Account) AddStreamExportWithAccountPos(subject string, accounts []*Account, accountPos uint) error { if a == nil { return ErrMissingAccount } @@ -2293,20 +2332,12 @@ func (a *Account) AddStreamExport(subject string, accounts []*Account) error { a.exports.streams = make(map[string]*streamExport) } ea := a.exports.streams[subject] - if accounts != nil { + if accounts != nil || accountPos > 0 { if ea == nil { ea = &streamExport{} } - // empty means auth required but will be import token. - if len(accounts) == 0 { - ea.tokenReq = true - } else { - if ea.approved == nil { - ea.approved = make(map[string]*Account, len(accounts)) - } - for _, acc := range accounts { - ea.approved[acc.Name] = acc - } + if err := setExportAuth(&ea.exportAuth, subject, accounts, accountPos); err != nil { + return err } } a.exports.streams[subject] = ea @@ -2329,11 +2360,17 @@ func (a *Account) checkStreamImportAuthorizedNoLock(account *Account, subject st return a.checkStreamExportApproved(account, subject, imClaim) } -func (a *Account) checkAuth(ea *exportAuth, account *Account, imClaim *jwt.Import) bool { +func (a *Account) checkAuth(ea *exportAuth, account *Account, imClaim *jwt.Import, tokens []string) bool { // if ea is nil or ea.approved is nil, that denotes a public export - if ea == nil || (ea.approved == nil && !ea.tokenReq) { + if ea == nil || (ea.approved == nil && !ea.tokenReq && ea.accountPos == 0) { return true } + // Check if the export is protected and enforces presence of importing account identity + if ea.accountPos > 0 && ea.accountPos <= uint(len(tokens)) { + if tokens[ea.accountPos-1] == account.Name { + return true + } + } // Check if token required if ea.tokenReq { return a.checkActivation(account, imClaim, true) @@ -2347,10 +2384,11 @@ func (a *Account) checkStreamExportApproved(account *Account, subject string, im // Check direct match of subject first ea, ok := a.exports.streams[subject] if ok { + // if ea is nil or eq.approved is nil, that denotes a public export if ea == nil { return true } - return a.checkAuth(&ea.exportAuth, account, imClaim) + return a.checkAuth(&ea.exportAuth, account, imClaim, nil) } // ok if we are here we did not match directly so we need to test each one. // The import subject arg has to take precedence, meaning the export @@ -2362,7 +2400,7 @@ func (a *Account) checkStreamExportApproved(account *Account, subject string, im if ea == nil { return true } - return a.checkAuth(&ea.exportAuth, account, imClaim) + return a.checkAuth(&ea.exportAuth, account, imClaim, tokens) } } return false @@ -2370,36 +2408,25 @@ func (a *Account) checkStreamExportApproved(account *Account, subject string, im func (a *Account) checkServiceExportApproved(account *Account, subject string, imClaim *jwt.Import) bool { // Check direct match of subject first - se, ok := a.exports.services[subject] + ea, ok := a.exports.services[subject] if ok { - // if se is nil or eq.approved is nil, that denotes a public export - if se == nil || (se.approved == nil && !se.tokenReq) { + // if ea is nil or eq.approved is nil, that denotes a public export + if ea == nil { return true } - // Check if token required - if se.tokenReq { - return a.checkActivation(account, imClaim, true) - } - // If we have a matching account we are authorized - _, ok := se.approved[account.Name] - return ok + return a.checkAuth(&ea.exportAuth, account, imClaim, nil) } // ok if we are here we did not match directly so we need to test each one. // The import subject arg has to take precedence, meaning the export // has to be a true subset of the import claim. We already checked for // exact matches above. tokens := strings.Split(subject, tsep) - for subj, se := range a.exports.services { + for subj, ea := range a.exports.services { if isSubsetMatch(tokens, subj) { - if se == nil || (se.approved == nil && !se.tokenReq) { + if ea == nil { return true } - // Check if token required - if se.tokenReq { - return a.checkActivation(account, imClaim, true) - } - _, ok := se.approved[account.Name] - return ok + return a.checkAuth(&ea.exportAuth, account, imClaim, tokens) } } return false @@ -2868,7 +2895,8 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim switch e.Type { case jwt.Stream: s.Debugf("Adding stream export %q for %s", e.Subject, a.Name) - if err := a.AddStreamExport(string(e.Subject), authAccounts(e.TokenReq)); err != nil { + if err := a.AddStreamExportWithAccountPos( + string(e.Subject), authAccounts(e.TokenReq), e.AccountTokenPosition); err != nil { s.Debugf("Error adding stream export to account [%s]: %v", a.Name, err.Error()) } case jwt.Service: @@ -2880,7 +2908,8 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim case jwt.ResponseTypeChunked: rt = Chunked } - if err := a.AddServiceExportWithResponse(string(e.Subject), rt, authAccounts(e.TokenReq)); err != nil { + if err := a.AddServiceExportWithResponseAndAccountPos( + string(e.Subject), rt, authAccounts(e.TokenReq), e.AccountTokenPosition); err != nil { s.Debugf("Error adding service export to account [%s]: %v", a.Name, err) continue } diff --git a/server/accounts_test.go b/server/accounts_test.go index 74c6dc94b4..6cbbe73876 100644 --- a/server/accounts_test.go +++ b/server/accounts_test.go @@ -594,6 +594,31 @@ func TestImportAuthorized(t *testing.T) { checkBool(foo.checkStreamImportAuthorized(bar, "*.*", nil), true, t) checkBool(foo.checkStreamImportAuthorized(bar, "*.>", nil), true, t) + _, foo, bar = simpleAccountServer(t) + foo.AddStreamExportWithAccountPos("foo.*", []*Account{}, 2) + foo.AddStreamExportWithAccountPos("bar.*.foo", []*Account{}, 2) + if err := foo.AddStreamExportWithAccountPos("baz.*.>", []*Account{}, 3); err == nil { + t.Fatal("expected error") + } + checkBool(foo.checkStreamImportAuthorized(bar, fmt.Sprintf("foo.%s", bar.Name), nil), true, t) + checkBool(foo.checkStreamImportAuthorized(bar, fmt.Sprintf("bar.%s.foo", bar.Name), nil), true, t) + checkBool(foo.checkStreamImportAuthorized(bar, fmt.Sprintf("baz.foo.%s", bar.Name), nil), false, t) + checkBool(foo.checkStreamImportAuthorized(bar, "foo.X", nil), false, t) + checkBool(foo.checkStreamImportAuthorized(bar, "bar.X.foo", nil), false, t) + checkBool(foo.checkStreamImportAuthorized(bar, "baz.foo.X", nil), false, t) + + foo.AddServiceExportWithAccountPos("a.*", []*Account{}, 2) + foo.AddServiceExportWithAccountPos("b.*.a", []*Account{}, 2) + if err := foo.AddServiceExportWithAccountPos("c.*.>", []*Account{}, 3); err == nil { + t.Fatal("expected error") + } + checkBool(foo.checkServiceImportAuthorized(bar, fmt.Sprintf("a.%s", bar.Name), nil), true, t) + checkBool(foo.checkServiceImportAuthorized(bar, fmt.Sprintf("b.%s.a", bar.Name), nil), true, t) + checkBool(foo.checkServiceImportAuthorized(bar, fmt.Sprintf("c.a.%s", bar.Name), nil), false, t) + checkBool(foo.checkServiceImportAuthorized(bar, "a.X", nil), false, t) + checkBool(foo.checkServiceImportAuthorized(bar, "b.X.a", nil), false, t) + checkBool(foo.checkServiceImportAuthorized(bar, "c.a.X", nil), false, t) + // Reset and test pwc and fwc s, foo, bar := simpleAccountServer(t) foo.AddStreamExport("foo.*.baz.>", []*Account{bar}) diff --git a/server/jwt_test.go b/server/jwt_test.go index 1816de3aa0..87c8cc81aa 100644 --- a/server/jwt_test.go +++ b/server/jwt_test.go @@ -5249,3 +5249,183 @@ func TestJWTStrictSigningKeys(t *testing.T) { connectTest(s.ClientURL()) }) } + +func TestJWTAccountProtectedImport(t *testing.T) { + srvFmt := ` + port: -1 + operator = %s + resolver: MEMORY + resolver_preload = { + %s : "%s" + %s : "%s" + } ` + setupAccounts := func(pass bool) (nkeys.KeyPair, string, string, string, nkeys.KeyPair, string, string, string, string) { + // Create accounts and imports/exports. + exportKP, _ := nkeys.CreateAccount() + exportPub, _ := exportKP.PublicKey() + exportAC := jwt.NewAccountClaims(exportPub) + exportAC.Exports.Add(&jwt.Export{Subject: "service.*", Type: jwt.Service, AccountTokenPosition: 2}) + exportAC.Exports.Add(&jwt.Export{Subject: "stream.*", Type: jwt.Stream, AccountTokenPosition: 2}) + exportJWT, err := exportAC.Encode(oKp) + require_NoError(t, err) + // create alternative exporter jwt without account token pos set + exportAC.Exports = jwt.Exports{} + exportAC.Exports.Add(&jwt.Export{Subject: "service.*", Type: jwt.Service}) + exportAC.Exports.Add(&jwt.Export{Subject: "stream.*", Type: jwt.Stream}) + exportJWTNoPos, err := exportAC.Encode(oKp) + require_NoError(t, err) + + importKP, _ := nkeys.CreateAccount() + importPub, _ := importKP.PublicKey() + importAc := jwt.NewAccountClaims(importPub) + srvcSub, strmSub := "service.foo", "stream.foo" + if pass { + srvcSub = fmt.Sprintf("service.%s", importPub) + strmSub = fmt.Sprintf("stream.%s", importPub) + } + importAc.Imports.Add(&jwt.Import{Account: exportPub, Subject: jwt.Subject(srvcSub), Type: jwt.Service}) + importAc.Imports.Add(&jwt.Import{Account: exportPub, Subject: jwt.Subject(strmSub), Type: jwt.Stream}) + importJWT, err := importAc.Encode(oKp) + require_NoError(t, err) + + return exportKP, exportPub, exportJWT, exportJWTNoPos, importKP, importPub, importJWT, srvcSub, strmSub + } + t.Run("pass", func(t *testing.T) { + exportKp, exportPub, exportJWT, _, importKp, importPub, importJWT, srvcSub, strmSub := setupAccounts(true) + cf := createConfFile(t, []byte(fmt.Sprintf(srvFmt, ojwt, exportPub, exportJWT, importPub, importJWT))) + defer os.Remove(cf) + s, _ := RunServerWithConfig(cf) + defer s.Shutdown() + ncExp := natsConnect(t, s.ClientURL(), createUserCreds(t, s, exportKp)) + ncImp := natsConnect(t, s.ClientURL(), createUserCreds(t, s, importKp)) + t.Run("service", func(t *testing.T) { + sub, err := ncExp.Subscribe("service.*", func(msg *nats.Msg) { + msg.Respond([]byte("world")) + }) + defer sub.Unsubscribe() + require_NoError(t, err) + ncExp.Flush() + msg, err := ncImp.Request(srvcSub, []byte("hello"), time.Second) + require_NoError(t, err) + require_Equal(t, string(msg.Data), "world") + }) + t.Run("stream", func(t *testing.T) { + msgChan := make(chan *nats.Msg, 4) + defer close(msgChan) + sub, err := ncImp.ChanSubscribe(strmSub, msgChan) + defer sub.Unsubscribe() + require_NoError(t, err) + ncImp.Flush() + err = ncExp.Publish("stream.foo", []byte("hello")) + require_NoError(t, err) + err = ncExp.Publish(strmSub, []byte("hello")) + require_NoError(t, err) + msg := <-msgChan + require_Equal(t, string(msg.Data), "hello") + require_True(t, len(msgChan) == 0) + }) + }) + t.Run("fail", func(t *testing.T) { + exportKp, exportPub, exportJWT, _, importKp, importPub, importJWT, srvcSub, strmSub := setupAccounts(false) + cf := createConfFile(t, []byte(fmt.Sprintf(srvFmt, ojwt, exportPub, exportJWT, importPub, importJWT))) + defer os.Remove(cf) + s, _ := RunServerWithConfig(cf) + defer s.Shutdown() + ncExp := natsConnect(t, s.ClientURL(), createUserCreds(t, s, exportKp)) + ncImp := natsConnect(t, s.ClientURL(), createUserCreds(t, s, importKp)) + t.Run("service", func(t *testing.T) { + sub, err := ncExp.Subscribe("service.*", func(msg *nats.Msg) { + msg.Respond([]byte("world")) + }) + defer sub.Unsubscribe() + require_NoError(t, err) + ncExp.Flush() + _, err = ncImp.Request(srvcSub, []byte("hello"), time.Second) + require_Error(t, err) + require_Contains(t, err.Error(), "no responders available for request") + }) + t.Run("stream", func(t *testing.T) { + msgChan := make(chan *nats.Msg, 4) + defer close(msgChan) + _, err := ncImp.ChanSubscribe(strmSub, msgChan) + require_NoError(t, err) + ncImp.Flush() + err = ncExp.Publish("stream.foo", []byte("hello")) + require_NoError(t, err) + err = ncExp.Publish(strmSub, []byte("hello")) + require_NoError(t, err) + select { + case <-msgChan: + t.Fatal("did not expect a message") + case <-time.After(250 * time.Millisecond): + } + require_True(t, len(msgChan) == 0) + }) + }) + t.Run("reload-off-2-on", func(t *testing.T) { + exportKp, exportPub, exportJWTOn, exportJWTOff, importKp, _, importJWT, srvcSub, strmSub := setupAccounts(false) + dirSrv := createDir(t, "srv") + defer os.RemoveAll(dirSrv) + // set up system account. Relying bootstrapping system account to not create JWT + sysAcc, err := nkeys.CreateAccount() + require_NoError(t, err) + sysPub, err := sysAcc.PublicKey() + require_NoError(t, err) + sysUsrCreds := newUserEx(t, sysAcc, false, sysPub) + defer os.Remove(sysUsrCreds) + cf := createConfFile(t, []byte(fmt.Sprintf(` + port: -1 + operator = %s + system_account = %s + resolver: { + type: full + dir: %s + }`, ojwt, sysPub, dirSrv))) + defer os.Remove(cf) + s, _ := RunServerWithConfig(cf) + defer s.Shutdown() + updateJwt(t, s.ClientURL(), sysUsrCreds, importJWT, 1) + updateJwt(t, s.ClientURL(), sysUsrCreds, exportJWTOff, 1) + ncExp := natsConnect(t, s.ClientURL(), createUserCreds(t, s, exportKp)) + ncImp := natsConnect(t, s.ClientURL(), createUserCreds(t, s, importKp)) + msgChan := make(chan *nats.Msg, 4) + defer close(msgChan) + // ensure service passes + subSrvc, err := ncExp.Subscribe("service.*", func(msg *nats.Msg) { + msg.Respond([]byte("world")) + }) + defer subSrvc.Unsubscribe() + require_NoError(t, err) + ncExp.Flush() + respMst, err := ncImp.Request(srvcSub, []byte("hello"), time.Second) + require_NoError(t, err) + require_Equal(t, string(respMst.Data), "world") + // ensure stream passes + subStrm, err := ncImp.ChanSubscribe(strmSub, msgChan) + defer subStrm.Unsubscribe() + require_NoError(t, err) + ncImp.Flush() + err = ncExp.Publish(strmSub, []byte("hello")) + require_NoError(t, err) + msg := <-msgChan + require_Equal(t, string(msg.Data), "hello") + require_True(t, len(msgChan) == 0) + + updateJwt(t, s.ClientURL(), sysUsrCreds, exportJWTOn, 1) + + // ensure service fails + _, err = ncImp.Request(srvcSub, []byte("hello"), time.Second) + require_Error(t, err) + require_Contains(t, err.Error(), "timeout") + s.AccountResolver().Store(exportPub, exportJWTOn) + // ensure stream fails + err = ncExp.Publish(strmSub, []byte("hello")) + require_NoError(t, err) + select { + case <-msgChan: + t.Fatal("did not expect a message") + case <-time.After(250 * time.Millisecond): + } + require_True(t, len(msgChan) == 0) + }) +} diff --git a/server/opts.go b/server/opts.go index dc047e816c..253be4874b 100644 --- a/server/opts.go +++ b/server/opts.go @@ -1941,6 +1941,7 @@ type export struct { rt ServiceRespType lat *serviceLatency rthr time.Duration + tPos uint } type importStream struct { @@ -2278,7 +2279,7 @@ func parseAccounts(v interface{}, opts *Options, errors *[]error, warnings *[]er } accounts = append(accounts, ta) } - if err := stream.acc.AddStreamExport(stream.sub, accounts); err != nil { + if err := stream.acc.AddStreamExportWithAccountPos(stream.sub, accounts, stream.tPos); err != nil { msg := fmt.Sprintf("Error adding stream export %q: %v", stream.sub, err) *errors = append(*errors, &configErr{tk, msg}) continue @@ -2296,7 +2297,7 @@ func parseAccounts(v interface{}, opts *Options, errors *[]error, warnings *[]er } accounts = append(accounts, ta) } - if err := service.acc.AddServiceExportWithResponse(service.sub, service.rt, accounts); err != nil { + if err := service.acc.AddServiceExportWithResponseAndAccountPos(service.sub, service.rt, accounts, service.tPos); err != nil { msg := fmt.Sprintf("Error adding service export %q: %v", service.sub, err) *errors = append(*errors, &configErr{tk, msg}) continue @@ -2497,6 +2498,7 @@ func parseExportStreamOrService(v interface{}, errors, warnings *[]error) (*expo thresh time.Duration latToken token lt token + accTokPos uint ) defer convertPanicToErrorList(<, errors) @@ -2645,6 +2647,8 @@ func parseExportStreamOrService(v interface{}, errors, warnings *[]error) (*expo if curService != nil { curService.lat = lat } + case "account_token_position": + accTokPos = uint(mv.(int64)) default: if !tk.IsUsedVariable() { err := &unknownConfigFieldErr{ @@ -2657,6 +2661,12 @@ func parseExportStreamOrService(v interface{}, errors, warnings *[]error) (*expo } } } + if curStream != nil { + curStream.tPos = accTokPos + } + if curService != nil { + curService.tPos = accTokPos + } return curStream, curService, nil } diff --git a/server/opts_test.go b/server/opts_test.go index c1f70c5faf..36904354f3 100644 --- a/server/opts_test.go +++ b/server/opts_test.go @@ -24,6 +24,7 @@ import ( "reflect" "runtime" "strings" + "sync" "testing" "time" @@ -1815,6 +1816,123 @@ func TestParseServiceLatency(t *testing.T) { } } +func TestParseExport(t *testing.T) { + conf := ` + system_account: sys + accounts { + sys { + exports [{ + stream "$SYS.SERVER.ACCOUNT.*.CONNS" + account_token_position 4 + }] + } + accE { + exports [{ + service foo.* + account_token_position 2 + }] + users [{ + user ue + password pwd + }], + } + accI1 { + imports [{ + service { + account accE + subject foo.accI1 + } + to foo + },{ + stream { + account sys + subject "$SYS.SERVER.ACCOUNT.accI1.CONNS" + } + }], + users [{ + user u1 + password pwd + }], + } + accI2 { + imports [{ + service { + account accE + subject foo.accI2 + } + to foo + },{ + stream { + account sys + subject "$SYS.SERVER.ACCOUNT.accI2.CONNS" + } + }], + users [{ + user u2 + password pwd + }], + } + }` + f := createConfFile(t, []byte(conf)) + s, o := RunServerWithConfig(f) + if s == nil { + t.Fatal("Failed startup") + } + defer s.Shutdown() + defer os.Remove(f) + connect := func(user string) *nats.Conn { + nc, err := nats.Connect(fmt.Sprintf("nats://%s:pwd@%s:%d", user, o.Host, o.Port)) + require_NoError(t, err) + return nc + } + nc1 := connect("u1") + defer nc1.Close() + nc2 := connect("u2") + defer nc1.Close() + subscribe := func(nc *nats.Conn, msgs int, subj string) (*sync.WaitGroup, *nats.Subscription) { + wg := sync.WaitGroup{} + wg.Add(msgs) + sub, err := nc.Subscribe(subj, func(msg *nats.Msg) { + if msg.Reply != _EMPTY_ { + msg.Respond(msg.Data) + } + wg.Done() + }) + require_NoError(t, err) + nc.Flush() + return &wg, sub + } + //Subscribe to CONNS events + wg1, s1 := subscribe(nc1, 2, "$SYS.SERVER.ACCOUNT.accI1.CONNS") + defer s1.Unsubscribe() + wg2, s2 := subscribe(nc2, 2, "$SYS.SERVER.ACCOUNT.accI2.CONNS") + defer s2.Unsubscribe() + // Trigger 2 CONNS event + nc3 := connect("u1") + nc3.Close() + nc4 := connect("u2") + nc4.Close() + // test service + ncE := connect("ue") + defer ncE.Close() + wge, se := subscribe(ncE, 2, "foo.*") + defer se.Unsubscribe() + request := func(nc *nats.Conn, msg string) { + if m, err := nc.Request("foo", []byte(msg), time.Second); err != nil { + t.Fatal("Failed request ", msg, err) + } else if m == nil { + t.Fatal("No response msg") + } else if string(m.Data) != msg { + t.Fatal("Wrong response msg", string(m.Data)) + } + } + request(nc1, "1") + request(nc2, "1") + for _, wg := range []*sync.WaitGroup{wge, wg1, wg2} { + wg.Wait() + } +} + func TestAccountUsersLoadedProperly(t *testing.T) { conf := createConfFile(t, []byte(` listen: "127.0.0.1:-1" From 8fc62fc2f333f9bc2099f2f9356cf504b8e17d03 Mon Sep 17 00:00:00 2001 From: Matthias Hanel Date: Mon, 1 Feb 2021 19:32:02 -0500 Subject: [PATCH 2/2] Incorporating review comments Signed-off-by: Matthias Hanel --- server/accounts.go | 82 +++++++++++++++++++++-------------------- server/accounts_test.go | 12 +++--- server/opts.go | 4 +- 3 files changed, 50 insertions(+), 48 deletions(-) diff --git a/server/accounts.go b/server/accounts.go index 6413470919..6aa7a4e959 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -880,45 +880,46 @@ func (a *Account) removeClient(c *client) int { func setExportAuth(ea *exportAuth, subject string, accounts []*Account, accountPos uint) error { if accountPos > 0 { - if token := strings.Split(subject, "."); len(token) < int(accountPos) { - return ErrInvalidSubject - } else if token[accountPos-1] != "*" { + token := strings.Split(subject, ".") + if len(token) < int(accountPos) || token[accountPos-1] != "*" { return ErrInvalidSubject } } ea.accountPos = accountPos // empty means auth required but will be import token. if accounts == nil { - } else if len(accounts) == 0 { + return nil + } + if len(accounts) == 0 { ea.tokenReq = true - } else { - if ea.approved == nil { - ea.approved = make(map[string]*Account, len(accounts)) - } - for _, acc := range accounts { - ea.approved[acc.Name] = acc - } + return nil + } + if ea.approved == nil { + ea.approved = make(map[string]*Account, len(accounts)) + } + for _, acc := range accounts { + ea.approved[acc.Name] = acc } return nil } // AddServiceExport will configure the account with the defined export. func (a *Account) AddServiceExport(subject string, accounts []*Account) error { - return a.AddServiceExportWithResponseAndAccountPos(subject, Singleton, accounts, 0) + return a.addServiceExportWithResponseAndAccountPos(subject, Singleton, accounts, 0) } // AddServiceExport will configure the account with the defined export. -func (a *Account) AddServiceExportWithAccountPos(subject string, accounts []*Account, accountPos uint) error { - return a.AddServiceExportWithResponseAndAccountPos(subject, Singleton, accounts, accountPos) +func (a *Account) addServiceExportWithAccountPos(subject string, accounts []*Account, accountPos uint) error { + return a.addServiceExportWithResponseAndAccountPos(subject, Singleton, accounts, accountPos) } // AddServiceExportWithResponse will configure the account with the defined export and response type. func (a *Account) AddServiceExportWithResponse(subject string, respType ServiceRespType, accounts []*Account) error { - return a.AddServiceExportWithResponseAndAccountPos(subject, respType, accounts, 0) + return a.addServiceExportWithResponseAndAccountPos(subject, respType, accounts, 0) } // AddServiceExportWithresponse will configure the account with the defined export and response type. -func (a *Account) AddServiceExportWithResponseAndAccountPos( +func (a *Account) addServiceExportWithResponseAndAccountPos( subject string, respType ServiceRespType, accounts []*Account, accountPos uint) error { if a == nil { return ErrMissingAccount @@ -931,28 +932,28 @@ func (a *Account) AddServiceExportWithResponseAndAccountPos( a.exports.services = make(map[string]*serviceExport) } - ea := a.exports.services[subject] + se := a.exports.services[subject] // Always create a service export - if ea == nil { - ea = &serviceExport{} + if se == nil { + se = &serviceExport{} } if respType != Singleton { - ea.respType = respType + se.respType = respType } if accounts != nil || accountPos > 0 { - if ea == nil { - ea = &serviceExport{} + if se == nil { + se = &serviceExport{} } - if err := setExportAuth(&ea.exportAuth, subject, accounts, accountPos); err != nil { + if err := setExportAuth(&se.exportAuth, subject, accounts, accountPos); err != nil { return err } } lrt := a.lowestServiceExportResponseTime() - ea.acc = a - ea.respThresh = DEFAULT_SERVICE_EXPORT_RESPONSE_THRESHOLD - a.exports.services[subject] = ea + se.acc = a + se.respThresh = DEFAULT_SERVICE_EXPORT_RESPONSE_THRESHOLD + a.exports.services[subject] = se if nlrt := a.lowestServiceExportResponseTime(); nlrt != lrt { a.updateAllClientsServiceExportResponseTime(nlrt) } @@ -2313,14 +2314,14 @@ var IsPublicExport = []*Account(nil) // AddStreamExport will add an export to the account. If accounts is nil // it will signify a public export, meaning anyone can import. func (a *Account) AddStreamExport(subject string, accounts []*Account) error { - return a.AddStreamExportWithAccountPos(subject, accounts, 0) + return a.addStreamExportWithAccountPos(subject, accounts, 0) } // AddStreamExport will add an export to the account. If accounts is nil // it will signify a public export, meaning anyone can import. // if accountPos is > 0, all imports will be granted where the following holds: // strings.Split(subject, ".")[accountPos] == account id will be granted. -func (a *Account) AddStreamExportWithAccountPos(subject string, accounts []*Account, accountPos uint) error { +func (a *Account) addStreamExportWithAccountPos(subject string, accounts []*Account, accountPos uint) error { if a == nil { return ErrMissingAccount } @@ -2366,15 +2367,16 @@ func (a *Account) checkAuth(ea *exportAuth, account *Account, imClaim *jwt.Impor return true } // Check if the export is protected and enforces presence of importing account identity - if ea.accountPos > 0 && ea.accountPos <= uint(len(tokens)) { - if tokens[ea.accountPos-1] == account.Name { - return true - } + if ea.accountPos > 0 { + return ea.accountPos <= uint(len(tokens)) && tokens[ea.accountPos-1] == account.Name } // Check if token required if ea.tokenReq { return a.checkActivation(account, imClaim, true) } + if ea.approved == nil { + return false + } // If we have a matching account we are authorized _, ok := ea.approved[account.Name] return ok @@ -2408,25 +2410,25 @@ func (a *Account) checkStreamExportApproved(account *Account, subject string, im func (a *Account) checkServiceExportApproved(account *Account, subject string, imClaim *jwt.Import) bool { // Check direct match of subject first - ea, ok := a.exports.services[subject] + se, ok := a.exports.services[subject] if ok { // if ea is nil or eq.approved is nil, that denotes a public export - if ea == nil { + if se == nil { return true } - return a.checkAuth(&ea.exportAuth, account, imClaim, nil) + return a.checkAuth(&se.exportAuth, account, imClaim, nil) } // ok if we are here we did not match directly so we need to test each one. // The import subject arg has to take precedence, meaning the export // has to be a true subset of the import claim. We already checked for // exact matches above. tokens := strings.Split(subject, tsep) - for subj, ea := range a.exports.services { + for subj, se := range a.exports.services { if isSubsetMatch(tokens, subj) { - if ea == nil { + if se == nil { return true } - return a.checkAuth(&ea.exportAuth, account, imClaim, tokens) + return a.checkAuth(&se.exportAuth, account, imClaim, tokens) } } return false @@ -2895,7 +2897,7 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim switch e.Type { case jwt.Stream: s.Debugf("Adding stream export %q for %s", e.Subject, a.Name) - if err := a.AddStreamExportWithAccountPos( + if err := a.addStreamExportWithAccountPos( string(e.Subject), authAccounts(e.TokenReq), e.AccountTokenPosition); err != nil { s.Debugf("Error adding stream export to account [%s]: %v", a.Name, err.Error()) } @@ -2908,7 +2910,7 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim case jwt.ResponseTypeChunked: rt = Chunked } - if err := a.AddServiceExportWithResponseAndAccountPos( + if err := a.addServiceExportWithResponseAndAccountPos( string(e.Subject), rt, authAccounts(e.TokenReq), e.AccountTokenPosition); err != nil { s.Debugf("Error adding service export to account [%s]: %v", a.Name, err) continue diff --git a/server/accounts_test.go b/server/accounts_test.go index 6cbbe73876..afda8a8307 100644 --- a/server/accounts_test.go +++ b/server/accounts_test.go @@ -595,9 +595,9 @@ func TestImportAuthorized(t *testing.T) { checkBool(foo.checkStreamImportAuthorized(bar, "*.>", nil), true, t) _, foo, bar = simpleAccountServer(t) - foo.AddStreamExportWithAccountPos("foo.*", []*Account{}, 2) - foo.AddStreamExportWithAccountPos("bar.*.foo", []*Account{}, 2) - if err := foo.AddStreamExportWithAccountPos("baz.*.>", []*Account{}, 3); err == nil { + foo.addStreamExportWithAccountPos("foo.*", []*Account{}, 2) + foo.addStreamExportWithAccountPos("bar.*.foo", []*Account{}, 2) + if err := foo.addStreamExportWithAccountPos("baz.*.>", []*Account{}, 3); err == nil { t.Fatal("expected error") } checkBool(foo.checkStreamImportAuthorized(bar, fmt.Sprintf("foo.%s", bar.Name), nil), true, t) @@ -607,9 +607,9 @@ func TestImportAuthorized(t *testing.T) { checkBool(foo.checkStreamImportAuthorized(bar, "bar.X.foo", nil), false, t) checkBool(foo.checkStreamImportAuthorized(bar, "baz.foo.X", nil), false, t) - foo.AddServiceExportWithAccountPos("a.*", []*Account{}, 2) - foo.AddServiceExportWithAccountPos("b.*.a", []*Account{}, 2) - if err := foo.AddServiceExportWithAccountPos("c.*.>", []*Account{}, 3); err == nil { + foo.addServiceExportWithAccountPos("a.*", []*Account{}, 2) + foo.addServiceExportWithAccountPos("b.*.a", []*Account{}, 2) + if err := foo.addServiceExportWithAccountPos("c.*.>", []*Account{}, 3); err == nil { t.Fatal("expected error") } checkBool(foo.checkServiceImportAuthorized(bar, fmt.Sprintf("a.%s", bar.Name), nil), true, t) diff --git a/server/opts.go b/server/opts.go index 253be4874b..0f288ca563 100644 --- a/server/opts.go +++ b/server/opts.go @@ -2279,7 +2279,7 @@ func parseAccounts(v interface{}, opts *Options, errors *[]error, warnings *[]er } accounts = append(accounts, ta) } - if err := stream.acc.AddStreamExportWithAccountPos(stream.sub, accounts, stream.tPos); err != nil { + if err := stream.acc.addStreamExportWithAccountPos(stream.sub, accounts, stream.tPos); err != nil { msg := fmt.Sprintf("Error adding stream export %q: %v", stream.sub, err) *errors = append(*errors, &configErr{tk, msg}) continue @@ -2297,7 +2297,7 @@ func parseAccounts(v interface{}, opts *Options, errors *[]error, warnings *[]er } accounts = append(accounts, ta) } - if err := service.acc.AddServiceExportWithResponseAndAccountPos(service.sub, service.rt, accounts, service.tPos); err != nil { + if err := service.acc.addServiceExportWithResponseAndAccountPos(service.sub, service.rt, accounts, service.tPos); err != nil { msg := fmt.Sprintf("Error adding service export %q: %v", service.sub, err) *errors = append(*errors, &configErr{tk, msg}) continue