From 20854b804f1853ae4c5b9c0a345c8c262602ae5f Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Thu, 22 Jun 2023 12:27:11 -0700 Subject: [PATCH] Implement update/get worker versioning commands (#166) --- .golangci.yml | 1 + app/app_test.go | 3 +- app/build_id_compat_cli_test.go | 125 +++++++++++++++++++++ common/defs-cmds.go | 27 ++++- common/defs-flags.go | 11 ++ common/flags.go | 5 + common/util.go | 14 +++ common/util_test.go | 17 +++ go.mod | 2 +- go.sum | 4 +- headers/headers.go | 6 +- taskqueue/task_queue.go | 170 +++++++++++++++++++++++++++- taskqueue/task_queue_commands.go | 184 +++++++++++++++++++++++++++++++ tests/e2e_test.go | 39 +------ tests/workflow_test.go | 1 + workflow/workflow_commands.go | 28 ++--- 16 files changed, 575 insertions(+), 62 deletions(-) create mode 100644 app/build_id_compat_cli_test.go diff --git a/.golangci.yml b/.golangci.yml index 9e72900d..41ca92fb 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -82,6 +82,7 @@ linters-settings: - name: unhandled-error arguments: - "fmt.Printf" + - "fmt.Println" issues: # Exclude cyclomatic and cognitive complexity rules for functional tests in the `tests` root directory. exclude-rules: diff --git a/app/app_test.go b/app/app_test.go index abcee95b..babb908f 100644 --- a/app/app_test.go +++ b/app/app_test.go @@ -305,7 +305,7 @@ func newServerAndClientOpts(port int, customArgs ...string) ([]string, sdkclient } } -func assertServerHealth(ctx context.Context, t *testing.T, opts sdkclient.Options) { +func assertServerHealth(ctx context.Context, t *testing.T, opts sdkclient.Options) sdkclient.Client { var ( c sdkclient.Client clientErr error @@ -339,6 +339,7 @@ func assertServerHealth(ctx context.Context, t *testing.T, opts sdkclient.Option } time.Sleep(time.Millisecond * 100) } + return c } func TestCreateDataDirectory_MissingDirectory(t *testing.T) { diff --git a/app/build_id_compat_cli_test.go b/app/build_id_compat_cli_test.go new file mode 100644 index 00000000..f0015eb8 --- /dev/null +++ b/app/build_id_compat_cli_test.go @@ -0,0 +1,125 @@ +package app_test + +import ( + "context" + "fmt" + "testing" + + "github.com/temporalio/cli/common" + + "github.com/stretchr/testify/suite" + "github.com/temporalio/cli/app" + sconfig "github.com/temporalio/cli/server/config" + "github.com/urfave/cli/v2" + "go.temporal.io/sdk/client" +) + +type buildIdCompatSuite struct { + suite.Suite + app *cli.App + stopServerCancel context.CancelFunc + client client.Client + port int + writer *common.MemWriter +} + +func TestBuildIdCompatSuite(t *testing.T) { + suite.Run(t, new(buildIdCompatSuite)) +} + +func (s *buildIdCompatSuite) SetupSuite() { + s.app = app.BuildApp() + mw := &common.MemWriter{} + s.app.Writer = mw + s.writer = mw + // Don't call os.Exit + s.app.ExitErrHandler = func(_ *cli.Context, _ error) {} + portProvider := sconfig.NewPortProvider() + port := portProvider.MustGetFreePort() + s.port = port + portProvider.Close() + ctx, cancel := context.WithCancel(context.Background()) + s.stopServerCancel = cancel + + args, clientOpts := newServerAndClientOpts(port) + args = append(args, + "--dynamic-config-value", + "frontend.workerVersioningDataAPIs=true", + "--dynamic-config-value", + "frontend.workerVersioningWorkflowAPIs=true", + ) + + go func() { + if err := s.app.RunContext(ctx, args); err != nil { + fmt.Println("Server closed with error:", err) + } + }() + + s.client = assertServerHealth(ctx, s.T(), clientOpts) +} + +func (s *buildIdCompatSuite) TearDownSuite() { + s.stopServerCancel() +} + +func (s *buildIdCompatSuite) testTqName() string { + return "build-id-tq-" + s.T().Name() +} + +func (s *buildIdCompatSuite) makeArgs(args ...string) []string { + allArgs := []string{""} + allArgs = append(allArgs, args...) + return append(allArgs, + "--address", fmt.Sprintf("localhost:%d", s.port), + "--task-queue", s.testTqName(), "--namespace", "default") +} + +func (s *buildIdCompatSuite) TestAddNewDefaultBuildIdAndGet() { + err := s.app.Run(s.makeArgs( + "task-queue", "update-build-ids", "add-new-default", "--build-id", "foo")) + s.Nil(err) + err = s.app.Run(s.makeArgs("task-queue", "get-build-ids")) + s.Nil(err) +} + +func (s *buildIdCompatSuite) TestAddNewCompatBuildId() { + err := s.app.Run(s.makeArgs( + "task-queue", "update-build-ids", "add-new-default", "--build-id", "foo")) + s.Nil(err) + err = s.app.Run(s.makeArgs( + "task-queue", "update-build-ids", "add-new-compatible", + "--build-id", "bar", "--existing-compatible-build-id", "foo")) + s.Nil(err) +} + +func (s *buildIdCompatSuite) TestPromoteBuildIdSet() { + err := s.app.Run(s.makeArgs( + "task-queue", "update-build-ids", "add-new-default", "--build-id", "foo")) + s.Nil(err) + err = s.app.Run(s.makeArgs( + "task-queue", "update-build-ids", "promote-set", + "--build-id", "foo")) + s.Nil(err) +} + +func (s *buildIdCompatSuite) TestPromoteBuildIdInSet() { + err := s.app.Run(s.makeArgs( + "task-queue", "update-build-ids", "add-new-default", "--build-id", "foo")) + s.Nil(err) + err = s.app.Run(s.makeArgs( + "task-queue", "update-build-ids", "promote-id-in-set", + "--build-id", "foo")) + s.Nil(err) +} + +func (s *buildIdCompatSuite) TestReachability() { + err := s.app.Run(s.makeArgs( + "task-queue", "update-build-ids", "add-new-default", "--build-id", "foo")) + s.Nil(err) + err = s.app.Run(s.makeArgs("task-queue", "get-build-id-reachability", "--build-id", "foo")) + s.Nil(err) + writtenContent := s.writer.GetContent() + println(writtenContent) + s.Contains(writtenContent, "foo") + s.Contains(writtenContent, "[NewWorkflows]") +} diff --git a/common/defs-cmds.go b/common/defs-cmds.go index 4c930c38..13548cfc 100644 --- a/common/defs-cmds.go +++ b/common/defs-cmds.go @@ -34,6 +34,9 @@ const ( // Task Queue subcommand definitions DescribeTaskQueueDefinition = "Provides information for Workers that have recently polled on this Task Queue." ListPartitionTaskQueueDefinition = "Lists the Task Queue's partitions and the matching nodes they are assigned to." + UpdateBuildIDsDefinition = "Operations to update the sets of worker Build ID versions on the Task Queue" + GetBuildIDsDefinition = "Fetch the sets of worker Build ID versions on the Task Queue" + GetBuildIDReachabilityDefinition = "Retrieves information about the reachability of Build IDs on one or more Task Queues" // Batch subcommand definitions DescribeBatchJobDefinition = "Provide information about a Batch operation job." @@ -81,6 +84,16 @@ const ( ScheduleDescribeDefinition = "Get Schedule configuration and current state." ScheduleDeleteDefinition = "Deletes a Schedule." ScheduleListDefinition = "Lists Schedules." + + // Update build id subcommand definitions + AddNewDefaultBuildIDDefinition = "Add a new default (incompatible) build ID to the Task Queue version sets." + AddNewDefaultBuildIDDefinitionUsage = "Creates a new build id set which will become the new overall default for the queue with the provided build id as its only member. This new set is incompatible with all previous sets/versions." + AddNewCompatibleBuildIDDefinition = "Add a new build ID compatible with an existing ID to the Task Queue version sets." + AddNewCompatibleBuildIDDefinitionUsage = "The new build ID will become the default for the set containing the existing ID. See per-flag help for more." + PromoteSetDefinition = "Promote an existing build ID set to become the default for the Task Queue." + PromoteSetDefinitionUsage = "If the set is already the default, this command has no effect." + PromoteIDInSetDefinition = "Promote an existing build ID to become the default for its containing set." + PromoteIDInSetDefinitionUsage = "New tasks compatible with the the set will be dispatched to the default id." ) const BatchUsageText = `Batch commands change multiple [Workflow Executions](/concepts/what-is-a-workflow-execution) @@ -113,7 +126,7 @@ Batch Jobs can be returned for an entire Cluster or a single Namespace. Use the command options below to change the information returned by this command.` -const TerminateBatchUsageText = `The ` + "`" + `temporal batch terminate` + "`" + ` command terminates a Batch job with the provided Job ID. +const TerminateBatchUsageText = `The ` + "`" + `temporal batch terminate` + "`" + ` command terminates a Batch job with the provided Job ID. For future reference, provide a reason for terminating the Batch Job. ` + "`" + `temporal batch terminate --job-id=MyJobId --reason=JobReason` + "`" + ` @@ -334,7 +347,7 @@ Print a single property: tls-key-path /home/my-user/certs/cluster.key` -const EnvSetUsageText = "`" + `temporal env set [environment.property name] [property value]` + "`" + ` +const EnvSetUsageText = "`" + `temporal env set [environment.property name] [property value]` + "`" + ` Property names match CLI option names, for example '--address' and '--tls-cert-path': @@ -458,8 +471,8 @@ The Overlap Policy of the Schedule can be overridden as well. Use the options provided below to change this command's behavior.` -const ScheduleBackfillUsageText = `The ` + "`" + `temporal schedule backfill` + "`" + ` command executes Actions ahead of their specified time range. -Backfilling can fill in [Workflow Runs](/concepts/what-is-a-run-id) from a time period when the Schedule was paused, or from before the Schedule was created. +const ScheduleBackfillUsageText = `The ` + "`" + `temporal schedule backfill` + "`" + ` command executes Actions ahead of their specified time range. +Backfilling can fill in [Workflow Runs](/concepts/what-is-a-run-id) from a time period when the Schedule was paused, or from before the Schedule was created. Schedule backfills require a valid Schedule ID, along with the time in which to run the Schedule and a change to the overlap policy. ` + "`" + `` + "`" + `` + "`" + ` @@ -564,6 +577,12 @@ const StartDevUsageText = `The ` + "`" + `temporal server start-dev` + "`" + ` c The results of any command run on the Server can be viewed at http://localhost:7233. ` +const UpdateBuildIDsDefinitionText = "Provides various commands for adding or changing the sets of compatible build IDs associated with a Task Queue. See the help of each sub-command for more." +const GetBuildIDsDefinitionText = "Fetch the sets of compatible build IDs associated with a Task Queue and associated information." +const GetBuildIDReachabilityDefinitionText = "This command can tell you whether or not Build IDs may be used for for new, existing, or closed workflows. " + + "Both the --build-id and --task-queue flags may be specified multiple times. " + + "If you do not provide a task queue, reachability for the provided Build IDs will be checked against all task queues." + const CustomTemplateHelpCLI = `NAME: {{template "helpNameTemplate" .}}{{if .Description}} diff --git a/common/defs-flags.go b/common/defs-flags.go index 6c6a7f7f..27c91135 100644 --- a/common/defs-flags.go +++ b/common/defs-flags.go @@ -135,4 +135,15 @@ const ( // Task Queue flags FlagTaskQueueName = "Name of the Task Queue." FlagTaskQueueTypeDefinition = "Task Queue type [workflow|activity]" + + // Build id based versioning flags + FlagNewBuildIDUsage = "The new build id to be added." + FlagExistingCompatibleBuildIDUsage = "A build id which must already exist in the version sets known by the task queue. The new id will be stored in the set containing this id, marking it as compatible with the versions within." + FlagSetBuildIDAsDefaultUsage = "When set, establishes the compatible set being targeted as the overall default for the queue. If a different set was the current default, the targeted set will replace it as the new default." + FlagPromoteSetBuildIDUsage = "An existing build id whose containing set will be promoted." + FlagPromoteBuildIDUsage = "An existing build id which will be promoted to be the default inside its containing set." + FlagMaxBuildIDSetsUsage = "Limits how many compatible sets will be returned. Specify 1 to only return the current default major version set. 0 returns all sets." + FlagBuildIDReachabilityUsage = "Which Build ID to get reachability information for. May be specified multiple times." + FlagTaskQueueForReachabilityUsage = "Which Task Queue(s) to constrain the reachability search to. May be specified multiple times." + FlagReachabilityTypeUsage = "Specify how you'd like to filter the reachability of Build IDs. Valid choices are `open` (reachable by one or more open workflows), `closed` (reachable by one or more closed workflows), or `existing` (reachable by either). If a Build ID is reachable by new workflows, that is always reported." ) diff --git a/common/flags.go b/common/flags.go index 430a480e..31579cba 100644 --- a/common/flags.go +++ b/common/flags.go @@ -135,6 +135,11 @@ var ( FlagWorkflowType = "workflow-type" FlagYes = "yes" FlagYesAlias = []string{"y"} + FlagBuildID = "build-id" + FlagExistingCompatibleBuildID = "existing-compatible-build-id" + FlagSetBuildIDAsDefault = "set-as-default" + FlagMaxBuildIDSets = "max-sets" + FlagReachabilityType = "reachability-type" ) var SharedFlags = []cli.Flag{ diff --git a/common/util.go b/common/util.go index d4c91e5b..ac8a913b 100644 --- a/common/util.go +++ b/common/util.go @@ -2,6 +2,7 @@ package common import ( "bufio" + "bytes" "context" "encoding/json" "errors" @@ -553,3 +554,16 @@ func AddBeforeHandler(cmd *cli.Command, h func(*cli.Context) error) { AddBeforeHandler(subcmd, h) } } + +// MemWriter is an io.Writer implementation that stores the written content. +type MemWriter struct { + content bytes.Buffer +} + +func (mw *MemWriter) Write(p []byte) (n int, err error) { + return mw.content.Write(p) +} + +func (mw *MemWriter) GetContent() string { + return mw.content.String() +} diff --git a/common/util_test.go b/common/util_test.go index c3ec5663..b1f60a34 100644 --- a/common/util_test.go +++ b/common/util_test.go @@ -1,6 +1,8 @@ package common import ( + "fmt" + "strings" "testing" "time" @@ -239,3 +241,18 @@ func (s *utilSuite) TestGetCliIdentity() { identity := GetCliIdentity() s.Contains(identity, "temporal-cli") } + +func TestMemWriter(t *testing.T) { + mw := &MemWriter{} + _, err := fmt.Fprintln(mw, "This message is written to the MemWriter.") + if err != nil { + t.Fatal(err) + } + + expected := "This message is written to the MemWriter." + content := mw.GetContent() + + if !strings.Contains(content, expected) { + t.Errorf("Expected log content to contain '%s', but it doesn't. Content: '%s'", expected, content) + } +} diff --git a/go.mod b/go.mod index 276a0277..3b86e7d7 100644 --- a/go.mod +++ b/go.mod @@ -19,7 +19,7 @@ require ( go.temporal.io/api v1.23.0 go.temporal.io/sdk v1.23.1 go.temporal.io/sdk/contrib/tools/workflowcheck v0.0.0-20230328164709-88a40de39c33 - go.temporal.io/server v1.20.1-0.20230616203625-3392a7ab579a + go.temporal.io/server v1.20.1-0.20230622163242-9405cf84817e go.uber.org/zap v1.24.0 golang.org/x/exp v0.0.0-20230213192124-5e25df0256eb google.golang.org/grpc v1.56.1 diff --git a/go.sum b/go.sum index 1e232512..d8ce0c2e 100644 --- a/go.sum +++ b/go.sum @@ -1153,8 +1153,8 @@ go.temporal.io/sdk v1.23.1 h1:HzOaw5+f6QgDW/HH1jzwgupII7nVz+fzxFPjmFJqKiQ= go.temporal.io/sdk v1.23.1/go.mod h1:S7vWxU01lGcCny0sWx03bkkYw4VtVrpzeqBTn2A6y+E= go.temporal.io/sdk/contrib/tools/workflowcheck v0.0.0-20230328164709-88a40de39c33 h1:tpDvC3HKzoPGmYZT7LBkYtBWrbZa8GNiLR2LG5iG5sw= go.temporal.io/sdk/contrib/tools/workflowcheck v0.0.0-20230328164709-88a40de39c33/go.mod h1:CW0zVy7oLeWxBo3wG5bMU2dy4xaprM2net3/DkBzruw= -go.temporal.io/server v1.20.1-0.20230616203625-3392a7ab579a h1:ckFeiF8XdIVde74T9+TNIzLmKuIZDlk0KTKQ4XuG8NA= -go.temporal.io/server v1.20.1-0.20230616203625-3392a7ab579a/go.mod h1:Wtl0Io+CzQ56261Na3UnFcT6oXFsmkdOQ7OddEknnq4= +go.temporal.io/server v1.20.1-0.20230622163242-9405cf84817e h1:lprYw79A30E97M7nR+/IbTc6FGFEwE4M/jjY3zdmaQA= +go.temporal.io/server v1.20.1-0.20230622163242-9405cf84817e/go.mod h1:Wtl0Io+CzQ56261Na3UnFcT6oXFsmkdOQ7OddEknnq4= go.temporal.io/version v0.3.0 h1:dMrei9l9NyHt8nG6EB8vAwDLLTwx2SvRyucCSumAiig= go.temporal.io/version v0.3.0/go.mod h1:UA9S8/1LaKYae6TyD9NaPMJTZb911JcbqghI2CBSP78= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= diff --git a/headers/headers.go b/headers/headers.go index 12d67c74..af368c69 100644 --- a/headers/headers.go +++ b/headers/headers.go @@ -15,8 +15,10 @@ const ( SupportedFeaturesHeaderDelim = "," ) +const DEV_VERSION = "0.0.0-DEV" + // Set by GoReleaser using ldflags -var Version = "0.0.0-DEV" +var Version = DEV_VERSION const ( ClientNameCLI = "temporal-cli" @@ -36,7 +38,7 @@ var ( ) func Init() { - if Version == "0.0.0-DEV" { + if Version == DEV_VERSION { if info, ok := debug.ReadBuildInfo(); ok && info.Main.Version != "(devel)" { Version = info.Main.Version } diff --git a/taskqueue/task_queue.go b/taskqueue/task_queue.go index 7b2711f5..8c5d295a 100644 --- a/taskqueue/task_queue.go +++ b/taskqueue/task_queue.go @@ -9,9 +9,9 @@ import ( func NewTaskQueueCommands() []*cli.Command { return []*cli.Command{ { - Name: "describe", - Usage: common.DescribeTaskQueueDefinition, - UsageText:common.DescribeTaskQueueUsageText, + Name: "describe", + Usage: common.DescribeTaskQueueDefinition, + UsageText: common.DescribeTaskQueueUsageText, Flags: append([]cli.Flag{ &cli.StringFlag{ Name: common.FlagTaskQueue, @@ -32,8 +32,8 @@ func NewTaskQueueCommands() []*cli.Command { }, }, { - Name: "list-partition", - Usage: common.ListPartitionTaskQueueDefinition, + Name: "list-partition", + Usage: common.ListPartitionTaskQueueDefinition, UsageText: common.TaskQueueListPartitionUsageText, Flags: []cli.Flag{ &cli.StringFlag{ @@ -55,5 +55,165 @@ func NewTaskQueueCommands() []*cli.Command { return ListTaskQueuePartitions(c) }, }, + { + Name: "update-build-ids", + Usage: common.UpdateBuildIDsDefinition, + UsageText: common.UpdateBuildIDsDefinitionText, + Subcommands: []*cli.Command{ + { + Name: "add-new-default", + Usage: common.AddNewDefaultBuildIDDefinition, + UsageText: common.AddNewDefaultBuildIDDefinitionUsage, + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: common.FlagTaskQueue, + Aliases: common.FlagTaskQueueAlias, + Usage: common.FlagTaskQueueName, + Required: true, + Category: common.CategoryMain, + }, + &cli.StringFlag{ + Name: common.FlagBuildID, + Usage: common.FlagNewBuildIDUsage, + Required: true, + Category: common.CategoryMain, + }, + }, + Action: func(c *cli.Context) error { + return BuildIDAddNewDefault(c) + }, + }, + { + Name: "add-new-compatible", + Usage: common.AddNewCompatibleBuildIDDefinition, + UsageText: common.AddNewCompatibleBuildIDDefinitionUsage, + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: common.FlagTaskQueue, + Aliases: common.FlagTaskQueueAlias, + Usage: common.FlagTaskQueueName, + Required: true, + Category: common.CategoryMain, + }, + &cli.StringFlag{ + Name: common.FlagBuildID, + Usage: common.FlagNewBuildIDUsage, + Required: true, + Category: common.CategoryMain, + }, + &cli.StringFlag{ + Name: common.FlagExistingCompatibleBuildID, + Usage: common.FlagExistingCompatibleBuildIDUsage, + Required: true, + Category: common.CategoryMain, + }, + &cli.BoolFlag{ + Name: common.FlagSetBuildIDAsDefault, + Usage: common.FlagSetBuildIDAsDefaultUsage, + Category: common.CategoryMain, + }, + }, + Action: func(c *cli.Context) error { + return BuildIDAddNewCompatible(c) + }, + }, + { + Name: "promote-set", + Usage: common.PromoteSetDefinition, + UsageText: common.PromoteSetDefinitionUsage, + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: common.FlagTaskQueue, + Aliases: common.FlagTaskQueueAlias, + Usage: common.FlagTaskQueueName, + Required: true, + Category: common.CategoryMain, + }, + &cli.StringFlag{ + Name: common.FlagBuildID, + Usage: common.FlagPromoteSetBuildIDUsage, + Required: true, + Category: common.CategoryMain, + }, + }, + Action: func(c *cli.Context) error { + return BuildIDPromoteSet(c) + }, + }, + { + Name: "promote-id-in-set", + Usage: common.PromoteIDInSetDefinition, + UsageText: common.PromoteIDInSetDefinitionUsage, + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: common.FlagTaskQueue, + Aliases: common.FlagTaskQueueAlias, + Usage: common.FlagTaskQueueName, + Required: true, + Category: common.CategoryMain, + }, + &cli.StringFlag{ + Name: common.FlagBuildID, + Usage: common.FlagPromoteBuildIDUsage, + Required: true, + Category: common.CategoryMain, + }, + }, + Action: func(c *cli.Context) error { + return BuildIDPromoteInSet(c) + }, + }, + }, + }, + { + Name: "get-build-ids", + Usage: common.GetBuildIDsDefinition, + UsageText: common.GetBuildIDsDefinitionText, + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: common.FlagTaskQueue, + Aliases: common.FlagTaskQueueAlias, + Usage: common.FlagTaskQueueName, + Required: true, + Category: common.CategoryMain, + }, + &cli.IntFlag{ + Name: common.FlagMaxBuildIDSets, + Usage: common.FlagMaxBuildIDSetsUsage, + Category: common.CategoryMain, + Value: 0, + }, + }, + Action: func(c *cli.Context) error { + return GetBuildIDs(c) + }, + }, + { + Name: "get-build-id-reachability", + Usage: common.GetBuildIDReachabilityDefinition, + UsageText: common.GetBuildIDReachabilityDefinitionText, + Flags: []cli.Flag{ + &cli.StringSliceFlag{ + Name: common.FlagBuildID, + Usage: common.FlagBuildIDReachabilityUsage, + Category: common.CategoryMain, + }, + &cli.StringSliceFlag{ + Name: common.FlagTaskQueue, + Aliases: common.FlagTaskQueueAlias, + Usage: common.FlagTaskQueueForReachabilityUsage, + Category: common.CategoryMain, + }, + &cli.StringFlag{ + Name: common.FlagReachabilityType, + Usage: common.FlagReachabilityTypeUsage, + Category: common.CategoryMain, + Value: "existing", + }, + }, + Action: func(c *cli.Context) error { + return GetBuildIDReachability(c) + }, + }, } } diff --git a/taskqueue/task_queue_commands.go b/taskqueue/task_queue_commands.go index da77fb4e..597dd8c3 100644 --- a/taskqueue/task_queue_commands.go +++ b/taskqueue/task_queue_commands.go @@ -91,6 +91,190 @@ func ListTaskQueuePartitions(c *cli.Context) error { return output.PrintItems(c, items, optsA) } +// BuildIDAddNewDefault is implements the `update-build-ids add-new-default` subcommand +func BuildIDAddNewDefault(c *cli.Context) error { + newBuildID := c.String(common.FlagBuildID) + operation := workflowservice.UpdateWorkerBuildIdCompatibilityRequest{ + Operation: &workflowservice.UpdateWorkerBuildIdCompatibilityRequest_AddNewBuildIdInNewDefaultSet{ + AddNewBuildIdInNewDefaultSet: newBuildID, + }, + } + return updateBuildIDs(c, operation) +} + +// BuildIDAddNewCompatible implements the `update-build-ids add-new-compatible` subcommand +func BuildIDAddNewCompatible(c *cli.Context) error { + newBuildID := c.String(common.FlagBuildID) + existingBuildID := c.String(common.FlagExistingCompatibleBuildID) + setAsDefault := c.Bool(common.FlagSetBuildIDAsDefault) + operation := workflowservice.UpdateWorkerBuildIdCompatibilityRequest{ + Operation: &workflowservice.UpdateWorkerBuildIdCompatibilityRequest_AddNewCompatibleBuildId{ + AddNewCompatibleBuildId: &workflowservice.UpdateWorkerBuildIdCompatibilityRequest_AddNewCompatibleVersion{ + NewBuildId: newBuildID, + ExistingCompatibleBuildId: existingBuildID, + MakeSetDefault: setAsDefault, + }, + }, + } + return updateBuildIDs(c, operation) +} + +// BuildIDPromoteSet implements the `update-build-ids promote-set` subcommand +func BuildIDPromoteSet(c *cli.Context) error { + buildID := c.String(common.FlagBuildID) + operation := workflowservice.UpdateWorkerBuildIdCompatibilityRequest{ + Operation: &workflowservice.UpdateWorkerBuildIdCompatibilityRequest_PromoteSetByBuildId{ + PromoteSetByBuildId: buildID, + }, + } + return updateBuildIDs(c, operation) +} + +// BuildIDPromoteInSet implements the `update-build-ids promote-id-in-set` subcommand +func BuildIDPromoteInSet(c *cli.Context) error { + buildID := c.String(common.FlagBuildID) + operation := workflowservice.UpdateWorkerBuildIdCompatibilityRequest{ + Operation: &workflowservice.UpdateWorkerBuildIdCompatibilityRequest_PromoteBuildIdWithinSet{ + PromoteBuildIdWithinSet: buildID, + }, + } + return updateBuildIDs(c, operation) +} + +// GetBuildIDs is implements the `get-build-ids` subcommand +func GetBuildIDs(c *cli.Context) error { + frontendClient := client.Factory(c.App).FrontendClient(c) + namespace, err := common.RequiredFlag(c, common.FlagNamespace) + if err != nil { + return err + } + taskQueue := c.String(common.FlagTaskQueue) + + ctx, cancel := common.NewContext(c) + defer cancel() + request := &workflowservice.GetWorkerBuildIdCompatibilityRequest{ + Namespace: namespace, + TaskQueue: taskQueue, + } + + resp, err := frontendClient.GetWorkerBuildIdCompatibility(ctx, request) + if err != nil { + return fmt.Errorf("unable to get task queue build ids: %w", err) + } + + type rowtype struct { + VersionSetId string + BuildIds []string + IsDefaultSet bool + DefaultForSet string + } + opts := &output.PrintOptions{ + Fields: []string{"BuildIds", "DefaultForSet", "IsDefaultSet"}, + } + var items []interface{} + for ix, e := range resp.GetMajorVersionSets() { + row := rowtype{ + BuildIds: e.GetBuildIds(), + IsDefaultSet: ix == len(resp.GetMajorVersionSets())-1, + DefaultForSet: e.GetBuildIds()[len(e.GetBuildIds())-1], + } + items = append(items, row) + } + return output.PrintItems(c, items, opts) +} + +// GetBuildIDReachability implements the `get-build-id-reachability` subcommand +func GetBuildIDReachability(c *cli.Context) error { + frontendClient := client.Factory(c.App).FrontendClient(c) + namespace, err := common.RequiredFlag(c, common.FlagNamespace) + if err != nil { + return err + } + buildIDs := c.StringSlice(common.FlagBuildID) + taskQueues := c.StringSlice(common.FlagTaskQueue) + reachabilityType := strings.ToLower(c.String(common.FlagReachabilityType)) + reachability := enumspb.TASK_REACHABILITY_UNSPECIFIED + if reachabilityType != "" { + if reachabilityType == "open" { + reachability = enumspb.TASK_REACHABILITY_OPEN_WORKFLOWS + } else if reachabilityType == "closed" { + reachability = enumspb.TASK_REACHABILITY_CLOSED_WORKFLOWS + } else if reachabilityType == "existing" { + reachability = enumspb.TASK_REACHABILITY_EXISTING_WORKFLOWS + } else { + return fmt.Errorf("invalid reachability type: %v", reachabilityType) + } + } + + ctx, cancel := common.NewContext(c) + defer cancel() + request := &workflowservice.GetWorkerTaskReachabilityRequest{ + Namespace: namespace, + BuildIds: buildIDs, + TaskQueues: taskQueues, + Reachability: reachability, + } + + resp, err := frontendClient.GetWorkerTaskReachability(ctx, request) + if err != nil { + return fmt.Errorf("unable to get Build ID reachability: %w", err) + } + + type rowtype struct { + BuildId string + TaskQueue string + Reachability []string + } + opts := &output.PrintOptions{ + Fields: []string{"BuildId", "TaskQueue", "Reachability"}, + } + var items []interface{} + for _, e := range resp.GetBuildIdReachability() { + for _, r := range e.GetTaskQueueReachability() { + reachability := make([]string, len(r.GetReachability())) + for i, v := range r.GetReachability() { + reachability[i] = v.String() + } + row := rowtype{ + BuildId: e.GetBuildId(), + TaskQueue: r.GetTaskQueue(), + Reachability: reachability, + } + items = append(items, row) + } + } + return output.PrintItems(c, items, opts) +} + +// updateBuildIDs manipulates the build ids of a given taskqueue. `partialReq` is a partial request +// containing only the operation field filled out. +func updateBuildIDs(c *cli.Context, partialReq workflowservice.UpdateWorkerBuildIdCompatibilityRequest) error { + frontendClient := client.Factory(c.App).FrontendClient(c) + namespace, err := common.RequiredFlag(c, common.FlagNamespace) + if err != nil { + return err + } + taskQueue := c.String(common.FlagTaskQueue) + + ctx, cancel := common.NewContext(c) + defer cancel() + + request := &workflowservice.UpdateWorkerBuildIdCompatibilityRequest{ + Namespace: namespace, + TaskQueue: taskQueue, + Operation: partialReq.Operation, + } + + resp, err := frontendClient.UpdateWorkerBuildIdCompatibility(ctx, request) + if err != nil { + return fmt.Errorf("error updating task queue build ids: %w", err) + } + + fmt.Println(color.Green(c, "Successfully updated task queue build ids. Set ID: %v", resp.GetVersionSetId())) + + return nil +} + func strToTaskQueueType(str string) enumspb.TaskQueueType { if strings.ToLower(str) == "activity" { return enumspb.TASK_QUEUE_TYPE_ACTIVITY diff --git a/tests/e2e_test.go b/tests/e2e_test.go index e2086699..62e3bb40 100644 --- a/tests/e2e_test.go +++ b/tests/e2e_test.go @@ -1,17 +1,16 @@ package tests import ( - "bytes" "context" - "fmt" "os" "os/exec" "path/filepath" "runtime" - "strings" "sync" "testing" + "github.com/temporalio/cli/common" + "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "github.com/temporalio/cli/app" @@ -70,11 +69,11 @@ func (s *e2eSuite) SetupTest() { func (s *e2eSuite) TearDownTest() { } -func (s *e2eSuite) setUpTestEnvironment() (*testsuite.DevServer, *cli.App, *MemWriter) { +func (s *e2eSuite) setUpTestEnvironment() (*testsuite.DevServer, *cli.App, *common.MemWriter) { server, err := s.createServer() s.Require().NoError(err) - writer := &MemWriter{} + writer := &common.MemWriter{} tcli := s.createApp(server, writer) return server, tcli, writer @@ -101,7 +100,7 @@ func (s *e2eSuite) createServer() (*testsuite.DevServer, error) { return server, err } -func (s *e2eSuite) createApp(server *testsuite.DevServer, writer *MemWriter) *cli.App { +func (s *e2eSuite) createApp(server *testsuite.DevServer, writer *common.MemWriter) *cli.App { tcli := app.BuildApp() tcli.Writer = writer @@ -145,31 +144,3 @@ func (m *clientFactory) SDKClient(c *cli.Context, namespace string) sdkclient.Cl func (m *clientFactory) HealthClient(_ *cli.Context) healthpb.HealthClient { panic("HealthClient mock is not supported") } - -// MemWriter is an io.Writer implementation that stores the written content. -type MemWriter struct { - content bytes.Buffer -} - -func (mw *MemWriter) Write(p []byte) (n int, err error) { - return mw.content.Write(p) -} - -func (mw *MemWriter) GetContent() string { - return mw.content.String() -} - -func TestMemWriter(t *testing.T) { - mw := &MemWriter{} - _, err := fmt.Fprintln(mw, "This message is written to the MemWriter.") - if err != nil { - t.Fatal(err) - } - - expected := "This message is written to the MemWriter." - content := mw.GetContent() - - if !strings.Contains(content, expected) { - t.Errorf("Expected log content to contain '%s', but it doesn't. Content: '%s'", expected, content) - } -} diff --git a/tests/workflow_test.go b/tests/workflow_test.go index e882f571..1d6a6413 100644 --- a/tests/workflow_test.go +++ b/tests/workflow_test.go @@ -68,6 +68,7 @@ func (s *e2eSuite) TestWorkflowShow_ReplayableHistory() { } func (s *e2eSuite) TestWorkflowUpdate() { + s.T().Skip("Skipped because downloaded dev server has old update implementation") s.T().Parallel() testserver, app, writer := s.setUpTestEnvironment() diff --git a/workflow/workflow_commands.go b/workflow/workflow_commands.go index 434fc14b..3228a030 100644 --- a/workflow/workflow_commands.go +++ b/workflow/workflow_commands.go @@ -736,19 +736,21 @@ func convertDescribeWorkflowExecutionResponse(c *cli.Context, resp *workflowserv info := resp.GetWorkflowExecutionInfo() executionInfo := &clispb.WorkflowExecutionInfo{ - Execution: info.GetExecution(), - Type: info.GetType(), - CloseTime: info.GetCloseTime(), - StartTime: info.GetStartTime(), - Status: info.GetStatus(), - HistoryLength: info.GetHistoryLength(), - ParentNamespaceId: info.GetParentNamespaceId(), - ParentExecution: info.GetParentExecution(), - Memo: info.GetMemo(), - SearchAttributes: convertSearchAttributes(c, info.GetSearchAttributes()), - AutoResetPoints: info.GetAutoResetPoints(), - StateTransitionCount: info.GetStateTransitionCount(), - ExecutionTime: info.GetExecutionTime(), + Execution: info.GetExecution(), + Type: info.GetType(), + CloseTime: info.GetCloseTime(), + StartTime: info.GetStartTime(), + Status: info.GetStatus(), + HistoryLength: info.GetHistoryLength(), + ParentNamespaceId: info.GetParentNamespaceId(), + ParentExecution: info.GetParentExecution(), + Memo: info.GetMemo(), + SearchAttributes: convertSearchAttributes(c, info.GetSearchAttributes()), + AutoResetPoints: info.GetAutoResetPoints(), + StateTransitionCount: info.GetStateTransitionCount(), + ExecutionTime: info.GetExecutionTime(), + HistorySizeBytes: info.GetHistorySizeBytes(), + MostRecentWorkerVersionStamp: info.GetMostRecentWorkerVersionStamp(), } var pendingActivitiesStr []*clispb.PendingActivityInfo