Skip to content

Commit

Permalink
Streams and Services
Browse files Browse the repository at this point in the history
Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Mar 22, 2019
1 parent 99c5c49 commit a7472fc
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 11 deletions.
5 changes: 5 additions & 0 deletions server/accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -817,6 +817,11 @@ func (s *Server) AccountResolver() AccountResolver {
return s.accResolver
}

// UpdateAccountClaims will call updateAccountClaims.
func (s *Server) UpdateAccountClaims(a *Account, ac *jwt.AccountClaims) {
s.updateAccountClaims(a, ac)
}

// updateAccountClaims will update and existing account with new claims.
// This will replace any exports or imports previously defined.
func (s *Server) updateAccountClaims(a *Account, ac *jwt.AccountClaims) {
Expand Down
5 changes: 5 additions & 0 deletions server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2317,6 +2317,7 @@ func (c *client) checkForImportServices(acc *Account, msg []byte) {
if (c.kind == ROUTER || c.kind == GATEWAY) && c.pa.queues == nil && len(rr.qsubs) > 0 {
c.makeQFilter(rr.qsubs)
}

sendToGWs := c.srv.gateway.enabled && (c.kind == CLIENT || c.kind == SYSTEM)
queues := c.processMsgResults(rm.acc, rr, msg, []byte(rm.to), nrr, sendToGWs)
// If this is not a gateway connection but gateway is enabled,
Expand Down Expand Up @@ -2540,6 +2541,10 @@ sendToRoutesOrLeafs:
} else {
// Leaf nodes are LMSG
mh[0] = 'L'
// Remap subject if its a shadow subscription, treat like a normal client.
if rt.sub.im != nil && rt.sub.im.prefix != "" {
mh = append(mh, rt.sub.im.prefix...)
}
}
mh = append(mh, subject...)
mh = append(mh, ' ')
Expand Down
11 changes: 11 additions & 0 deletions server/leafnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -592,8 +592,14 @@ func (c *client) initLeafNodeSmap() {
// Collect all subs here.
_subs := [256]*subscription{}
subs := _subs[:0]
ims := []string{}
acc.mu.RLock()
acc.sl.All(&subs)
// Since leaf nodes only send on interest, if the bound
// account has import services we need to send those over.
for isubj := range acc.imports.services {
ims = append(ims, isubj)
}
acc.mu.RUnlock()

// Now walk the results and add them to our smap
Expand All @@ -604,6 +610,10 @@ func (c *client) initLeafNodeSmap() {
c.leaf.smap[keyFromSub(sub)] += 1
}
}
// FIXME(dlc) - We need to update appropriately on an account claims update.
for _, isubj := range ims {
c.leaf.smap[isubj] += 1
}
c.mu.Unlock()
}

Expand Down Expand Up @@ -1009,6 +1019,7 @@ func (c *client) processInboundLeafMsg(msg []byte) {

// Check to see if we have a routed message with a service reply.
if isServiceReply(c.pa.reply) && acc != nil {
// TODO(dlc) - Figure out what this means.
panic("Not Implemented")
}

Expand Down
133 changes: 122 additions & 11 deletions test/leafnode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -956,7 +956,6 @@ func TestLeafNodeOperatorModel(t *testing.T) {
func TestLeafNodeMultipleAccounts(t *testing.T) {
// So we will create a main server with two accounts. The remote server, acting as a leaf node, will simply have
// the $G global account and no auth. Make sure things work properly here.

s, opts, conf := runLeafNodeOperatorServer(t)
defer os.Remove(conf)
defer s.Shutdown()
Expand All @@ -971,7 +970,8 @@ func TestLeafNodeMultipleAccounts(t *testing.T) {
t.Fatalf("Error generating user JWT: %v", err)
}

_, akp2 := createAccount(t, s)
// Create second account.
createAccount(t, s)

// Create the leaf node server using the first account.
seed, _ := kp1.Seed()
Expand All @@ -993,12 +993,6 @@ func TestLeafNodeMultipleAccounts(t *testing.T) {
}
defer nc1.Close()

nc2, err := nats.Connect(url, createUserCreds(t, s, akp2))
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer nc2.Close()

// This is a client connected to the leaf node with no auth,
// binding to account1 via leafnode connection.
// To connect to leafnode server.
Expand All @@ -1022,11 +1016,128 @@ func TestLeafNodeMultipleAccounts(t *testing.T) {
// Now send from nc1 with account 1, should be received by our leafnode subscriber.
nc1.Publish("foo.test", nil)

// Wait for the message to arrive
_, err = lsub.NextMsg(1 * time.Second)
if err != nil {
t.Fatalf("Error during wait for next message: %s", err)
}
}

func TestLeafNodeExportsImports(t *testing.T) {
// So we will create a main server with two accounts. The remote server, acting as a leaf node, will simply have
// the $G global account and no auth. Make sure things work properly here.
s, opts, conf := runLeafNodeOperatorServer(t)
defer os.Remove(conf)
defer s.Shutdown()

// Setup the two accounts for this server.
okp, _ := nkeys.FromSeed(oSeed)

// Create second account with exports
acc2, akp2 := createAccount(t, s)
akp2Pub, _ := akp2.PublicKey()
akp2AC := jwt.NewAccountClaims(akp2Pub)
streamExport := &jwt.Export{Subject: "foo.stream", Type: jwt.Stream}
serviceExport := &jwt.Export{Subject: "req.echo", Type: jwt.Service}
akp2AC.Exports.Add(streamExport, serviceExport)
akp2ACJWT, err := akp2AC.Encode(okp)
if err != nil {
t.Fatalf("Error generating account JWT: %v", err)
}

if err := s.AccountResolver().Store(akp2Pub, akp2ACJWT); err != nil {
t.Fatalf("Account Resolver returned an error: %v", err)
}
s.UpdateAccountClaims(acc2, akp2AC)

// Now create the first account and add on the imports. This will be what is used in the leafnode.
acc1, akp1 := createAccount(t, s)
akp1Pub, _ := akp1.PublicKey()
akp1AC := jwt.NewAccountClaims(akp1Pub)
streamImport := &jwt.Import{Account: akp2Pub, Subject: "foo.stream", To: "import", Type: jwt.Stream}
serviceImport := &jwt.Import{Account: akp2Pub, Subject: "import.request", To: "req.echo", Type: jwt.Service}
akp1AC.Imports.Add(streamImport, serviceImport)
akp1ACJWT, err := akp1AC.Encode(okp)
if err != nil {
t.Fatalf("Error generating account JWT: %v", err)
}
if err := s.AccountResolver().Store(akp1Pub, akp1ACJWT); err != nil {
t.Fatalf("Account Resolver returned an error: %v", err)
}
s.UpdateAccountClaims(acc1, akp1AC)

// Create the user will we use to connect the leafnode.
kp1, _ := nkeys.CreateUser()
pub1, _ := kp1.PublicKey()
nuc1 := jwt.NewUserClaims(pub1)
ujwt1, err := nuc1.Encode(akp1)
if err != nil {
t.Fatalf("Error generating user JWT: %v", err)
}

// Create the leaf node server using the first account.
seed, _ := kp1.Seed()
mycreds := genCredsFile(t, ujwt1, seed)
defer os.Remove(mycreds)

sl, lopts, lnconf := runSolicitWithCredentials(t, opts, mycreds)
defer os.Remove(lnconf)
defer sl.Shutdown()

checkLeafNodeConnected(t, s)

// To connect to main server.
url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)

// Imported
nc1, err := nats.Connect(url, createUserCreds(t, s, akp1))
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer nc1.Close()

// Exported
nc2, err := nats.Connect(url, createUserCreds(t, s, akp2))
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer nc2.Close()

// Leaf node connection.
lurl := fmt.Sprintf("nats://%s:%d", lopts.Host, lopts.Port)
ncl, err := nats.Connect(lurl)
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer ncl.Close()

// So everything should be setup here. So let's test streams first.
lsub, _ := ncl.SubscribeSync("import.foo.stream")

// Wait for the sub to propagate.
checkFor(t, time.Second, 10*time.Millisecond, func() error {
if msgs, _ := lsub.QueuedMsgs(); msgs != 1 {
return fmt.Errorf("Number of msgs is %d", msgs)
if subs := s.NumSubscriptions(); subs < 1 {
return fmt.Errorf("Number of subs is %d", subs)
}
return nil
})

// Pub to other account with export on original subject.
nc2.Publish("foo.stream", nil)

_, err = lsub.NextMsg(1 * time.Second)
if err != nil {
t.Fatalf("Error during wait for next message: %s", err)
}

// Services
// Create listener on nc1
nc2.Subscribe("req.echo", func(msg *nats.Msg) {
nc2.Publish(msg.Reply, []byte("WORKED"))
})
nc2.Flush()

// Now send the request on the leaf node client.
if _, err := ncl.Request("import.request", []byte("fingers crossed"), 500*time.Millisecond); err != nil {
t.Fatalf("Did not receive response: %v", err)
}
}

0 comments on commit a7472fc

Please sign in to comment.