From a84ce61a93b24e932b3edcfc625f76d1ade23b23 Mon Sep 17 00:00:00 2001
From: Ivan Kozlovic
Date: Mon, 25 Sep 2023 15:09:11 -0600
Subject: [PATCH] [FIXED] Account resolver lock inversion

There was a lock inversion, but it was low risk since it happened during
server initialization. Still, fixed it and added the ordering to the
locksordering.txt file.

Also fixed multiple lock inversions that were caused by tests.

Signed-off-by: Ivan Kozlovic
---
 locksordering.txt                  |  2 ++
 server/jetstream_cluster_2_test.go | 21 -------------
 server/jetstream_cluster_3_test.go | 48 +++++++++++++++++-------------
 server/jetstream_helpers_test.go   | 19 ++++++++++++
 server/server.go                   | 10 +++++--
 5 files changed, 56 insertions(+), 44 deletions(-)

diff --git a/locksordering.txt b/locksordering.txt
index 8983261194..5e685d8dd1 100644
--- a/locksordering.txt
+++ b/locksordering.txt
@@ -14,3 +14,5 @@ allow that lock to be held and then acquire a client lock which is not possible
 with the normal account lock.
 
 accountLeafList -> client
+
+AccountResolver interface has various implementations, but assume: AccountResolver -> Server
diff --git a/server/jetstream_cluster_2_test.go b/server/jetstream_cluster_2_test.go
index e7dbefe535..4fc95340d1 100644
--- a/server/jetstream_cluster_2_test.go
+++ b/server/jetstream_cluster_2_test.go
@@ -2734,25 +2734,6 @@ func TestJetStreamClusterFlowControlRequiresHeartbeats(t *testing.T) {
 	}
 }
 
-var jsClusterAccountLimitsTempl = `
-	listen: 127.0.0.1:-1
-	server_name: %s
-	jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'}
-
-	cluster {
-		name: %s
-		listen: 127.0.0.1:%d
-		routes = [%s]
-	}
-
-	no_auth_user: js
-
-	accounts {
-		$JS { users = [ { user: "js", pass: "p" } ]; jetstream: {max_store: 1MB, max_mem: 0} }
-		$SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] }
-	}
-`
-
 func TestJetStreamClusterMixedModeColdStartPrune(t *testing.T) {
 	// Purposely make this unbalanced. Without changes this will never form a quorum to elect the meta-leader.
 	c := createMixedModeCluster(t, jsMixedModeGlobalAccountTempl, "MMCS5", _EMPTY_, 3, 4, false)
@@ -5552,10 +5533,8 @@ func TestJetStreamClusterConsumerOverrides(t *testing.T) {
 
 	o := mset.lookupConsumer("m")
 	require_True(t, o != nil)
-	o.mu.RLock()
 	st := o.store.Type()
 	n := o.raftNode()
-	o.mu.RUnlock()
 	require_True(t, n != nil)
 	rn := n.(*raft)
 	rn.RLock()
diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go
index e5f4bbe68a..ad9b4978f9 100644
--- a/server/jetstream_cluster_3_test.go
+++ b/server/jetstream_cluster_3_test.go
@@ -1372,7 +1372,7 @@ func TestJetStreamClusterPullConsumerAcksExtendInactivityThreshold(t *testing.T)
 }
 
 // https://github.com/nats-io/nats-server/issues/3677
-func TestJetStreamParallelStreamCreation(t *testing.T) {
+func TestJetStreamClusterParallelStreamCreation(t *testing.T) {
 	c := createJetStreamClusterExplicit(t, "R3S", 3)
 	defer c.shutdown()
 
@@ -1414,7 +1414,7 @@ func TestJetStreamParallelStreamCreation(t *testing.T) {
 
 // In addition to test above, if streams were attempted to be created in parallel
 // it could be that multiple raft groups would be created for the same asset.
-func TestJetStreamParallelStreamCreationDupeRaftGroups(t *testing.T) {
+func TestJetStreamClusterParallelStreamCreationDupeRaftGroups(t *testing.T) {
 	c := createJetStreamClusterExplicit(t, "R3S", 3)
 	defer c.shutdown()
 
@@ -1463,19 +1463,19 @@ func TestJetStreamParallelStreamCreationDupeRaftGroups(t *testing.T) {
 	expected := 2
 	rg := make(map[string]struct{})
 	for _, s := range c.servers {
-		s.mu.RLock()
+		s.rnMu.RLock()
 		for _, ni := range s.raftNodes {
 			n := ni.(*raft)
 			rg[n.Group()] = struct{}{}
 		}
-		s.mu.RUnlock()
+		s.rnMu.RUnlock()
 	}
 	if len(rg) != expected {
 		t.Fatalf("Expected only %d distinct raft groups for all servers, go %d", expected, len(rg))
 	}
 }
 
-func TestJetStreamParallelConsumerCreation(t *testing.T) {
+func TestJetStreamClusterParallelConsumerCreation(t *testing.T) {
 	c := createJetStreamClusterExplicit(t, "R3S", 3)
 	defer c.shutdown()
 
@@ -1538,19 +1538,19 @@ func TestJetStreamParallelConsumerCreation(t *testing.T) {
 	expected := 3
 	rg := make(map[string]struct{})
 	for _, s := range c.servers {
-		s.mu.RLock()
+		s.rnMu.RLock()
 		for _, ni := range s.raftNodes {
 			n := ni.(*raft)
 			rg[n.Group()] = struct{}{}
 		}
-		s.mu.RUnlock()
+		s.rnMu.RUnlock()
 	}
 	if len(rg) != expected {
 		t.Fatalf("Expected only %d distinct raft groups for all servers, go %d", expected, len(rg))
 	}
 }
 
-func TestJetStreamGhostEphemeralsAfterRestart(t *testing.T) {
+func TestJetStreamClusterGhostEphemeralsAfterRestart(t *testing.T) {
 	c := createJetStreamClusterExplicit(t, "R3S", 3)
 	defer c.shutdown()
 
@@ -3470,19 +3470,20 @@ func TestJetStreamClusterNoR1AssetsDuringLameDuck(t *testing.T) {
 	}()
 	defer close(qch)
 
+	s.mu.RLock()
+	gacc := s.gacc
+	s.mu.RUnlock()
+	if gacc == nil {
+		t.Fatalf("No global account")
+	}
 	// Make sure we do not have any R1 assets placed on the lameduck server.
 	for s.isRunning() {
-		s.mu.RLock()
-		if s.js == nil || s.js.srv == nil || s.js.srv.gacc == nil {
-			s.mu.RUnlock()
-			break
-		}
-		hasAsset := len(s.js.srv.gacc.streams()) > 0
-		s.mu.RUnlock()
-		if hasAsset {
+		if len(gacc.streams()) > 0 {
 			t.Fatalf("Server had an R1 asset when it should not due to lameduck mode")
 		}
+		time.Sleep(15 * time.Millisecond)
 	}
+	s.WaitForShutdown()
 }
 
 // If a consumer has not been registered (possible in heavily loaded systems with lots of assets)
@@ -3999,9 +4000,14 @@ func TestJetStreamClusterStreamScaleUpNoGroupCluster(t *testing.T) {
 	sa.Group.Cluster = _EMPTY_
 	sa.Group.Preferred = _EMPTY_
 	// Insert into meta layer.
-	s.mu.RLock()
-	s.js.cluster.meta.ForwardProposal(encodeUpdateStreamAssignment(sa))
-	s.mu.RUnlock()
+	if sjs := s.getJetStream(); sjs != nil {
+		sjs.mu.RLock()
+		meta := sjs.cluster.meta
+		sjs.mu.RUnlock()
+		if meta != nil {
+			meta.ForwardProposal(encodeUpdateStreamAssignment(sa))
+		}
+	}
 	// Make sure it got propagated..
 	checkFor(t, 10*time.Second, 200*time.Millisecond, func() error {
 		sa := mset.streamAssignment().copyGroup()
@@ -4714,7 +4720,7 @@ func TestJetStreamClusterSnapshotAndRestoreWithHealthz(t *testing.T) {
 	require_True(t, si.State.Msgs == uint64(toSend))
 }
 
-func TestJetStreamBinaryStreamSnapshotCapability(t *testing.T) {
+func TestJetStreamClusterBinaryStreamSnapshotCapability(t *testing.T) {
 	c := createJetStreamClusterExplicit(t, "NATS", 3)
 	defer c.shutdown()
 
@@ -4783,7 +4789,7 @@ func TestJetStreamClusterBadEncryptKey(t *testing.T) {
 	}
 }
 
-func TestJetStreamAccountUsageDrifts(t *testing.T) {
+func TestJetStreamClusterAccountUsageDrifts(t *testing.T) {
 	tmpl := `
 	listen: 127.0.0.1:-1
 	server_name: %s
diff --git a/server/jetstream_helpers_test.go b/server/jetstream_helpers_test.go
index 1d6813dd98..d80892d77d 100644
--- a/server/jetstream_helpers_test.go
+++ b/server/jetstream_helpers_test.go
@@ -281,6 +281,25 @@ var jsMixedModeGlobalAccountTempl = `
 
 var jsGWTempl = `%s{name: %s, urls: [%s]}`
 
+var jsClusterAccountLimitsTempl = `
+	listen: 127.0.0.1:-1
+	server_name: %s
+	jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'}
+
+	cluster {
+		name: %s
+		listen: 127.0.0.1:%d
+		routes = [%s]
+	}
+
+	no_auth_user: js
+
+	accounts {
+		$JS { users = [ { user: "js", pass: "p" } ]; jetstream: {max_store: 1MB, max_mem: 0} }
+		$SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] }
+	}
+`
+
 func createJetStreamTaggedSuperCluster(t *testing.T) *supercluster {
 	return createJetStreamTaggedSuperClusterWithGWProxy(t, nil)
 }
diff --git a/server/server.go b/server/server.go
index 44551631e2..8a34096199 100644
--- a/server/server.go
+++ b/server/server.go
@@ -1272,6 +1272,7 @@ func (s *Server) configureAccounts(reloading bool) (map[string]struct{}, error)
 
 // Setup the account resolver. For memory resolver, make sure the JWTs are
 // properly formed but do not enforce expiration etc.
+// Lock is held on entry, but may be released/reacquired during this call.
 func (s *Server) configureResolver() error {
 	opts := s.getOpts()
 	s.accResolver = opts.AccountResolver
@@ -1286,7 +1287,12 @@ func (s *Server) configureResolver() error {
 			}
 		}
 		if len(opts.resolverPreloads) > 0 {
-			if s.accResolver.IsReadOnly() {
+			// Lock ordering is account resolver -> server, so we need to release
+			// the lock and reacquire it when done with account resolver's calls.
+			ar := s.accResolver
+			s.mu.Unlock()
+			defer s.mu.Lock()
+			if ar.IsReadOnly() {
 				return fmt.Errorf("resolver preloads only available for writeable resolver types MEM/DIR/CACHE_DIR")
 			}
 			for k, v := range opts.resolverPreloads {
@@ -1294,7 +1300,7 @@ func (s *Server) configureResolver() error {
 				if err != nil {
 					return fmt.Errorf("preload account error for %q: %v", k, err)
 				}
-				s.accResolver.Store(k, v)
+				ar.Store(k, v)
 			}
 		}
 	}
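
Note: the server/server.go hunk above is the core of the fix. Because the
documented ordering is AccountResolver -> Server, any code path that already
holds the server lock must drop it before calling into the resolver and
reacquire it on the way out. Below is a minimal, self-contained sketch of
that pattern; the Resolver, Server, and preload names are illustrative
stand-ins for this note, not the actual nats-server types.

package lockorder

import (
	"fmt"
	"sync"
)

// Resolver stands in for the AccountResolver interface. Implementations
// may take their own lock and then call back into the server, which
// fixes the ordering as Resolver -> Server.
type Resolver interface {
	IsReadOnly() bool
	Store(name, jwt string) error
}

type Server struct {
	mu       sync.Mutex
	resolver Resolver
}

// preload is called with s.mu held, mirroring configureResolver.
func (s *Server) preload(jwts map[string]string) error {
	// Copy the resolver reference while the lock is still held...
	ar := s.resolver
	// ...then release s.mu so that calling into the resolver cannot
	// invert the Resolver -> Server ordering, and reacquire it before
	// returning since the caller still expects the lock to be held.
	s.mu.Unlock()
	defer s.mu.Lock()

	if ar.IsReadOnly() {
		return fmt.Errorf("cannot preload into a read-only resolver")
	}
	for name, jwt := range jwts {
		if err := ar.Store(name, jwt); err != nil {
			return fmt.Errorf("preload error for %q: %v", name, err)
		}
	}
	return nil
}

The defer keeps the lock state consistent on every return path, including
the early error returns, which is why the patch uses defer s.mu.Lock()
rather than re-locking by hand before each return.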
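The test-side changes apply the complementary pattern: instead of holding
s.mu across calls such as streams() or ForwardProposal() that take other
locks, they snapshot the guarded pointer under a brief lock and then call
through the copy. A hedged sketch of that idea follows; Account and its
fields are simplified placeholders here, not the real server types.

package lockorder

import "sync"

type Account struct {
	mu      sync.Mutex
	streams []string
}

// numStreams takes the account lock, so it must not be invoked while a
// lock ordered after Account is already held.
func (a *Account) numStreams() int {
	a.mu.Lock()
	defer a.mu.Unlock()
	return len(a.streams)
}

type JSServer struct {
	mu   sync.RWMutex
	gacc *Account
}

// hasAssets snapshots the guarded pointer under a brief read lock and
// only then calls into the account, mirroring the change made in
// TestJetStreamClusterNoR1AssetsDuringLameDuck.
func (s *JSServer) hasAssets() bool {
	s.mu.RLock()
	gacc := s.gacc
	s.mu.RUnlock()
	return gacc != nil && gacc.numStreams() > 0
}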