diff --git a/temporalcli/commands.activity.go b/temporalcli/commands.activity.go new file mode 100644 index 000000000..6750d0b8d --- /dev/null +++ b/temporalcli/commands.activity.go @@ -0,0 +1,72 @@ +package temporalcli + +import ( + "fmt" + + "go.temporal.io/api/common/v1" + "go.temporal.io/api/failure/v1" + "go.temporal.io/api/workflowservice/v1" +) + +func (c *TemporalActivityCompleteCommand) run(cctx *CommandContext, args []string) error { + cl, err := c.Parent.ClientOptions.dialClient(cctx) + if err != nil { + return err + } + defer cl.Close() + + metadata := map[string][]byte{"encoding": []byte("json/plain")} + resultPayloads, err := CreatePayloads([][]byte{[]byte(c.Result)}, metadata, false) + if err != nil { + return err + } + + _, err = cl.WorkflowService().RespondActivityTaskCompletedById(cctx, &workflowservice.RespondActivityTaskCompletedByIdRequest{ + Namespace: c.Parent.Namespace, + WorkflowId: c.WorkflowId, + RunId: c.RunId, + ActivityId: c.ActivityId, + Result: resultPayloads, + Identity: c.Identity, + }) + if err != nil { + return fmt.Errorf("unable to complete Activity: %w", err) + } + return nil +} + +func (c *TemporalActivityFailCommand) run(cctx *CommandContext, args []string) error { + cl, err := c.Parent.ClientOptions.dialClient(cctx) + if err != nil { + return err + } + defer cl.Close() + + var detailPayloads *common.Payloads + if len(c.Detail) > 0 { + metadata := map[string][]byte{"encoding": []byte("json/plain")} + detailPayloads, err = CreatePayloads([][]byte{[]byte(c.Detail)}, metadata, false) + if err != nil { + return err + } + } + _, err = cl.WorkflowService().RespondActivityTaskFailedById(cctx, &workflowservice.RespondActivityTaskFailedByIdRequest{ + Namespace: c.Parent.Namespace, + WorkflowId: c.WorkflowId, + RunId: c.RunId, + ActivityId: c.ActivityId, + Failure: &failure.Failure{ + Message: c.Reason, + Source: "CLI", + FailureInfo: &failure.Failure_ApplicationFailureInfo{ApplicationFailureInfo: &failure.ApplicationFailureInfo{ + NonRetryable: true, + Details: detailPayloads, + }}, + }, + Identity: c.Identity, + }) + if err != nil { + return fmt.Errorf("unable to fail Activity: %w", err) + } + return nil +} diff --git a/temporalcli/commands.activity_test.go b/temporalcli/commands.activity_test.go new file mode 100644 index 000000000..d747092a5 --- /dev/null +++ b/temporalcli/commands.activity_test.go @@ -0,0 +1,150 @@ +package temporalcli_test + +import ( + "context" + "time" + + "go.temporal.io/api/enums/v1" + "go.temporal.io/api/history/v1" + "go.temporal.io/sdk/client" +) + +func (s *SharedServerSuite) TestActivity_Complete() { + run := s.waitActivityStarted() + wid := run.GetID() + aid := "dev-activity-id" + identity := "MyIdentity" + res := s.Execute( + "activity", "complete", + "--activity-id", aid, + "--workflow-id", wid, + "--result", "\"complete-activity-result\"", + "--identity", identity, + "--address", s.Address(), + ) + s.NoError(res.Err) + var actual string + s.NoError(run.Get(s.Context, &actual)) + s.Equal("complete-activity-result", actual) + + started, completed, failed := s.getActivityEvents(wid, aid) + s.NotNil(started) + s.Nil(failed) + s.NotNil(completed) + s.Equal("\"complete-activity-result\"", string(completed.Result.Payloads[0].GetData())) + s.Equal(identity, completed.GetIdentity()) +} + +func (s *SharedServerSuite) TestActivity_Fail() { + run := s.waitActivityStarted() + wid := run.GetID() + aid := "dev-activity-id" + detail := "{\"myKey\": \"myValue\"}" + reason := "MyReason" + identity := "MyIdentity" + res := s.Execute( + "activity", "fail", + "--activity-id", aid, + "--workflow-id", wid, + "--run-id", run.GetRunID(), + "--detail", detail, + "--reason", reason, + "--identity", identity, + "--address", s.Address(), + ) + s.NoError(res.Err) + err := run.Get(s.Context, nil) + s.NotNil(err) + + started, completed, failed := s.getActivityEvents(wid, aid) + s.NotNil(started) + s.Nil(completed) + s.NotNil(failed) + s.Equal( + detail, + string(failed.GetFailure().GetApplicationFailureInfo().GetDetails().Payloads[0].GetData()), + ) + s.Equal(reason, failed.GetFailure().Message) + s.Equal(identity, failed.GetIdentity()) +} + +func (s *SharedServerSuite) TestActivity_Complete_InvalidResult() { + run := s.waitActivityStarted() + wid := run.GetID() + aid := "dev-activity-id" + res := s.Execute( + "activity", "complete", + "--activity-id", aid, + "--workflow-id", wid, + "--result", "{not json}", + "--address", s.Address(), + ) + s.ErrorContains(res.Err, "is not valid JSON") + + started, completed, failed := s.getActivityEvents(wid, aid) + s.Nil(started) + s.Nil(completed) + s.Nil(failed) +} + +func (s *SharedServerSuite) TestActivity_Fail_InvalidDetail() { + run := s.waitActivityStarted() + wid := run.GetID() + aid := "dev-activity-id" + res := s.Execute( + "activity", "fail", + "--activity-id", aid, + "--workflow-id", wid, + "--detail", "{not json}", + "--address", s.Address(), + ) + s.ErrorContains(res.Err, "is not valid JSON") + + started, completed, failed := s.getActivityEvents(wid, aid) + s.Nil(started) + s.Nil(completed) + s.Nil(failed) +} + +// Test helpers + +func (s *SharedServerSuite) waitActivityStarted() client.WorkflowRun { + s.Worker.OnDevActivity(func(ctx context.Context, a any) (any, error) { + time.Sleep(0xFFFF * time.Hour) + return nil, nil + }) + run, err := s.Client.ExecuteWorkflow( + s.Context, + client.StartWorkflowOptions{TaskQueue: s.Worker.Options.TaskQueue}, + DevWorkflow, + "ignored", + ) + s.NoError(err) + s.Eventually(func() bool { + resp, err := s.Client.DescribeWorkflowExecution(s.Context, run.GetID(), run.GetRunID()) + s.NoError(err) + return len(resp.PendingActivities) > 0 + }, 5*time.Second, 100*time.Millisecond) + return run +} + +func (s *SharedServerSuite) getActivityEvents(workflowID, activityID string) ( + started *history.ActivityTaskStartedEventAttributes, + completed *history.ActivityTaskCompletedEventAttributes, + failed *history.ActivityTaskFailedEventAttributes, +) { + iter := s.Client.GetWorkflowHistory(s.Context, workflowID, "", false, enums.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT) + for iter.HasNext() { + event, err := iter.Next() + s.NoError(err) + if attrs := event.GetActivityTaskStartedEventAttributes(); attrs != nil { + started = attrs + } else if attrs := event.GetActivityTaskCompletedEventAttributes(); attrs != nil { + completed = attrs + s.Equal("json/plain", string(completed.Result.Payloads[0].Metadata["encoding"])) + } else if attrs := event.GetActivityTaskFailedEventAttributes(); attrs != nil { + failed = attrs + } + } + return started, completed, failed +} diff --git a/temporalcli/commands.gen.go b/temporalcli/commands.gen.go index 347ce01d8..424833681 100644 --- a/temporalcli/commands.gen.go +++ b/temporalcli/commands.gen.go @@ -34,6 +34,7 @@ func NewTemporalCommand(cctx *CommandContext) *TemporalCommand { s.Command.Short = "Temporal command-line interface and development server." s.Command.Long = "" s.Command.Args = cobra.NoArgs + s.Command.AddCommand(&NewTemporalActivityCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalEnvCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalServerCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalTaskQueueCommand(cctx, &s).Command) @@ -56,6 +57,96 @@ func NewTemporalCommand(cctx *CommandContext) *TemporalCommand { return &s } +type TemporalActivityCommand struct { + Parent *TemporalCommand + Command cobra.Command + ClientOptions +} + +func NewTemporalActivityCommand(cctx *CommandContext, parent *TemporalCommand) *TemporalActivityCommand { + var s TemporalActivityCommand + s.Parent = parent + s.Command.Use = "activity" + s.Command.Short = "Complete or fail an Activity." + s.Command.Long = "" + s.Command.Args = cobra.NoArgs + s.Command.AddCommand(&NewTemporalActivityCompleteCommand(cctx, &s).Command) + s.Command.AddCommand(&NewTemporalActivityFailCommand(cctx, &s).Command) + s.ClientOptions.buildFlags(cctx, s.Command.PersistentFlags()) + return &s +} + +type TemporalActivityCompleteCommand struct { + Parent *TemporalActivityCommand + Command cobra.Command + WorkflowReferenceOptions + ActivityId string + Identity string + Result string +} + +func NewTemporalActivityCompleteCommand(cctx *CommandContext, parent *TemporalActivityCommand) *TemporalActivityCompleteCommand { + var s TemporalActivityCompleteCommand + s.Parent = parent + s.Command.DisableFlagsInUseLine = true + s.Command.Use = "complete [flags]" + s.Command.Short = "Complete an Activity." + if hasHighlighting { + s.Command.Long = "Complete an Activity.\n\n\x1b[1mtemporal activity complete --activity-id=MyActivityId --workflow-id=MyWorkflowId --result='{\"MyResultKey\": \"MyResultVal\"}'\x1b[0m" + } else { + s.Command.Long = "Complete an Activity.\n\n`temporal activity complete --activity-id=MyActivityId --workflow-id=MyWorkflowId --result='{\"MyResultKey\": \"MyResultVal\"}'`" + } + s.Command.Args = cobra.NoArgs + s.WorkflowReferenceOptions.buildFlags(cctx, s.Command.Flags()) + s.Command.Flags().StringVar(&s.ActivityId, "activity-id", "", "The Activity to be completed.") + _ = cobra.MarkFlagRequired(s.Command.Flags(), "activity-id") + s.Command.Flags().StringVar(&s.Identity, "identity", "", "Identity of user submitting this request.") + s.Command.Flags().StringVar(&s.Result, "result", "", "The result with which to complete the Activity (JSON).") + _ = cobra.MarkFlagRequired(s.Command.Flags(), "result") + s.Command.Run = func(c *cobra.Command, args []string) { + if err := s.run(cctx, args); err != nil { + cctx.Options.Fail(err) + } + } + return &s +} + +type TemporalActivityFailCommand struct { + Parent *TemporalActivityCommand + Command cobra.Command + WorkflowReferenceOptions + ActivityId string + Detail string + Identity string + Reason string +} + +func NewTemporalActivityFailCommand(cctx *CommandContext, parent *TemporalActivityCommand) *TemporalActivityFailCommand { + var s TemporalActivityFailCommand + s.Parent = parent + s.Command.DisableFlagsInUseLine = true + s.Command.Use = "fail [flags]" + s.Command.Short = "Fail an Activity." + if hasHighlighting { + s.Command.Long = "Fail an Activity.\n\n\x1b[1mtemporal activity fail --activity-id=MyActivityId --workflow-id=MyWorkflowId\x1b[0m" + } else { + s.Command.Long = "Fail an Activity.\n\n`temporal activity fail --activity-id=MyActivityId --workflow-id=MyWorkflowId`" + } + s.Command.Args = cobra.NoArgs + s.WorkflowReferenceOptions.buildFlags(cctx, s.Command.Flags()) + s.Command.Flags().StringVar(&s.ActivityId, "activity-id", "", "The Activity to be failed.") + _ = cobra.MarkFlagRequired(s.Command.Flags(), "activity-id") + s.Command.Flags().StringVar(&s.Detail, "detail", "", "JSON data describing reason for failing the Activity.") + s.Command.Flags().StringVar(&s.Identity, "identity", "", "Identity of user submitting this request.") + s.Command.Flags().StringVar(&s.Reason, "reason", "", "Reason for failing the Activity.") + s.Command.Run = func(c *cobra.Command, args []string) { + if err := s.run(cctx, args); err != nil { + cctx.Options.Fail(err) + } + } + return &s +} + type TemporalEnvCommand struct { Parent *TemporalCommand Command cobra.Command diff --git a/temporalcli/commands.workflow_exec.go b/temporalcli/commands.workflow_exec.go index a13f26fb0..844bc10ce 100644 --- a/temporalcli/commands.workflow_exec.go +++ b/temporalcli/commands.workflow_exec.go @@ -2,7 +2,6 @@ package temporalcli import ( "context" - "encoding/base64" "encoding/json" "fmt" "os" @@ -312,24 +311,7 @@ func (p *PayloadInputOptions) buildRawInputPayloads() (*common.Payloads, error) } metadata[metaPieces[0]] = []byte(metaPieces[1]) } - - // Create payloads - ret := &common.Payloads{Payloads: make([]*common.Payload, len(inData))} - for i, in := range inData { - // First, if it's JSON, validate that it is accurate - if strings.HasPrefix(string(metadata["encoding"]), "json/") && !json.Valid(in) { - return nil, fmt.Errorf("input #%v is not valid JSON", i+1) - } - // Decode base64 if base64'd (std encoding only for now) - if p.InputBase64 { - var err error - if in, err = base64.StdEncoding.DecodeString(string(in)); err != nil { - return nil, fmt.Errorf("input #%v is not valid base64", i+1) - } - } - ret.Payloads[i] = &common.Payload{Data: in, Metadata: metadata} - } - return ret, nil + return CreatePayloads(inData, metadata, p.InputBase64) } // Rules: diff --git a/temporalcli/commands_test.go b/temporalcli/commands_test.go index 64cd39f34..848e2f494 100644 --- a/temporalcli/commands_test.go +++ b/temporalcli/commands_test.go @@ -434,7 +434,10 @@ func (d *devOperations) DevWorkflow(ctx workflow.Context, input any) (any, error if callback != nil { return callback(ctx, input) } - ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{StartToCloseTimeout: 10 * time.Second}) + ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: 10 * time.Second, + ActivityID: "dev-activity-id", + }) var res any err := workflow.ExecuteActivity(ctx, DevActivity, input).Get(ctx, &res) return res, err diff --git a/temporalcli/commandsmd/commands.md b/temporalcli/commandsmd/commands.md index 7367b86c0..7e96b85bc 100644 --- a/temporalcli/commandsmd/commands.md +++ b/temporalcli/commandsmd/commands.md @@ -57,6 +57,43 @@ This document has a specific structure used by a parser. Here are the rules: * `--color` (string-enum) - Set coloring. Options: always, never, auto. Default: auto. * `--no-json-shorthand-payloads` (bool) - Always all payloads as raw payloads even if they are JSON. +### temporal activity: Complete or fail an Activity. + +#### Options + +Includes options set for [client](#options-set-for-client). + + +### temporal activity complete: Complete an Activity. + +Complete an Activity. + +`temporal activity complete --activity-id=MyActivityId --workflow-id=MyWorkflowId --result='{"MyResultKey": "MyResultVal"}'` + +#### Options + +* `--activity-id` (string) - The Activity to be completed. Required. +* `--identity` (string) - Identity of user submitting this request. +* `--result` (string) - The result with which to complete the Activity (JSON). Required. + +Includes options set for [workflow reference](#options-set-for-workflow-reference). + +### temporal activity fail: Fail an Activity. + +Fail an Activity. + +`temporal activity fail --activity-id=MyActivityId --workflow-id=MyWorkflowId` + +#### Options + +* `--activity-id` (string) - The Activity to be failed. Required. +* `--detail` (string) - JSON data describing reason for failing the Activity. +* `--identity` (string) - Identity of user submitting this request. +* `--reason` (string) - Reason for failing the Activity. + +Includes options set for [workflow reference](#options-set-for-workflow-reference). + + ### temporal env: Manage environments. Use the '--env ' option with other commands to point the CLI at a different Temporal Server instance. If --env diff --git a/temporalcli/payload.go b/temporalcli/payload.go new file mode 100644 index 000000000..cb5eb1eda --- /dev/null +++ b/temporalcli/payload.go @@ -0,0 +1,29 @@ +package temporalcli + +import ( + "encoding/base64" + "encoding/json" + "fmt" + "strings" + + "go.temporal.io/api/common/v1" +) + +func CreatePayloads(data [][]byte, metadata map[string][]byte, isBase64 bool) (*common.Payloads, error) { + ret := &common.Payloads{Payloads: make([]*common.Payload, len(data))} + for i, in := range data { + // If it's JSON, validate it + if strings.HasPrefix(string(metadata["encoding"]), "json/") && !json.Valid(in) { + return nil, fmt.Errorf("input #%v is not valid JSON", i+1) + } + // Decode base64 if base64'd (std encoding only for now) + if isBase64 { + var err error + if in, err = base64.StdEncoding.DecodeString(string(in)); err != nil { + return nil, fmt.Errorf("input #%v is not valid base64", i+1) + } + } + ret.Payloads[i] = &common.Payload{Data: in, Metadata: metadata} + } + return ret, nil +}