Skip to content

Commit

Permalink
Expose gRPC (temporalio#651)
Browse files Browse the repository at this point in the history
  • Loading branch information
cretz committed Dec 2, 2021
1 parent 960d3e9 commit a707cbc
Show file tree
Hide file tree
Showing 6 changed files with 155 additions and 61 deletions.
5 changes: 5 additions & 0 deletions client/client.go
Expand Up @@ -364,6 +364,11 @@ type (
// RequestId is used to deduplicate requests. It will be autogenerated if not set.
ResetWorkflowExecution(ctx context.Context, request *workflowservice.ResetWorkflowExecutionRequest) (*workflowservice.ResetWorkflowExecutionResponse, error)

// WorkflowService provides access to the underlying gRPC service. This should only be used for advanced use cases
// that cannot be accomplished via other Client methods. Unlike calls to other Client methods, calls directly to the
// service are not configured with internal semantics such as automatic retries.
WorkflowService() workflowservice.WorkflowServiceClient

// Close client and clean up underlying resources.
Close()
}
Expand Down
15 changes: 15 additions & 0 deletions internal/client.go
Expand Up @@ -341,6 +341,11 @@ type (
// RequestId is used to deduplicate requests. It will be autogenerated if not set.
ResetWorkflowExecution(ctx context.Context, request *workflowservice.ResetWorkflowExecutionRequest) (*workflowservice.ResetWorkflowExecutionResponse, error)

// WorkflowService provides access to the underlying gRPC service. This should only be used for advanced use cases
// that cannot be accomplished via other Client methods. Unlike calls to other Client methods, calls directly to the
// service are not configured with internal semantics such as automatic retries.
WorkflowService() workflowservice.WorkflowServiceClient

// Close client and clean up underlying resources.
Close()
}
Expand Down Expand Up @@ -498,6 +503,16 @@ type (

// MaxPayloadSize is a number of bytes that gRPC would allow to travel to and from server. Defaults to 64 MB.
MaxPayloadSize int

// Advanced dial options for gRPC connections. These are applied after the internal default dial options are
// applied. Therefore any dial options here may override internal ones.
//
// For gRPC interceptors, internal interceptors such as error handling, metrics, and retrying are done via
// grpc.WithChainUnaryInterceptor. Therefore to add inner interceptors that are wrapped by those, a
// grpc.WithChainUnaryInterceptor can be added as an option here. To add a single outer interceptor, a
// grpc.WithUnaryInterceptor option can be added since grpc.WithUnaryInterceptor is prepended to chains set with
// grpc.WithChainUnaryInterceptor.
DialOptions []grpc.DialOption
}

// StartWorkflowOptions configuration parameters for starting a workflow execution.
Expand Down
4 changes: 4 additions & 0 deletions internal/grpc_dialer.go
Expand Up @@ -121,6 +121,10 @@ func dial(params dialParameters) (*grpc.ClientConn, error) {
}
opts = append(opts, grpc.WithKeepaliveParams(kap))
}

// Append any user-supplied options
opts = append(opts, params.UserConnectionOptions.DialOptions...)

return grpc.Dial(params.HostPort, opts...)
}

Expand Down
53 changes: 53 additions & 0 deletions internal/grpc_dialer_test.go
Expand Up @@ -29,6 +29,7 @@ import (
"errors"
"log"
"net"
"strings"
"testing"

"github.com/gogo/status"
Expand Down Expand Up @@ -124,6 +125,58 @@ func TestHeadersProvider_IncludedWithHeadersProvider(t *testing.T) {
require.Equal(t, 6, len(interceptors))
}

func TestDialOptions(t *testing.T) {
// Start an unimplemented gRPC server
l, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
srv := grpc.NewServer()
workflowservice.RegisterWorkflowServiceServer(srv, &workflowservice.UnimplementedWorkflowServiceServer{})
healthServer := health.NewServer()
healthServer.SetServingStatus(healthCheckServiceName, grpc_health_v1.HealthCheckResponse_SERVING)
grpc_health_v1.RegisterHealthServer(srv, healthServer)
defer srv.Stop()
go func() { _ = srv.Serve(l) }()

// Connect with unary outer and unary inner interceptors
var trace []string
tracer := func(name string) grpc.UnaryClientInterceptor {
return func(
ctx context.Context,
method string,
req interface{},
reply interface{},
cc *grpc.ClientConn,
invoker grpc.UnaryInvoker,
opts ...grpc.CallOption,
) error {
if strings.HasSuffix(method, "/SignalWorkflowExecution") {
trace = append(trace, "begin "+name)
defer func() { trace = append(trace, "end "+name) }()
}
return invoker(ctx, method, req, reply, cc, opts...)
}
}
client, err := NewClient(ClientOptions{
HostPort: l.Addr().String(),
ConnectionOptions: ConnectionOptions{
DialOptions: []grpc.DialOption{
grpc.WithUnaryInterceptor(tracer("outer")),
grpc.WithChainUnaryInterceptor(tracer("inner1"), tracer("inner2")),
},
},
})
require.NoError(t, err)
defer client.Close()

// Make call we know will error (ignore error)
_, _ = client.WorkflowService().SignalWorkflowExecution(context.TODO(),
&workflowservice.SignalWorkflowExecutionRequest{})

// Confirm trace
expected := []string{"begin outer", "begin inner1", "begin inner2", "end inner2", "end inner1", "end outer"}
require.Equal(t, expected, trace)
}

func TestCustomResolver(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down
5 changes: 5 additions & 0 deletions internal/internal_workflow_client.go
Expand Up @@ -730,6 +730,11 @@ func (wc *WorkflowClient) ResetWorkflowExecution(ctx context.Context, request *w
return resp, nil
}

// WorkflowService implements Client.WorkflowService.
func (wc *WorkflowClient) WorkflowService() workflowservice.WorkflowServiceClient {
return wc.workflowService
}

// Close client and clean up underlying resources.
func (wc *WorkflowClient) Close() {
if wc.connectionCloser == nil {
Expand Down
134 changes: 73 additions & 61 deletions mocks/Client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit a707cbc

Please sign in to comment.