Skip to content

Commit

Permalink
Bugfix: ImportWorkflow should always clear mutable state cache (#5039)
Browse files Browse the repository at this point in the history
<!-- Describe what has changed in this PR -->
**What changed?**
* ImportWorkflow should always clear "cached" mutable state since 
  * shared mutable state will at most be used once
  * shared mutable state may contain changes not persisted to DB

<!-- Tell your future self why have you made these changes -->
**Why?**
N/A

<!-- How have you verified this change? Tested locally? Added a unit
test? Checked in staging env? -->
**How did you test it?**
N/A

<!-- Assuming the worst case, what can be broken when deploying this
change to production? -->
**Potential risks**
N/A

<!-- Is this PR a hotfix candidate or require that a notification be
sent to the broader community? (Yes/No) -->
**Is hotfix candidate?**
N/A
  • Loading branch information
wxing1292 committed Oct 26, 2023
1 parent 949b0f9 commit 1b0ec47
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 16 deletions.
11 changes: 5 additions & 6 deletions service/history/ndc/history_importer.go
Expand Up @@ -145,12 +145,11 @@ func (r *HistoryImporterImpl) ImportWorkflow(
return nil, err
}
defer func() {
if rec := recover(); rec != nil {
ndcWorkflow.GetReleaseFn()(errPanic)
panic(rec)
} else {
ndcWorkflow.GetReleaseFn()(retError)
}
// it is ok to clear everytime this function is invoked
// mutable state will be at most initialized once from shard mutable state cache
// mutable state will be usually initialized from input token
ndcWorkflow.GetContext().Clear()
ndcWorkflow.GetReleaseFn()(retError)
}()

if len(eventsSlice) != 0 {
Expand Down
20 changes: 10 additions & 10 deletions tests/ndc/ndc_test.go
Expand Up @@ -2220,17 +2220,17 @@ func (s *nDCFunctionalTestSuite) importEvents(
resp, err := historyClient.ImportWorkflowExecution(s.newContext(), req)
s.NoError(err, "Failed to import history event")
token = resp.Token
}

if verifyWorkflowNotExists {
_, err := historyClient.GetMutableState(s.newContext(), &historyservice.GetMutableStateRequest{
NamespaceId: s.namespaceID.String(),
Execution: &commonpb.WorkflowExecution{
WorkflowId: workflowID,
RunId: runID,
},
})
s.IsType(&serviceerror.NotFound{}, err)
}
if verifyWorkflowNotExists {
_, err := historyClient.GetMutableState(s.newContext(), &historyservice.GetMutableStateRequest{
NamespaceId: s.namespaceID.String(),
Execution: &commonpb.WorkflowExecution{
WorkflowId: workflowID,
RunId: runID,
},
})
s.IsType(&serviceerror.NotFound{}, err)
}

req := &historyservice.ImportWorkflowExecutionRequest{
Expand Down

0 comments on commit 1b0ec47

Please sign in to comment.