Skip to content

Commit

Permalink
Release shard lock earlier during delete workflow execution (#3028)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexshtin committed Jul 8, 2022
1 parent 4657f0f commit 6513af8
Showing 1 changed file with 38 additions and 33 deletions.
71 changes: 38 additions & 33 deletions service/history/shard/context_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -955,49 +955,54 @@ func (s *ContextImpl) DeleteWorkflowExecution(
}
}()

s.wLock()
defer s.wUnlock()
// Wrap step 1 and 2 with function to release the lock with defer after step 2.
err = func() error {
s.wLock()
defer s.wUnlock()

if err := s.errorByStateLocked(); err != nil {
return err
}
if err := s.errorByStateLocked(); err != nil {
return err
}

// Step 1. Delete visibility.
if deleteVisibilityRecord {
// TODO: move to existing task generator logic
newTasks = map[tasks.Category][]tasks.Task{
tasks.CategoryVisibility: {
&tasks.DeleteExecutionVisibilityTask{
// TaskID is set by addTasksLocked
WorkflowKey: key,
VisibilityTimestamp: s.timeSource.Now(),
StartTime: startTime,
CloseTime: closeTime,
// Step 1. Delete visibility.
if deleteVisibilityRecord {
// TODO: move to existing task generator logic
newTasks = map[tasks.Category][]tasks.Task{
tasks.CategoryVisibility: {
&tasks.DeleteExecutionVisibilityTask{
// TaskID is set by addTasksLocked
WorkflowKey: key,
VisibilityTimestamp: s.timeSource.Now(),
StartTime: startTime,
CloseTime: closeTime,
},
},
},
}
addTasksRequest := &persistence.AddHistoryTasksRequest{
ShardID: s.shardID,
NamespaceID: key.NamespaceID,
WorkflowID: key.WorkflowID,
RunID: key.RunID,

Tasks: newTasks,
}
err = s.addTasksLocked(ctx, addTasksRequest, namespaceEntry)
if err != nil {
return err
}
}
addTasksRequest := &persistence.AddHistoryTasksRequest{

// Step 2. Delete current workflow execution pointer.
delCurRequest := &persistence.DeleteCurrentWorkflowExecutionRequest{
ShardID: s.shardID,
NamespaceID: key.NamespaceID,
WorkflowID: key.WorkflowID,
RunID: key.RunID,

Tasks: newTasks,
}
err = s.addTasksLocked(ctx, addTasksRequest, namespaceEntry)
if err != nil {
return err
}
}
err = s.GetExecutionManager().DeleteCurrentWorkflowExecution(ctx, delCurRequest)
return err
}()

// Step 2. Delete current workflow execution pointer.
delCurRequest := &persistence.DeleteCurrentWorkflowExecutionRequest{
ShardID: s.shardID,
NamespaceID: key.NamespaceID,
WorkflowID: key.WorkflowID,
RunID: key.RunID,
}
err = s.GetExecutionManager().DeleteCurrentWorkflowExecution(ctx, delCurRequest)
if err != nil {
return err
}
Expand Down

0 comments on commit 6513af8

Please sign in to comment.