Skip to content

Commit

Permalink
Handle unknown cluster during replication (#3619)
Browse files Browse the repository at this point in the history
* Handle unknown cluster during replication
  • Loading branch information
yux0 committed Nov 18, 2022
1 parent b13ed16 commit 4483f10
Show file tree
Hide file tree
Showing 21 changed files with 91 additions and 49 deletions.
15 changes: 9 additions & 6 deletions common/cluster/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ const (
FakeClusterForEmptyVersion = "fake-cluster-for-empty-version"
)

var ErrUnknownCluster = fmt.Errorf("unknown cluster")

type (
Metadata interface {
common.Daemon
Expand All @@ -74,7 +76,7 @@ type (
// GetAllClusterInfo return the all cluster name -> corresponding info
GetAllClusterInfo() map[string]ClusterInformation
// ClusterNameForFailoverVersion return the corresponding cluster name for a given failover version
ClusterNameForFailoverVersion(isGlobalNamespace bool, failoverVersion int64) string
ClusterNameForFailoverVersion(isGlobalNamespace bool, failoverVersion int64) (string, error)
// GetFailoverVersionIncrement return the Failover version increment value
GetFailoverVersionIncrement() int64
RegisterMetadataChangeCallback(callbackId any, cb CallbackFn)
Expand Down Expand Up @@ -342,15 +344,15 @@ func (m *metadataImpl) GetAllClusterInfo() map[string]ClusterInformation {
return result
}

func (m *metadataImpl) ClusterNameForFailoverVersion(isGlobalNamespace bool, failoverVersion int64) string {
func (m *metadataImpl) ClusterNameForFailoverVersion(isGlobalNamespace bool, failoverVersion int64) (string, error) {
if failoverVersion == common.EmptyVersion {
// Local namespace uses EmptyVersion. But local namespace could be promoted to global namespace. Once promoted,
// workflows with EmptyVersion could be replicated to other clusters. The receiving cluster needs to know that
// those workflows are not from their current cluster.
if isGlobalNamespace {
return FakeClusterForEmptyVersion
return FakeClusterForEmptyVersion, nil
}
return m.currentClusterName
return m.currentClusterName, nil
}

if !isGlobalNamespace {
Expand All @@ -370,14 +372,15 @@ func (m *metadataImpl) ClusterNameForFailoverVersion(isGlobalNamespace bool, fai
defer m.clusterLock.RUnlock()
clusterName, ok := m.versionToClusterName[initialFailoverVersion]
if !ok {
panic(fmt.Sprintf(
m.logger.Warn(fmt.Sprintf(
"Unknown initial failover version %v with given cluster initial failover version map: %v and failover version increment %v.",
initialFailoverVersion,
m.clusterInfo,
m.failoverVersionIncrement,
))
return "", ErrUnknownCluster
}
return clusterName
return clusterName, nil
}

func (m *metadataImpl) GetFailoverVersionIncrement() int64 {
Expand Down
5 changes: 3 additions & 2 deletions common/cluster/metadata_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 11 additions & 2 deletions common/cluster/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,17 @@ func (s *metadataSuite) Test_IsVersionFromSameCluster() {
}

func (s *metadataSuite) Test_ClusterNameForFailoverVersion() {
s.Equal(s.clusterName, s.metadata.ClusterNameForFailoverVersion(true, 101))
s.Equal(s.secondClusterName, s.metadata.ClusterNameForFailoverVersion(true, 204))
clusterName, err := s.metadata.ClusterNameForFailoverVersion(true, 101)
s.NoError(err)
s.Equal(s.clusterName, clusterName)

clusterName2, err := s.metadata.ClusterNameForFailoverVersion(true, 204)
s.NoError(err)
s.Equal(s.secondClusterName, clusterName2)

clusterName3, err := s.metadata.ClusterNameForFailoverVersion(true, 217)
s.ErrorIs(err, ErrUnknownCluster)
s.Equal("", clusterName3)
}

func (s *metadataSuite) Test_RegisterMetadataChangeCallback() {
Expand Down
7 changes: 6 additions & 1 deletion service/history/api/create_workflow_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"go.temporal.io/server/api/historyservice/v1"
workflowspb "go.temporal.io/server/api/workflow/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/cluster"
"go.temporal.io/server/common/definition"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/metrics"
Expand Down Expand Up @@ -170,10 +171,14 @@ func NewWorkflowVersionCheck(
if prevLastWriteVersion > newMutableState.GetCurrentVersion() {
clusterMetadata := shard.GetClusterMetadata()
namespaceEntry := newMutableState.GetNamespaceEntry()
clusterName, err := clusterMetadata.ClusterNameForFailoverVersion(namespaceEntry.IsGlobalNamespace(), prevLastWriteVersion)
if err != nil && err != cluster.ErrUnknownCluster {
return err
}
return serviceerror.NewNamespaceNotActive(
namespaceEntry.Name().String(),
clusterMetadata.GetCurrentClusterName(),
clusterMetadata.ClusterNameForFailoverVersion(namespaceEntry.IsGlobalNamespace(), prevLastWriteVersion),
clusterName,
)
}
return nil
Expand Down
7 changes: 6 additions & 1 deletion service/history/api/startworkflow/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"go.temporal.io/api/serviceerror"

"go.temporal.io/server/api/historyservice/v1"
"go.temporal.io/server/common/cluster"
"go.temporal.io/server/common/definition"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
Expand Down Expand Up @@ -125,10 +126,14 @@ func Invoke(
prevLastWriteVersion = t.LastWriteVersion
if workflowContext.GetMutableState().GetCurrentVersion() < prevLastWriteVersion {
clusterMetadata := shard.GetClusterMetadata()
clusterName, err := clusterMetadata.ClusterNameForFailoverVersion(namespaceEntry.IsGlobalNamespace(), prevLastWriteVersion)
if err != nil && err != cluster.ErrUnknownCluster {
return nil, err
}
return nil, serviceerror.NewNamespaceNotActive(
request.GetNamespace(),
clusterMetadata.GetCurrentClusterName(),
clusterMetadata.ClusterNameForFailoverVersion(namespaceEntry.IsGlobalNamespace(), prevLastWriteVersion),
clusterName,
)
}

Expand Down
4 changes: 2 additions & 2 deletions service/history/historyEngine2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,8 @@ func (s *engine2Suite) SetupTest() {
s.mockEventsCache.EXPECT().PutEvent(gomock.Any(), gomock.Any()).AnyTimes()
s.mockClusterMetadata.EXPECT().IsGlobalNamespaceEnabled().Return(false).AnyTimes()
s.mockClusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName).AnyTimes()
s.mockClusterMetadata.EXPECT().ClusterNameForFailoverVersion(false, common.EmptyVersion).Return(cluster.TestCurrentClusterName).AnyTimes()
s.mockClusterMetadata.EXPECT().ClusterNameForFailoverVersion(true, tests.Version).Return(cluster.TestCurrentClusterName).AnyTimes()
s.mockClusterMetadata.EXPECT().ClusterNameForFailoverVersion(false, common.EmptyVersion).Return(cluster.TestCurrentClusterName, nil).AnyTimes()
s.mockClusterMetadata.EXPECT().ClusterNameForFailoverVersion(true, tests.Version).Return(cluster.TestCurrentClusterName, nil).AnyTimes()
s.workflowCache = workflow.NewCache(s.mockShard)
s.logger = log.NewMockLogger(s.controller)
s.logger.EXPECT().Debug(gomock.Any(), gomock.Any()).AnyTimes()
Expand Down
2 changes: 1 addition & 1 deletion service/history/historyEngine3_eventsv2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func (s *engine3Suite) SetupTest() {

s.mockClusterMetadata.EXPECT().IsGlobalNamespaceEnabled().Return(false).AnyTimes()
s.mockClusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName).AnyTimes()
s.mockClusterMetadata.EXPECT().ClusterNameForFailoverVersion(false, common.EmptyVersion).Return(cluster.TestCurrentClusterName).AnyTimes()
s.mockClusterMetadata.EXPECT().ClusterNameForFailoverVersion(false, common.EmptyVersion).Return(cluster.TestCurrentClusterName, nil).AnyTimes()
s.mockEventsCache.EXPECT().PutEvent(gomock.Any(), gomock.Any()).AnyTimes()
s.workflowCache = workflow.NewCache(s.mockShard)
s.logger = s.mockShard.GetLogger()
Expand Down
2 changes: 1 addition & 1 deletion service/history/historyEngine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func (s *engineSuite) SetupTest() {
s.mockClusterMetadata.EXPECT().GetClusterID().Return(cluster.TestCurrentClusterInitialFailoverVersion).AnyTimes()
s.mockClusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName).AnyTimes()
s.mockClusterMetadata.EXPECT().GetAllClusterInfo().Return(cluster.TestSingleDCClusterInfo).AnyTimes()
s.mockClusterMetadata.EXPECT().ClusterNameForFailoverVersion(false, common.EmptyVersion).Return(cluster.TestCurrentClusterName).AnyTimes()
s.mockClusterMetadata.EXPECT().ClusterNameForFailoverVersion(false, common.EmptyVersion).Return(cluster.TestCurrentClusterName, nil).AnyTimes()
s.mockNamespaceCache.EXPECT().GetNamespaceByID(tests.NamespaceID).Return(tests.LocalNamespaceEntry, nil).AnyTimes()
s.mockNamespaceCache.EXPECT().GetNamespace(tests.Namespace).Return(tests.LocalNamespaceEntry, nil).AnyTimes()

Expand Down
2 changes: 1 addition & 1 deletion service/history/ndc/branch_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ func (s *branchMgrSuite) TestFlushBufferedEvents() {
int64(0),
).Return(&historypb.HistoryEvent{}, nil)
s.mockMutableState.EXPECT().FlushBufferedEvents()
s.mockClusterMetadata.EXPECT().ClusterNameForFailoverVersion(true, lastWriteVersion).Return(cluster.TestCurrentClusterName).AnyTimes()
s.mockClusterMetadata.EXPECT().ClusterNameForFailoverVersion(true, lastWriteVersion).Return(cluster.TestCurrentClusterName, nil).AnyTimes()
s.mockClusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName).AnyTimes()

s.mockContext.EXPECT().UpdateWorkflowExecutionAsActive(gomock.Any(), gomock.Any()).Return(nil)
Expand Down
5 changes: 4 additions & 1 deletion service/history/ndc/replication_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,10 @@ func newReplicationTask(
lastEvent := events[len(events)-1]
version := firstEvent.GetVersion()

sourceCluster := clusterMetadata.ClusterNameForFailoverVersion(true, version)
sourceCluster, err := clusterMetadata.ClusterNameForFailoverVersion(true, version)
if err != nil {
return nil, err
}

eventTime := time.Time{}
for _, event := range events {
Expand Down
14 changes: 7 additions & 7 deletions service/history/ndc/transaction_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func (s *transactionMgrSuite) TestBackfillWorkflow_CurrentWorkflow_Active_Open()
targetWorkflow.EXPECT().GetReleaseFn().Return(releaseFn).AnyTimes()

s.mockClusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName).AnyTimes()
s.mockClusterMetadata.EXPECT().ClusterNameForFailoverVersion(s.namespaceEntry.IsGlobalNamespace(), s.namespaceEntry.FailoverVersion()).Return(cluster.TestCurrentClusterName).AnyTimes()
s.mockClusterMetadata.EXPECT().ClusterNameForFailoverVersion(s.namespaceEntry.IsGlobalNamespace(), s.namespaceEntry.FailoverVersion()).Return(cluster.TestCurrentClusterName, nil).AnyTimes()

s.mockEventsReapplier.EXPECT().ReapplyEvents(ctx, mutableState, workflowEvents.Events, runID).Return(workflowEvents.Events, nil)

Expand Down Expand Up @@ -205,7 +205,7 @@ func (s *transactionMgrSuite) TestBackfillWorkflow_CurrentWorkflow_Active_Closed
targetWorkflow.EXPECT().GetMutableState().Return(mutableState).AnyTimes()
targetWorkflow.EXPECT().GetReleaseFn().Return(releaseFn).AnyTimes()

s.mockClusterMetadata.EXPECT().ClusterNameForFailoverVersion(s.namespaceEntry.IsGlobalNamespace(), s.namespaceEntry.FailoverVersion()).Return(cluster.TestCurrentClusterName).AnyTimes()
s.mockClusterMetadata.EXPECT().ClusterNameForFailoverVersion(s.namespaceEntry.IsGlobalNamespace(), s.namespaceEntry.FailoverVersion()).Return(cluster.TestCurrentClusterName, nil).AnyTimes()
s.mockClusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName).AnyTimes()

mutableState.EXPECT().IsCurrentWorkflowGuaranteed().Return(false).AnyTimes()
Expand Down Expand Up @@ -283,7 +283,7 @@ func (s *transactionMgrSuite) TestBackfillWorkflow_CurrentWorkflow_Closed_ResetF
targetWorkflow.EXPECT().GetMutableState().Return(mutableState).AnyTimes()
targetWorkflow.EXPECT().GetReleaseFn().Return(releaseFn).AnyTimes()

s.mockClusterMetadata.EXPECT().ClusterNameForFailoverVersion(s.namespaceEntry.IsGlobalNamespace(), s.namespaceEntry.FailoverVersion()).Return(cluster.TestCurrentClusterName).AnyTimes()
s.mockClusterMetadata.EXPECT().ClusterNameForFailoverVersion(s.namespaceEntry.IsGlobalNamespace(), s.namespaceEntry.FailoverVersion()).Return(cluster.TestCurrentClusterName, nil).AnyTimes()
s.mockClusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName).AnyTimes()

mutableState.EXPECT().IsCurrentWorkflowGuaranteed().Return(false).AnyTimes()
Expand Down Expand Up @@ -351,7 +351,7 @@ func (s *transactionMgrSuite) TestBackfillWorkflow_CurrentWorkflow_Passive_Open(
targetWorkflow.EXPECT().GetMutableState().Return(mutableState).AnyTimes()
targetWorkflow.EXPECT().GetReleaseFn().Return(releaseFn).AnyTimes()

s.mockClusterMetadata.EXPECT().ClusterNameForFailoverVersion(s.namespaceEntry.IsGlobalNamespace(), s.namespaceEntry.FailoverVersion()).Return(cluster.TestCurrentClusterName).AnyTimes()
s.mockClusterMetadata.EXPECT().ClusterNameForFailoverVersion(s.namespaceEntry.IsGlobalNamespace(), s.namespaceEntry.FailoverVersion()).Return(cluster.TestCurrentClusterName, nil).AnyTimes()
s.mockClusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestAlternativeClusterName).AnyTimes()

mutableState.EXPECT().IsCurrentWorkflowGuaranteed().Return(true).AnyTimes()
Expand Down Expand Up @@ -388,7 +388,7 @@ func (s *transactionMgrSuite) TestBackfillWorkflow_CurrentWorkflow_Passive_Close
targetWorkflow.EXPECT().GetMutableState().Return(mutableState).AnyTimes()
targetWorkflow.EXPECT().GetReleaseFn().Return(releaseFn).AnyTimes()

s.mockClusterMetadata.EXPECT().ClusterNameForFailoverVersion(s.namespaceEntry.IsGlobalNamespace(), s.namespaceEntry.FailoverVersion()).Return(cluster.TestCurrentClusterName).AnyTimes()
s.mockClusterMetadata.EXPECT().ClusterNameForFailoverVersion(s.namespaceEntry.IsGlobalNamespace(), s.namespaceEntry.FailoverVersion()).Return(cluster.TestCurrentClusterName, nil).AnyTimes()
s.mockClusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestAlternativeClusterName).AnyTimes()

mutableState.EXPECT().IsCurrentWorkflowGuaranteed().Return(false).AnyTimes()
Expand Down Expand Up @@ -446,7 +446,7 @@ func (s *transactionMgrSuite) TestBackfillWorkflow_NotCurrentWorkflow_Active() {
targetWorkflow.EXPECT().GetMutableState().Return(mutableState).AnyTimes()
targetWorkflow.EXPECT().GetReleaseFn().Return(releaseFn).AnyTimes()

s.mockClusterMetadata.EXPECT().ClusterNameForFailoverVersion(s.namespaceEntry.IsGlobalNamespace(), s.namespaceEntry.FailoverVersion()).Return(cluster.TestCurrentClusterName).AnyTimes()
s.mockClusterMetadata.EXPECT().ClusterNameForFailoverVersion(s.namespaceEntry.IsGlobalNamespace(), s.namespaceEntry.FailoverVersion()).Return(cluster.TestCurrentClusterName, nil).AnyTimes()
s.mockClusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName).AnyTimes()

mutableState.EXPECT().IsCurrentWorkflowGuaranteed().Return(false).AnyTimes()
Expand Down Expand Up @@ -503,7 +503,7 @@ func (s *transactionMgrSuite) TestBackfillWorkflow_NotCurrentWorkflow_Passive()
targetWorkflow.EXPECT().GetMutableState().Return(mutableState).AnyTimes()
targetWorkflow.EXPECT().GetReleaseFn().Return(releaseFn).AnyTimes()

s.mockClusterMetadata.EXPECT().ClusterNameForFailoverVersion(s.namespaceEntry.IsGlobalNamespace(), s.namespaceEntry.FailoverVersion()).Return(cluster.TestCurrentClusterName).AnyTimes()
s.mockClusterMetadata.EXPECT().ClusterNameForFailoverVersion(s.namespaceEntry.IsGlobalNamespace(), s.namespaceEntry.FailoverVersion()).Return(cluster.TestCurrentClusterName, nil).AnyTimes()
s.mockClusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestAlternativeClusterName).AnyTimes()

mutableState.EXPECT().IsCurrentWorkflowGuaranteed().Return(false).AnyTimes()
Expand Down
11 changes: 9 additions & 2 deletions service/history/ndc/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,10 @@ func (r *WorkflowImpl) SuppressBy(
return workflow.TransactionPolicyPassive, nil
}

lastWriteCluster := r.clusterMetadata.ClusterNameForFailoverVersion(true, lastWriteVersion)
lastWriteCluster, err := r.clusterMetadata.ClusterNameForFailoverVersion(true, lastWriteVersion)
if err != nil {
return workflow.TransactionPolicyActive, err
}
currentCluster := r.clusterMetadata.GetCurrentClusterName()

if currentCluster == lastWriteCluster {
Expand All @@ -212,7 +215,11 @@ func (r *WorkflowImpl) FlushBufferedEvents() error {
return err
}

lastWriteCluster := r.clusterMetadata.ClusterNameForFailoverVersion(true, lastWriteVersion)
lastWriteCluster, err := r.clusterMetadata.ClusterNameForFailoverVersion(true, lastWriteVersion)
if err != nil {
return err
}

currentCluster := r.clusterMetadata.GetCurrentClusterName()

if lastWriteCluster != currentCluster {
Expand Down
4 changes: 2 additions & 2 deletions service/history/ndc/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ func (s *workflowSuite) TestSuppressWorkflowBy_Terminate() {
incomingMockMutableState.EXPECT().GetExecutionState().Return(&persistencespb.WorkflowExecutionState{
RunId: incomingRunID,
}).AnyTimes()
s.mockClusterMetadata.EXPECT().ClusterNameForFailoverVersion(true, lastEventVersion).Return(cluster.TestCurrentClusterName).AnyTimes()
s.mockClusterMetadata.EXPECT().ClusterNameForFailoverVersion(true, lastEventVersion).Return(cluster.TestCurrentClusterName, nil).AnyTimes()
s.mockClusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName).AnyTimes()

s.mockMutableState.EXPECT().UpdateCurrentVersion(lastEventVersion, true).Return(nil).AnyTimes()
Expand Down Expand Up @@ -364,7 +364,7 @@ func (s *workflowSuite) TestSuppressWorkflowBy_Zombiefy() {
RunId: incomingRunID,
}).AnyTimes()

s.mockClusterMetadata.EXPECT().ClusterNameForFailoverVersion(true, lastEventVersion).Return(cluster.TestAlternativeClusterName).AnyTimes()
s.mockClusterMetadata.EXPECT().ClusterNameForFailoverVersion(true, lastEventVersion).Return(cluster.TestAlternativeClusterName, nil).AnyTimes()
s.mockClusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName).AnyTimes()

// if workflow is in zombie or finished state, keep as is
Expand Down
2 changes: 1 addition & 1 deletion service/history/timerQueueActiveTaskExecutor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func (s *timerQueueActiveTaskExecutorSuite) SetupTest() {
s.mockClusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName).AnyTimes()
s.mockClusterMetadata.EXPECT().GetAllClusterInfo().Return(cluster.TestAllClusterInfo).AnyTimes()
s.mockClusterMetadata.EXPECT().IsGlobalNamespaceEnabled().Return(true).AnyTimes()
s.mockClusterMetadata.EXPECT().ClusterNameForFailoverVersion(s.namespaceEntry.IsGlobalNamespace(), s.version).Return(s.mockClusterMetadata.GetCurrentClusterName()).AnyTimes()
s.mockClusterMetadata.EXPECT().ClusterNameForFailoverVersion(s.namespaceEntry.IsGlobalNamespace(), s.version).Return(s.mockClusterMetadata.GetCurrentClusterName(), nil).AnyTimes()
s.workflowCache = workflow.NewCache(s.mockShard)
s.logger = s.mockShard.GetLogger()

Expand Down
2 changes: 1 addition & 1 deletion service/history/timerQueueStandbyTaskExecutor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func (s *timerQueueStandbyTaskExecutorSuite) SetupTest() {
s.mockClusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName).AnyTimes()
s.mockClusterMetadata.EXPECT().GetAllClusterInfo().Return(cluster.TestAllClusterInfo).AnyTimes()
s.mockClusterMetadata.EXPECT().IsGlobalNamespaceEnabled().Return(true).AnyTimes()
s.mockClusterMetadata.EXPECT().ClusterNameForFailoverVersion(s.namespaceEntry.IsGlobalNamespace(), s.version).Return(s.clusterName).AnyTimes()
s.mockClusterMetadata.EXPECT().ClusterNameForFailoverVersion(s.namespaceEntry.IsGlobalNamespace(), s.version).Return(s.clusterName, nil).AnyTimes()
s.workflowCache = workflow.NewCache(s.mockShard)
s.logger = s.mockShard.GetLogger()

Expand Down
Loading

0 comments on commit 4483f10

Please sign in to comment.