Skip to content

Commit

Permalink
add shard lookup convenience
Browse files Browse the repository at this point in the history
  • Loading branch information
alfred-landrum committed Jul 17, 2023
1 parent 7dd76fd commit 6c5a967
Showing 1 changed file with 11 additions and 4 deletions.
15 changes: 11 additions & 4 deletions client/history/redirector.go
Expand Up @@ -74,6 +74,14 @@ func (t operationTarget) validate() error {
return nil
}

func shardLookup(resolver membership.ServiceResolver, shardID int32) (rpcAddress, error) {
hostInfo, err := resolver.Lookup(convert.Int32ToString(shardID))
if err != nil {
return "", err
}
return rpcAddress(hostInfo.GetAddress()), nil
}

func newBasicRedirector(
connections *clientConnections,
historyServiceResolver membership.ServiceResolver,
Expand All @@ -92,11 +100,11 @@ func (r *basicRedirector) clientForTarget(target operationTarget) (historyservic
connection := r.connections.getOrCreateClientConn(target.address)
return connection.historyClient, nil
}
hostInfo, err := r.historyServiceResolver.Lookup(convert.Int32ToString(target.shardID))
address, err := shardLookup(r.historyServiceResolver, target.shardID)
if err != nil {
return nil, err
}
clientConn := r.connections.getOrCreateClientConn(rpcAddress(hostInfo.GetAddress()))
clientConn := r.connections.getOrCreateClientConn(address)
return clientConn.historyClient, nil
}

Expand All @@ -108,11 +116,10 @@ func (r *basicRedirector) execute(ctx context.Context, target operationTarget, o
connection := r.connections.getOrCreateClientConn(target.address)
return op(ctx, connection.historyClient)
}
hostInfo, err := r.historyServiceResolver.Lookup(convert.Int32ToString(target.shardID))
address, err := shardLookup(r.historyServiceResolver, target.shardID)
if err != nil {
return err
}
address := rpcAddress(hostInfo.GetAddress())
return r.redirectLoop(ctx, address, op)
}

Expand Down

0 comments on commit 6c5a967

Please sign in to comment.