Skip to content

Commit

Permalink
fix(hooks): sync outgoing tasks (#5854)
Browse files Browse the repository at this point in the history
  • Loading branch information
richardlt committed Jun 22, 2021
1 parent a0920e1 commit 266b47c
Show file tree
Hide file tree
Showing 9 changed files with 157 additions and 13 deletions.
8 changes: 8 additions & 0 deletions engine/api/api_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,14 @@ func isCDN(ctx context.Context) bool {
return c.Service != nil && c.Service.Type == sdk.TypeCDN
}

func isHooks(ctx context.Context) bool {
c := getAPIConsumer(ctx)
if c == nil {
return false
}
return c.Service != nil && c.Service.Type == sdk.TypeHooks
}

func isMFA(ctx context.Context) bool {
s := getAuthSession(ctx)
if s == nil {
Expand Down
1 change: 1 addition & 0 deletions engine/api/api_routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,7 @@ func (api *API) InitRouter() {

r.Handle("/workflow/search", Scope(sdk.AuthConsumerScopeProject), r.GET(api.getSearchWorkflowHandler))
r.Handle("/workflow/hook", Scope(sdk.AuthConsumerScopeHooks), r.GET(api.getWorkflowHooksHandler))
r.Handle("/workflow/hook/executions", Scope(sdk.AuthConsumerScopeHooks), r.GET(api.getWorkflowHookExecutionsHandler))
r.Handle("/workflow/hook/model/{model}", ScopeNone(), r.GET(api.getWorkflowHookModelHandler), r.POST(api.postWorkflowHookModelHandler, service.OverrideAuth(api.authAdminMiddleware)), r.PUT(api.putWorkflowHookModelHandler, service.OverrideAuth(api.authAdminMiddleware)))

// SSE
Expand Down
15 changes: 14 additions & 1 deletion engine/api/workflow/dao_node_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ const withLightNodeRunTestsField string = ", json_build_object('ko', workflow_no
func LoadNodeRunIDsWithLogs(db gorp.SqlExecutor, wIDs []int64, status []string) ([]sdk.WorkflowNodeRunIdentifiers, error) {
query := `
WITH noderun as (
SELECT distinct workflow_node_run_id as id, workflow_node_run.workflow_run_id, status
SELECT distinct workflow_node_run_id as id, workflow_node_run.workflow_run_id, status
FROM workflow_node_run_job_logs
JOIN workflow_node_run ON workflow_node_run.id = workflow_node_run_id
WHERE workflow_node_run.workflow_id = ANY($1)
Expand Down Expand Up @@ -866,3 +866,16 @@ func RunExist(db gorp.SqlExecutor, projectKey string, workflowID int64, hash str
count, err := db.SelectInt(query, projectKey, workflowID, hash)
return count != 0, err
}

func LoadNodeRunDistinctExecutionIDs(db gorp.SqlExecutor) ([]string, error) {
query := `
SELECT DISTINCT execution_id
FROM workflow_node_run
WHERE execution_id <> '' AND execution_id IS NOT NULL;
`
var executionIDs []string
if _, err := db.Select(&executionIDs, query); err != nil {
return nil, sdk.WithStack(err)
}
return executionIDs, nil
}
18 changes: 16 additions & 2 deletions engine/api/workflow_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@ import (

func (api *API) getWorkflowHooksHandler() service.Handler {
return func(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
// This handler can only be called by a service managed by an admin
if isService := isService(ctx); !isService && !isAdmin(ctx) {
if !isHooks(ctx) {
return sdk.WithStack(sdk.ErrForbidden)
}

Expand All @@ -31,6 +30,21 @@ func (api *API) getWorkflowHooksHandler() service.Handler {
}
}

func (api *API) getWorkflowHookExecutionsHandler() service.Handler {
return func(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
if !isHooks(ctx) {
return sdk.WithStack(sdk.ErrForbidden)
}

executionIDs, err := workflow.LoadNodeRunDistinctExecutionIDs(api.mustDB())
if err != nil {
return err
}

return service.WriteJSON(w, executionIDs, http.StatusOK)
}
}

func (api *API) getWorkflowOutgoingHookModelsHandler() service.Handler {
return func(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
m, err := workflow.LoadOutgoingHookModels(api.mustDB())
Expand Down
33 changes: 23 additions & 10 deletions engine/hooks/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,29 +69,41 @@ func (s *Service) synchronizeTasks(ctx context.Context) error {

log.Info(ctx, "Hooks> Synchronizing tasks from CDS API (%s)", s.Cfg.API.HTTP.URL)

//Get all hooks from CDS, and synchronize the tasks in cache
// Get all hooks from CDS, and synchronize the tasks in cache
hooks, err := s.Client.WorkflowAllHooksList()
if err != nil {
return sdk.WrapError(err, "Unable to get hooks")
return sdk.WrapError(err, "unable to get hooks")
}
mHookIDs := make(map[string]struct{}, len(hooks))
for i := range hooks {
mHookIDs[hooks[i].UUID] = struct{}{}
}

// Get all node run execution ids from CDS, and synchronize the outgoing tasks in cache
executionIDs, err := s.Client.WorkflowAllHooksExecutions()
if err != nil {
return sdk.WrapError(err, "unable to get hook execution ids")
}
mExecutionIDs := make(map[string]struct{}, len(executionIDs))
for i := range executionIDs {
mExecutionIDs[executionIDs[i]] = struct{}{}
}

allOldTasks, err := s.Dao.FindAllTasks(ctx)
if err != nil {
return sdk.WrapError(err, "Unable to get allOldTasks")
}

//Delete all old task which are not referenced in CDS API anymore
// Delete all old task which are not referenced in CDS API anymore
for i := range allOldTasks {
t := &allOldTasks[i]
var found bool
for _, h := range hooks {
if h.UUID == t.UUID {
found = true
log.Debug(ctx, "Hook> Synchronizing %s task %s", h.HookModelName, t.UUID)
break
}
if t.Type == TypeOutgoingWebHook || t.Type == TypeOutgoingWorkflow {
_, found = mExecutionIDs[t.UUID]
} else {
_, found = mHookIDs[t.UUID]
}
if !found && t.Type != TypeOutgoingWebHook && t.Type != TypeOutgoingWorkflow {
if !found {
if err := s.deleteTask(ctx, t); err != nil {
log.Error(ctx, "Hook> Error on task %s delete on synchronization: %v", t.UUID, err)
} else {
Expand All @@ -100,6 +112,7 @@ func (s *Service) synchronizeTasks(ctx context.Context) error {
}
}

// Create or update hook tasks from CDS API data
for _, h := range hooks {
confProj := h.Config[sdk.HookConfigProject]
confWorkflow := h.Config[sdk.HookConfigWorkflow]
Expand Down
55 changes: 55 additions & 0 deletions engine/hooks/tasks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package hooks
import (
"context"
"net/http"
"sort"
"testing"
"time"

Expand Down Expand Up @@ -88,6 +89,7 @@ func Test_dequeueTaskExecutions_ScheduledTask(t *testing.T) {
// Mock the sync of tasks
// It will remove all the tasks from the database
m.EXPECT().WorkflowAllHooksList().Return([]sdk.NodeHook{}, nil)
m.EXPECT().WorkflowAllHooksExecutions().Return([]string{}, nil)
m.EXPECT().VCSConfiguration().Return(nil, nil).AnyTimes()
require.NoError(t, s.synchronizeTasks(ctx))

Expand Down Expand Up @@ -164,6 +166,7 @@ func Test_dequeueTaskExecutions_ScheduledTask(t *testing.T) {
// Now we will triggered another hooks sync
// The mock must return one hook
m.EXPECT().WorkflowAllHooksList().Return([]sdk.NodeHook{*h}, nil)
m.EXPECT().WorkflowAllHooksExecutions().Return([]string{}, nil)
require.NoError(t, s.synchronizeTasks(context.Background()))

// We must be able to find the task
Expand All @@ -177,3 +180,55 @@ func Test_dequeueTaskExecutions_ScheduledTask(t *testing.T) {
assert.Equal(t, "DONE", execs[0].Status)
assert.Equal(t, "SCHEDULED", execs[1].Status)
}

func Test_synchronizeTasks(t *testing.T) {
log.Factory = log.NewTestingWrapper(t)
s, cancel := setupTestHookService(t)
defer cancel()

ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
defer cancel()

// Get the mock
m := s.Client.(*mock_cdsclient.MockInterface)

m.EXPECT().VCSConfiguration().Return(nil, nil).AnyTimes()

m.EXPECT().WorkflowAllHooksList().Return([]sdk.NodeHook{}, nil)
m.EXPECT().WorkflowAllHooksExecutions().Return([]string{}, nil)
require.NoError(t, s.synchronizeTasks(ctx))

tasks, err := s.Dao.FindAllTasks(ctx)
require.NoError(t, err)
require.Len(t, tasks, 0)

require.NoError(t, s.Dao.SaveTask(&sdk.Task{
UUID: "1",
Type: TypeScheduler,
}))
require.NoError(t, s.Dao.SaveTask(&sdk.Task{
UUID: sdk.UUID(),
Type: TypeScheduler,
}))
require.NoError(t, s.Dao.SaveTask(&sdk.Task{
UUID: "2",
Type: TypeOutgoingWorkflow,
}))
require.NoError(t, s.Dao.SaveTask(&sdk.Task{
UUID: sdk.UUID(),
Type: TypeOutgoingWorkflow,
}))

m.EXPECT().WorkflowAllHooksList().Return([]sdk.NodeHook{{UUID: "1"}}, nil)
m.EXPECT().WorkflowAllHooksExecutions().Return([]string{"2"}, nil)
require.NoError(t, s.synchronizeTasks(ctx))

tasks, err = s.Dao.FindAllTasks(ctx)
require.NoError(t, err)
require.Len(t, tasks, 2)
sort.Slice(tasks, func(i, j int) bool { return tasks[i].UUID < tasks[j].UUID })
require.Equal(t, "1", tasks[0].UUID)
require.Equal(t, TypeScheduler, tasks[0].Type)
require.Equal(t, "2", tasks[1].UUID)
require.Equal(t, TypeOutgoingWorkflow, tasks[1].Type)
}
9 changes: 9 additions & 0 deletions sdk/cdsclient/client_workflow_hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,12 @@ func (c *client) WorkflowAllHooksList() ([]sdk.NodeHook, error) {
}
return w, nil
}

func (c *client) WorkflowAllHooksExecutions() ([]string, error) {
url := fmt.Sprintf("/workflow/hook/executions")
var res []string
if _, err := c.GetJSON(context.Background(), url, &res); err != nil {
return nil, err
}
return res, nil
}
1 change: 1 addition & 0 deletions sdk/cdsclient/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,7 @@ type WorkflowClient interface {
WorkflowLogDownload(ctx context.Context, link sdk.CDNLogLink) ([]byte, error)
WorkflowNodeRunRelease(projectKey string, workflowName string, runNumber int64, nodeRunID int64, release sdk.WorkflowNodeRunRelease) error
WorkflowAllHooksList() ([]sdk.NodeHook, error)
WorkflowAllHooksExecutions() ([]string, error)
WorkflowCachePush(projectKey, integrationName, ref string, tarContent io.Reader, size int) error
WorkflowCachePull(projectKey, integrationName, ref string) (io.Reader, error)
WorkflowTransformAsCode(projectKey, workflowName, branch, message string) (*sdk.Operation, error)
Expand Down
30 changes: 30 additions & 0 deletions sdk/cdsclient/mock_cdsclient/interface_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 266b47c

Please sign in to comment.