From cfa8f04b8a650bc4902291f0f7ba318a483e3724 Mon Sep 17 00:00:00 2001 From: richardlt Date: Thu, 1 Sep 2022 12:31:11 +0200 Subject: [PATCH 1/3] feat(api): clean workflow run secrets Signed-off-by: richardlt --- engine/api/api.go | 8 ++- engine/api/workflow/dao_run.go | 35 ++++++++-- engine/api/workflow/dao_run_secret.go | 17 +++++ engine/api/workflow_run_secrets.go | 52 +++++++++++++++ engine/api/workflow_run_secrets_test.go | 65 +++++++++++++++++++ .../api/251_clean_workflow_run_secrets.sql | 5 ++ 6 files changed, 176 insertions(+), 6 deletions(-) create mode 100644 engine/api/workflow_run_secrets.go create mode 100644 engine/api/workflow_run_secrets_test.go create mode 100644 engine/sql/api/251_clean_workflow_run_secrets.sql diff --git a/engine/api/api.go b/engine/api/api.go index a022654205..399a291835 100644 --- a/engine/api/api.go +++ b/engine/api/api.go @@ -66,7 +66,8 @@ type Configuration struct { } `toml:"url" comment:"#####################\n CDS URLs Settings \n####################" json:"url"` HTTP service.HTTPRouterConfiguration `toml:"http" json:"http"` Secrets struct { - SkipProjectSecretsOnRegion []string `toml:"skipProjectSecretsOnRegion" json:"-"` + SkipProjectSecretsOnRegion []string `toml:"skipProjectSecretsOnRegion" json:"skipProjectSecretsOnRegion" comment:"For given region, CDS will not automatically inject project's secrets when running a job."` + SnapshotRetentionDelay int64 `toml:"snapshotRetentionDelay" json:"snapshotRetentionDelay" comment:"Retention delay for workflow run secrets snapshot (in days), set to 0 will keep secrets until workflow run deletion. Removing secrets will activate the read only mode on a workflow run."` } `toml:"secrets" json:"secrets"` Database database.DBConfiguration `toml:"database" comment:"################################\n Postgresql Database settings \n###############################" json:"database"` Cache struct { @@ -748,6 +749,11 @@ func (a *API) Serve(ctx context.Context) error { a.GoRoutines.RunWithRestart(ctx, "workflow.ResyncWorkflowRunResultsRoutine", func(ctx context.Context) { workflow.ResyncWorkflowRunResultsRoutine(ctx, a.mustDB) }) + if a.Config.Secrets.SnapshotRetentionDelay > 0 { + a.GoRoutines.RunWithRestart(ctx, "workflow.CleanSecretsSnapshot", func(ctx context.Context) { + a.cleanWorkflowRunSecrets(ctx, time.Minute*10) + }) + } log.Info(ctx, "Bootstrapping database...") defaultValues := sdk.DefaultValues{ diff --git a/engine/api/workflow/dao_run.go b/engine/api/workflow/dao_run.go index a08a300695..f83031813c 100644 --- a/engine/api/workflow/dao_run.go +++ b/engine/api/workflow/dao_run.go @@ -353,27 +353,27 @@ func LoadRunsSummaries(ctx context.Context, db gorp.SqlExecutor, projectkey, wor SELECT workflow.id FROM workflow JOIN project ON project.id = workflow.project_id WHERE workflow.name = $2 AND project.projectkey = $1 - ), + ), runs as ( - SELECT %s + SELECT %s FROM workflow_run wr JOIN workflowID ON workflowID.id = wr.workflow_id WHERE wr.to_delete = false ), tags as ( - SELECT workflow_run_id, tag || '=' || value "all_tags" + SELECT workflow_run_id, tag || '=' || value "all_tags" FROM workflow_run_tag JOIN runs ON runs.id = workflow_run_id ), aggTags as ( - SELECT workflow_run_id, string_agg(all_tags, ',') as tags + SELECT workflow_run_id, string_agg(all_tags, ',') as tags FROM tags GROUP BY workflow_run_id ) SELECT runs.* FROM runs JOIN aggTags ON aggTags.workflow_run_id = runs.id - WHERE string_to_array($5, ',') <@ string_to_array(aggTags.tags, ',') + WHERE string_to_array($5, ',') <@ string_to_array(aggTags.tags, ',') ORDER BY runs.start DESC OFFSET $4 LIMIT $3`, selectedColumn) var tags []string for k, v := range tagFilter { @@ -1095,3 +1095,28 @@ func stopRunsBlocked(ctx context.Context, db *gorp.DbMap) error { } return nil } + +// LoadRunsIDsCreatedBefore returns the first 100 workflow runs created before given date. +func LoadRunsIDsCreatedBefore(ctx context.Context, db gorp.SqlExecutor, date time.Time) ([]int64, error) { + var ids []int64 + query := ` + SELECT id + FROM workflow_run + WHERE read_only = false AND start < $1 + ORDER BY start ASC + LIMIT 100 + ` + if _, err := db.Select(&ids, query, date); err != nil { + return nil, sdk.WithStack(err) + } + return ids, nil +} + +// SetRunReadOnly set read only flag of a workflow run, this run cannot be restarted anymore. +func SetRunReadOnlyByID(ctx context.Context, db gorpmapper.SqlExecutorWithTx, workflowRunID int64) error { + query := `UPDATE workflow_run SET read_only = true WHERE id = $1` + if _, err := db.Exec(query, workflowRunID); err != nil { + return sdk.WrapError(err, "unable to set read only for workflow run with id %d", workflowRunID) + } + return nil +} diff --git a/engine/api/workflow/dao_run_secret.go b/engine/api/workflow/dao_run_secret.go index ec9d5c8655..b8414424a3 100644 --- a/engine/api/workflow/dao_run_secret.go +++ b/engine/api/workflow/dao_run_secret.go @@ -49,3 +49,20 @@ func loadRunSecretWithDecryption(ctx context.Context, db gorp.SqlExecutor, runID } return secrets, nil } + +func CountRunSecretsByWorkflowRunID(ctx context.Context, db gorp.SqlExecutor, workflowRunID int64) (int64, error) { + query := `SELECT COUNT(1) FROM workflow_run_secret WHERE workflow_run_id = $1` + count, err := db.SelectInt(query, workflowRunID) + if err != nil { + return 0, sdk.WrapError(err, "unable to count workflow run secret for workflow run id %d", workflowRunID) + } + return count, nil +} + +func DeleteRunSecretsByWorkflowRunID(ctx context.Context, db gorpmapper.SqlExecutorWithTx, workflowRunID int64) error { + query := `DELETE FROM workflow_run_secret WHERE workflow_run_id = $1` + if _, err := db.Exec(query, workflowRunID); err != nil { + return sdk.WrapError(err, "unable to delete workflow run secret for workflow run id %d", workflowRunID) + } + return nil +} diff --git a/engine/api/workflow_run_secrets.go b/engine/api/workflow_run_secrets.go new file mode 100644 index 0000000000..d52dd4a068 --- /dev/null +++ b/engine/api/workflow_run_secrets.go @@ -0,0 +1,52 @@ +package api + +import ( + "context" + "time" + + "github.com/go-gorp/gorp" + "github.com/rockbears/log" + + "github.com/ovh/cds/engine/api/workflow" + "github.com/ovh/cds/sdk" +) + +func (api *API) cleanWorkflowRunSecrets(ctx context.Context, delay time.Duration) { + // Load workflow run older than now - snapshot retention delay + maxRetentionDate := time.Now().Add(-time.Hour * time.Duration(24*api.Config.Secrets.SnapshotRetentionDelay)) + + db := api.mustDB() + + ticker := time.NewTicker(delay) + + for range ticker.C { + runIDs, err := workflow.LoadRunsIDsCreatedBefore(ctx, db, maxRetentionDate) + if err != nil { + log.ErrorWithStackTrace(ctx, err) + continue + } + for _, id := range runIDs { + if err := api.cleanWorkflowRunSecretsForRun(ctx, db, id); err != nil { + log.ErrorWithStackTrace(ctx, err) + } + } + } +} + +func (api *API) cleanWorkflowRunSecretsForRun(ctx context.Context, db *gorp.DbMap, workflowRunID int64) error { + tx, err := db.Begin() + if err != nil { + return sdk.WithStack(err) + } + defer tx.Rollback() // nolint + if err := workflow.SetRunReadOnlyByID(ctx, tx, workflowRunID); err != nil { + return sdk.WithStack(err) + } + if err := workflow.DeleteRunSecretsByWorkflowRunID(ctx, tx, workflowRunID); err != nil { + return sdk.WithStack(err) + } + if err := tx.Commit(); err != nil { + return sdk.WithStack(err) + } + return nil +} diff --git a/engine/api/workflow_run_secrets_test.go b/engine/api/workflow_run_secrets_test.go new file mode 100644 index 0000000000..4c35cf6651 --- /dev/null +++ b/engine/api/workflow_run_secrets_test.go @@ -0,0 +1,65 @@ +package api + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/ovh/cds/engine/api/authentication" + "github.com/ovh/cds/engine/api/project" + "github.com/ovh/cds/engine/api/test/assets" + "github.com/ovh/cds/engine/api/workflow" + "github.com/ovh/cds/sdk" +) + +func Test_cleanSecretsSnapshotForRun(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + api, db, _ := newTestAPI(t) + + u, _ := assets.InsertAdminUser(t, db) + consumer, _ := authentication.LoadConsumerByTypeAndUserID(context.TODO(), db, sdk.ConsumerLocal, u.ID, authentication.LoadConsumerOptions.WithAuthentifiedUser) + projectKey := sdk.RandomString(10) + p := assets.InsertTestProject(t, db, api.Cache, projectKey, projectKey) + + require.NoError(t, project.InsertVariable(db, p.ID, &sdk.ProjectVariable{ + Type: sdk.SecretVariable, + Name: "my-secret", + Value: "my-value", + }, u)) + + w := assets.InsertTestWorkflow(t, db, api.Cache, p, sdk.RandomString(10)) + wr, err := workflow.CreateRun(db.DbMap, w, sdk.WorkflowRunPostHandlerOption{ + Hook: &sdk.WorkflowNodeRunHookEvent{}, + }) + require.NoError(t, err) + api.initWorkflowRun(ctx, p.Key, w, wr, sdk.WorkflowRunPostHandlerOption{ + Manual: &sdk.WorkflowNodeRunManual{}, + AuthConsumerID: consumer.ID, + }) + + runIDs, err := workflow.LoadRunsIDsCreatedBefore(ctx, db, time.Now()) + require.NoError(t, err) + require.Contains(t, runIDs, wr.ID) + + runIDs, err = workflow.LoadRunsIDsCreatedBefore(ctx, db, wr.Start) + require.NoError(t, err) + require.NotContains(t, runIDs, wr.ID) + + count, err := workflow.CountRunSecretsByWorkflowRunID(ctx, db, wr.ID) + require.NoError(t, err) + require.Equal(t, int64(1), count) + + require.NoError(t, api.cleanWorkflowRunSecretsForRun(ctx, db.DbMap, wr.ID)) + + result, err := workflow.LoadRunByID(ctx, db, wr.ID, workflow.LoadRunOptions{}) + require.NoError(t, err) + require.True(t, result.ReadOnly) + + count, err = workflow.CountRunSecretsByWorkflowRunID(ctx, db, wr.ID) + require.NoError(t, err) + require.Equal(t, int64(0), count) +} diff --git a/engine/sql/api/251_clean_workflow_run_secrets.sql b/engine/sql/api/251_clean_workflow_run_secrets.sql new file mode 100644 index 0000000000..0629e3bf67 --- /dev/null +++ b/engine/sql/api/251_clean_workflow_run_secrets.sql @@ -0,0 +1,5 @@ +-- +migrate Up +SELECT create_index('workflow_run', 'idx_workflow_run_start', 'start'); + +-- +migrate Down +DROP INDEX idx_workflow_run_start; From f9424d1bceab10c03a821d04d353ac54d5b2fafb Mon Sep 17 00:00:00 2001 From: richardlt Date: Thu, 1 Sep 2022 16:00:34 +0200 Subject: [PATCH 2/3] fix: cr Signed-off-by: richardlt --- engine/sql/api/251_clean_workflow_run_secrets.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/engine/sql/api/251_clean_workflow_run_secrets.sql b/engine/sql/api/251_clean_workflow_run_secrets.sql index 0629e3bf67..3c23b7fdea 100644 --- a/engine/sql/api/251_clean_workflow_run_secrets.sql +++ b/engine/sql/api/251_clean_workflow_run_secrets.sql @@ -1,5 +1,5 @@ -- +migrate Up -SELECT create_index('workflow_run', 'idx_workflow_run_start', 'start'); +SELECT create_index('workflow_run', 'idx_workflow_run_start', 'read_only,start'); -- +migrate Down DROP INDEX idx_workflow_run_start; From 870ee24eaf7aba868edfb1863cf2adf04ca3fda7 Mon Sep 17 00:00:00 2001 From: richardlt Date: Wed, 7 Sep 2022 11:27:03 +0200 Subject: [PATCH 3/3] feat: move delay in config --- engine/api/api.go | 4 +++- engine/api/workflow/dao_run.go | 8 ++++---- engine/api/workflow_run_secrets.go | 16 ++++++++++++++-- engine/api/workflow_run_secrets_test.go | 4 ++-- 4 files changed, 23 insertions(+), 9 deletions(-) diff --git a/engine/api/api.go b/engine/api/api.go index 399a291835..32cc3a03c0 100644 --- a/engine/api/api.go +++ b/engine/api/api.go @@ -68,6 +68,8 @@ type Configuration struct { Secrets struct { SkipProjectSecretsOnRegion []string `toml:"skipProjectSecretsOnRegion" json:"skipProjectSecretsOnRegion" comment:"For given region, CDS will not automatically inject project's secrets when running a job."` SnapshotRetentionDelay int64 `toml:"snapshotRetentionDelay" json:"snapshotRetentionDelay" comment:"Retention delay for workflow run secrets snapshot (in days), set to 0 will keep secrets until workflow run deletion. Removing secrets will activate the read only mode on a workflow run."` + SnapshotCleanInterval int64 `toml:"snapshotCleanInterval" json:"snapshotCleanInterval" comment:"Interval for secret snapshot clean (in minutes), default: 10"` + SnapshotCleanBatchSize int64 `toml:"snapshotCleanBatchSize" json:"snapshotCleanBatchSize" comment:"Batch size for secret snapshot clean, default: 100"` } `toml:"secrets" json:"secrets"` Database database.DBConfiguration `toml:"database" comment:"################################\n Postgresql Database settings \n###############################" json:"database"` Cache struct { @@ -751,7 +753,7 @@ func (a *API) Serve(ctx context.Context) error { }) if a.Config.Secrets.SnapshotRetentionDelay > 0 { a.GoRoutines.RunWithRestart(ctx, "workflow.CleanSecretsSnapshot", func(ctx context.Context) { - a.cleanWorkflowRunSecrets(ctx, time.Minute*10) + a.cleanWorkflowRunSecrets(ctx) }) } diff --git a/engine/api/workflow/dao_run.go b/engine/api/workflow/dao_run.go index f83031813c..c793da0be5 100644 --- a/engine/api/workflow/dao_run.go +++ b/engine/api/workflow/dao_run.go @@ -1096,17 +1096,17 @@ func stopRunsBlocked(ctx context.Context, db *gorp.DbMap) error { return nil } -// LoadRunsIDsCreatedBefore returns the first 100 workflow runs created before given date. -func LoadRunsIDsCreatedBefore(ctx context.Context, db gorp.SqlExecutor, date time.Time) ([]int64, error) { +// LoadRunsIDsCreatedBefore returns the first workflow runs created before given date. +func LoadRunsIDsCreatedBefore(ctx context.Context, db gorp.SqlExecutor, date time.Time, limit int64) ([]int64, error) { var ids []int64 query := ` SELECT id FROM workflow_run WHERE read_only = false AND start < $1 ORDER BY start ASC - LIMIT 100 + LIMIT $2 ` - if _, err := db.Select(&ids, query, date); err != nil { + if _, err := db.Select(&ids, query, date, limit); err != nil { return nil, sdk.WithStack(err) } return ids, nil diff --git a/engine/api/workflow_run_secrets.go b/engine/api/workflow_run_secrets.go index d52dd4a068..869fc28a63 100644 --- a/engine/api/workflow_run_secrets.go +++ b/engine/api/workflow_run_secrets.go @@ -11,16 +11,28 @@ import ( "github.com/ovh/cds/sdk" ) -func (api *API) cleanWorkflowRunSecrets(ctx context.Context, delay time.Duration) { +func (api *API) cleanWorkflowRunSecrets(ctx context.Context) { // Load workflow run older than now - snapshot retention delay maxRetentionDate := time.Now().Add(-time.Hour * time.Duration(24*api.Config.Secrets.SnapshotRetentionDelay)) db := api.mustDB() + delay := 10 * time.Minute + if api.Config.Secrets.SnapshotRetentionDelay > 0 { + delay = time.Duration(api.Config.Secrets.SnapshotRetentionDelay) * time.Minute + } + + limit := int64(100) + if api.Config.Secrets.SnapshotCleanBatchSize > 0 { + limit = api.Config.Secrets.SnapshotCleanBatchSize + } + + log.Info(ctx, "Starting workflow run secrets clean routine") + ticker := time.NewTicker(delay) for range ticker.C { - runIDs, err := workflow.LoadRunsIDsCreatedBefore(ctx, db, maxRetentionDate) + runIDs, err := workflow.LoadRunsIDsCreatedBefore(ctx, db, maxRetentionDate, limit) if err != nil { log.ErrorWithStackTrace(ctx, err) continue diff --git a/engine/api/workflow_run_secrets_test.go b/engine/api/workflow_run_secrets_test.go index 4c35cf6651..d7f23e3bba 100644 --- a/engine/api/workflow_run_secrets_test.go +++ b/engine/api/workflow_run_secrets_test.go @@ -41,11 +41,11 @@ func Test_cleanSecretsSnapshotForRun(t *testing.T) { AuthConsumerID: consumer.ID, }) - runIDs, err := workflow.LoadRunsIDsCreatedBefore(ctx, db, time.Now()) + runIDs, err := workflow.LoadRunsIDsCreatedBefore(ctx, db, time.Now(), 100) require.NoError(t, err) require.Contains(t, runIDs, wr.ID) - runIDs, err = workflow.LoadRunsIDsCreatedBefore(ctx, db, wr.Start) + runIDs, err = workflow.LoadRunsIDsCreatedBefore(ctx, db, wr.Start, 100) require.NoError(t, err) require.NotContains(t, runIDs, wr.ID)