Skip to content

Commit

Permalink
Merge 95a5f79 into 4eaea13
Browse files Browse the repository at this point in the history
  • Loading branch information
kozlovic committed Dec 7, 2018
2 parents 4eaea13 + 95a5f79 commit 954b630
Showing 1 changed file with 186 additions and 0 deletions.
186 changes: 186 additions & 0 deletions server/gateway_test.go
Expand Up @@ -165,6 +165,15 @@ func natsQueueSub(t *testing.T, nc *nats.Conn, subj, queue string, cb nats.MsgHa
return sub
}

func natsQueueSubSync(t *testing.T, nc *nats.Conn, subj, queue string) *nats.Subscription {
t.Helper()
sub, err := nc.QueueSubscribeSync(subj, queue)
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
return sub
}

func natsFlush(t *testing.T, nc *nats.Conn) {
t.Helper()
if err := nc.Flush(); err != nil {
Expand Down Expand Up @@ -3172,6 +3181,183 @@ func TestGatewayServiceImport(t *testing.T) {
})
}

func TestGatewayServiceImportWithQueue(t *testing.T) {
oa := testDefaultOptionsForGateway("A")
setAccountUserPassInOptions(oa, "$foo", "clientA", "password")
setAccountUserPassInOptions(oa, "$bar", "yyyyyyy", "password")
sa := runGatewayServer(oa)
defer sa.Shutdown()

ob := testGatewayOptionsFromToWithServers(t, "B", "A", sa)
setAccountUserPassInOptions(ob, "$foo", "xxxxxxx", "password")
setAccountUserPassInOptions(ob, "$bar", "clientB", "password")
sb := runGatewayServer(ob)
defer sb.Shutdown()

waitForOutboundGateways(t, sa, 1, time.Second)
waitForOutboundGateways(t, sb, 1, time.Second)
waitForInboundGateways(t, sa, 1, time.Second)
waitForInboundGateways(t, sb, 1, time.Second)

// Get accounts
fooA, _ := sa.LookupAccount("$foo")
barA, _ := sa.LookupAccount("$bar")
fooB, _ := sb.LookupAccount("$foo")
barB, _ := sb.LookupAccount("$bar")

// Add in the service export for the requests. Make it public.
fooA.AddServiceExport("test.request", nil)
fooB.AddServiceExport("test.request", nil)

// Add import abilities to server B's bar account from foo.
if err := barB.AddServiceImport(fooB, "foo.request", "test.request"); err != nil {
t.Fatalf("Error adding service import: %v", err)
}
// Same on A.
if err := barA.AddServiceImport(fooA, "foo.request", "test.request"); err != nil {
t.Fatalf("Error adding service import: %v", err)
}

// clientA will be connected to srvA and be the service endpoint and responder.
aURL := fmt.Sprintf("nats://clientA:password@127.0.0.1:%d", oa.Port)
clientA := natsConnect(t, aURL)
defer clientA.Close()

subA := natsQueueSubSync(t, clientA, "test.request", "queue")
natsFlush(t, clientA)

// Now setup client B on srvB who will do a sub from account $bar
// that should map account $foo's foo subject.
bURL := fmt.Sprintf("nats://clientB:password@127.0.0.1:%d", ob.Port)
clientB := natsConnect(t, bURL)
defer clientB.Close()

subB := natsQueueSubSync(t, clientB, "reply", "queue2")
natsFlush(t, clientB)

// Wait for queue interest on test.request from A to be registered
// on server B.
checkForRegisteredQSubInterest(t, sb, "A", "$foo", "test.request", 1, time.Second)

for i := 0; i < 1; i++ {
// Send the request from clientB on foo.request,
natsPubReq(t, clientB, "foo.request", "reply", []byte("hi"))
natsFlush(t, clientB)

// Expect the request on A
msg, err := subA.NextMsg(time.Second)
if err != nil {
t.Fatalf("subA failed to get request: %v", err)
}
if msg.Subject != "test.request" || string(msg.Data) != "hi" {
t.Fatalf("Unexpected message: %v", msg)
}
if msg.Reply == "reply" {
t.Fatalf("Expected randomized reply, but got original")
}

// Send reply
natsPub(t, clientA, msg.Reply, []byte("ok"))
natsFlush(t, clientA)

msg, err = subB.NextMsg(time.Second)
if err != nil {
t.Fatalf("subB failed to get reply: %v", err)
}
if msg.Subject != "reply" || string(msg.Data) != "ok" {
t.Fatalf("Unexpected message: %v", msg)
}

expected := int64(i + 3)
vz, _ := sa.Varz(nil)
if vz.OutMsgs != expected {
t.Fatalf("Expected %d outMsgs for A, got %v", expected, vz.OutMsgs)
}

if i == 0 {
expected = 3
} else {
expected = 4
}
vz, _ = sb.Varz(nil)
if vz.OutMsgs != expected {
t.Fatalf("Expected %d outMsgs for B, got %v", expected, vz.OutMsgs)
}
}

checkFor(t, 2*time.Second, 15*time.Millisecond, func() error {
if ts := fooA.TotalSubs(); ts != 1 {
return fmt.Errorf("Expected one sub to be left on fooA, but got %d", ts)
}
return nil
})

// Speed up exiration
fooA.SetAutoExpireTTL(10 * time.Millisecond)

// Send 100 requests from clientB on foo.request,
for i := 0; i < 100; i++ {
natsPubReq(t, clientB, "foo.request", "reply", []byte("hi"))
}
natsFlush(t, clientB)

// Consume the requests, but don't reply to them...
for i := 0; i < 100; i++ {
if _, err := subA.NextMsg(time.Second); err != nil {
t.Fatalf("subA did not receive request: %v", err)
}
}

// These reply subjects will be dangling off of $foo account on serverA.
// Remove our service endpoint and wait for the dangling replies to go to zero.
natsUnsub(t, subA)
natsFlush(t, clientA)

checkFor(t, 2*time.Second, 10*time.Millisecond, func() error {
if ts := fooA.TotalSubs(); ts != 0 {
return fmt.Errorf("Number of subs is %d, should be zero", ts)
}
return nil
})

// Repeat similar test but without the small TTL and verify
// that if B is shutdown, the dangling subs for replies are
// cleared from the account sublist.
fooA.SetAutoExpireTTL(10 * time.Second)

subA = natsQueueSubSync(t, clientA, "test.request", "queue")
natsFlush(t, clientA)

// Send 100 requests from clientB on foo.request,
for i := 0; i < 100; i++ {
natsPubReq(t, clientB, "foo.request", "reply", []byte("hi"))
}
natsFlush(t, clientB)

// Consume the requests, but don't reply to them...
for i := 0; i < 100; i++ {
if _, err := subA.NextMsg(time.Second); err != nil {
t.Fatalf("subA did not receive request: %v", err)
}
}

// Shutdown B
clientB.Close()
sb.Shutdown()

// Close our last sub
natsUnsub(t, subA)
natsFlush(t, clientA)

// Verify that they are gone before the 10 sec TTL
checkFor(t, 2*time.Second, 10*time.Millisecond, func() error {
if ts := fooA.TotalSubs(); ts != 0 {
return fmt.Errorf("Number of subs is %d, should be zero", ts)
}
return nil
})
}

/*
func TestGatewayPermissions(t *testing.T) {
bo := testDefaultOptionsForGateway("B")
Expand Down

0 comments on commit 954b630

Please sign in to comment.