Skip to content

Commit

Permalink
Improve history client retry logic (#2762)
Browse files Browse the repository at this point in the history
  • Loading branch information
wxing1292 committed Apr 25, 2022
1 parent 58103d7 commit cd88c61
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 22 deletions.
29 changes: 14 additions & 15 deletions client/history/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1156,28 +1156,27 @@ func (c *clientImpl) getClientForShardID(shardID int32) (historyservice.HistoryS

func (c *clientImpl) executeWithRedirect(ctx context.Context,
client historyservice.HistoryServiceClient,
op func(ctx context.Context, client historyservice.HistoryServiceClient) error) error {
op func(ctx context.Context, client historyservice.HistoryServiceClient) error,
) error {

var err error
redirectLoop:
for {
err = common.IsValidContext(ctx)
err := common.IsValidContext(ctx)
if err != nil {
break redirectLoop
return err
}

err = op(ctx, client)
if err != nil {
if s, ok := err.(*serviceerrors.ShardOwnershipLost); ok {
// TODO: consider emitting a metric for number of redirects
ret, err := c.clients.GetClientForClientKey(s.OwnerHost)
if err != nil {
return err
}
client = ret.(historyservice.HistoryServiceClient)
continue redirectLoop
if s, ok := err.(*serviceerrors.ShardOwnershipLost); ok && len(s.OwnerHost) != 0 {
// TODO: consider emitting a metric for number of redirects
ret, err := c.clients.GetClientForClientKey(s.OwnerHost)
if err != nil {
return err
}
client = ret.(historyservice.HistoryServiceClient)
continue redirectLoop
} else {
return err
}
break redirectLoop
}
return err
}
2 changes: 1 addition & 1 deletion common/serviceerror/shardOwnershipLost.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type (
)

// NewShardOwnershipLost returns new ShardOwnershipLost error.
func NewShardOwnershipLost(ownerHost, currentHost string) error {
func NewShardOwnershipLost(ownerHost string, currentHost string) error {
return &ShardOwnershipLost{
Message: fmt.Sprintf("Shard is owned by:%v but not by %v", ownerHost, currentHost),
OwnerHost: ownerHost,
Expand Down
6 changes: 3 additions & 3 deletions service/history/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1728,10 +1728,10 @@ func (h *Handler) convertError(err error) error {
switch err := err.(type) {
case *persistence.ShardOwnershipLostError:
hostInfo := h.hostInfoProvider.HostInfo()
if info, err := h.historyServiceResolver.Lookup(convert.Int32ToString(err.ShardID)); err == nil {
return serviceerrors.NewShardOwnershipLost(hostInfo.GetAddress(), info.GetAddress())
if ownerInfo, err := h.historyServiceResolver.Lookup(convert.Int32ToString(err.ShardID)); err == nil {
return serviceerrors.NewShardOwnershipLost(ownerInfo.GetAddress(), hostInfo.GetAddress())
}
return serviceerrors.NewShardOwnershipLost(hostInfo.GetAddress(), "<unknown>")
return serviceerrors.NewShardOwnershipLost("", hostInfo.GetAddress())
case *persistence.WorkflowConditionFailedError:
return serviceerror.NewUnavailable(err.Msg)
case *persistence.CurrentWorkflowConditionFailedError:
Expand Down
6 changes: 3 additions & 3 deletions service/history/shard/controller_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,14 +203,14 @@ func (c *ControllerImpl) getOrCreateShardContext(shardID int32) (*ContextImpl, e
}
c.RUnlock()

info, err := c.historyServiceResolver.Lookup(convert.Int32ToString(shardID))
ownerInfo, err := c.historyServiceResolver.Lookup(convert.Int32ToString(shardID))
if err != nil {
return nil, err
}

hostInfo := c.hostInfoProvider.HostInfo()
if info.Identity() != hostInfo.Identity() {
return nil, serviceerrors.NewShardOwnershipLost(hostInfo.Identity(), info.GetAddress())
if ownerInfo.Identity() != hostInfo.Identity() {
return nil, serviceerrors.NewShardOwnershipLost(ownerInfo.Identity(), hostInfo.GetAddress())
}

c.Lock()
Expand Down

0 comments on commit cd88c61

Please sign in to comment.