From b53efcdcaf7d85f0f7bc2a59ec67d79fc376214b Mon Sep 17 00:00:00 2001 From: Jay Pipes Date: Tue, 31 Mar 2026 11:55:23 -0400 Subject: [PATCH 1/2] `temporal worker deployment create|create-version` Adds implementation of the `temporal worker deployment create` and `temporal worker deployment create-version` CLI commands using only direct gRPC API calls, not the sdk-go client code. Adds some basic unit tests for both commands though due to the server-side validation of proper AWS IAM credentials, the happy-path `temporal worker deployment create-version` call with AWS Lambda compute config is skipped until such time as we can figure out adding real AWS test fixtures. Signed-off-by: Jay Pipes --- internal/temporalcli/commands.gen.go | 62 ++++ .../temporalcli/commands.worker.deployment.go | 196 ++++++++++ .../commands.worker.deployment_test.go | 340 ++++++++++++++++++ internal/temporalcli/commands.yaml | 79 ++++ 4 files changed, 677 insertions(+) diff --git a/internal/temporalcli/commands.gen.go b/internal/temporalcli/commands.gen.go index ad01d3083..a06561320 100644 --- a/internal/temporalcli/commands.gen.go +++ b/internal/temporalcli/commands.gen.go @@ -2878,6 +2878,8 @@ func NewTemporalWorkerDeploymentCommand(cctx *CommandContext, parent *TemporalWo s.Command.Long = "Deployment commands perform operations on Worker Deployments:\n\n```\ntemporal worker deployment [command] [options]\n```\n\nFor example:\n\n```\ntemporal worker deployment list\n```\n\nLists the Deployments in the client's namespace.\n\nArguments can be Worker Deployment Versions associated with\na Deployment, specified using the Deployment name and Build ID.\n\nFor example:\n\n```\ntemporal worker deployment set-current-version \\\n --deployment-name YourDeploymentName --build-id YourBuildID\n```\n\nSets the current Deployment Version for a given Deployment." } s.Command.Args = cobra.NoArgs + s.Command.AddCommand(&NewTemporalWorkerDeploymentCreateCommand(cctx, &s).Command) + s.Command.AddCommand(&NewTemporalWorkerDeploymentCreateVersionCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalWorkerDeploymentDeleteCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalWorkerDeploymentDeleteVersionCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalWorkerDeploymentDescribeCommand(cctx, &s).Command) @@ -2890,6 +2892,66 @@ func NewTemporalWorkerDeploymentCommand(cctx *CommandContext, parent *TemporalWo return &s } +type TemporalWorkerDeploymentCreateCommand struct { + Parent *TemporalWorkerDeploymentCommand + Command cobra.Command + DeploymentNameOptions +} + +func NewTemporalWorkerDeploymentCreateCommand(cctx *CommandContext, parent *TemporalWorkerDeploymentCommand) *TemporalWorkerDeploymentCreateCommand { + var s TemporalWorkerDeploymentCreateCommand + s.Parent = parent + s.Command.DisableFlagsInUseLine = true + s.Command.Use = "create [flags]" + s.Command.Short = "Create a new Worker Deployment" + if hasHighlighting { + s.Command.Long = "Create a new Worker Deployment:\n\n\x1b[1mtemporal worker deployment create [options]\x1b[0m\n\nWorker Deployments are lazily created the first time a Worker polls the\nTemporal Server and specifies a VersionOverride. However, if you need to\npre-define a compute configuration (for instance to set up a serverless\nWorker), you need to call \x1b[1mtemporal worker deployment create-version\x1b[0m and\npass in the name of the Worker Deployment. The \x1b[1mtemporal worker\ndeployment create\x1b[0m command allows you to pre-define a Worker Deployment\nso that calls to \x1b[1mtemporal worker deployment create-version\x1b[0m will\nsucceed.\n\nIf a Worker Deployment with the supplied name already exists, this\ncommand will return an error.\n\nNote: This is an experimental feature and may change in the future." + } else { + s.Command.Long = "Create a new Worker Deployment:\n\n```\ntemporal worker deployment create [options]\n```\n\nWorker Deployments are lazily created the first time a Worker polls the\nTemporal Server and specifies a VersionOverride. However, if you need to\npre-define a compute configuration (for instance to set up a serverless\nWorker), you need to call `temporal worker deployment create-version` and\npass in the name of the Worker Deployment. The `temporal worker\ndeployment create` command allows you to pre-define a Worker Deployment\nso that calls to `temporal worker deployment create-version` will\nsucceed.\n\nIf a Worker Deployment with the supplied name already exists, this\ncommand will return an error.\n\nNote: This is an experimental feature and may change in the future." + } + s.Command.Args = cobra.NoArgs + s.DeploymentNameOptions.BuildFlags(s.Command.Flags()) + s.Command.Run = func(c *cobra.Command, args []string) { + if err := s.run(cctx, args); err != nil { + cctx.Options.Fail(err) + } + } + return &s +} + +type TemporalWorkerDeploymentCreateVersionCommand struct { + Parent *TemporalWorkerDeploymentCommand + Command cobra.Command + DeploymentVersionOptions + AwsLambdaFunctionArn string + AwsLambdaAssumeRoleArn string + AwsLambdaAssumeRoleExternalId string +} + +func NewTemporalWorkerDeploymentCreateVersionCommand(cctx *CommandContext, parent *TemporalWorkerDeploymentCommand) *TemporalWorkerDeploymentCreateVersionCommand { + var s TemporalWorkerDeploymentCreateVersionCommand + s.Parent = parent + s.Command.DisableFlagsInUseLine = true + s.Command.Use = "create-version [flags]" + s.Command.Short = "Create a new Worker Deployment Version" + if hasHighlighting { + s.Command.Long = "\nCreate a new Worker Deployment Version:\n\n\x1b[1mtemporal worker deployment create-version [options]\x1b[0m\n\nConfigure a Worker Deployment Version's compute configuration as needed.\nFor example, pass compute provider information for an AWS Lambda function\nthat spawns a Worker in the Worker Deployment:\n\n\x1b[1mtemporal worker deployment create-version \\\n --namespace YourNamespaceName \\\n --deployment-name YourDeploymentName \\\n --build-id YourBuildID \\\n --aws-lambda-function-arn LambdaFunctionARN \\\n --aws-lambda-assume-role-arn LambdaAssumeRoleARN \\\n --aws-lambda-assume-role-external-id LambdaAssumeRoleExternalID\x1b[0m\n\nIf a Worker Deployment Version with the supplied BuildID already exists,\nthis command will return an error.\n\nNote: This is an experimental feature and may change in the future." + } else { + s.Command.Long = "\nCreate a new Worker Deployment Version:\n\n```\ntemporal worker deployment create-version [options]\n```\n\nConfigure a Worker Deployment Version's compute configuration as needed.\nFor example, pass compute provider information for an AWS Lambda function\nthat spawns a Worker in the Worker Deployment:\n\n```\ntemporal worker deployment create-version \\\n --namespace YourNamespaceName \\\n --deployment-name YourDeploymentName \\\n --build-id YourBuildID \\\n --aws-lambda-function-arn LambdaFunctionARN \\\n --aws-lambda-assume-role-arn LambdaAssumeRoleARN \\\n --aws-lambda-assume-role-external-id LambdaAssumeRoleExternalID\n```\n\nIf a Worker Deployment Version with the supplied BuildID already exists,\nthis command will return an error.\n\nNote: This is an experimental feature and may change in the future." + } + s.Command.Args = cobra.NoArgs + s.Command.Flags().StringVar(&s.AwsLambdaFunctionArn, "aws-lambda-function-arn", "", "Qualified (contains version suffix) or unqualified AWS Lambda function ARN to invoke when there are no active pollers for task queue targets in the Worker Deployment.") + s.Command.Flags().StringVar(&s.AwsLambdaAssumeRoleArn, "aws-lambda-assume-role-arn", "", "AWS IAM role ARN that the Temporal server will assume when invoking the Lambda function that spawns a new Worker in this Worker Deployment Version. Required when --aws-lambda-function-arn is specified.") + s.Command.Flags().StringVar(&s.AwsLambdaAssumeRoleExternalId, "aws-lambda-assume-role-external-id", "", "Temporal server will enforce that the AWS IAM trust policy associated with the AWS IAM role specified in --aws-lambda-assume-role-arn has an aws:ExternalId condition that matches the supplied value. Required when --aws-lambda-function-arn is specified.") + s.DeploymentVersionOptions.BuildFlags(s.Command.Flags()) + s.Command.Run = func(c *cobra.Command, args []string) { + if err := s.run(cctx, args); err != nil { + cctx.Options.Fail(err) + } + } + return &s +} + type TemporalWorkerDeploymentDeleteCommand struct { Parent *TemporalWorkerDeploymentCommand Command cobra.Command diff --git a/internal/temporalcli/commands.worker.deployment.go b/internal/temporalcli/commands.worker.deployment.go index 26fb497fb..a13d7dfc9 100644 --- a/internal/temporalcli/commands.worker.deployment.go +++ b/internal/temporalcli/commands.worker.deployment.go @@ -3,17 +3,24 @@ package temporalcli import ( "errors" "fmt" + "slices" + "strings" "time" "github.com/fatih/color" + "github.com/google/uuid" "github.com/temporalio/cli/internal/printer" "go.temporal.io/api/common/v1" + commonpb "go.temporal.io/api/common/v1" + computepb "go.temporal.io/api/compute/v1" + "go.temporal.io/api/deployment/v1" deploymentpb "go.temporal.io/api/deployment/v1" enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/api/serviceerror" taskqueuepb "go.temporal.io/api/taskqueue/v1" "go.temporal.io/api/workflowservice/v1" "go.temporal.io/sdk/client" + "go.temporal.io/sdk/converter" "go.temporal.io/sdk/worker" ) @@ -111,6 +118,24 @@ type formattedWorkerDeploymentVersionInfoType struct { DrainageInfo formattedDrainageInfo `json:"drainageInfo"` TaskQueuesInfos []formattedTaskQueueInfoRowType `json:"taskQueuesInfos"` Metadata map[string]*common.Payload `json:"metadata"` + ComputeConfig *formattedComputeConfig `json:"computeConfig,omitempty"` +} + +type formattedComputeConfig struct { + ScalingGroups map[string]formattedComputeConfigScalingGroup `json:"scalingGroups"` +} + +type formattedComputeConfigScalingGroup struct { + Provider *formattedComputeConfigProvider `json:"provider,omitempty"` + Scaler *formattedComputeConfigScaler `json:"scaler,omitempty"` +} + +type formattedComputeConfigProvider struct { + Type string `json:"type"` +} + +type formattedComputeConfigScaler struct { + Type string `json:"type"` } func drainageStatusToStr(drainage client.WorkerDeploymentVersionDrainageStatus) (string, error) { @@ -354,6 +379,39 @@ func formatDrainageInfoProto(drainageInfo *deploymentpb.VersionDrainageInfo) (fo }, nil } +func formatComputeConfigProto(cc *computepb.ComputeConfig) *formattedComputeConfig { + if cc == nil { + return nil + } + msgSGs := cc.GetScalingGroups() + if len(msgSGs) == 0 { + return nil + } + sgs := make(map[string]formattedComputeConfigScalingGroup, len(msgSGs)) + for name, msgSG := range msgSGs { + p := msgSG.GetProvider() + s := msgSG.GetScaler() + if p == nil && s == nil { + continue + } + sg := formattedComputeConfigScalingGroup{} + if p != nil { + sg.Provider = &formattedComputeConfigProvider{ + Type: p.GetType(), + } + } + if p != nil { + sg.Scaler = &formattedComputeConfigScaler{ + Type: s.GetType(), + } + } + sgs[name] = sg + } + return &formattedComputeConfig{ + ScalingGroups: sgs, + } +} + // workerDeploymentVersionInfoProtoToRows converts gRPC proto types to formatted types for display. func workerDeploymentVersionInfoProtoToRows(deploymentInfo *deploymentpb.WorkerDeploymentVersionInfo, taskQueueInfos []*workflowservice.DescribeWorkerDeploymentVersionResponse_VersionTaskQueue, includeStats bool) (formattedWorkerDeploymentVersionInfoType, error) { tqi, err := formatTaskQueuesInfosProto(taskQueueInfos, includeStats) @@ -366,6 +424,8 @@ func workerDeploymentVersionInfoProtoToRows(deploymentInfo *deploymentpb.WorkerD return formattedWorkerDeploymentVersionInfoType{}, err } + computeConfig := formatComputeConfigProto(deploymentInfo.GetComputeConfig()) + return formattedWorkerDeploymentVersionInfoType{ DeploymentName: deploymentInfo.GetDeploymentVersion().GetDeploymentName(), BuildID: deploymentInfo.GetDeploymentVersion().GetBuildId(), @@ -377,9 +437,28 @@ func workerDeploymentVersionInfoProtoToRows(deploymentInfo *deploymentpb.WorkerD DrainageInfo: drainage, TaskQueuesInfos: tqi, Metadata: deploymentInfo.GetMetadata().GetEntries(), + ComputeConfig: computeConfig, }, nil } +func computeConfigSummaryStr(cc *computepb.ComputeConfig) string { + if cc == nil { + return "" + } + providers := []string{} + for _, sg := range cc.GetScalingGroups() { + p := sg.GetProvider() + if p == nil { + continue + } + pt := p.GetType() + if !slices.Contains(providers, pt) { + providers = append(providers, pt) + } + } + return strings.Join(providers, ",") +} + // printWorkerDeploymentVersionInfoProto prints worker deployment version info from proto types. func printWorkerDeploymentVersionInfoProto(cctx *CommandContext, deploymentInfo *deploymentpb.WorkerDeploymentVersionInfo, taskQueueInfos []*workflowservice.DescribeWorkerDeploymentVersionResponse_VersionTaskQueue, msg string, opts printVersionInfoOptions) error { fDeploymentInfo, err := workerDeploymentVersionInfoProtoToRows(deploymentInfo, taskQueueInfos, opts.showStats) @@ -400,6 +479,7 @@ func printWorkerDeploymentVersionInfoProto(cctx *CommandContext, deploymentInfo drainageLastChangedTime = deploymentInfo.GetDrainageInfo().GetLastChangedTime().AsTime() drainageLastCheckedTime = deploymentInfo.GetDrainageInfo().GetLastCheckedTime().AsTime() } + computeConfigSummary := computeConfigSummaryStr(deploymentInfo.GetComputeConfig()) printMe := struct { DeploymentName string @@ -413,6 +493,7 @@ func printWorkerDeploymentVersionInfoProto(cctx *CommandContext, deploymentInfo DrainageLastChangedTime time.Time `cli:",cardOmitEmpty"` DrainageLastCheckedTime time.Time `cli:",cardOmitEmpty"` Metadata map[string]*common.Payload `cli:",cardOmitEmpty"` + ComputeConfigSummary string `cli:",cardOmitEmpty"` }{ DeploymentName: deploymentInfo.GetDeploymentVersion().GetDeploymentName(), BuildID: deploymentInfo.GetDeploymentVersion().GetBuildId(), @@ -425,6 +506,7 @@ func printWorkerDeploymentVersionInfoProto(cctx *CommandContext, deploymentInfo DrainageLastChangedTime: drainageLastChangedTime, DrainageLastCheckedTime: drainageLastCheckedTime, Metadata: deploymentInfo.GetMetadata().GetEntries(), + ComputeConfigSummary: computeConfigSummary, } err := cctx.Printer.PrintStructured(printMe, printer.StructuredOptions{}) if err != nil { @@ -601,6 +683,34 @@ func (c *TemporalWorkerDeploymentCommand) getConflictToken(cctx *CommandContext, return resp.ConflictToken, nil } +func (c *TemporalWorkerDeploymentCreateCommand) run(cctx *CommandContext, args []string) error { + cl, err := dialClient(cctx, &c.Parent.Parent.ClientOptions) + if err != nil { + return err + } + defer cl.Close() + + ns := c.Parent.Parent.Namespace + identity := c.Parent.Parent.Identity + deploymentName := c.Name + requestID := uuid.NewString() + + request := &workflowservice.CreateWorkerDeploymentRequest{ + Namespace: ns, + DeploymentName: deploymentName, + Identity: identity, + RequestId: requestID, + } + + _, err = cl.WorkflowService().CreateWorkerDeployment(cctx, request) + if err != nil { + return fmt.Errorf("error creating worker deployment: %w", err) + } + + cctx.Printer.Println("Successfully created worker deployment") + return nil +} + func (c *TemporalWorkerDeploymentDescribeCommand) run(cctx *CommandContext, args []string) error { cl, err := dialClient(cctx, &c.Parent.Parent.ClientOptions) if err != nil { @@ -773,6 +883,92 @@ func (c *TemporalWorkerDeploymentManagerIdentityUnsetCommand) run(cctx *CommandC return nil } +func validateAWSLambdaProviderDetails(details map[string]any) error { + for _, key := range []string{"arn", "role", "role_external_id"} { + if _, ok := details[key]; !ok { + return fmt.Errorf("missing required AWS Lambda provider detail: %s", key) + } + } + return nil +} + +// awsLambdaProviderDetailsPayload returns the encoded Payload representing AWS +// Lambda compute provider details. +func (c *TemporalWorkerDeploymentCreateVersionCommand) awsLambdaProviderDetailsPayload() (*commonpb.Payload, error) { + // Map keys from temporal-auto-scaled-workers: + // https://github.com/temporalio/temporal-auto-scaled-workers/blob/c4a7e69b6504365d7e5326b0b8e6cd95e3293f96/wci/workflow/compute_provider/aws_lambda.go#L16-L20 + providerDetails := map[string]any{ + "arn": c.AwsLambdaFunctionArn, + } + if c.AwsLambdaAssumeRoleArn != "" { + providerDetails["role"] = c.AwsLambdaAssumeRoleArn + } + if c.AwsLambdaAssumeRoleExternalId != "" { + providerDetails["role_external_id"] = c.AwsLambdaAssumeRoleExternalId + } + err := validateAWSLambdaProviderDetails(providerDetails) + if err != nil { + return nil, err + } + dc := converter.GetDefaultDataConverter() + return dc.ToPayload(&providerDetails) +} + +func (c *TemporalWorkerDeploymentCreateVersionCommand) run(cctx *CommandContext, args []string) error { + cl, err := dialClient(cctx, &c.Parent.Parent.ClientOptions) + if err != nil { + return err + } + defer cl.Close() + + ns := c.Parent.Parent.Namespace + buildID := c.BuildId + identity := c.Parent.Parent.Identity + deploymentName := c.DeploymentName + requestID := uuid.NewString() + + var cc *computepb.ComputeConfig + if c.AwsLambdaFunctionArn != "" { + detailsPayload, err := c.awsLambdaProviderDetailsPayload() + if err != nil { + return err + } + cc = &computepb.ComputeConfig{ + ScalingGroups: map[string]*computepb.ComputeConfigScalingGroup{ + "default": { + Provider: &computepb.ComputeProvider{ + Type: "aws-lambda", + Details: detailsPayload, + }, + Scaler: &computepb.ComputeScaler{ + // Hard-coded: no-sync is the only supported algorithm + // in temporal-auto-scaled-workers as of 2026-04-01. + Type: "no-sync", + }, + }, + }, + } + } + request := &workflowservice.CreateWorkerDeploymentVersionRequest{ + Namespace: ns, + DeploymentVersion: &deployment.WorkerDeploymentVersion{ + DeploymentName: deploymentName, + BuildId: buildID, + }, + Identity: identity, + ComputeConfig: cc, + RequestId: requestID, + } + + _, err = cl.WorkflowService().CreateWorkerDeploymentVersion(cctx, request) + if err != nil { + return fmt.Errorf("error creating worker deployment version: %w", err) + } + + cctx.Printer.Println("Successfully created worker deployment version") + return nil +} + func (c *TemporalWorkerDeploymentDeleteVersionCommand) run(cctx *CommandContext, args []string) error { cl, err := dialClient(cctx, &c.Parent.Parent.ClientOptions) if err != nil { diff --git a/internal/temporalcli/commands.worker.deployment_test.go b/internal/temporalcli/commands.worker.deployment_test.go index d3073ecee..86aa81c6e 100644 --- a/internal/temporalcli/commands.worker.deployment_test.go +++ b/internal/temporalcli/commands.worker.deployment_test.go @@ -68,6 +68,15 @@ type jsonTaskQueueInfoRowType struct { StatsByPriorityKey map[string]jsonVersionStatsType `json:"statsByPriorityKey,omitempty"` } +type jsonComputeConfigScalingGroupSummary struct { + TaskQueueTypes []string `json:"taskQueueTypes,omitempty"` + ProviderType string `json:"providerType"` +} + +type jsonComputeConfig struct { + ScalingGroups []jsonComputeConfigScalingGroupSummary `json:"scalingGroups,omitempty"` +} + type jsonDeploymentVersionInfoType struct { Version string `json:"version"` CreateTime time.Time `json:"createTime"` @@ -78,6 +87,7 @@ type jsonDeploymentVersionInfoType struct { DrainageInfo jsonDrainageInfo `json:"drainageInfo"` TaskQueuesInfos []jsonTaskQueueInfoRowType `json:"taskQueuesInfos"` Metadata map[string]*common.Payload `json:"metadata"` + ComputeConfig *jsonComputeConfig `json:"computeConfig,omitempty"` } func (s *SharedServerSuite) TestDeployment_Set_Current_Version() { @@ -1057,3 +1067,333 @@ func (s *SharedServerSuite) testDeploymentDescribeVersionTaskQueueStats(withPrio s.NoError(run.Get(s.Context, nil)) } } + +func (s *SharedServerSuite) TestCreateWorkerDeployment() { + deploymentName := uuid.NewString() + + s.EventuallyWithT(func(t *assert.CollectT) { + res := s.Execute( + "worker", "deployment", "create", + "--address", s.Address(), + "--name", deploymentName, + ) + assert.NoError(t, res.Err) + assert.Contains(t, res.Stdout.String(), "Successfully created worker deployment") + }, 30*time.Second, 100*time.Millisecond) + + // Wait for the deployment to appear + s.EventuallyWithT(func(t *assert.CollectT) { + res := s.Execute( + "worker", "deployment", "describe", + "--address", s.Address(), + "--name", deploymentName, + ) + assert.NoError(t, res.Err) + }, 30*time.Second, 100*time.Millisecond) + + // Attempting to create a WD with the same name should fail with a conflict + // error. + s.EventuallyWithT(func(t *assert.CollectT) { + res := s.Execute( + "worker", "deployment", "create", + "--address", s.Address(), + "--name", deploymentName, + ) + assert.Error(t, res.Err) + assert.ErrorContains(t, res.Err, "already exists") + }, 30*time.Second, 100*time.Millisecond) +} + +func (s *SharedServerSuite) TestCreateWorkerDeploymentVersion_EmptyComputeConfig() { + deploymentName := uuid.NewString() + taskQueue := uuid.NewString() + + lazyCreatedBuildID := uuid.NewString() + lazyCreatedVer := worker.WorkerDeploymentVersion{ + DeploymentName: deploymentName, + BuildID: lazyCreatedBuildID, + } + + // Create worker with explicit versioning. This will end up creating a + // WorkerDeployment with the specified name. We will then manually create a + // worker deployment version using the `temporal worker deployment + // create-version` command. + w1 := worker.New(s.Client, taskQueue, worker.Options{ + DeploymentOptions: worker.DeploymentOptions{ + UseVersioning: true, + Version: lazyCreatedVer, + }, + }) + + // Register a workflow with explicit Pinned versioning behavior to trigger + // creation of the worker deployment. + w1.RegisterWorkflowWithOptions( + func(ctx workflow.Context, input any) (any, error) { + workflow.GetSignalChannel(ctx, "complete-signal").Receive(ctx, nil) + return nil, nil + }, + workflow.RegisterOptions{ + Name: "TestCreateWorkerDeploymentVersion_NoComputeConfig", + VersioningBehavior: workflow.VersioningBehaviorPinned, + }, + ) + + s.NoError(w1.Start()) + + // Wait for the lazily-created deployment to appear + s.EventuallyWithT(func(t *assert.CollectT) { + res := s.Execute( + "worker", "deployment", "describe-version", + "--address", s.Address(), + "--deployment-name", deploymentName, + "--build-id", lazyCreatedBuildID, + ) + assert.NoError(t, res.Err) + }, 30*time.Second, 100*time.Millisecond) + + // Now that we know the worker deployment exists (because the above + // lazily-created worker deployment version ended up creating it), we will + // manually create a new worker deployment version using the `temporal + // worker deployment create-version` CLI command. + noComputeConfigBuildID := uuid.NewString() + + s.EventuallyWithT(func(t *assert.CollectT) { + res := s.Execute( + "worker", "deployment", "create-version", + "--address", s.Address(), + "--deployment-name", deploymentName, + "--build-id", noComputeConfigBuildID, + ) + assert.NoError(t, res.Err) + assert.Contains(t, res.Stdout.String(), "Successfully created worker deployment version") + }, 30*time.Second, 100*time.Millisecond) + + // Wait for the deployment version to appear + s.EventuallyWithT(func(t *assert.CollectT) { + res := s.Execute( + "worker", "deployment", "describe-version", + "--address", s.Address(), + "--deployment-name", deploymentName, + "--build-id", noComputeConfigBuildID, + ) + assert.NoError(t, res.Err) + }, 30*time.Second, 100*time.Millisecond) + + // Check that there is no compute config returned for this WDV + res := s.Execute( + "worker", "deployment", "describe-version", + "--address", s.Address(), + "--deployment-name", deploymentName, + "--build-id", noComputeConfigBuildID, + "--output", "json", + ) + s.NoError(res.Err) + var jsonOut jsonDeploymentVersionInfoType + s.NoError(json.Unmarshal(res.Stdout.Bytes(), &jsonOut)) + s.Nil(jsonOut.ComputeConfig, "ComputeConfig should be nil.") + + // Attempting to create a WDV with the same BuildID should fail with a + // conflict error. + s.EventuallyWithT(func(t *assert.CollectT) { + res := s.Execute( + "worker", "deployment", "create-version", + "--address", s.Address(), + "--deployment-name", deploymentName, + "--build-id", noComputeConfigBuildID, + ) + assert.Error(t, res.Err) + assert.ErrorContains(t, res.Err, "already exists") + }, 30*time.Second, 100*time.Millisecond) +} + +func (s *SharedServerSuite) TestCreateWorkerDeploymentVersion_Errors() { + deploymentName := uuid.NewString() + taskQueue := uuid.NewString() + + lazyCreatedBuildID := uuid.NewString() + lazyCreatedVer := worker.WorkerDeploymentVersion{ + DeploymentName: deploymentName, + BuildID: lazyCreatedBuildID, + } + + // Create worker with explicit versioning. This will end up creating a + // WorkerDeployment with the specified name. We will then manually create a + // worker deployment version using the `temporal worker deployment + // create-version` command. + w1 := worker.New(s.Client, taskQueue, worker.Options{ + DeploymentOptions: worker.DeploymentOptions{ + UseVersioning: true, + Version: lazyCreatedVer, + }, + }) + + // Register a workflow with explicit Pinned versioning behavior to trigger + // creation of the worker deployment. + w1.RegisterWorkflowWithOptions( + func(ctx workflow.Context, input any) (any, error) { + workflow.GetSignalChannel(ctx, "complete-signal").Receive(ctx, nil) + return nil, nil + }, + workflow.RegisterOptions{ + Name: "TestCreateWorkerDeploymentVersion_Errors", + VersioningBehavior: workflow.VersioningBehaviorPinned, + }, + ) + + s.NoError(w1.Start()) + + // Wait for the lazily-created deployment to appear + s.EventuallyWithT(func(t *assert.CollectT) { + res := s.Execute( + "worker", "deployment", "describe-version", + "--address", s.Address(), + "--deployment-name", deploymentName, + "--build-id", lazyCreatedBuildID, + ) + assert.NoError(t, res.Err) + }, 30*time.Second, 100*time.Millisecond) + + // Create some WDVs with invalid compute config parameters. + assumeRoleFailureBuildID := uuid.NewString() + + invokeARN := "arn:aws:lambda:us-east-1:123456789012:function:MyExampleFunction:1" + assumeRoleARN := "arn:aws:iam::123456789012:role/MyServiceRole" + assumeRoleExternalID := "external-id" + + s.EventuallyWithT(func(t *assert.CollectT) { + res := s.Execute( + "worker", "deployment", "create-version", + "--address", s.Address(), + "--deployment-name", deploymentName, + "--build-id", assumeRoleFailureBuildID, + "--aws-lambda-function-arn", invokeARN, + "--aws-lambda-assume-role-arn", assumeRoleARN, + "--aws-lambda-assume-role-external-id", assumeRoleExternalID, + ) + assert.Error(t, res.Err) + assert.ErrorContains(t, res.Err, "failed to assume role arn:aws:iam::123456789012:role/MyServiceRole: operation error STS: AssumeRole") + }, 90*time.Second, 100*time.Millisecond) + + missingExternalIDBuildID := uuid.NewString() + + s.EventuallyWithT(func(t *assert.CollectT) { + res := s.Execute( + "worker", "deployment", "create-version", + "--address", s.Address(), + "--deployment-name", deploymentName, + "--build-id", missingExternalIDBuildID, + "--aws-lambda-function-arn", invokeARN, + "--aws-lambda-assume-role-arn", assumeRoleARN, + ) + assert.Error(t, res.Err) + assert.ErrorContains(t, res.Err, "missing required AWS Lambda provider detail: role_external_id") + }, 30*time.Second, 100*time.Millisecond) + + missingAssumeRoleBuildID := uuid.NewString() + + s.EventuallyWithT(func(t *assert.CollectT) { + res := s.Execute( + "worker", "deployment", "create-version", + "--address", s.Address(), + "--deployment-name", deploymentName, + "--build-id", missingAssumeRoleBuildID, + "--aws-lambda-function-arn", invokeARN, + ) + assert.Error(t, res.Err) + assert.ErrorContains(t, res.Err, "missing required AWS Lambda provider detail: role") + }, 30*time.Second, 100*time.Millisecond) +} + +// TODO(jaypipes): Enable this test when we have a way of ensuring AWS resource +// fixtures since the CLI test harness uses a real Temporal Server and a real +// Temporal Server validates any supplied AWS Lambda Function and Assume Role +// ARNs are good... +func (s *SharedServerSuite) TestCreateWorkerDeploymentVersion_LambdaComputeConfig() { + s.T().Skip("AWS Lambda Function and Assume Role fixtures needed.") + deploymentName := uuid.NewString() + taskQueue := uuid.NewString() + + lazyCreatedBuildID := uuid.NewString() + lazyCreatedVer := worker.WorkerDeploymentVersion{ + DeploymentName: deploymentName, + BuildID: lazyCreatedBuildID, + } + + // Create worker with explicit versioning. This will end up creating a + // WorkerDeployment with the specified name. We will then manually create a + // worker deployment version using the `temporal worker deployment + // create-version` command. + w1 := worker.New(s.Client, taskQueue, worker.Options{ + DeploymentOptions: worker.DeploymentOptions{ + UseVersioning: true, + Version: lazyCreatedVer, + }, + }) + + // Register a workflow with explicit Pinned versioning behavior to trigger + // creation of the worker deployment. + w1.RegisterWorkflowWithOptions( + func(ctx workflow.Context, input any) (any, error) { + workflow.GetSignalChannel(ctx, "complete-signal").Receive(ctx, nil) + return nil, nil + }, + workflow.RegisterOptions{ + Name: "TestCreateWorkerDeploymentVersion_LambdaComputeConfig", + VersioningBehavior: workflow.VersioningBehaviorPinned, + }, + ) + + s.NoError(w1.Start()) + + // Now that we know the worker deployment exists (because the above + // lazily-created worker deployment version ended up creating it), we will + // manually create a new worker deployment version using the `temporal + // worker deployment create-version` CLI command. + // + // Create a WDV with a valid Compute Config specified and verify that the + // compute config provider is displayed in the output of `temporal worker + // deployment describe-version` + computeConfigBuildID := uuid.NewString() + + invokeARN := "arn:aws:lambda:us-east-1:123456789012:function:MyExampleFunction:1" + assumeRoleARN := "arn:aws:iam::123456789012:role/MyServiceRole" + assumeRoleExternalID := "external-id" + + s.EventuallyWithT(func(t *assert.CollectT) { + res := s.Execute( + "worker", "deployment", "create-version", + "--address", s.Address(), + "--deployment-name", deploymentName, + "--build-id", computeConfigBuildID, + "--aws-lambda-function-arn", invokeARN, + "--aws-lambda-assume-role-arn", assumeRoleARN, + "--aws-lambda-assume-role-external-id", assumeRoleExternalID, + ) + assert.NoError(t, res.Err) + assert.Contains(t, res.Stdout.String(), "Successfully created worker deployment version") + }, 30*time.Second, 100*time.Millisecond) + + // Wait for the deployment version to appear + s.EventuallyWithT(func(t *assert.CollectT) { + res := s.Execute( + "worker", "deployment", "describe-version", + "--address", s.Address(), + "--deployment-name", deploymentName, + "--build-id", computeConfigBuildID, + ) + assert.NoError(t, res.Err) + }, 30*time.Second, 100*time.Millisecond) + + // Check that there is a compute config returned for this WDV + res := s.Execute( + "worker", "deployment", "describe-version", + "--address", s.Address(), + "--deployment-name", deploymentName, + "--build-id", computeConfigBuildID, + "--output", "json", + ) + s.NoError(res.Err) + jsonOut := jsonDeploymentVersionInfoType{} + s.NoError(json.Unmarshal(res.Stdout.Bytes(), &jsonOut)) + s.NotNil(jsonOut.ComputeConfig, "ComputeConfig should not be nil.") +} diff --git a/internal/temporalcli/commands.yaml b/internal/temporalcli/commands.yaml index 5f4294f8f..87da2a4b4 100644 --- a/internal/temporalcli/commands.yaml +++ b/internal/temporalcli/commands.yaml @@ -788,9 +788,11 @@ commands: delete-version, set-current-version, or set-ramping-version. keywords: - worker deployment + - worker deployment create - worker deployment describe - worker deployment list - worker deployment delete + - worker deployment create-version - worker deployment describe-version - worker deployment set-current-version - worker deployment set-ramping-version @@ -798,6 +800,31 @@ commands: - worker deployment update-version-metadata - worker deployment manager-identity + - name: temporal worker deployment create + summary: Create a new Worker Deployment + description: | + Create a new Worker Deployment: + + ``` + temporal worker deployment create [options] + ``` + + Worker Deployments are lazily created the first time a Worker polls the + Temporal Server and specifies a VersionOverride. However, if you need to + pre-define a compute configuration (for instance to set up a serverless + Worker), you need to call `temporal worker deployment create-version` and + pass in the name of the Worker Deployment. The `temporal worker + deployment create` command allows you to pre-define a Worker Deployment + so that calls to `temporal worker deployment create-version` will + succeed. + + If a Worker Deployment with the supplied name already exists, this + command will return an error. + + Note: This is an experimental feature and may change in the future. + option-sets: + - deployment-name + - name: temporal worker deployment describe summary: Show properties of a Worker Deployment description: | @@ -856,6 +883,58 @@ commands: --namespace YourDeploymentNamespace ``` + - name: temporal worker deployment create-version + summary: Create a new Worker Deployment Version + description: | + + Create a new Worker Deployment Version: + + ``` + temporal worker deployment create-version [options] + ``` + + Configure a Worker Deployment Version's compute configuration as needed. + For example, pass compute provider information for an AWS Lambda function + that spawns a Worker in the Worker Deployment: + + ``` + temporal worker deployment create-version \ + --namespace YourNamespaceName \ + --deployment-name YourDeploymentName \ + --build-id YourBuildID \ + --aws-lambda-function-arn LambdaFunctionARN \ + --aws-lambda-assume-role-arn LambdaAssumeRoleARN \ + --aws-lambda-assume-role-external-id LambdaAssumeRoleExternalID + ``` + + If a Worker Deployment Version with the supplied BuildID already exists, + this command will return an error. + + Note: This is an experimental feature and may change in the future. + option-sets: + - deployment-version + options: + - name: aws-lambda-function-arn + type: string + description: | + Qualified (contains version suffix) or unqualified AWS Lambda + function ARN to invoke when there are no active pollers for task + queue targets in the Worker Deployment. + - name: aws-lambda-assume-role-arn + type: string + description: | + AWS IAM role ARN that the Temporal server will assume when invoking + the Lambda function that spawns a new Worker in this Worker + Deployment Version. Required when --aws-lambda-function-arn is + specified. + - name: aws-lambda-assume-role-external-id + type: string + description: | + Temporal server will enforce that the AWS IAM trust policy associated + with the AWS IAM role specified in --aws-lambda-assume-role-arn has + an aws:ExternalId condition that matches the supplied value. Required + when --aws-lambda-function-arn is specified. + - name: temporal worker deployment describe-version summary: Show properties of a Worker Deployment Version description: | From 409612487e3327564d727a7c6374fbdefe79e8a9 Mon Sep 17 00:00:00 2001 From: Jay Pipes Date: Wed, 22 Apr 2026 16:22:11 -0400 Subject: [PATCH 2/2] fix retries in worker deployment unit tests The previous code placed mutating `temporal` CLI calls inside calls to `SharedServerSuite.EventuallyWithT`. This meant that calls to `temporal worker deployment create` or `temporal worker deployment create-version` were erroneously being retried in a tight loop. The asserting read-only calls such as `temporal worker deployment describe-version` are the things that need to be in the EventuallyWithT retrying closure, not the mutating calls. Signed-off-by: Jay Pipes --- .../commands.worker.deployment_test.go | 162 ++++++++---------- 1 file changed, 73 insertions(+), 89 deletions(-) diff --git a/internal/temporalcli/commands.worker.deployment_test.go b/internal/temporalcli/commands.worker.deployment_test.go index 86aa81c6e..a3963dd23 100644 --- a/internal/temporalcli/commands.worker.deployment_test.go +++ b/internal/temporalcli/commands.worker.deployment_test.go @@ -1071,15 +1071,13 @@ func (s *SharedServerSuite) testDeploymentDescribeVersionTaskQueueStats(withPrio func (s *SharedServerSuite) TestCreateWorkerDeployment() { deploymentName := uuid.NewString() - s.EventuallyWithT(func(t *assert.CollectT) { - res := s.Execute( - "worker", "deployment", "create", - "--address", s.Address(), - "--name", deploymentName, - ) - assert.NoError(t, res.Err) - assert.Contains(t, res.Stdout.String(), "Successfully created worker deployment") - }, 30*time.Second, 100*time.Millisecond) + res := s.Execute( + "worker", "deployment", "create", + "--address", s.Address(), + "--name", deploymentName, + ) + s.NoError(res.Err) + s.Contains(res.Stdout.String(), "Successfully created worker deployment") // Wait for the deployment to appear s.EventuallyWithT(func(t *assert.CollectT) { @@ -1093,15 +1091,13 @@ func (s *SharedServerSuite) TestCreateWorkerDeployment() { // Attempting to create a WD with the same name should fail with a conflict // error. - s.EventuallyWithT(func(t *assert.CollectT) { - res := s.Execute( - "worker", "deployment", "create", - "--address", s.Address(), - "--name", deploymentName, - ) - assert.Error(t, res.Err) - assert.ErrorContains(t, res.Err, "already exists") - }, 30*time.Second, 100*time.Millisecond) + res = s.Execute( + "worker", "deployment", "create", + "--address", s.Address(), + "--name", deploymentName, + ) + s.Error(res.Err) + s.ErrorContains(res.Err, "already exists") } func (s *SharedServerSuite) TestCreateWorkerDeploymentVersion_EmptyComputeConfig() { @@ -1157,16 +1153,14 @@ func (s *SharedServerSuite) TestCreateWorkerDeploymentVersion_EmptyComputeConfig // worker deployment create-version` CLI command. noComputeConfigBuildID := uuid.NewString() - s.EventuallyWithT(func(t *assert.CollectT) { - res := s.Execute( - "worker", "deployment", "create-version", - "--address", s.Address(), - "--deployment-name", deploymentName, - "--build-id", noComputeConfigBuildID, - ) - assert.NoError(t, res.Err) - assert.Contains(t, res.Stdout.String(), "Successfully created worker deployment version") - }, 30*time.Second, 100*time.Millisecond) + res := s.Execute( + "worker", "deployment", "create-version", + "--address", s.Address(), + "--deployment-name", deploymentName, + "--build-id", noComputeConfigBuildID, + ) + s.NoError(res.Err) + s.Contains(res.Stdout.String(), "Successfully created worker deployment version") // Wait for the deployment version to appear s.EventuallyWithT(func(t *assert.CollectT) { @@ -1180,7 +1174,7 @@ func (s *SharedServerSuite) TestCreateWorkerDeploymentVersion_EmptyComputeConfig }, 30*time.Second, 100*time.Millisecond) // Check that there is no compute config returned for this WDV - res := s.Execute( + res = s.Execute( "worker", "deployment", "describe-version", "--address", s.Address(), "--deployment-name", deploymentName, @@ -1194,16 +1188,14 @@ func (s *SharedServerSuite) TestCreateWorkerDeploymentVersion_EmptyComputeConfig // Attempting to create a WDV with the same BuildID should fail with a // conflict error. - s.EventuallyWithT(func(t *assert.CollectT) { - res := s.Execute( - "worker", "deployment", "create-version", - "--address", s.Address(), - "--deployment-name", deploymentName, - "--build-id", noComputeConfigBuildID, - ) - assert.Error(t, res.Err) - assert.ErrorContains(t, res.Err, "already exists") - }, 30*time.Second, 100*time.Millisecond) + res = s.Execute( + "worker", "deployment", "create-version", + "--address", s.Address(), + "--deployment-name", deploymentName, + "--build-id", noComputeConfigBuildID, + ) + s.Error(res.Err) + s.ErrorContains(res.Err, "already exists") } func (s *SharedServerSuite) TestCreateWorkerDeploymentVersion_Errors() { @@ -1260,48 +1252,42 @@ func (s *SharedServerSuite) TestCreateWorkerDeploymentVersion_Errors() { assumeRoleARN := "arn:aws:iam::123456789012:role/MyServiceRole" assumeRoleExternalID := "external-id" - s.EventuallyWithT(func(t *assert.CollectT) { - res := s.Execute( - "worker", "deployment", "create-version", - "--address", s.Address(), - "--deployment-name", deploymentName, - "--build-id", assumeRoleFailureBuildID, - "--aws-lambda-function-arn", invokeARN, - "--aws-lambda-assume-role-arn", assumeRoleARN, - "--aws-lambda-assume-role-external-id", assumeRoleExternalID, - ) - assert.Error(t, res.Err) - assert.ErrorContains(t, res.Err, "failed to assume role arn:aws:iam::123456789012:role/MyServiceRole: operation error STS: AssumeRole") - }, 90*time.Second, 100*time.Millisecond) + res := s.Execute( + "worker", "deployment", "create-version", + "--address", s.Address(), + "--deployment-name", deploymentName, + "--build-id", assumeRoleFailureBuildID, + "--aws-lambda-function-arn", invokeARN, + "--aws-lambda-assume-role-arn", assumeRoleARN, + "--aws-lambda-assume-role-external-id", assumeRoleExternalID, + ) + s.Error(res.Err) + s.ErrorContains(res.Err, "failed to assume role arn:aws:iam::123456789012:role/MyServiceRole: operation error STS: AssumeRole") missingExternalIDBuildID := uuid.NewString() - s.EventuallyWithT(func(t *assert.CollectT) { - res := s.Execute( - "worker", "deployment", "create-version", - "--address", s.Address(), - "--deployment-name", deploymentName, - "--build-id", missingExternalIDBuildID, - "--aws-lambda-function-arn", invokeARN, - "--aws-lambda-assume-role-arn", assumeRoleARN, - ) - assert.Error(t, res.Err) - assert.ErrorContains(t, res.Err, "missing required AWS Lambda provider detail: role_external_id") - }, 30*time.Second, 100*time.Millisecond) + res = s.Execute( + "worker", "deployment", "create-version", + "--address", s.Address(), + "--deployment-name", deploymentName, + "--build-id", missingExternalIDBuildID, + "--aws-lambda-function-arn", invokeARN, + "--aws-lambda-assume-role-arn", assumeRoleARN, + ) + s.Error(res.Err) + s.ErrorContains(res.Err, "missing required AWS Lambda provider detail: role_external_id") missingAssumeRoleBuildID := uuid.NewString() - s.EventuallyWithT(func(t *assert.CollectT) { - res := s.Execute( - "worker", "deployment", "create-version", - "--address", s.Address(), - "--deployment-name", deploymentName, - "--build-id", missingAssumeRoleBuildID, - "--aws-lambda-function-arn", invokeARN, - ) - assert.Error(t, res.Err) - assert.ErrorContains(t, res.Err, "missing required AWS Lambda provider detail: role") - }, 30*time.Second, 100*time.Millisecond) + res = s.Execute( + "worker", "deployment", "create-version", + "--address", s.Address(), + "--deployment-name", deploymentName, + "--build-id", missingAssumeRoleBuildID, + "--aws-lambda-function-arn", invokeARN, + ) + s.Error(res.Err) + s.ErrorContains(res.Err, "missing required AWS Lambda provider detail: role") } // TODO(jaypipes): Enable this test when we have a way of ensuring AWS resource @@ -1359,19 +1345,17 @@ func (s *SharedServerSuite) TestCreateWorkerDeploymentVersion_LambdaComputeConfi assumeRoleARN := "arn:aws:iam::123456789012:role/MyServiceRole" assumeRoleExternalID := "external-id" - s.EventuallyWithT(func(t *assert.CollectT) { - res := s.Execute( - "worker", "deployment", "create-version", - "--address", s.Address(), - "--deployment-name", deploymentName, - "--build-id", computeConfigBuildID, - "--aws-lambda-function-arn", invokeARN, - "--aws-lambda-assume-role-arn", assumeRoleARN, - "--aws-lambda-assume-role-external-id", assumeRoleExternalID, - ) - assert.NoError(t, res.Err) - assert.Contains(t, res.Stdout.String(), "Successfully created worker deployment version") - }, 30*time.Second, 100*time.Millisecond) + res := s.Execute( + "worker", "deployment", "create-version", + "--address", s.Address(), + "--deployment-name", deploymentName, + "--build-id", computeConfigBuildID, + "--aws-lambda-function-arn", invokeARN, + "--aws-lambda-assume-role-arn", assumeRoleARN, + "--aws-lambda-assume-role-external-id", assumeRoleExternalID, + ) + s.NoError(res.Err) + s.Contains(res.Stdout.String(), "Successfully created worker deployment version") // Wait for the deployment version to appear s.EventuallyWithT(func(t *assert.CollectT) { @@ -1385,7 +1369,7 @@ func (s *SharedServerSuite) TestCreateWorkerDeploymentVersion_LambdaComputeConfi }, 30*time.Second, 100*time.Millisecond) // Check that there is a compute config returned for this WDV - res := s.Execute( + res = s.Execute( "worker", "deployment", "describe-version", "--address", s.Address(), "--deployment-name", deploymentName,