Skip to content

Commit

Permalink
fix(api): workflow hook reference (#4868)
Browse files Browse the repository at this point in the history
  • Loading branch information
fsamin committed Jan 30, 2020
1 parent 0bcfe73 commit d39462a
Show file tree
Hide file tree
Showing 34 changed files with 2,264 additions and 93 deletions.
2 changes: 1 addition & 1 deletion engine/api/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func (api *API) postMaintenanceHandler() service.Handler {
return err
}
url := fmt.Sprintf("/admin/maintenance?enable=%v", enable)
_, code, errHooks := services.DoJSONRequest(ctx, api.mustDB(), srvs, http.MethodPost, url, nil, nil)
_, code, errHooks := services.NewClient(api.mustDB(), srvs).DoJSONRequest(ctx, http.MethodPost, url, nil, nil)
if errHooks != nil || code >= 400 {
return fmt.Errorf("unable to change hook maintenant state to %v. Code result %d: %v", enable, code, errHooks)
}
Expand Down
10 changes: 10 additions & 0 deletions engine/api/ascode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,16 @@ func Test_postPerformImportAsCodeHandler(t *testing.T) {
return nil, sdk.WithStack(err)
}

for k, h := range hooks {
if h.HookModelName == sdk.RepositoryWebHookModelName {
cfg := hooks[k].Config
cfg["webHookURL"] = sdk.WorkflowNodeHookConfigValue{
Value: "http://lolcat.host",
Configurable: false,
}
}
}

body := new(bytes.Buffer)
w := new(http.Response)
enc := json.NewEncoder(body)
Expand Down
4 changes: 2 additions & 2 deletions engine/api/event/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func PushInElasticSearch(ctx context.Context, db gorp.SqlExecutor, store cache.S
continue
}
e.Payload = nil
_, code, errD := services.DoJSONRequest(context.Background(), db, esServices, "POST", "/events", e, nil)
_, code, errD := services.NewClient(db, esServices).DoJSONRequest(context.Background(), "POST", "/events", e, nil)
if code >= 400 || errD != nil {
log.Error(ctx, "PushInElasticSearch> Unable to send event %s to elasticsearch [%d]: %v", e.EventType, code, errD)
continue
Expand All @@ -58,7 +58,7 @@ func GetEvents(ctx context.Context, db gorp.SqlExecutor, store cache.Store, filt
}

var esEvents []elastic.SearchHit
if _, _, err := services.DoJSONRequest(context.Background(), db, srvs, "GET", "/events", filters, &esEvents); err != nil {
if _, _, err := services.NewClient(db, srvs).DoJSONRequest(context.Background(), "GET", "/events", filters, &esEvents); err != nil {
return nil, sdk.WrapError(err, "Unable to get events")
}

Expand Down
4 changes: 2 additions & 2 deletions engine/api/metrics/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func Init(ctx context.Context, DBFunc func() *gorp.DbMap) {
continue
}

_, code, errD := services.DoJSONRequest(context.Background(), DBFunc(), esServices, "POST", "/metrics", e, nil)
_, code, errD := services.NewClient(DBFunc(), esServices).DoJSONRequest(context.Background(), "POST", "/metrics", e, nil)
if code >= 400 || errD != nil {
log.Error(ctx, "metrics.pushInElasticSearch> Unable to send metrics to elasticsearch [%d]: %v", code, errD)
continue
Expand All @@ -63,7 +63,7 @@ func GetMetrics(ctx context.Context, db gorp.SqlExecutor, key string, appID int6
}

var esMetrics []elastic.SearchHit
if _, _, err := services.DoJSONRequest(context.Background(), db, srvs, "GET", "/metrics", metricsRequest, &esMetrics); err != nil {
if _, _, err := services.NewClient(db, srvs).DoJSONRequest(context.Background(), "GET", "/metrics", metricsRequest, &esMetrics); err != nil {
return nil, sdk.WrapError(err, "Unable to get metrics")
}

Expand Down
9 changes: 9 additions & 0 deletions engine/api/repositories_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,15 @@ func TestAPI_detachRepositoriesManagerHandler(t *testing.T) {
if err := json.Unmarshal(bts, &hooks); err != nil {
return writeError(w, err)
}
for k, h := range hooks {
if h.HookModelName == sdk.RepositoryWebHookModelName {
cfg := hooks[k].Config
cfg["webHookURL"] = sdk.WorkflowNodeHookConfigValue{
Value: "http://lolcat.host",
Configurable: false,
}
}
}
if err := enc.Encode(hooks); err != nil {
return writeError(w, err)
}
Expand Down
10 changes: 5 additions & 5 deletions engine/api/repositoriesmanager/repositories_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func LoadByName(ctx context.Context, db gorp.SqlExecutor, vcsName string) (sdk.V
if err != nil {
return vcsServer, sdk.WrapError(err, "Unable to load services")
}
if _, _, err := services.DoJSONRequest(ctx, db, srvs, "GET", fmt.Sprintf("/vcs/%s", vcsName), nil, &vcsServer); err != nil {
if _, _, err := services.NewClient(db, srvs).DoJSONRequest(ctx, "GET", fmt.Sprintf("/vcs/%s", vcsName), nil, &vcsServer); err != nil {
return vcsServer, sdk.WithStack(err)
}
return vcsServer, nil
Expand All @@ -42,7 +42,7 @@ func LoadAll(ctx context.Context, db *gorp.DbMap, store cache.Store) (map[string
}

vcsServers := make(map[string]sdk.VCSConfiguration)
if _, _, err := services.DoJSONRequest(ctx, db, srvs, "GET", "/vcs", nil, &vcsServers); err != nil {
if _, _, err := services.NewClient(db, srvs).DoJSONRequest(ctx, "GET", "/vcs", nil, &vcsServers); err != nil {
return nil, sdk.WithStack(err)
}
return vcsServers, nil
Expand Down Expand Up @@ -101,7 +101,7 @@ func (c *vcsConsumer) AuthorizeRedirect(ctx context.Context) (string, string, er
res := map[string]string{}
path := fmt.Sprintf("/vcs/%s/authorize", c.name)
log.Info(ctx, "Performing request on %s", path)
if _, _, err := services.DoJSONRequest(ctx, db, srv, "GET", path, nil, &res); err != nil {
if _, _, err := services.NewClient(db, srv).DoJSONRequest(ctx, "GET", path, nil, &res); err != nil {
return "", "", sdk.WithStack(err)
}

Expand All @@ -122,7 +122,7 @@ func (c *vcsConsumer) AuthorizeToken(ctx context.Context, token string, secret s

res := map[string]string{}
path := fmt.Sprintf("/vcs/%s/authorize", c.name)
if _, _, err := services.DoJSONRequest(ctx, db, srv, "POST", path, body, &res); err != nil {
if _, _, err := services.NewClient(db, srv).DoJSONRequest(ctx, "POST", path, body, &res); err != nil {
return "", "", sdk.WithStack(err)
}

Expand Down Expand Up @@ -184,7 +184,7 @@ func AuthorizedClient(ctx context.Context, db gorp.SqlExecutor, store cache.Stor
}

func (c *vcsClient) doJSONRequest(ctx context.Context, method, path string, in interface{}, out interface{}) (int, error) {
headers, code, err := services.DoJSONRequest(ctx, c.db, c.srvs, method, path, in, out, func(req *http.Request) {
headers, code, err := services.NewClient(c.db, c.srvs).DoJSONRequest(ctx, method, path, in, out, func(req *http.Request) {
req.Header.Set(sdk.HeaderXAccessToken, base64.StdEncoding.EncodeToString([]byte(c.token)))
req.Header.Set(sdk.HeaderXAccessTokenSecret, base64.StdEncoding.EncodeToString([]byte(c.secret)))
if c.created != 0 {
Expand Down
43 changes: 36 additions & 7 deletions engine/api/services/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,33 @@ var HTTPClient cdsclient.HTTPClient
// HTTPSigner is used to sign requests based on the RFC draft specification https://tools.ietf.org/html/draft-cavage-http-signatures-06
var HTTPSigner *httpsig.Signer

// DoMultiPartRequest performs an http request on a service with multipart tar file + json field
func DoMultiPartRequest(ctx context.Context, db gorp.SqlExecutor, srvs []sdk.Service, method, path string, multiPartData *MultiPartData, in interface{}, out interface{}, mods ...cdsclient.RequestModifier) (int, error) {
type Client interface {
// DoJSONRequest performs an http request on a service
DoJSONRequest(ctx context.Context, method, path string, in interface{}, out interface{}, mods ...cdsclient.RequestModifier) (http.Header, int, error)
// DoMultiPartRequest performs an http request on a service with multipart tar file + json field
DoMultiPartRequest(ctx context.Context, method, path string, multiPartData *MultiPartData, in interface{}, out interface{}, mods ...cdsclient.RequestModifier) (int, error)
}

type defaultServiceClient struct {
db gorp.SqlExecutor
srvs []sdk.Service
}

var NewClient func(gorp.SqlExecutor, []sdk.Service) Client = NewDefaultClient

func NewDefaultClient(db gorp.SqlExecutor, srvs []sdk.Service) Client {
return &defaultServiceClient{
db: db,
srvs: srvs,
}
}

func (s *defaultServiceClient) DoMultiPartRequest(ctx context.Context, method, path string, multiPartData *MultiPartData, in interface{}, out interface{}, mods ...cdsclient.RequestModifier) (int, error) {
return doMultiPartRequest(ctx, s.db, s.srvs, method, path, multiPartData, in, out, mods...)
}

// doMultiPartRequest performs an http request on a service with multipart tar file + json field
func doMultiPartRequest(ctx context.Context, db gorp.SqlExecutor, srvs []sdk.Service, method, path string, multiPartData *MultiPartData, in interface{}, out interface{}, mods ...cdsclient.RequestModifier) (int, error) {
body := &bytes.Buffer{}
writer := multipart.NewWriter(body)

Expand Down Expand Up @@ -95,16 +120,20 @@ func DoMultiPartRequest(ctx context.Context, db gorp.SqlExecutor, srvs []sdk.Ser
return lastCode, lastErr
}

// DoJSONRequest performs an http request on a service
func DoJSONRequest(ctx context.Context, db gorp.SqlExecutor, srvs []sdk.Service, method, path string, in interface{}, out interface{}, mods ...cdsclient.RequestModifier) (http.Header, int, error) {
func (s *defaultServiceClient) DoJSONRequest(ctx context.Context, method, path string, in interface{}, out interface{}, mods ...cdsclient.RequestModifier) (http.Header, int, error) {
return doJSONRequest(ctx, s.db, s.srvs, method, path, in, out, mods...)
}

// doJSONRequest performs an http request on a service
func doJSONRequest(ctx context.Context, db gorp.SqlExecutor, srvs []sdk.Service, method, path string, in interface{}, out interface{}, mods ...cdsclient.RequestModifier) (http.Header, int, error) {
var lastErr error
var lastCode int
var attempt int
for {
attempt++
for i := range srvs {
srv := &srvs[i]
headers, code, err := doJSONRequest(ctx, db, srv, method, path, in, out, mods...)
headers, code, err := _doJSONRequest(ctx, db, srv, method, path, in, out, mods...)
if err == nil {
return headers, code, nil
}
Expand All @@ -118,8 +147,8 @@ func DoJSONRequest(ctx context.Context, db gorp.SqlExecutor, srvs []sdk.Service,
return nil, lastCode, lastErr
}

// DoJSONRequest performs an http request on service
func doJSONRequest(ctx context.Context, db gorp.SqlExecutor, srv *sdk.Service, method, path string, in interface{}, out interface{}, mods ...cdsclient.RequestModifier) (http.Header, int, error) {
// _doJSONRequest is a low level function that performs an http request on service
func _doJSONRequest(ctx context.Context, db gorp.SqlExecutor, srv *sdk.Service, method, path string, in interface{}, out interface{}, mods ...cdsclient.RequestModifier) (http.Header, int, error) {
var b = []byte{}
var err error

Expand Down
78 changes: 78 additions & 0 deletions engine/api/services/mock_services/services_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion engine/api/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -692,7 +692,7 @@ func (api *API) getWorkflowHookHandler() service.Handler {

path := fmt.Sprintf("/task/%s/execution", uuid)
task := sdk.Task{}
if _, _, err := services.DoJSONRequest(ctx, api.mustDB(), srvs, "GET", path, nil, &task); err != nil {
if _, _, err := services.NewClient(api.mustDB(), srvs).DoJSONRequest(ctx, "GET", path, nil, &task); err != nil {
return sdk.WrapError(err, "Unable to get hook %s task and executions", uuid)
}

Expand Down
4 changes: 2 additions & 2 deletions engine/api/workflow/dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -957,7 +957,7 @@ func Update(ctx context.Context, db gorp.SqlExecutor, store cache.Store, w *sdk.
if w.WorkflowData.Node.Context != nil && w.WorkflowData.Node.Context.ApplicationID != 0 {
var err error
if w.WorkflowData.Node.Context.DefaultPayload, err = DefaultPayload(ctx, db, store, p, w); err != nil {
log.Warning(ctx, "putWorkflowHandler> Cannot set default payload : %v", err)
log.Warning(ctx, "workflow.Update> Cannot set default payload : %v", err)
}
}

Expand Down Expand Up @@ -1014,7 +1014,7 @@ func MarkAsDelete(db gorp.SqlExecutor, key, name string) error {
// Delete workflow
func Delete(ctx context.Context, db gorp.SqlExecutor, store cache.Store, p *sdk.Project, w *sdk.Workflow) error {
// Delete all hooks
if err := hookUnregistration(ctx, db, store, p, w.WorkflowData.GetHooks()); err != nil {
if err := hookUnregistration(ctx, db, store, p, w.WorkflowData.GetHooksMapRef()); err != nil {
return sdk.WrapError(err, "Unable to delete hooks from workflow")
}

Expand Down
9 changes: 9 additions & 0 deletions engine/api/workflow/dao_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1299,6 +1299,15 @@ func TestInsertSimpleWorkflowWithHookAndExport(t *testing.T) {
Value: "https://www.github.com",
Configurable: true,
}
for k, h := range hooks {
if h.HookModelName == sdk.RepositoryWebHookModelName {
cfg := hooks[k].Config
cfg["webHookURL"] = sdk.WorkflowNodeHookConfigValue{
Value: "http://lolcat.host",
Configurable: false,
}
}
}
if err := enc.Encode(hooks); err != nil {
return writeError(w, err)
}
Expand Down
2 changes: 1 addition & 1 deletion engine/api/workflow/execute_node_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -770,7 +770,7 @@ func stopWorkflowNodeOutGoingHook(ctx context.Context, dbFunc func() *gorp.DbMap

if nodeRun.HookExecutionID != "" {
path := fmt.Sprintf("/task/%s/execution/%d/stop", nodeRun.HookExecutionID, nodeRun.HookExecutionTimeStamp)
if _, _, err := services.DoJSONRequest(ctx, db, srvs, "POST", path, nil, nil); err != nil {
if _, _, err := services.NewClient(db, srvs).DoJSONRequest(ctx, "POST", path, nil, nil); err != nil {
return fmt.Errorf("unable to stop task execution: %v", err)
}
}
Expand Down
Loading

0 comments on commit d39462a

Please sign in to comment.