Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(task): Add task types #14567

Merged
merged 1 commit into from Aug 6, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -2,6 +2,7 @@

### Features
1. [14495](https://github.com/influxdata/influxdb/pull/14495): optional gzip compression of the query CSV response.
1. [14567](https://github.com/influxdata/influxdb/pull/14567): Add task types.

### UI Improvements

Expand Down
4 changes: 4 additions & 0 deletions authorizer/task.go
Expand Up @@ -122,6 +122,10 @@ func (ts *taskServiceValidator) CreateTask(ctx context.Context, t platform.TaskC
return nil, influxdb.ErrMissingToken
}

if t.Type == influxdb.TaskTypeWildcard {
return nil, influxdb.ErrInvalidTaskType
}

p, err := platform.NewPermission(platform.WriteAction, platform.TasksResourceType, t.OrganizationID)
if err != nil {
return nil, err
Expand Down
21 changes: 20 additions & 1 deletion authorizer/task_test.go
Expand Up @@ -216,6 +216,25 @@ from(bucket:"holder") |> range(start:-5m) |> to(bucket:"holder", org:"thing")`,
auth: &influxdb.Authorization{},
},
{
name: "create bad type",
check: func(ctx context.Context, svc influxdb.TaskService) error {
_, err := svc.CreateTask(ctx, influxdb.TaskCreate{
OrganizationID: r.Org.ID,
Token: r.Auth.Token,
Type: influxdb.TaskTypeWildcard,
Flux: `option task = {
name: "my_task",
every: 1s,
}
from(bucket:"holder") |> range(start:-5m) |> to(bucket:"holder", org:"thing")`,
})
if err != influxdb.ErrInvalidTaskType {
return errors.New("failed to error with invalid task type")
}
return nil
},
auth: &influxdb.Authorization{},
}, {
name: "create success",
auth: r.Auth,
check: func(ctx context.Context, svc influxdb.TaskService) error {
Expand All @@ -232,7 +251,7 @@ from(bucket:"holder") |> range(start:-5m) |> to(bucket:"holder", org:"thing")`,
},
},
{
name: "create badbucket",
name: "create bad bucket",
auth: r.Auth,
check: func(ctx context.Context, svc influxdb.TaskService) error {
var (
Expand Down
6 changes: 6 additions & 0 deletions http/swagger.yml
Expand Up @@ -6556,6 +6556,9 @@ components:
id:
readOnly: true
type: string
type:
description: The type of task, this can be used for filtering tasks on list actions.
type: string
lyondhill marked this conversation as resolved.
Show resolved Hide resolved
orgID:
description: The ID of the organization that owns this Task.
type: string
Expand Down Expand Up @@ -8816,6 +8819,9 @@ components:
TaskCreateRequest:
type: object
properties:
type:
description: The type of task, this can be used for filtering tasks on list actions.
type: string
orgID:
description: The ID of the organization that owns this Task.
type: string
Expand Down
8 changes: 8 additions & 0 deletions http/task_service.go
Expand Up @@ -362,6 +362,10 @@ func decodeGetTasksRequest(ctx context.Context, r *http.Request, orgs platform.O
req.filter.Limit = platform.TaskDefaultPageSize
}

if ttype := qp.Get("type"); ttype != "" {
req.filter.Type = &ttype
}

return req, nil
}

Expand Down Expand Up @@ -1313,6 +1317,10 @@ func (t TaskService) FindTasks(ctx context.Context, filter platform.TaskFilter)
val.Add("limit", strconv.Itoa(filter.Limit))
}

if filter.Type != nil {
val.Add("type", *filter.Type)
}

u.RawQuery = val.Encode()

req, err := http.NewRequest("GET", u.String(), nil)
Expand Down
7 changes: 6 additions & 1 deletion http/task_test.go
Expand Up @@ -67,7 +67,12 @@ func TestTaskService(t *testing.T) {
Token: auth.Token,
}

cFunc := func() (servicetest.TestCreds, error) {
cFunc := func(t *testing.T) (servicetest.TestCreds, error) {
org := &platform.Organization{Name: t.Name() + "_org"}
if err := service.CreateOrganization(ctx, org); err != nil {
t.Fatal(err)
}

return servicetest.TestCreds{
OrgID: org.ID,
Org: org.Name,
Expand Down
29 changes: 26 additions & 3 deletions kv/task.go
Expand Up @@ -227,6 +227,14 @@ func (s *Service) findTasksByUser(ctx context.Context, tx Tx, filter influxdb.Ta
continue
}

if filter.Type == nil {
ft := ""
filter.Type = &ft
}
if *filter.Type != influxdb.TaskTypeWildcard && *filter.Type != task.Type {
continue
}

ts = append(ts, task)

if len(ts) >= filter.Limit {
Expand Down Expand Up @@ -296,10 +304,16 @@ func (s *Service) findTaskByOrg(ctx context.Context, tx Tx, filter influxdb.Task
return nil, 0, err
}

// insert the new task into the list
if t != nil {
ts = append(ts, t)

typ := ""
if filter.Type != nil {
typ = *filter.Type
}

// if the filter type matches task type or filter type is a wildcard
if typ == t.Type || typ == influxdb.TaskTypeWildcard {
ts = append(ts, t)
}
}
}
}
Expand Down Expand Up @@ -334,6 +348,14 @@ func (s *Service) findTaskByOrg(ctx context.Context, tx Tx, filter influxdb.Task
break
}

if filter.Type == nil {
ft := ""
filter.Type = &ft
}
if *filter.Type != influxdb.TaskTypeWildcard && *filter.Type != t.Type {
continue
}

// insert the new task into the list
ts = append(ts, t)

Expand Down Expand Up @@ -491,6 +513,7 @@ func (s *Service) createTask(ctx context.Context, tx Tx, tc influxdb.TaskCreate)
createdAt := time.Now().UTC().Format(time.RFC3339)
task := &influxdb.Task{
ID: s.IDGenerator.ID(),
Type: tc.Type,
OrganizationID: org.ID,
Organization: org.Name,
AuthorizationID: auth.Identifier(),
Expand Down
5 changes: 5 additions & 0 deletions task.go
Expand Up @@ -20,11 +20,14 @@ const (

TaskStatusActive = "active"
TaskStatusInactive = "inactive"

TaskTypeWildcard = "*"
)

// Task is a task. 🎊
type Task struct {
ID ID `json:"id"`
Type string `json:"type,omitempty"`
OrganizationID ID `json:"orgID"`
Organization string `json:"org"`
AuthorizationID ID `json:"authorizationID"`
Expand Down Expand Up @@ -134,6 +137,7 @@ type TaskService interface {

// TaskCreate is the set of values to create a task.
type TaskCreate struct {
Type string `json:"type,omitempty"`
Flux string `json:"flux"`
Description string `json:"description,omitempty"`
Status string `json:"status,omitempty"`
Expand Down Expand Up @@ -374,6 +378,7 @@ func (t *TaskUpdate) UpdateFlux(oldFlux string) error {

// TaskFilter represents a set of filters that restrict the returned results
type TaskFilter struct {
Type *string
After *ID
OrganizationID *ID
Organization string
Expand Down
93 changes: 91 additions & 2 deletions task/servicetest/servicetest.go
Expand Up @@ -83,6 +83,12 @@ func TestTaskService(t *testing.T, fn BackendComponentFactory, testCategory ...s
t.Parallel()
testManualRun(t, sys)
})

t.Run("Task Type", func(t *testing.T) {
t.Parallel()
testTaskType(t, sys)
})

})
case "analytical":
t.Run("AnalyticalTaskService", func(t *testing.T) {
Expand Down Expand Up @@ -149,7 +155,7 @@ type System struct {
// However, if the system needs to verify credentials,
// the caller should set this value and return valid IDs and a valid token.
// It is safe if this returns the same values every time it is called.
CredsFunc func() (TestCreds, error)
CredsFunc func(*testing.T) (TestCreds, error)
}

func testTaskCRUD(t *testing.T, sys *System) {
Expand Down Expand Up @@ -1405,7 +1411,7 @@ func creds(t *testing.T, s *System) TestCreds {
}
}

c, err := s.CredsFunc()
c, err := s.CredsFunc(t)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -1437,3 +1443,86 @@ option task = {
from(bucket: "b")
|> http.to(url: "http://example.com")`
)

func testTaskType(t *testing.T, sys *System) {
cr := creds(t, sys)
authorizedCtx := icontext.SetAuthorizer(sys.Ctx, cr.Authorizer())

// Create a tasks
ts := influxdb.TaskCreate{
OrganizationID: cr.OrgID,
Flux: fmt.Sprintf(scriptFmt, 0),
Token: cr.Token,
}

tsk, err := sys.TaskService.CreateTask(authorizedCtx, ts)
if err != nil {
t.Fatal(err)
}
if !tsk.ID.Valid() {
t.Fatal("no task ID set")
}

tc := influxdb.TaskCreate{
Type: "cows",
OrganizationID: cr.OrgID,
Flux: fmt.Sprintf(scriptFmt, 0),
Token: cr.Token,
}

tskCow, err := sys.TaskService.CreateTask(authorizedCtx, tc)
if err != nil {
t.Fatal(err)
}
if !tskCow.ID.Valid() {
t.Fatal("no task ID set")
}

tp := influxdb.TaskCreate{
Type: "pigs",
OrganizationID: cr.OrgID,
Flux: fmt.Sprintf(scriptFmt, 0),
Token: cr.Token,
}

tskPig, err := sys.TaskService.CreateTask(authorizedCtx, tp)
if err != nil {
t.Fatal(err)
}
if !tskPig.ID.Valid() {
t.Fatal("no task ID set")
}

// get default tasks
tasks, _, err := sys.TaskService.FindTasks(sys.Ctx, influxdb.TaskFilter{OrganizationID: &cr.OrgID})
if err != nil {
t.Fatal(err)
}

for _, task := range tasks {
if task.Type != "" {
t.Fatal("recieved a task with a type when sending no type restriction")
}
}

// get filtered tasks
tasks, _, err = sys.TaskService.FindTasks(sys.Ctx, influxdb.TaskFilter{OrganizationID: &cr.OrgID, Type: &tc.Type})
if err != nil {
t.Fatal(err)
}

if len(tasks) != 1 {
t.Fatalf("failed to return tasks by type, expected 1, got %d", len(tasks))
}

// get all tasks
wc := influxdb.TaskTypeWildcard
tasks, _, err = sys.TaskService.FindTasks(sys.Ctx, influxdb.TaskFilter{OrganizationID: &cr.OrgID, Type: &wc})
if err != nil {
t.Fatal(err)
}

if len(tasks) != 3 {
t.Fatalf("failed to return tasks with wildcard, expected 3, got %d", len(tasks))
}
}
5 changes: 5 additions & 0 deletions task_errors.go
Expand Up @@ -36,6 +36,11 @@ var (
Msg: "invalid id",
}

// ErrInvalidTaskType error object for bad id's
ErrInvalidTaskType = &Error{
Code: EInvalid,
Msg: "invalid task type",
}
// ErrTaskNotFound indicates no task could be found for given parameters.
ErrTaskNotFound = &Error{
Code: ENotFound,
Expand Down