Skip to content

Commit

Permalink
feat(tasks): added functionality to filter runns by time
Browse files Browse the repository at this point in the history
  • Loading branch information
Nav-aggarwal09 committed Nov 10, 2020
1 parent 6ef0e03 commit 64a562d
Show file tree
Hide file tree
Showing 6 changed files with 161 additions and 9 deletions.
7 changes: 7 additions & 0 deletions flags.yml
Expand Up @@ -119,3 +119,10 @@
key: enforceOrgDashboardLimits
default: false
contact: Compute Team

- name: Time Filter Flags
description: Filter task run list based on before and after flags
key: timeFilterFlags
contact: Compute Team
default: false
expose: true
16 changes: 16 additions & 0 deletions kit/feature/list.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 22 additions & 2 deletions kv/task.go
Expand Up @@ -951,6 +951,21 @@ func (s *Service) findRuns(ctx context.Context, tx Tx, filter influxdb.RunFilter
if filter.Limit < 0 || filter.Limit > influxdb.TaskMaxPageSize {
return nil, 0, influxdb.ErrOutOfBoundsLimit
}
parsedFilterAfterTime := time.Time{}
parsedFilterBeforeTime := time.Now().UTC()
var err error
if filter.AfterTime != "" {
parsedFilterAfterTime, err = time.Parse(time.RFC3339, filter.AfterTime)
if err != nil {
return nil, 0, err
}
}
if filter.BeforeTime != "" {
parsedFilterBeforeTime, err = time.Parse(time.RFC3339, filter.BeforeTime)
if err != nil {
return nil, 0, err
}
}

var runs []*influxdb.Run
// manual runs
Expand All @@ -959,7 +974,9 @@ func (s *Service) findRuns(ctx context.Context, tx Tx, filter influxdb.RunFilter
return nil, 0, err
}
for _, run := range manualRuns {
runs = append(runs, run)
if run.ScheduledFor.After(parsedFilterAfterTime) && run.ScheduledFor.Before(parsedFilterBeforeTime) {
runs = append(runs, run)
}
if len(runs) >= filter.Limit {
return runs, len(runs), nil
}
Expand All @@ -971,13 +988,16 @@ func (s *Service) findRuns(ctx context.Context, tx Tx, filter influxdb.RunFilter
return nil, 0, err
}
for _, run := range currentlyRunning {
runs = append(runs, run)
if run.ScheduledFor.After(parsedFilterAfterTime) && run.ScheduledFor.Before(parsedFilterBeforeTime) {
runs = append(runs, run)
}
if len(runs) >= filter.Limit {
return runs, len(runs), nil
}
}

return runs, len(runs), nil

}

// FindRunByID returns a single run.
Expand Down
31 changes: 30 additions & 1 deletion task/backend/analytical_storage.go
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/influxdata/flux"
"github.com/influxdata/flux/lang"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kit/feature"
"github.com/influxdata/influxdb/v2/query"
"github.com/influxdata/influxdb/v2/storage"
"go.uber.org/zap"
Expand Down Expand Up @@ -152,18 +153,45 @@ func (as *AnalyticalStorage) FindRuns(ctx context.Context, filter influxdb.RunFi
filterPart = fmt.Sprintf(`|> filter(fn: (r) => r.runID > %q)`, filter.After.String())
}

parsedAfterTime := time.Time{}
parsedBeforeTime := time.Now()
constructedTimeFilter := ""
if feature.TimeFilterFlags().Enabled(ctx) {
if filter.AfterTime != "" {
tmpParsedAfter, err := time.Parse(time.RFC3339, filter.AfterTime)
if err != nil {
return nil, 0, err
}

parsedAfterTime = tmpParsedAfter

}
if filter.BeforeTime != "" {
tmpParsedBefore, err := time.Parse(time.RFC3339, filter.BeforeTime)
if err != nil {
return nil, 0, err
}
parsedBeforeTime = tmpParsedBefore
}
constructedTimeFilter = fmt.Sprintf(
`|> filter(fn: (r) =>time(v: r["scheduledFor"]) > %s and time(v: r["scheduledFor"]) < %s)`,
parsedAfterTime.Format(time.RFC3339),
parsedBeforeTime.Format(time.RFC3339))
}

// the data will be stored for 7 days in the system bucket so pulling 14d's is sufficient.
runsScript := fmt.Sprintf(`from(bucketID: %q)
|> range(start: -14d)
|> filter(fn: (r) => r._field != "status")
|> filter(fn: (r) => r._measurement == "runs" and r.taskID == %q)
%s
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
%s
|> group(columns: ["taskID"])
|> sort(columns:["scheduledFor"], desc: true)
|> limit(n:%d)
`, sb.ID.String(), filter.Task.String(), filterPart, filter.Limit-len(runs))
`, sb.ID.String(), filter.Task.String(), filterPart, constructedTimeFilter, filter.Limit-len(runs))

// At this point we are behind authorization
// so we are faking a read only permission to the org's system bucket
Expand Down Expand Up @@ -205,6 +233,7 @@ func (as *AnalyticalStorage) FindRuns(ctx context.Context, filter influxdb.RunFi
runs = as.combineRuns(runs, re.runs)

return runs, len(runs), err

}

// remove any kv runs that exist in the list of completed runs
Expand Down
3 changes: 2 additions & 1 deletion task/backend/analytical_storage_test.go
Expand Up @@ -12,12 +12,12 @@ import (
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/authorization"
icontext "github.com/influxdata/influxdb/v2/context"
_ "github.com/influxdata/influxdb/v2/fluxinit/static"
"github.com/influxdata/influxdb/v2/inmem"
"github.com/influxdata/influxdb/v2/kv"
"github.com/influxdata/influxdb/v2/kv/migration/all"
"github.com/influxdata/influxdb/v2/mock"
"github.com/influxdata/influxdb/v2/query"
_ "github.com/influxdata/influxdb/v2/fluxinit/static"
"github.com/influxdata/influxdb/v2/query/control"
"github.com/influxdata/influxdb/v2/query/fluxlang"
stdlib "github.com/influxdata/influxdb/v2/query/stdlib/influxdata/influxdb"
Expand Down Expand Up @@ -78,6 +78,7 @@ func TestAnalyticalStore(t *testing.T) {
UserResourceMappingService: ts.UserResourceMappingService,
AuthorizationService: authSvc,
Ctx: authCtx,
CallFinishRun: true,
}, func() {
cancelFunc()
ab.Close(t)
Expand Down
89 changes: 84 additions & 5 deletions task/servicetest/servicetest.go
Expand Up @@ -19,8 +19,12 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/influxdata/influxdb/v2"
icontext "github.com/influxdata/influxdb/v2/context"
"github.com/influxdata/influxdb/v2/kit/feature"
influxdbmock "github.com/influxdata/influxdb/v2/mock"
"github.com/influxdata/influxdb/v2/task/backend"
"github.com/influxdata/influxdb/v2/task/options"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// BackendComponentFactory is supplied by consumers of the adaptertest package,
Expand Down Expand Up @@ -175,6 +179,9 @@ type System struct {
// the caller should set this value and return valid IDs and a valid token.
// It is safe if this returns the same values every time it is called.
CredsFunc func(*testing.T) (TestCreds, error)

// Toggles behavior between KV and archive storage because FinishRun() deletes runs after completion
CallFinishRun bool
}

func testTaskCRUD(t *testing.T, sys *System) {
Expand Down Expand Up @@ -902,7 +909,7 @@ func testTaskRuns(t *testing.T, sys *System) {
t.Fatalf("failed to error with out of bounds run limit: %d", influxdb.TaskMaxPageSize+1)
}

requestedAt := time.Now().Add(5 * time.Minute).UTC() // This should guarantee we can make two runs.
requestedAt := time.Now().Add(time.Hour * -1).UTC() // This should guarantee we can make two runs.

rc0, err := sys.TaskControlService.CreateRun(sys.Ctx, task.ID, requestedAt, requestedAt.Add(time.Second))
if err != nil {
Expand Down Expand Up @@ -994,6 +1001,78 @@ func testTaskRuns(t *testing.T, sys *System) {
}
})

t.Run("FindRunsByTime", func(t *testing.T) {

t.Parallel()
ctx := icontext.SetAuthorizer(sys.Ctx, cr.Authorizer())
ctx, err := feature.Annotate(ctx, influxdbmock.NewFlagger(map[feature.Flag]interface{}{
feature.TimeFilterFlags(): true,
}))
require.NoError(t, err)

// Script is set to run every minute. The platform adapter is currently hardcoded to schedule after "now",
// which makes timing of runs somewhat difficult.
ct := influxdb.TaskCreate{
OrganizationID: cr.OrgID,
Flux: fmt.Sprintf(scriptFmt, 0),
OwnerID: cr.UserID,
}
task, err := sys.TaskService.CreateTask(ctx, ct)
if err != nil {
t.Fatal(err)
}

// set to one hour before now because of bucket retention policy
scheduledFor := time.Now().Add(time.Hour * -1).UTC()
runs := make([]*influxdb.Run, 0, 5)
// create runs to put into Context
for i := 5; i > 0; i-- {
run, err := sys.TaskControlService.CreateRun(ctx, task.ID, scheduledFor.Add(time.Second*time.Duration(i)), scheduledFor.Add(time.Second*time.Duration(i)))
if err != nil {
t.Fatal(err)
}
err = sys.TaskControlService.UpdateRunState(ctx, task.ID, run.ID, scheduledFor.Add(time.Second*time.Duration(i+1)), influxdb.RunStarted)
if err != nil {
t.Fatal(err)
}
err = sys.TaskControlService.UpdateRunState(ctx, task.ID, run.ID, scheduledFor.Add(time.Second*time.Duration(i+2)), influxdb.RunSuccess)
if err != nil {
t.Fatal(err)
}
// setting run in memory to match the fields in Context
run.StartedAt = scheduledFor.Add(time.Second * time.Duration(i+1))
run.FinishedAt = scheduledFor.Add(time.Second * time.Duration(i+2))
run.RunAt = scheduledFor.Add(time.Second * time.Duration(i))
run.Status = influxdb.RunSuccess.String()
run.Log = nil

if sys.CallFinishRun {
run, err = sys.TaskControlService.FinishRun(ctx, task.ID, run.ID)
if err != nil {
t.Fatal(err)
}
// Analytical storage does not store run at
run.RunAt = time.Time{}
}

runs = append(runs, run)
}

found, _, err := sys.TaskService.FindRuns(ctx,
influxdb.RunFilter{
Task: task.ID,
Limit: 2,
AfterTime: scheduledFor.Add(time.Second * time.Duration(1)).Format(time.RFC3339),
BeforeTime: scheduledFor.Add(time.Second * time.Duration(4)).Format(time.RFC3339),
})
if err != nil {
t.Fatal(err)
}

assert.Equal(t, runs[2:4], found)

})

t.Run("ForceRun", func(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -1036,7 +1115,7 @@ func testTaskRuns(t *testing.T, sys *System) {
t.Fatal(err)
}

requestedAt := time.Now().Add(5 * time.Minute).UTC() // This should guarantee we can make a run.
requestedAt := time.Now().Add(time.Hour * -1).UTC() // This should guarantee we can make a run.

// Create two runs.
rc1, err := sys.TaskControlService.CreateRun(sys.Ctx, task.ID, requestedAt, requestedAt.Add(time.Second))
Expand Down Expand Up @@ -1345,7 +1424,7 @@ func testRunStorage(t *testing.T, sys *System) {
t.Fatalf("failed to error with out of bounds run limit: %d", influxdb.TaskMaxPageSize+1)
}

requestedAt := time.Now().Add(5 * time.Minute).UTC() // This should guarantee we can make two runs.
requestedAt := time.Now().Add(time.Hour * -1).UTC() // This should guarantee we can make two runs.

rc0, err := sys.TaskControlService.CreateRun(sys.Ctx, task.ID, requestedAt, requestedAt.Add(time.Second))
if err != nil {
Expand Down Expand Up @@ -1519,7 +1598,7 @@ func testRetryAcrossStorage(t *testing.T, sys *System) {
t.Errorf("expected retrying run that doesn't exist to return %v, got %v", influxdb.ErrRunNotFound, err)
}

requestedAt := time.Now().Add(5 * time.Minute).UTC() // This should guarantee we can make a run.
requestedAt := time.Now().Add(time.Hour * -1).UTC() // This should guarantee we can make a run.

rc, err := sys.TaskControlService.CreateRun(sys.Ctx, task.ID, requestedAt, requestedAt.Add(time.Second))
if err != nil {
Expand Down Expand Up @@ -1581,7 +1660,7 @@ func testLogsAcrossStorage(t *testing.T, sys *System) {
t.Fatal(err)
}

requestedAt := time.Now().Add(5 * time.Minute).UTC() // This should guarantee we can make two runs.
requestedAt := time.Now().Add(time.Hour * -1).UTC() // This should guarantee we can make two runs.

rc0, err := sys.TaskControlService.CreateRun(sys.Ctx, task.ID, requestedAt, requestedAt.Add(time.Second))
if err != nil {
Expand Down

0 comments on commit 64a562d

Please sign in to comment.