diff --git a/client/clientfactory.go b/client/clientfactory.go index 1b89069b7b0..29f9662baef 100644 --- a/client/clientfactory.go +++ b/client/clientfactory.go @@ -21,6 +21,7 @@ package client import ( + "github.com/uber/cadence/common/peerresolver" "time" adminv1 "github.com/uber/cadence-idl/go/proto/admin/v1" @@ -116,7 +117,7 @@ func (cf *rpcClientFactory) NewHistoryClientWithTimeout(timeout time.Duration) ( rawClient = thrift.NewHistoryClient(historyserviceclient.New(outboundConfig)) } - peerResolver := history.NewPeerResolver(cf.numberOfHistoryShards, cf.resolver, namedPort) + peerResolver := peerresolver.NewPeerResolver(cf.numberOfHistoryShards, cf.resolver, namedPort) client := history.NewClient( cf.numberOfHistoryShards, diff --git a/client/history/client.go b/client/history/client.go index bb5ee0fb8ca..82f16e073ec 100644 --- a/client/history/client.go +++ b/client/history/client.go @@ -22,6 +22,7 @@ package history import ( "context" + "github.com/uber/cadence/common/peerresolver" "math/rand" "sync" "time" @@ -50,7 +51,7 @@ type ( rpcMaxSizeInBytes int // This value currently only used in GetReplicationMessage API tokenSerializer common.TaskTokenSerializer client Client - peerResolver PeerResolver + peerResolver peerresolver.PeerResolver logger log.Logger } @@ -66,7 +67,7 @@ func NewClient( numberOfShards int, rpcMaxSizeInBytes int, client Client, - peerResolver PeerResolver, + peerResolver peerresolver.PeerResolver, logger log.Logger, ) Client { return &clientImpl{ @@ -84,7 +85,7 @@ func (c *clientImpl) StartWorkflowExecution( request *types.HistoryStartWorkflowExecutionRequest, opts ...yarpc.CallOption, ) (*types.StartWorkflowExecutionResponse, error) { - peer, err := c.peerResolver.FromWorkflowID(request.StartRequest.WorkflowID) + peer, err := c.peerResolver.FromRedirectKey(request.ToRedirectKey()) if err != nil { return nil, err } diff --git a/client/history/peerResolver.go b/common/peerresolver/peerResolver.go similarity index 88% rename from client/history/peerResolver.go rename to common/peerresolver/peerResolver.go index 8a542e42cc7..eead565c2c0 100644 --- a/client/history/peerResolver.go +++ b/common/peerresolver/peerResolver.go @@ -18,12 +18,14 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -package history +package peerresolver import ( + "fmt" "github.com/uber/cadence/common" "github.com/uber/cadence/common/membership" "github.com/uber/cadence/common/service" + "github.com/uber/cadence/common/types" ) // PeerResolver is used to resolve history peers. @@ -100,3 +102,20 @@ func (pr PeerResolver) GetAllPeers() ([]string, error) { } return peers, nil } + +// FromRedirectKey resolve the peer from types.RedirectKey by a specific order +func (pr PeerResolver) FromRedirectKey(key types.RedirectKey) (string, error) { + if key.ShardID != nil { + return pr.FromShardID(*key.ShardID) + } + if key.WorkflowID != "" { + return pr.FromWorkflowID(key.WorkflowID) + } + if key.DomainID != "" { + return pr.FromDomainID(key.DomainID) + } + if key.HostAddress != "" { + return pr.FromHostAddress(key.HostAddress) + } + return "", fmt.Errorf("RedirectKey is not valid: %v", key) +} diff --git a/client/history/peerResolver_test.go b/common/peerresolver/peerResolver_test.go similarity index 99% rename from client/history/peerResolver_test.go rename to common/peerresolver/peerResolver_test.go index 3cc22c319a3..80ec7437e2b 100644 --- a/client/history/peerResolver_test.go +++ b/common/peerresolver/peerResolver_test.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -package history +package peerresolver import ( "errors" diff --git a/common/types/history.go b/common/types/history.go index 5541dccdbfe..983df0dcb83 100644 --- a/common/types/history.go +++ b/common/types/history.go @@ -20,6 +20,10 @@ package types +var ( + _ RedirectRequest = (*HistoryStartWorkflowExecutionRequest)(nil) +) + // DescribeMutableStateRequest is an internal type (TBD...) type DescribeMutableStateRequest struct { DomainUUID string `json:"domainUUID,omitempty"` @@ -950,6 +954,11 @@ func (v *HistoryStartWorkflowExecutionRequest) GetPartitionConfig() (o map[strin return } +// ToRedirectKey convert request to redirect key +func (v *HistoryStartWorkflowExecutionRequest) ToRedirectKey() RedirectKey { + return RedirectKey{WorkflowID: v.StartRequest.WorkflowID} +} + // SyncActivityRequest is an internal type (TBD...) type SyncActivityRequest struct { DomainID string `json:"domainId,omitempty"` diff --git a/common/types/redirect.go b/common/types/redirect.go new file mode 100644 index 00000000000..5a83890fb29 --- /dev/null +++ b/common/types/redirect.go @@ -0,0 +1,14 @@ +package types + +// RedirectRequest interface for a redirect-able request +type RedirectRequest interface { + ToRedirectKey() RedirectKey +} + +// RedirectKey entity of keys used for redirect +type RedirectKey struct { + ShardID *int + WorkflowID string + DomainID string + HostAddress string +}