Skip to content

Commit

Permalink
Per-namespace workers should only run on active cluster (#3426)
Browse files Browse the repository at this point in the history
  • Loading branch information
dnr committed Sep 29, 2022
1 parent 88d6fc9 commit 02e2edf
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 5 deletions.
9 changes: 7 additions & 2 deletions common/namespace/registry.go
Expand Up @@ -141,7 +141,7 @@ type (
Refresh()
// Registers callback for namespace state changes. This is regrettably
// different from the above RegisterNamespaceChangeCallback because we
// need different semantics (and that one is going away).
// need different semantics.
RegisterStateChangeCallback(key any, cb StateChangeCallbackFn)
UnregisterStateChangeCallback(key any)
}
Expand Down Expand Up @@ -489,7 +489,12 @@ UpdateLoop:
newEntries = append(newEntries, namespace)
}

if oldNSAnyVersion == nil || oldNSAnyVersion.State() != namespace.State() {
// this test should include anything that might affect whether a namespace is active on
// this cluster.
if oldNSAnyVersion == nil ||
oldNSAnyVersion.State() != namespace.State() ||
oldNSAnyVersion.IsGlobalNamespace() != namespace.IsGlobalNamespace() ||
oldNSAnyVersion.ActiveClusterName() != namespace.ActiveClusterName() {
stateChanged = append(stateChanged, namespace)
}
}
Expand Down
8 changes: 7 additions & 1 deletion service/worker/pernamespaceworker.go
Expand Up @@ -42,6 +42,7 @@ import (

"go.temporal.io/server/common"
"go.temporal.io/server/common/backoff"
"go.temporal.io/server/common/cluster"
"go.temporal.io/server/common/headers"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
Expand All @@ -66,6 +67,7 @@ type (
NamespaceRegistry namespace.Registry
HostName resource.HostName
Config *Config
ClusterMetadata cluster.Metadata
Components []workercommon.PerNSWorkerComponent `group:"perNamespaceWorkerComponent"`
}

Expand All @@ -83,6 +85,7 @@ type (
serviceResolver membership.ServiceResolver
components []workercommon.PerNSWorkerComponent
initialRetry time.Duration
thisClusterName string

membershipChangedCh chan *membership.ChangedEvent

Expand Down Expand Up @@ -118,6 +121,7 @@ func NewPerNamespaceWorkerManager(params perNamespaceWorkerManagerInitParams) *p
config: params.Config,
components: params.Components,
initialRetry: 1 * time.Second,
thisClusterName: params.ClusterMetadata.GetCurrentClusterName(),
membershipChangedCh: make(chan *membership.ChangedEvent),
workers: make(map[namespace.ID]*perNamespaceWorker),
}
Expand Down Expand Up @@ -313,7 +317,9 @@ func (w *perNamespaceWorker) handleError(err error) {
// means that we should retry creating/starting the worker. Returning noWorkerNeeded means any
// existing worker should be stopped.
func (w *perNamespaceWorker) tryRefresh(ns *namespace.Namespace) error {
if !w.wm.Running() || ns.State() == enumspb.NAMESPACE_STATE_DELETED {
if !w.wm.Running() ||
ns.State() == enumspb.NAMESPACE_STATE_DELETED ||
!ns.ActiveInCluster(w.wm.thisClusterName) {
return errNoWorkerNeeded
}

Expand Down
37 changes: 35 additions & 2 deletions service/worker/pernamespaceworker_test.go
Expand Up @@ -37,6 +37,7 @@ import (
sdkworker "go.temporal.io/sdk/worker"

persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common/cluster"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/membership"
"go.temporal.io/server/common/namespace"
Expand Down Expand Up @@ -91,7 +92,8 @@ func (s *perNsWorkerManagerSuite) SetupTest() {
return util.Max(1, map[string]int{"ns1": 1, "ns2": 2, "ns3": 3}[ns])
},
},
Components: []workercommon.PerNSWorkerComponent{s.cmp1, s.cmp2},
Components: []workercommon.PerNSWorkerComponent{s.cmp1, s.cmp2},
ClusterMetadata: cluster.NewMetadataForTest(cluster.NewTestClusterMetadataConfig(false, true)),
})
s.manager.initialRetry = 1 * time.Millisecond

Expand Down Expand Up @@ -122,6 +124,20 @@ func (s *perNsWorkerManagerSuite) TestDisabled() {
time.Sleep(50 * time.Millisecond)
}

// TestInactive hands the manager a namespace built by testInactiveNS, i.e. one
// whose replication config names cluster.TestAlternativeClusterName as the
// active cluster. Both components answer Enabled, and no expectations for
// creating a worker are registered, so the test presumably relies on the mocks
// to fail it if the manager tries to start a worker for this namespace.
func (s *perNsWorkerManagerSuite) TestInactive() {
	// Build a fresh options value per component so the two mocks never share a pointer.
	enabledOptions := func() *workercommon.PerNSDedicatedWorkerOptions {
		return &workercommon.PerNSDedicatedWorkerOptions{Enabled: true}
	}
	s.cmp1.EXPECT().DedicatedWorkerOptions(gomock.Any()).Return(enabledOptions()).AnyTimes()
	s.cmp2.EXPECT().DedicatedWorkerOptions(gomock.Any()).Return(enabledOptions()).AnyTimes()

	inactiveNS := testInactiveNS("ns1", enumspb.NAMESPACE_STATE_REGISTERED)
	s.manager.namespaceCallback(inactiveNS, false)

	// NOTE(review): the callback is presumably processed asynchronously; the
	// pause leaves time for any unexpected mock call to surface — confirm.
	time.Sleep(50 * time.Millisecond)
}

func (s *perNsWorkerManagerSuite) TestEnabledButResolvedToOther() {
ns := testns("ns1", enumspb.NAMESPACE_STATE_REGISTERED)

Expand Down Expand Up @@ -402,7 +418,24 @@ func testns(name string, state enumspb.NamespaceState) *namespace.Namespace {
Name: name,
},
nil,
"cluster",
cluster.TestCurrentClusterName,
)
}

// testInactiveNS builds a test namespace with the given name (reused as its ID)
// and state. Its replication config lists cluster.TestAllClusterNames and names
// cluster.TestAlternativeClusterName as the active cluster, so callers can use
// it as a namespace that is not active on cluster.TestCurrentClusterName.
func testInactiveNS(name string, state enumspb.NamespaceState) *namespace.Namespace {
	info := &persistencespb.NamespaceInfo{
		Id:    name,
		State: state,
		Name:  name,
	}
	replication := &persistencespb.NamespaceReplicationConfig{
		ActiveClusterName: cluster.TestAlternativeClusterName, // not active
		Clusters:          cluster.TestAllClusterNames,
	}
	// NOTE(review): the positional arguments are presumably (info, config,
	// isGlobalNamespace, replicationConfig, failoverVersion) — confirm against
	// namespace.NewNamespaceForTest.
	return namespace.NewNamespaceForTest(info, nil, true, replication, 0)
}

Expand Down

0 comments on commit 02e2edf

Please sign in to comment.