Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions internal/temporalcli/commands.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -2878,6 +2878,8 @@ func NewTemporalWorkerDeploymentCommand(cctx *CommandContext, parent *TemporalWo
s.Command.Long = "Deployment commands perform operations on Worker Deployments:\n\n```\ntemporal worker deployment [command] [options]\n```\n\nFor example:\n\n```\ntemporal worker deployment list\n```\n\nLists the Deployments in the client's namespace.\n\nArguments can be Worker Deployment Versions associated with\na Deployment, specified using the Deployment name and Build ID.\n\nFor example:\n\n```\ntemporal worker deployment set-current-version \\\n --deployment-name YourDeploymentName --build-id YourBuildID\n```\n\nSets the current Deployment Version for a given Deployment."
}
s.Command.Args = cobra.NoArgs
s.Command.AddCommand(&NewTemporalWorkerDeploymentCreateCommand(cctx, &s).Command)
s.Command.AddCommand(&NewTemporalWorkerDeploymentCreateVersionCommand(cctx, &s).Command)
s.Command.AddCommand(&NewTemporalWorkerDeploymentDeleteCommand(cctx, &s).Command)
s.Command.AddCommand(&NewTemporalWorkerDeploymentDeleteVersionCommand(cctx, &s).Command)
s.Command.AddCommand(&NewTemporalWorkerDeploymentDescribeCommand(cctx, &s).Command)
Expand All @@ -2890,6 +2892,66 @@ func NewTemporalWorkerDeploymentCommand(cctx *CommandContext, parent *TemporalWo
return &s
}

type TemporalWorkerDeploymentCreateCommand struct {
Parent *TemporalWorkerDeploymentCommand
Command cobra.Command
DeploymentNameOptions
}

func NewTemporalWorkerDeploymentCreateCommand(cctx *CommandContext, parent *TemporalWorkerDeploymentCommand) *TemporalWorkerDeploymentCreateCommand {
var s TemporalWorkerDeploymentCreateCommand
s.Parent = parent
s.Command.DisableFlagsInUseLine = true
s.Command.Use = "create [flags]"
s.Command.Short = "Create a new Worker Deployment"
if hasHighlighting {
s.Command.Long = "Create a new Worker Deployment:\n\n\x1b[1mtemporal worker deployment create [options]\x1b[0m\n\nWorker Deployments are lazily created the first time a Worker polls the\nTemporal Server and specifies a VersionOverride. However, if you need to\npre-define a compute configuration (for instance to set up a serverless\nWorker), you need to call \x1b[1mtemporal worker deployment create-version\x1b[0m and\npass in the name of the Worker Deployment. The \x1b[1mtemporal worker\ndeployment create\x1b[0m command allows you to pre-define a Worker Deployment\nso that calls to \x1b[1mtemporal worker deployment create-version\x1b[0m will\nsucceed.\n\nIf a Worker Deployment with the supplied name already exists, this\ncommand will return an error.\n\nNote: This is an experimental feature and may change in the future."
} else {
s.Command.Long = "Create a new Worker Deployment:\n\n```\ntemporal worker deployment create [options]\n```\n\nWorker Deployments are lazily created the first time a Worker polls the\nTemporal Server and specifies a VersionOverride. However, if you need to\npre-define a compute configuration (for instance to set up a serverless\nWorker), you need to call `temporal worker deployment create-version` and\npass in the name of the Worker Deployment. The `temporal worker\ndeployment create` command allows you to pre-define a Worker Deployment\nso that calls to `temporal worker deployment create-version` will\nsucceed.\n\nIf a Worker Deployment with the supplied name already exists, this\ncommand will return an error.\n\nNote: This is an experimental feature and may change in the future."
}
s.Command.Args = cobra.NoArgs
s.DeploymentNameOptions.BuildFlags(s.Command.Flags())
s.Command.Run = func(c *cobra.Command, args []string) {
if err := s.run(cctx, args); err != nil {
cctx.Options.Fail(err)
}
}
return &s
}

type TemporalWorkerDeploymentCreateVersionCommand struct {
Parent *TemporalWorkerDeploymentCommand
Command cobra.Command
DeploymentVersionOptions
AwsLambdaFunctionArn string
AwsLambdaAssumeRoleArn string
AwsLambdaAssumeRoleExternalId string
}

func NewTemporalWorkerDeploymentCreateVersionCommand(cctx *CommandContext, parent *TemporalWorkerDeploymentCommand) *TemporalWorkerDeploymentCreateVersionCommand {
var s TemporalWorkerDeploymentCreateVersionCommand
s.Parent = parent
s.Command.DisableFlagsInUseLine = true
s.Command.Use = "create-version [flags]"
s.Command.Short = "Create a new Worker Deployment Version"
if hasHighlighting {
s.Command.Long = "\nCreate a new Worker Deployment Version:\n\n\x1b[1mtemporal worker deployment create-version [options]\x1b[0m\n\nConfigure a Worker Deployment Version's compute configuration as needed.\nFor example, pass compute provider information for an AWS Lambda function\nthat spawns a Worker in the Worker Deployment:\n\n\x1b[1mtemporal worker deployment create-version \\\n --namespace YourNamespaceName \\\n --deployment-name YourDeploymentName \\\n --build-id YourBuildID \\\n --aws-lambda-function-arn LambdaFunctionARN \\\n --aws-lambda-assume-role-arn LambdaAssumeRoleARN \\\n --aws-lambda-assume-role-external-id LambdaAssumeRoleExternalID\x1b[0m\n\nIf a Worker Deployment Version with the supplied BuildID already exists,\nthis command will return an error.\n\nNote: This is an experimental feature and may change in the future."
} else {
s.Command.Long = "\nCreate a new Worker Deployment Version:\n\n```\ntemporal worker deployment create-version [options]\n```\n\nConfigure a Worker Deployment Version's compute configuration as needed.\nFor example, pass compute provider information for an AWS Lambda function\nthat spawns a Worker in the Worker Deployment:\n\n```\ntemporal worker deployment create-version \\\n --namespace YourNamespaceName \\\n --deployment-name YourDeploymentName \\\n --build-id YourBuildID \\\n --aws-lambda-function-arn LambdaFunctionARN \\\n --aws-lambda-assume-role-arn LambdaAssumeRoleARN \\\n --aws-lambda-assume-role-external-id LambdaAssumeRoleExternalID\n```\n\nIf a Worker Deployment Version with the supplied BuildID already exists,\nthis command will return an error.\n\nNote: This is an experimental feature and may change in the future."
}
s.Command.Args = cobra.NoArgs
s.Command.Flags().StringVar(&s.AwsLambdaFunctionArn, "aws-lambda-function-arn", "", "Qualified (contains version suffix) or unqualified AWS Lambda function ARN to invoke when there are no active pollers for task queue targets in the Worker Deployment.")
s.Command.Flags().StringVar(&s.AwsLambdaAssumeRoleArn, "aws-lambda-assume-role-arn", "", "AWS IAM role ARN that the Temporal server will assume when invoking the Lambda function that spawns a new Worker in this Worker Deployment Version. Required when --aws-lambda-function-arn is specified.")
s.Command.Flags().StringVar(&s.AwsLambdaAssumeRoleExternalId, "aws-lambda-assume-role-external-id", "", "Temporal server will enforce that the AWS IAM trust policy associated with the AWS IAM role specified in --aws-lambda-assume-role-arn has an aws:ExternalId condition that matches the supplied value. Required when --aws-lambda-function-arn is specified.")
s.DeploymentVersionOptions.BuildFlags(s.Command.Flags())
s.Command.Run = func(c *cobra.Command, args []string) {
if err := s.run(cctx, args); err != nil {
cctx.Options.Fail(err)
}
}
return &s
}

type TemporalWorkerDeploymentDeleteCommand struct {
Parent *TemporalWorkerDeploymentCommand
Command cobra.Command
Expand Down
196 changes: 196 additions & 0 deletions internal/temporalcli/commands.worker.deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,24 @@ package temporalcli
import (
"errors"
"fmt"
"slices"
"strings"
"time"

"github.com/fatih/color"
"github.com/google/uuid"
"github.com/temporalio/cli/internal/printer"
"go.temporal.io/api/common/v1"
commonpb "go.temporal.io/api/common/v1"
computepb "go.temporal.io/api/compute/v1"
"go.temporal.io/api/deployment/v1"
deploymentpb "go.temporal.io/api/deployment/v1"
enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/api/serviceerror"
taskqueuepb "go.temporal.io/api/taskqueue/v1"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/converter"
"go.temporal.io/sdk/worker"
)

Expand Down Expand Up @@ -111,6 +118,24 @@ type formattedWorkerDeploymentVersionInfoType struct {
DrainageInfo formattedDrainageInfo `json:"drainageInfo"`
TaskQueuesInfos []formattedTaskQueueInfoRowType `json:"taskQueuesInfos"`
Metadata map[string]*common.Payload `json:"metadata"`
ComputeConfig *formattedComputeConfig `json:"computeConfig,omitempty"`
}

type formattedComputeConfig struct {
ScalingGroups map[string]formattedComputeConfigScalingGroup `json:"scalingGroups"`
}

type formattedComputeConfigScalingGroup struct {
Provider *formattedComputeConfigProvider `json:"provider,omitempty"`
Scaler *formattedComputeConfigScaler `json:"scaler,omitempty"`
}

type formattedComputeConfigProvider struct {
Type string `json:"type"`
}

type formattedComputeConfigScaler struct {
Type string `json:"type"`
}

func drainageStatusToStr(drainage client.WorkerDeploymentVersionDrainageStatus) (string, error) {
Expand Down Expand Up @@ -354,6 +379,39 @@ func formatDrainageInfoProto(drainageInfo *deploymentpb.VersionDrainageInfo) (fo
}, nil
}

func formatComputeConfigProto(cc *computepb.ComputeConfig) *formattedComputeConfig {
if cc == nil {
return nil
}
msgSGs := cc.GetScalingGroups()
if len(msgSGs) == 0 {
return nil
}
sgs := make(map[string]formattedComputeConfigScalingGroup, len(msgSGs))
for name, msgSG := range msgSGs {
p := msgSG.GetProvider()
s := msgSG.GetScaler()
if p == nil && s == nil {
continue
}
sg := formattedComputeConfigScalingGroup{}
if p != nil {
sg.Provider = &formattedComputeConfigProvider{
Type: p.GetType(),
}
}
if p != nil {
sg.Scaler = &formattedComputeConfigScaler{
Type: s.GetType(),
}
}
sgs[name] = sg
}
return &formattedComputeConfig{
ScalingGroups: sgs,
}
}

// workerDeploymentVersionInfoProtoToRows converts gRPC proto types to formatted types for display.
func workerDeploymentVersionInfoProtoToRows(deploymentInfo *deploymentpb.WorkerDeploymentVersionInfo, taskQueueInfos []*workflowservice.DescribeWorkerDeploymentVersionResponse_VersionTaskQueue, includeStats bool) (formattedWorkerDeploymentVersionInfoType, error) {
tqi, err := formatTaskQueuesInfosProto(taskQueueInfos, includeStats)
Expand All @@ -366,6 +424,8 @@ func workerDeploymentVersionInfoProtoToRows(deploymentInfo *deploymentpb.WorkerD
return formattedWorkerDeploymentVersionInfoType{}, err
}

computeConfig := formatComputeConfigProto(deploymentInfo.GetComputeConfig())

return formattedWorkerDeploymentVersionInfoType{
DeploymentName: deploymentInfo.GetDeploymentVersion().GetDeploymentName(),
BuildID: deploymentInfo.GetDeploymentVersion().GetBuildId(),
Expand All @@ -377,9 +437,28 @@ func workerDeploymentVersionInfoProtoToRows(deploymentInfo *deploymentpb.WorkerD
DrainageInfo: drainage,
TaskQueuesInfos: tqi,
Metadata: deploymentInfo.GetMetadata().GetEntries(),
ComputeConfig: computeConfig,
}, nil
}

func computeConfigSummaryStr(cc *computepb.ComputeConfig) string {
if cc == nil {
return ""
}
providers := []string{}
for _, sg := range cc.GetScalingGroups() {
p := sg.GetProvider()
if p == nil {
continue
}
pt := p.GetType()
if !slices.Contains(providers, pt) {
providers = append(providers, pt)
}
}
return strings.Join(providers, ",")
}

// printWorkerDeploymentVersionInfoProto prints worker deployment version info from proto types.
func printWorkerDeploymentVersionInfoProto(cctx *CommandContext, deploymentInfo *deploymentpb.WorkerDeploymentVersionInfo, taskQueueInfos []*workflowservice.DescribeWorkerDeploymentVersionResponse_VersionTaskQueue, msg string, opts printVersionInfoOptions) error {
fDeploymentInfo, err := workerDeploymentVersionInfoProtoToRows(deploymentInfo, taskQueueInfos, opts.showStats)
Expand All @@ -400,6 +479,7 @@ func printWorkerDeploymentVersionInfoProto(cctx *CommandContext, deploymentInfo
drainageLastChangedTime = deploymentInfo.GetDrainageInfo().GetLastChangedTime().AsTime()
drainageLastCheckedTime = deploymentInfo.GetDrainageInfo().GetLastCheckedTime().AsTime()
}
computeConfigSummary := computeConfigSummaryStr(deploymentInfo.GetComputeConfig())

printMe := struct {
DeploymentName string
Expand All @@ -413,6 +493,7 @@ func printWorkerDeploymentVersionInfoProto(cctx *CommandContext, deploymentInfo
DrainageLastChangedTime time.Time `cli:",cardOmitEmpty"`
DrainageLastCheckedTime time.Time `cli:",cardOmitEmpty"`
Metadata map[string]*common.Payload `cli:",cardOmitEmpty"`
ComputeConfigSummary string `cli:",cardOmitEmpty"`
}{
DeploymentName: deploymentInfo.GetDeploymentVersion().GetDeploymentName(),
BuildID: deploymentInfo.GetDeploymentVersion().GetBuildId(),
Expand All @@ -425,6 +506,7 @@ func printWorkerDeploymentVersionInfoProto(cctx *CommandContext, deploymentInfo
DrainageLastChangedTime: drainageLastChangedTime,
DrainageLastCheckedTime: drainageLastCheckedTime,
Metadata: deploymentInfo.GetMetadata().GetEntries(),
ComputeConfigSummary: computeConfigSummary,
}
err := cctx.Printer.PrintStructured(printMe, printer.StructuredOptions{})
if err != nil {
Expand Down Expand Up @@ -601,6 +683,34 @@ func (c *TemporalWorkerDeploymentCommand) getConflictToken(cctx *CommandContext,
return resp.ConflictToken, nil
}

func (c *TemporalWorkerDeploymentCreateCommand) run(cctx *CommandContext, args []string) error {
cl, err := dialClient(cctx, &c.Parent.Parent.ClientOptions)
if err != nil {
return err
}
defer cl.Close()

ns := c.Parent.Parent.Namespace
identity := c.Parent.Parent.Identity
deploymentName := c.Name
requestID := uuid.NewString()

request := &workflowservice.CreateWorkerDeploymentRequest{
Namespace: ns,
DeploymentName: deploymentName,
Identity: identity,
RequestId: requestID,
}

_, err = cl.WorkflowService().CreateWorkerDeployment(cctx, request)
if err != nil {
return fmt.Errorf("error creating worker deployment: %w", err)
}

cctx.Printer.Println("Successfully created worker deployment")
return nil
}

func (c *TemporalWorkerDeploymentDescribeCommand) run(cctx *CommandContext, args []string) error {
cl, err := dialClient(cctx, &c.Parent.Parent.ClientOptions)
if err != nil {
Expand Down Expand Up @@ -773,6 +883,92 @@ func (c *TemporalWorkerDeploymentManagerIdentityUnsetCommand) run(cctx *CommandC
return nil
}

func validateAWSLambdaProviderDetails(details map[string]any) error {
for _, key := range []string{"arn", "role", "role_external_id"} {
if _, ok := details[key]; !ok {
return fmt.Errorf("missing required AWS Lambda provider detail: %s", key)
}
}
return nil
}

// awsLambdaProviderDetailsPayload returns the encoded Payload representing AWS
// Lambda compute provider details.
func (c *TemporalWorkerDeploymentCreateVersionCommand) awsLambdaProviderDetailsPayload() (*commonpb.Payload, error) {
// Map keys from temporal-auto-scaled-workers:
// https://github.com/temporalio/temporal-auto-scaled-workers/blob/c4a7e69b6504365d7e5326b0b8e6cd95e3293f96/wci/workflow/compute_provider/aws_lambda.go#L16-L20
providerDetails := map[string]any{
"arn": c.AwsLambdaFunctionArn,
}
if c.AwsLambdaAssumeRoleArn != "" {
providerDetails["role"] = c.AwsLambdaAssumeRoleArn
}
if c.AwsLambdaAssumeRoleExternalId != "" {
providerDetails["role_external_id"] = c.AwsLambdaAssumeRoleExternalId
}
err := validateAWSLambdaProviderDetails(providerDetails)
if err != nil {
return nil, err
}
dc := converter.GetDefaultDataConverter()
return dc.ToPayload(&providerDetails)
}

func (c *TemporalWorkerDeploymentCreateVersionCommand) run(cctx *CommandContext, args []string) error {
cl, err := dialClient(cctx, &c.Parent.Parent.ClientOptions)
if err != nil {
return err
}
defer cl.Close()

ns := c.Parent.Parent.Namespace
buildID := c.BuildId
identity := c.Parent.Parent.Identity
deploymentName := c.DeploymentName
requestID := uuid.NewString()

var cc *computepb.ComputeConfig
if c.AwsLambdaFunctionArn != "" {
detailsPayload, err := c.awsLambdaProviderDetailsPayload()
if err != nil {
return err
}
cc = &computepb.ComputeConfig{
ScalingGroups: map[string]*computepb.ComputeConfigScalingGroup{
"default": {
Provider: &computepb.ComputeProvider{
Type: "aws-lambda",
Details: detailsPayload,
},
Scaler: &computepb.ComputeScaler{
// Hard-coded: no-sync is the only supported algorithm
// in temporal-auto-scaled-workers as of 2026-04-01.
Type: "no-sync",
},
},
},
}
}
request := &workflowservice.CreateWorkerDeploymentVersionRequest{
Namespace: ns,
DeploymentVersion: &deployment.WorkerDeploymentVersion{
DeploymentName: deploymentName,
BuildId: buildID,
},
Identity: identity,
ComputeConfig: cc,
RequestId: requestID,
}

_, err = cl.WorkflowService().CreateWorkerDeploymentVersion(cctx, request)
if err != nil {
return fmt.Errorf("error creating worker deployment version: %w", err)
}

cctx.Printer.Println("Successfully created worker deployment version")
return nil
}

func (c *TemporalWorkerDeploymentDeleteVersionCommand) run(cctx *CommandContext, args []string) error {
cl, err := dialClient(cctx, &c.Parent.Parent.ClientOptions)
if err != nil {
Expand Down
Loading
Loading