refactor(task): look up runs more efficiently
This switches run status from a tag to a field. This is likely a
breaking change to existing task logs.

In a one-off local benchmark with 250 records, the previous approach took
around 10 seconds and the new approach about 30 milliseconds. At 1000
records, the previous approach took roughly 110 seconds and the new
approach around 70 milliseconds.
mark-rushakoff committed Jan 16, 2019
1 parent a5154cf commit f93ecaa
Showing 2 changed files with 35 additions and 49 deletions.
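
To make the tag-to-field switch concrete, here is a small illustrative Go sketch (not part of this commit). The measurement and key names match the constants in point_logwriter.go below; the task ID, run ID, and timestamps are invented for the example.

package main

import "fmt"

func main() {
	// Line protocol for a single run-state update, before and after this change.

	// Before: status was a tag, so each (taskID, status) pair formed its own series.
	oldPoint := `records,taskID=05a0c1e4b0c1e400,status=started runID="05a0c1e4b0c1e401",scheduledFor="2019-01-16T00:00:00Z" 1547596800000000000`

	// After: status is a field; all run-state points for a task now share one
	// series keyed only by taskID.
	newPoint := `records,taskID=05a0c1e4b0c1e400 status="started",runID="05a0c1e4b0c1e401",scheduledFor="2019-01-16T00:00:00Z" 1547596800000000000`

	fmt.Println(oldPoint)
	fmt.Println(newPoint)
}
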
task/backend/point_logwriter.go (6 changes: 3 additions & 3 deletions)
@@ -14,9 +14,9 @@ const (
runIDField = "runID"
scheduledForField = "scheduledFor"
requestedAtField = "requestedAt"
statusField = "status"

taskIDTag = "taskID"
statusTag = "status"

// Fixed system bucket ID for task and run logs.
taskSystemBucketID platform.ID = 10
@@ -40,10 +40,10 @@ func NewPointLogWriter(pw PointsWriter) *PointLogWriter {

func (p *PointLogWriter) UpdateRunState(ctx context.Context, rlb RunLogBase, when time.Time, status RunStatus) error {
tags := models.Tags{
models.NewTag([]byte(statusTag), []byte(status.String())),
models.NewTag([]byte(taskIDTag), []byte(rlb.Task.ID.String())),
}
fields := make(map[string]interface{}, 3)
fields := make(map[string]interface{}, 4)
fields[statusField] = status.String()
fields[runIDField] = rlb.RunID.String()
fields[scheduledForField] = time.Unix(rlb.RunScheduledFor, 0).UTC().Format(time.RFC3339)
if rlb.RequestedAt != 0 {
task/backend/query_logreader.go (78 changes: 32 additions & 46 deletions)
@@ -108,26 +108,17 @@ func (qlr *QueryLogReader) ListRuns(ctx context.Context, runFilter platform.RunF
scheduledBefore = runFilter.BeforeTime
}

listScript := fmt.Sprintf(`supl = from(bucketID: "000000000000000a")
listScript := fmt.Sprintf(`
from(bucketID: "000000000000000a")
|> range(start: -24h)
|> filter(fn: (r) => r._measurement == "records" and r.taskID == %q)
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
|> group(columns: ["scheduledFor"])
|> filter(fn: (r) => r.scheduledFor < %q and r.scheduledFor > %q)
|> sort(desc: true, columns: ["_start"]) |> limit(n: 1)
main = from(bucketID: "000000000000000a")
|> range(start: -24h)
|> filter(fn: (r) => r._measurement == "records" and r.taskID == %q)
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
|> pivot(rowKey:["runID"], columnKey: ["status"], valueColumn: "_time")
|> filter(fn: (r) => r.runID > %q)
join(tables: {main: main, supl: supl}, on: ["_start", "_stop", "orgID", "taskID", "runID", "_measurement"])
|> group(columns: ["_measurement"])
%s
|> yield(name: "result")
`, runFilter.Task.String(), scheduledBefore, scheduledAfter, runFilter.Task.String(), afterID, limit)
|> filter(fn: (r) => r._measurement == "records" and r.taskID == %q)
|> drop(columns: ["_start", "_stop"])
|> group(columns: ["_measurement", "taskID", "scheduledFor", "status", "runID"])
|> influxFieldsAsCols()
|> filter(fn: (r) => r.scheduledFor < %q and r.scheduledFor > %q and r.runID > %q)
|> pivot(rowKey:["runID", "scheduledFor"], columnKey: ["status"], valueColumn: "_time")
%s
`, runFilter.Task.String(), scheduledBefore, scheduledAfter, afterID, limit)

auth, err := pctx.GetAuthorizer(ctx)
if err != nil {
@@ -147,35 +138,25 @@ join(tables: {main: main, supl: supl}, on: ["_start", "_stop", "orgID", "taskID"
}

func (qlr *QueryLogReader) FindRunByID(ctx context.Context, orgID, runID platform.ID) (*platform.Run, error) {
// TODO: sort |> limit will be replaced with last once last is working.
showScript := fmt.Sprintf(`supl = from(bucketID: "000000000000000a")
|> range(start: -24h)
|> filter(fn: (r) => r._measurement == "records")
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
|> filter(fn: (r) => r.runID == %q)
|> group(columns: ["scheduledFor"])
|> sort(desc: true, columns: ["_start"]) |> limit(n: 1)
showScript := fmt.Sprintf(`
logs = from(bucketID: "000000000000000a")
|> range(start: -24h)
|> filter(fn: (r) => r._measurement == "logs")
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
|> range(start: -24h)
|> filter(fn: (r) => r._measurement == "logs")
|> drop(columns: ["_start", "_stop"])
|> influxFieldsAsCols()
|> filter(fn: (r) => r.runID == %q)
|> yield(name: "logs")
main = from(bucketID: "000000000000000a")
from(bucketID: "000000000000000a")
|> range(start: -24h)
|> filter(fn: (r) => r._measurement == "records")
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
|> filter(fn: (r) => r.runID == %q)
|> pivot(rowKey:["runID"], columnKey: ["status"], valueColumn: "_time")
join(
tables: {main: main, supl: supl},
on: ["_start", "_stop", "orgID", "taskID", "runID", "_measurement"],
) |> yield(name: "result")
logs |> yield(name: "logs")
`, runID.String(), runID.String(), runID.String())
|> filter(fn: (r) => r._measurement == "records")
|> drop(columns: ["_start", "_stop"])
|> group(columns: ["_measurement", "taskID", "scheduledFor", "status", "runID"])
|> influxFieldsAsCols()
|> filter(fn: (r) => r.runID == %q)
|> pivot(rowKey:["runID", "scheduledFor"], columnKey: ["status"], valueColumn: "_time")
|> yield(name: "result")
`, runID.String(), runID.String())

auth, err := pctx.GetAuthorizer(ctx)
if err != nil {
@@ -270,8 +251,6 @@ func (re *runExtractor) extractRecord(cr flux.ColReader) error {
r.RequestedAt = cr.Strings(j).ValueString(i)
case scheduledForField:
r.ScheduledFor = cr.Strings(j).ValueString(i)
case "status":
r.Status = cr.Strings(j).ValueString(i)
case "runID":
id, err := platform.IDFromString(cr.Strings(j).ValueString(i))
if err != nil {
@@ -286,8 +265,15 @@ func (re *runExtractor) extractRecord(cr flux.ColReader) error {
r.TaskID = *id
case RunStarted.String():
r.StartedAt = values.Time(cr.Times(j).Value(i)).Time().Format(time.RFC3339Nano)
if r.Status == "" {
// Only set status if it wasn't already set.
r.Status = col.Label
}
case RunSuccess.String(), RunFail.String(), RunCanceled.String():
r.FinishedAt = values.Time(cr.Times(j).Value(i)).Time().Format(time.RFC3339Nano)
// Finished can be set unconditionally;
// it's fine to overwrite if the status was already set to started.
r.Status = col.Label
}
}

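For context on the read path (illustrative, not part of the commit): after the pivot on the status field, each run row carries one timestamp column per status it has reached, and extractRecord above turns those columns back into a run. A minimal Go sketch of that mapping follows; the status labels ("started", "success", "failed", "canceled") are assumptions standing in for RunStatus.String() values, and run is a trimmed stand-in for platform.Run.

package main

import (
	"fmt"
	"time"
)

// run is a trimmed-down stand-in for platform.Run, holding only the
// fields this sketch derives from the pivoted status columns.
type run struct {
	Status     string
	StartedAt  string
	FinishedAt string
}

// applyStatusColumns mirrors the column handling in extractRecord:
// each pivoted column is named after a status and holds the time that
// status was recorded for the run.
func applyStatusColumns(cols map[string]time.Time) run {
	var r run
	for label, t := range cols {
		switch label {
		case "started":
			r.StartedAt = t.Format(time.RFC3339Nano)
			if r.Status == "" {
				// Don't overwrite a terminal status that was seen first.
				r.Status = label
			}
		case "success", "failed", "canceled":
			r.FinishedAt = t.Format(time.RFC3339Nano)
			// A terminal status always wins over "started".
			r.Status = label
		}
	}
	return r
}

func main() {
	now := time.Now().UTC()
	// A row for a run that started a minute ago and just succeeded.
	fmt.Printf("%+v\n", applyStatusColumns(map[string]time.Time{
		"started": now.Add(-time.Minute),
		"success": now,
	}))
}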
