Skip to content

Commit

Permalink
Merge 51c57a4 into 59f923b
Browse files Browse the repository at this point in the history
  • Loading branch information
derekcollison committed Nov 27, 2018
2 parents 59f923b + 51c57a4 commit 7764c18
Show file tree
Hide file tree
Showing 3 changed files with 478 additions and 454 deletions.
99 changes: 82 additions & 17 deletions server/accounts.go
Expand Up @@ -78,12 +78,13 @@ type streamImport struct {

// Import service mapping struct
type serviceImport struct {
acc *Account
from string
to string
ae bool
ts int64
claim *jwt.Import
acc *Account
from string
to string
ae bool
ts int64
claim *jwt.Import
invalid bool
}

// exportAuth holds configured approvals or boolean indicating an
Expand Down Expand Up @@ -296,7 +297,7 @@ func (a *Account) addImplicitServiceImport(destination *Account, from, to string
if a.imports.services == nil {
a.imports.services = make(map[string]*serviceImport)
}
si := &serviceImport{destination, from, to, autoexpire, 0, claim}
si := &serviceImport{destination, from, to, autoexpire, 0, claim, false}
a.imports.services[from] = si
if autoexpire {
a.nae++
Expand Down Expand Up @@ -481,15 +482,13 @@ func fetchActivation(url string) string {
return string(body)
}

// Fires for expired activation tokens. We could track this with timers etc.
// Instead we just re-analyze where we are and if we need to act.
func (a *Account) activationExpired(subject string) {
// These are import stream specific versions for when an activation expires.
func (a *Account) streamActivationExpired(subject string) {
a.mu.RLock()
if a.expired || a.imports.streams == nil {
a.mu.RUnlock()
return
}
// FIXME(dlc) - check services too?
si := a.imports.streams[subject]
if si == nil || si.invalid {
a.mu.RUnlock()
Expand All @@ -515,6 +514,41 @@ func (a *Account) activationExpired(subject string) {
}
}

// These are import service specific versions for when an activation expires.
func (a *Account) serviceActivationExpired(subject string) {
a.mu.RLock()
if a.expired || a.imports.services == nil {
a.mu.RUnlock()
return
}
si := a.imports.services[subject]
if si == nil || si.invalid {
a.mu.RUnlock()
return
}
a.mu.RUnlock()

if si.acc.checkActivation(a, si.claim, false) {
// The token has been updated most likely and we are good to go.
return
}

a.mu.Lock()
si.invalid = true
a.mu.Unlock()
}

// Fires for expired activation tokens. We could track this with timers etc.
// Instead we just re-analyze where we are and if we need to act.
func (a *Account) activationExpired(subject string, kind jwt.ExportType) {
switch kind {
case jwt.Stream:
a.streamActivationExpired(subject)
case jwt.Service:
a.serviceActivationExpired(subject)
}
}

// checkActivation will check the activation token for validity.
func (a *Account) checkActivation(acc *Account, claim *jwt.Import, expTimer bool) bool {
if claim == nil || claim.Token == "" {
Expand Down Expand Up @@ -549,7 +583,7 @@ func (a *Account) checkActivation(acc *Account, claim *jwt.Import, expTimer bool
if expTimer {
expiresAt := time.Duration(act.Expires - tn)
time.AfterFunc(expiresAt*time.Second, func() {
acc.activationExpired(string(act.ImportSubject))
acc.activationExpired(string(act.ImportSubject), claim.Type)
})
}
}
Expand Down Expand Up @@ -594,12 +628,31 @@ func (a *Account) checkStreamExportsEqual(b *Account) bool {
return true
}

// Check if another account is authorized to route requests to this service.
func (a *Account) checkServiceExportsEqual(b *Account) bool {
if len(a.exports.services) != len(b.exports.services) {
return false
}
for subj, aea := range a.exports.services {
bea, ok := b.exports.services[subj]
if !ok {
return false
}
if !reflect.DeepEqual(aea, bea) {
return false
}
}
return true
}

func (a *Account) checkServiceImportAuthorized(account *Account, subject string, imClaim *jwt.Import) bool {
// Find the subject in the services list.
a.mu.RLock()
defer a.mu.RUnlock()
return a.checkServiceImportAuthorizedNoLock(account, subject, imClaim)
}

// Check if another account is authorized to route requests to this service.
func (a *Account) checkServiceImportAuthorizedNoLock(account *Account, subject string, imClaim *jwt.Import) bool {
// Find the subject in the services list.
if a.exports.services == nil || !IsValidLiteralSubject(subject) {
return false
}
Expand All @@ -610,7 +663,7 @@ func (a *Account) checkServiceImportAuthorized(account *Account, subject string,
}

if ae != nil && ae.tokenReq {
return a.checkActivation(account, imClaim, false)
return a.checkActivation(account, imClaim, true)
}

// Check to see if we are public or if we need to search for the account.
Expand Down Expand Up @@ -755,7 +808,7 @@ func (s *Server) updateAccountClaims(a *Account, ac *jwt.AccountClaims) {
c.processSubsOnConfigReload(awcsti)
}
}
// Now check if exports have changed.
// Now check if stream exports have changed.
if !a.checkStreamExportsEqual(old) {
clients := make([]*client, 0, 16)
// We need to check all accounts that have an import claim from this account.
Expand All @@ -770,7 +823,6 @@ func (s *Server) updateAccountClaims(a *Account, ac *jwt.AccountClaims) {
for _, c := range acc.clients {
clients = append(clients, c)
}
break
}
}
acc.mu.Unlock()
Expand All @@ -780,6 +832,19 @@ func (s *Server) updateAccountClaims(a *Account, ac *jwt.AccountClaims) {
c.processSubsOnConfigReload(awcsti)
}
}
// Now check if service exports have changed.
if !a.checkServiceExportsEqual(old) {
for _, acc := range s.accounts {
acc.mu.Lock()
for _, im := range acc.imports.services {
if im != nil && im.acc.Name == a.Name {
// Check for if we are still authorized for an import.
im.invalid = !a.checkServiceImportAuthorizedNoLock(im.acc, im.from, im.claim)
}
}
acc.mu.Unlock()
}
}

// Now do limits if they are present.
a.msubs = int(ac.Limits.Subs)
Expand Down
4 changes: 3 additions & 1 deletion server/client.go
Expand Up @@ -2050,9 +2050,11 @@ func (c *client) checkForImportServices(acc *Account, msg []byte) {
}
acc.mu.RLock()
rm := acc.imports.services[string(c.pa.subject)]
invalid := rm != nil && rm.invalid
acc.mu.RUnlock()
// Get the results from the other account for the mapped "to" subject.
if rm != nil && rm.acc != nil && rm.acc.sl != nil {
// If we have been marked invalid simply return here.
if rm != nil && !invalid && rm.acc != nil && rm.acc.sl != nil {
var nrr []byte
if rm.ae {
acc.removeServiceImport(rm.from)
Expand Down

0 comments on commit 7764c18

Please sign in to comment.