From e1dda3afc3d4f8eca70f9bda063a944994c38660 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Thu, 6 Dec 2018 18:18:04 -0700 Subject: [PATCH] Added Gateway test for service import with queue group Signed-off-by: Ivan Kozlovic --- server/gateway_test.go | 186 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 186 insertions(+) diff --git a/server/gateway_test.go b/server/gateway_test.go index bd2bcfdc4da..76142f27cd2 100644 --- a/server/gateway_test.go +++ b/server/gateway_test.go @@ -165,6 +165,15 @@ func natsQueueSub(t *testing.T, nc *nats.Conn, subj, queue string, cb nats.MsgHa return sub } +func natsQueueSubSync(t *testing.T, nc *nats.Conn, subj, queue string) *nats.Subscription { + t.Helper() + sub, err := nc.QueueSubscribeSync(subj, queue) + if err != nil { + t.Fatalf("Error on subscribe: %v", err) + } + return sub +} + func natsFlush(t *testing.T, nc *nats.Conn) { t.Helper() if err := nc.Flush(); err != nil { @@ -3172,6 +3181,183 @@ func TestGatewayServiceImport(t *testing.T) { }) } +func TestGatewayServiceImportWithQueue(t *testing.T) { + oa := testDefaultOptionsForGateway("A") + setAccountUserPassInOptions(oa, "$foo", "clientA", "password") + setAccountUserPassInOptions(oa, "$bar", "yyyyyyy", "password") + sa := runGatewayServer(oa) + defer sa.Shutdown() + + ob := testGatewayOptionsFromToWithServers(t, "B", "A", sa) + setAccountUserPassInOptions(ob, "$foo", "xxxxxxx", "password") + setAccountUserPassInOptions(ob, "$bar", "clientB", "password") + sb := runGatewayServer(ob) + defer sb.Shutdown() + + waitForOutboundGateways(t, sa, 1, time.Second) + waitForOutboundGateways(t, sb, 1, time.Second) + waitForInboundGateways(t, sa, 1, time.Second) + waitForInboundGateways(t, sb, 1, time.Second) + + // Get accounts + fooA, _ := sa.LookupAccount("$foo") + barA, _ := sa.LookupAccount("$bar") + fooB, _ := sb.LookupAccount("$foo") + barB, _ := sb.LookupAccount("$bar") + + // Add in the service export for the requests. Make it public. + fooA.AddServiceExport("test.request", nil) + fooB.AddServiceExport("test.request", nil) + + // Add import abilities to server B's bar account from foo. + if err := barB.AddServiceImport(fooB, "foo.request", "test.request"); err != nil { + t.Fatalf("Error adding service import: %v", err) + } + // Same on A. + if err := barA.AddServiceImport(fooA, "foo.request", "test.request"); err != nil { + t.Fatalf("Error adding service import: %v", err) + } + + // clientA will be connected to srvA and be the service endpoint and responder. + aURL := fmt.Sprintf("nats://clientA:password@127.0.0.1:%d", oa.Port) + clientA := natsConnect(t, aURL) + defer clientA.Close() + + subA := natsQueueSubSync(t, clientA, "test.request", "queue") + natsFlush(t, clientA) + + // Now setup client B on srvB who will do a sub from account $bar + // that should map account $foo's foo subject. + bURL := fmt.Sprintf("nats://clientB:password@127.0.0.1:%d", ob.Port) + clientB := natsConnect(t, bURL) + defer clientB.Close() + + subB := natsSubSync(t, clientB, "reply") + natsFlush(t, clientB) + + // Wait for queue interest on test.request from A to be registered + // on server B. + checkForRegisteredQSubInterest(t, sb, "A", "$foo", "test.request", 1, time.Second) + + for i := 0; i < 1; i++ { + // Send the request from clientB on foo.request, + natsPubReq(t, clientB, "foo.request", "reply", []byte("hi")) + natsFlush(t, clientB) + + // Expect the request on A + msg, err := subA.NextMsg(time.Second) + if err != nil { + t.Fatalf("subA failed to get request: %v", err) + } + if msg.Subject != "test.request" || string(msg.Data) != "hi" { + t.Fatalf("Unexpected message: %v", msg) + } + if msg.Reply == "reply" { + t.Fatalf("Expected randomized reply, but got original") + } + + // Send reply + natsPub(t, clientA, msg.Reply, []byte("ok")) + natsFlush(t, clientA) + + msg, err = subB.NextMsg(time.Second) + if err != nil { + t.Fatalf("subB failed to get reply: %v", err) + } + if msg.Subject != "reply" || string(msg.Data) != "ok" { + t.Fatalf("Unexpected message: %v", msg) + } + + expected := int64((i + 1) * 2) + vz, _ := sa.Varz(nil) + if vz.OutMsgs != expected { + t.Fatalf("Expected %d outMsgs for A, got %v", expected, vz.OutMsgs) + } + + if i == 0 { + expected = 3 + } else { + expected = 4 + } + vz, _ = sb.Varz(nil) + if vz.OutMsgs != expected { + t.Fatalf("Expected %d outMsgs for B, got %v", expected, vz.OutMsgs) + } + } + + checkFor(t, 2*time.Second, 15*time.Millisecond, func() error { + if ts := fooA.TotalSubs(); ts != 1 { + return fmt.Errorf("Expected one sub to be left on fooA, but got %d", ts) + } + return nil + }) + + // Speed up exiration + fooA.SetAutoExpireTTL(10 * time.Millisecond) + + // Send 100 requests from clientB on foo.request, + for i := 0; i < 100; i++ { + natsPubReq(t, clientB, "foo.request", "reply", []byte("hi")) + } + natsFlush(t, clientB) + + // Consume the requests, but don't reply to them... + for i := 0; i < 100; i++ { + if _, err := subA.NextMsg(time.Second); err != nil { + t.Fatalf("subA did not receive request: %v", err) + } + } + + // These reply subjects will be dangling off of $foo account on serverA. + // Remove our service endpoint and wait for the dangling replies to go to zero. + natsUnsub(t, subA) + natsFlush(t, clientA) + + checkFor(t, 2*time.Second, 10*time.Millisecond, func() error { + if ts := fooA.TotalSubs(); ts != 0 { + return fmt.Errorf("Number of subs is %d, should be zero", ts) + } + return nil + }) + + // Repeat similar test but without the small TTL and verify + // that if B is shutdown, the dangling subs for replies are + // cleared from the account sublist. + fooA.SetAutoExpireTTL(10 * time.Second) + + subA = natsQueueSubSync(t, clientA, "test.request", "queue") + natsFlush(t, clientA) + + // Send 100 requests from clientB on foo.request, + for i := 0; i < 100; i++ { + natsPubReq(t, clientB, "foo.request", "reply", []byte("hi")) + } + natsFlush(t, clientB) + + // Consume the requests, but don't reply to them... + for i := 0; i < 100; i++ { + if _, err := subA.NextMsg(time.Second); err != nil { + t.Fatalf("subA did not receive request: %v", err) + } + } + + // Shutdown B + clientB.Close() + sb.Shutdown() + + // Close our last sub + natsUnsub(t, subA) + natsFlush(t, clientA) + + // Verify that they are gone before the 10 sec TTL + checkFor(t, 2*time.Second, 10*time.Millisecond, func() error { + if ts := fooA.TotalSubs(); ts != 0 { + return fmt.Errorf("Number of subs is %d, should be zero", ts) + } + return nil + }) +} + /* func TestGatewayPermissions(t *testing.T) { bo := testDefaultOptionsForGateway("B")