Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add additional service protection against data inconsistency #1450

Merged
merged 3 commits into from
Apr 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
551 changes: 318 additions & 233 deletions api/historyservice/v1/request_response.pb.go

Large diffs are not rendered by default.

428 changes: 214 additions & 214 deletions api/persistence/v1/executions.pb.go

Large diffs are not rendered by default.

5 changes: 4 additions & 1 deletion common/persistence/history_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ type (
const (
	// defaultLastNodeID is the node ID immediately preceding common.FirstEventID.
	defaultLastNodeID = common.FirstEventID - 1
	// defaultLastTransactionID is the zero transaction ID, typed as int64.
	defaultLastTransactionID int64 = 0

	// trimHistoryBranchPageSize is the page size used by TrimHistoryBranch.
	// TrimHistoryBranch will only dump metadata, which is relatively cheap.
	trimHistoryBranchPageSize = 1000
)

var _ HistoryManager = (*historyV2ManagerImpl)(nil)
Expand Down Expand Up @@ -145,7 +148,7 @@ func (m *historyV2ManagerImpl) TrimHistoryBranch(
shardID := request.ShardID
minNodeID := common.FirstEventID
maxNodeID := request.NodeID + 1
pageSize := 1000
pageSize := trimHistoryBranchPageSize

branch, err := serialization.HistoryBranchFromBlob(request.BranchToken, enumspb.ENCODING_TYPE_PROTO3.String())
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ message GetMutableStateResponse {
temporal.api.enums.v1.WorkflowExecutionStatus workflow_status = 16;
temporal.server.api.history.v1.VersionHistories version_histories = 17;
bool is_sticky_task_queue_enabled = 18;
int64 last_first_event_txn_id = 19;
}

message PollMutableStateRequest {
Expand Down Expand Up @@ -121,6 +122,7 @@ message PollMutableStateResponse {
temporal.server.api.history.v1.VersionHistories version_histories = 14;
temporal.server.api.enums.v1.WorkflowExecutionState workflow_state = 15;
temporal.api.enums.v1.WorkflowExecutionStatus workflow_status = 16;
int64 last_first_event_txn_id = 17;
}

message ResetStickyTaskQueueRequest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ message WorkflowExecutionInfo {
string first_execution_run_id = 55;
ExecutionStats execution_stats = 56;
google.protobuf.Timestamp workflow_run_expiration_time = 57 [(gogoproto.stdtime) = true];
int64 last_history_node_txn_id = 58;
// Transaction Id of the first event in the last batch of events.
int64 last_first_event_txn_id = 58;
wxing1292 marked this conversation as resolved.
Show resolved Hide resolved
}

message ExecutionStats {
Expand Down
26 changes: 22 additions & 4 deletions service/frontend/workflowHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,7 @@ func (wh *WorkflowHandler) GetWorkflowExecutionHistory(ctx context.Context, requ
execution *commonpb.WorkflowExecution,
expectedNextEventID int64,
currentBranchToken []byte,
) ([]byte, string, int64, int64, bool, error) {
) ([]byte, string, int64, int64, int64, bool, error) {
response, err := wh.GetHistoryClient().PollMutableState(ctx, &historyservice.PollMutableStateRequest{
NamespaceId: namespaceUUID,
Execution: execution,
Expand All @@ -522,13 +522,14 @@ func (wh *WorkflowHandler) GetWorkflowExecutionHistory(ctx context.Context, requ
})

if err != nil {
return nil, "", 0, 0, false, err
return nil, "", 0, 0, 0, false, err
}
isWorkflowRunning := response.GetWorkflowStatus() == enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING

return response.CurrentBranchToken,
response.Execution.GetRunId(),
response.GetLastFirstEventId(),
response.GetLastFirstEventTxnId(),
response.GetNextEventId(),
isWorkflowRunning,
nil
Expand All @@ -541,6 +542,7 @@ func (wh *WorkflowHandler) GetWorkflowExecutionHistory(ctx context.Context, requ

var runID string
lastFirstEventID := common.FirstEventID
lastFirstEventTxnID := int64(0)
var nextEventID int64
var isWorkflowRunning bool

Expand All @@ -562,7 +564,7 @@ func (wh *WorkflowHandler) GetWorkflowExecutionHistory(ctx context.Context, requ
if !isCloseEventOnly {
queryNextEventID = continuationToken.GetNextEventId()
}
continuationToken.BranchToken, _, lastFirstEventID, nextEventID, isWorkflowRunning, err =
continuationToken.BranchToken, _, lastFirstEventID, lastFirstEventTxnID, nextEventID, isWorkflowRunning, err =
queryHistory(namespaceID, execution, queryNextEventID, continuationToken.BranchToken)
if err != nil {
return nil, err
Expand All @@ -576,7 +578,7 @@ func (wh *WorkflowHandler) GetWorkflowExecutionHistory(ctx context.Context, requ
if !isCloseEventOnly {
queryNextEventID = common.FirstEventID
}
continuationToken.BranchToken, runID, lastFirstEventID, nextEventID, isWorkflowRunning, err =
continuationToken.BranchToken, runID, lastFirstEventID, lastFirstEventTxnID, nextEventID, isWorkflowRunning, err =
queryHistory(namespaceID, execution, queryNextEventID, nil)
if err != nil {
return nil, err
Expand All @@ -590,6 +592,22 @@ func (wh *WorkflowHandler) GetWorkflowExecutionHistory(ctx context.Context, requ
continuationToken.IsWorkflowRunning = isWorkflowRunning
continuationToken.PersistenceToken = nil
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you list the potential functions that might return *serviceerror.DataLoss here? Also, please add some sort of TODO comment about moving this closer to the history store.

// TODO below is a temporary solution to guard against invalid event batch
// when data inconsistency occurs
// long term solution should check event batch pointing backwards within history store
defer func() {
// lastFirstEventTxnID != 0 exists due to forward / backward compatibility
if _, ok := retError.(*serviceerror.DataLoss); ok && lastFirstEventTxnID != 0 {
_, _ = wh.GetHistoryManager().TrimHistoryBranch(&persistence.TrimHistoryBranchRequest{
ShardID: common.WorkflowIDToHistoryShard(namespaceID, execution.GetWorkflowId(), wh.config.NumHistoryShards),
BranchToken: continuationToken.BranchToken,
NodeID: lastFirstEventID,
TransactionID: lastFirstEventTxnID,
})
}
}()

rawHistoryQueryEnabled := wh.config.SendRawWorkflowHistory(request.GetNamespace())

history := &historypb.History{}
Expand Down
3 changes: 3 additions & 0 deletions service/history/events/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type (
Notification struct {
ID definition.WorkflowIdentifier
LastFirstEventID int64
LastFirstEventTxnID int64
NextEventID int64
PreviousStartedEventID int64
Timestamp time.Time
Expand Down Expand Up @@ -90,6 +91,7 @@ func NewNotification(
namespaceID string,
workflowExecution *commonpb.WorkflowExecution,
lastFirstEventID int64,
lastFirstEventTxnID int64,
nextEventID int64,
previousStartedEventID int64,
currentBranchToken []byte,
Expand All @@ -104,6 +106,7 @@ func NewNotification(
workflowExecution.GetRunId(),
),
LastFirstEventID: lastFirstEventID,
LastFirstEventTxnID: lastFirstEventTxnID,
NextEventID: nextEventID,
PreviousStartedEventID: previousStartedEventID,
CurrentBranchToken: currentBranchToken,
Expand Down
6 changes: 4 additions & 2 deletions service/history/events/notifier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,13 @@ func (s *notifierSuite) TestSingleSubscriberWatchingEvents() {
RunId: "run ID",
}
lastFirstEventID := int64(3)
lastFirstEventTxnID := int64(398)
previousStartedEventID := int64(5)
nextEventID := int64(18)
workflowState := enumsspb.WORKFLOW_EXECUTION_STATE_CREATED
workflowStatus := enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING
branchToken := make([]byte, 0)
historyEvent := NewNotification(namespaceID, execution, lastFirstEventID, nextEventID, previousStartedEventID, branchToken, workflowState, workflowStatus)
historyEvent := NewNotification(namespaceID, execution, lastFirstEventID, lastFirstEventTxnID, nextEventID, previousStartedEventID, branchToken, workflowState, workflowStatus)
timerChan := time.NewTimer(time.Second * 2).C

subscriberID, channel, err := s.notifier.WatchHistoryEvent(definition.NewWorkflowIdentifier(namespaceID, execution.GetWorkflowId(), execution.GetRunId()))
Expand Down Expand Up @@ -121,12 +122,13 @@ func (s *notifierSuite) TestMultipleSubscriberWatchingEvents() {
}

lastFirstEventID := int64(3)
lastFirstEventTxnID := int64(3980)
previousStartedEventID := int64(5)
nextEventID := int64(18)
workflowState := enumsspb.WORKFLOW_EXECUTION_STATE_CREATED
workflowStatus := enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING
branchToken := make([]byte, 0)
historyEvent := NewNotification(namespaceID, execution, lastFirstEventID, nextEventID, previousStartedEventID, branchToken, workflowState, workflowStatus)
historyEvent := NewNotification(namespaceID, execution, lastFirstEventID, lastFirstEventTxnID, nextEventID, previousStartedEventID, branchToken, workflowState, workflowStatus)
timerChan := time.NewTimer(time.Second * 5).C

subscriberCount := 100
Expand Down
9 changes: 7 additions & 2 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -646,7 +646,8 @@ func (e *historyEngineImpl) PollMutableState(
NamespaceId: request.GetNamespaceId(),
Execution: request.Execution,
ExpectedNextEventId: request.ExpectedNextEventId,
CurrentBranchToken: request.CurrentBranchToken})
CurrentBranchToken: request.CurrentBranchToken,
})

if err != nil {
return nil, err
Expand All @@ -657,6 +658,7 @@ func (e *historyEngineImpl) PollMutableState(
NextEventId: response.NextEventId,
PreviousStartedEventId: response.PreviousStartedEventId,
LastFirstEventId: response.LastFirstEventId,
LastFirstEventTxnId: response.LastFirstEventTxnId,
TaskQueue: response.TaskQueue,
StickyTaskQueue: response.StickyTaskQueue,
StickyTaskQueueScheduleToStartTimeout: response.StickyTaskQueueScheduleToStartTimeout,
Expand Down Expand Up @@ -730,6 +732,7 @@ func (e *historyEngineImpl) getMutableStateOrPolling(
select {
case event := <-channel:
response.LastFirstEventId = event.LastFirstEventID
response.LastFirstEventTxnId = event.LastFirstEventTxnID
response.NextEventId = event.NextEventID
response.PreviousStartedEventId = event.PreviousStartedEventID
response.WorkflowState = event.WorkflowState
Expand Down Expand Up @@ -1005,10 +1008,12 @@ func (e *historyEngineImpl) getMutableState(
executionInfo := mutableState.GetExecutionInfo()
execution.RunId = context.getExecution().RunId
workflowState, workflowStatus := mutableState.GetWorkflowStateStatus()
lastFirstEventID, lastFirstEventTxnID := mutableState.GetLastFirstEventIDTxnID()
retResp = &historyservice.GetMutableStateResponse{
Execution: &execution,
WorkflowType: &commonpb.WorkflowType{Name: executionInfo.WorkflowTypeName},
LastFirstEventId: mutableState.GetLastFirstEventID(),
LastFirstEventId: lastFirstEventID,
LastFirstEventTxnId: lastFirstEventTxnID,
NextEventId: mutableState.GetNextEventID(),
PreviousStartedEventId: mutableState.GetPreviousStartedEventID(),
TaskQueue: &taskqueuepb.TaskQueue{
Expand Down
1 change: 1 addition & 0 deletions service/history/historyEngine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,7 @@ func (s *engineSuite) TestGetMutableStateLongPoll_CurrentBranchChanged() {
"testNamespaceID",
newExecution,
int64(1),
int64(0),
int64(4),
int64(1),
[]byte{1},
Expand Down
2 changes: 1 addition & 1 deletion service/history/mutableState.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ type (
GetExecutionState() *persistencespb.WorkflowExecutionState
GetInFlightWorkflowTask() (*workflowTaskInfo, bool)
GetPendingWorkflowTask() (*workflowTaskInfo, bool)
GetLastFirstEventID() int64
GetLastFirstEventIDTxnID() (int64, int64)
GetLastWriteVersion() (int64, error)
GetNextEventID() int64
GetPreviousStartedEventID() int64
Expand Down
21 changes: 6 additions & 15 deletions service/history/mutableStateBuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1128,10 +1128,10 @@ func (e *mutableStateBuilder) ClearStickyness() {
e.executionInfo.StickyScheduleToStartTimeout = timestamp.DurationFromSeconds(0)
}

// GetLastFirstEventID returns last first event ID
// GetLastFirstEventIDTxnID returns last first event ID and corresponding transaction ID
// first event ID is the ID of a batch of events in a single history events record
func (e *mutableStateBuilder) GetLastFirstEventID() int64 {
return e.executionInfo.LastFirstEventId
func (e *mutableStateBuilder) GetLastFirstEventIDTxnID() (int64, int64) {
return e.executionInfo.LastFirstEventId, e.executionInfo.LastFirstEventTxnId
}

// GetNextEventID returns next event ID
Expand Down Expand Up @@ -3614,9 +3614,7 @@ func (e *mutableStateBuilder) CloseTransactionAsMutation(

if len(workflowEventsSeq) > 0 {
lastEvents := workflowEventsSeq[len(workflowEventsSeq)-1].Events
firstEvent := lastEvents[0]
lastEvent := lastEvents[len(lastEvents)-1]
e.updateWithLastFirstEvent(firstEvent)
if err := e.updateWithLastWriteEvent(
lastEvent,
transactionPolicy,
Expand Down Expand Up @@ -3707,9 +3705,7 @@ func (e *mutableStateBuilder) CloseTransactionAsSnapshot(

if len(workflowEventsSeq) > 0 {
lastEvents := workflowEventsSeq[len(workflowEventsSeq)-1].Events
firstEvent := lastEvents[0]
lastEvent := lastEvents[len(lastEvents)-1]
e.updateWithLastFirstEvent(firstEvent)
if err := e.updateWithLastWriteEvent(
lastEvent,
transactionPolicy,
Expand Down Expand Up @@ -3884,12 +3880,13 @@ func (e *mutableStateBuilder) prepareEventsAndReplicationTasks(
WorkflowID: e.executionInfo.WorkflowId,
RunID: e.executionState.RunId,
BranchToken: currentBranchToken,
PrevTxnID: e.executionInfo.LastHistoryNodeTxnId,
PrevTxnID: e.executionInfo.LastFirstEventTxnId,
TxnID: historyNodeTxnIDs[index],
Events: eventBatch,
}
e.GetExecutionInfo().LastEventTaskId = eventBatch[len(eventBatch)-1].GetTaskId()
e.executionInfo.LastHistoryNodeTxnId = historyNodeTxnIDs[index]
e.executionInfo.LastFirstEventId = eventBatch[0].GetEventId()
e.executionInfo.LastFirstEventTxnId = historyNodeTxnIDs[index]
}

if err := e.validateNoEventsAfterWorkflowFinish(
Expand Down Expand Up @@ -4021,12 +4018,6 @@ func (e *mutableStateBuilder) updateWithLastWriteEvent(
return nil
}

// updateWithLastFirstEvent stores the event ID of lastFirstEvent as the
// LastFirstEventId of this mutable state's execution info.
func (e *mutableStateBuilder) updateWithLastFirstEvent(
	lastFirstEvent *historypb.HistoryEvent,
) {
	info := e.GetExecutionInfo()
	info.LastFirstEventId = lastFirstEvent.GetEventId()
}

// canReplicateEvents reports whether the namespace entry's replication policy
// is cache.ReplicationPolicyMultiCluster.
func (e *mutableStateBuilder) canReplicateEvents() bool {
	policy := e.namespaceEntry.GetReplicationPolicy()
	return policy == cache.ReplicationPolicyMultiCluster
}
Expand Down
15 changes: 8 additions & 7 deletions service/history/mutableState_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions service/history/timerQueueStandbyTaskExecutor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -729,9 +729,9 @@ func (s *timerQueueStandbyTaskExecutorSuite) TestProcessActivityTimeout_Multiple
mutableState.executionInfo.LastUpdateTime = input.UpdateWorkflowMutation.ExecutionInfo.LastUpdateTime
input.RangeID = 0
input.UpdateWorkflowMutation.ExecutionInfo.LastEventTaskId = 0
input.UpdateWorkflowMutation.ExecutionInfo.LastHistoryNodeTxnId = 0
input.UpdateWorkflowMutation.ExecutionInfo.LastFirstEventTxnId = 0
mutableState.executionInfo.LastEventTaskId = 0
mutableState.executionInfo.LastHistoryNodeTxnId = 0
mutableState.executionInfo.LastFirstEventTxnId = 0
mutableState.executionInfo.WorkflowTaskOriginalScheduledTime = input.UpdateWorkflowMutation.ExecutionInfo.WorkflowTaskOriginalScheduledTime
mutableState.executionInfo.ExecutionStats = &persistencespb.ExecutionStats{}

Expand Down
8 changes: 6 additions & 2 deletions service/history/workflowExecutionContext.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,10 +555,12 @@ func (c *workflowExecutionContextImpl) conflictResolveWorkflowExecution(

workflowState, workflowStatus := resetMutableState.GetWorkflowStateStatus()
// Current branch changed and notify the watchers
lastFirstEventID, lastFirstEventTxnID := resetMutableState.GetLastFirstEventIDTxnID()
c.engine.NotifyNewHistoryEvent(events.NewNotification(
c.namespaceID,
&c.workflowExecution,
resetMutableState.GetLastFirstEventID(),
lastFirstEventID,
lastFirstEventTxnID,
resetMutableState.GetNextEventID(),
resetMutableState.GetPreviousStartedEventID(),
currentBranchToken,
Expand Down Expand Up @@ -771,10 +773,12 @@ func (c *workflowExecutionContextImpl) updateWorkflowExecutionWithNew(
return err
}
workflowState, workflowStatus := c.mutableState.GetWorkflowStateStatus()
lastFirstEventID, lastFirstEventTxnID := c.mutableState.GetLastFirstEventIDTxnID()
c.engine.NotifyNewHistoryEvent(events.NewNotification(
c.namespaceID,
&c.workflowExecution,
c.mutableState.GetLastFirstEventID(),
lastFirstEventID,
lastFirstEventTxnID,
c.mutableState.GetNextEventID(),
c.mutableState.GetPreviousStartedEventID(),
currentBranchToken,
Expand Down