Skip to content

Commit

Permalink
Include more API to Cassandra read after write inconsistency protecti…
Browse files Browse the repository at this point in the history
…on (#2366)

* Include PollWorkflowTask API to Cassandra read after write inconsistency protection
* Update GetWorkflowExecutionHistory to use common history node trim logic
  • Loading branch information
wxing1292 authored and yiminc committed Jan 12, 2022
1 parent 72aba0e commit 1a3cf49
Showing 1 changed file with 50 additions and 15 deletions.
65 changes: 50 additions & 15 deletions service/frontend/workflowHandler.go
Expand Up @@ -499,7 +499,7 @@ func (wh *WorkflowHandler) GetWorkflowExecutionHistory(ctx context.Context, requ
execution *commonpb.WorkflowExecution,
expectedNextEventID int64,
currentBranchToken []byte,
) ([]byte, string, int64, int64, int64, bool, error) {
) ([]byte, string, int64, int64, bool, error) {
response, err := wh.historyClient.PollMutableState(ctx, &historyservice.PollMutableStateRequest{
NamespaceId: namespaceUUID.String(),
Execution: execution,
Expand All @@ -508,14 +508,13 @@ func (wh *WorkflowHandler) GetWorkflowExecutionHistory(ctx context.Context, requ
})

if err != nil {
return nil, "", 0, 0, 0, false, err
return nil, "", 0, 0, false, err
}
isWorkflowRunning := response.GetWorkflowStatus() == enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING

return response.CurrentBranchToken,
response.Execution.GetRunId(),
response.GetLastFirstEventId(),
response.GetLastFirstEventTxnId(),
response.GetNextEventId(),
isWorkflowRunning,
nil
Expand All @@ -528,7 +527,6 @@ func (wh *WorkflowHandler) GetWorkflowExecutionHistory(ctx context.Context, requ

var runID string
lastFirstEventID := common.FirstEventID
lastFirstEventTxnID := int64(0)
var nextEventID int64
var isWorkflowRunning bool

Expand All @@ -550,7 +548,7 @@ func (wh *WorkflowHandler) GetWorkflowExecutionHistory(ctx context.Context, requ
if !isCloseEventOnly {
queryNextEventID = continuationToken.GetNextEventId()
}
continuationToken.BranchToken, _, lastFirstEventID, lastFirstEventTxnID, nextEventID, isWorkflowRunning, err =
continuationToken.BranchToken, _, lastFirstEventID, nextEventID, isWorkflowRunning, err =
queryHistory(namespaceID, execution, queryNextEventID, continuationToken.BranchToken)
if err != nil {
return nil, err
Expand All @@ -564,7 +562,7 @@ func (wh *WorkflowHandler) GetWorkflowExecutionHistory(ctx context.Context, requ
if !isCloseEventOnly {
queryNextEventID = common.FirstEventID
}
continuationToken.BranchToken, runID, lastFirstEventID, lastFirstEventTxnID, nextEventID, isWorkflowRunning, err =
continuationToken.BranchToken, runID, lastFirstEventID, nextEventID, isWorkflowRunning, err =
queryHistory(namespaceID, execution, queryNextEventID, nil)
if err != nil {
return nil, err
Expand All @@ -583,14 +581,8 @@ func (wh *WorkflowHandler) GetWorkflowExecutionHistory(ctx context.Context, requ
// when data inconsistency occurs
// long term solution should check event batch pointing backwards within history store
defer func() {
// lastFirstEventTxnID != 0 exists due to forward / backward compatibility
if _, ok := retError.(*serviceerror.DataLoss); ok && lastFirstEventTxnID != 0 {
_, _ = wh.persistenceExecutionManager.TrimHistoryBranch(&persistence.TrimHistoryBranchRequest{
ShardID: common.WorkflowIDToHistoryShard(namespaceID.String(), execution.GetWorkflowId(), wh.config.NumHistoryShards),
BranchToken: continuationToken.BranchToken,
NodeID: lastFirstEventID,
TransactionID: lastFirstEventTxnID,
})
if _, ok := retError.(*serviceerror.DataLoss); ok {
wh.trimHistoryNode(ctx, namespaceID.String(), execution.GetWorkflowId(), execution.GetRunId())
}
}()

Expand Down Expand Up @@ -3024,7 +3016,7 @@ func (wh *WorkflowHandler) createPollWorkflowTaskQueueResponse(
namespaceID namespace.ID,
matchingResp *matchingservice.PollWorkflowTaskQueueResponse,
branchToken []byte,
) (*workflowservice.PollWorkflowTaskQueueResponse, error) {
) (_ *workflowservice.PollWorkflowTaskQueueResponse, retError error) {

if matchingResp.WorkflowExecution == nil {
// this will happen if there is no workflow task to be send to worker / caller
Expand Down Expand Up @@ -3060,6 +3052,15 @@ func (wh *WorkflowHandler) createPollWorkflowTaskQueueResponse(
if dErr != nil {
return nil, dErr
}

// TODO below is a temporal solution to guard against invalid event batch
// when data inconsistency occurs
// long term solution should check event batch pointing backwards within history store
defer func() {
if _, ok := retError.(*serviceerror.DataLoss); ok {
wh.trimHistoryNode(ctx, namespaceID.String(), matchingResp.WorkflowExecution.GetWorkflowId(), matchingResp.WorkflowExecution.GetRunId())
}
}()
history, persistenceToken, err = wh.getHistory(
wh.metricsScope(ctx),
namespaceID,
Expand Down Expand Up @@ -3431,3 +3432,37 @@ func (wh *WorkflowHandler) validateUTF8String(
}
return nil
}

func (wh *WorkflowHandler) trimHistoryNode(
ctx context.Context,
namespaceID string,
workflowID string,
runID string,
) {
response, err := wh.historyClient.GetMutableState(ctx, &historyservice.GetMutableStateRequest{
NamespaceId: namespaceID,
Execution: &commonpb.WorkflowExecution{
WorkflowId: workflowID,
RunId: runID,
},
})
if err != nil {
return // abort
}

_, err = wh.persistenceExecutionManager.TrimHistoryBranch(&persistence.TrimHistoryBranchRequest{
ShardID: common.WorkflowIDToHistoryShard(namespaceID, workflowID, wh.config.NumHistoryShards),
BranchToken: response.CurrentBranchToken,
NodeID: response.GetLastFirstEventId(),
TransactionID: response.GetLastFirstEventTxnId(),
})
if err != nil {
// best effort
wh.logger.Error("unable to trim history branch",
tag.WorkflowNamespaceID(namespaceID),
tag.WorkflowID(workflowID),
tag.WorkflowRunID(runID),
tag.Error(err),
)
}
}

0 comments on commit 1a3cf49

Please sign in to comment.