Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP][history] refactor history client with redirect wrapper #5762

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion client/clientfactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package client

import (
"github.com/uber/cadence/common/peerresolver"
"time"

adminv1 "github.com/uber/cadence-idl/go/proto/admin/v1"
Expand Down Expand Up @@ -116,7 +117,7 @@ func (cf *rpcClientFactory) NewHistoryClientWithTimeout(timeout time.Duration) (
rawClient = thrift.NewHistoryClient(historyserviceclient.New(outboundConfig))
}

peerResolver := history.NewPeerResolver(cf.numberOfHistoryShards, cf.resolver, namedPort)
peerResolver := peerresolver.NewPeerResolver(cf.numberOfHistoryShards, cf.resolver, namedPort)

client := history.NewClient(
cf.numberOfHistoryShards,
Expand Down
7 changes: 4 additions & 3 deletions client/history/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package history

import (
"context"
"github.com/uber/cadence/common/peerresolver"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make fmt

"math/rand"
"sync"
"time"
Expand Down Expand Up @@ -50,7 +51,7 @@ type (
rpcMaxSizeInBytes int // This value currently only used in GetReplicationMessage API
tokenSerializer common.TaskTokenSerializer
client Client
peerResolver PeerResolver
peerResolver peerresolver.PeerResolver
logger log.Logger
}

Expand All @@ -66,7 +67,7 @@ func NewClient(
numberOfShards int,
rpcMaxSizeInBytes int,
client Client,
peerResolver PeerResolver,
peerResolver peerresolver.PeerResolver,
logger log.Logger,
) Client {
return &clientImpl{
Expand All @@ -84,7 +85,7 @@ func (c *clientImpl) StartWorkflowExecution(
request *types.HistoryStartWorkflowExecutionRequest,
opts ...yarpc.CallOption,
) (*types.StartWorkflowExecutionResponse, error) {
peer, err := c.peerResolver.FromWorkflowID(request.StartRequest.WorkflowID)
peer, err := c.peerResolver.FromRedirectKey(request.ToRedirectKey())
if err != nil {
return nil, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package history
package peerresolver

import (
"fmt"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/membership"
"github.com/uber/cadence/common/service"
"github.com/uber/cadence/common/types"
)

// PeerResolver is used to resolve history peers.
Expand Down Expand Up @@ -100,3 +102,20 @@
}
return peers, nil
}

// FromRedirectKey resolve the peer from types.RedirectKey by a specific order
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's document the "specific order" so it's more explicit in documentation

func (pr PeerResolver) FromRedirectKey(key types.RedirectKey) (string, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess the other PeerResolver.From* functions on can be made private once all usage is refactored. Is that the plan?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes that's a good suggestion

if key.ShardID != nil {
return pr.FromShardID(*key.ShardID)

Check warning on line 109 in common/peerresolver/peerResolver.go

View check run for this annotation

Codecov / codecov/patch

common/peerresolver/peerResolver.go#L107-L109

Added lines #L107 - L109 were not covered by tests
}
if key.WorkflowID != "" {
return pr.FromWorkflowID(key.WorkflowID)

Check warning on line 112 in common/peerresolver/peerResolver.go

View check run for this annotation

Codecov / codecov/patch

common/peerresolver/peerResolver.go#L111-L112

Added lines #L111 - L112 were not covered by tests
}
if key.DomainID != "" {
return pr.FromDomainID(key.DomainID)

Check warning on line 115 in common/peerresolver/peerResolver.go

View check run for this annotation

Codecov / codecov/patch

common/peerresolver/peerResolver.go#L114-L115

Added lines #L114 - L115 were not covered by tests
}
if key.HostAddress != "" {
return pr.FromHostAddress(key.HostAddress)

Check warning on line 118 in common/peerresolver/peerResolver.go

View check run for this annotation

Codecov / codecov/patch

common/peerresolver/peerResolver.go#L117-L118

Added lines #L117 - L118 were not covered by tests
}
return "", fmt.Errorf("RedirectKey is not valid: %v", key)

Check warning on line 120 in common/peerresolver/peerResolver.go

View check run for this annotation

Codecov / codecov/patch

common/peerresolver/peerResolver.go#L120

Added line #L120 was not covered by tests
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's add unit tests

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package history
package peerresolver

import (
"errors"
Expand Down
9 changes: 9 additions & 0 deletions common/types/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@

package types

var (
_ RedirectRequest = (*HistoryStartWorkflowExecutionRequest)(nil)
)

// DescribeMutableStateRequest is an internal type (TBD...)
type DescribeMutableStateRequest struct {
DomainUUID string `json:"domainUUID,omitempty"`
Expand Down Expand Up @@ -950,6 +954,11 @@
return
}

// ToRedirectKey convert request to redirect key
func (v *HistoryStartWorkflowExecutionRequest) ToRedirectKey() RedirectKey {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unit test please :)

return RedirectKey{WorkflowID: v.StartRequest.WorkflowID}

Check warning on line 959 in common/types/history.go

View check run for this annotation

Codecov / codecov/patch

common/types/history.go#L958-L959

Added lines #L958 - L959 were not covered by tests
}

// SyncActivityRequest is an internal type (TBD...)
type SyncActivityRequest struct {
DomainID string `json:"domainId,omitempty"`
Expand Down
14 changes: 14 additions & 0 deletions common/types/redirect.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package types

// RedirectRequest interface for a redirect-able request
type RedirectRequest interface {
ToRedirectKey() RedirectKey
}

// RedirectKey entity of keys used for redirect
type RedirectKey struct {
ShardID *int
WorkflowID string
DomainID string
HostAddress string
}