Skip to content

Commit

Permalink
Deprecate NewClient, add Dial and NewLazyClient and CheckHealth (#795)
Browse files Browse the repository at this point in the history
Fixes #793, fixes #753, fixes #595
  • Loading branch information
cretz committed May 6, 2022
1 parent fdb99eb commit ee5ae6f
Show file tree
Hide file tree
Showing 17 changed files with 389 additions and 76 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/wait_for_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func main() {
log.Print("Waiting for server availability")
var lastErr error
for start := time.Now(); time.Since(start) < 5*time.Minute; time.Sleep(2 * time.Second) {
_, lastErr = client.NewClient(client.Options{})
_, lastErr = client.Dial(client.Options{})
if lastErr == nil {
log.Print("Connected to server")
return
Expand Down
4 changes: 2 additions & 2 deletions activity/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ returned from the workflow.ExecuteActivity function as the details field of Time
It is also possible to heartbeat an Activity from an external source:
// instantiate a Temporal service Client
client.Client client = client.NewClient(...)
client.Client client = client.Dial(...)
// record heartbeat
err := client.RecordActivityHeartbeat(ctx, taskToken, details)
Expand Down Expand Up @@ -181,7 +181,7 @@ In order to for some Workflow execution to be able to invoke an Activity type, t
all the implementations it has access to. To do that, create a Worker and register the Activity like so:
```
c, err := client.NewClient(client.Options{})
c, err := client.Dial(client.Options{})
if err != nil {
log.Fatalln("unable to create Temporal client", err)
}
Expand Down
25 changes: 25 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,12 @@ type (
// QueryWorkflowWithOptionsResponse defines the response to QueryWorkflowWithOptions.
QueryWorkflowWithOptionsResponse = internal.QueryWorkflowWithOptionsResponse

// CheckHealthRequest is a request for Client.CheckHealth.
CheckHealthRequest = internal.CheckHealthRequest

// CheckHealthResponse is a response for Client.CheckHealth.
CheckHealthResponse = internal.CheckHealthResponse

// Client is the client for starting and getting information about a workflow executions as well as
// completing activities asynchronously.
Client interface {
Expand Down Expand Up @@ -368,6 +374,10 @@ type (
// RequestId is used to deduplicate requests. It will be autogenerated if not set.
ResetWorkflowExecution(ctx context.Context, request *workflowservice.ResetWorkflowExecutionRequest) (*workflowservice.ResetWorkflowExecutionResponse, error)

// CheckHealth performs a server health check using the gRPC health check
// API. If the check fails, an error is returned.
CheckHealth(ctx context.Context, request *CheckHealthRequest) (*CheckHealthResponse, error)

// WorkflowService provides access to the underlying gRPC service. This should only be used for advanced use cases
// that cannot be accomplished via other Client methods. Unlike calls to other Client methods, calls directly to the
// service are not configured with internal semantics such as automatic retries.
Expand Down Expand Up @@ -434,9 +444,24 @@ type MetricsTimer = metrics.Timer
// MetricsNopHandler is a noop handler that does nothing with the metrics.
var MetricsNopHandler = metrics.NopHandler

// Dial creates an instance of a workflow client. This will attempt to connect
// to the server eagerly and will return an error if the server is not
// available.
func Dial(options Options) (Client, error) {
return internal.DialClient(options)
}

// NewLazyClient creates an instance of a workflow client. Unlike Dial, this
// will not eagerly connect to the server.
func NewLazyClient(options Options) (Client, error) {
return internal.NewLazyClient(options)
}

// NewClient creates an instance of a workflow client. This will attempt to
// connect to the server eagerly and will return an error if the server is not
// available.
//
// Deprecated: Use Dial or NewLazyClient instead.
func NewClient(options Options) (Client, error) {
return internal.NewClient(options)
}
Expand Down
1 change: 1 addition & 0 deletions evictiontest/workflow_cache_eviction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ type (
func (s *CacheEvictionSuite) SetupTest() {
s.mockCtrl = gomock.NewController(s.T())
s.service = workflowservicemock.NewMockWorkflowServiceClient(s.mockCtrl)
s.service.EXPECT().GetSystemInfo(gomock.Any(), gomock.Any(), gomock.Any()).Return(&workflowservice.GetSystemInfoResponse{}, nil).AnyTimes()
}

func (s *CacheEvictionSuite) TearDownTest() {
Expand Down
95 changes: 53 additions & 42 deletions internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,10 @@ package internal
import (
"context"
"crypto/tls"
"fmt"
"io"
"time"

commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/api/serviceerror"
"go.temporal.io/api/workflowservice/v1"
uberatomic "go.uber.org/atomic"
"google.golang.org/grpc"
Expand All @@ -55,8 +52,6 @@ const (
// QueryTypeOpenSessions is the build in query type for Client.QueryWorkflow() call. Use this query type to get all open
// sessions in the workflow. The result will be a list of SessionInfo encoded in the EncodedValue.
QueryTypeOpenSessions string = "__open_sessions"

getSystemInfoTimeout = 5 * time.Second
)

type (
Expand Down Expand Up @@ -338,6 +333,10 @@ type (
// RequestId is used to deduplicate requests. It will be autogenerated if not set.
ResetWorkflowExecution(ctx context.Context, request *workflowservice.ResetWorkflowExecutionRequest) (*workflowservice.ResetWorkflowExecutionResponse, error)

// CheckHealth performs a server health check using the gRPC health check
// API. If the check fails, an error is returned.
CheckHealth(ctx context.Context, request *CheckHealthRequest) (*CheckHealthResponse, error)

// WorkflowService provides access to the underlying gRPC service. This should only be used for advanced use cases
// that cannot be accomplished via other Client methods. Unlike calls to other Client methods, calls directly to the
// service are not configured with internal semantics such as automatic retries.
Expand Down Expand Up @@ -365,7 +364,7 @@ type (
// builder := manual.NewBuilderWithScheme("myresolver")
// builder.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: "1.2.3.4:1234"}, {Addr: "2.3.4.5:2345"}}})
// resolver.Register(builder)
// c, err := client.NewClient(client.Options{HostPort: "myresolver:///ignoredvalue"})
// c, err := client.Dial(client.Options{HostPort: "myresolver:///ignoredvalue"})
// Other more advanced resolvers can also be registered.
HostPort string

Expand Down Expand Up @@ -467,6 +466,15 @@ type (
// grpc.WithUnaryInterceptor option can be added since grpc.WithUnaryInterceptor is prepended to chains set with
// grpc.WithChainUnaryInterceptor.
DialOptions []grpc.DialOption

// Hidden for use by client overloads.
disableEagerConnection bool

// Internal atomic that, when true, will not retry internal errors like
// other gRPC errors. If not present during service client creation, it will
// be created as false. This is set to true when server capabilities are
// fetched.
excludeInternalFromRetry *uberatomic.Bool
}

// StartWorkflowOptions configuration parameters for starting a workflow execution.
Expand Down Expand Up @@ -609,7 +617,21 @@ type (
}
)

// DialClient creates a client and attempts to connect to the server.
func DialClient(options ClientOptions) (Client, error) {
options.ConnectionOptions.disableEagerConnection = false
return NewClient(options)
}

// NewLazyClient creates a client and does not attempt to connect to the server.
func NewLazyClient(options ClientOptions) (Client, error) {
options.ConnectionOptions.disableEagerConnection = true
return NewClient(options)
}

// NewClient creates an instance of a workflow client
//
// Deprecated: Use DialClient or NewLazyClient instead.
func NewClient(options ClientOptions) (Client, error) {
if options.Namespace == "" {
options.Namespace = DefaultNamespace
Expand All @@ -630,22 +652,22 @@ func NewClient(options ClientOptions) (Client, error) {
options.Logger.Info("No logger configured for temporal client. Created default one.")
}

var excludeInternalFromRetry uberatomic.Bool
connection, err := dial(newDialParameters(&options, &excludeInternalFromRetry))
options.ConnectionOptions.excludeInternalFromRetry = uberatomic.NewBool(false)
connection, err := dial(newDialParameters(&options, options.ConnectionOptions.excludeInternalFromRetry))
if err != nil {
return nil, err
}

client := NewServiceClient(workflowservice.NewWorkflowServiceClient(connection), connection, options)

// Get server capabilities eagerly. This has replaced health checking and we
// have accepted that this forces an eager connection with the server.
client.capabilities, err = getServerCapabilities(client.workflowService)
if err != nil {
client.Close()
return nil, err
// Load server capabilities eagerly if not disabled
if !options.ConnectionOptions.disableEagerConnection {
if _, err := client.loadCapabilities(); err != nil {
client.Close()
return nil, err
}
}
excludeInternalFromRetry.Store(client.capabilities.InternalErrorDifferentiation)

return client, nil
}

Expand All @@ -664,7 +686,7 @@ func newDialParameters(options *ClientOptions, excludeInternalFromRetry *uberato
}

// NewServiceClient creates workflow client from workflowservice.WorkflowServiceClient. Must be used internally in unit tests only.
func NewServiceClient(workflowServiceClient workflowservice.WorkflowServiceClient, connectionCloser io.Closer, options ClientOptions) *WorkflowClient {
func NewServiceClient(workflowServiceClient workflowservice.WorkflowServiceClient, conn *grpc.ClientConn, options ClientOptions) *WorkflowClient {
// Namespace can be empty in unit tests.
if options.Namespace == "" {
options.Namespace = DefaultNamespace
Expand All @@ -682,6 +704,10 @@ func NewServiceClient(workflowServiceClient workflowservice.WorkflowServiceClien
options.MetricsHandler = metrics.NopHandler
}

if options.ConnectionOptions.excludeInternalFromRetry == nil {
options.ConnectionOptions.excludeInternalFromRetry = uberatomic.NewBool(false)
}

// Collect set of applicable worker interceptors
var workerInterceptors []WorkerInterceptor
for _, interceptor := range options.Interceptors {
Expand All @@ -691,16 +717,17 @@ func NewServiceClient(workflowServiceClient workflowservice.WorkflowServiceClien
}

client := &WorkflowClient{
workflowService: workflowServiceClient,
connectionCloser: connectionCloser,
namespace: options.Namespace,
registry: newRegistry(),
metricsHandler: options.MetricsHandler,
logger: options.Logger,
identity: options.Identity,
dataConverter: options.DataConverter,
contextPropagators: options.ContextPropagators,
workerInterceptors: workerInterceptors,
workflowService: workflowServiceClient,
conn: conn,
namespace: options.Namespace,
registry: newRegistry(),
metricsHandler: options.MetricsHandler,
logger: options.Logger,
identity: options.Identity,
dataConverter: options.DataConverter,
contextPropagators: options.ContextPropagators,
workerInterceptors: workerInterceptors,
excludeInternalFromRetry: options.ConnectionOptions.excludeInternalFromRetry,
}

// Create outbound interceptor by wrapping backwards through chain
Expand Down Expand Up @@ -766,19 +793,3 @@ func NewValue(data *commonpb.Payloads) converter.EncodedValue {
func NewValues(data *commonpb.Payloads) converter.EncodedValues {
return newEncodedValues(data, nil)
}

func getServerCapabilities(
client workflowservice.WorkflowServiceClient,
) (cap workflowservice.GetSystemInfoResponse_Capabilities, err error) {
ctx, cancel := context.WithTimeout(context.Background(), getSystemInfoTimeout)
defer cancel()
resp, err := client.GetSystemInfo(ctx, &workflowservice.GetSystemInfoRequest{})
// We ignore unimplemented
if _, isUnimplemented := err.(*serviceerror.Unimplemented); err != nil && !isUnimplemented {
return cap, fmt.Errorf("get system info failed: %w - %T", err, err)
}
if resp != nil && resp.Capabilities != nil {
cap = *resp.Capabilities
}
return cap, nil
}
Loading

0 comments on commit ee5ae6f

Please sign in to comment.