Skip to content

Commit

Permalink
Make TransientWorkflowTaskInfo more general (#3109)
Browse files Browse the repository at this point in the history
Convert this type to carry an arbitrary number of HistoryEvent instances
rather than 2 specific events.
  • Loading branch information
Matt McShane committed Jul 18, 2022
1 parent 44db941 commit 2cf6ec2
Show file tree
Hide file tree
Showing 5 changed files with 271 additions and 70 deletions.
168 changes: 127 additions & 41 deletions api/history/v1/message.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 10 additions & 2 deletions proto/internal/temporal/server/api/history/v1/message.proto
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,16 @@ import "dependencies/gogoproto/gogo.proto";
import "temporal/api/history/v1/message.proto";

message TransientWorkflowTaskInfo {
temporal.api.history.v1.HistoryEvent scheduled_event = 1;
temporal.api.history.v1.HistoryEvent started_event = 2;
//TODO (mmcshane): remove these deprecated fields after v1.18 is released

// Rather than use this field, instead add the event to the history_suffix list.
temporal.api.history.v1.HistoryEvent scheduled_event = 1 [deprecated = true];
// Rather than use this field, instead add the event to the history_suffix list.
temporal.api.history.v1.HistoryEvent started_event = 2 [deprecated = true];

// A list of history events that are to be appended to the "real" workflow
// history.
repeated temporal.api.history.v1.HistoryEvent history_suffix = 3;
}

// VersionHistoryItem contains signal eventId and the corresponding version.
Expand Down
70 changes: 43 additions & 27 deletions service/frontend/workflowHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3793,25 +3793,21 @@ func (wh *WorkflowHandler) getRawHistory(
tag.WorkflowID(execution.GetWorkflowId()),
tag.WorkflowRunID(execution.GetRunId()),
tag.Error(err))
}

blob, err := wh.payloadSerializer.SerializeEvent(transientWorkflowTaskInfo.ScheduledEvent, enumspb.ENCODING_TYPE_PROTO3)
if err != nil {
return nil, nil, err
}
rawHistory = append(rawHistory, &commonpb.DataBlob{
EncodingType: enumspb.ENCODING_TYPE_PROTO3,
Data: blob.Data,
})

blob, err = wh.payloadSerializer.SerializeEvent(transientWorkflowTaskInfo.StartedEvent, enumspb.ENCODING_TYPE_PROTO3)
if err != nil {
return nil, nil, err
suffix := extractHistorySuffix(transientWorkflowTaskInfo)

for _, event := range suffix {
blob, err := wh.payloadSerializer.SerializeEvent(event, enumspb.ENCODING_TYPE_PROTO3)
if err != nil {
return nil, nil, err
}
rawHistory = append(rawHistory, &commonpb.DataBlob{
EncodingType: enumspb.ENCODING_TYPE_PROTO3,
Data: blob.Data,
})
}
rawHistory = append(rawHistory, &commonpb.DataBlob{
EncodingType: enumspb.ENCODING_TYPE_PROTO3,
Data: blob.Data,
})
}

return rawHistory, resp.NextPageToken, nil
Expand Down Expand Up @@ -3871,7 +3867,6 @@ func (wh *WorkflowHandler) getHistory(
tag.WorkflowID(execution.GetWorkflowId()),
tag.WorkflowRunID(execution.GetRunId()),
tag.Error(err))
return nil, nil, err
}

if len(nextPageToken) == 0 && transientWorkflowTaskInfo != nil {
Expand All @@ -3884,7 +3879,7 @@ func (wh *WorkflowHandler) getHistory(
tag.Error(err))
}
// Append the transient workflow task events once we are done enumerating everything from the events table
historyEvents = append(historyEvents, transientWorkflowTaskInfo.ScheduledEvent, transientWorkflowTaskInfo.StartedEvent)
historyEvents = append(historyEvents, extractHistorySuffix(transientWorkflowTaskInfo)...)
}

if err := wh.processOutgoingSearchAttributes(historyEvents, namespace); err != nil {
Expand Down Expand Up @@ -3998,20 +3993,41 @@ func (wh *WorkflowHandler) processIncomingSearchAttributes(searchAttributes *com
}

func (wh *WorkflowHandler) validateTransientWorkflowTaskEvents(
expectedNextEventID int64,
eventIDOffset int64,
transientWorkflowTaskInfo *historyspb.TransientWorkflowTaskInfo,
) error {

if transientWorkflowTaskInfo.ScheduledEvent.GetEventId() == expectedNextEventID &&
transientWorkflowTaskInfo.StartedEvent.GetEventId() == expectedNextEventID+1 {
return nil
suffix := extractHistorySuffix(transientWorkflowTaskInfo)
for i, event := range suffix {
expectedEventID := eventIDOffset + int64(i)
if event.GetEventId() != expectedEventID {
return serviceerror.NewInternal(
fmt.Sprintf(
"invalid transient workflow task at position %v; expected event ID %v, found event ID %v",
i,
expectedEventID,
event.GetEventId()))
}
}

return fmt.Errorf("invalid transient workflow task: expectedScheduledEventID=%v expectedStartedEventID=%v but have scheduledEventID=%v startedEventID=%v",
expectedNextEventID,
expectedNextEventID+1,
transientWorkflowTaskInfo.ScheduledEvent.GetEventId(),
transientWorkflowTaskInfo.StartedEvent.GetEventId())
return nil
}

func extractHistorySuffix(transientTasks *historyspb.TransientWorkflowTaskInfo) []*historypb.HistoryEvent {
// TODO (mmcshane): remove this function after v1.18 is release as we will
// be able to just use transientTasks.HistorySuffix directly and the other
// fields will be removed.

suffix := transientTasks.HistorySuffix
if len(suffix) == 0 {
// HistorySuffix is a new field - we may still need to handle
// instances that carry the separate ScheduledEvent and StartedEvent
// fields

// One might be tempted to check for nil here but the old code did not
// make that check and we aim to preserve compatiblity
suffix = append(suffix, transientTasks.ScheduledEvent, transientTasks.StartedEvent)
}
return suffix
}

func (wh *WorkflowHandler) validateTaskQueue(t *taskqueuepb.TaskQueue) error {
Expand Down
Loading

0 comments on commit 2cf6ec2

Please sign in to comment.