-
Notifications
You must be signed in to change notification settings - Fork 1.7k
/
job_run.go
293 lines (257 loc) · 9.08 KB
/
job_run.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
package models
import (
"fmt"
"math/big"
"time"
"github.com/smartcontractkit/chainlink/core/assets"
clnull "github.com/smartcontractkit/chainlink/core/null"
"github.com/smartcontractkit/chainlink/core/utils"
"github.com/ethereum/go-ethereum/common"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
null "gopkg.in/guregu/null.v3"
)
var (
promTotalRunUpdates = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "run_status_update_total",
Help: "The total number of status updates for Job Runs",
},
[]string{"job_spec_id", "from_status", "status"},
)
)
// JobRun tracks the status of a job by holding its TaskRuns and the
// Result of each Run.
type JobRun struct {
ID *ID `json:"id" gorm:"primary_key;not null"`
JobSpecID *ID `json:"jobId"`
Result RunResult `json:"result" gorm:"foreignkey:ResultID;association_autoupdate:true;association_autocreate:true"`
ResultID clnull.Int64 `json:"-"`
RunRequest RunRequest `json:"-" gorm:"foreignkey:RunRequestID;association_autoupdate:true;association_autocreate:true"`
RunRequestID clnull.Int64 `json:"-"`
Status RunStatus `json:"status" gorm:"default:'unstarted'"`
TaskRuns []TaskRun `json:"taskRuns"`
CreatedAt time.Time `json:"createdAt"`
FinishedAt null.Time `json:"finishedAt"`
UpdatedAt time.Time `json:"updatedAt"`
Initiator Initiator `json:"initiator" gorm:"foreignkey:InitiatorID;association_autoupdate:false;association_autocreate:false"`
InitiatorID int64 `json:"-"`
CreationHeight *utils.Big `json:"creationHeight"`
ObservedHeight *utils.Big `json:"observedHeight"`
DeletedAt null.Time `json:"-"`
Payment *assets.Link `json:"payment,omitempty"`
}
// MakeJobRun returns a new JobRun copy
func MakeJobRun(job *JobSpec, now time.Time, initiator *Initiator, currentHeight *big.Int, runRequest *RunRequest) JobRun {
run := JobRun{
ID: NewID(),
JobSpecID: job.ID,
CreatedAt: now,
UpdatedAt: now,
Initiator: *initiator,
InitiatorID: initiator.ID,
TaskRuns: make([]TaskRun, len(job.Tasks)),
RunRequest: *runRequest,
Payment: runRequest.Payment,
}
if currentHeight != nil {
run.CreationHeight = utils.NewBig(currentHeight)
run.ObservedHeight = utils.NewBig(currentHeight)
}
for i, task := range job.Tasks {
run.TaskRuns[i] = TaskRun{
ID: NewID(),
JobRunID: run.ID,
TaskSpec: task,
Status: RunStatusUnstarted,
}
}
run.SetStatus(RunStatusInProgress)
return run
}
// GetID returns the ID of this structure for jsonapi serialization.
func (jr JobRun) GetID() string {
return jr.ID.String()
}
// GetName returns the pluralized "type" of this structure for jsonapi serialization.
func (jr JobRun) GetName() string {
return "runs"
}
// SetStatus updates run status.
func (jr *JobRun) SetStatus(status RunStatus) {
oldStatus := jr.Status
jr.Status = status
if jr.Status.Completed() && jr.TasksRemain() {
jr.Status = RunStatusInProgress
} else if jr.Status.Finished() {
jr.FinishedAt = null.TimeFrom(time.Now())
}
promTotalRunUpdates.WithLabelValues(jr.JobSpecID.String(), string(oldStatus), string(status)).Inc()
}
// GetStatus returns the JobRun's RunStatus
func (jr *JobRun) GetStatus() RunStatus {
return jr.Status
}
// SetID is used to set the ID of this structure when deserializing from jsonapi documents.
func (jr *JobRun) SetID(value string) error {
return jr.ID.UnmarshalText([]byte(value))
}
// ForLogger formats the JobRun for a common formatting in the log.
func (jr JobRun) ForLogger(kvs ...interface{}) []interface{} {
output := []interface{}{
"job", jr.JobSpecID.String(),
"run", jr.ID.String(),
"status", jr.Status,
}
if jr.CreationHeight != nil {
output = append(output, "creation_height", jr.CreationHeight.ToInt())
}
if jr.ObservedHeight != nil {
output = append(output, "observed_height", jr.ObservedHeight.ToInt())
}
if jr.HasError() {
output = append(output, "job_error", jr.ErrorString())
}
if jr.Status.Completed() {
output = append(output, "link_earned", jr.Payment)
} else {
output = append(output, "input_amount", jr.Payment)
}
if jr.RunRequest.RequestID != nil {
output = append(output, "external_id", jr.RunRequest.RequestID)
}
return append(kvs, output...)
}
// HasError returns true if this JobRun has errored
func (jr JobRun) HasError() bool {
return jr.Status.Errored()
}
// NextTaskRunIndex returns the position of the next unfinished task
func (jr *JobRun) NextTaskRunIndex() (int, bool) {
for index, tr := range jr.TaskRuns {
if tr.Status.CanStart() {
return index, true
}
}
return 0, false
}
// NextTaskRun returns the next immediate TaskRun in the list
// of unfinished TaskRuns.
func (jr *JobRun) NextTaskRun() *TaskRun {
nextTaskIndex, runnable := jr.NextTaskRunIndex()
if runnable {
return &jr.TaskRuns[nextTaskIndex]
}
return nil
}
// PreviousTaskRun returns the last task to be processed, if it exists
func (jr *JobRun) PreviousTaskRun() *TaskRun {
index, runnable := jr.NextTaskRunIndex()
if runnable && index > 0 {
return &jr.TaskRuns[index-1]
}
return nil
}
// TasksRemain returns true if there are unfinished tasks left for this job run
func (jr *JobRun) TasksRemain() bool {
_, runnable := jr.NextTaskRunIndex()
return runnable
}
// SetError sets this job run to failed and saves the error message
func (jr *JobRun) SetError(err error) {
jr.Result.ErrorMessage = null.StringFrom(err.Error())
jr.SetStatus(RunStatusErrored)
}
// Cancel sets this run as cancelled, it should no longer be processed.
func (jr *JobRun) Cancel() {
currentTaskRun := jr.NextTaskRun()
if currentTaskRun != nil {
currentTaskRun.Status = RunStatusCancelled
}
jr.SetStatus(RunStatusCancelled)
}
// ApplyOutput updates the JobRun's Result and Status
func (jr *JobRun) ApplyOutput(result RunOutput) {
if result.HasError() {
jr.SetError(result.Error())
return
}
jr.Result.Data = result.Data()
jr.SetStatus(result.Status())
}
// ApplyBridgeRunResult saves the input from a BridgeAdapter
func (jr *JobRun) ApplyBridgeRunResult(result BridgeRunResult) {
if result.HasError() {
jr.SetError(result.GetError())
}
jr.Result.Data = result.Data
jr.SetStatus(result.Status)
}
// ErrorString returns the error as a string if present, otherwise "".
func (jr *JobRun) ErrorString() string {
return jr.Result.ErrorMessage.ValueOrZero()
}
// RunRequest stores the fields used to initiate the parent job run.
type RunRequest struct {
ID int64 `gorm:"primary_key"`
RequestID *common.Hash
TxHash *common.Hash
BlockHash *common.Hash
Requester *common.Address
CreatedAt time.Time
Payment *assets.Link
RequestParams JSON `gorm:"default: '{}';not null"`
}
// NewRunRequest returns a new RunRequest instance.
func NewRunRequest(requestParams JSON) *RunRequest {
return &RunRequest{CreatedAt: time.Now(), RequestParams: requestParams}
}
// TaskRun stores the Task and represents the status of the
// Task to be ran.
type TaskRun struct {
ID *ID `json:"id" gorm:"primary_key;not null"`
JobRunID *ID `json:"-"`
Result RunResult `json:"result"`
ResultID clnull.Uint32 `json:"-"`
Status RunStatus `json:"status" gorm:"default:'unstarted'"`
TaskSpec TaskSpec `json:"task" gorm:"association_autoupdate:false;association_autocreate:false"`
TaskSpecID int64 `json:"-"`
MinRequiredIncomingConfirmations clnull.Uint32 `json:"minimumConfirmations" gorm:"column:minimum_confirmations"`
ObservedIncomingConfirmations clnull.Uint32 `json:"confirmations" gorm:"column:confirmations"`
CreatedAt time.Time `json:"-"`
UpdatedAt time.Time `json:"-"`
}
// String returns info on the TaskRun as "ID,Type,Status,Result".
func (tr TaskRun) String() string {
return fmt.Sprintf("TaskRun(%v,%v,%v,%v)", tr.ID.String(), tr.TaskSpec.Type, tr.Status, tr.Result)
}
// SetError sets this task run to failed and saves the error message
func (tr *TaskRun) SetError(err error) {
tr.Result.ErrorMessage = null.StringFrom(err.Error())
tr.Status = RunStatusErrored
}
// ApplyBridgeRunResult updates the TaskRun's Result and Status
func (tr *TaskRun) ApplyBridgeRunResult(result BridgeRunResult) {
if result.HasError() {
tr.SetError(result.GetError())
}
tr.Result.Data = result.Data
tr.Status = result.Status
}
// ApplyOutput updates the TaskRun's Result and Status
func (tr *TaskRun) ApplyOutput(result RunOutput) {
if result.HasError() {
tr.SetError(result.Error())
return
}
tr.Result.Data = result.Data()
tr.Status = result.Status()
}
// RunResult keeps track of the outcome of a TaskRun or JobRun. It stores the
// Data and ErrorMessage.
type RunResult struct {
ID int64 `json:"-" gorm:"primary_key;auto_increment"`
Data JSON `json:"data" gorm:"type:text"`
ErrorMessage null.String `json:"error"`
CreatedAt time.Time `json:"-"`
UpdatedAt time.Time `json:"-"`
}