Skip to content

Commit

Permalink
add stop reason for context
Browse files Browse the repository at this point in the history
  • Loading branch information
alfred-landrum committed Jul 7, 2023
1 parent a3f530d commit 9c483cd
Show file tree
Hide file tree
Showing 7 changed files with 110 additions and 27 deletions.
2 changes: 1 addition & 1 deletion service/history/api/consistency_checker.go
Expand Up @@ -162,7 +162,7 @@ func (c *WorkflowConsistencyCheckerImpl) getWorkflowContextValidatedByClock(
}
if cmpResult > 0 {
shardID := c.shardContext.GetShardID()
c.shardContext.Unload()
c.shardContext.UnloadForOwnershipLost()
return nil, &persistence.ShardOwnershipLostError{
ShardID: shardID,
Msg: fmt.Sprintf("Shard: %v consistency check failed, reloading", shardID),
Expand Down
2 changes: 1 addition & 1 deletion service/history/shard/context.go
Expand Up @@ -118,6 +118,6 @@ type (
// If branchToken != nil, then delete history also, otherwise leave history.
DeleteWorkflowExecution(ctx context.Context, workflowKey definition.WorkflowKey, branchToken []byte, startTime *time.Time, closeTime *time.Time, closeExecutionVisibilityTaskID int64, stage *tasks.DeleteWorkflowExecutionStage) error

Unload()
UnloadForOwnershipLost()
}
)
52 changes: 38 additions & 14 deletions service/history/shard/context_impl.go
Expand Up @@ -128,8 +128,9 @@ type (
lifecycleCancel context.CancelFunc

// state is protected by stateLock
stateLock sync.Mutex
state contextState
stateLock sync.Mutex
state contextState
stopReason stopReason

// All following fields are protected by rwLock, and only valid if state >= Acquiring:
rwLock sync.RWMutex
Expand Down Expand Up @@ -163,8 +164,15 @@ type (
contextRequestAcquire struct{}
contextRequestAcquired struct{ engine Engine }
contextRequestLost struct{}
contextRequestStop struct{}
contextRequestStop struct{ reason stopReason }
contextRequestFinishStop struct{}

stopReason int
)

const (
stopReasonUnspecified stopReason = iota
stopReasonOwnershipLost
)

var _ Context = (*ContextImpl)(nil)
Expand Down Expand Up @@ -1280,7 +1288,7 @@ func (s *ContextImpl) handleReadError(err error) error {
case *persistence.ShardOwnershipLostError:
// Shard is stolen, trigger shutdown of history engine.
// Handling of max read level doesn't matter here.
_ = s.transition(contextRequestStop{})
_ = s.transition(contextRequestStop{reason: stopReasonOwnershipLost})
return err

default:
Expand Down Expand Up @@ -1315,7 +1323,7 @@ func (s *ContextImpl) handleWriteErrorAndUpdateMaxReadLevelLocked(err error, new
case *persistence.ShardOwnershipLostError:
// Shard is stolen, trigger shutdown of history engine.
// Handling of max read level doesn't matter here.
_ = s.transition(contextRequestStop{})
_ = s.transition(contextRequestStop{reason: stopReasonOwnershipLost})
return err

default:
Expand Down Expand Up @@ -1352,8 +1360,13 @@ func (s *ContextImpl) start() {
_ = s.transition(contextRequestAcquire{})
}

func (s *ContextImpl) Unload() {
_ = s.transition(contextRequestStop{})
func (s *ContextImpl) UnloadForOwnershipLost() {
_ = s.transition(contextRequestStop{reason: stopReasonOwnershipLost})
}

// requestStop should only be called by the controller.
func (s *ContextImpl) requestStop() {
_ = s.transition(contextRequestStop{reason: stopReasonUnspecified})
}

// finishStop should only be called by the controller.
Expand All @@ -1379,6 +1392,12 @@ func (s *ContextImpl) IsValid() bool {
return s.state < contextStateStopping
}

func (s *ContextImpl) stoppedForOwnershipLost() bool {
s.stateLock.Lock()
defer s.stateLock.Unlock()
return s.state >= contextStateStopping && s.stopReason == stopReasonOwnershipLost
}

func (s *ContextImpl) wLock() {
handler := s.metricsHandler.WithTags(metrics.OperationTag(metrics.ShardInfoScope))
handler.Counter(metrics.LockRequests.GetMetricName()).Record(1)
Expand Down Expand Up @@ -1461,8 +1480,9 @@ func (s *ContextImpl) transition(request contextRequest) error {
go s.acquireShard()
}

setStateStopping := func() {
setStateStopping := func(request contextRequestStop) {
s.state = contextStateStopping
s.stopReason = request.reason
// Cancel lifecycle context as soon as we know we're shutting down
s.lifecycleCancel()
// This will cause the controller to remove this shard from the map and then call s.finishStop()
Expand All @@ -1480,12 +1500,12 @@ func (s *ContextImpl) transition(request contextRequest) error {

switch s.state {
case contextStateInitialized:
switch request.(type) {
switch request := request.(type) {
case contextRequestAcquire:
setStateAcquiring()
return nil
case contextRequestStop:
setStateStopping()
setStateStopping(request)
return nil
case contextRequestFinishStop:
setStateStopped()
Expand Down Expand Up @@ -1518,21 +1538,21 @@ func (s *ContextImpl) transition(request contextRequest) error {
case contextRequestLost:
return nil // nothing to do, already acquiring
case contextRequestStop:
setStateStopping()
setStateStopping(request)
return nil
case contextRequestFinishStop:
setStateStopped()
return nil
}
case contextStateAcquired:
switch request.(type) {
switch request := request.(type) {
case contextRequestAcquire:
return nil // nothing to to do, already acquired
case contextRequestLost:
setStateAcquiring()
return nil
case contextRequestStop:
setStateStopping()
setStateStopping(request)
return nil
case contextRequestFinishStop:
setStateStopped()
Expand Down Expand Up @@ -1849,9 +1869,13 @@ func (s *ContextImpl) acquireShard() {
// We got an non-retryable error, e.g. ShardOwnershipLostError
s.contextTaggedLogger.Error("Couldn't acquire shard", tag.Error(err))

reason := stopReasonUnspecified
if IsShardOwnershipLostError(err) {
reason = stopReasonOwnershipLost
}
// On any error, initiate shutting down the shard. If we already changed state
// because we got a ShardOwnershipLostError, this won't do anything.
_ = s.transition(contextRequestStop{})
_ = s.transition(contextRequestStop{reason: reason})
}
}

Expand Down
12 changes: 6 additions & 6 deletions service/history/shard/context_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

56 changes: 56 additions & 0 deletions service/history/shard/context_test.go
Expand Up @@ -499,3 +499,59 @@ func (s *contextSuite) TestHandoverNamespace() {
_, ok = handoverNS[namespaceEntry.Name().String()]
s.False(ok)
}

func (s *contextSuite) TestShardStopReasonAssertOwnership() {
s.mockShard.state = contextStateAcquired
s.mockShardManager.EXPECT().AssertShardOwnership(gomock.Any(), gomock.Any()).
Return(&persistence.ShardOwnershipLostError{}).Times(1)

err := s.mockShard.AssertOwnership(context.Background())
s.Error(err)

s.False(s.mockShard.IsValid())
s.True(s.mockShard.stoppedForOwnershipLost())
}

func (s *contextSuite) TestShardStopReasonShardRead() {
s.mockShard.state = contextStateAcquired
s.mockExecutionManager.EXPECT().GetCurrentExecution(gomock.Any(), gomock.Any()).
Return(nil, &persistence.ShardOwnershipLostError{}).Times(1)

_, err := s.mockShard.GetCurrentExecution(context.Background(), nil)
s.Error(err)

s.False(s.mockShard.IsValid())
s.True(s.mockShard.stoppedForOwnershipLost())
}

func (s *contextSuite) TestShardStopReasonAcquireShard() {
s.mockShard.state = contextStateAcquiring
s.mockShardManager.EXPECT().UpdateShard(gomock.Any(), gomock.Any()).
Return(&persistence.ShardOwnershipLostError{}).Times(1)

s.mockShard.acquireShard()

s.Assert().Equal(contextStateStopping, s.mockShard.state)
s.False(s.mockShard.IsValid())
s.True(s.mockShard.stoppedForOwnershipLost())
}

func (s *contextSuite) TestShardStopReasonUnload() {
s.mockShard.state = contextStateAcquired

s.mockShard.UnloadForOwnershipLost()

s.Assert().Equal(contextStateStopping, s.mockShard.state)
s.False(s.mockShard.IsValid())
s.True(s.mockShard.stoppedForOwnershipLost())
}

func (s *contextSuite) TestShardStopReasonCloseShard() {
s.mockShard.state = contextStateAcquired
s.mockHistoryEngine.EXPECT().Stop().Times(1)

s.mockShard.finishStop()

s.False(s.mockShard.IsValid())
s.False(s.mockShard.stoppedForOwnershipLost())
}
5 changes: 4 additions & 1 deletion service/history/shard/controller_impl.go
Expand Up @@ -281,7 +281,10 @@ func (c *ControllerImpl) getOrCreateShardContext(shardID int32) (*ContextImpl, e
return shard, nil
}

shard.Unload()
// If the shard was invalid and still in the historyShards map, it should
// be because the shardClosedCallback hasn't yet executed its call to
// finishStop, so this call to requestStop should be redundant.
shard.requestStop()
delete(c.historyShards, shardID)
}

Expand Down
8 changes: 4 additions & 4 deletions service/history/shard/controller_test.go
Expand Up @@ -483,7 +483,7 @@ func (s *controllerSuite) TestShardExplicitUnload() {
s.NoError(err)
s.Equal(1, len(s.shardController.ShardIDs()))

shard.Unload()
shard.UnloadForOwnershipLost()

for tries := 0; tries < 100 && len(s.shardController.ShardIDs()) != 0; tries++ {
// removal from map happens asynchronously
Expand Down Expand Up @@ -529,7 +529,7 @@ func (s *controllerSuite) TestShardExplicitUnloadCancelGetOrCreate() {
s.False(shard.engineFuture.Ready())

start := time.Now()
shard.Unload() // this cancels the context so GetOrCreateShard returns immediately
shard.UnloadForOwnershipLost() // this cancels the context so GetOrCreateShard returns immediately
s.True(<-wasCanceled)
s.Less(time.Since(start), 500*time.Millisecond)
}
Expand Down Expand Up @@ -582,7 +582,7 @@ func (s *controllerSuite) TestShardExplicitUnloadCancelAcquire() {
s.False(shard.engineFuture.Ready())

start := time.Now()
shard.Unload() // this cancels the context so UpdateShard returns immediately
shard.UnloadForOwnershipLost() // this cancels the context so UpdateShard returns immediately
s.True(<-wasCanceled)
s.Less(time.Since(start), 500*time.Millisecond)
}
Expand Down Expand Up @@ -680,7 +680,7 @@ func (s *controllerSuite) TestShardControllerFuzz() {
}
case 2:
if _, shard := randomLoadedShard(); shard != nil {
shard.Unload()
shard.UnloadForOwnershipLost()
}
case 3:
if id, _ := randomLoadedShard(); id >= 0 {
Expand Down

0 comments on commit 9c483cd

Please sign in to comment.