-
Notifications
You must be signed in to change notification settings - Fork 1.7k
/
pipeline_runner_adapter.go
93 lines (80 loc) · 2.6 KB
/
pipeline_runner_adapter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
package generic
import (
"context"
"time"
"github.com/smartcontractkit/chainlink-common/pkg/types"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
"github.com/smartcontractkit/chainlink/v2/core/store/models"
)
var _ types.PipelineRunnerService = (*PipelineRunnerAdapter)(nil)
type pipelineRunner interface {
ExecuteAndInsertFinishedRun(ctx context.Context, spec pipeline.Spec, vars pipeline.Vars, l logger.Logger, saveSuccessfulTaskRuns bool) (runID int64, results pipeline.TaskRunResults, err error)
}
type PipelineRunnerAdapter struct {
runner pipelineRunner
job job.Job
logger logger.Logger
}
func (p *PipelineRunnerAdapter) ExecuteRun(ctx context.Context, spec string, vars types.Vars, options types.Options) (types.TaskResults, error) {
s := pipeline.Spec{
DotDagSource: spec,
CreatedAt: time.Now(),
MaxTaskDuration: models.Interval(options.MaxTaskDuration),
JobID: p.job.ID,
JobName: p.job.Name.ValueOrZero(),
JobType: string(p.job.Type),
}
defaultVars := map[string]interface{}{
"jb": map[string]interface{}{
"databaseID": p.job.ID,
"externalJobID": p.job.ExternalJobID,
"name": p.job.Name.ValueOrZero(),
},
}
merge(defaultVars, vars.Vars)
finalVars := pipeline.NewVarsFrom(defaultVars)
_, trrs, err := p.runner.ExecuteAndInsertFinishedRun(ctx, s, finalVars, p.logger, true)
if err != nil {
return nil, err
}
taskResults := make([]types.TaskResult, len(trrs))
for i, trr := range trrs {
taskResults[i] = types.TaskResult{
ID: trr.ID.String(),
Type: string(trr.Task.Type()),
Index: int(trr.Task.OutputIndex()),
TaskValue: types.TaskValue{
Value: trr.Result.OutputDB(),
Error: trr.Result.Error,
IsTerminal: len(trr.Task.Outputs()) == 0,
},
}
}
return taskResults, nil
}
func NewPipelineRunnerAdapter(logger logger.Logger, job job.Job, runner pipelineRunner) *PipelineRunnerAdapter {
return &PipelineRunnerAdapter{
logger: logger,
job: job,
runner: runner,
}
}
// merge merges mapTwo into mapOne, modifying mapOne in the process.
func merge(mapOne, mapTwo map[string]interface{}) {
for k, v := range mapTwo {
// if `mapOne` doesn't have `k`, then nothing to do, just assign v to `mapOne`.
if _, ok := mapOne[k]; !ok {
mapOne[k] = v
} else {
vAsMap, vOK := v.(map[string]interface{})
mapOneVAsMap, moOK := mapOne[k].(map[string]interface{})
if vOK && moOK {
merge(mapOneVAsMap, vAsMap)
} else {
mapOne[k] = v
}
}
}
}