Skip to content

Commit

Permalink
Fix shard close error type (#3215)
Browse files Browse the repository at this point in the history
* Fix shard close error type: unavailable -> shard ownership lost
  • Loading branch information
wxing1292 authored and yycptt committed Aug 12, 2022
1 parent 51817aa commit 23e40a9
Show file tree
Hide file tree
Showing 7 changed files with 20 additions and 17 deletions.
2 changes: 1 addition & 1 deletion service/history/queueProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ processorPumpLoop:
p.options.UpdateAckInterval(),
p.options.UpdateAckIntervalJitterCoefficient(),
))
if err := p.ackMgr.updateQueueAckLevel(); err == shard.ErrShardClosed {
if err := p.ackMgr.updateQueueAckLevel(); shard.IsShardOwnershipLostError(err) {
// shard is no longer owned by this instance, bail out
go p.Stop()
break processorPumpLoop
Expand Down
13 changes: 9 additions & 4 deletions service/history/shard/context_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,6 @@ type (
var _ Context = (*ContextImpl)(nil)

var (
// ErrShardClosed is returned when shard is closed and a req cannot be processed
ErrShardClosed = serviceerror.NewUnavailable("shard closed")

// ErrShardStatusUnknown means we're not sure if we have the shard lock or not. This may be returned
// during short windows at initialization and if we've lost the connection to the database.
ErrShardStatusUnknown = serviceerror.NewUnavailable("shard status unknown")
Expand Down Expand Up @@ -1073,7 +1070,7 @@ func (s *ContextImpl) errorByStateLocked() error {
case contextStateAcquired:
return nil
case contextStateStopping, contextStateStopped:
return ErrShardClosed
return s.newShardClosedErrorWithShardID()
default:
panic("invalid state")
}
Expand Down Expand Up @@ -1985,6 +1982,14 @@ func (s *ContextImpl) newIOContext() (context.Context, context.CancelFunc) {
return ctx, cancel
}

// newShardClosedErrorWithShardID builds the error reported when the shard is
// closed and a request can therefore no longer be processed. The result is a
// *persistence.ShardOwnershipLostError tagged with this context's shard ID and
// the message "shard closed".
func (s *ContextImpl) newShardClosedErrorWithShardID() *persistence.ShardOwnershipLostError {
	closedErr := persistence.ShardOwnershipLostError{Msg: "shard closed"}
	// shardID is immutable, so it is read here directly.
	closedErr.ShardID = s.shardID
	return &closedErr
}

func OperationPossiblySucceeded(err error) bool {
switch err.(type) {
case *persistence.CurrentWorkflowConditionFailedError,
Expand Down
13 changes: 6 additions & 7 deletions service/history/shard/controller_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,9 +288,10 @@ func (c *ControllerImpl) removeShard(shardID int32, expected *ContextImpl) (*Con
// ControllerImpl. It is responsible for acquiring /
// releasing shards in response to any event that can
// change the shard ownership. These events are
// a. Ring membership change
// b. Periodic ticker
// c. ShardOwnershipLostError and subsequent ShardClosedEvents from engine
//
// a. Ring membership change
// b. Periodic ticker
// c. ShardOwnershipLostError and subsequent ShardClosedEvents from engine
func (c *ControllerImpl) shardManagementPump() {
defer c.shutdownWG.Done()

Expand Down Expand Up @@ -401,13 +402,11 @@ func (c *ControllerImpl) ShardIDs() []int32 {
}

func IsShardOwnershipLostError(err error) bool {
if err == ErrShardClosed {
return true
}

switch err.(type) {
case *persistence.ShardOwnershipLostError:
return true
case *serviceerrors.ShardOwnershipLost:
return true
}

return false
Expand Down
2 changes: 1 addition & 1 deletion service/history/timerQueueProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ func (t *timerQueueProcessorImpl) completeTimersLoop() {
err := t.completeTimers()
if err != nil {
t.logger.Info("Failed to complete timers.", tag.Error(err))
if err == shard.ErrShardClosed {
if shard.IsShardOwnershipLostError(err) {
// shard is unloaded, timer processor should quit as well
go t.Stop()
return
Expand Down
2 changes: 1 addition & 1 deletion service/history/timerQueueProcessorBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ func (t *timerQueueProcessorBase) internalProcessor() error {
t.config.TimerProcessorUpdateAckInterval(),
t.config.TimerProcessorUpdateAckIntervalJitterCoefficient(),
))
if err := t.timerQueueAckMgr.updateAckLevel(); err == shard.ErrShardClosed {
if err := t.timerQueueAckMgr.updateAckLevel(); shard.IsShardOwnershipLostError(err) {
// shard is closed, shutdown timerQProcessor and bail out
go t.Stop()
return err
Expand Down
2 changes: 1 addition & 1 deletion service/history/transferQueueProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ func (t *transferQueueProcessorImpl) completeTransferLoop() {
err := t.completeTransfer()
if err != nil {
t.logger.Info("Failed to complete transfer task", tag.Error(err))
if err == shard.ErrShardClosed {
if shard.IsShardOwnershipLostError(err) {
// shard closed, trigger shutdown and bail out
t.Stop()
return
Expand Down
3 changes: 1 addition & 2 deletions service/history/visibilityQueueProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
package history

import (
"errors"
"sync/atomic"
"time"

Expand Down Expand Up @@ -268,7 +267,7 @@ func (t *visibilityQueueProcessorImpl) completeTaskLoop() {
}

t.logger.Info("Failed to complete visibility task", tag.Error(err))
if errors.Is(err, shard.ErrShardClosed) {
if shard.IsShardOwnershipLostError(err) {
// shard closed, trigger shutdown and bail out
t.Stop()
return
Expand Down

0 comments on commit 23e40a9

Please sign in to comment.