Commit
Make shard ownership more resilient when shard is busy (#4283)
* Do the timeout check after the shard lock is acquired, not before
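
For illustration, a minimal self-contained Go sketch of the ordering change (not the actual Temporal code): shardContext and its helpers below are simplified stand-ins that borrow the names used in the diff (wLock, wUnlock, newDetachedContext), and addTasksBefore/addTasksAfter are hypothetical; only the relative order of lock acquisition and the timeout check is the point.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// shardContext is a stand-in for the real *ContextImpl; it carries only what
// this sketch needs.
type shardContext struct {
	mu sync.RWMutex
}

func (s *shardContext) wLock()   { s.mu.Lock() }
func (s *shardContext) wUnlock() { s.mu.Unlock() }

// newDetachedContext is a simplified stand-in for the real helper: it rejects
// callers whose context is already done (the "timeout check") and otherwise
// returns a detached context with its own timeout for the persistence call.
func (s *shardContext) newDetachedContext(ctx context.Context) (context.Context, context.CancelFunc, error) {
	if err := ctx.Err(); err != nil {
		return nil, nil, err
	}
	detached, cancel := context.WithTimeout(context.Background(), time.Minute)
	return detached, cancel, nil
}

// addTasksBefore: the old ordering. The timeout check runs before acquiring
// the shard write lock, so a caller can pass the check, then wait a long time
// on a contended lock, and still go on to the persistence call.
func addTasksBefore(s *shardContext, ctx context.Context) error {
	dctx, cancel, err := s.newDetachedContext(ctx)
	if err != nil {
		return err
	}
	defer cancel()

	s.wLock() // may block for a long time when the shard is busy
	defer s.wUnlock()

	_ = dctx // ... state checks and persistence call would go here ...
	return nil
}

// addTasksAfter: the new ordering. The timeout check runs while holding the
// shard lock, so time spent waiting on lock contention counts against the
// caller before any further work is done.
func addTasksAfter(s *shardContext, ctx context.Context) error {
	s.wLock()
	defer s.wUnlock()

	dctx, cancel, err := s.newDetachedContext(ctx)
	if err != nil {
		return err
	}
	defer cancel()

	_ = dctx // ... state checks and persistence call would go here ...
	return nil
}

func main() {
	s := &shardContext{}

	// Simulate a busy shard: hold the write lock past the caller's deadline.
	s.wLock()
	go func() {
		time.Sleep(50 * time.Millisecond)
		s.wUnlock()
	}()

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()

	fmt.Println("before-style:", addTasksBefore(s, ctx)) // passes the check, waits, proceeds anyway
	fmt.Println("after-style:", addTasksAfter(s, ctx))   // sees the expired deadline after the wait, fails fast
}

The intent, as the in-diff comment puts it, is that the timeout check accounts for shard lock contention: a caller whose deadline has already expired while waiting on a busy shard is rejected before any persistence work starts.
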
wxing1292 committed May 5, 2023
1 parent 0bdbbfc commit 9fdd08c
Showing 1 changed file with 52 additions and 44 deletions.
96 changes: 52 additions & 44 deletions service/history/shard/context_impl.go
@@ -467,53 +467,51 @@ func (s *ContextImpl) AddTasks(
 	ctx context.Context,
 	request *persistence.AddHistoryTasksRequest,
 ) error {
-	ctx, cancel, err := s.newDetachedContext(ctx)
+	engine, err := s.GetEngine(ctx)
 	if err != nil {
 		return err
 	}
-	defer cancel()
+	err = s.addTasksWithoutNotification(ctx, request)
+	if OperationPossiblySucceeded(err) {
+		engine.NotifyNewTasks(request.Tasks)
+	}
+	return err
+}
 
+func (s *ContextImpl) addTasksWithoutNotification(
+	ctx context.Context,
+	request *persistence.AddHistoryTasksRequest,
+) error {
 	// do not try to get namespace cache within shard lock
 	namespaceID := namespace.ID(request.NamespaceID)
 	namespaceEntry, err := s.GetNamespaceRegistry().GetNamespaceByID(namespaceID)
 	if err != nil {
 		return err
 	}
 
-	engine, err := s.GetEngine(ctx)
+	s.wLock()
+	defer s.wUnlock()
+
+	// timeout check should be done within the shard lock, in case of shard lock contention
+	ctx, cancel, err := s.newDetachedContext(ctx)
 	if err != nil {
 		return err
 	}
+	defer cancel()
 
-	s.wLock()
 	if err := s.errorByState(); err != nil {
-		s.wUnlock()
 		return err
 	}
 	if err := s.errorByNamespaceStateLocked(namespaceEntry.Name()); err != nil {
-		s.wUnlock()
 		return err
 	}
-	err = s.addTasksLocked(ctx, request, namespaceEntry)
-	s.wUnlock()
-
-	if OperationPossiblySucceeded(err) {
-		engine.NotifyNewTasks(request.Tasks)
-	}
-
-	return err
+	return s.addTasksLocked(ctx, request, namespaceEntry)
 }
 
 func (s *ContextImpl) CreateWorkflowExecution(
 	ctx context.Context,
 	request *persistence.CreateWorkflowExecutionRequest,
 ) (*persistence.CreateWorkflowExecutionResponse, error) {
-	ctx, cancel, err := s.newDetachedContext(ctx)
-	if err != nil {
-		return nil, err
-	}
-	defer cancel()
-
 	// do not try to get namespace cache within shard lock
 	namespaceID := namespace.ID(request.NewWorkflowSnapshot.ExecutionInfo.NamespaceId)
 	workflowID := request.NewWorkflowSnapshot.ExecutionInfo.WorkflowId
@@ -525,6 +523,13 @@ func (s *ContextImpl) CreateWorkflowExecution(
 	s.wLock()
 	defer s.wUnlock()
 
+	// timeout check should be done within the shard lock, in case of shard lock contention
+	ctx, cancel, err := s.newDetachedContext(ctx)
+	if err != nil {
+		return nil, err
+	}
+	defer cancel()
+
 	if err := s.errorByState(); err != nil {
 		return nil, err
 	}
@@ -557,12 +562,6 @@ func (s *ContextImpl) UpdateWorkflowExecution(
 	ctx context.Context,
 	request *persistence.UpdateWorkflowExecutionRequest,
 ) (*persistence.UpdateWorkflowExecutionResponse, error) {
-	ctx, cancel, err := s.newDetachedContext(ctx)
-	if err != nil {
-		return nil, err
-	}
-	defer cancel()
-
 	// do not try to get namespace cache within shard lock
 	namespaceID := namespace.ID(request.UpdateWorkflowMutation.ExecutionInfo.NamespaceId)
 	workflowID := request.UpdateWorkflowMutation.ExecutionInfo.WorkflowId
@@ -574,6 +573,13 @@ func (s *ContextImpl) UpdateWorkflowExecution(
 	s.wLock()
 	defer s.wUnlock()
 
+	// timeout check should be done within the shard lock, in case of shard lock contention
+	ctx, cancel, err := s.newDetachedContext(ctx)
+	if err != nil {
+		return nil, err
+	}
+	defer cancel()
+
 	if err := s.errorByState(); err != nil {
 		return nil, err
 	}
@@ -632,12 +638,6 @@ func (s *ContextImpl) ConflictResolveWorkflowExecution(
 	ctx context.Context,
 	request *persistence.ConflictResolveWorkflowExecutionRequest,
 ) (*persistence.ConflictResolveWorkflowExecutionResponse, error) {
-	ctx, cancel, err := s.newDetachedContext(ctx)
-	if err != nil {
-		return nil, err
-	}
-	defer cancel()
-
 	// do not try to get namespace cache within shard lock
 	namespaceID := namespace.ID(request.ResetWorkflowSnapshot.ExecutionInfo.NamespaceId)
 	workflowID := request.ResetWorkflowSnapshot.ExecutionInfo.WorkflowId
@@ -649,6 +649,13 @@ func (s *ContextImpl) ConflictResolveWorkflowExecution(
 	s.wLock()
 	defer s.wUnlock()
 
+	// timeout check should be done within the shard lock, in case of shard lock contention
+	ctx, cancel, err := s.newDetachedContext(ctx)
+	if err != nil {
+		return nil, err
+	}
+	defer cancel()
+
 	if err := s.errorByState(); err != nil {
 		return nil, err
 	}
@@ -700,12 +707,6 @@ func (s *ContextImpl) SetWorkflowExecution(
 	ctx context.Context,
 	request *persistence.SetWorkflowExecutionRequest,
 ) (*persistence.SetWorkflowExecutionResponse, error) {
-	ctx, cancel, err := s.newDetachedContext(ctx)
-	if err != nil {
-		return nil, err
-	}
-	defer cancel()
-
 	// do not try to get namespace cache within shard lock
 	namespaceID := namespace.ID(request.SetWorkflowSnapshot.ExecutionInfo.NamespaceId)
 	workflowID := request.SetWorkflowSnapshot.ExecutionInfo.WorkflowId
@@ -717,6 +718,13 @@ func (s *ContextImpl) SetWorkflowExecution(
 	s.wLock()
 	defer s.wUnlock()
 
+	// timeout check should be done within the shard lock, in case of shard lock contention
+	ctx, cancel, err := s.newDetachedContext(ctx)
+	if err != nil {
+		return nil, err
+	}
+	defer cancel()
+
 	if err := s.errorByState(); err != nil {
 		return nil, err
 	}
@@ -855,12 +863,6 @@ func (s *ContextImpl) DeleteWorkflowExecution(
 	// The history branch won't be accessible (because mutable state is deleted) and special garbage collection workflow will delete it eventually.
 	// Stage 4 shouldn't be done earlier because if this func fails after it, workflow execution will be accessible but won't have history (inconsistent state).
 
-	ctx, cancel, err := s.newDetachedContext(ctx)
-	if err != nil {
-		return err
-	}
-	defer cancel()
-
 	engine, err := s.GetEngine(ctx)
 	if err != nil {
 		return err
@@ -897,6 +899,12 @@ func (s *ContextImpl) DeleteWorkflowExecution(
 	s.wLock()
 	defer s.wUnlock()
 
+	ctx, cancel, err := s.newDetachedContext(ctx)
+	if err != nil {
+		return err
+	}
+	defer cancel()
+
 	if err := s.errorByState(); err != nil {
 		return err
 	}