Skip to content

Commit

Permalink
feat(tasks): expose lastRunStatus and lastRunError in task API
Browse files Browse the repository at this point in the history
  • Loading branch information
gavincabbage committed Oct 30, 2019
1 parent 2498011 commit aeb61fb
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
### Features

1. [15313](https://github.com/influxdata/influxdb/pull/15313): Add shortcut for toggling comments in script editor
1. [15650](https://github.com/influxdata/influxdb/pull/15650): Expose last run status and last run error in task API

### UI Improvements
1. [15503](https://github.com/influxdata/influxdb/pull/15503): Redesign page headers to be more space efficient
Expand Down
10 changes: 10 additions & 0 deletions http/swagger.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7155,6 +7155,16 @@ components:
type: string
format: date-time
readOnly: true
lastRunStatus:
readOnly: true
type: string
enum:
- failed
- success
- canceled
lastRunError:
readOnly: true
type: string
createdAt:
type: string
format: date-time
Expand Down
23 changes: 19 additions & 4 deletions kv/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,6 @@ func (s *Service) findTaskByID(ctx context.Context, tx Tx, id influxdb.ID) (*inf
}
if latestCompletedRun != nil {
latestCompleted := latestCompletedRun.ScheduledFor
if err != nil {
return nil, err
}
if t.LatestCompleted != "" {
tlc, err := time.Parse(time.RFC3339, t.LatestCompleted)
if err == nil && latestCompleted.After(tlc) {
Expand Down Expand Up @@ -745,6 +742,15 @@ func (s *Service) updateTask(ctx context.Context, tx Tx, id influxdb.ID, upd inf
}
}

if upd.LastRunStatus != nil {
task.LastRunStatus = *upd.LastRunStatus
if *upd.LastRunStatus == "failed" && upd.LastRunError != nil {
task.LastRunError = *upd.LastRunError
} else {
task.LastRunError = ""
}
}

// save the updated task
bucket, err := tx.Bucket(taskBucket)
if err != nil {
Expand Down Expand Up @@ -1571,7 +1577,16 @@ func (s *Service) finishRun(ctx context.Context, tx Tx, taskID, runID influxdb.I

// tell task to update latest completed
scheduledStr := r.ScheduledFor.Format(time.RFC3339)
_, err = s.updateTask(ctx, tx, taskID, influxdb.TaskUpdate{LatestCompleted: &scheduledStr})
_, err = s.updateTask(ctx, tx, taskID, influxdb.TaskUpdate{
LatestCompleted: &scheduledStr,
LastRunStatus: &r.Status,
LastRunError: func() *string {
if r.Status == "failed" && len(r.Log) > 0 {
return &r.Log[len(r.Log)-1].Message
}
return nil
}(),
})
if err != nil {
return nil, err
}
Expand Down
4 changes: 4 additions & 0 deletions task.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ type Task struct {
Cron string `json:"cron,omitempty"`
Offset string `json:"offset,omitempty"`
LatestCompleted string `json:"latestCompleted,omitempty"`
LastRunStatus string `json:"lastRunStatus,omitempty"`
LastRunError string `json:"lastRunError,omitempty"`
CreatedAt string `json:"createdAt,omitempty"`
UpdatedAt string `json:"updatedAt,omitempty"`
Metadata map[string]interface{} `json:"metadata,omitempty"`
Expand Down Expand Up @@ -230,6 +232,8 @@ type TaskUpdate struct {

// LatestCompleted us to set latest completed on startup to skip task catchup
LatestCompleted *string `json:"-"`
LastRunStatus *string `json:"-"`
LastRunError *string `json:"-"`
Metadata map[string]interface{} `json:"-"` // not to be set through a web request but rather used by a http service using tasks backend.

// Options gets unmarshalled from json as if it was flat, with the same level as Flux and Status.
Expand Down
50 changes: 50 additions & 0 deletions task/servicetest/servicetest.go
Original file line number Diff line number Diff line change
Expand Up @@ -564,6 +564,10 @@ func testUpdate(t *testing.T, sys *System) {
t.Fatal(err)
}

if err := sys.TaskControlService.UpdateRunState(sys.Ctx, task.ID, rc.Created.RunID, time.Now(), backend.RunSuccess); err != nil {
t.Fatal(err)
}

if _, err := sys.TaskControlService.FinishRun(sys.Ctx, task.ID, rc.Created.RunID); err != nil {
t.Fatal(err)
}
Expand All @@ -577,6 +581,52 @@ func testUpdate(t *testing.T, sys *System) {
t.Fatalf("executed task has not updated latest complete: expected %s > %s", st2.LatestCompleted, st.LatestCompleted)
}

if st2.LastRunStatus != "success" {
t.Fatal("executed task has not updated last run status")
}

if st2.LastRunError != "" {
t.Fatal("executed task has updated last run error on success")
}

rc2, err := sys.TaskControlService.CreateNextRun(sys.Ctx, task.ID, requestedAtUnix)
if err != nil {
t.Fatal(err)
}

if err := sys.TaskControlService.UpdateRunState(sys.Ctx, task.ID, rc2.Created.RunID, time.Now(), backend.RunStarted); err != nil {
t.Fatal(err)
}

if err := sys.TaskControlService.UpdateRunState(sys.Ctx, task.ID, rc2.Created.RunID, time.Now(), backend.RunFail); err != nil {
t.Fatal(err)
}

if err := sys.TaskControlService.AddRunLog(sys.Ctx, task.ID, rc2.Created.RunID, time.Now(), "error message"); err != nil {
t.Fatal(err)
}

if _, err := sys.TaskControlService.FinishRun(sys.Ctx, task.ID, rc2.Created.RunID); err != nil {
t.Fatal(err)
}

st3, err := sys.TaskService.FindTaskByID(sys.Ctx, task.ID)
if err != nil {
t.Fatal(err)
}

if st3.LatestCompleted <= st2.LatestCompleted {
t.Fatalf("executed task has not updated latest complete: expected %s > %s", st3.LatestCompleted, st2.LatestCompleted)
}

if st3.LastRunStatus != "failed" {
t.Fatal("executed task has not updated last run status")
}

if st3.LastRunError != "error message" {
t.Fatal("executed task has not updated last run error on failed")
}

now = time.Now()
flux := fmt.Sprintf(scriptFmt, 1)
task, err = sys.TaskService.UpdateTask(authorizedCtx, task.ID, influxdb.TaskUpdate{Flux: &flux})
Expand Down

0 comments on commit aeb61fb

Please sign in to comment.