Skip to content

Commit

Permalink
Add CloseVisibilityTaskID to DeleteExecutionVisibilityTask (#3391)
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelSnowden committed Sep 15, 2022
1 parent ce9a66f commit c91d596
Show file tree
Hide file tree
Showing 6 changed files with 306 additions and 235 deletions.
472 changes: 256 additions & 216 deletions api/persistence/v1/executions.pb.go

Large diffs are not rendered by default.

30 changes: 16 additions & 14 deletions common/persistence/serialization/task_serializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -927,15 +927,16 @@ func (s *TaskSerializer) visibilityDeleteTaskToProto(
deleteVisibilityTask *tasks.DeleteExecutionVisibilityTask,
) *persistencespb.VisibilityTaskInfo {
return &persistencespb.VisibilityTaskInfo{
NamespaceId: deleteVisibilityTask.WorkflowKey.NamespaceID,
WorkflowId: deleteVisibilityTask.WorkflowKey.WorkflowID,
RunId: deleteVisibilityTask.WorkflowKey.RunID,
TaskType: enumsspb.TASK_TYPE_VISIBILITY_DELETE_EXECUTION,
Version: deleteVisibilityTask.Version,
TaskId: deleteVisibilityTask.TaskID,
VisibilityTime: &deleteVisibilityTask.VisibilityTimestamp,
StartTime: deleteVisibilityTask.StartTime,
CloseTime: deleteVisibilityTask.CloseTime,
NamespaceId: deleteVisibilityTask.WorkflowKey.NamespaceID,
WorkflowId: deleteVisibilityTask.WorkflowKey.WorkflowID,
RunId: deleteVisibilityTask.WorkflowKey.RunID,
TaskType: enumsspb.TASK_TYPE_VISIBILITY_DELETE_EXECUTION,
Version: deleteVisibilityTask.Version,
TaskId: deleteVisibilityTask.TaskID,
VisibilityTime: &deleteVisibilityTask.VisibilityTimestamp,
StartTime: deleteVisibilityTask.StartTime,
CloseTime: deleteVisibilityTask.CloseTime,
CloseVisibilityTaskId: deleteVisibilityTask.CloseVisibilityTaskID,
}
}

Expand All @@ -948,11 +949,12 @@ func (s *TaskSerializer) visibilityDeleteTaskFromProto(
deleteVisibilityTask.WorkflowId,
deleteVisibilityTask.RunId,
),
VisibilityTimestamp: *deleteVisibilityTask.VisibilityTime,
TaskID: deleteVisibilityTask.TaskId,
Version: deleteVisibilityTask.Version,
StartTime: deleteVisibilityTask.StartTime,
CloseTime: deleteVisibilityTask.CloseTime,
VisibilityTimestamp: *deleteVisibilityTask.VisibilityTime,
TaskID: deleteVisibilityTask.TaskId,
Version: deleteVisibilityTask.Version,
StartTime: deleteVisibilityTask.StartTime,
CloseTime: deleteVisibilityTask.CloseTime,
CloseVisibilityTaskID: deleteVisibilityTask.CloseVisibilityTaskId,
}
}

Expand Down
23 changes: 23 additions & 0 deletions common/persistence/serialization/task_serializer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,29 @@ func (s *taskSerializerSuite) TestReplicateHistoryTask() {
s.assertEqualTasks(replicateHistoryTask)
}

func (s *taskSerializerSuite) TestDeleteExecutionVisibilityTask() {
replicateHistoryTask := &tasks.DeleteExecutionVisibilityTask{
WorkflowKey: s.workflowKey,
VisibilityTimestamp: time.Unix(0, 0).UTC(), // go == compare for location as well which is striped during marshaling/unmarshaling
TaskID: rand.Int63(),
Version: rand.Int63(),
CloseVisibilityTaskID: rand.Int63(),
}

s.assertEqualTasks(replicateHistoryTask)
}

func (s *taskSerializerSuite) TestDeleteExecutionTask() {
replicateHistoryTask := &tasks.DeleteExecutionTask{
WorkflowKey: s.workflowKey,
VisibilityTimestamp: time.Unix(0, 0).UTC(), // go == compare for location as well which is striped during marshaling/unmarshaling
TaskID: rand.Int63(),
Version: rand.Int63(),
}

s.assertEqualTasks(replicateHistoryTask)
}

func (s *taskSerializerSuite) assertEqualTasks(
task tasks.Task,
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ message ShardInfo {
reserved 15;
// Map from task category to ack levels of the corresponding queue processor
map<int32, QueueAckLevel> queue_ack_levels = 16;
map<int32, QueueState> queue_states = 17;
map<int32, QueueState> queue_states = 17;
}

// execution column
Expand Down Expand Up @@ -205,6 +205,7 @@ message VisibilityTaskInfo {
google.protobuf.Timestamp visibility_time = 7 [(gogoproto.stdtime) = true];
google.protobuf.Timestamp close_time = 8 [(gogoproto.stdtime) = true];
google.protobuf.Timestamp start_time = 9 [(gogoproto.stdtime) = true];
int64 close_visibility_task_id = 10;
}

// timer column
Expand Down
2 changes: 1 addition & 1 deletion service/history/shard/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func (s *controllerSuite) TestAcquireShardSuccess() {
if hostID == 0 {
myShards = append(myShards, shardID)
s.mockHistoryEngine.EXPECT().Start().Return()
s.mockHistoryEngine.EXPECT().NotifyNewTasks(gomock.Any(), gomock.Any())
s.mockHistoryEngine.EXPECT().NotifyNewTasks(gomock.Any(), gomock.Any()).AnyTimes()
s.mockServiceResolver.EXPECT().Lookup(convert.Int32ToString(shardID)).Return(s.hostInfo, nil).Times(2)
s.mockEngineFactory.EXPECT().CreateEngine(gomock.Any()).Return(s.mockHistoryEngine)
s.mockShardManager.EXPECT().GetOrCreateShard(gomock.Any(), getOrCreateShardRequestMatcher(shardID)).Return(
Expand Down
11 changes: 8 additions & 3 deletions service/history/tasks/delete_visibility_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@ var _ Task = (*DeleteExecutionVisibilityTask)(nil)
type (
DeleteExecutionVisibilityTask struct {
definition.WorkflowKey
VisibilityTimestamp time.Time
TaskID int64
Version int64
VisibilityTimestamp time.Time
TaskID int64
Version int64
CloseVisibilityTaskID int64
// These two fields are needed for cassandra standard visibility.
// TODO (alex): Remove them when cassandra standard visibility is removed.
StartTime *time.Time
Expand Down Expand Up @@ -81,3 +82,7 @@ func (t *DeleteExecutionVisibilityTask) GetCategory() Category {
func (t *DeleteExecutionVisibilityTask) GetType() enumsspb.TaskType {
return enumsspb.TASK_TYPE_VISIBILITY_DELETE_EXECUTION
}

func (t *DeleteExecutionVisibilityTask) GetCloseVisibilityTaskID() int64 {
return t.CloseVisibilityTaskID
}

0 comments on commit c91d596

Please sign in to comment.