diff --git a/locksordering.txt b/locksordering.txt
index 89832611940..5e685d8dd1e 100644
--- a/locksordering.txt
+++ b/locksordering.txt
@@ -14,3 +14,5 @@ allow that lock to be held and the acquire a client lock which is not possible
 with the normal account lock.
 
 accountLeafList -> client
+
+AccountResolver interface has various implementations, but assume: AccountResolver -> Server
diff --git a/server/jetstream_cluster_2_test.go b/server/jetstream_cluster_2_test.go
index e7dbefe5357..4fc95340d16 100644
--- a/server/jetstream_cluster_2_test.go
+++ b/server/jetstream_cluster_2_test.go
@@ -2734,25 +2734,6 @@ func TestJetStreamClusterFlowControlRequiresHeartbeats(t *testing.T) {
     }
 }
 
-var jsClusterAccountLimitsTempl = `
-    listen: 127.0.0.1:-1
-    server_name: %s
-    jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'}
-
-    cluster {
-        name: %s
-        listen: 127.0.0.1:%d
-        routes = [%s]
-    }
-
-    no_auth_user: js
-
-    accounts {
-        $JS { users = [ { user: "js", pass: "p" } ]; jetstream: {max_store: 1MB, max_mem: 0} }
-        $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] }
-    }
-`
-
 func TestJetStreamClusterMixedModeColdStartPrune(t *testing.T) {
     // Purposely make this unbalanced. Without changes this will never form a quorum to elect the meta-leader.
     c := createMixedModeCluster(t, jsMixedModeGlobalAccountTempl, "MMCS5", _EMPTY_, 3, 4, false)
@@ -5552,10 +5533,8 @@ func TestJetStreamClusterConsumerOverrides(t *testing.T) {
     o := mset.lookupConsumer("m")
     require_True(t, o != nil)
 
-    o.mu.RLock()
     st := o.store.Type()
     n := o.raftNode()
-    o.mu.RUnlock()
     require_True(t, n != nil)
     rn := n.(*raft)
     rn.RLock()
diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go
index e5f4bbe68a2..ad9b4978f9b 100644
--- a/server/jetstream_cluster_3_test.go
+++ b/server/jetstream_cluster_3_test.go
@@ -1372,7 +1372,7 @@ func TestJetStreamClusterPullConsumerAcksExtendInactivityThreshold(t *testing.T)
 }
 
 // https://github.com/nats-io/nats-server/issues/3677
-func TestJetStreamParallelStreamCreation(t *testing.T) {
+func TestJetStreamClusterParallelStreamCreation(t *testing.T) {
     c := createJetStreamClusterExplicit(t, "R3S", 3)
     defer c.shutdown()
 
@@ -1414,7 +1414,7 @@ func TestJetStreamParallelStreamCreation(t *testing.T) {
 
 // In addition to test above, if streams were attempted to be created in parallel
 // it could be that multiple raft groups would be created for the same asset.
-func TestJetStreamParallelStreamCreationDupeRaftGroups(t *testing.T) {
+func TestJetStreamClusterParallelStreamCreationDupeRaftGroups(t *testing.T) {
     c := createJetStreamClusterExplicit(t, "R3S", 3)
     defer c.shutdown()
 
@@ -1463,19 +1463,19 @@ func TestJetStreamParallelStreamCreationDupeRaftGroups(t *testing.T) {
     expected := 2
     rg := make(map[string]struct{})
     for _, s := range c.servers {
-        s.mu.RLock()
+        s.rnMu.RLock()
         for _, ni := range s.raftNodes {
             n := ni.(*raft)
             rg[n.Group()] = struct{}{}
         }
-        s.mu.RUnlock()
+        s.rnMu.RUnlock()
     }
     if len(rg) != expected {
         t.Fatalf("Expected only %d distinct raft groups for all servers, go %d", expected, len(rg))
     }
 }
 
-func TestJetStreamParallelConsumerCreation(t *testing.T) {
+func TestJetStreamClusterParallelConsumerCreation(t *testing.T) {
     c := createJetStreamClusterExplicit(t, "R3S", 3)
     defer c.shutdown()
 
@@ -1538,19 +1538,19 @@ func TestJetStreamParallelConsumerCreation(t *testing.T) {
     expected := 3
     rg := make(map[string]struct{})
     for _, s := range c.servers {
-        s.mu.RLock()
+        s.rnMu.RLock()
         for _, ni := range s.raftNodes {
             n := ni.(*raft)
             rg[n.Group()] = struct{}{}
         }
-        s.mu.RUnlock()
+        s.rnMu.RUnlock()
     }
     if len(rg) != expected {
         t.Fatalf("Expected only %d distinct raft groups for all servers, go %d", expected, len(rg))
     }
 }
 
-func TestJetStreamGhostEphemeralsAfterRestart(t *testing.T) {
+func TestJetStreamClusterGhostEphemeralsAfterRestart(t *testing.T) {
     c := createJetStreamClusterExplicit(t, "R3S", 3)
     defer c.shutdown()
 
@@ -3470,19 +3470,20 @@ func TestJetStreamClusterNoR1AssetsDuringLameDuck(t *testing.T) {
     }()
     defer close(qch)
 
+    s.mu.RLock()
+    gacc := s.gacc
+    s.mu.RUnlock()
+    if gacc == nil {
+        t.Fatalf("No global account")
+    }
     // Make sure we do not have any R1 assets placed on the lameduck server.
     for s.isRunning() {
-        s.mu.RLock()
-        if s.js == nil || s.js.srv == nil || s.js.srv.gacc == nil {
-            s.mu.RUnlock()
-            break
-        }
-        hasAsset := len(s.js.srv.gacc.streams()) > 0
-        s.mu.RUnlock()
-        if hasAsset {
+        if len(gacc.streams()) > 0 {
             t.Fatalf("Server had an R1 asset when it should not due to lameduck mode")
         }
+        time.Sleep(15 * time.Millisecond)
     }
+    s.WaitForShutdown()
 }
 
 // If a consumer has not been registered (possible in heavily loaded systems with lots of assets)
@@ -3999,9 +4000,14 @@ func TestJetStreamClusterStreamScaleUpNoGroupCluster(t *testing.T) {
     sa.Group.Cluster = _EMPTY_
     sa.Group.Preferred = _EMPTY_
     // Insert into meta layer.
-    s.mu.RLock()
-    s.js.cluster.meta.ForwardProposal(encodeUpdateStreamAssignment(sa))
-    s.mu.RUnlock()
+    if sjs := s.getJetStream(); sjs != nil {
+        sjs.mu.RLock()
+        meta := sjs.cluster.meta
+        sjs.mu.RUnlock()
+        if meta != nil {
+            meta.ForwardProposal(encodeUpdateStreamAssignment(sa))
+        }
+    }
     // Make sure it got propagated..
     checkFor(t, 10*time.Second, 200*time.Millisecond, func() error {
         sa := mset.streamAssignment().copyGroup()
@@ -4714,7 +4720,7 @@ func TestJetStreamClusterSnapshotAndRestoreWithHealthz(t *testing.T) {
     require_True(t, si.State.Msgs == uint64(toSend))
 }
 
-func TestJetStreamBinaryStreamSnapshotCapability(t *testing.T) {
+func TestJetStreamClusterBinaryStreamSnapshotCapability(t *testing.T) {
     c := createJetStreamClusterExplicit(t, "NATS", 3)
     defer c.shutdown()
 
@@ -4783,7 +4789,7 @@ func TestJetStreamClusterBadEncryptKey(t *testing.T) {
     }
 }
 
-func TestJetStreamAccountUsageDrifts(t *testing.T) {
+func TestJetStreamClusterAccountUsageDrifts(t *testing.T) {
     tmpl := `
     listen: 127.0.0.1:-1
     server_name: %s
diff --git a/server/jetstream_helpers_test.go b/server/jetstream_helpers_test.go
index 1d6813dd984..d80892d77d3 100644
--- a/server/jetstream_helpers_test.go
+++ b/server/jetstream_helpers_test.go
@@ -281,6 +281,25 @@ var jsMixedModeGlobalAccountTempl = `
 
 var jsGWTempl = `%s{name: %s, urls: [%s]}`
 
+var jsClusterAccountLimitsTempl = `
+    listen: 127.0.0.1:-1
+    server_name: %s
+    jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'}
+
+    cluster {
+        name: %s
+        listen: 127.0.0.1:%d
+        routes = [%s]
+    }
+
+    no_auth_user: js
+
+    accounts {
+        $JS { users = [ { user: "js", pass: "p" } ]; jetstream: {max_store: 1MB, max_mem: 0} }
+        $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] }
+    }
+`
+
 func createJetStreamTaggedSuperCluster(t *testing.T) *supercluster {
     return createJetStreamTaggedSuperClusterWithGWProxy(t, nil)
 }
diff --git a/server/server.go b/server/server.go
index 44551631e2d..8a340961990 100644
--- a/server/server.go
+++ b/server/server.go
@@ -1272,6 +1272,7 @@ func (s *Server) configureAccounts(reloading bool) (map[string]struct{}, error)
 
 // Setup the account resolver. For memory resolver, make sure the JWTs are
 // properly formed but do not enforce expiration etc.
+// Lock is held on entry, but may be released/reacquired during this call.
 func (s *Server) configureResolver() error {
     opts := s.getOpts()
     s.accResolver = opts.AccountResolver
@@ -1286,7 +1287,12 @@ func (s *Server) configureResolver() error {
             }
         }
         if len(opts.resolverPreloads) > 0 {
-            if s.accResolver.IsReadOnly() {
+            // Lock ordering is account resolver -> server, so we need to release
+            // the lock and reacquire it when done with account resolver's calls.
+            ar := s.accResolver
+            s.mu.Unlock()
+            defer s.mu.Lock()
+            if ar.IsReadOnly() {
                 return fmt.Errorf("resolver preloads only available for writeable resolver types MEM/DIR/CACHE_DIR")
             }
             for k, v := range opts.resolverPreloads {
@@ -1294,7 +1300,7 @@ func (s *Server) configureResolver() error {
                 if err != nil {
                     return fmt.Errorf("preload account error for %q: %v", k, err)
                 }
-                s.accResolver.Store(k, v)
+                ar.Store(k, v)
             }
         }
     }
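
Note on the configureResolver() change: per the locksordering.txt rule added above (AccountResolver -> Server), resolver implementations may take the server lock inside their own methods, so the server must not be holding s.mu when it calls IsReadOnly() or Store(). The sketch below is a minimal, self-contained illustration of that release/reacquire pattern; the names (Server, Resolver, dirResolver, preload) are invented for the example and are not the nats-server types.

package main

import (
	"fmt"
	"sync"
)

// Resolver mirrors the shape of an account resolver whose methods may need
// to acquire the server lock themselves (resolver -> server ordering).
type Resolver interface {
	IsReadOnly() bool
	Store(name, jwt string) error
}

type Server struct {
	mu       sync.Mutex
	resolver Resolver
	preloads map[string]string
}

// dirResolver is a toy implementation: Store takes Server.mu, which is why
// the caller must not already hold it.
type dirResolver struct {
	srv  *Server
	jwts map[string]string
}

func (r *dirResolver) IsReadOnly() bool { return false }

func (r *dirResolver) Store(name, jwt string) error {
	r.srv.mu.Lock()
	defer r.srv.mu.Unlock()
	r.jwts[name] = jwt
	return nil
}

// preload is called with s.mu held, like configureResolver. It snapshots the
// fields it needs, releases the server lock around the resolver calls, and
// reacquires it on return so the caller's locking assumptions still hold.
func (s *Server) preload() error {
	ar, preloads := s.resolver, s.preloads
	s.mu.Unlock()
	defer s.mu.Lock()
	if ar.IsReadOnly() {
		return fmt.Errorf("resolver is read-only, cannot preload")
	}
	for name, jwt := range preloads {
		if err := ar.Store(name, jwt); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	s := &Server{preloads: map[string]string{"ACC": "jwt-payload"}}
	s.resolver = &dirResolver{srv: s, jwts: map[string]string{}}

	s.mu.Lock()
	err := s.preload() // calling Store while still holding s.mu would deadlock
	s.mu.Unlock()
	fmt.Println("preload error:", err)
}

Copying s.resolver into a local before unlocking matters: once s.mu is released the field could in principle be reconfigured, and the local keeps the rest of the loop working against one consistent resolver.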
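
The test changes in jetstream_cluster_3_test.go follow a related pattern: take a short read lock only to copy the guarded pointer (s.gacc, or cluster.meta via getJetStream()), then do the repeated or slow work against the copy with no lock held. A small sketch, again with illustrative names only:

package main

import (
	"fmt"
	"sync"
)

type Account struct{ name string }

type Server struct {
	mu   sync.RWMutex
	gacc *Account
}

// globalAccountName copies the pointer under a brief read lock, then works on
// the copy lock-free, checking for nil instead of assuming the field is set.
func (s *Server) globalAccountName() string {
	s.mu.RLock()
	gacc := s.gacc
	s.mu.RUnlock()
	if gacc == nil {
		return ""
	}
	return gacc.name
}

func main() {
	s := &Server{gacc: &Account{name: "$G"}}
	fmt.Println(s.globalAccountName()) // prints $G
}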