Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Experimental cloud operations client #1462

Merged
merged 11 commits into from
Jul 8, 2024
5 changes: 5 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ jobs:
TEMPORAL_NAMESPACE: sdk-ci.a2dd6
TEMPORAL_CLIENT_CERT: ${{ secrets.TEMPORAL_CLIENT_CERT }}
TEMPORAL_CLIENT_KEY: ${{ secrets.TEMPORAL_CLIENT_KEY }}
TEMPORAL_CLIENT_CLOUD_API_KEY: ${{ secrets.TEMPORAL_CLIENT_CLOUD_API_KEY }}
TEMPORAL_CLIENT_CLOUD_API_VERSION: 2024-05-13-00
steps:
- uses: actions/checkout@v4
with:
Expand All @@ -114,6 +116,9 @@ jobs:
- name: Single integration test against cloud
run: 'go test -v --count 1 -p 1 . -run "TestIntegrationSuite/TestBasic$"'
working-directory: test
- name: Cloud operations tests
run: 'go test -v --count 1 -p 1 . -run "TestCloudOperationsSuite/.*" -cloud-operations-tests'
working-directory: test

features-test:
uses: temporalio/features/.github/workflows/go.yaml@main
Expand Down
35 changes: 31 additions & 4 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"crypto/tls"
"io"

"go.temporal.io/api/cloud/cloudservice/v1"
commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
historypb "go.temporal.io/api/history/v1"
Expand Down Expand Up @@ -146,6 +147,11 @@ type (
// Options are optional parameters for Client creation.
Options = internal.ClientOptions

// CloudOperationsClientOptions are parameters for CloudOperationsClient creation.
//
// WARNING: Cloud operations client is currently experimental.
CloudOperationsClientOptions = internal.CloudOperationsClientOptions

// ConnectionOptions are optional parameters that can be specified in ClientOptions
ConnectionOptions = internal.ConnectionOptions

Expand Down Expand Up @@ -830,6 +836,17 @@ type (
Close()
}

// CloudOperationsClient is the client for cloud operations.
//
// WARNING: Cloud operations client is currently experimental.
CloudOperationsClient interface {
// CloudService provides access to the underlying gRPC service.
CloudService() cloudservice.CloudServiceClient

// Close client and clean up underlying resources.
Close()
}

// NamespaceClient is the client for managing operations on the namespace.
// CLI, tools, ... can use this layer to manager operations on namespace.
NamespaceClient interface {
Expand Down Expand Up @@ -946,6 +963,14 @@ func NewClientFromExistingWithContext(ctx context.Context, existingClient Client
return internal.NewClientFromExisting(ctx, existingClient, options)
}

// DialCloudOperationsClient creates a cloud client to perform cloud-management
// operations. Users should provide Credentials in the options.
//
// WARNING: Cloud operations client is currently experimental.
func DialCloudOperationsClient(ctx context.Context, options CloudOperationsClientOptions) (CloudOperationsClient, error) {
return internal.DialCloudOperationsClient(ctx, options)
}

// NewNamespaceClient creates an instance of a namespace client, to manage
// lifecycle of namespaces. This will not attempt to connect to the server
// eagerly and therefore may not fail for an unreachable server until a call is
Expand All @@ -956,10 +981,12 @@ func NewNamespaceClient(options Options) (NamespaceClient, error) {

// make sure if new methods are added to internal.Client they are also added to public Client.
var (
_ Client = internal.Client(nil)
_ internal.Client = Client(nil)
_ NamespaceClient = internal.NamespaceClient(nil)
_ internal.NamespaceClient = NamespaceClient(nil)
_ Client = internal.Client(nil)
_ internal.Client = Client(nil)
_ CloudOperationsClient = internal.CloudOperationsClient(nil)
_ internal.CloudOperationsClient = CloudOperationsClient(nil)
_ NamespaceClient = internal.NamespaceClient(nil)
_ internal.NamespaceClient = NamespaceClient(nil)
)

// NewValue creates a new [converter.EncodedValue] which can be used to decode binary data returned by Temporal. For example:
Expand Down
112 changes: 104 additions & 8 deletions internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"sync/atomic"
"time"

"go.temporal.io/api/cloud/cloudservice/v1"
commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/api/operatorservice/v1"
Expand Down Expand Up @@ -489,6 +490,48 @@ type (
DisableErrorCodeMetricTags bool
}

CloudOperationsClient interface {
CloudService() cloudservice.CloudServiceClient
Close()
}

// CloudOperationsClientOptions are parameters for CloudOperationsClient creation.
//
// WARNING: Cloud operations client is currently experimental.
CloudOperationsClientOptions struct {
// Optional: The credentials for this client. This is essentially required.
// See [go.temporal.io/sdk/client.NewAPIKeyStaticCredentials],
// [go.temporal.io/sdk/client.NewAPIKeyDynamicCredentials], and
// [go.temporal.io/sdk/client.NewMTLSCredentials].
// Default: No credentials.
Credentials Credentials

// Optional: Version header for safer mutations. May or may not be required
// depending on cloud settings.
// Default: No header.
Version string

// Optional: Advanced server connection options such as TLS settings. Not
// usually needed.
ConnectionOptions ConnectionOptions

// Optional: Logger framework can use to log.
// Default: Default logger provided.
Logger log.Logger

// Optional: Metrics handler for reporting metrics.
// Default: No metrics
MetricsHandler metrics.Handler

// Optional: Overrides the specific host to connect to. Not usually needed.
// Default: saas-api.tmprl.cloud:443
HostPort string

// Optional: Disable TLS.
// Default: false (i.e. TLS enabled)
DisableTLS bool
}

// HeadersProvider returns a map of gRPC headers that should be used on every request.
HeadersProvider interface {
GetHeaders(ctx context.Context) (map[string]string, error)
Expand Down Expand Up @@ -728,7 +771,7 @@ type (

// Credentials are optional credentials that can be specified in ClientOptions.
type Credentials interface {
applyToOptions(*ClientOptions) error
applyToOptions(*ConnectionOptions) error
// Can return nil to have no interceptor
gRPCInterceptor() grpc.UnaryClientInterceptor
}
Expand Down Expand Up @@ -783,7 +826,7 @@ func newClient(ctx context.Context, options ClientOptions, existing *WorkflowCli
}

if options.Credentials != nil {
if err := options.Credentials.applyToOptions(&options); err != nil {
if err := options.Credentials.applyToOptions(&options.ConnectionOptions); err != nil {
return nil, err
}
}
Expand Down Expand Up @@ -897,6 +940,59 @@ func NewServiceClient(workflowServiceClient workflowservice.WorkflowServiceClien
return client
}

// DialCloudOperationsClient creates a cloud client to perform cloud-management
// operations.
func DialCloudOperationsClient(ctx context.Context, options CloudOperationsClientOptions) (CloudOperationsClient, error) {
// Set defaults
if options.MetricsHandler == nil {
options.MetricsHandler = metrics.NopHandler
}
if options.Logger == nil {
options.Logger = ilog.NewDefaultLogger()
}
if options.HostPort == "" {
options.HostPort = "saas-api.tmprl.cloud:443"
}
if options.Version != "" {
options.ConnectionOptions.DialOptions = append(
options.ConnectionOptions.DialOptions,
grpc.WithChainUnaryInterceptor(func(
ctx context.Context, method string, req, reply any,
cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption,
) error {
ctx = metadata.AppendToOutgoingContext(ctx, "temporal-cloud-api-version", options.Version)
return invoker(ctx, method, req, reply, cc, opts...)
}),
)
}
if options.Credentials != nil {
if err := options.Credentials.applyToOptions(&options.ConnectionOptions); err != nil {
return nil, err
}
}
if options.ConnectionOptions.TLS == nil && !options.DisableTLS {
options.ConnectionOptions.TLS = &tls.Config{}
}
// Exclude internal from retry by default
options.ConnectionOptions.excludeInternalFromRetry = &atomic.Bool{}
options.ConnectionOptions.excludeInternalFromRetry.Store(true)
// TODO(cretz): Pass through context on dial
conn, err := dial(newDialParameters(&ClientOptions{
HostPort: options.HostPort,
ConnectionOptions: options.ConnectionOptions,
MetricsHandler: options.MetricsHandler,
Credentials: options.Credentials,
}, options.ConnectionOptions.excludeInternalFromRetry))
if err != nil {
return nil, err
}
return &cloudOperationsClient{
conn: conn,
logger: options.Logger,
cloudServiceClient: cloudservice.NewCloudServiceClient(conn),
}, nil
}

// NewNamespaceClient creates an instance of a namespace client, to manager lifecycle of namespaces.
func NewNamespaceClient(options ClientOptions) (NamespaceClient, error) {
// Initialize root tags
Expand Down Expand Up @@ -964,7 +1060,7 @@ func NewAPIKeyDynamicCredentials(apiKeyCallback func(context.Context) (string, e
return apiKeyCredentials(apiKeyCallback)
}

func (apiKeyCredentials) applyToOptions(*ClientOptions) error { return nil }
func (apiKeyCredentials) applyToOptions(*ConnectionOptions) error { return nil }

func (a apiKeyCredentials) gRPCInterceptor() grpc.UnaryClientInterceptor { return a.gRPCIntercept }

Expand Down Expand Up @@ -992,13 +1088,13 @@ type mTLSCredentials tls.Certificate

func NewMTLSCredentials(certificate tls.Certificate) Credentials { return mTLSCredentials(certificate) }

func (m mTLSCredentials) applyToOptions(opts *ClientOptions) error {
if opts.ConnectionOptions.TLS == nil {
opts.ConnectionOptions.TLS = &tls.Config{}
} else if len(opts.ConnectionOptions.TLS.Certificates) != 0 {
func (m mTLSCredentials) applyToOptions(opts *ConnectionOptions) error {
if opts.TLS == nil {
opts.TLS = &tls.Config{}
} else if len(opts.TLS.Certificates) != 0 {
return fmt.Errorf("cannot apply mTLS credentials, certificates already exist on TLS options")
}
opts.ConnectionOptions.TLS.Certificates = append(opts.ConnectionOptions.TLS.Certificates, tls.Certificate(m))
opts.TLS.Certificates = append(opts.TLS.Certificates, tls.Certificate(m))
return nil
}

Expand Down
6 changes: 3 additions & 3 deletions internal/grpc_dialer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -536,14 +536,14 @@ func TestCredentialsMTLS(t *testing.T) {
// No TLS set
var clientOptions ClientOptions
creds := NewMTLSCredentials(tls.Certificate{Certificate: [][]byte{[]byte("somedata1")}})
require.NoError(t, creds.applyToOptions(&clientOptions))
require.NoError(t, creds.applyToOptions(&clientOptions.ConnectionOptions))
require.Equal(t, "somedata1", string(clientOptions.ConnectionOptions.TLS.Certificates[0].Certificate[0]))

// TLS already set
clientOptions = ClientOptions{}
clientOptions.ConnectionOptions.TLS = &tls.Config{ServerName: "my-server-name"}
creds = NewMTLSCredentials(tls.Certificate{Certificate: [][]byte{[]byte("somedata2")}})
require.NoError(t, creds.applyToOptions(&clientOptions))
require.NoError(t, creds.applyToOptions(&clientOptions.ConnectionOptions))
require.Equal(t, "my-server-name", clientOptions.ConnectionOptions.TLS.ServerName)
require.Equal(t, "somedata2", string(clientOptions.ConnectionOptions.TLS.Certificates[0].Certificate[0]))

Expand All @@ -553,7 +553,7 @@ func TestCredentialsMTLS(t *testing.T) {
Certificates: []tls.Certificate{{Certificate: [][]byte{[]byte("somedata3")}}},
}
creds = NewMTLSCredentials(tls.Certificate{Certificate: [][]byte{[]byte("somedata4")}})
require.Error(t, creds.applyToOptions(&clientOptions))
require.Error(t, creds.applyToOptions(&clientOptions.ConnectionOptions))
}

type testGRPCServer struct {
Expand Down
18 changes: 18 additions & 0 deletions internal/internal_workflow_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/durationpb"

"go.temporal.io/api/cloud/cloudservice/v1"
commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
historypb "go.temporal.io/api/history/v1"
Expand Down Expand Up @@ -100,6 +101,13 @@ type (
unclosedClients *int32
}

// cloudOperationsClient is the client for managing cloud.
cloudOperationsClient struct {
conn *grpc.ClientConn
logger log.Logger
cloudServiceClient cloudservice.CloudServiceClient
}

// namespaceClient is the client for managing namespaces.
namespaceClient struct {
workflowService workflowservice.WorkflowServiceClient
Expand Down Expand Up @@ -1289,6 +1297,16 @@ func (wc *WorkflowClient) Close() {
}
}

func (c *cloudOperationsClient) CloudService() cloudservice.CloudServiceClient {
return c.cloudServiceClient
}

func (c *cloudOperationsClient) Close() {
if err := c.conn.Close(); err != nil {
c.logger.Warn("unable to close connection", tagError, err)
}
}

// Register a namespace with temporal server
// The errors it can throw:
// - NamespaceAlreadyExistsError
Expand Down
Loading
Loading