Skip to content

Commit

Permalink
Use detached context for shard context operations (#3194)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt committed Aug 12, 2022
1 parent ba452a2 commit f8a59f7
Showing 1 changed file with 19 additions and 12 deletions.
31 changes: 19 additions & 12 deletions service/history/shard/context_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,7 @@ func (s *ContextImpl) AddTasks(
ctx context.Context,
request *persistence.AddHistoryTasksRequest,
) error {
ctx, cancel, err := s.ensureMinContextTimeout(ctx)
ctx, cancel, err := s.newDetachedContext(ctx)
if err != nil {
return err
}
Expand Down Expand Up @@ -596,7 +596,7 @@ func (s *ContextImpl) CreateWorkflowExecution(
ctx context.Context,
request *persistence.CreateWorkflowExecutionRequest,
) (*persistence.CreateWorkflowExecutionResponse, error) {
ctx, cancel, err := s.ensureMinContextTimeout(ctx)
ctx, cancel, err := s.newDetachedContext(ctx)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -640,7 +640,7 @@ func (s *ContextImpl) UpdateWorkflowExecution(
ctx context.Context,
request *persistence.UpdateWorkflowExecutionRequest,
) (*persistence.UpdateWorkflowExecutionResponse, error) {
ctx, cancel, err := s.ensureMinContextTimeout(ctx)
ctx, cancel, err := s.newDetachedContext(ctx)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -711,7 +711,7 @@ func (s *ContextImpl) ConflictResolveWorkflowExecution(
ctx context.Context,
request *persistence.ConflictResolveWorkflowExecutionRequest,
) (*persistence.ConflictResolveWorkflowExecutionResponse, error) {
ctx, cancel, err := s.ensureMinContextTimeout(ctx)
ctx, cancel, err := s.newDetachedContext(ctx)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -775,7 +775,7 @@ func (s *ContextImpl) SetWorkflowExecution(
ctx context.Context,
request *persistence.SetWorkflowExecutionRequest,
) (*persistence.SetWorkflowExecutionResponse, error) {
ctx, cancel, err := s.ensureMinContextTimeout(ctx)
ctx, cancel, err := s.newDetachedContext(ctx)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -925,7 +925,7 @@ func (s *ContextImpl) DeleteWorkflowExecution(
// The history branch won't be accessible (because mutable state is deleted) and special garbage collection workflow will delete it eventually.
// Step 4 shouldn't be done earlier because if this func fails after it, workflow execution will be accessible but won't have history (inconsistent state).

ctx, cancel, err := s.ensureMinContextTimeout(ctx)
ctx, cancel, err := s.newDetachedContext(ctx)
if err != nil {
return err
}
Expand Down Expand Up @@ -1954,21 +1954,28 @@ func (s *ContextImpl) GetArchivalMetadata() archiver.ArchivalMetadata {
return s.archivalMetadata
}

func (s *ContextImpl) ensureMinContextTimeout(
func (s *ContextImpl) newDetachedContext(
ctx context.Context,
) (context.Context, context.CancelFunc, error) {
if err := ctx.Err(); err != nil {
return nil, nil, err
}

detachedContext := rpc.CopyContextValues(s.lifecycleCtx, ctx)

var cancel context.CancelFunc
deadline, ok := ctx.Deadline()
if !ok || deadline.Sub(s.GetTimeSource().Now()) >= minContextTimeout {
return ctx, func() {}, nil
if ok {
timeout := deadline.Sub(s.GetTimeSource().Now())
if timeout < minContextTimeout {
timeout = minContextTimeout
}
detachedContext, cancel = context.WithTimeout(detachedContext, timeout)
} else {
cancel = func() {}
}

newContext, cancel := context.WithTimeout(context.Background(), minContextTimeout)
newContext = rpc.CopyContextValues(newContext, ctx)
return newContext, cancel, nil
return detachedContext, cancel, nil
}

func (s *ContextImpl) newIOContext() (context.Context, context.CancelFunc) {
Expand Down

0 comments on commit f8a59f7

Please sign in to comment.