diff --git a/common/namespace/registry.go b/common/namespace/registry.go index 48ece732323..4225c71d88e 100644 --- a/common/namespace/registry.go +++ b/common/namespace/registry.go @@ -141,7 +141,7 @@ type ( Refresh() // Registers callback for namespace state changes. This is regrettably // different from the above RegisterNamespaceChangeCallback because we - // need different semantics (and that one is going away). + // need different semantics. RegisterStateChangeCallback(key any, cb StateChangeCallbackFn) UnregisterStateChangeCallback(key any) } @@ -489,7 +489,12 @@ UpdateLoop: newEntries = append(newEntries, namespace) } - if oldNSAnyVersion == nil || oldNSAnyVersion.State() != namespace.State() { + // this test should include anything that might affect whether a namespace is active on + // this cluster. + if oldNSAnyVersion == nil || + oldNSAnyVersion.State() != namespace.State() || + oldNSAnyVersion.IsGlobalNamespace() != namespace.IsGlobalNamespace() || + oldNSAnyVersion.ActiveClusterName() != namespace.ActiveClusterName() { stateChanged = append(stateChanged, namespace) } } diff --git a/service/worker/pernamespaceworker.go b/service/worker/pernamespaceworker.go index c782669df1b..e018b3fe525 100644 --- a/service/worker/pernamespaceworker.go +++ b/service/worker/pernamespaceworker.go @@ -42,6 +42,7 @@ import ( "go.temporal.io/server/common" "go.temporal.io/server/common/backoff" + "go.temporal.io/server/common/cluster" "go.temporal.io/server/common/headers" "go.temporal.io/server/common/log" "go.temporal.io/server/common/log/tag" @@ -66,6 +67,7 @@ type ( NamespaceRegistry namespace.Registry HostName resource.HostName Config *Config + ClusterMetadata cluster.Metadata Components []workercommon.PerNSWorkerComponent `group:"perNamespaceWorkerComponent"` } @@ -83,6 +85,7 @@ type ( serviceResolver membership.ServiceResolver components []workercommon.PerNSWorkerComponent initialRetry time.Duration + thisClusterName string membershipChangedCh chan *membership.ChangedEvent @@ -118,6 +121,7 @@ func NewPerNamespaceWorkerManager(params perNamespaceWorkerManagerInitParams) *p config: params.Config, components: params.Components, initialRetry: 1 * time.Second, + thisClusterName: params.ClusterMetadata.GetCurrentClusterName(), membershipChangedCh: make(chan *membership.ChangedEvent), workers: make(map[namespace.ID]*perNamespaceWorker), } @@ -313,7 +317,9 @@ func (w *perNamespaceWorker) handleError(err error) { // means that we should retry creating/starting the worker. Returning noWorkerNeeded means any // existing worker should be stopped. func (w *perNamespaceWorker) tryRefresh(ns *namespace.Namespace) error { - if !w.wm.Running() || ns.State() == enumspb.NAMESPACE_STATE_DELETED { + if !w.wm.Running() || + ns.State() == enumspb.NAMESPACE_STATE_DELETED || + !ns.ActiveInCluster(w.wm.thisClusterName) { return errNoWorkerNeeded } diff --git a/service/worker/pernamespaceworker_test.go b/service/worker/pernamespaceworker_test.go index 902e14655cf..20be02c9a13 100644 --- a/service/worker/pernamespaceworker_test.go +++ b/service/worker/pernamespaceworker_test.go @@ -37,6 +37,7 @@ import ( sdkworker "go.temporal.io/sdk/worker" persistencespb "go.temporal.io/server/api/persistence/v1" + "go.temporal.io/server/common/cluster" "go.temporal.io/server/common/log" "go.temporal.io/server/common/membership" "go.temporal.io/server/common/namespace" @@ -91,7 +92,8 @@ func (s *perNsWorkerManagerSuite) SetupTest() { return util.Max(1, map[string]int{"ns1": 1, "ns2": 2, "ns3": 3}[ns]) }, }, - Components: []workercommon.PerNSWorkerComponent{s.cmp1, s.cmp2}, + Components: []workercommon.PerNSWorkerComponent{s.cmp1, s.cmp2}, + ClusterMetadata: cluster.NewMetadataForTest(cluster.NewTestClusterMetadataConfig(false, true)), }) s.manager.initialRetry = 1 * time.Millisecond @@ -122,6 +124,20 @@ func (s *perNsWorkerManagerSuite) TestDisabled() { time.Sleep(50 * time.Millisecond) } +func (s *perNsWorkerManagerSuite) TestInactive() { + ns := testInactiveNS("ns1", enumspb.NAMESPACE_STATE_REGISTERED) + + s.cmp1.EXPECT().DedicatedWorkerOptions(gomock.Any()).Return(&workercommon.PerNSDedicatedWorkerOptions{ + Enabled: true, + }).AnyTimes() + s.cmp2.EXPECT().DedicatedWorkerOptions(gomock.Any()).Return(&workercommon.PerNSDedicatedWorkerOptions{ + Enabled: true, + }).AnyTimes() + + s.manager.namespaceCallback(ns, false) + time.Sleep(50 * time.Millisecond) +} + func (s *perNsWorkerManagerSuite) TestEnabledButResolvedToOther() { ns := testns("ns1", enumspb.NAMESPACE_STATE_REGISTERED) @@ -402,7 +418,24 @@ func testns(name string, state enumspb.NamespaceState) *namespace.Namespace { Name: name, }, nil, - "cluster", + cluster.TestCurrentClusterName, + ) +} + +func testInactiveNS(name string, state enumspb.NamespaceState) *namespace.Namespace { + return namespace.NewNamespaceForTest( + &persistencespb.NamespaceInfo{ + Id: name, + State: state, + Name: name, + }, + nil, + true, + &persistencespb.NamespaceReplicationConfig{ + ActiveClusterName: cluster.TestAlternativeClusterName, // not active + Clusters: cluster.TestAllClusterNames, + }, + 0, ) }