Skip to content

Commit

Permalink
Test that ShardOwnershipLostErrors are never retried (#3625)
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelSnowden committed Dec 6, 2022
1 parent c94d2bf commit 34e256a
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 8 deletions.
15 changes: 10 additions & 5 deletions service/history/shard/context_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,9 @@ type (
scheduledTaskMaxReadLevelMap map[string]time.Time // cluster -> scheduledTaskMaxReadLevel

// exist only in memory
remoteClusterInfos map[string]*remoteClusterInfo
handoverNamespaces map[string]*namespaceHandOverInfo // keyed on namespace name
remoteClusterInfos map[string]*remoteClusterInfo
handoverNamespaces map[string]*namespaceHandOverInfo // keyed on namespace name
acquireShardRetryPolicy backoff.RetryPolicy
}

remoteClusterInfo struct {
Expand Down Expand Up @@ -1621,7 +1622,9 @@ func (s *ContextImpl) transition(request contextRequest) error {
// Cancel lifecycle context as soon as we know we're shutting down
s.lifecycleCancel()
// This will cause the controller to remove this shard from the map and then call s.finishStop()
go s.closeCallback(s)
if s.closeCallback != nil {
go s.closeCallback(s)
}
}

setStateStopped := func() {
Expand Down Expand Up @@ -1889,8 +1892,10 @@ func (s *ContextImpl) acquireShard() {
// lifecycleCtx. The persistence operations called here use lifecycleCtx as their context,
// so if we were blocked in any of them, they should return immediately with a context
// canceled error.
policy := backoff.NewExponentialRetryPolicy(1 * time.Second).
WithExpirationInterval(5 * time.Minute)
policy := s.acquireShardRetryPolicy
if policy == nil {
policy = backoff.NewExponentialRetryPolicy(1 * time.Second).WithExpirationInterval(5 * time.Minute)
}

// Remember this value across attempts
ownershipChanged := false
Expand Down
61 changes: 58 additions & 3 deletions service/history/shard/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ package shard
import (
"context"
"errors"
"fmt"
"math/rand"
"testing"
"time"
Expand All @@ -36,6 +37,7 @@ import (
"github.com/stretchr/testify/suite"

persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common/backoff"
"go.temporal.io/server/common/clock"
"go.temporal.io/server/common/cluster"
"go.temporal.io/server/common/convert"
Expand All @@ -53,7 +55,7 @@ type (
*require.Assertions

controller *gomock.Controller
mockShard Context
mockShard *ContextTest
mockClusterMetadata *cluster.MockMetadata
mockShardManager *persistence.MockShardManager
mockExecutionManager *persistence.MockExecutionManager
Expand Down Expand Up @@ -157,7 +159,7 @@ func (s *contextSuite) TestTimerMaxReadLevelInitialization() {
)

// clear shardInfo and load from persistence
shardContextImpl := s.mockShard.(*ContextTest)
shardContextImpl := s.mockShard
shardContextImpl.shardInfo = nil
err := shardContextImpl.loadShardMetadata(convert.BoolPtr(false))
s.NoError(err)
Expand Down Expand Up @@ -211,7 +213,7 @@ func (s *contextSuite) TestTimerMaxReadLevelUpdate_SingleProcessor() {

// update in single processor mode
s.mockShard.UpdateScheduledQueueExclusiveHighReadWatermark(cluster.TestCurrentClusterName, true)
scheduledTaskMaxReadLevelMap := s.mockShard.(*ContextTest).scheduledTaskMaxReadLevelMap
scheduledTaskMaxReadLevelMap := s.mockShard.scheduledTaskMaxReadLevelMap
s.Len(scheduledTaskMaxReadLevelMap, 2)
s.True(scheduledTaskMaxReadLevelMap[cluster.TestCurrentClusterName].After(now))
s.True(scheduledTaskMaxReadLevelMap[cluster.TestAlternativeClusterName].After(now))
Expand Down Expand Up @@ -365,3 +367,56 @@ func (s *contextSuite) TestDeleteWorkflowExecution_ErrorAndContinue_Success() {
s.NoError(err)
s.Equal(tasks.DeleteWorkflowExecutionStageCurrent|tasks.DeleteWorkflowExecutionStageMutableState|tasks.DeleteWorkflowExecutionStageVisibility|tasks.DeleteWorkflowExecutionStageHistory, stage)
}

// TestAcquireShardOwnershipLostErrorIsNotRetried verifies that a
// ShardOwnershipLostError from UpdateShard aborts shard acquisition
// immediately: UpdateShard must be invoked exactly once (no retries),
// and the shard context must end up in the stopping state.
func (s *contextSuite) TestAcquireShardOwnershipLostErrorIsNotRetried() {
	s.mockShard.state = contextStateAcquiring
	// Permit up to 5 near-instant retries; the Times(1) expectation below
	// proves none of them are actually consumed.
	retryPolicy := backoff.NewExponentialRetryPolicy(time.Nanosecond).WithMaximumAttempts(5)
	s.mockShard.acquireShardRetryPolicy = retryPolicy
	s.mockShardManager.EXPECT().
		UpdateShard(gomock.Any(), gomock.Any()).
		Return(&persistence.ShardOwnershipLostError{}).
		Times(1)

	s.mockShard.acquireShard()

	s.Assert().Equal(contextStateStopping, s.mockShard.state)
}

// TestAcquireShardNonOwnershipLostErrorIsRetried verifies that a generic
// (non-ownership-lost) error from UpdateShard is retried until the retry
// policy is exhausted, after which the shard context transitions to the
// stopping state.
func (s *contextSuite) TestAcquireShardNonOwnershipLostErrorIsRetried() {
	s.mockShard.state = contextStateAcquiring
	retryPolicy := backoff.NewExponentialRetryPolicy(time.Nanosecond).WithMaximumAttempts(5)
	s.mockShard.acquireShardRetryPolicy = retryPolicy
	// TODO: make this 5 times instead of 6 when retry policy is fixed
	s.mockShardManager.EXPECT().
		UpdateShard(gomock.Any(), gomock.Any()).
		Return(fmt.Errorf("temp error")).
		Times(6)

	s.mockShard.acquireShard()

	s.Assert().Equal(contextStateStopping, s.mockShard.state)
}

// TestAcquireShardEventuallySucceeds verifies that transient UpdateShard
// failures are retried and that a later success completes acquisition:
// after 3 failed attempts and 1 successful one, the engine is notified of
// new tasks and the shard context reaches the acquired state.
// NOTE: the two UpdateShard expectations are declared failure-first on
// purpose — gomock satisfies same-signature expectations in declaration
// order.
func (s *contextSuite) TestAcquireShardEventuallySucceeds() {
	s.mockShard.state = contextStateAcquiring
	retryPolicy := backoff.NewExponentialRetryPolicy(time.Nanosecond).WithMaximumAttempts(5)
	s.mockShard.acquireShardRetryPolicy = retryPolicy
	s.mockShardManager.EXPECT().
		UpdateShard(gomock.Any(), gomock.Any()).
		Return(fmt.Errorf("temp error")).
		Times(3)
	s.mockShardManager.EXPECT().
		UpdateShard(gomock.Any(), gomock.Any()).
		Return(nil).
		Times(1)
	s.mockHistoryEngine.EXPECT().NotifyNewTasks(gomock.Any(), gomock.Any()).MinTimes(1)

	s.mockShard.acquireShard()

	s.Assert().Equal(contextStateAcquired, s.mockShard.state)
}

// TestAcquireShardNoError verifies the happy path: a single successful
// UpdateShard call acquires the shard, notifies the engine of new tasks,
// and leaves the shard context in the acquired state.
func (s *contextSuite) TestAcquireShardNoError() {
	s.mockShard.state = contextStateAcquiring
	retryPolicy := backoff.NewExponentialRetryPolicy(time.Nanosecond).WithMaximumAttempts(5)
	s.mockShard.acquireShardRetryPolicy = retryPolicy
	s.mockShardManager.EXPECT().
		UpdateShard(gomock.Any(), gomock.Any()).
		Return(nil).
		Times(1)
	s.mockHistoryEngine.EXPECT().NotifyNewTasks(gomock.Any(), gomock.Any()).MinTimes(1)

	s.mockShard.acquireShard()

	s.Assert().Equal(contextStateAcquired, s.mockShard.state)
}

0 comments on commit 34e256a

Please sign in to comment.