Skip to content

Commit

Permalink
Merge 58f37eb into c9be4f9
Browse files Browse the repository at this point in the history
  • Loading branch information
wI2L committed Apr 6, 2020
2 parents c9be4f9 + 58f37eb commit 4476af3
Show file tree
Hide file tree
Showing 20 changed files with 228 additions and 45 deletions.
2 changes: 1 addition & 1 deletion api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ func TestPagination(t *testing.T) {
cnt := 20
var midTask task.Task
for i := 0; i < cnt; i++ {
tsk, err := task.Create(dbp, tmpl, regularUser, nil, nil, map[string]interface{}{"id": strconv.Itoa(i)}, nil)
tsk, err := task.Create(dbp, tmpl, regularUser, nil, nil, map[string]interface{}{"id": strconv.Itoa(i)}, nil, nil)
if err != nil {
t.Fatal(err)
}
Expand Down
3 changes: 2 additions & 1 deletion api/handler/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type createBatchIn struct {
Inputs []map[string]interface{} `json:"inputs" binding:"required"`
Comment string `json:"comment"`
WatcherUsernames []string `json:"watcher_usernames"`
Tags map[string]string `json:"tags"`
}

// CreateBatch handles the creation of a collection of tasks based on the same template
Expand Down Expand Up @@ -50,7 +51,7 @@ func CreateBatch(c *gin.Context, in *createBatchIn) (*task.Batch, error) {
return nil, err
}

_, err = taskutils.CreateTask(c, dbp, tt, in.WatcherUsernames, []string{}, input, b, in.Comment, nil)
_, err = taskutils.CreateTask(c, dbp, tt, in.WatcherUsernames, []string{}, input, b, in.Comment, nil, in.Tags)
if err != nil {
dbp.Rollback()
return nil, err
Expand Down
24 changes: 20 additions & 4 deletions api/handler/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package handler

import (
"fmt"
"strings"
"time"

"github.com/gin-gonic/gin"
Expand All @@ -23,6 +24,7 @@ type createTaskIn struct {
Comment string `json:"comment"`
WatcherUsernames []string `json:"watcher_usernames"`
Delay *string `json:"delay"`
Tags map[string]string `json:"tags"`
}

// CreateTask handles the creation of a new task based on an existing template
Expand All @@ -49,7 +51,7 @@ func CreateTask(c *gin.Context, in *createTaskIn) (*task.Task, error) {
return nil, err
}

t, err := taskutils.CreateTask(c, dbp, tt, in.WatcherUsernames, []string{}, in.Input, nil, in.Comment, in.Delay)
t, err := taskutils.CreateTask(c, dbp, tt, in.WatcherUsernames, []string{}, in.Input, nil, in.Comment, in.Delay, in.Tags)
if err != nil {
dbp.Rollback()
return nil, err
Expand All @@ -70,13 +72,14 @@ const (
)

type listTasksIn struct {
Type string `query:"type, default=own"`
Type string `query:"type,default=own"`
State *string `query:"state"`
BatchPublicID *string `query:"batch"`
PageSize uint64 `query:"page_size"`
Last *string `query:"last"`
After *time.Time `query:"after"`
Before *time.Time `query:"before"`
Tags []string `query:"tag" explode:"true"`
}

// ListTasks returns a list of tasks, which can be filtered by state, batch ID,
Expand All @@ -89,13 +92,24 @@ func ListTasks(c *gin.Context, in *listTasksIn) (t []*task.Task, err error) {
if err != nil {
return nil, err
}

tags := make(map[string]string, len(in.Tags))
for _, t := range in.Tags {
parts := strings.Split(t, "=")
if len(parts) != 2 {
return nil, errors.BadRequestf("invalid tag %s", t)
}
if parts[0] == "" || parts[1] == "" {
return nil, errors.BadRequestf("invalid tag %s", t)
}
tags[parts[0]] = parts[1]
}
filter := task.ListFilter{
PageSize: normalizePageSize(in.PageSize),
Last: in.Last,
State: in.State,
After: in.After,
Before: in.Before,
Tags: tags,
}

var b *task.Batch
Expand Down Expand Up @@ -201,9 +215,10 @@ func GetTask(c *gin.Context, in *getTaskIn) (*task.Task, error) {
}

type updateTaskIn struct {
PublicID string `path:"id, required"`
PublicID string `path:"id,required"`
Input map[string]interface{} `json:"input"`
WatcherUsernames []string `json:"watcher_usernames"`
Tags map[string]string `json:"tags"`
}

// UpdateTask modifies a task, allowing it's requester or an administrator
Expand Down Expand Up @@ -249,6 +264,7 @@ func UpdateTask(c *gin.Context, in *updateTaskIn) (*task.Task, error) {

t.SetInput(clearInput)
t.SetWatcherUsernames(in.WatcherUsernames)
t.SetTags(in.Tags, nil)

if err := t.Update(dbp,
false, // do validate task contents
Expand Down
26 changes: 19 additions & 7 deletions engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,15 @@ func resolve(dbp zesty.DBProvider, res *resolution.Resolution, t *task.Task, deb
case s := <-stepChan:
s.LastRun = time.Now()

// Replace task's tags with the tags returned in the step.
for k, v := range s.Tags {
if v == "" {
delete(t.Tags, v)
} else {
t.Tags[k] = v
}
}

// "commit" step back into resolution
res.SetStep(s.Name, s)
// consolidate its result into live values
Expand Down Expand Up @@ -370,7 +379,6 @@ func resolve(dbp zesty.DBProvider, res *resolution.Resolution, t *task.Task, deb
if s.IsFinal() && !s.IsChild() {
t.StepsDone++
}

// one less step to go
expectedMessages--
// state change might unlock more steps for execution
Expand Down Expand Up @@ -501,11 +509,15 @@ func commit(dbp zesty.DBProvider, res *resolution.Resolution, t *task.Task) erro
if err != nil {
return err
}
if err := res.Update(dbp); err != nil {
return err
if res != nil {
if err := res.Update(dbp); err != nil {
return err
}
}
if err := t.Update(dbp, false, true); err != nil {
return err
if t != nil {
if err := t.Update(dbp, false, true); err != nil {
return err
}
}
return dbp.Commit()
}
Expand Down Expand Up @@ -546,7 +558,7 @@ func runAvailableSteps(dbp zesty.DBProvider, modifiedSteps map[string]bool, res
}
// rebuild step dependency tree to include generated loop steps
res.BuildStepTree()
commit(dbp, res, t)
commit(dbp, res, nil)
go func() { stepChan <- s }()
} else { // regular step
s.ResultValidate = jsonschema.Validator(s.Name, s.Schema)
Expand All @@ -557,7 +569,7 @@ func runAvailableSteps(dbp zesty.DBProvider, modifiedSteps map[string]bool, res
if s.State != step.StateAfterrunError {
s.State = step.StateRunning
step.PreRun(s, res.Values, resolutionStateSetter(res, preRunModifiedSteps))
commit(dbp, res, t)
commit(dbp, res, nil)
}

// run
Expand Down
2 changes: 1 addition & 1 deletion engine/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func createResolution(tmplName string, inputs, resolverInputs map[string]interfa
if err != nil {
return nil, err
}
tsk, err := task.Create(dbp, tmpl, "", nil, nil, inputs, nil)
tsk, err := task.Create(dbp, tmpl, "", nil, nil, inputs, nil, nil)
if err != nil {
return nil, err
}
Expand Down
5 changes: 3 additions & 2 deletions engine/step/step.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ type Step struct {
Item interface{} `json:"item,omitempty"` // "child" step: item value, issued from foreach

Resources []string `json:"resources"` // resource limits to enforce

Tags map[string]string `json:"tags"`
}

// Executor matches an executor type with its required configuration
Expand Down Expand Up @@ -252,7 +254,6 @@ func Run(st *Step, baseConfig map[string]json.RawMessage, values *values.Values,
}

go func() {

limits := uniqueSortedList(st.Resources)
for _, limit := range limits {
utask.AcquireResource(limit)
Expand All @@ -262,7 +263,7 @@ func Run(st *Step, baseConfig map[string]json.RawMessage, values *values.Values,
case <-stopRunningSteps:
st.State = StateToRetry
default:
st.Output, st.Metadata, err = runner.Exec(st.Name, baseCfgRaw, config, ctx)
st.Output, st.Metadata, st.Tags, err = runner.Exec(st.Name, baseCfgRaw, config, ctx)
if baseOutput != nil {
if st.Output != nil {
marshaled, err := utils.JSONMarshal(st.Output)
Expand Down
2 changes: 1 addition & 1 deletion engine/step/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
// Runner represents a component capable of executing a specific action,
// provided a configuration and a context
type Runner interface {
Exec(stepName string, baseConfig json.RawMessage, config json.RawMessage, ctx interface{}) (interface{}, interface{}, error)
Exec(stepName string, baseConfig json.RawMessage, config json.RawMessage, ctx interface{}) (interface{}, interface{}, map[string]string, error)
ValidConfig(baseConfig json.RawMessage, config json.RawMessage) error
Context(stepName string) interface{}
MetadataSchema() json.RawMessage
Expand Down
76 changes: 61 additions & 15 deletions models/task/task.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package task

import (
"encoding/json"
"fmt"
"strconv"
"strings"
Expand Down Expand Up @@ -62,27 +63,28 @@ type Task struct {

// DBModel is the "strict" representation of a task in DB, as expressed in SQL schema
type DBModel struct {
ID int64 `json:"-" db:"id"`
PublicID string `json:"id" db:"public_id"`
Title string `json:"title" db:"title"`
TemplateID int64 `json:"-" db:"id_template"`
BatchID *int64 `json:"-" db:"id_batch"`
RequesterUsername string `json:"requester_username" db:"requester_username"`
WatcherUsernames []string `json:"watcher_usernames,omitempty" db:"watcher_usernames"`
ResolverUsernames []string `json:"resolver_usernames,omitempty" db:"resolver_usernames"`
Created time.Time `json:"created" db:"created"`
State string `json:"state" db:"state"`
StepsDone int `json:"steps_done" db:"steps_done"`
StepsTotal int `json:"steps_total" db:"steps_total"`
LastActivity time.Time `json:"last_activity" db:"last_activity"`
ID int64 `json:"-" db:"id"`
PublicID string `json:"id" db:"public_id"`
Title string `json:"title" db:"title"`
TemplateID int64 `json:"-" db:"id_template"`
BatchID *int64 `json:"-" db:"id_batch"`
RequesterUsername string `json:"requester_username" db:"requester_username"`
WatcherUsernames []string `json:"watcher_usernames,omitempty" db:"watcher_usernames"`
ResolverUsernames []string `json:"resolver_usernames,omitempty" db:"resolver_usernames"`
Created time.Time `json:"created" db:"created"`
State string `json:"state" db:"state"`
StepsDone int `json:"steps_done" db:"steps_done"`
StepsTotal int `json:"steps_total" db:"steps_total"`
LastActivity time.Time `json:"last_activity" db:"last_activity"`
Tags map[string]string `json:"tags,omitempty" db:"tags"`

CryptKey []byte `json:"-" db:"crypt_key"` // key for encrypting steps (itself encrypted with master key)
EncryptedInput []byte `json:"-" db:"encrypted_input"`
EncryptedResult []byte `json:"-" db:"encrypted_result"` // encrypted Result
}

// Create inserts a new Task in DB
func Create(dbp zesty.DBProvider, tt *tasktemplate.TaskTemplate, reqUsername string, watcherUsernames []string, resolverUsernames []string, input map[string]interface{}, b *Batch) (t *Task, err error) {
func Create(dbp zesty.DBProvider, tt *tasktemplate.TaskTemplate, reqUsername string, watcherUsernames []string, resolverUsernames []string, input map[string]interface{}, tags map[string]string, b *Batch) (t *Task, err error) {
defer errors.DeferredAnnotatef(&err, "Failed to create new Task")

t = &Task{
Expand Down Expand Up @@ -141,6 +143,18 @@ func Create(dbp zesty.DBProvider, tt *tasktemplate.TaskTemplate, reqUsername str
}
t.Title = string(title)

// Merge input tags into template tags.
mergedTags := make(map[string]string)
for k, v := range tt.Tags {
mergedTags[k] = v
}
for k, v := range tags {
mergedTags[k] = v
}
if err := t.SetTags(mergedTags, v); err != nil {
return nil, err
}

err = dbp.DB().Insert(&t.DBModel)
if err != nil {
return nil, pgjuju.Interpret(err)
Expand Down Expand Up @@ -255,6 +269,7 @@ type ListFilter struct {
PageSize uint64
Before *time.Time
After *time.Time
Tags map[string]string
}

// ListTasks returns a list of tasks, optionally filtered on one or several criteria
Expand Down Expand Up @@ -306,6 +321,14 @@ func ListTasks(dbp zesty.DBProvider, filter ListFilter) (t []*Task, err error) {
sel = sel.Where(squirrel.Eq{`"task".id_batch`: filter.Batch.ID})
}

if filter.Tags != nil && len(filter.Tags) > 0 {
b, err := json.Marshal(filter.Tags)
if err != nil {
return nil, err
}
sel = sel.Where(`"task".tags @> ?::jsonb`, string(b))
}

query, params, err := sel.ToSql()
if err != nil {
return nil, err
Expand Down Expand Up @@ -500,6 +523,29 @@ func (t *Task) SetState(s string) {
t.notifyState(nil)
}

func (t *Task) SetTags(tags map[string]string, values *values.Values) error {
t.Tags = tags
if values == nil {
return nil
}
for k, v := range t.Tags {
tempk, err := values.Apply(k, nil, "")
if err != nil {
return fmt.Errorf("failed to template: %s", err.Error())
}
if string(tempk) != k {
t.Tags[string(tempk)] = v
delete(t.Tags, k)
}
tempv, err := values.Apply(v, nil, "")
if err != nil {
return fmt.Errorf("failed to template: %s", err.Error())
}
t.Tags[string(tempk)] = string(tempv)
}
return nil
}

// Valid asserts that the task holds valid data: the state is among accepted states,
// and input is present and valid given the template spec
func (t *Task) Valid(tt *tasktemplate.TaskTemplate) error {
Expand Down Expand Up @@ -534,7 +580,7 @@ func (t *Task) ExportTaskInfos(values *values.Values) {

var (
tSelector = sqlgenerator.PGsql.Select(
`"task".id, "task".public_id, "task".title, "task".id_template, "task".id_batch, "task".requester_username, "task".watcher_usernames, "task".created, "task".state, "task".steps_done, "task".steps_total, "task".crypt_key, "task".encrypted_input, "task".encrypted_result, "task".last_activity, "task".resolver_usernames, "task_template".name as template_name, "resolution".public_id as resolution_public_id, "resolution".last_start as last_start, "resolution".last_stop as last_stop, "resolution".resolver_username as resolver_username, "batch".public_id as batch_public_id`,
`"task".id, "task".public_id, "task".title, "task".id_template, "task".id_batch, "task".requester_username, "task".watcher_usernames, "task".created, "task".state, "task".tags, "task".steps_done, "task".steps_total, "task".crypt_key, "task".encrypted_input, "task".encrypted_result, "task".last_activity, "task".resolver_usernames, "task_template".name as template_name, "resolution".public_id as resolution_public_id, "resolution".last_start as last_start, "resolution".last_stop as last_stop, "resolution".resolver_username as resolver_username, "batch".public_id as batch_public_id`,
).From(
`"task"`,
).Join(
Expand Down
2 changes: 1 addition & 1 deletion models/tasktemplate/fromdir_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func TestLoadFromDir(t *testing.T) {
err = dbp.DB().Insert(&tt)
assert.Nil(t, err, "unable to insert new template")

_, err = task.Create(dbp, &tt, "admin", []string{}, []string{}, map[string]interface{}{}, nil)
_, err = task.Create(dbp, &tt, "admin", []string{}, []string{}, map[string]interface{}{}, nil, nil)
assert.Nil(t, err, "unable to create task")

err = tasktemplate.LoadFromDir(dbp, "templates_tests")
Expand Down

0 comments on commit 4476af3

Please sign in to comment.