diff --git a/service/history/replicationTaskExecutor.go b/service/history/replicationTaskExecutor.go index fef8690762be..8931b8819991 100644 --- a/service/history/replicationTaskExecutor.go +++ b/service/history/replicationTaskExecutor.go @@ -155,7 +155,7 @@ func (e *replicationTaskExecutorImpl) handleActivityTask( stopwatch := e.metricsClient.StartTimer(metrics.HistoryRereplicationByActivityReplicationScope, metrics.ClientLatency) defer stopwatch.Stop() - if resendErr := e.nDCHistoryResender.SendSingleWorkflowHistory( + resendErr := e.nDCHistoryResender.SendSingleWorkflowHistory( namespace.ID(retryErr.NamespaceId), retryErr.WorkflowId, retryErr.RunId, @@ -163,9 +163,16 @@ func (e *replicationTaskExecutorImpl) handleActivityTask( retryErr.StartEventVersion, retryErr.EndEventId, retryErr.EndEventVersion, - ); resendErr != nil { + ) + switch resendErr.(type) { + case *serviceerror.NotFound: + // TODO" cleanup history + return nil + case nil: + //no-op + default: e.logger.Error("error resend history for history event", tag.Error(resendErr)) - return e.handleResendError(ctx, retryErr.NamespaceId, retryErr.WorkflowId, retryErr.RunId, resendErr) + return err } return e.historyEngine.SyncActivity(ctx, request) @@ -212,7 +219,7 @@ func (e *replicationTaskExecutorImpl) handleHistoryReplicationTask( resendStopWatch := e.metricsClient.StartTimer(metrics.HistoryRereplicationByHistoryReplicationScope, metrics.ClientLatency) defer resendStopWatch.Stop() - if resendErr := e.nDCHistoryResender.SendSingleWorkflowHistory( + resendErr := e.nDCHistoryResender.SendSingleWorkflowHistory( namespace.ID(retryErr.NamespaceId), retryErr.WorkflowId, retryErr.RunId, @@ -220,9 +227,16 @@ func (e *replicationTaskExecutorImpl) handleHistoryReplicationTask( retryErr.StartEventVersion, retryErr.EndEventId, retryErr.EndEventVersion, - ); resendErr != nil { + ) + switch resendErr.(type) { + case *serviceerror.NotFound: + // TODO: cleanup workflow history + return nil + case nil: + //no-op + default: e.logger.Error("error resend history for history event", tag.Error(resendErr)) - return e.handleResendError(ctx, retryErr.NamespaceId, retryErr.WorkflowId, retryErr.RunId, resendErr) + return err } return e.historyEngine.ReplicateEventsV2(ctx, request) @@ -256,28 +270,3 @@ FilterLoop: } return shouldProcessTask, nil } - -func (e *replicationTaskExecutorImpl) handleResendError( - ctx context.Context, - namespaceID string, - workflowID string, - runID string, - err error, -) error { - // clean up local workflow history if workflow does not exist in source cluster - switch err.(type) { - case *serviceerror.NotFound: - cleanupErr := e.historyEngine.DeleteWorkflowExecution(ctx, &historyservice.DeleteWorkflowExecutionRequest{ - NamespaceId: namespaceID, - WorkflowExecution: &commonpb.WorkflowExecution{ - WorkflowId: workflowID, - RunId: runID, - }, - }) - if _, ok := cleanupErr.(*serviceerror.NotFound); ok { - return nil - } - return cleanupErr - } - return err -}