From 76b624f936856aa742d38f1dcddcab51b6eff926 Mon Sep 17 00:00:00 2001 From: Shijie Sheng Date: Thu, 7 Mar 2024 16:59:51 -0800 Subject: [PATCH] WIP --- client/clientfactory.go | 3 ++- client/history/client.go | 7 ++++--- .../peerresolver}/peerResolver.go | 21 ++++++++++++++++++- .../peerresolver}/peerResolver_test.go | 2 +- common/types/history.go | 9 ++++++++ common/types/redirect.go | 14 +++++++++++++ 6 files changed, 50 insertions(+), 6 deletions(-) rename {client/history => common/peerresolver}/peerResolver.go (88%) rename {client/history => common/peerresolver}/peerResolver_test.go (99%) create mode 100644 common/types/redirect.go diff --git a/client/clientfactory.go b/client/clientfactory.go index 1b89069b7b..29f9662bae 100644 --- a/client/clientfactory.go +++ b/client/clientfactory.go @@ -21,6 +21,7 @@ package client import ( + "github.com/uber/cadence/common/peerresolver" "time" adminv1 "github.com/uber/cadence-idl/go/proto/admin/v1" @@ -116,7 +117,7 @@ func (cf *rpcClientFactory) NewHistoryClientWithTimeout(timeout time.Duration) ( rawClient = thrift.NewHistoryClient(historyserviceclient.New(outboundConfig)) } - peerResolver := history.NewPeerResolver(cf.numberOfHistoryShards, cf.resolver, namedPort) + peerResolver := peerresolver.NewPeerResolver(cf.numberOfHistoryShards, cf.resolver, namedPort) client := history.NewClient( cf.numberOfHistoryShards, diff --git a/client/history/client.go b/client/history/client.go index bb5ee0fb8c..82f16e073e 100644 --- a/client/history/client.go +++ b/client/history/client.go @@ -22,6 +22,7 @@ package history import ( "context" + "github.com/uber/cadence/common/peerresolver" "math/rand" "sync" "time" @@ -50,7 +51,7 @@ type ( rpcMaxSizeInBytes int // This value currently only used in GetReplicationMessage API tokenSerializer common.TaskTokenSerializer client Client - peerResolver PeerResolver + peerResolver peerresolver.PeerResolver logger log.Logger } @@ -66,7 +67,7 @@ func NewClient( numberOfShards int, rpcMaxSizeInBytes int, client Client, - peerResolver PeerResolver, + peerResolver peerresolver.PeerResolver, logger log.Logger, ) Client { return &clientImpl{ @@ -84,7 +85,7 @@ func (c *clientImpl) StartWorkflowExecution( request *types.HistoryStartWorkflowExecutionRequest, opts ...yarpc.CallOption, ) (*types.StartWorkflowExecutionResponse, error) { - peer, err := c.peerResolver.FromWorkflowID(request.StartRequest.WorkflowID) + peer, err := c.peerResolver.FromRedirectKey(request.ToRedirectKey()) if err != nil { return nil, err } diff --git a/client/history/peerResolver.go b/common/peerresolver/peerResolver.go similarity index 88% rename from client/history/peerResolver.go rename to common/peerresolver/peerResolver.go index 8a542e42cc..eead565c2c 100644 --- a/client/history/peerResolver.go +++ b/common/peerresolver/peerResolver.go @@ -18,12 +18,14 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -package history +package peerresolver import ( + "fmt" "github.com/uber/cadence/common" "github.com/uber/cadence/common/membership" "github.com/uber/cadence/common/service" + "github.com/uber/cadence/common/types" ) // PeerResolver is used to resolve history peers. @@ -100,3 +102,20 @@ func (pr PeerResolver) GetAllPeers() ([]string, error) { } return peers, nil } + +// FromRedirectKey resolve the peer from types.RedirectKey by a specific order +func (pr PeerResolver) FromRedirectKey(key types.RedirectKey) (string, error) { + if key.ShardID != nil { + return pr.FromShardID(*key.ShardID) + } + if key.WorkflowID != "" { + return pr.FromWorkflowID(key.WorkflowID) + } + if key.DomainID != "" { + return pr.FromDomainID(key.DomainID) + } + if key.HostAddress != "" { + return pr.FromHostAddress(key.HostAddress) + } + return "", fmt.Errorf("RedirectKey is not valid: %v", key) +} diff --git a/client/history/peerResolver_test.go b/common/peerresolver/peerResolver_test.go similarity index 99% rename from client/history/peerResolver_test.go rename to common/peerresolver/peerResolver_test.go index 3cc22c319a..80ec7437e2 100644 --- a/client/history/peerResolver_test.go +++ b/common/peerresolver/peerResolver_test.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -package history +package peerresolver import ( "errors" diff --git a/common/types/history.go b/common/types/history.go index 5541dccdbf..983df0dcb8 100644 --- a/common/types/history.go +++ b/common/types/history.go @@ -20,6 +20,10 @@ package types +var ( + _ RedirectRequest = (*HistoryStartWorkflowExecutionRequest)(nil) +) + // DescribeMutableStateRequest is an internal type (TBD...) type DescribeMutableStateRequest struct { DomainUUID string `json:"domainUUID,omitempty"` @@ -950,6 +954,11 @@ func (v *HistoryStartWorkflowExecutionRequest) GetPartitionConfig() (o map[strin return } +// ToRedirectKey convert request to redirect key +func (v *HistoryStartWorkflowExecutionRequest) ToRedirectKey() RedirectKey { + return RedirectKey{WorkflowID: v.StartRequest.WorkflowID} +} + // SyncActivityRequest is an internal type (TBD...) type SyncActivityRequest struct { DomainID string `json:"domainId,omitempty"` diff --git a/common/types/redirect.go b/common/types/redirect.go new file mode 100644 index 0000000000..5a83890fb2 --- /dev/null +++ b/common/types/redirect.go @@ -0,0 +1,14 @@ +package types + +// RedirectRequest interface for a redirect-able request +type RedirectRequest interface { + ToRedirectKey() RedirectKey +} + +// RedirectKey entity of keys used for redirect +type RedirectKey struct { + ShardID *int + WorkflowID string + DomainID string + HostAddress string +}