Skip to content

Commit

Permalink
subtask: auto-resume parent task when the subtask finished running
Browse files Browse the repository at this point in the history
In case we have a template running some subtasks, we should wake up the
parent task after one of the subtasks has reached completion

Signed-off-by: Romain Beuque <rbeuque74@gmail.com>
  • Loading branch information
rbeuque74 committed Jul 14, 2020
1 parent 345d017 commit 70cbec9
Show file tree
Hide file tree
Showing 10 changed files with 191 additions and 3 deletions.
5 changes: 5 additions & 0 deletions api/handler/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/ovh/utask/models/task"
"github.com/ovh/utask/models/tasktemplate"
"github.com/ovh/utask/pkg/taskutils"
"github.com/ovh/utask/pkg/utils"
)

type createBatchIn struct {
Expand All @@ -34,6 +35,10 @@ func CreateBatch(c *gin.Context, in *createBatchIn) (*task.Batch, error) {
return nil, err
}

if err := utils.ValidateTags(in.Tags); err != nil {
return nil, err
}

if err := dbp.Tx(); err != nil {
return nil, err
}
Expand Down
9 changes: 9 additions & 0 deletions api/handler/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/ovh/utask/models/tasktemplate"
"github.com/ovh/utask/pkg/auth"
"github.com/ovh/utask/pkg/taskutils"
"github.com/ovh/utask/pkg/utils"
)

type createTaskIn struct {
Expand Down Expand Up @@ -51,6 +52,10 @@ func CreateTask(c *gin.Context, in *createTaskIn) (*task.Task, error) {
return nil, err
}

if err := utils.ValidateTags(in.Tags); err != nil {
return nil, err
}

t, err := taskutils.CreateTask(c, dbp, tt, in.WatcherUsernames, []string{}, in.Input, nil, in.Comment, in.Delay, in.Tags)
if err != nil {
dbp.Rollback()
Expand Down Expand Up @@ -264,6 +269,10 @@ func UpdateTask(c *gin.Context, in *updateTaskIn) (*task.Task, error) {

t.SetInput(clearInput)
t.SetWatcherUsernames(in.WatcherUsernames)

if err := utils.ValidateTags(in.Tags); err != nil {
return nil, err
}
t.SetTags(in.Tags, nil)

if err := t.Update(dbp,
Expand Down
37 changes: 37 additions & 0 deletions engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/ovh/utask/models/runnerinstance"
"github.com/ovh/utask/models/task"
"github.com/ovh/utask/models/tasktemplate"
"github.com/ovh/utask/pkg/constants"
"github.com/ovh/utask/pkg/jsonschema"
"github.com/ovh/utask/pkg/now"
"github.com/ovh/utask/pkg/utils"
Expand Down Expand Up @@ -530,6 +531,42 @@ func resolve(dbp zesty.DBProvider, res *resolution.Resolution, t *task.Task, sm
}

utask.ReleaseExecutionSlot()
if err := resumeParentTask(dbp, t, sm, debugLogger); err != nil {
debugLogger.WithError(err).Debugf("Engine: resolver(): failed to resume parent task: %s", err)
}
}

func resumeParentTask(dbp zesty.DBProvider, currentTask *task.Task, sm *semaphore.Weighted, debugLogger *logrus.Entry) error {
switch currentTask.State {
case task.StateDone, task.StateWontfix, task.StateCancelled:
default:
return nil
}
if currentTask.Tags == nil {
return nil
}
parentTaskID, ok := currentTask.Tags[constants.SubtaskTagParentTaskID]
if !ok {
return nil
}

parentTask, err := task.LoadFromPublicID(dbp, parentTaskID)
if err != nil {
return err
}
switch parentTask.State {
case task.StateBlocked, task.StateRunning:
default:
// not allowed to resume a parent task that is not either Running or Blocked.
// Todo state should not be runned as it might need manual resolution from a granted resolver
return nil
}
if parentTask.Resolution == nil {
return nil
}

debugLogger.Debugf("resuming parent task %q resolution %q", parentTask.PublicID, *parentTask.Resolution)
return GetEngine().Resolve(*parentTask.Resolution, sm)
}

func commit(dbp zesty.DBProvider, res *resolution.Resolution, t *task.Task) error {
Expand Down
73 changes: 72 additions & 1 deletion engine/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"github.com/ovh/utask/db/pgjuju"
"github.com/ovh/utask/engine"
"github.com/ovh/utask/engine/functions"
"github.com/ovh/utask/engine/functions/runner"
functionrunner "github.com/ovh/utask/engine/functions/runner"
"github.com/ovh/utask/engine/step"
"github.com/ovh/utask/engine/values"
"github.com/ovh/utask/models/resolution"
Expand All @@ -33,6 +33,7 @@ import (
"github.com/ovh/utask/pkg/now"
"github.com/ovh/utask/pkg/plugins/builtin/echo"
"github.com/ovh/utask/pkg/plugins/builtin/script"
pluginsubtask "github.com/ovh/utask/pkg/plugins/builtin/subtask"
)

const (
Expand Down Expand Up @@ -67,6 +68,7 @@ func TestMain(m *testing.M) {

step.RegisterRunner(echo.Plugin.PluginName(), echo.Plugin)
step.RegisterRunner(script.Plugin.PluginName(), script.Plugin)
step.RegisterRunner(pluginsubtask.Plugin.PluginName(), pluginsubtask.Plugin)

os.Exit(m.Run())
}
Expand Down Expand Up @@ -859,3 +861,72 @@ func TestBaseBaseConfiguration(t *testing.T) {

assert.NotEqual(t, res.Steps["stepOne"].Error, res.Steps["stepTwo"].Error)
}

func TestResolveSubTask(t *testing.T) {
dbp, err := zesty.NewDBProvider(utask.DBName)
require.Nil(t, err)

tt, err := templateFromYAML(dbp, "variables.yaml")
require.Nil(t, err)
tt.Normalize()
assert.Equal(t, "variableeval", tt.Name)
require.Nil(t, tt.Valid())

err = dbp.DB().Insert(tt)
require.Nil(t, err)

res, err := createResolution("subtask.yaml", map[string]interface{}{}, nil)
require.Nil(t, err, "failed to create resolution: %s", err)

res, err = runResolution(res)
require.Nil(t, err)
require.NotNil(t, res)
assert.Equal(t, resolution.StateError, res.State)

subtaskCreationOutput := res.Steps["subtaskCreation"].Output.(map[string]interface{})
subtaskPublicID := subtaskCreationOutput["id"].(string)

subtask, err := task.LoadFromPublicID(dbp, subtaskPublicID)
require.Nil(t, err)
assert.Equal(t, task.StateTODO, subtask.State)

subtaskResolution, err := resolution.Create(dbp, subtask, nil, "", false, nil)
require.Nil(t, err)

subtaskResolution, err = runResolution(subtaskResolution)
require.Nil(t, err)
assert.Equal(t, task.StateDone, subtaskResolution.State)
for k, v := range subtaskResolution.Steps {
assert.Equal(t, step.StateDone, v.State, "not valid state for step %s", k)
}

// checking if the parent task is picked up after that the subtask is resolved.
// need to sleep a bit because the parent task is resumed asynchronously
ti := time.Second
i := time.Duration(0)
for i < ti {
res, err = resolution.LoadFromPublicID(dbp, res.PublicID)
require.Nil(t, err)
if res.State != resolution.StateError {
break
}

time.Sleep(time.Millisecond * 10)
i += time.Millisecond * 10
}

ti = time.Second
i = time.Duration(0)
for i < ti {
res, err = resolution.LoadFromPublicID(dbp, res.PublicID)
require.Nil(t, err)
if res.State != resolution.StateRunning {
break
}

time.Sleep(time.Millisecond * 10)
i += time.Millisecond * 10

}
assert.Equal(t, resolution.StateDone, res.State)
}
19 changes: 19 additions & 0 deletions engine/templates_tests/subtask.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
name: subtaskTemplate
description: Template that spawns a subtask
title_format: "[test] subtask template test"
steps:
subtaskCreation:
description: creating a subtask
action:
type: subtask
configuration:
template: variableeval
echoOK:
description: everything is OK
action:
type: echo
configuration:
output:
foo: OK
result_format:
foo: "{{.step.echoOK.output.foo}}"
4 changes: 4 additions & 0 deletions models/tasktemplate/template.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,10 @@ func (tt *TaskTemplate) Valid() (err error) {
}
}

if err := utils.ValidateTags(tt.Tags); err != nil {
return err
}

if tt.LongDescription != nil {
if err := utils.ValidText("template long description", *tt.LongDescription); err != nil {
return err
Expand Down
8 changes: 8 additions & 0 deletions pkg/constants/constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package constants

const (
// SubtaskTagParentTaskID is the tag key that utask will use to lookup
// if a completed task has a parent task, and that parent task should be
// resumed.
SubtaskTagParentTaskID = "_utask_parent_task_id"
)
17 changes: 16 additions & 1 deletion pkg/plugins/builtin/subtask/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/ovh/utask/models/task"
"github.com/ovh/utask/models/tasktemplate"
"github.com/ovh/utask/pkg/auth"
"github.com/ovh/utask/pkg/constants"
"github.com/ovh/utask/pkg/plugins/taskplugin"
"github.com/ovh/utask/pkg/taskutils"
"github.com/ovh/utask/pkg/templateimport"
Expand Down Expand Up @@ -39,12 +40,14 @@ type SubtaskConfig struct {

// SubtaskContext is the metadata inherited from the "parent" task"
type SubtaskContext struct {
ParentTaskID string `json:"parent_task_id"`
TaskID string `json:"task_id"`
RequesterUsername string `json:"requester_username"`
}

func ctx(stepName string) interface{} {
return &SubtaskContext{
ParentTaskID: "{{ .task.task_id }}",
TaskID: fmt.Sprintf("{{ if (index .step `%s` ) }}{{ if (index .step `%s` `output`) }}{{ index .step `%s` `output` `id` }}{{ end }}{{ end }}", stepName, stepName, stepName),
RequesterUsername: "{{.task.requester_username}}",
}
Expand All @@ -53,16 +56,24 @@ func ctx(stepName string) interface{} {
func validConfig(config interface{}) error {
cfg := config.(*SubtaskConfig)

if err := utils.ValidateTags(cfg.Tags); err != nil {
return err
}

dbp, err := zesty.NewDBProvider(utask.DBName)
if err != nil {
return fmt.Errorf("can't retrieve connexion to DB: %s", err)
}

_, err = tasktemplate.LoadFromName(dbp, cfg.Template)
if err != nil && !errors.IsNotFound(err) {
if err == nil {
return nil
}
if !errors.IsNotFound(err) {
return fmt.Errorf("can't load template from name: %s", err)
}

// searching into currently imported templates
templates := templateimport.GetTemplates()
for _, template := range templates {
if template == cfg.Template {
Expand Down Expand Up @@ -117,6 +128,10 @@ func exec(stepName string, config interface{}, ctx interface{}) (interface{}, in

// TODO inherit watchers from parent task
ctx := auth.WithIdentity(context.Background(), stepContext.RequesterUsername)
if cfg.Tags == nil {
cfg.Tags = map[string]string{}
}
cfg.Tags[constants.SubtaskTagParentTaskID] = stepContext.ParentTaskID
t, err = taskutils.CreateTask(ctx, dbp, tt, watcherUsernames, resolverUsernames, cfg.Input, nil, "Auto created subtask", cfg.Delay, cfg.Tags)
if err != nil {
dbp.Rollback()
Expand Down
9 changes: 8 additions & 1 deletion pkg/plugins/builtin/tag/tag.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package plugintag

import (
"github.com/ovh/utask/pkg/plugins/taskplugin"
"github.com/ovh/utask/pkg/utils"
)

// The tag plugin allow to update the tags of a task.
Expand All @@ -17,7 +18,13 @@ type Config struct {
Tags map[string]string `json:"tags"`
}

func validConfig(_ interface{}) error {
func validConfig(config interface{}) error {
cfg := config.(*Config)

if err := utils.ValidateTags(cfg.Tags); err != nil {
return err
}

return nil
}

Expand Down
13 changes: 13 additions & 0 deletions pkg/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"strings"

"github.com/ovh/utask"
"github.com/ovh/utask/pkg/constants"

"github.com/juju/errors"
)
Expand Down Expand Up @@ -39,6 +40,18 @@ func NormalizeName(s string) string {
return strings.ToLower(strings.TrimSpace(s))
}

func ValidateTags(tags map[string]string) error {
if tags == nil {
return nil
}
for k := range tags {
if k == constants.SubtaskTagParentTaskID {
return errors.BadRequestf("tag name %q not allowed", k)
}
}
return nil
}

// ListContainsString asserts that a string slice contains a given string
func ListContainsString(list []string, item string) bool {
if list != nil {
Expand Down

0 comments on commit 70cbec9

Please sign in to comment.