Skip to content

Commit

Permalink
Drop namespace replication task if it does not live in current cluster (
Browse files Browse the repository at this point in the history
#2842)

* Drop namespace replication task if namespace does not live in current cluster
  • Loading branch information
yiminc committed May 13, 2022
1 parent 690ad54 commit f959451
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 28 deletions.
37 changes: 27 additions & 10 deletions common/namespace/replicationTaskExecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,20 +72,23 @@ type (
}

namespaceReplicationTaskExecutorImpl struct {
metadataManagerV2 persistence.MetadataManager
logger log.Logger
currentCluster string
metadataManager persistence.MetadataManager
logger log.Logger
}
)

// NewReplicationTaskExecutor create a new instance of namespace replicator
func NewReplicationTaskExecutor(
currentCluster string,
metadataManagerV2 persistence.MetadataManager,
logger log.Logger,
) ReplicationTaskExecutor {

return &namespaceReplicationTaskExecutorImpl{
metadataManagerV2: metadataManagerV2,
logger: logger,
currentCluster: currentCluster,
metadataManager: metadataManagerV2,
logger: logger,
}
}

Expand All @@ -98,6 +101,11 @@ func (h *namespaceReplicationTaskExecutorImpl) Execute(
return err
}

if !checkClusterIncludedInReplicationConfig(h.currentCluster, task.ReplicationConfig.Clusters) {
// namespace does not live in current cluster, ignore it
return nil
}

switch task.GetNamespaceOperation() {
case enumsspb.NAMESPACE_OPERATION_CREATE:
return h.handleNamespaceCreationReplicationTask(ctx, task)
Expand All @@ -108,6 +116,15 @@ func (h *namespaceReplicationTaskExecutorImpl) Execute(
}
}

func checkClusterIncludedInReplicationConfig(clusterName string, repCfg []*replicationpb.ClusterReplicationConfig) bool {
for _, cluster := range repCfg {
if clusterName == cluster.ClusterName {
return true
}
}
return false
}

// handleNamespaceCreationReplicationTask handles the namespace creation replication task
func (h *namespaceReplicationTaskExecutorImpl) handleNamespaceCreationReplicationTask(
ctx context.Context,
Expand Down Expand Up @@ -146,14 +163,14 @@ func (h *namespaceReplicationTaskExecutorImpl) handleNamespaceCreationReplicatio
IsGlobalNamespace: true, // local namespace will not be replicated
}

_, err = h.metadataManagerV2.CreateNamespace(ctx, request)
_, err = h.metadataManager.CreateNamespace(ctx, request)
if err != nil {
// SQL and Cassandra handle namespace UUID collision differently
// here, whenever seeing a error replicating a namespace
// do a check if there is a name / UUID collision

recordExists := true
resp, getErr := h.metadataManagerV2.GetNamespace(ctx, &persistence.GetNamespaceRequest{
resp, getErr := h.metadataManager.GetNamespace(ctx, &persistence.GetNamespaceRequest{
Name: task.Info.GetName(),
})
switch getErr.(type) {
Expand All @@ -169,7 +186,7 @@ func (h *namespaceReplicationTaskExecutorImpl) handleNamespaceCreationReplicatio
return err
}

resp, getErr = h.metadataManagerV2.GetNamespace(ctx, &persistence.GetNamespaceRequest{
resp, getErr = h.metadataManager.GetNamespace(ctx, &persistence.GetNamespaceRequest{
ID: task.GetId(),
})
switch getErr.(type) {
Expand Down Expand Up @@ -207,15 +224,15 @@ func (h *namespaceReplicationTaskExecutorImpl) handleNamespaceUpdateReplicationT
}

// first we need to get the current notification version since we need to it for conditional update
metadata, err := h.metadataManagerV2.GetMetadata(ctx)
metadata, err := h.metadataManager.GetMetadata(ctx)
if err != nil {
return err
}
notificationVersion := metadata.NotificationVersion

// plus, we need to check whether the config version is <= the config version set in the input
// plus, we need to check whether the failover version is <= the failover version set in the input
resp, err := h.metadataManagerV2.GetNamespace(ctx, &persistence.GetNamespaceRequest{
resp, err := h.metadataManager.GetNamespace(ctx, &persistence.GetNamespaceRequest{
Name: task.Info.GetName(),
})
if err != nil {
Expand Down Expand Up @@ -268,7 +285,7 @@ func (h *namespaceReplicationTaskExecutorImpl) handleNamespaceUpdateReplicationT
return nil
}

return h.metadataManagerV2.UpdateNamespace(ctx, request)
return h.metadataManager.UpdateNamespace(ctx, request)
}

func (h *namespaceReplicationTaskExecutorImpl) validateNamespaceReplicationTask(task *replicationspb.NamespaceTaskAttributes) error {
Expand Down
7 changes: 6 additions & 1 deletion common/namespace/replicationTaskExecutor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (s *namespaceReplicationTaskExecutorSuite) SetupTest() {
s.TestBase = persistencetests.NewTestBaseWithCassandra(&persistencetests.TestBaseOptions{})
s.TestBase.Setup(nil)
logger := log.NewTestLogger()
s.namespaceReplicator = NewReplicationTaskExecutor(
s.namespaceReplicator = NewReplicationTaskExecutor("some random standby cluster name",
s.MetadataManager,
logger,
).(*namespaceReplicationTaskExecutorImpl)
Expand Down Expand Up @@ -128,6 +128,7 @@ func (s *namespaceReplicationTaskExecutorSuite) TestExecute_RegisterNamespaceTas
FailoverVersion: failoverVersion,
}

s.namespaceReplicator.currentCluster = clusterStandby
err := s.namespaceReplicator.Execute(context.Background(), task)
s.Nil(err)

Expand Down Expand Up @@ -410,6 +411,7 @@ func (s *namespaceReplicationTaskExecutorSuite) TestExecute_UpdateNamespaceTask_
metadata, err := s.MetadataManager.GetMetadata(context.Background())
s.Nil(err)
notificationVersion := metadata.NotificationVersion
s.namespaceReplicator.currentCluster = updateClusterStandby
err = s.namespaceReplicator.Execute(context.Background(), updateTask)
s.Nil(err)
resp, err := s.MetadataManager.GetNamespace(context.Background(), &persistence.GetNamespaceRequest{Name: name})
Expand Down Expand Up @@ -538,6 +540,7 @@ func (s *namespaceReplicationTaskExecutorSuite) TestExecute_UpdateNamespaceTask_
metadata, err := s.MetadataManager.GetMetadata(context.Background())
s.Nil(err)
notificationVersion := metadata.NotificationVersion
s.namespaceReplicator.currentCluster = updateClusterStandby
err = s.namespaceReplicator.Execute(context.Background(), updateTask)
s.Nil(err)
resp, err := s.MetadataManager.GetNamespace(context.Background(), &persistence.GetNamespaceRequest{Name: name})
Expand Down Expand Up @@ -662,6 +665,7 @@ func (s *namespaceReplicationTaskExecutorSuite) TestExecute_UpdateNamespaceTask_
metadata, err := s.MetadataManager.GetMetadata(context.Background())
s.Nil(err)
notificationVersion := metadata.NotificationVersion
s.namespaceReplicator.currentCluster = updateClusterStandby
err = s.namespaceReplicator.Execute(context.Background(), updateTask)
s.Nil(err)
resp, err := s.MetadataManager.GetNamespace(context.Background(), &persistence.GetNamespaceRequest{Name: name})
Expand Down Expand Up @@ -785,6 +789,7 @@ func (s *namespaceReplicationTaskExecutorSuite) TestExecute_UpdateNamespaceTask_
ConfigVersion: updateConfigVersion,
FailoverVersion: updateFailoverVersion,
}
s.namespaceReplicator.currentCluster = updateClusterStandby
err = s.namespaceReplicator.Execute(context.Background(), updateTask)
s.Nil(err)
resp, err := s.MetadataManager.GetNamespace(context.Background(), &persistence.GetNamespaceRequest{Name: name})
Expand Down
2 changes: 1 addition & 1 deletion host/test_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func NewCluster(options *TestClusterConfig, logger log.Logger) (*TestCluster, er
HistoryConfig: options.HistoryConfig,
WorkerConfig: options.WorkerConfig,
MockAdminClient: options.MockAdminClient,
NamespaceReplicationTaskExecutor: namespace.NewReplicationTaskExecutor(testBase.MetadataManager, logger),
NamespaceReplicationTaskExecutor: namespace.NewReplicationTaskExecutor(options.ClusterMetadata.CurrentClusterName, testBase.MetadataManager, logger),
}

err = newPProfInitializerImpl(logger, pprofTestPort).Start()
Expand Down
2 changes: 1 addition & 1 deletion service/frontend/adminHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,8 @@ var (
func NewAdminHandler(
args NewAdminHandlerArgs,
) *AdminHandler {

namespaceReplicationTaskExecutor := namespace.NewReplicationTaskExecutor(
args.ClusterMetadata.GetCurrentClusterName(),
args.PersistenceMetadataManager,
args.Logger,
)
Expand Down
18 changes: 3 additions & 15 deletions service/frontend/adminHandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ func (s *adminHandlerSuite) SetupTest() {
health.NewServer(),
serialization.NewSerializer(),
}
s.mockMetadata.EXPECT().GetCurrentClusterName().Return(uuid.New()).AnyTimes()
s.handler = NewAdminHandler(args)
s.handler.Start()
}
Expand Down Expand Up @@ -767,7 +768,6 @@ func (s *adminHandlerSuite) Test_AddOrUpdateRemoteCluster_RecordFound_Success()
var clusterId = uuid.New()
var recordVersion int64 = 5

s.mockMetadata.EXPECT().GetCurrentClusterName().Return(uuid.New())
s.mockMetadata.EXPECT().GetFailoverVersionIncrement().Return(int64(0))
s.mockMetadata.EXPECT().GetAllClusterInfo().Return(make(map[string]cluster.ClusterInformation))
s.mockClientFactory.EXPECT().NewAdminClientWithTimeout(rpcAddress, gomock.Any(), gomock.Any()).Return(
Expand Down Expand Up @@ -807,7 +807,6 @@ func (s *adminHandlerSuite) Test_AddOrUpdateRemoteCluster_RecordNotFound_Success
var clusterName = uuid.New()
var clusterId = uuid.New()

s.mockMetadata.EXPECT().GetCurrentClusterName().Return(uuid.New())
s.mockMetadata.EXPECT().GetFailoverVersionIncrement().Return(int64(0))
s.mockMetadata.EXPECT().GetAllClusterInfo().Return(make(map[string]cluster.ClusterInformation))
s.mockClientFactory.EXPECT().NewAdminClientWithTimeout(rpcAddress, gomock.Any(), gomock.Any()).Return(
Expand Down Expand Up @@ -844,17 +843,15 @@ func (s *adminHandlerSuite) Test_AddOrUpdateRemoteCluster_RecordNotFound_Success

func (s *adminHandlerSuite) Test_AddOrUpdateRemoteCluster_ValidationError_ClusterNameConflict() {
var rpcAddress = uuid.New()
var clusterName = uuid.New()
var clusterId = uuid.New()

s.mockMetadata.EXPECT().GetCurrentClusterName().Return(clusterName)
s.mockClientFactory.EXPECT().NewAdminClientWithTimeout(rpcAddress, gomock.Any(), gomock.Any()).Return(
s.mockAdminClient,
)
s.mockAdminClient.EXPECT().DescribeCluster(gomock.Any(), &adminservice.DescribeClusterRequest{}).Return(
&adminservice.DescribeClusterResponse{
ClusterId: clusterId,
ClusterName: clusterName,
ClusterName: s.mockMetadata.GetCurrentClusterName(),
HistoryShardCount: 0,
FailoverVersionIncrement: 0,
InitialFailoverVersion: 0,
Expand All @@ -870,7 +867,6 @@ func (s *adminHandlerSuite) Test_AddOrUpdateRemoteCluster_ValidationError_Failov
var clusterName = uuid.New()
var clusterId = uuid.New()

s.mockMetadata.EXPECT().GetCurrentClusterName().Return(uuid.New())
s.mockMetadata.EXPECT().GetFailoverVersionIncrement().Return(int64(1))
s.mockClientFactory.EXPECT().NewAdminClientWithTimeout(rpcAddress, gomock.Any(), gomock.Any()).Return(
s.mockAdminClient,
Expand All @@ -894,7 +890,6 @@ func (s *adminHandlerSuite) Test_AddOrUpdateRemoteCluster_ValidationError_ShardC
var clusterName = uuid.New()
var clusterId = uuid.New()

s.mockMetadata.EXPECT().GetCurrentClusterName().Return(uuid.New())
s.mockMetadata.EXPECT().GetFailoverVersionIncrement().Return(int64(0))
s.mockClientFactory.EXPECT().NewAdminClientWithTimeout(rpcAddress, gomock.Any(), gomock.Any()).Return(
s.mockAdminClient,
Expand All @@ -918,7 +913,6 @@ func (s *adminHandlerSuite) Test_AddOrUpdateRemoteCluster_ValidationError_Global
var clusterName = uuid.New()
var clusterId = uuid.New()

s.mockMetadata.EXPECT().GetCurrentClusterName().Return(uuid.New())
s.mockMetadata.EXPECT().GetFailoverVersionIncrement().Return(int64(0))
s.mockClientFactory.EXPECT().NewAdminClientWithTimeout(rpcAddress, gomock.Any(), gomock.Any()).Return(
s.mockAdminClient,
Expand All @@ -942,7 +936,6 @@ func (s *adminHandlerSuite) Test_AddOrUpdateRemoteCluster_ValidationError_Initia
var clusterName = uuid.New()
var clusterId = uuid.New()

s.mockMetadata.EXPECT().GetCurrentClusterName().Return(uuid.New())
s.mockMetadata.EXPECT().GetFailoverVersionIncrement().Return(int64(0))
s.mockMetadata.EXPECT().GetAllClusterInfo().Return(map[string]cluster.ClusterInformation{
uuid.New(): {InitialFailoverVersion: 0},
Expand Down Expand Up @@ -983,7 +976,6 @@ func (s *adminHandlerSuite) Test_AddOrUpdateRemoteCluster_GetClusterMetadata_Err
var clusterName = uuid.New()
var clusterId = uuid.New()

s.mockMetadata.EXPECT().GetCurrentClusterName().Return(uuid.New())
s.mockMetadata.EXPECT().GetFailoverVersionIncrement().Return(int64(0))
s.mockMetadata.EXPECT().GetAllClusterInfo().Return(make(map[string]cluster.ClusterInformation))
s.mockClientFactory.EXPECT().NewAdminClientWithTimeout(rpcAddress, gomock.Any(), gomock.Any()).Return(
Expand Down Expand Up @@ -1011,7 +1003,6 @@ func (s *adminHandlerSuite) Test_AddOrUpdateRemoteCluster_SaveClusterMetadata_Er
var clusterName = uuid.New()
var clusterId = uuid.New()

s.mockMetadata.EXPECT().GetCurrentClusterName().Return(uuid.New())
s.mockMetadata.EXPECT().GetFailoverVersionIncrement().Return(int64(0))
s.mockMetadata.EXPECT().GetAllClusterInfo().Return(make(map[string]cluster.ClusterInformation))
s.mockClientFactory.EXPECT().NewAdminClientWithTimeout(rpcAddress, gomock.Any(), gomock.Any()).Return(
Expand Down Expand Up @@ -1051,7 +1042,6 @@ func (s *adminHandlerSuite) Test_AddOrUpdateRemoteCluster_SaveClusterMetadata_No
var clusterName = uuid.New()
var clusterId = uuid.New()

s.mockMetadata.EXPECT().GetCurrentClusterName().Return(uuid.New())
s.mockMetadata.EXPECT().GetFailoverVersionIncrement().Return(int64(0))
s.mockMetadata.EXPECT().GetAllClusterInfo().Return(make(map[string]cluster.ClusterInformation))
s.mockClientFactory.EXPECT().NewAdminClientWithTimeout(rpcAddress, gomock.Any(), gomock.Any()).Return(
Expand Down Expand Up @@ -1088,10 +1078,8 @@ func (s *adminHandlerSuite) Test_AddOrUpdateRemoteCluster_SaveClusterMetadata_No
}

func (s *adminHandlerSuite) Test_DescribeCluster_CurrentCluster_Success() {
var clusterName = uuid.New()
var clusterId = uuid.New()

s.mockMetadata.EXPECT().GetCurrentClusterName().Return(clusterName)
clusterName := s.mockMetadata.GetCurrentClusterName()
s.mockResource.MembershipMonitor.EXPECT().WhoAmI().Return(&membership.HostInfo{}, nil)
s.mockResource.MembershipMonitor.EXPECT().GetReachableMembers().Return(nil, nil)
s.mockResource.HistoryServiceResolver.EXPECT().Members().Return([]*membership.HostInfo{})
Expand Down
1 change: 1 addition & 0 deletions service/worker/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,7 @@ func (s *Service) startScanner() {

func (s *Service) startReplicator() {
namespaceReplicationTaskExecutor := namespace.NewReplicationTaskExecutor(
s.clusterMetadata.GetCurrentClusterName(),
s.metadataManager,
s.logger,
)
Expand Down

0 comments on commit f959451

Please sign in to comment.