Skip to content

Commit

Permalink
it should clean up history
Browse files Browse the repository at this point in the history
  • Loading branch information
yux0 committed Mar 20, 2022
1 parent 3e43886 commit 5dc982a
Showing 1 changed file with 20 additions and 31 deletions.
51 changes: 20 additions & 31 deletions service/history/replicationTaskExecutor.go
Expand Up @@ -155,17 +155,24 @@ func (e *replicationTaskExecutorImpl) handleActivityTask(
stopwatch := e.metricsClient.StartTimer(metrics.HistoryRereplicationByActivityReplicationScope, metrics.ClientLatency)
defer stopwatch.Stop()

if resendErr := e.nDCHistoryResender.SendSingleWorkflowHistory(
resendErr := e.nDCHistoryResender.SendSingleWorkflowHistory(
namespace.ID(retryErr.NamespaceId),
retryErr.WorkflowId,
retryErr.RunId,
retryErr.StartEventId,
retryErr.StartEventVersion,
retryErr.EndEventId,
retryErr.EndEventVersion,
); resendErr != nil {
)
switch resendErr.(type) {
case *serviceerror.NotFound:
// TODO" cleanup history
return nil
case nil:
//no-op
default:
e.logger.Error("error resend history for history event", tag.Error(resendErr))
return e.handleResendError(ctx, retryErr.NamespaceId, retryErr.WorkflowId, retryErr.RunId, resendErr)
return err
}
return e.historyEngine.SyncActivity(ctx, request)

Expand Down Expand Up @@ -212,17 +219,24 @@ func (e *replicationTaskExecutorImpl) handleHistoryReplicationTask(
resendStopWatch := e.metricsClient.StartTimer(metrics.HistoryRereplicationByHistoryReplicationScope, metrics.ClientLatency)
defer resendStopWatch.Stop()

if resendErr := e.nDCHistoryResender.SendSingleWorkflowHistory(
resendErr := e.nDCHistoryResender.SendSingleWorkflowHistory(
namespace.ID(retryErr.NamespaceId),
retryErr.WorkflowId,
retryErr.RunId,
retryErr.StartEventId,
retryErr.StartEventVersion,
retryErr.EndEventId,
retryErr.EndEventVersion,
); resendErr != nil {
)
switch resendErr.(type) {
case *serviceerror.NotFound:
// TODO: cleanup workflow history
return nil
case nil:
//no-op
default:
e.logger.Error("error resend history for history event", tag.Error(resendErr))
return e.handleResendError(ctx, retryErr.NamespaceId, retryErr.WorkflowId, retryErr.RunId, resendErr)
return err
}

return e.historyEngine.ReplicateEventsV2(ctx, request)
Expand Down Expand Up @@ -256,28 +270,3 @@ FilterLoop:
}
return shouldProcessTask, nil
}

func (e *replicationTaskExecutorImpl) handleResendError(
ctx context.Context,
namespaceID string,
workflowID string,
runID string,
err error,
) error {
// clean up local workflow history if workflow does not exist in source cluster
switch err.(type) {
case *serviceerror.NotFound:
cleanupErr := e.historyEngine.DeleteWorkflowExecution(ctx, &historyservice.DeleteWorkflowExecutionRequest{
NamespaceId: namespaceID,
WorkflowExecution: &commonpb.WorkflowExecution{
WorkflowId: workflowID,
RunId: runID,
},
})
if _, ok := cleanupErr.(*serviceerror.NotFound); ok {
return nil
}
return cleanupErr
}
return err
}

0 comments on commit 5dc982a

Please sign in to comment.