Skip to content

Commit

Permalink
Rename legacy history replication task (#2901)
Browse files Browse the repository at this point in the history
  • Loading branch information
yux0 committed May 25, 2022
1 parent 8a2dcec commit 89c0ef3
Show file tree
Hide file tree
Showing 9 changed files with 187 additions and 186 deletions.
312 changes: 156 additions & 156 deletions api/replication/v1/message.pb.go

Large diffs are not rendered by default.

17 changes: 9 additions & 8 deletions host/ndc/ndc_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1881,14 +1881,15 @@ func (s *nDCIntegrationTestSuite) applyEventsThroughFetcher(
replicationTask := &replicationspb.ReplicationTask{
TaskType: taskType,
SourceTaskId: 1,
Attributes: &replicationspb.ReplicationTask_HistoryTaskV2Attributes{HistoryTaskV2Attributes: &replicationspb.HistoryTaskV2Attributes{
NamespaceId: s.namespaceID,
WorkflowId: workflowID,
RunId: runID,
VersionHistoryItems: versionHistory.GetItems(),
Events: eventBlob,
NewRunEvents: newRunEventBlob,
}},
Attributes: &replicationspb.ReplicationTask_HistoryTaskAttributes{
HistoryTaskAttributes: &replicationspb.HistoryTaskAttributes{
NamespaceId: s.namespaceID,
WorkflowId: workflowID,
RunId: runID,
VersionHistoryItems: versionHistory.GetItems(),
Events: eventBlob,
NewRunEvents: newRunEventBlob,
}},
}

s.standByReplicationTasksChan <- replicationTask
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ message ReplicationTask {
NamespaceTaskAttributes namespace_task_attributes = 3;
SyncShardStatusTaskAttributes sync_shard_status_task_attributes = 5;
SyncActivityTaskAttributes sync_activity_task_attributes = 6;
HistoryTaskV2Attributes history_task_v2_attributes = 8;
HistoryTaskAttributes history_task_attributes = 8;
SyncWorkflowStateTaskAttributes sync_workflow_state_task_attributes = 10;
}
google.protobuf.Timestamp visibility_time = 9 [(gogoproto.stdtime) = true];
Expand Down Expand Up @@ -123,7 +123,7 @@ message SyncActivityTaskAttributes {
temporal.server.api.history.v1.VersionHistory version_history = 14;
}

message HistoryTaskV2Attributes {
message HistoryTaskAttributes {
reserved 1;
string namespace_id = 2;
string workflow_id = 3;
Expand Down
4 changes: 2 additions & 2 deletions service/history/replication/ack_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -476,8 +476,8 @@ func (p *ackMgrImpl) generateHistoryReplicationTask(
replicationTask := &replicationspb.ReplicationTask{
TaskType: enumsspb.REPLICATION_TASK_TYPE_HISTORY_V2_TASK,
SourceTaskId: taskID,
Attributes: &replicationspb.ReplicationTask_HistoryTaskV2Attributes{
HistoryTaskV2Attributes: &replicationspb.HistoryTaskV2Attributes{
Attributes: &replicationspb.ReplicationTask_HistoryTaskAttributes{
HistoryTaskAttributes: &replicationspb.HistoryTaskAttributes{
NamespaceId: namespaceID.String(),
WorkflowId: workflowID,
RunId: runID,
Expand Down
8 changes: 4 additions & 4 deletions service/history/replication/dlq_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,8 @@ func (s *dlqHandlerSuite) TestReadMessages_OK() {
remoteTask := &replicationspb.ReplicationTask{
TaskType: enumsspb.REPLICATION_TASK_TYPE_HISTORY_TASK,
SourceTaskId: taskID,
Attributes: &replicationspb.ReplicationTask_HistoryTaskV2Attributes{
HistoryTaskV2Attributes: &replicationspb.HistoryTaskV2Attributes{
Attributes: &replicationspb.ReplicationTask_HistoryTaskAttributes{
HistoryTaskAttributes: &replicationspb.HistoryTaskAttributes{
NamespaceId: namespaceID,
WorkflowId: workflowID,
RunId: runID,
Expand Down Expand Up @@ -250,8 +250,8 @@ func (s *dlqHandlerSuite) TestMergeMessages() {
remoteTask := &replicationspb.ReplicationTask{
TaskType: enumsspb.REPLICATION_TASK_TYPE_HISTORY_TASK,
SourceTaskId: taskID,
Attributes: &replicationspb.ReplicationTask_HistoryTaskV2Attributes{
HistoryTaskV2Attributes: &replicationspb.HistoryTaskV2Attributes{
Attributes: &replicationspb.ReplicationTask_HistoryTaskAttributes{
HistoryTaskAttributes: &replicationspb.HistoryTaskAttributes{
NamespaceId: namespaceID,
WorkflowId: workflowID,
RunId: runID,
Expand Down
2 changes: 1 addition & 1 deletion service/history/replication/task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func (e *taskExecutorImpl) handleHistoryReplicationTask(
forceApply bool,
) error {

attr := task.GetHistoryTaskV2Attributes()
attr := task.GetHistoryTaskAttributes()
doContinue, err := e.filterTask(namespace.ID(attr.GetNamespaceId()), forceApply)
if err != nil || !doContinue {
return err
Expand Down
8 changes: 4 additions & 4 deletions service/history/replication/task_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,8 +295,8 @@ func (s *taskExecutorSuite) TestProcess_HistoryReplicationTask() {
runID := uuid.New()
task := &replicationspb.ReplicationTask{
TaskType: enumsspb.REPLICATION_TASK_TYPE_HISTORY_V2_TASK,
Attributes: &replicationspb.ReplicationTask_HistoryTaskV2Attributes{
HistoryTaskV2Attributes: &replicationspb.HistoryTaskV2Attributes{
Attributes: &replicationspb.ReplicationTask_HistoryTaskAttributes{
HistoryTaskAttributes: &replicationspb.HistoryTaskAttributes{
NamespaceId: namespaceID.String(),
WorkflowId: workflowID,
RunId: runID,
Expand Down Expand Up @@ -328,8 +328,8 @@ func (s *taskExecutorSuite) TestProcess_HistoryReplicationTask_Resend() {
runID := uuid.New()
task := &replicationspb.ReplicationTask{
TaskType: enumsspb.REPLICATION_TASK_TYPE_HISTORY_V2_TASK,
Attributes: &replicationspb.ReplicationTask_HistoryTaskV2Attributes{
HistoryTaskV2Attributes: &replicationspb.HistoryTaskV2Attributes{
Attributes: &replicationspb.ReplicationTask_HistoryTaskAttributes{
HistoryTaskAttributes: &replicationspb.HistoryTaskAttributes{
NamespaceId: namespaceID.String(),
WorkflowId: workflowID,
RunId: runID,
Expand Down
2 changes: 1 addition & 1 deletion service/history/replication/task_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ func (p *taskProcessorImpl) convertTaskToDLQTask(
}, nil

case enumsspb.REPLICATION_TASK_TYPE_HISTORY_V2_TASK:
taskAttributes := replicationTask.GetHistoryTaskV2Attributes()
taskAttributes := replicationTask.GetHistoryTaskAttributes()

events, err := p.historySerializer.DeserializeEvents(taskAttributes.GetEvents())
if err != nil {
Expand Down
16 changes: 8 additions & 8 deletions service/history/replication/task_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,8 +226,8 @@ func (s *taskProcessorSuite) TestHandleReplicationTask_History() {

task := &replicationspb.ReplicationTask{
TaskType: enumsspb.REPLICATION_TASK_TYPE_HISTORY_V2_TASK,
Attributes: &replicationspb.ReplicationTask_HistoryTaskV2Attributes{
HistoryTaskV2Attributes: &replicationspb.HistoryTaskV2Attributes{
Attributes: &replicationspb.ReplicationTask_HistoryTaskAttributes{
HistoryTaskAttributes: &replicationspb.HistoryTaskAttributes{
NamespaceId: namespaceID,
WorkflowId: workflowID,
RunId: runID,
Expand Down Expand Up @@ -337,8 +337,8 @@ func (s *taskProcessorSuite) TestConvertTaskToDLQTask_History() {

task := &replicationspb.ReplicationTask{
TaskType: enumsspb.REPLICATION_TASK_TYPE_HISTORY_V2_TASK,
Attributes: &replicationspb.ReplicationTask_HistoryTaskV2Attributes{
HistoryTaskV2Attributes: &replicationspb.HistoryTaskV2Attributes{
Attributes: &replicationspb.ReplicationTask_HistoryTaskAttributes{
HistoryTaskAttributes: &replicationspb.HistoryTaskAttributes{
NamespaceId: namespaceID,
WorkflowId: workflowID,
RunId: runID,
Expand Down Expand Up @@ -425,8 +425,8 @@ func (s *taskProcessorSuite) TestPaginationFn_Success_More() {
task := &replicationspb.ReplicationTask{
SourceTaskId: taskID,
TaskType: enumsspb.REPLICATION_TASK_TYPE_HISTORY_V2_TASK,
Attributes: &replicationspb.ReplicationTask_HistoryTaskV2Attributes{
HistoryTaskV2Attributes: &replicationspb.HistoryTaskV2Attributes{
Attributes: &replicationspb.ReplicationTask_HistoryTaskAttributes{
HistoryTaskAttributes: &replicationspb.HistoryTaskAttributes{
NamespaceId: namespaceID,
WorkflowId: workflowID,
RunId: runID,
Expand Down Expand Up @@ -498,8 +498,8 @@ func (s *taskProcessorSuite) TestPaginationFn_Success_NoMore() {
task := &replicationspb.ReplicationTask{
SourceTaskId: taskID,
TaskType: enumsspb.REPLICATION_TASK_TYPE_HISTORY_V2_TASK,
Attributes: &replicationspb.ReplicationTask_HistoryTaskV2Attributes{
HistoryTaskV2Attributes: &replicationspb.HistoryTaskV2Attributes{
Attributes: &replicationspb.ReplicationTask_HistoryTaskAttributes{
HistoryTaskAttributes: &replicationspb.HistoryTaskAttributes{
NamespaceId: namespaceID,
WorkflowId: workflowID,
RunId: runID,
Expand Down

0 comments on commit 89c0ef3

Please sign in to comment.