-
Notifications
You must be signed in to change notification settings - Fork 1.7k
/
task.http.go
129 lines (112 loc) · 4.63 KB
/
task.http.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package pipeline
import (
"context"
"encoding/json"
"net/http"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"go.uber.org/multierr"
"github.com/smartcontractkit/chainlink/v2/core/logger"
clhttp "github.com/smartcontractkit/chainlink/v2/core/utils/http"
)
// Return types:
//
// string
type HTTPTask struct {
BaseTask `mapstructure:",squash"`
Method string
URL string
RequestData string `json:"requestData"`
AllowUnrestrictedNetworkAccess string
Headers string
config Config
httpClient *http.Client
unrestrictedHTTPClient *http.Client
}
var _ Task = (*HTTPTask)(nil)
var (
promHTTPFetchTime = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "pipeline_task_http_fetch_time",
Help: "Time taken to fully execute the HTTP request",
},
[]string{"pipeline_task_spec_id"},
)
promHTTPResponseBodySize = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "pipeline_task_http_response_body_size",
Help: "Size (in bytes) of the HTTP response body",
},
[]string{"pipeline_task_spec_id"},
)
)
func (t *HTTPTask) Type() TaskType {
return TaskTypeHTTP
}
func (t *HTTPTask) Run(ctx context.Context, lggr logger.Logger, vars Vars, inputs []Result) (result Result, runInfo RunInfo) {
_, err := CheckInputs(inputs, -1, -1, 0)
if err != nil {
return Result{Error: errors.Wrap(err, "task inputs")}, runInfo
}
var (
method StringParam
url URLParam
requestData MapParam
allowUnrestrictedNetworkAccess BoolParam
reqHeaders StringSliceParam
)
err = multierr.Combine(
errors.Wrap(ResolveParam(&method, From(NonemptyString(t.Method), "GET")), "method"),
errors.Wrap(ResolveParam(&url, From(VarExpr(t.URL, vars), NonemptyString(t.URL))), "url"),
errors.Wrap(ResolveParam(&requestData, From(VarExpr(t.RequestData, vars), JSONWithVarExprs(t.RequestData, vars, false), nil)), "requestData"),
// Any hardcoded strings used for URL uses the unrestricted HTTP adapter
// Interpolated variable URLs use restricted HTTP adapter by default
// You must set allowUnrestrictedNetworkAccess=true on the task to enable variable-interpolated URLs to make restricted network requests
errors.Wrap(ResolveParam(&allowUnrestrictedNetworkAccess, From(NonemptyString(t.AllowUnrestrictedNetworkAccess), !variableRegexp.MatchString(t.URL))), "allowUnrestrictedNetworkAccess"),
errors.Wrap(ResolveParam(&reqHeaders, From(NonemptyString(t.Headers), "[]")), "reqHeaders"),
)
if err != nil {
return Result{Error: err}, runInfo
}
if len(reqHeaders)%2 != 0 {
return Result{Error: errors.Errorf("headers must have an even number of elements")}, runInfo
}
requestDataJSON, err := json.Marshal(requestData)
if err != nil {
return Result{Error: err}, runInfo
}
lggr.Debugw("HTTP task: sending request",
"requestData", string(requestDataJSON),
"url", url.String(),
"method", method,
"reqHeaders", reqHeaders,
"allowUnrestrictedNetworkAccess", allowUnrestrictedNetworkAccess,
)
requestCtx, cancel := httpRequestCtx(ctx, t, t.config)
defer cancel()
var client *http.Client
if allowUnrestrictedNetworkAccess {
client = t.unrestrictedHTTPClient
} else {
client = t.httpClient
}
responseBytes, statusCode, respHeaders, elapsed, err := makeHTTPRequest(requestCtx, lggr, method, url, reqHeaders, requestData, client, t.config.DefaultHTTPLimit())
if err != nil {
if errors.Is(errors.Cause(err), clhttp.ErrDisallowedIP) {
err = errors.Wrap(err, `connections to local resources are disabled by default, if you are sure this is safe, you can enable on a per-task basis by setting allowUnrestrictedNetworkAccess="true" in the pipeline task spec, e.g. fetch [type="http" method=GET url="$(decode_cbor.url)" allowUnrestrictedNetworkAccess="true"]`)
}
return Result{Error: err}, RunInfo{IsRetryable: isRetryableHTTPError(statusCode, err)}
}
lggr.Debugw("HTTP task got response",
"response", string(responseBytes),
"respHeaders", respHeaders,
"url", url.String(),
"dotID", t.DotID(),
)
promHTTPFetchTime.WithLabelValues(t.DotID()).Set(float64(elapsed))
promHTTPResponseBodySize.WithLabelValues(t.DotID()).Set(float64(len(responseBytes)))
// NOTE: We always stringify the response since this is required for all current jobs.
// If a binary response is required we might consider adding an adapter
// flag such as "BinaryMode: true" which passes through raw binary as the
// value instead.
return Result{Value: string(responseBytes)}, runInfo
}