/
dao_run_job.go
163 lines (146 loc) · 5.96 KB
/
dao_run_job.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
package workflow_v2
import (
"context"
"time"
"github.com/go-gorp/gorp"
"github.com/lib/pq"
"github.com/rockbears/log"
"go.opencensus.io/trace"
"github.com/ovh/cds/engine/api/database/gorpmapping"
"github.com/ovh/cds/engine/gorpmapper"
"github.com/ovh/cds/sdk"
"github.com/ovh/cds/sdk/telemetry"
)
func getAllRunJobs(ctx context.Context, db gorp.SqlExecutor, query gorpmapping.Query) ([]sdk.V2WorkflowRunJob, error) {
var dbWkfRunJobs []dbWorkflowRunJob
if err := gorpmapping.GetAll(ctx, db, query, &dbWkfRunJobs); err != nil {
return nil, err
}
jobRuns := make([]sdk.V2WorkflowRunJob, 0, len(dbWkfRunJobs))
for _, rj := range dbWkfRunJobs {
isValid, err := gorpmapping.CheckSignature(rj, rj.Signature)
if err != nil {
return nil, err
}
if !isValid {
log.Error(ctx, "run job %s on run %s: data corrupted", rj.ID, rj.WorkflowRunID)
continue
}
jobRuns = append(jobRuns, rj.V2WorkflowRunJob)
}
return jobRuns, nil
}
func getRunJob(ctx context.Context, db gorp.SqlExecutor, query gorpmapping.Query) (*sdk.V2WorkflowRunJob, error) {
var dbWkfRunJob dbWorkflowRunJob
found, err := gorpmapping.Get(ctx, db, query, &dbWkfRunJob)
if err != nil {
return nil, sdk.WithStack(err)
}
if !found {
return nil, sdk.WithStack(sdk.ErrNotFound)
}
isValid, err := gorpmapping.CheckSignature(dbWkfRunJob, dbWkfRunJob.Signature)
if err != nil {
return nil, err
}
if !isValid {
log.Error(ctx, "run job %s on run %s: data corrupted", dbWkfRunJob.ID, dbWkfRunJob.WorkflowRunID)
return nil, sdk.WithStack(sdk.ErrNotFound)
}
return &dbWkfRunJob.V2WorkflowRunJob, nil
}
func InsertRunJob(ctx context.Context, db gorpmapper.SqlExecutorWithTx, wrj *sdk.V2WorkflowRunJob) error {
ctx, next := telemetry.Span(ctx, "workflow_v2.InsertRunJob", trace.StringAttribute(telemetry.TagJob, wrj.JobID))
defer next()
wrj.ID = sdk.UUID()
if wrj.Queued.IsZero() {
wrj.Queued = time.Now()
}
dbWkfRunJob := &dbWorkflowRunJob{V2WorkflowRunJob: *wrj}
if err := gorpmapping.InsertAndSign(ctx, db, dbWkfRunJob); err != nil {
return err
}
*wrj = dbWkfRunJob.V2WorkflowRunJob
return nil
}
func UpdateJobRun(ctx context.Context, db gorpmapper.SqlExecutorWithTx, wrj *sdk.V2WorkflowRunJob) error {
ctx, next := telemetry.Span(ctx, "workflow_v2.UpdateJobRun")
defer next()
dbWkfRunJob := &dbWorkflowRunJob{V2WorkflowRunJob: *wrj}
if err := gorpmapping.UpdateAndSign(ctx, db, dbWkfRunJob); err != nil {
return err
}
*wrj = dbWkfRunJob.V2WorkflowRunJob
return nil
}
func LoadRunJobsByRunID(ctx context.Context, db gorp.SqlExecutor, runID string, runAttempt int64) ([]sdk.V2WorkflowRunJob, error) {
ctx, next := telemetry.Span(ctx, "LoadRunJobsByRunID")
defer next()
query := gorpmapping.NewQuery(`
SELECT *
FROM v2_workflow_run_job
WHERE workflow_run_id = $1 AND run_attempt = $2
`).Args(runID, runAttempt)
return getAllRunJobs(ctx, db, query)
}
func UnsafeLoadAllRunJobs(ctx context.Context, db gorp.SqlExecutor) ([]sdk.V2WorkflowRunJob, error) {
query := "SELECT * from v2_workflow_run_job"
var runJobs []sdk.V2WorkflowRunJob
if _, err := db.Select(&runJobs, query); err != nil {
return nil, sdk.WithStack(err)
}
return runJobs, nil
}
func LoadRunJobByID(ctx context.Context, db gorp.SqlExecutor, jobRunID string) (*sdk.V2WorkflowRunJob, error) {
ctx, next := telemetry.Span(ctx, "workflow_v2.LoadRunJobByID")
defer next()
query := gorpmapping.NewQuery("SELECT * from v2_workflow_run_job WHERE id = $1").Args(jobRunID)
return getRunJob(ctx, db, query)
}
func LoadRunJobByRunIDAndID(ctx context.Context, db gorp.SqlExecutor, wrID, jobRunID string) (*sdk.V2WorkflowRunJob, error) {
ctx, next := telemetry.Span(ctx, "workflow_v2.LoadRunJobByRunIDAndID")
defer next()
query := gorpmapping.NewQuery("SELECT * from v2_workflow_run_job WHERE workflow_run_id = $1 AND id = $2").Args(wrID, jobRunID)
return getRunJob(ctx, db, query)
}
func LoadRunJobByName(ctx context.Context, db gorp.SqlExecutor, wrID string, jobName string, runAttempt int64) (*sdk.V2WorkflowRunJob, error) {
ctx, next := telemetry.Span(ctx, "workflow_v2.LoadRunJobByName")
defer next()
query := gorpmapping.NewQuery("SELECT * from v2_workflow_run_job WHERE workflow_run_id = $1 AND job_id = $2 AND run_attempt = $3").Args(wrID, jobName, runAttempt)
return getRunJob(ctx, db, query)
}
func LoadQueuedRunJobByModelTypeAndRegion(ctx context.Context, db gorp.SqlExecutor, regionName string, modelType string) ([]sdk.V2WorkflowRunJob, error) {
ctx, next := telemetry.Span(ctx, "workflow_v2.LoadQueuedRunJobByModelTypeAndRegion")
defer next()
query := gorpmapping.NewQuery("SELECT * from v2_workflow_run_job WHERE status = $1 AND model_type = $2 and region = $3 ORDER BY queued").
Args(sdk.StatusWaiting, modelType, regionName)
return getAllRunJobs(ctx, db, query)
}
func LoadRunJobsByRunIDAndStatus(ctx context.Context, db gorp.SqlExecutor, runID string, status []string) ([]sdk.V2WorkflowRunJob, error) {
ctx, next := telemetry.Span(ctx, "workflow_v2.LoadRunJobsByRunIDAndStatus")
defer next()
query := gorpmapping.NewQuery("SELECT * from v2_workflow_run_job WHERE workflow_run_id = $1 AND status = ANY($2)").Args(runID, pq.StringArray(status))
return getAllRunJobs(ctx, db, query)
}
func LoadOldScheduledRunJob(ctx context.Context, db gorp.SqlExecutor, timeout int64) ([]sdk.V2WorkflowRunJob, error) {
ctx, next := telemetry.Span(ctx, "workflow_v2.LoadOldScheduledRunJob")
defer next()
query := gorpmapping.NewQuery(`
SELECT *
FROM v2_workflow_run_job
WHERE status = $1 AND now() - scheduled > $2 * INTERVAL '1' SECOND
LIMIT 100
`).Args(sdk.StatusScheduling, timeout)
return getAllRunJobs(ctx, db, query)
}
func LoadDeadJobs(ctx context.Context, db gorp.SqlExecutor) ([]sdk.V2WorkflowRunJob, error) {
query := gorpmapping.NewQuery(`
SELECT v2_workflow_run_job.*
FROM v2_workflow_run_job
LEFT JOIN v2_worker ON v2_worker.run_job_id = v2_workflow_run_job.id
WHERE v2_workflow_run_job.status = $1 AND v2_worker.id IS NULL
ORDER BY started
LIMIT 100
`).Args(sdk.StatusBuilding)
return getAllRunJobs(ctx, db, query)
}