diff --git a/server/leafnode_test.go b/server/leafnode_test.go
index 575bfc5f99..5ba223b9e9 100644
--- a/server/leafnode_test.go
+++ b/server/leafnode_test.go
@@ -1,4 +1,4 @@
-// Copyright 2019-2021 The NATS Authors
+// Copyright 2019-2023 The NATS Authors
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // You may obtain a copy of the License at
@@ -4846,3 +4846,223 @@ func TestLeafNodeDuplicateMsg(t *testing.T) {
 	t.Run("sub_b2_pub_a1", func(t *testing.T) { check(t, b2, a1) })
 	t.Run("sub_b2_pub_a2", func(t *testing.T) { check(t, b2, a2) })
 }
+
+func TestLeafNodeWithWeightedDQRequestsToSuperClusterWithSeparateAccounts(t *testing.T) {
+	sc := createJetStreamSuperClusterWithTemplate(t, jsClusterAccountsTempl, 3, 2)
+	defer sc.shutdown()
+
+	// Now create a leafnode cluster that has 2 LNs, one to each cluster but on separate accounts, ONE and TWO.
+	var lnTmpl = `
+	listen: 127.0.0.1:-1
+	server_name: %s
+	jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'}
+
+	{{leaf}}
+
+	cluster {
+		name: %s
+		listen: 127.0.0.1:%d
+		routes = [%s]
+	}
+
+	accounts { $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] }}
+	`
+
+	var leafFrag = `
+	leaf {
+		listen: 127.0.0.1:-1
+		remotes [
+			{ urls: [ %s ] }
+			{ urls: [ %s ] }
+		]
+	}`
+
+	// We want to have two leaf node connections that join to the same local account on the leafnode servers,
+	// but connect to different accounts in different clusters.
+	c1 := sc.clusters[0] // Will connect to account ONE
+	c2 := sc.clusters[1] // Will connect to account TWO
+
+	genLeafTmpl := func(tmpl string) string {
+		t.Helper()
+
+		var ln1, ln2 []string
+		for _, s := range c1.servers {
+			if s.ClusterName() != c1.name {
+				continue
+			}
+			ln := s.getOpts().LeafNode
+			ln1 = append(ln1, fmt.Sprintf("nats://one:p@%s:%d", ln.Host, ln.Port))
+		}
+
+		for _, s := range c2.servers {
+			if s.ClusterName() != c2.name {
+				continue
+			}
+			ln := s.getOpts().LeafNode
+			ln2 = append(ln2, fmt.Sprintf("nats://two:p@%s:%d", ln.Host, ln.Port))
+		}
+		return strings.Replace(tmpl, "{{leaf}}", fmt.Sprintf(leafFrag, strings.Join(ln1, ", "), strings.Join(ln2, ", ")), 1)
+	}
+
+	tmpl := strings.Replace(lnTmpl, "store_dir:", fmt.Sprintf(`domain: "%s", store_dir:`, "SA"), 1)
+	tmpl = genLeafTmpl(tmpl)
+
+	ln := createJetStreamCluster(t, tmpl, "SA", "SA-", 3, 22280, false)
+	ln.waitOnClusterReady()
+	defer ln.shutdown()
+
+	for _, s := range ln.servers {
+		checkLeafNodeConnectedCount(t, s, 2)
+	}
+
+	// Now connect DQ subscribers to each cluster in their separate accounts, and make sure we get the right behavior, balanced between
+	// them when requests originate from the leaf cluster.
+
+	// Create 5 clients for each cluster / account.
+	var c1c, c2c []*nats.Conn
+	for i := 0; i < 5; i++ {
+		nc1, _ := jsClientConnect(t, c1.randomServer(), nats.UserInfo("one", "p"))
+		defer nc1.Close()
+		c1c = append(c1c, nc1)
+		nc2, _ := jsClientConnect(t, c2.randomServer(), nats.UserInfo("two", "p"))
+		defer nc2.Close()
+		c2c = append(c2c, nc2)
+	}
+
+	createSubs := func(num int, conns []*nats.Conn) (subs []*nats.Subscription) {
+		for i := 0; i < num; i++ {
+			nc := conns[rand.Intn(len(conns))]
+			sub, err := nc.QueueSubscribeSync("REQUEST", "MC")
+			require_NoError(t, err)
+			subs = append(subs, sub)
+			nc.Flush()
+		}
+		// Let subs propagate.
+		time.Sleep(100 * time.Millisecond)
+		return subs
+	}
+	closeSubs := func(subs []*nats.Subscription) {
+		for _, sub := range subs {
+			sub.Unsubscribe()
+		}
+	}
+
+	// Simple test first.
+	subs1 := createSubs(1, c1c)
+	defer closeSubs(subs1)
+	subs2 := createSubs(1, c2c)
+	defer closeSubs(subs2)
+
+	sendRequests := func(num int) {
+		// Now connect to the leaf cluster and send some requests.
+		nc, _ := jsClientConnect(t, ln.randomServer())
+		defer nc.Close()
+
+		for i := 0; i < num; i++ {
+			require_NoError(t, nc.Publish("REQUEST", []byte("HELP")))
+		}
+		nc.Flush()
+	}
+
+	pending := func(subs []*nats.Subscription) (total int) {
+		for _, sub := range subs {
+			n, _, err := sub.Pending()
+			require_NoError(t, err)
+			total += n
+		}
+		return total
+	}
+
+	num := 1000
+	checkAllReceived := func() error {
+		total := pending(subs1) + pending(subs2)
+		if total == num {
+			return nil
+		}
+		return fmt.Errorf("Not all received: %d vs %d", total, num)
+	}
+
+	checkBalanced := func(total, pc1, pc2 int) {
+		tf := float64(total)
+		e1 := tf * (float64(pc1) / 100.00)
+		e2 := tf * (float64(pc2) / 100.00)
+		delta := tf / 10
+		p1 := float64(pending(subs1))
+		if p1 < e1-delta || p1 > e1+delta {
+			t.Fatalf("Value out of range for subs1, expected %v got %v", e1, p1)
+		}
+		p2 := float64(pending(subs2))
+		if p2 < e2-delta || p2 > e2+delta {
+			t.Fatalf("Value out of range for subs2, expected %v got %v", e2, p2)
+		}
+	}
+
+	// Now connect to the leaf cluster and send some requests.
+
+	// Simple 50/50 split first.
+	sendRequests(num)
+	checkFor(t, time.Second, 200*time.Millisecond, checkAllReceived)
+	checkBalanced(num, 50, 50)
+
+	closeSubs(subs1)
+	closeSubs(subs2)
+
+	// Now test unbalanced, 10/90.
+	subs1 = createSubs(1, c1c)
+	defer closeSubs(subs1)
+	subs2 = createSubs(9, c2c)
+	defer closeSubs(subs2)
+
+	sendRequests(num)
+	checkFor(t, time.Second, 200*time.Millisecond, checkAllReceived)
+	checkBalanced(num, 10, 90)
+
+	// Now test draining the subs while we are sending, starting from a balanced situation, to simulate draining a cluster.
+
+	closeSubs(subs1)
+	closeSubs(subs2)
+	subs1, subs2 = nil, nil
+
+	// These subs are slightly different: they count deliveries via callbacks so we can drain them while sending.
+	var r1, r2 atomic.Uint64
+	for i := 0; i < 20; i++ {
+		nc := c1c[rand.Intn(len(c1c))]
+		sub, err := nc.QueueSubscribe("REQUEST", "MC", func(m *nats.Msg) { r1.Add(1) })
+		require_NoError(t, err)
+		subs1 = append(subs1, sub)
+		nc.Flush()
+
+		nc = c2c[rand.Intn(len(c2c))]
+		sub, err = nc.QueueSubscribe("REQUEST", "MC", func(m *nats.Msg) { r2.Add(1) })
+		require_NoError(t, err)
+		subs2 = append(subs2, sub)
+		nc.Flush()
+	}
+	defer closeSubs(subs1)
+	defer closeSubs(subs2)
+
+	nc, _ := jsClientConnect(t, ln.randomServer())
+	defer nc.Close()
+
+	for i, dindex := 0, 1; i < num; i++ {
+		require_NoError(t, nc.Publish("REQUEST", []byte("HELP")))
+		// Check if we have more to simulate draining.
+		// Will drain within roughly the first 100 requests using the 1-in-6 rand test below.
+		// Will leave 1 behind.
+		if dindex < len(subs1)-1 && rand.Intn(6) > 4 {
+			sub := subs1[dindex]
+			dindex++
+			sub.Drain()
+		}
+	}
+	nc.Flush()
+
+	checkFor(t, time.Second, 200*time.Millisecond, func() error {
+		total := int(r1.Load() + r2.Load())
+		if total == num {
+			return nil
+		}
+		return fmt.Errorf("Not all received: %d vs %d", total, num)
+	})
+	require_True(t, r2.Load() > r1.Load())
+}
diff --git a/server/sublist.go b/server/sublist.go
index 47d45999fa..48375b6b48 100644
--- a/server/sublist.go
+++ b/server/sublist.go
@@ -615,7 +615,7 @@ func (s *Sublist) reduceCacheCount() {
 
 // Helper function for auto-expanding remote qsubs.
 func isRemoteQSub(sub *subscription) bool {
-	return sub != nil && sub.queue != nil && sub.client != nil && sub.client.kind == ROUTER
+	return sub != nil && sub.queue != nil && sub.client != nil && (sub.client.kind == ROUTER || sub.client.kind == LEAF)
 }
 
 // UpdateRemoteQSub should be called when we update the weight of an existing
diff --git a/test/system_services_test.go b/test/system_services_test.go
index a1bad16b2c..a925e7ba35 100644
--- a/test/system_services_test.go
+++ b/test/system_services_test.go
@@ -262,7 +262,7 @@ func TestSystemServiceSubscribersLeafNodesWithoutSystem(t *testing.T) {
 	// For now we do not see all the details behind a leafnode if the leafnode is not enabled.
 	checkDbgNumSubs(t, nc, "foo.bar.3", 2)
 
-	checkDbgNumSubs(t, nc, "foo.bar.baz QG.22", 11)
+	checkDbgNumSubs(t, nc, "foo.bar.baz QG.22", 12)
 }
 
 func runSolicitLeafServerWithSystemToURL(surl string) (*server.Server, *server.Options) {
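For reference, a minimal client-side sketch (not part of the change) of the weighted distribution the new test verifies: with leaf queue subscriptions now tracked like routed ones in isRemoteQSub, requests published via a leafnode should be split roughly in proportion to the queue weight on each side. The hostnames and the mustConnect helper are placeholders; the subject, queue group, and user credentials mirror the test above, and a leafnode server connected to both clusters is assumed to already be running.

package main

import (
	"fmt"
	"log"
	"sync/atomic"
	"time"

	"github.com/nats-io/nats.go"
)

// mustConnect is a hypothetical helper for this sketch only.
func mustConnect(url string) *nats.Conn {
	nc, err := nats.Connect(url)
	if err != nil {
		log.Fatal(err)
	}
	return nc
}

func main() {
	// One queue member in cluster A (account ONE), nine in cluster B (account TWO).
	ncA := mustConnect("nats://one:p@cluster-a-host:4222")
	defer ncA.Close()
	ncB := mustConnect("nats://two:p@cluster-b-host:4222")
	defer ncB.Close()

	var recvA, recvB atomic.Uint64
	ncA.QueueSubscribe("REQUEST", "MC", func(_ *nats.Msg) { recvA.Add(1) })
	for i := 0; i < 9; i++ {
		ncB.QueueSubscribe("REQUEST", "MC", func(_ *nats.Msg) { recvB.Add(1) })
	}
	ncA.Flush()
	ncB.Flush()

	// Publish from the leaf cluster; delivery should trend toward a ~10/90 split.
	ncLeaf := mustConnect("nats://leaf-host:4222")
	defer ncLeaf.Close()
	for i := 0; i < 1000; i++ {
		ncLeaf.Publish("REQUEST", []byte("HELP"))
	}
	ncLeaf.Flush()

	time.Sleep(500 * time.Millisecond)
	fmt.Printf("cluster A: %d, cluster B: %d\n", recvA.Load(), recvB.Load())
}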