Skip to content

Commit

Permalink
refactor: refactor for tasks backport (#21117)
Browse files Browse the repository at this point in the history
* chore: remove dead code

* refactor: move FluxLanguageService interface to fluxlang

* chore: run fmt

* refactor: move task.go from top level to task/taskmodel

* chore: run formatter

* chore: fix up import ordering with gci
  • Loading branch information
lesam committed Apr 7, 2021
1 parent 07c030a commit 7b2e122
Show file tree
Hide file tree
Showing 74 changed files with 1,267 additions and 1,358 deletions.
6 changes: 3 additions & 3 deletions authorizer/authorize_find.go
Expand Up @@ -3,9 +3,9 @@ package authorizer
import (
"context"

"github.com/influxdata/influxdb/v2/kit/platform/errors"

"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kit/platform/errors"
"github.com/influxdata/influxdb/v2/task/taskmodel"
)

// AuthorizeFindDBRPs takes the given items and returns only the ones that the user is authorized to access.
Expand Down Expand Up @@ -129,7 +129,7 @@ func AuthorizeFindSources(ctx context.Context, rs []*influxdb.Source) ([]*influx
}

// AuthorizeFindTasks takes the given items and returns only the ones that the user is authorized to read.
func AuthorizeFindTasks(ctx context.Context, rs []*influxdb.Task) ([]*influxdb.Task, int, error) {
func AuthorizeFindTasks(ctx context.Context, rs []*taskmodel.Task) ([]*taskmodel.Task, int, error) {
// This filters without allocating
// https://github.com/golang/go/wiki/SliceTricks#filtering-without-allocating
rrs := rs[:0]
Expand Down
6 changes: 3 additions & 3 deletions authorizer/check.go
Expand Up @@ -3,9 +3,9 @@ package authorizer
import (
"context"

"github.com/influxdata/influxdb/v2/kit/platform"

"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kit/platform"
"github.com/influxdata/influxdb/v2/task/taskmodel"
)

var _ influxdb.CheckService = (*CheckService)(nil)
Expand All @@ -16,7 +16,7 @@ type CheckService struct {
s influxdb.CheckService
influxdb.UserResourceMappingService
influxdb.OrganizationService
influxdb.TaskService
taskmodel.TaskService
}

// NewCheckService constructs an instance of an authorizing check service.
Expand Down
32 changes: 16 additions & 16 deletions authorizer/task.go
Expand Up @@ -4,11 +4,11 @@ import (
"context"
"fmt"

"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kit/platform"
"github.com/influxdata/influxdb/v2/kit/platform/errors"

"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kit/tracing"
"github.com/influxdata/influxdb/v2/task/taskmodel"
"go.uber.org/zap"
)

Expand All @@ -35,13 +35,13 @@ var (
)

type taskServiceValidator struct {
influxdb.TaskService
taskmodel.TaskService
log *zap.Logger
}

// TaskService wraps ts and checks appropriate permissions before calling requested methods on ts.
// Authorization failures are logged to the logger.
func NewTaskService(log *zap.Logger, ts influxdb.TaskService) influxdb.TaskService {
func NewTaskService(log *zap.Logger, ts taskmodel.TaskService) taskmodel.TaskService {
return &taskServiceValidator{
TaskService: ts,
log: log,
Expand All @@ -61,7 +61,7 @@ func (ts *taskServiceValidator) processPermissionError(a influxdb.Authorizer, p
return err
}

func (ts *taskServiceValidator) FindTaskByID(ctx context.Context, id platform.ID) (*influxdb.Task, error) {
func (ts *taskServiceValidator) FindTaskByID(ctx context.Context, id platform.ID) (*taskmodel.Task, error) {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()

Expand All @@ -79,7 +79,7 @@ func (ts *taskServiceValidator) FindTaskByID(ctx context.Context, id platform.ID
return task, nil
}

func (ts *taskServiceValidator) FindTasks(ctx context.Context, filter influxdb.TaskFilter) ([]*influxdb.Task, int, error) {
func (ts *taskServiceValidator) FindTasks(ctx context.Context, filter taskmodel.TaskFilter) ([]*taskmodel.Task, int, error) {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()
// Get the tasks in the organization, without authentication.
Expand All @@ -90,12 +90,12 @@ func (ts *taskServiceValidator) FindTasks(ctx context.Context, filter influxdb.T
return AuthorizeFindTasks(ctx, unauthenticatedTasks)
}

func (ts *taskServiceValidator) CreateTask(ctx context.Context, t influxdb.TaskCreate) (*influxdb.Task, error) {
func (ts *taskServiceValidator) CreateTask(ctx context.Context, t taskmodel.TaskCreate) (*taskmodel.Task, error) {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()

if !t.OwnerID.Valid() {
return nil, influxdb.ErrInvalidOwnerID
return nil, taskmodel.ErrInvalidOwnerID
}

a, p, err := AuthorizeCreate(ctx, influxdb.TasksResourceType, t.OrganizationID)
Expand All @@ -106,7 +106,7 @@ func (ts *taskServiceValidator) CreateTask(ctx context.Context, t influxdb.TaskC
return ts.TaskService.CreateTask(ctx, t)
}

func (ts *taskServiceValidator) UpdateTask(ctx context.Context, id platform.ID, upd influxdb.TaskUpdate) (*influxdb.Task, error) {
func (ts *taskServiceValidator) UpdateTask(ctx context.Context, id platform.ID, upd taskmodel.TaskUpdate) (*taskmodel.Task, error) {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()

Expand Down Expand Up @@ -142,7 +142,7 @@ func (ts *taskServiceValidator) DeleteTask(ctx context.Context, id platform.ID)
return ts.TaskService.DeleteTask(ctx, id)
}

func (ts *taskServiceValidator) FindLogs(ctx context.Context, filter influxdb.LogFilter) ([]*influxdb.Log, int, error) {
func (ts *taskServiceValidator) FindLogs(ctx context.Context, filter taskmodel.LogFilter) ([]*taskmodel.Log, int, error) {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()

Expand All @@ -155,7 +155,7 @@ func (ts *taskServiceValidator) FindLogs(ctx context.Context, filter influxdb.Lo
return ts.TaskService.FindLogs(ctx, filter)
}

func (ts *taskServiceValidator) FindRuns(ctx context.Context, filter influxdb.RunFilter) ([]*influxdb.Run, int, error) {
func (ts *taskServiceValidator) FindRuns(ctx context.Context, filter taskmodel.RunFilter) ([]*taskmodel.Run, int, error) {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()

Expand All @@ -174,7 +174,7 @@ func (ts *taskServiceValidator) FindRuns(ctx context.Context, filter influxdb.Ru
return ts.TaskService.FindRuns(ctx, filter)
}

func (ts *taskServiceValidator) FindRunByID(ctx context.Context, taskID, runID platform.ID) (*influxdb.Run, error) {
func (ts *taskServiceValidator) FindRunByID(ctx context.Context, taskID, runID platform.ID) (*taskmodel.Run, error) {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()

Expand Down Expand Up @@ -210,7 +210,7 @@ func (ts *taskServiceValidator) CancelRun(ctx context.Context, taskID, runID pla
return ts.TaskService.CancelRun(ctx, taskID, runID)
}

func (ts *taskServiceValidator) RetryRun(ctx context.Context, taskID, runID platform.ID) (*influxdb.Run, error) {
func (ts *taskServiceValidator) RetryRun(ctx context.Context, taskID, runID platform.ID) (*taskmodel.Run, error) {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()

Expand All @@ -220,7 +220,7 @@ func (ts *taskServiceValidator) RetryRun(ctx context.Context, taskID, runID plat
return nil, err
}

if task.Status != string(influxdb.TaskActive) {
if task.Status != string(taskmodel.TaskActive) {
return nil, ErrInactiveTask
}

Expand All @@ -232,7 +232,7 @@ func (ts *taskServiceValidator) RetryRun(ctx context.Context, taskID, runID plat
return ts.TaskService.RetryRun(ctx, taskID, runID)
}

func (ts *taskServiceValidator) ForceRun(ctx context.Context, taskID platform.ID, scheduledFor int64) (*influxdb.Run, error) {
func (ts *taskServiceValidator) ForceRun(ctx context.Context, taskID platform.ID, scheduledFor int64) (*taskmodel.Run, error) {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()

Expand All @@ -242,7 +242,7 @@ func (ts *taskServiceValidator) ForceRun(ctx context.Context, taskID platform.ID
return nil, err
}

if task.Status != string(influxdb.TaskActive) {
if task.Status != string(taskmodel.TaskActive) {
return nil, ErrInactiveTask
}

Expand Down

0 comments on commit 7b2e122

Please sign in to comment.