Skip to content

Commit

Permalink
Namespace replication for failover history (#3439)
Browse files Browse the repository at this point in the history
  • Loading branch information
meiliang86 authored and dnr committed Sep 29, 2022
1 parent adc4cc8 commit 70ca71b
Show file tree
Hide file tree
Showing 7 changed files with 242 additions and 108 deletions.
270 changes: 175 additions & 95 deletions api/replication/v1/message.pb.go

Large diffs are not rendered by default.

25 changes: 17 additions & 8 deletions common/namespace/handler.go
Expand Up @@ -290,6 +290,7 @@ func (d *HandlerImpl) RegisterNamespace(
namespaceRequest.Namespace.ConfigVersion,
namespaceRequest.Namespace.FailoverVersion,
namespaceRequest.IsGlobalNamespace,
nil,
)
if err != nil {
return nil, err
Expand Down Expand Up @@ -330,8 +331,8 @@ func (d *HandlerImpl) ListNamespaces(
IsGlobalNamespace: namespace.IsGlobalNamespace,
FailoverVersion: namespace.Namespace.FailoverVersion,
}
desc.NamespaceInfo, desc.Config, desc.ReplicationConfig =
d.createResponse(ctx,
desc.NamespaceInfo, desc.Config, desc.ReplicationConfig, desc.FailoverHistory =
d.createResponse(
namespace.Namespace.Info,
namespace.Namespace.Config,
namespace.Namespace.ReplicationConfig)
Expand Down Expand Up @@ -366,8 +367,8 @@ func (d *HandlerImpl) DescribeNamespace(
IsGlobalNamespace: resp.IsGlobalNamespace,
FailoverVersion: resp.Namespace.FailoverVersion,
}
response.NamespaceInfo, response.Config, response.ReplicationConfig =
d.createResponse(ctx, resp.Namespace.Info, resp.Namespace.Config, resp.Namespace.ReplicationConfig)
response.NamespaceInfo, response.Config, response.ReplicationConfig, response.FailoverHistory =
d.createResponse(resp.Namespace.Info, resp.Namespace.Config, resp.Namespace.ReplicationConfig)
return response, nil
}

Expand Down Expand Up @@ -615,6 +616,7 @@ func (d *HandlerImpl) UpdateNamespace(
configVersion,
failoverVersion,
isGlobalNamespace,
failoverHistory,
)
if err != nil {
return nil, err
Expand All @@ -624,7 +626,7 @@ func (d *HandlerImpl) UpdateNamespace(
IsGlobalNamespace: isGlobalNamespace,
FailoverVersion: failoverVersion,
}
response.NamespaceInfo, response.Config, response.ReplicationConfig = d.createResponse(ctx, info, config, replicationConfig)
response.NamespaceInfo, response.Config, response.ReplicationConfig, _ = d.createResponse(info, config, replicationConfig)

d.logger.Info("Update namespace succeeded",
tag.WorkflowNamespace(info.Name),
Expand Down Expand Up @@ -682,11 +684,10 @@ func (d *HandlerImpl) DeprecateNamespace(
}

func (d *HandlerImpl) createResponse(
ctx context.Context,
info *persistencespb.NamespaceInfo,
config *persistencespb.NamespaceConfig,
replicationConfig *persistencespb.NamespaceReplicationConfig,
) (*namespacepb.NamespaceInfo, *namespacepb.NamespaceConfig, *replicationpb.NamespaceReplicationConfig) {
) (*namespacepb.NamespaceInfo, *namespacepb.NamespaceConfig, *replicationpb.NamespaceReplicationConfig, []*replicationpb.FailoverStatus) {

infoResult := &namespacepb.NamespaceInfo{
Name: info.Name,
Expand Down Expand Up @@ -719,7 +720,15 @@ func (d *HandlerImpl) createResponse(
Clusters: clusters,
}

return infoResult, configResult, replicationConfigResult
var failoverHistory []*replicationpb.FailoverStatus
for _, entry := range replicationConfig.GetFailoverHistory() {
failoverHistory = append(failoverHistory, &replicationpb.FailoverStatus{
FailoverTime: entry.GetFailoverTime(),
FailoverVersion: entry.GetFailoverVersion(),
})
}

return infoResult, configResult, replicationConfigResult, failoverHistory
}

func (d *HandlerImpl) mergeBadBinaries(
Expand Down
12 changes: 12 additions & 0 deletions common/namespace/replicationTaskExecutor.go
Expand Up @@ -296,6 +296,7 @@ func (h *namespaceReplicationTaskExecutorImpl) handleNamespaceUpdateReplicationT
request.Namespace.ReplicationConfig.ActiveClusterName = task.ReplicationConfig.GetActiveClusterName()
request.Namespace.FailoverVersion = task.GetFailoverVersion()
request.Namespace.FailoverNotificationVersion = notificationVersion
request.Namespace.ReplicationConfig.FailoverHistory = convertFailoverHistoryToPersistenceProto(task.GetFailoverHistory())
}

if !recordUpdated {
Expand Down Expand Up @@ -333,6 +334,17 @@ func convertClusterReplicationConfigFromProto(
return output
}

func convertFailoverHistoryToPersistenceProto(failoverHistory []*replicationpb.FailoverStatus) []*persistencespb.FailoverStatus {
var persistencePb []*persistencespb.FailoverStatus
for _, status := range failoverHistory {
persistencePb = append(persistencePb, &persistencespb.FailoverStatus{
FailoverTime: status.GetFailoverTime(),
FailoverVersion: status.GetFailoverVersion(),
})
}
return persistencePb
}

func (h *namespaceReplicationTaskExecutorImpl) validateNamespaceStatus(input enumspb.NamespaceState) error {
switch input {
case enumspb.NAMESPACE_STATE_REGISTERED, enumspb.NAMESPACE_STATE_DEPRECATED:
Expand Down
9 changes: 9 additions & 0 deletions common/namespace/replicationTaskExecutor_test.go
Expand Up @@ -399,6 +399,13 @@ func (s *namespaceReplicationTaskExecutorSuite) TestExecute_UpdateNamespaceTask_
updateClusterStandby := "other random standby cluster name"
updateConfigVersion := int64(1)
updateFailoverVersion := int64(59)
failoverTime := time.Now()
failoverHistory := []*replicationpb.FailoverStatus{
{
FailoverTime: &failoverTime,
FailoverVersion: 999,
},
}
updateClusters := []*replicationpb.ClusterReplicationConfig{
{
ClusterName: updateClusterActive,
Expand Down Expand Up @@ -430,6 +437,7 @@ func (s *namespaceReplicationTaskExecutorSuite) TestExecute_UpdateNamespaceTask_
},
ConfigVersion: updateConfigVersion,
FailoverVersion: updateFailoverVersion,
FailoverHistory: failoverHistory,
}

s.namespaceReplicator.currentCluster = updateClusterStandby
Expand Down Expand Up @@ -464,6 +472,7 @@ func (s *namespaceReplicationTaskExecutorSuite) TestExecute_UpdateNamespaceTask_
ReplicationConfig: &persistencespb.NamespaceReplicationConfig{
ActiveClusterName: updateTask.ReplicationConfig.ActiveClusterName,
Clusters: []string{updateClusterActive, updateClusterStandby},
FailoverHistory: convertFailoverHistoryToPersistenceProto(failoverHistory),
},
ConfigVersion: updateConfigVersion,
FailoverNotificationVersion: updateFailoverVersion,
Expand Down
21 changes: 19 additions & 2 deletions common/namespace/transmissionTaskHandler.go
Expand Up @@ -53,6 +53,7 @@ type (
configVersion int64,
failoverVersion int64,
isGlobalNamespace bool,
failoverHistoy []*persistencespb.FailoverStatus,
) error
}

Expand Down Expand Up @@ -84,6 +85,7 @@ func (namespaceReplicator *namespaceReplicatorImpl) HandleTransmissionTask(
configVersion int64,
failoverVersion int64,
isGlobalNamespace bool,
failoverHistoy []*persistencespb.FailoverStatus,
) error {

if !isGlobalNamespace {
Expand Down Expand Up @@ -119,10 +121,11 @@ func (namespaceReplicator *namespaceReplicatorImpl) HandleTransmissionTask(
},
ReplicationConfig: &replicationpb.NamespaceReplicationConfig{
ActiveClusterName: replicationConfig.ActiveClusterName,
Clusters: namespaceReplicator.convertClusterReplicationConfigToProto(replicationConfig.Clusters),
Clusters: convertClusterReplicationConfigToProto(replicationConfig.Clusters),
},
ConfigVersion: configVersion,
FailoverVersion: failoverVersion,
FailoverHistory: convertFailoverHistoryToReplicationProto(failoverHistoy),
},
}

Expand All @@ -134,7 +137,7 @@ func (namespaceReplicator *namespaceReplicatorImpl) HandleTransmissionTask(
})
}

func (namespaceReplicator *namespaceReplicatorImpl) convertClusterReplicationConfigToProto(
func convertClusterReplicationConfigToProto(
input []string,
) []*replicationpb.ClusterReplicationConfig {
output := make([]*replicationpb.ClusterReplicationConfig, 0, len(input))
Expand All @@ -143,3 +146,17 @@ func (namespaceReplicator *namespaceReplicatorImpl) convertClusterReplicationCon
}
return output
}

func convertFailoverHistoryToReplicationProto(
failoverHistoy []*persistencespb.FailoverStatus,
) []*replicationpb.FailoverStatus {
var replicationProto []*replicationpb.FailoverStatus
for _, failoverStatus := range failoverHistoy {
replicationProto = append(replicationProto, &replicationpb.FailoverStatus{
FailoverTime: failoverStatus.GetFailoverTime(),
FailoverVersion: failoverStatus.GetFailoverVersion(),
})
}

return replicationProto
}
12 changes: 9 additions & 3 deletions common/namespace/transmissionTaskHandler_test.go
Expand Up @@ -144,7 +144,7 @@ func (s *transmissionTaskSuite) TestHandleTransmissionTask_RegisterNamespaceTask
},
ReplicationConfig: &replicationpb.NamespaceReplicationConfig{
ActiveClusterName: clusterActive,
Clusters: s.namespaceReplicator.convertClusterReplicationConfigToProto(clusters),
Clusters: convertClusterReplicationConfigToProto(clusters),
},
ConfigVersion: configVersion,
FailoverVersion: failoverVersion,
Expand All @@ -162,6 +162,7 @@ func (s *transmissionTaskSuite) TestHandleTransmissionTask_RegisterNamespaceTask
configVersion,
failoverVersion,
isGlobalNamespace,
nil,
)
s.Nil(err)
}
Expand Down Expand Up @@ -216,6 +217,7 @@ func (s *transmissionTaskSuite) TestHandleTransmissionTask_RegisterNamespaceTask
configVersion,
failoverVersion,
isGlobalNamespace,
nil,
)
s.Nil(err)
}
Expand Down Expand Up @@ -285,7 +287,7 @@ func (s *transmissionTaskSuite) TestHandleTransmissionTask_UpdateNamespaceTask_I
},
ReplicationConfig: &replicationpb.NamespaceReplicationConfig{
ActiveClusterName: clusterActive,
Clusters: s.namespaceReplicator.convertClusterReplicationConfigToProto(clusters),
Clusters: convertClusterReplicationConfigToProto(clusters),
},
ConfigVersion: configVersion,
FailoverVersion: failoverVersion},
Expand All @@ -302,6 +304,7 @@ func (s *transmissionTaskSuite) TestHandleTransmissionTask_UpdateNamespaceTask_I
configVersion,
failoverVersion,
isGlobalNamespace,
nil,
)
s.Nil(err)
}
Expand Down Expand Up @@ -355,6 +358,7 @@ func (s *transmissionTaskSuite) TestHandleTransmissionTask_UpdateNamespaceTask_N
configVersion,
failoverVersion,
isGlobalNamespace,
nil,
)
s.Nil(err)
}
Expand Down Expand Up @@ -424,7 +428,7 @@ func (s *transmissionTaskSuite) TestHandleTransmissionTask_UpdateNamespaceTask_R
},
ReplicationConfig: &replicationpb.NamespaceReplicationConfig{
ActiveClusterName: clusterActive,
Clusters: s.namespaceReplicator.convertClusterReplicationConfigToProto(singleClusterList),
Clusters: convertClusterReplicationConfigToProto(singleClusterList),
},
ConfigVersion: configVersion,
FailoverVersion: failoverVersion},
Expand All @@ -441,6 +445,7 @@ func (s *transmissionTaskSuite) TestHandleTransmissionTask_UpdateNamespaceTask_R
configVersion,
failoverVersion,
isGlobalNamespace,
nil,
)
s.Nil(err)

Expand All @@ -454,6 +459,7 @@ func (s *transmissionTaskSuite) TestHandleTransmissionTask_UpdateNamespaceTask_R
configVersion,
failoverVersion,
isGlobalNamespace,
nil,
)
s.Nil(err)
}
Expand Up @@ -97,6 +97,7 @@ message NamespaceTaskAttributes {
temporal.api.replication.v1.NamespaceReplicationConfig replication_config = 5;
int64 config_version = 6;
int64 failover_version = 7;
repeated temporal.api.replication.v1.FailoverStatus failover_history = 8;
}

message SyncShardStatusTaskAttributes {
Expand Down

0 comments on commit 70ca71b

Please sign in to comment.