Skip to content

Commit

Permalink
refactor: top level task files to task package
Browse files Browse the repository at this point in the history
  • Loading branch information
lesam committed Apr 5, 2021
1 parent 725d472 commit d941092
Show file tree
Hide file tree
Showing 62 changed files with 1,134 additions and 1,096 deletions.
3 changes: 2 additions & 1 deletion authorizer/authorize_find.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package authorizer

import (
"context"
"github.com/influxdata/influxdb/v2/task"

"github.com/influxdata/influxdb/v2/kit/platform/errors"

Expand Down Expand Up @@ -129,7 +130,7 @@ func AuthorizeFindSources(ctx context.Context, rs []*influxdb.Source) ([]*influx
}

// AuthorizeFindTasks takes the given items and returns only the ones that the user is authorized to read.
func AuthorizeFindTasks(ctx context.Context, rs []*influxdb.Task) ([]*influxdb.Task, int, error) {
func AuthorizeFindTasks(ctx context.Context, rs []*task.Task) ([]*task.Task, int, error) {
// This filters without allocating
// https://github.com/golang/go/wiki/SliceTricks#filtering-without-allocating
rrs := rs[:0]
Expand Down
3 changes: 2 additions & 1 deletion authorizer/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package authorizer

import (
"context"
"github.com/influxdata/influxdb/v2/task"

"github.com/influxdata/influxdb/v2/kit/platform"

Expand All @@ -16,7 +17,7 @@ type CheckService struct {
s influxdb.CheckService
influxdb.UserResourceMappingService
influxdb.OrganizationService
influxdb.TaskService
task.TaskService
}

// NewCheckService constructs an instance of an authorizing check service.
Expand Down
29 changes: 15 additions & 14 deletions authorizer/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package authorizer
import (
"context"
"fmt"
"github.com/influxdata/influxdb/v2/task"

"github.com/influxdata/influxdb/v2/kit/platform"
"github.com/influxdata/influxdb/v2/kit/platform/errors"
Expand Down Expand Up @@ -35,13 +36,13 @@ var (
)

type taskServiceValidator struct {
influxdb.TaskService
task.TaskService
log *zap.Logger
}

// TaskService wraps ts and checks appropriate permissions before calling requested methods on ts.
// Authorization failures are logged to the logger.
func NewTaskService(log *zap.Logger, ts influxdb.TaskService) influxdb.TaskService {
func NewTaskService(log *zap.Logger, ts task.TaskService) task.TaskService {
return &taskServiceValidator{
TaskService: ts,
log: log,
Expand All @@ -61,7 +62,7 @@ func (ts *taskServiceValidator) processPermissionError(a influxdb.Authorizer, p
return err
}

func (ts *taskServiceValidator) FindTaskByID(ctx context.Context, id platform.ID) (*influxdb.Task, error) {
func (ts *taskServiceValidator) FindTaskByID(ctx context.Context, id platform.ID) (*task.Task, error) {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()

Expand All @@ -79,7 +80,7 @@ func (ts *taskServiceValidator) FindTaskByID(ctx context.Context, id platform.ID
return task, nil
}

func (ts *taskServiceValidator) FindTasks(ctx context.Context, filter influxdb.TaskFilter) ([]*influxdb.Task, int, error) {
func (ts *taskServiceValidator) FindTasks(ctx context.Context, filter task.TaskFilter) ([]*task.Task, int, error) {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()
// Get the tasks in the organization, without authentication.
Expand All @@ -90,12 +91,12 @@ func (ts *taskServiceValidator) FindTasks(ctx context.Context, filter influxdb.T
return AuthorizeFindTasks(ctx, unauthenticatedTasks)
}

func (ts *taskServiceValidator) CreateTask(ctx context.Context, t influxdb.TaskCreate) (*influxdb.Task, error) {
func (ts *taskServiceValidator) CreateTask(ctx context.Context, t task.TaskCreate) (*task.Task, error) {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()

if !t.OwnerID.Valid() {
return nil, influxdb.ErrInvalidOwnerID
return nil, task.ErrInvalidOwnerID
}

a, p, err := AuthorizeCreate(ctx, influxdb.TasksResourceType, t.OrganizationID)
Expand All @@ -106,7 +107,7 @@ func (ts *taskServiceValidator) CreateTask(ctx context.Context, t influxdb.TaskC
return ts.TaskService.CreateTask(ctx, t)
}

func (ts *taskServiceValidator) UpdateTask(ctx context.Context, id platform.ID, upd influxdb.TaskUpdate) (*influxdb.Task, error) {
func (ts *taskServiceValidator) UpdateTask(ctx context.Context, id platform.ID, upd task.TaskUpdate) (*task.Task, error) {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()

Expand Down Expand Up @@ -142,7 +143,7 @@ func (ts *taskServiceValidator) DeleteTask(ctx context.Context, id platform.ID)
return ts.TaskService.DeleteTask(ctx, id)
}

func (ts *taskServiceValidator) FindLogs(ctx context.Context, filter influxdb.LogFilter) ([]*influxdb.Log, int, error) {
func (ts *taskServiceValidator) FindLogs(ctx context.Context, filter task.LogFilter) ([]*task.Log, int, error) {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()

Expand All @@ -155,7 +156,7 @@ func (ts *taskServiceValidator) FindLogs(ctx context.Context, filter influxdb.Lo
return ts.TaskService.FindLogs(ctx, filter)
}

func (ts *taskServiceValidator) FindRuns(ctx context.Context, filter influxdb.RunFilter) ([]*influxdb.Run, int, error) {
func (ts *taskServiceValidator) FindRuns(ctx context.Context, filter task.RunFilter) ([]*task.Run, int, error) {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()

Expand All @@ -174,7 +175,7 @@ func (ts *taskServiceValidator) FindRuns(ctx context.Context, filter influxdb.Ru
return ts.TaskService.FindRuns(ctx, filter)
}

func (ts *taskServiceValidator) FindRunByID(ctx context.Context, taskID, runID platform.ID) (*influxdb.Run, error) {
func (ts *taskServiceValidator) FindRunByID(ctx context.Context, taskID, runID platform.ID) (*task.Run, error) {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()

Expand Down Expand Up @@ -210,7 +211,7 @@ func (ts *taskServiceValidator) CancelRun(ctx context.Context, taskID, runID pla
return ts.TaskService.CancelRun(ctx, taskID, runID)
}

func (ts *taskServiceValidator) RetryRun(ctx context.Context, taskID, runID platform.ID) (*influxdb.Run, error) {
func (ts *taskServiceValidator) RetryRun(ctx context.Context, taskID, runID platform.ID) (*task.Run, error) {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()

Expand All @@ -220,7 +221,7 @@ func (ts *taskServiceValidator) RetryRun(ctx context.Context, taskID, runID plat
return nil, err
}

if task.Status != string(influxdb.TaskActive) {
if task.Status != string(task.TaskActive) {
return nil, ErrInactiveTask
}

Expand All @@ -232,7 +233,7 @@ func (ts *taskServiceValidator) RetryRun(ctx context.Context, taskID, runID plat
return ts.TaskService.RetryRun(ctx, taskID, runID)
}

func (ts *taskServiceValidator) ForceRun(ctx context.Context, taskID platform.ID, scheduledFor int64) (*influxdb.Run, error) {
func (ts *taskServiceValidator) ForceRun(ctx context.Context, taskID platform.ID, scheduledFor int64) (*task.Run, error) {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()

Expand All @@ -242,7 +243,7 @@ func (ts *taskServiceValidator) ForceRun(ctx context.Context, taskID platform.ID
return nil, err
}

if task.Status != string(influxdb.TaskActive) {
if task.Status != string(task.TaskActive) {
return nil, ErrInactiveTask
}

Expand Down
Loading

0 comments on commit d941092

Please sign in to comment.