/
run_executor.go
139 lines (110 loc) · 3.95 KB
/
run_executor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package services
import (
"fmt"
"time"
"github.com/smartcontractkit/chainlink/core/adapters"
"github.com/smartcontractkit/chainlink/core/logger"
"github.com/smartcontractkit/chainlink/core/services/synchronization"
"github.com/smartcontractkit/chainlink/core/store"
"github.com/smartcontractkit/chainlink/core/store/models"
"github.com/smartcontractkit/chainlink/core/store/orm"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
// promAdapterCallsVec is a Prometheus counter vector registered through
// promauto. It is incremented once per adapter call in executeTask, with the
// job spec ID, the adapter's task type and the resulting status as labels.
var promAdapterCallsVec = promauto.NewCounterVec(
	prometheus.CounterOpts{
		Name: "adapter_perform_complete_total",
		Help: "The total number of adapters which have completed",
	},
	[]string{"job_spec_id", "task_type", "status"},
)
//go:generate mockery --name RunExecutor --output ../internal/mocks/ --case=underscore
// RunExecutor handles the actual running of the job tasks.
//
// NewRunExecutor returns the implementation defined in this file.
type RunExecutor interface {
// Execute runs the job run identified by the given ID and reports any
// error encountered while doing so.
Execute(*models.ID) error
}
// runExecutor is the RunExecutor implementation returned by NewRunExecutor.
type runExecutor struct {
// store is used to find and save job runs; its Config and ORM are used to
// build adapters, and the store itself is passed to adapters when they perform.
store *store.Store
// statsPusher is told to push (PushNow) after each successful save of a run.
statsPusher synchronization.StatsPusher
}
// NewRunExecutor builds a RunExecutor that works against the given store and
// notifies the given stats pusher as runs are saved.
func NewRunExecutor(store *store.Store, statsPusher synchronization.StatsPusher) RunExecutor {
	executor := new(runExecutor)
	executor.store = store
	executor.statsPusher = statsPusher
	return executor
}
// Execute performs the work associated with a job run.
//
// The run is looked up by runID (unscoped) and its task runs are visited in
// order. Iteration stops as soon as the run is no longer runnable; task runs
// that are already completed are skipped. A task that meets its minimum
// required incoming confirmations is executed and its output applied to both
// the task run and the job run; otherwise both are marked as pending incoming
// confirmations. The run is saved after every processed task and the stats
// pusher is triggered after each successful save.
//
// It returns a wrapped error if the run cannot be found, nil if saving hits an
// optimistic update conflict (the conflict is only logged), and the save error
// unchanged for any other save failure.
func (re *runExecutor) Execute(runID *models.ID) error {
	jobRun, findErr := re.store.Unscoped().FindJobRun(runID)
	if findErr != nil {
		return errors.Wrapf(findErr, "error finding run %s", runID)
	}

	for idx := range jobRun.TaskRuns {
		// Work on the slice element itself so changes are persisted with the run.
		task := &jobRun.TaskRuns[idx]

		if !jobRun.GetStatus().Runnable() {
			logger.Debugw("Run execution blocked", jobRun.ForLogger("task", task.ID.String())...)
			break
		}

		if task.Status.Completed() {
			continue
		}

		if !meetsMinRequiredIncomingConfirmations(&jobRun, task, jobRun.ObservedHeight) {
			logger.Debugw("Pausing run pending incoming confirmations",
				jobRun.ForLogger("required_height", task.MinRequiredIncomingConfirmations)...,
			)
			task.Status = models.RunStatusPendingIncomingConfirmations
			jobRun.SetStatus(models.RunStatusPendingIncomingConfirmations)
		} else {
			began := time.Now()

			// NOTE: adapters may define and return the new job run status in here
			output := re.executeTask(&jobRun, *task)

			task.ApplyOutput(output)
			jobRun.ApplyOutput(output)

			seconds := time.Since(began).Seconds()
			logger.Debugw(fmt.Sprintf("Executed task %s", task.TaskSpec.Type), jobRun.ForLogger("task", task.ID.String(), "elapsed", seconds)...)
		}

		saveErr := re.store.ORM.SaveJobRun(&jobRun)
		if errors.Cause(saveErr) == orm.ErrOptimisticUpdateConflict {
			// The conflict is logged and execution stops without reporting an error.
			logger.Debugw("Optimistic update conflict while updating run", jobRun.ForLogger()...)
			return nil
		}
		if saveErr != nil {
			return saveErr
		}

		re.statsPusher.PushNow()
	}

	if !jobRun.GetStatus().Finished() {
		return nil
	}

	if jobRun.GetStatus().Errored() {
		logger.Warnw("Task failed", jobRun.ForLogger()...)
	} else {
		logger.Debugw("All tasks complete for run", jobRun.ForLogger()...)
	}
	return nil
}
// executeTask runs a single task run of the given job run and returns the
// adapter's output.
//
// The run request's params are merged with the task spec's params, and the
// merged result is used to look up the adapter. The adapter's input data is a
// merge of the request params, the previous task run's result data (if there is
// a previous task run) and this task run's own result data. Any merge or
// adapter lookup failure is returned as an error RunOutput. After the adapter
// performs, the adapter call counter is incremented for the job spec, task type
// and result status.
func (re *runExecutor) executeTask(run *models.JobRun, taskRun models.TaskRun) models.RunOutput {
	spec := taskRun.TaskSpec

	mergedParams, err := models.Merge(run.RunRequest.RequestParams, spec.Params)
	if err != nil {
		return models.NewRunOutputError(err)
	}
	spec.Params = mergedParams

	taskAdapter, err := adapters.For(spec, re.store.Config, re.store.ORM)
	if err != nil {
		return models.NewRunOutputError(err)
	}

	// Start from empty JSON; replaced only when a previous task run exists.
	priorData := models.JSON{}
	if prev := run.PreviousTaskRun(); prev != nil {
		priorData = prev.Result.Data
	}

	inputData, err := models.Merge(run.RunRequest.RequestParams, priorData, taskRun.Result.Data)
	if err != nil {
		return models.NewRunOutputError(err)
	}

	runInput := *models.NewRunInput(run.ID, *taskRun.ID, inputData, taskRun.Status)
	output := taskAdapter.Perform(runInput, re.store)

	jobSpecLabel := run.JobSpecID.String()
	taskTypeLabel := string(taskAdapter.TaskType())
	statusLabel := string(output.Status())
	promAdapterCallsVec.WithLabelValues(jobSpecLabel, taskTypeLabel, statusLabel).Inc()

	return output
}