From a576f03d80a34b6e024fbe8716ec9face3d36b8a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Samin?= Date: Fri, 17 Nov 2023 15:02:17 +0100 Subject: [PATCH 1/2] fix(api): try to fix lock between workers and jobs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: François Samin --- engine/api/api.go | 5 ++- engine/api/api_test.go | 14 +++---- engine/api/workflow/dao_node_run_job.go | 34 +++++++++++++--- engine/api/workflow/dao_run.go | 2 +- engine/api/workflow/execute_node_run.go | 5 +-- engine/api/workflow_run_craft.go | 54 ++++++++++++++++++++++++- 6 files changed, 94 insertions(+), 20 deletions(-) diff --git a/engine/api/api.go b/engine/api/api.go index 2c245aa51d..161569627f 100644 --- a/engine/api/api.go +++ b/engine/api/api.go @@ -5,6 +5,7 @@ import ( "context" "fmt" "io" + "math/rand" "net/http" "net/url" "os" @@ -849,7 +850,9 @@ func (a *API) Serve(ctx context.Context) error { a.GoRoutines.RunWithRestart(ctx, "api.WorkflowRunCraft", func(ctx context.Context) { a.WorkflowRunCraft(ctx, 100*time.Millisecond) }) - + a.GoRoutines.RunWithRestart(ctx, "api.WorkflowRunJobDeletion", func(ctx context.Context) { + a.WorkflowRunJobDeletion(ctx, time.Duration(10*rand.Float64())*time.Second, 10) + }) a.GoRoutines.RunWithRestart(ctx, "api.V2WorkflowRunCraft", func(ctx context.Context) { a.V2WorkflowRunCraft(ctx, 10*time.Second) }) diff --git a/engine/api/api_test.go b/engine/api/api_test.go index 48fd92f87b..963091654f 100644 --- a/engine/api/api_test.go +++ b/engine/api/api_test.go @@ -2,27 +2,27 @@ package api import ( "context" - "github.com/ovh/cds/engine/api/link" - "github.com/ovh/cds/engine/api/organization" "net/http/httptest" "net/url" "testing" "time" - "github.com/gorilla/mux" - "github.com/rockbears/log" - "github.com/stretchr/testify/require" - "github.com/ovh/cds/engine/api/authentication/builtin" "github.com/ovh/cds/engine/api/authentication/local" authdrivertest "github.com/ovh/cds/engine/api/authentication/test" "github.com/ovh/cds/engine/api/bootstrap" + "github.com/ovh/cds/engine/api/link" + "github.com/ovh/cds/engine/api/organization" apiTest "github.com/ovh/cds/engine/api/test" "github.com/ovh/cds/engine/api/workflow" "github.com/ovh/cds/engine/cache" "github.com/ovh/cds/engine/service" "github.com/ovh/cds/engine/test" "github.com/ovh/cds/sdk" + + "github.com/gorilla/mux" + "github.com/rockbears/log" + "github.com/stretchr/testify/require" ) func newTestAPI(t *testing.T, bootstrapFunc ...test.Bootstrapf) (*API, *test.FakeTransaction, *Router) { @@ -63,7 +63,7 @@ func newTestAPI(t *testing.T, bootstrapFunc ...test.Bootstrapf) (*API, *test.Fak // Clean all the pending crafting workflow runs lockKey := cache.Key("api:workflowRunCraft") require.NoError(t, store.DeleteAll(lockKey)) - ids, _ := workflow.LoadCratingWorkflowRunIDs(api.mustDB()) + ids, _ := workflow.LoadCraftingWorkflowRunIDs(api.mustDB()) for _, id := range ids { require.NoError(t, workflow.UpdateCraftedWorkflowRun(api.mustDB(), id)) } diff --git a/engine/api/workflow/dao_node_run_job.go b/engine/api/workflow/dao_node_run_job.go index e8ec2bb080..0b2ff2925c 100644 --- a/engine/api/workflow/dao_node_run_job.go +++ b/engine/api/workflow/dao_node_run_job.go @@ -229,7 +229,7 @@ func LoadNodeJobRunIDByNodeRunID(db gorp.SqlExecutor, runNodeID int64) ([]int64, return ids, nil } -//LoadNodeJobRun load a NodeJobRun given its ID +// LoadNodeJobRun load a NodeJobRun given its ID func LoadNodeJobRun(ctx context.Context, db gorp.SqlExecutor, store cache.Store, id int64) (*sdk.WorkflowNodeJobRun, error) { j := JobRun{} query := `select workflow_node_run_job.* from workflow_node_run_job where id = $1` @@ -252,7 +252,7 @@ func LoadNodeJobRun(ctx context.Context, db gorp.SqlExecutor, store cache.Store, return &jr, nil } -//LoadDeadNodeJobRun load a NodeJobRun which is Building but without worker +// LoadDeadNodeJobRun load a NodeJobRun which is Building but without worker func LoadDeadNodeJobRun(ctx context.Context, db gorp.SqlExecutor, store cache.Store) ([]sdk.WorkflowNodeJobRun, error) { var deadJobsDB []JobRun query := `SELECT workflow_node_run_job.* FROM workflow_node_run_job WHERE worker_id IS NULL` @@ -279,7 +279,29 @@ func LoadDeadNodeJobRun(ctx context.Context, db gorp.SqlExecutor, store cache.St return deadJobs, nil } -//LoadAndLockNodeJobRunWait load for update a NodeJobRun given its ID +func LoadAndLockTerminatedNodeJobRun(ctx context.Context, db gorp.SqlExecutor, limit int) ([]sdk.WorkflowNodeJobRun, error) { + var terminatedJobsDB []JobRun + query := `SELECT workflow_node_run_job.* FROM workflow_node_run_job WHERE status IN ($1, $2, $3) ORDER BY id ASC LIMIT $4 FOR UPDATE SKIP LOCKED` + if _, err := db.Select(&terminatedJobsDB, query, sdk.StatusStopped, sdk.StatusSuccess, sdk.StatusFail, limit); err != nil { + if err == sql.ErrNoRows { + return nil, nil + } + return nil, err + } + + deadJobs := make([]sdk.WorkflowNodeJobRun, len(terminatedJobsDB)) + for i, deadJob := range terminatedJobsDB { + jr, err := deadJob.WorkflowNodeRunJob() + if err != nil { + return nil, err + } + deadJobs[i] = jr + } + + return deadJobs, nil +} + +// LoadAndLockNodeJobRunWait load for update a NodeJobRun given its ID func LoadAndLockNodeJobRunWait(ctx context.Context, db gorp.SqlExecutor, store cache.Store, id int64) (*sdk.WorkflowNodeJobRun, error) { j := JobRun{} query := `select workflow_node_run_job.* from workflow_node_run_job where id = $1 for update` @@ -294,7 +316,7 @@ func LoadAndLockNodeJobRunWait(ctx context.Context, db gorp.SqlExecutor, store c return &jr, nil } -//LoadAndLockNodeJobRunSkipLocked load for update a NodeJobRun given its ID +// LoadAndLockNodeJobRunSkipLocked load for update a NodeJobRun given its ID func LoadAndLockNodeJobRunSkipLocked(ctx context.Context, db gorp.SqlExecutor, store cache.Store, id int64) (*sdk.WorkflowNodeJobRun, error) { var end func() _, end = telemetry.Span(ctx, "workflow.LoadAndLockNodeJobRunSkipLocked") @@ -329,7 +351,7 @@ func insertWorkflowNodeJobRun(db gorp.SqlExecutor, j *sdk.WorkflowNodeJobRun) er return nil } -//DeleteNodeJobRuns deletes all workflow_node_run_job for a given workflow_node_run +// DeleteNodeJobRuns deletes all workflow_node_run_job for a given workflow_node_run func DeleteNodeJobRuns(db gorp.SqlExecutor, nodeID int64) error { query := `delete from workflow_node_run_job where workflow_node_run_id = $1` _, err := db.Exec(query, nodeID) @@ -343,7 +365,7 @@ func DeleteNodeJobRun(db gorp.SqlExecutor, nodeRunJob int64) error { return err } -//UpdateNodeJobRun updates a workflow_node_run_job +// UpdateNodeJobRun updates a workflow_node_run_job func UpdateNodeJobRun(ctx context.Context, db gorp.SqlExecutor, j *sdk.WorkflowNodeJobRun) error { var end func() _, end = telemetry.Span(ctx, "workflow.UpdateNodeJobRun") diff --git a/engine/api/workflow/dao_run.go b/engine/api/workflow/dao_run.go index c2b12f88fd..c16bcf696c 100644 --- a/engine/api/workflow/dao_run.go +++ b/engine/api/workflow/dao_run.go @@ -596,7 +596,7 @@ func InsertRunNum(db gorp.SqlExecutor, w *sdk.Workflow, num int64) error { return nil } -func LoadCratingWorkflowRunIDs(db gorp.SqlExecutor) ([]int64, error) { +func LoadCraftingWorkflowRunIDs(db gorp.SqlExecutor) ([]int64, error) { query := ` SELECT id FROM workflow_run diff --git a/engine/api/workflow/execute_node_run.go b/engine/api/workflow/execute_node_run.go index e83cacf5de..479366b29a 100644 --- a/engine/api/workflow/execute_node_run.go +++ b/engine/api/workflow/execute_node_run.go @@ -327,10 +327,7 @@ func executeNodeRun(ctx context.Context, db gorpmapper.SqlExecutorWithTx, store } // End of temporary debug - // Delete the line in workflow_node_run_job - if err := DeleteNodeJobRuns(db, workflowNodeRun.ID); err != nil { - return nil, sdk.WrapError(err, "unable to delete node %d job runs", workflowNodeRun.ID) - } + // Delete the line in workflow_node_run_job is done asynchronously in a goroutine at api level // If current node has a mutex, we want to trigger another node run that can be waiting for the mutex node := updatedWorkflowRun.Workflow.WorkflowData.NodeByID(workflowNodeRun.WorkflowNodeID) diff --git a/engine/api/workflow_run_craft.go b/engine/api/workflow_run_craft.go index c861731a2b..7f60157e9a 100644 --- a/engine/api/workflow_run_craft.go +++ b/engine/api/workflow_run_craft.go @@ -31,7 +31,7 @@ func (api *API) WorkflowRunCraft(ctx context.Context, tick time.Duration) { } return case <-ticker.C: - ids, err := workflow.LoadCratingWorkflowRunIDs(api.mustDB()) + ids, err := workflow.LoadCraftingWorkflowRunIDs(api.mustDB()) if err != nil { log.Error(ctx, "WorkflowRunCraft> unable to start tx: %v", err) continue @@ -166,3 +166,55 @@ func (api *API) workflowRunCraft(ctx context.Context, id int64) error { return workflow.UpdateCraftedWorkflowRun(api.mustDB(), run.ID) } + +func (api *API) WorkflowRunJobDeletion(ctx context.Context, tick time.Duration, limit int) { + ticker := time.NewTicker(tick) + defer ticker.Stop() + +mainLoop: + for { + select { + case <-ctx.Done(): + if ctx.Err() != nil { + log.Error(ctx, "%v", ctx.Err()) + } + return + case <-ticker.C: + tx, err := api.mustDB().Begin() + if err != nil { + log.ErrorWithStackTrace(ctx, err) + continue + } + jobs, err := workflow.LoadAndLockTerminatedNodeJobRun(ctx, tx, limit) + if err != nil { + log.Error(ctx, "WorkflowRunJobDeletion> unable to start tx: %v", err) + tx.Rollback() + continue + } + for i := range jobs { + j := &jobs[i] + node, err := workflow.LoadNodeRunByID(ctx, tx, j.WorkflowNodeRunID, workflow.LoadRunOptions{}) + if err != nil { + log.ErrorWithStackTrace(ctx, sdk.WrapError(err, "unable to load NodeRun %d", j.WorkflowNodeRunID)) + tx.Rollback() + continue mainLoop + } + + if !sdk.StatusIsTerminated(node.Status) { + continue + } + + if err := workflow.DeleteNodeJobRun(tx, j.ID); err != nil { + log.ErrorWithStackTrace(ctx, sdk.WrapError(err, "unable to delete WorkflowNodeJobRun %d", j.ID)) + tx.Rollback() + continue mainLoop + } + } + if err := tx.Commit(); err != nil { + log.Error(ctx, "WorkflowRunJobDeletion> unable to commit tx: %v", err) + tx.Rollback() + continue + } + } + } +} From 03038a4ae4970c5428cae65912d2bf22d0acd2fc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Samin?= Date: Fri, 17 Nov 2023 15:34:46 +0100 Subject: [PATCH 2/2] fix: cr MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: François Samin --- engine/api/workflow_run_craft.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/engine/api/workflow_run_craft.go b/engine/api/workflow_run_craft.go index 7f60157e9a..32550d1d69 100644 --- a/engine/api/workflow_run_craft.go +++ b/engine/api/workflow_run_craft.go @@ -188,7 +188,7 @@ mainLoop: jobs, err := workflow.LoadAndLockTerminatedNodeJobRun(ctx, tx, limit) if err != nil { log.Error(ctx, "WorkflowRunJobDeletion> unable to start tx: %v", err) - tx.Rollback() + _ = tx.Rollback() continue } for i := range jobs { @@ -196,7 +196,7 @@ mainLoop: node, err := workflow.LoadNodeRunByID(ctx, tx, j.WorkflowNodeRunID, workflow.LoadRunOptions{}) if err != nil { log.ErrorWithStackTrace(ctx, sdk.WrapError(err, "unable to load NodeRun %d", j.WorkflowNodeRunID)) - tx.Rollback() + _ = tx.Rollback() continue mainLoop } @@ -206,13 +206,13 @@ mainLoop: if err := workflow.DeleteNodeJobRun(tx, j.ID); err != nil { log.ErrorWithStackTrace(ctx, sdk.WrapError(err, "unable to delete WorkflowNodeJobRun %d", j.ID)) - tx.Rollback() + _ = tx.Rollback() continue mainLoop } } if err := tx.Commit(); err != nil { log.Error(ctx, "WorkflowRunJobDeletion> unable to commit tx: %v", err) - tx.Rollback() + _ = tx.Rollback() continue } }