/
task.bridge.go
89 lines (75 loc) · 2.01 KB
/
task.bridge.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
package pipeline
import (
"context"
"fmt"
"net/url"
"github.com/jinzhu/gorm"
"github.com/pkg/errors"
"github.com/vordev/VOR/core/logger"
"github.com/vordev/VOR/core/store/models"
)
// BridgeTask is a pipeline task that sends a request to a named bridge.
// The bridge's URL is resolved from Name at run time, and the request itself
// is carried out by an HTTPTask using the POST method.
type BridgeTask struct {
	BaseTask `mapstructure:",squash"`

	// Name identifies the bridge; it is converted to a models.TaskType and
	// used as the lookup key when resolving the bridge URL.
	Name string `json:"name"`

	// RequestData is the base request payload. Run sends a copy of it with
	// the "id" and "meta" keys set.
	RequestData HttpRequestData `json:"requestData"`

	// txdb is the database handle handed to FindBridge.
	txdb *gorm.DB

	// config is passed through to the HTTPTask that performs the request.
	config Config
}
// Compile-time assertion that *BridgeTask implements the Task interface.
var _ Task = (*BridgeTask)(nil)
// Type reports the task type of a BridgeTask, which is always TaskTypeBridge.
func (t *BridgeTask) Type() TaskType {
	return TaskTypeBridge
}
// Run resolves the bridge named by t.Name to its URL and POSTs t.RequestData,
// augmented with the pipeline run ID and the run's "meta" payload, to that URL
// by delegating to an HTTPTask.
//
// BridgeTask accepts no inputs: a non-empty inputs slice yields a Result whose
// Error wraps ErrWrongInputCardinality. A failed bridge lookup or a failed
// HTTPTask run is likewise reported through Result.Error. On success the
// HTTPTask's Result is returned unchanged.
func (t *BridgeTask) Run(ctx context.Context, taskRun TaskRun, inputs []Result) (result Result) {
	if len(inputs) > 0 {
		return Result{Error: errors.Wrapf(ErrWrongInputCardinality, "BridgeTask requires 0 inputs")}
	}

	// Named bridgeURL rather than url so the net/url package is not shadowed.
	bridgeURL, err := t.getBridgeURLFromName()
	if err != nil {
		return Result{Error: err}
	}

	// Only a map-shaped meta value is forwarded; any other non-nil value is
	// logged and discarded, leaving meta nil.
	var meta map[string]interface{}
	switch v := taskRun.PipelineRun.Meta.Val.(type) {
	case map[string]interface{}:
		meta = v
	case nil:
		// No meta on this run; nothing to forward and nothing to warn about.
	default:
		logger.Warnw(`"meta" field on task run is malformed, discarding`,
			"jobID", taskRun.PipelineRun.PipelineSpecID,
			"taskRunID", taskRun.ID,
			"task", taskRun.PipelineTaskSpec.DotID,
			"meta", taskRun.PipelineRun.Meta.Val,
		)
	}

	result = (&HTTPTask{
		URL:         models.WebURL(bridgeURL),
		Method:      "POST",
		RequestData: withIDAndMeta(t.RequestData, taskRun.PipelineRunID, meta),
		config:      t.config,
	}).Run(ctx, taskRun, inputs)
	if result.Error != nil {
		return result
	}

	// The answer is only used for logging, so a value that is not a []byte
	// must not panic the task: fall back to default formatting instead of
	// using an unchecked type assertion.
	var answer string
	if raw, ok := result.Value.([]byte); ok {
		answer = string(raw)
	} else {
		answer = fmt.Sprintf("%v", result.Value)
	}
	logger.Debugw("Bridge task: fetched answer",
		"answer", answer,
		"url", bridgeURL.String(),
	)
	return result
}
// getBridgeURLFromName resolves the task's bridge to a URL. It converts t.Name
// to a models.TaskType, passes it with t.txdb to FindBridge, and returns the
// found bridge's URL converted to url.URL. If FindBridge fails, its error is
// returned unchanged alongside a zero url.URL.
func (t BridgeTask) getBridgeURLFromName() (url.URL, error) {
	bridge, err := FindBridge(t.txdb, models.TaskType(t.Name))
	if err != nil {
		return url.URL{}, err
	}
	return url.URL(bridge.URL), nil
}
// withIDAndMeta returns a new HttpRequestData holding every entry of request
// plus two keys: "id", set to runID formatted as a decimal string, and "meta",
// set to meta. Those two keys overwrite same-named entries copied from
// request. The request map itself is not modified.
func withIDAndMeta(request HttpRequestData, runID int64, meta HttpRequestData) HttpRequestData {
	merged := HttpRequestData{}
	for key := range request {
		merged[key] = request[key]
	}
	merged["meta"] = meta
	merged["id"] = fmt.Sprintf("%d", runID)
	return merged
}