Skip to content

Commit

Permalink
Remove ExecutableWrapper (#5033)
Browse files Browse the repository at this point in the history
<!-- Describe what has changed in this PR -->
**What changed?**
I discovered an issue with the `ExecutableWrapper`
[here](https://github.com/temporalio/temporal/blob/df4705d6488485ae12e27f4cb1c719b62c980304/service/history/queues/executable.go#L451).
Because we don't wrap the `Reschedule` method, it adds the unwrapped
version to the rescheduler, which loses all of the wrapped behavior. I
implemented a fix using the "delegate" pattern, where the base instance
holds a reference to the wrapped instance, but it is fraught with peril:
you need to remember to use the delegate everywhere it's applicable, and
you need to remember to set it--also only one delegate can be set, so
you can't have different wrappers for the same object. As a result, I
decided to get rid of the wrapper-based approach, and instead added the
code directly to the base executable implementation.

<!-- Tell your future self why have you made these changes -->
**Why?**
The huge refactoring is necessary because the `ExecutableWrapper`
approach is too complicated and risky.

<!-- How have you verified this change? Tested locally? Added a unit
test? Checked in staging env? -->
**How did you test it?**
There's still 100% test coverage for the DLQ code. Most imporantly,
there is now an end-to-end integration test which starts a workflow that
produces terminally failing tasks, verifies that it's added to the DLQ,
and then uses the tdbg read command to delete the DLQ tasks. This wasn't
possible before due to the bug in the `ExecutableWrapper`.

<!-- Assuming the worst case, what can be broken when deploying this
change to production? -->
**Potential risks**


<!-- Is this PR a hotfix candidate or require that a notification be
sent to the broader community? (Yes/No) -->
**Is hotfix candidate?**
  • Loading branch information
MichaelSnowden committed Oct 25, 2023
1 parent 276d610 commit c3d53ba
Show file tree
Hide file tree
Showing 39 changed files with 624 additions and 1,425 deletions.
114 changes: 58 additions & 56 deletions api/common/v1/dlq.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion common/persistence/data_interfaces.go
Expand Up @@ -1269,7 +1269,7 @@ type (

RawHistoryTask struct {
MessageMetadata MessageMetadata
Task *persistencespb.HistoryTask
Payload *persistencespb.HistoryTask
}

ReadRawTasksResponse struct {
Expand Down
4 changes: 2 additions & 2 deletions common/persistence/history_task_queue_manager.go
Expand Up @@ -151,7 +151,7 @@ func (m *HistoryTaskQueueManagerImpl) ReadRawTasks(
return nil, fmt.Errorf("%v: %w", ErrMsgDeserializeRawHistoryTask, err)
}
responseTasks[i].MessageMetadata = message.MetaData
responseTasks[i].Task = &task
responseTasks[i].Payload = &task
}

return &ReadRawTasksResponse{
Expand All @@ -170,7 +170,7 @@ func (m *HistoryTaskQueueManagerImpl) ReadTasks(ctx context.Context, request *Re
resTasks := make([]HistoryTask, len(response.Tasks))

for i, rawTask := range response.Tasks {
blob := rawTask.Task.Blob
blob := rawTask.Payload.Blob
if blob == nil {
return nil, serialization.NewDeserializationError(enums.ENCODING_TYPE_PROTO3, ErrHistoryTaskBlobIsNil)
}
Expand Down
Expand Up @@ -42,7 +42,6 @@ import (
"go.temporal.io/server/common/persistence/persistencetest"
"go.temporal.io/server/service/history/api/deletedlqtasks/deletedlqtaskstest"
"go.temporal.io/server/service/history/api/getdlqtasks/getdlqtaskstest"
"go.temporal.io/server/service/history/queues/queuestest"
"go.temporal.io/server/service/history/tasks"
)

Expand Down Expand Up @@ -136,10 +135,6 @@ func RunHistoryTaskQueueManagerTestSuite(t *testing.T, queue persistence.QueueV2
t.Parallel()
historytest.TestClient(t, historyTaskQueueManager)
})
t.Run("ExecutableTest", func(t *testing.T) {
t.Parallel()
queuestest.TestExecutable(t, historyTaskQueueManager)
})
}

func testHistoryTaskQueueManagerCreateQueueErr(t *testing.T, queue persistence.QueueV2) {
Expand Down
5 changes: 3 additions & 2 deletions proto/internal/temporal/server/api/common/v1/dlq.proto
Expand Up @@ -8,7 +8,7 @@ import "temporal/api/common/v1/message.proto";
message HistoryTask {
// shard_id is included to avoid having to deserialize the task blob.
int32 shard_id = 1;
temporal.api.common.v1.DataBlob task = 2;
temporal.api.common.v1.DataBlob blob = 2;
}

message HistoryDLQTaskMetadata {
Expand All @@ -20,7 +20,8 @@ message HistoryDLQTaskMetadata {
// queue).
message HistoryDLQTask {
HistoryDLQTaskMetadata metadata = 1;
HistoryTask task = 2;
// This is named payload to prevent stuttering (e.g. task.Task).
HistoryTask payload = 2;
}

// HistoryDLQKey is a compound key that identifies a history DLQ.
Expand Down
7 changes: 4 additions & 3 deletions service/frontend/admin_handler.go
Expand Up @@ -1808,13 +1808,13 @@ func (adh *AdminHandler) PurgeDLQTasks(
}
workflowID := fmt.Sprintf("delete-dlq-tasks-%s", key.GetQueueName())
client := adh.sdkClientFactory.GetSystemClient()
future, err := client.ExecuteWorkflow(ctx, sdkclient.StartWorkflowOptions{
run, err := client.ExecuteWorkflow(ctx, sdkclient.StartWorkflowOptions{
ID: workflowID,
TaskQueue: primitives.DefaultWorkerTaskQueue,
}, dlq.WorkflowName, dlq.WorkflowParams{
WorkflowType: dlq.WorkflowTypeDelete,
DeleteParams: dlq.DeleteParams{
TaskCategory: int(category.ID()),
TaskCategory: category.ID(),
SourceCluster: sourceCluster,
TargetCluster: targetCluster,
MaxMessageID: request.InclusiveMaxTaskMetadata.MessageId,
Expand All @@ -1823,9 +1823,10 @@ func (adh *AdminHandler) PurgeDLQTasks(
if err != nil {
return nil, err
}
runID := run.GetRunID()
jobToken := adminservice.DLQJobToken{
WorkflowId: workflowID,
RunId: future.GetRunID(),
RunId: runID,
}
jobTokenBytes, _ := jobToken.Marshal()
return &adminservice.PurgeDLQTasksResponse{
Expand Down
8 changes: 4 additions & 4 deletions service/frontend/admin_handler_test.go
Expand Up @@ -1173,9 +1173,9 @@ func (s *adminHandlerSuite) TestGetDLQTasks() {
Metadata: &commonspb.HistoryDLQTaskMetadata{
MessageId: 21,
},
Task: &commonspb.HistoryTask{
Payload: &commonspb.HistoryTask{
ShardId: 34,
Task: blob,
Blob: blob,
},
},
},
Expand All @@ -1202,9 +1202,9 @@ func (s *adminHandlerSuite) TestGetDLQTasks() {
Metadata: &commonspb.HistoryDLQTaskMetadata{
MessageId: 21,
},
Task: &commonspb.HistoryTask{
Payload: &commonspb.HistoryTask{
ShardId: 34,
Task: blob,
Blob: blob,
},
},
},
Expand Down
6 changes: 3 additions & 3 deletions service/history/api/getdlqtasks/api.go
Expand Up @@ -77,9 +77,9 @@ func Invoke(
Metadata: &commonspb.HistoryDLQTaskMetadata{
MessageId: task.MessageMetadata.ID,
},
Task: &commonspb.HistoryTask{
ShardId: task.Task.ShardId,
Task: task.Task.Blob,
Payload: &commonspb.HistoryTask{
ShardId: task.Payload.ShardId,
Blob: task.Payload.Blob,
},
}
}
Expand Down
4 changes: 2 additions & 2 deletions service/history/api/getdlqtasks/getdlqtaskstest/apitest.go
Expand Up @@ -81,9 +81,9 @@ func TestInvoke(t *testing.T, manager persistence.HistoryTaskQueueManager) {
require.NoError(t, err)
require.Equal(t, 1, len(res.DlqTasks))
assert.Equal(t, int64(persistence.FirstQueueMessageID), res.DlqTasks[0].Metadata.MessageId)
assert.Equal(t, 1, int(res.DlqTasks[0].Task.ShardId))
assert.Equal(t, 1, int(res.DlqTasks[0].Payload.ShardId))
serializer := serialization.NewTaskSerializer()
outTask, err := serializer.DeserializeTask(tasks.CategoryTransfer, *res.DlqTasks[0].Task.Task)
outTask, err := serializer.DeserializeTask(tasks.CategoryTransfer, *res.DlqTasks[0].Payload.Blob)
require.NoError(t, err)
assert.Equal(t, inTask, outTask)
}

0 comments on commit c3d53ba

Please sign in to comment.