Skip to content

Commit

Permalink
Merge pull request #601 from ystia/bugfix/GH-592-inline-workflow
Browse files Browse the repository at this point in the history
Failure running workflow with inline activity
  • Loading branch information
laurentganne committed Feb 12, 2020
2 parents 4fc9a29 + 5e230af commit 6894311
Show file tree
Hide file tree
Showing 7 changed files with 323 additions and 29 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

* Tosca public_ip_address attribute is wrongly set with private address for hosts pool computes ([GH-593](https://github.com/ystia/yorc/issues/593))
* REQ_TARGET keyword on TOSCA doesn't work with requirement type ([GH-598](https://github.com/ystia/yorc/issues/598))
* Failure running workflow with inline activity ([GH-592](https://github.com/ystia/yorc/issues/592))

## 4.0.0-M8 (January 24, 2020)

Expand Down
4 changes: 2 additions & 2 deletions tasks/workflow/consul_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ func TestRunConsulWorkflowPackageTests(t *testing.T) {
t.Run("testRunStep", func(t *testing.T) {
testRunStep(t, srv, client)
})
t.Run("testRegisterInlineWorkflow", func(t *testing.T) {
testRegisterInlineWorkflow(t, srv, client)
t.Run("testInlineWorkflow", func(t *testing.T) {
testInlineWorkflow(t, srv, client)
})
t.Run("testDeleteExecutionTreeSamePrefix", func(t *testing.T) {
testDeleteExecutionTreeSamePrefix(t, client)
Expand Down
35 changes: 30 additions & 5 deletions tasks/workflow/step.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/ystia/yorc/v4/prov/scheduling"
"github.com/ystia/yorc/v4/registry"
"github.com/ystia/yorc/v4/tasks"
"github.com/ystia/yorc/v4/tasks/collector"
"github.com/ystia/yorc/v4/tasks/workflow/builder"
"github.com/ystia/yorc/v4/tosca"
)
Expand Down Expand Up @@ -371,22 +372,46 @@ func (s *step) runActivity(wfCtx context.Context, cfg config.Configuration, depl
}
case builder.ActivityTypeInline:
// Register inline workflow associated to the original task
return s.registerInlineWorkflow(wfCtx, activity.Value())
return s.registerInlineWorkflow(wfCtx, deploymentID, activity.Value())
}
return nil
}

func (s *step) registerInlineWorkflow(ctx context.Context, workflowName string) error {
func (s *step) registerInlineWorkflow(ctx context.Context, deploymentID, workflowName string) error {
events.WithContextOptionalFields(ctx).NewLogEntry(events.LogLevelINFO, s.t.targetID).RegisterAsString(fmt.Sprintf("Register workflow %q from taskID:%q, deploymentID:%q", workflowName, s.t.taskID, s.t.targetID))
wfOps, err := builder.BuildInitExecutionOperations(ctx, s.t.targetID, s.t.taskID, workflowName, true)

// Preparing a new task with its own data referencing the parent workflow step
parentData, err := tasks.GetAllTaskData(s.t.taskID)
if err != nil {
return err
}
err = tasks.StoreOperations(s.t.taskID, wfOps)
data := make(map[string]string)
for k, v := range parentData {
data[k] = v
}
data[taskDataParentWorkflowName] = s.WorkflowName
data[taskDataParentStepName] = s.Name
data[taskDataParentTaskID] = s.t.taskID
data[taskDataDeploymentID] = deploymentID
data[taskDataWorkflowName] = workflowName

taskID, err := collector.NewCollector(s.cc).RegisterTaskWithData(deploymentID, tasks.TaskTypeCustomWorkflow, data)
if err != nil {
err = errors.Wrapf(err, "Failed to register workflow init operations with workflow:%q, targetID:%q, taskID:%q", workflowName, s.t.targetID, s.t.taskID)
err = errors.Wrapf(err, "Failed to register inline workflow %s in parent workflow %s step %s, targetID %s, taskID %s",
workflowName, s.WorkflowName, s.Name, s.t.targetID, s.t.taskID)
_ = s.setStatus(tasks.TaskStepStatusERROR)
return err
}

log.Debugf("Registered task %s for inline workflow %s in parent workflow %s step %s",
taskID, workflowName, s.WorkflowName, s.Name)
_ = s.setStatus(tasks.TaskStepStatusRUNNING)

// Marking this step as asynchronous as it should not be considered as
// done by the caller
s.Async = true
// No final function as here the workflow is not done
s.t.finalFunction = nil
return err
}

Expand Down
27 changes: 27 additions & 0 deletions tasks/workflow/testdata/workflow.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,15 @@ node_types:
implementation:
type: ystia.yorc.tests.artifacts.Implementation.Custom
file: whatever
custom:
operation1:
implementation:
type: ystia.yorc.tests.artifacts.Implementation.Custom
file: whatever
operation2:
implementation:
type: ystia.yorc.tests.artifacts.Implementation.Custom
file: whatever

topology_template:
node_templates:
Expand Down Expand Up @@ -53,6 +62,24 @@ topology_template:
network_name: PRIVATE
initiator: source
workflows:
custom_wf1:
steps:
step_wf1:
target: WFNode
activities:
- call_operation: custom.operation1
custom_wf2:
steps:
first_step_wf2:
target: WFNode
activities:
- inline: custom_wf1
on_success:
- second_step_wf2
second_step_wf2:
target: WFNode
activities:
- call_operation: custom.operation1
install:
steps:
Compute_install:
Expand Down
34 changes: 18 additions & 16 deletions tasks/workflow/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -588,7 +588,7 @@ func (w *worker) runUndeploy(ctx context.Context, t *taskExecution) error {
}
return nil
}
bypassErrors, err := w.checkByPassErrors(t, "uninstall")
bypassErrors, err := checkByPassErrors(t, "uninstall")
if err != nil {
return err
}
Expand Down Expand Up @@ -736,38 +736,40 @@ func (w *worker) runCustomWorkflow(ctx context.Context, t *taskExecution, wfName
if wfName == "" {
return errors.New("workflow name missing")
}
bypassErrors, err := w.checkByPassErrors(t, wfName)
bypassErrors, err := checkByPassErrors(t, wfName)
if err != nil {
return err
}
t.finalFunction = func() error {
_, err := updateTaskStatusAccordingToWorkflowStatus(ctx, t.targetID, t.taskID, wfName)
taskStatus, err := updateTaskStatusAccordingToWorkflowStatus(ctx, t.targetID, t.taskID, wfName)
if err != nil {
return err
}

// Check if this workflow was launched as an inline workflow by a parent workflow
parentWorkflow, err := getParentWorkflow(ctx, t, wfName)
if err != nil {
return err
}

if parentWorkflow != "" {
err = updateParentWorkflowStepAndRegisterNextSteps(ctx, t, parentWorkflow, taskStatus)
}

return err
}

return w.runWorkflowStep(ctx, t, wfName, bypassErrors)
}

func (w *worker) checkByPassErrors(t *taskExecution, wfName string) (bool, error) {
continueOnError, err := tasks.GetTaskData(t.taskID, "continueOnError")
if err != nil {
return false, err
}
bypassErrors, err := strconv.ParseBool(continueOnError)
if err != nil {
return false, errors.Wrapf(err, "failed to parse \"continueOnError\" flag for workflow:%q", wfName)
}
return bypassErrors, nil
}

// bool return indicates if the workflow is done
func (w *worker) runWorkflowStep(ctx context.Context, t *taskExecution, workflowName string, continueOnError bool) error {
events.WithContextOptionalFields(ctx).NewLogEntry(events.LogLevelINFO, t.targetID).RegisterAsString(fmt.Sprintf("Start processing workflow step %s:%s", workflowName, t.step))
wfSteps, err := builder.BuildWorkFlow(ctx, t.targetID, workflowName)
if err != nil {
return errors.Wrapf(err, "Failed to build step:%q for workflow:%q", t.step, workflowName)
}
if wfSteps == nil || len(wfSteps) == 0 {
if len(wfSteps) == 0 {
// Nothing to do
return nil
}
Expand Down
120 changes: 119 additions & 1 deletion tasks/workflow/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@ import (
"context"
"fmt"
"path"
"strconv"

"github.com/hashicorp/consul/api"
"github.com/pkg/errors"
"github.com/satori/go.uuid"
uuid "github.com/satori/go.uuid"

"github.com/ystia/yorc/v4/events"
"github.com/ystia/yorc/v4/helper/consulutil"
Expand All @@ -30,6 +31,19 @@ import (
"github.com/ystia/yorc/v4/tasks/workflow/builder"
)

const (
// Task data providing the parent worflow name for an inline workflow
taskDataParentWorkflowName = "parentWorkflowName"
// Task data providing the parent step name for an inline workflow
taskDataParentStepName = "parentStepName"
// Task data providing the parent task ID
taskDataParentTaskID = "parentTaskID"
// Task data providing the deployment ID
taskDataDeploymentID = "deploymentID"
// Task data providing the current workflow name
taskDataWorkflowName = "workflowName"
)

// createWorkflowStepsOperations returns Consul transactional KV operations for initiating workflow execution
func createWorkflowStepsOperations(taskID string, steps []*step) api.KVTxnOps {
ops := make(api.KVTxnOps, 0)
Expand Down Expand Up @@ -107,3 +121,107 @@ func updateTaskStatusAccordingToWorkflowStatus(ctx context.Context, deploymentID
}
return status, errors.Wrapf(checkAndSetTaskStatus(ctx, deploymentID, taskID, status), "Failed to update task status to %q with TaskID: %q", status, taskID)
}

// Get the parent workflow of an inline workflow, returns an empty string if there
// is no parent workflow
func getParentWorkflow(ctx context.Context, t *taskExecution, wfName string) (string, error) {
parentWorkflow, err := tasks.GetTaskData(t.taskID, taskDataParentWorkflowName)
if err != nil && !tasks.IsTaskDataNotFoundError(err) {
return parentWorkflow, err
}

return parentWorkflow, nil
}

// Update a parent workflow step for which an inline workflow just finished with
// the status in argument.
// Register next steps depending on the status of the workflow execution
func updateParentWorkflowStepAndRegisterNextSteps(ctx context.Context, t *taskExecution, wfName string, taskStatus tasks.TaskStatus) error {

// Get the parent step to update
parentStepName, err := tasks.GetTaskData(t.taskID, taskDataParentStepName)
if err != nil && !tasks.IsTaskDataNotFoundError(err) {
return err
}

if parentStepName == "" {
return errors.Errorf("Found no inline step name in task %s data for parent workflow %s", t.taskID, wfName)
}

// Get the parent task ID
parentTaskID, err := tasks.GetTaskData(t.taskID, taskDataParentTaskID)
if err != nil && !tasks.IsTaskDataNotFoundError(err) {
return err
}

if parentTaskID == "" {
return errors.Errorf("Found no parent task ID in task %s data for parent workflow %s", t.taskID, wfName)
}

stepStatus := tasks.TaskStepStatusDONE
if taskStatus == tasks.TaskStatusFAILED {
stepStatus = tasks.TaskStepStatusERROR
} else if taskStatus == tasks.TaskStatusCANCELED {
stepStatus = tasks.TaskStepStatusCANCELED
}
err = tasks.UpdateTaskStepWithStatus(parentTaskID, parentStepName, stepStatus)
if err != nil {
return err
}

if stepStatus == tasks.TaskStepStatusCANCELED {
return err
}

if stepStatus == tasks.TaskStepStatusERROR {
// Check the option continue on error
continueOnError, err := checkByPassErrors(t, wfName)
if err != nil || !continueOnError {
return err
}
}

// Add next steps
err = registerParentStepNextSteps(ctx, t, parentTaskID, wfName, parentStepName)
return err

}

func registerParentStepNextSteps(ctx context.Context, t *taskExecution, parentTaskID, wfName, stepName string) error {

// Get the deployment ID
deploymentID, err := tasks.GetTaskData(t.taskID, taskDataDeploymentID)
if err != nil {
return err
}

wfSteps, err := builder.BuildWorkFlow(ctx, deploymentID, wfName)
if err != nil {
return errors.Wrapf(err, "Failed to build steps for workflow:%q", wfName)
}
if len(wfSteps) == 0 {
// Nothing to do
return nil
}

bs, ok := wfSteps[stepName]
if !ok {
return errors.Errorf("Failed to build step: %q for workflow: %q, unknown step", stepName, wfName)
}
t.taskID = parentTaskID
s := wrapBuilderStep(bs, t.cc, t)
return s.registerNextSteps(ctx, wfName)

}

func checkByPassErrors(t *taskExecution, wfName string) (bool, error) {
continueOnError, err := tasks.GetTaskData(t.taskID, "continueOnError")
if err != nil {
return false, err
}
bypassErrors, err := strconv.ParseBool(continueOnError)
if err != nil {
return false, errors.Wrapf(err, "failed to parse \"continueOnError\" flag for workflow:%q", wfName)
}
return bypassErrors, nil
}

0 comments on commit 6894311

Please sign in to comment.