-
Notifications
You must be signed in to change notification settings - Fork 1.6k
/
external_initiators.go
161 lines (143 loc) · 4.55 KB
/
external_initiators.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
package services
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"github.com/smartcontractkit/chainlink/core/logger"
"github.com/smartcontractkit/chainlink/core/static"
"github.com/smartcontractkit/chainlink/core/store"
"github.com/smartcontractkit/chainlink/core/store/models"
"gorm.io/gorm"
"github.com/pkg/errors"
)
// JobSpecNotice is sent to the External Initiator when JobSpecs are created.
type JobSpecNotice struct {
JobID models.JobID `json:"jobId"`
Type string `json:"type"`
Params models.JSON `json:"params,omitempty"`
}
// NewJobSpecNotice returns a new JobSpec.
func NewJobSpecNotice(initiator models.Initiator, js models.JobSpec) (*JobSpecNotice, error) {
if initiator.Body == nil {
return nil, errors.New("body must be defined")
}
return &JobSpecNotice{
JobID: js.ID,
Type: initiator.Type,
Params: *initiator.Body,
}, nil
}
func newNotifyHTTPRequest(jsn JobSpecNotice, ei models.ExternalInitiator) (*http.Request, error) {
buf, err := json.Marshal(jsn)
if err != nil {
return nil, errors.Wrap(err, "new Job Spec notification")
}
req, err := http.NewRequest(http.MethodPost, ei.URL.String(), bytes.NewBuffer(buf))
if err != nil {
return nil, err
}
setHeaders(req, ei)
return req, nil
}
// NewExternalInitiatorManager returns the concrete externalInitiatorManager
func NewExternalInitiatorManager() *externalInitiatorManager {
return &externalInitiatorManager{}
}
type externalInitiatorManager struct{}
// Notify sends a POST notification to the External Initiator
// responsible for initiating the Job Spec.
func (externalInitiatorManager) Notify(
js models.JobSpec,
store *store.Store,
) error {
initrs := js.InitiatorsFor(models.InitiatorExternal)
if len(initrs) > 1 {
return errors.New("must have one or less External Initiators")
}
if len(initrs) == 0 {
return nil
}
initr := initrs[0]
ei, err := store.FindExternalInitiatorByName(initr.Name)
if err != nil {
return errors.Wrap(err, "external initiator")
}
if ei.URL == nil {
return nil
}
notice, err := NewJobSpecNotice(initr, js)
if err != nil {
return errors.Wrap(err, "new Job Spec notification")
}
req, err := newNotifyHTTPRequest(*notice, ei)
if err != nil {
return errors.Wrap(err, "creating notify HTTP request")
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
return errors.Wrap(err, "could not notify '%s' (%s)")
}
defer logger.ErrorIfCalling(resp.Body.Close)
if !(resp.StatusCode >= 200 && resp.StatusCode < 300) {
return fmt.Errorf(" notify '%s' (%s) received bad response '%s'", ei.Name, ei.URL, resp.Status)
}
return nil
}
func (externalInitiatorManager) DeleteJob(db *gorm.DB, jobID models.JobID) error {
ei, err := findExternalInitiatorForJob(db, jobID)
if err != nil {
return errors.Wrapf(err, "error looking up external initiator for job with id %s", jobID)
}
if ei == nil {
return nil
}
if ei.URL == nil {
return nil
}
req, err := newDeleteJobFromExternalInitiatorHTTPRequest(*ei, jobID)
if err != nil {
return errors.Wrap(err, "creating delete HTTP request")
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
return errors.Wrapf(err, "could not delete job from remote external initiator at %s", req.URL)
}
defer logger.ErrorIfCalling(resp.Body.Close)
if !(resp.StatusCode >= 200 && resp.StatusCode < 300) {
return fmt.Errorf(" notify '%s' (%s) received bad response '%s'", ei.Name, ei.URL, resp.Status)
}
return nil
}
func newDeleteJobFromExternalInitiatorHTTPRequest(ei models.ExternalInitiator, id models.JobID) (*http.Request, error) {
req, err := http.NewRequest(http.MethodDelete, fmt.Sprintf("%s/%s", ei.URL.String(), id), bytes.NewBuffer(nil))
if err != nil {
return nil, err
}
setHeaders(req, ei)
return req, nil
}
func setHeaders(req *http.Request, ei models.ExternalInitiator) {
req.Header.Set("Content-Type", "application/json")
req.Header.Set(static.ExternalInitiatorAccessKeyHeader, ei.OutgoingToken)
req.Header.Set(static.ExternalInitiatorSecretHeader, ei.OutgoingSecret)
}
func findExternalInitiatorForJob(db *gorm.DB, id models.JobID) (exi *models.ExternalInitiator, err error) {
exi = new(models.ExternalInitiator)
err = db.
Joins("INNER JOIN initiators ON initiators.name = external_initiators.name").
Where("initiators.job_spec_id = ?", id).
First(exi).
Error
if err == gorm.ErrRecordNotFound {
return nil, nil
}
return exi, err
}
type NullExternalInitiatorManager struct{}
func (NullExternalInitiatorManager) Notify(models.JobSpec, *store.Store) error {
return nil
}
func (NullExternalInitiatorManager) DeleteJob(db *gorm.DB, jobID models.JobID) error {
return nil
}