Skip to content

Commit

Permalink
Add additional service protection against data inconsistency (#1450)
Browse files Browse the repository at this point in the history
* Trigger TrimHistoryBranch operation if GetWorkflowExecutionHistory sees dataloss error
* Rename last_history_node_txn_id to last_first_event_txn_id
  • Loading branch information
wxing1292 committed Apr 15, 2021
1 parent 5d924f4 commit 57d9851
Show file tree
Hide file tree
Showing 15 changed files with 600 additions and 484 deletions.
551 changes: 318 additions & 233 deletions api/historyservice/v1/request_response.pb.go

Large diffs are not rendered by default.

428 changes: 214 additions & 214 deletions api/persistence/v1/executions.pb.go

Large diffs are not rendered by default.

5 changes: 4 additions & 1 deletion common/persistence/history_store.go
Expand Up @@ -55,6 +55,9 @@ type (
// Default values and tuning constants for this file.
const (
// defaultLastNodeID is the node ID immediately before common.FirstEventID.
defaultLastNodeID = common.FirstEventID - 1
// defaultLastTransactionID is the zero transaction ID.
defaultLastTransactionID = int64(0)

// trimHistoryBranchPageSize is the page size used by TrimHistoryBranch.
// TrimHistoryBranch will only dump metadata, which is relatively cheap.
trimHistoryBranchPageSize = 1000
)

var _ HistoryManager = (*historyV2ManagerImpl)(nil)
Expand Down Expand Up @@ -145,7 +148,7 @@ func (m *historyV2ManagerImpl) TrimHistoryBranch(
shardID := request.ShardID
minNodeID := common.FirstEventID
maxNodeID := request.NodeID + 1
pageSize := 1000
pageSize := trimHistoryBranchPageSize

branch, err := serialization.HistoryBranchFromBlob(request.BranchToken, enumspb.ENCODING_TYPE_PROTO3.String())
if err != nil {
Expand Down
Expand Up @@ -93,6 +93,7 @@ message GetMutableStateResponse {
temporal.api.enums.v1.WorkflowExecutionStatus workflow_status = 16;
temporal.server.api.history.v1.VersionHistories version_histories = 17;
bool is_sticky_task_queue_enabled = 18;
int64 last_first_event_txn_id = 19;
}

message PollMutableStateRequest {
Expand Down Expand Up @@ -121,6 +122,7 @@ message PollMutableStateResponse {
temporal.server.api.history.v1.VersionHistories version_histories = 14;
temporal.server.api.enums.v1.WorkflowExecutionState workflow_state = 15;
temporal.api.enums.v1.WorkflowExecutionStatus workflow_status = 16;
int64 last_first_event_txn_id = 17;
}

message ResetStickyTaskQueueRequest {
Expand Down
Expand Up @@ -120,7 +120,8 @@ message WorkflowExecutionInfo {
string first_execution_run_id = 55;
ExecutionStats execution_stats = 56;
google.protobuf.Timestamp workflow_run_expiration_time = 57 [(gogoproto.stdtime) = true];
int64 last_history_node_txn_id = 58;
// Transaction Id of the first event in the last batch of events.
int64 last_first_event_txn_id = 58;
}

message ExecutionStats {
Expand Down
26 changes: 22 additions & 4 deletions service/frontend/workflowHandler.go
Expand Up @@ -513,7 +513,7 @@ func (wh *WorkflowHandler) GetWorkflowExecutionHistory(ctx context.Context, requ
execution *commonpb.WorkflowExecution,
expectedNextEventID int64,
currentBranchToken []byte,
) ([]byte, string, int64, int64, bool, error) {
) ([]byte, string, int64, int64, int64, bool, error) {
response, err := wh.GetHistoryClient().PollMutableState(ctx, &historyservice.PollMutableStateRequest{
NamespaceId: namespaceUUID,
Execution: execution,
Expand All @@ -522,13 +522,14 @@ func (wh *WorkflowHandler) GetWorkflowExecutionHistory(ctx context.Context, requ
})

if err != nil {
return nil, "", 0, 0, false, err
return nil, "", 0, 0, 0, false, err
}
isWorkflowRunning := response.GetWorkflowStatus() == enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING

return response.CurrentBranchToken,
response.Execution.GetRunId(),
response.GetLastFirstEventId(),
response.GetLastFirstEventTxnId(),
response.GetNextEventId(),
isWorkflowRunning,
nil
Expand All @@ -541,6 +542,7 @@ func (wh *WorkflowHandler) GetWorkflowExecutionHistory(ctx context.Context, requ

var runID string
lastFirstEventID := common.FirstEventID
lastFirstEventTxnID := int64(0)
var nextEventID int64
var isWorkflowRunning bool

Expand All @@ -562,7 +564,7 @@ func (wh *WorkflowHandler) GetWorkflowExecutionHistory(ctx context.Context, requ
if !isCloseEventOnly {
queryNextEventID = continuationToken.GetNextEventId()
}
continuationToken.BranchToken, _, lastFirstEventID, nextEventID, isWorkflowRunning, err =
continuationToken.BranchToken, _, lastFirstEventID, lastFirstEventTxnID, nextEventID, isWorkflowRunning, err =
queryHistory(namespaceID, execution, queryNextEventID, continuationToken.BranchToken)
if err != nil {
return nil, err
Expand All @@ -576,7 +578,7 @@ func (wh *WorkflowHandler) GetWorkflowExecutionHistory(ctx context.Context, requ
if !isCloseEventOnly {
queryNextEventID = common.FirstEventID
}
continuationToken.BranchToken, runID, lastFirstEventID, nextEventID, isWorkflowRunning, err =
continuationToken.BranchToken, runID, lastFirstEventID, lastFirstEventTxnID, nextEventID, isWorkflowRunning, err =
queryHistory(namespaceID, execution, queryNextEventID, nil)
if err != nil {
return nil, err
Expand All @@ -590,6 +592,22 @@ func (wh *WorkflowHandler) GetWorkflowExecutionHistory(ctx context.Context, requ
continuationToken.IsWorkflowRunning = isWorkflowRunning
continuationToken.PersistenceToken = nil
}

// TODO: the code below is a temporary solution to guard against invalid event batches
// when data inconsistency occurs.
// The long-term solution should check event batches pointing backwards within the history store.
defer func() {
// the lastFirstEventTxnID != 0 check exists for forward / backward compatibility
// (presumably the ID is 0 when the history service does not populate it — TODO confirm)
if _, ok := retError.(*serviceerror.DataLoss); ok && lastFirstEventTxnID != 0 {
_, _ = wh.GetHistoryManager().TrimHistoryBranch(&persistence.TrimHistoryBranchRequest{
ShardID: common.WorkflowIDToHistoryShard(namespaceID, execution.GetWorkflowId(), wh.config.NumHistoryShards),
BranchToken: continuationToken.BranchToken,
NodeID: lastFirstEventID,
TransactionID: lastFirstEventTxnID,
})
}
}()

rawHistoryQueryEnabled := wh.config.SendRawWorkflowHistory(request.GetNamespace())

history := &historypb.History{}
Expand Down
3 changes: 3 additions & 0 deletions service/history/events/notifier.go
Expand Up @@ -56,6 +56,7 @@ type (
Notification struct {
ID definition.WorkflowIdentifier
LastFirstEventID int64
LastFirstEventTxnID int64
NextEventID int64
PreviousStartedEventID int64
Timestamp time.Time
Expand Down Expand Up @@ -90,6 +91,7 @@ func NewNotification(
namespaceID string,
workflowExecution *commonpb.WorkflowExecution,
lastFirstEventID int64,
lastFirstEventTxnID int64,
nextEventID int64,
previousStartedEventID int64,
currentBranchToken []byte,
Expand All @@ -104,6 +106,7 @@ func NewNotification(
workflowExecution.GetRunId(),
),
LastFirstEventID: lastFirstEventID,
LastFirstEventTxnID: lastFirstEventTxnID,
NextEventID: nextEventID,
PreviousStartedEventID: previousStartedEventID,
CurrentBranchToken: currentBranchToken,
Expand Down
6 changes: 4 additions & 2 deletions service/history/events/notifier_test.go
Expand Up @@ -88,12 +88,13 @@ func (s *notifierSuite) TestSingleSubscriberWatchingEvents() {
RunId: "run ID",
}
lastFirstEventID := int64(3)
lastFirstEventTxnID := int64(398)
previousStartedEventID := int64(5)
nextEventID := int64(18)
workflowState := enumsspb.WORKFLOW_EXECUTION_STATE_CREATED
workflowStatus := enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING
branchToken := make([]byte, 0)
historyEvent := NewNotification(namespaceID, execution, lastFirstEventID, nextEventID, previousStartedEventID, branchToken, workflowState, workflowStatus)
historyEvent := NewNotification(namespaceID, execution, lastFirstEventID, lastFirstEventTxnID, nextEventID, previousStartedEventID, branchToken, workflowState, workflowStatus)
timerChan := time.NewTimer(time.Second * 2).C

subscriberID, channel, err := s.notifier.WatchHistoryEvent(definition.NewWorkflowIdentifier(namespaceID, execution.GetWorkflowId(), execution.GetRunId()))
Expand Down Expand Up @@ -121,12 +122,13 @@ func (s *notifierSuite) TestMultipleSubscriberWatchingEvents() {
}

lastFirstEventID := int64(3)
lastFirstEventTxnID := int64(3980)
previousStartedEventID := int64(5)
nextEventID := int64(18)
workflowState := enumsspb.WORKFLOW_EXECUTION_STATE_CREATED
workflowStatus := enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING
branchToken := make([]byte, 0)
historyEvent := NewNotification(namespaceID, execution, lastFirstEventID, nextEventID, previousStartedEventID, branchToken, workflowState, workflowStatus)
historyEvent := NewNotification(namespaceID, execution, lastFirstEventID, lastFirstEventTxnID, nextEventID, previousStartedEventID, branchToken, workflowState, workflowStatus)
timerChan := time.NewTimer(time.Second * 5).C

subscriberCount := 100
Expand Down
9 changes: 7 additions & 2 deletions service/history/historyEngine.go
Expand Up @@ -646,7 +646,8 @@ func (e *historyEngineImpl) PollMutableState(
NamespaceId: request.GetNamespaceId(),
Execution: request.Execution,
ExpectedNextEventId: request.ExpectedNextEventId,
CurrentBranchToken: request.CurrentBranchToken})
CurrentBranchToken: request.CurrentBranchToken,
})

if err != nil {
return nil, err
Expand All @@ -657,6 +658,7 @@ func (e *historyEngineImpl) PollMutableState(
NextEventId: response.NextEventId,
PreviousStartedEventId: response.PreviousStartedEventId,
LastFirstEventId: response.LastFirstEventId,
LastFirstEventTxnId: response.LastFirstEventTxnId,
TaskQueue: response.TaskQueue,
StickyTaskQueue: response.StickyTaskQueue,
StickyTaskQueueScheduleToStartTimeout: response.StickyTaskQueueScheduleToStartTimeout,
Expand Down Expand Up @@ -730,6 +732,7 @@ func (e *historyEngineImpl) getMutableStateOrPolling(
select {
case event := <-channel:
response.LastFirstEventId = event.LastFirstEventID
response.LastFirstEventTxnId = event.LastFirstEventTxnID
response.NextEventId = event.NextEventID
response.PreviousStartedEventId = event.PreviousStartedEventID
response.WorkflowState = event.WorkflowState
Expand Down Expand Up @@ -1005,10 +1008,12 @@ func (e *historyEngineImpl) getMutableState(
executionInfo := mutableState.GetExecutionInfo()
execution.RunId = context.getExecution().RunId
workflowState, workflowStatus := mutableState.GetWorkflowStateStatus()
lastFirstEventID, lastFirstEventTxnID := mutableState.GetLastFirstEventIDTxnID()
retResp = &historyservice.GetMutableStateResponse{
Execution: &execution,
WorkflowType: &commonpb.WorkflowType{Name: executionInfo.WorkflowTypeName},
LastFirstEventId: mutableState.GetLastFirstEventID(),
LastFirstEventId: lastFirstEventID,
LastFirstEventTxnId: lastFirstEventTxnID,
NextEventId: mutableState.GetNextEventID(),
PreviousStartedEventId: mutableState.GetPreviousStartedEventID(),
TaskQueue: &taskqueuepb.TaskQueue{
Expand Down
1 change: 1 addition & 0 deletions service/history/historyEngine_test.go
Expand Up @@ -458,6 +458,7 @@ func (s *engineSuite) TestGetMutableStateLongPoll_CurrentBranchChanged() {
"testNamespaceID",
newExecution,
int64(1),
int64(0),
int64(4),
int64(1),
[]byte{1},
Expand Down
2 changes: 1 addition & 1 deletion service/history/mutableState.go
Expand Up @@ -138,7 +138,7 @@ type (
GetExecutionState() *persistencespb.WorkflowExecutionState
GetInFlightWorkflowTask() (*workflowTaskInfo, bool)
GetPendingWorkflowTask() (*workflowTaskInfo, bool)
GetLastFirstEventID() int64
GetLastFirstEventIDTxnID() (int64, int64)
GetLastWriteVersion() (int64, error)
GetNextEventID() int64
GetPreviousStartedEventID() int64
Expand Down
21 changes: 6 additions & 15 deletions service/history/mutableStateBuilder.go
Expand Up @@ -1128,10 +1128,10 @@ func (e *mutableStateBuilder) ClearStickyness() {
e.executionInfo.StickyScheduleToStartTimeout = timestamp.DurationFromSeconds(0)
}

// GetLastFirstEventID returns last first event ID
// GetLastFirstEventIDTxnID returns last first event ID and corresponding transaction ID
// first event ID is the ID of a batch of events in a single history events record
func (e *mutableStateBuilder) GetLastFirstEventID() int64 {
return e.executionInfo.LastFirstEventId
func (e *mutableStateBuilder) GetLastFirstEventIDTxnID() (int64, int64) {
return e.executionInfo.LastFirstEventId, e.executionInfo.LastFirstEventTxnId
}

// GetNextEventID returns next event ID
Expand Down Expand Up @@ -3614,9 +3614,7 @@ func (e *mutableStateBuilder) CloseTransactionAsMutation(

if len(workflowEventsSeq) > 0 {
lastEvents := workflowEventsSeq[len(workflowEventsSeq)-1].Events
firstEvent := lastEvents[0]
lastEvent := lastEvents[len(lastEvents)-1]
e.updateWithLastFirstEvent(firstEvent)
if err := e.updateWithLastWriteEvent(
lastEvent,
transactionPolicy,
Expand Down Expand Up @@ -3707,9 +3705,7 @@ func (e *mutableStateBuilder) CloseTransactionAsSnapshot(

if len(workflowEventsSeq) > 0 {
lastEvents := workflowEventsSeq[len(workflowEventsSeq)-1].Events
firstEvent := lastEvents[0]
lastEvent := lastEvents[len(lastEvents)-1]
e.updateWithLastFirstEvent(firstEvent)
if err := e.updateWithLastWriteEvent(
lastEvent,
transactionPolicy,
Expand Down Expand Up @@ -3884,12 +3880,13 @@ func (e *mutableStateBuilder) prepareEventsAndReplicationTasks(
WorkflowID: e.executionInfo.WorkflowId,
RunID: e.executionState.RunId,
BranchToken: currentBranchToken,
PrevTxnID: e.executionInfo.LastHistoryNodeTxnId,
PrevTxnID: e.executionInfo.LastFirstEventTxnId,
TxnID: historyNodeTxnIDs[index],
Events: eventBatch,
}
e.GetExecutionInfo().LastEventTaskId = eventBatch[len(eventBatch)-1].GetTaskId()
e.executionInfo.LastHistoryNodeTxnId = historyNodeTxnIDs[index]
e.executionInfo.LastFirstEventId = eventBatch[0].GetEventId()
e.executionInfo.LastFirstEventTxnId = historyNodeTxnIDs[index]
}

if err := e.validateNoEventsAfterWorkflowFinish(
Expand Down Expand Up @@ -4021,12 +4018,6 @@ func (e *mutableStateBuilder) updateWithLastWriteEvent(
return nil
}

// updateWithLastFirstEvent stores the event ID of lastFirstEvent as the
// execution info's LastFirstEventId.
func (e *mutableStateBuilder) updateWithLastFirstEvent(
	lastFirstEvent *historypb.HistoryEvent,
) {
	info := e.GetExecutionInfo()
	info.LastFirstEventId = lastFirstEvent.GetEventId()
}

// canReplicateEvents reports whether the namespace entry's replication policy
// is cache.ReplicationPolicyMultiCluster.
func (e *mutableStateBuilder) canReplicateEvents() bool {
	policy := e.namespaceEntry.GetReplicationPolicy()
	return policy == cache.ReplicationPolicyMultiCluster
}
Expand Down
15 changes: 8 additions & 7 deletions service/history/mutableState_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions service/history/timerQueueStandbyTaskExecutor_test.go
Expand Up @@ -729,9 +729,9 @@ func (s *timerQueueStandbyTaskExecutorSuite) TestProcessActivityTimeout_Multiple
mutableState.executionInfo.LastUpdateTime = input.UpdateWorkflowMutation.ExecutionInfo.LastUpdateTime
input.RangeID = 0
input.UpdateWorkflowMutation.ExecutionInfo.LastEventTaskId = 0
input.UpdateWorkflowMutation.ExecutionInfo.LastHistoryNodeTxnId = 0
input.UpdateWorkflowMutation.ExecutionInfo.LastFirstEventTxnId = 0
mutableState.executionInfo.LastEventTaskId = 0
mutableState.executionInfo.LastHistoryNodeTxnId = 0
mutableState.executionInfo.LastFirstEventTxnId = 0
mutableState.executionInfo.WorkflowTaskOriginalScheduledTime = input.UpdateWorkflowMutation.ExecutionInfo.WorkflowTaskOriginalScheduledTime
mutableState.executionInfo.ExecutionStats = &persistencespb.ExecutionStats{}

Expand Down
8 changes: 6 additions & 2 deletions service/history/workflowExecutionContext.go
Expand Up @@ -555,10 +555,12 @@ func (c *workflowExecutionContextImpl) conflictResolveWorkflowExecution(

workflowState, workflowStatus := resetMutableState.GetWorkflowStateStatus()
// Current branch changed and notify the watchers
lastFirstEventID, lastFirstEventTxnID := resetMutableState.GetLastFirstEventIDTxnID()
c.engine.NotifyNewHistoryEvent(events.NewNotification(
c.namespaceID,
&c.workflowExecution,
resetMutableState.GetLastFirstEventID(),
lastFirstEventID,
lastFirstEventTxnID,
resetMutableState.GetNextEventID(),
resetMutableState.GetPreviousStartedEventID(),
currentBranchToken,
Expand Down Expand Up @@ -771,10 +773,12 @@ func (c *workflowExecutionContextImpl) updateWorkflowExecutionWithNew(
return err
}
workflowState, workflowStatus := c.mutableState.GetWorkflowStateStatus()
lastFirstEventID, lastFirstEventTxnID := c.mutableState.GetLastFirstEventIDTxnID()
c.engine.NotifyNewHistoryEvent(events.NewNotification(
c.namespaceID,
&c.workflowExecution,
c.mutableState.GetLastFirstEventID(),
lastFirstEventID,
lastFirstEventTxnID,
c.mutableState.GetNextEventID(),
c.mutableState.GetPreviousStartedEventID(),
currentBranchToken,
Expand Down

0 comments on commit 57d9851

Please sign in to comment.