From 3e7a9f30de8b3dd305a31f2d0ee1a424a56197cb Mon Sep 17 00:00:00 2001 From: Christopher Homberger Date: Thu, 24 Nov 2022 21:52:05 +0100 Subject: [PATCH 1/5] refactor: move autoremove into the jobexecutor breaking: docker container are removed after job exit --- pkg/runner/job_executor.go | 18 ++++++++++++++++++ pkg/runner/runner.go | 27 ++------------------------- 2 files changed, 20 insertions(+), 25 deletions(-) diff --git a/pkg/runner/job_executor.go b/pkg/runner/job_executor.go index e1de6cbc7ab..9949bf7ded2 100644 --- a/pkg/runner/job_executor.go +++ b/pkg/runner/job_executor.go @@ -128,6 +128,24 @@ func newJobExecutor(info jobInfo, sf stepFactory, rc *RunContext) common.Executo return postExecutor(ctx) }). Finally(info.interpolateOutputs()). + Finally(func(ctx context.Context) error { + logger := common.Logger(ctx) + if rc.Config.AutoRemove { + var cancel context.CancelFunc + if ctx.Err() == context.Canceled { + ctx, cancel = context.WithTimeout(context.Background(), 5*time.Minute) + defer cancel() + } + + logger.Infof("Cleaning up container for job %s", rc.JobName) + + if err := rc.stopJobContainer()(ctx); err != nil { + logger.Errorf("Error while cleaning container: %v", err) + } + } + + return nil + }). Finally(info.closeContainer())) } diff --git a/pkg/runner/runner.go b/pkg/runner/runner.go index 564814bc953..84f2da1103b 100644 --- a/pkg/runner/runner.go +++ b/pkg/runner/runner.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "os" - "time" log "github.com/sirupsen/logrus" @@ -84,11 +83,10 @@ func (runner *runnerImpl) NewPlanExecutor(plan *model.Plan) common.Executor { stagePipeline := make([]common.Executor, 0) for i := range plan.Stages { - s := i stage := plan.Stages[i] stagePipeline = append(stagePipeline, func(ctx context.Context) error { pipeline := make([]common.Executor, 0) - for r, run := range stage.Runs { + for _, run := range stage.Runs { stageExecutor := make([]common.Executor, 0) job := run.Job() @@ -123,29 +121,8 @@ func (runner *runnerImpl) NewPlanExecutor(plan *model.Plan) common.Executor { maxJobNameLen = len(rc.String()) } stageExecutor = append(stageExecutor, func(ctx context.Context) error { - logger := common.Logger(ctx) jobName := fmt.Sprintf("%-*s", maxJobNameLen, rc.String()) - return rc.Executor().Finally(func(ctx context.Context) error { - isLastRunningContainer := func(currentStage int, currentRun int) bool { - return currentStage == len(plan.Stages)-1 && currentRun == len(stage.Runs)-1 - } - - if runner.config.AutoRemove && isLastRunningContainer(s, r) { - var cancel context.CancelFunc - if ctx.Err() == context.Canceled { - ctx, cancel = context.WithTimeout(context.Background(), 5*time.Minute) - defer cancel() - } - - log.Infof("Cleaning up container for job %s", rc.JobName) - - if err := rc.stopJobContainer()(ctx); err != nil { - logger.Errorf("Error while cleaning container: %v", err) - } - } - - return nil - })(common.WithJobErrorContainer(WithJobLogger(ctx, rc.Run.JobID, jobName, rc.Config, &rc.Masks, matrix))) + return rc.Executor()(common.WithJobErrorContainer(WithJobLogger(ctx, rc.Run.JobID, jobName, rc.Config, &rc.Masks, matrix))) }) } pipeline = append(pipeline, common.NewParallelExecutor(maxParallel, stageExecutor...)) From 47d199849309bb8c34217e00631309196086a432 Mon Sep 17 00:00:00 2001 From: Christopher Homberger Date: Thu, 24 Nov 2022 21:59:06 +0100 Subject: [PATCH 2/5] reduce complexity --- pkg/runner/job_executor.go | 24 ++++++------------------ 1 file changed, 6 insertions(+), 18 deletions(-) diff --git a/pkg/runner/job_executor.go b/pkg/runner/job_executor.go index 9949bf7ded2..0b85969591c 100644 --- a/pkg/runner/job_executor.go +++ b/pkg/runner/job_executor.go @@ -98,6 +98,12 @@ func newJobExecutor(info jobInfo, sf stepFactory, rc *RunContext) common.Executo logger := common.Logger(ctx) jobError := common.JobError(ctx) if jobError != nil { + if rc.Config.AutoRemove { + err := info.stopContainer()(ctx) + if err != nil { + return err + } + } info.result("failure") logger.WithField("jobResult", "failure").Infof("\U0001F3C1 Job failed") } else { @@ -128,24 +134,6 @@ func newJobExecutor(info jobInfo, sf stepFactory, rc *RunContext) common.Executo return postExecutor(ctx) }). Finally(info.interpolateOutputs()). - Finally(func(ctx context.Context) error { - logger := common.Logger(ctx) - if rc.Config.AutoRemove { - var cancel context.CancelFunc - if ctx.Err() == context.Canceled { - ctx, cancel = context.WithTimeout(context.Background(), 5*time.Minute) - defer cancel() - } - - logger.Infof("Cleaning up container for job %s", rc.JobName) - - if err := rc.stopJobContainer()(ctx); err != nil { - logger.Errorf("Error while cleaning container: %v", err) - } - } - - return nil - }). Finally(info.closeContainer())) } From aef863fc867270dac11d738988fdf7efddddc7da Mon Sep 17 00:00:00 2001 From: Christopher Homberger Date: Thu, 24 Nov 2022 22:00:11 +0100 Subject: [PATCH 3/5] remove linter exception --- pkg/runner/runner.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/pkg/runner/runner.go b/pkg/runner/runner.go index 84f2da1103b..65f1897d78e 100644 --- a/pkg/runner/runner.go +++ b/pkg/runner/runner.go @@ -76,8 +76,6 @@ func New(runnerConfig *Config) (Runner, error) { } // NewPlanExecutor ... -// -//nolint:gocyclo func (runner *runnerImpl) NewPlanExecutor(plan *model.Plan) common.Executor { maxJobNameLen := 0 From 4e52c3eff4660bdfd0733b860a70da450ea7d494 Mon Sep 17 00:00:00 2001 From: Christopher Homberger Date: Thu, 24 Nov 2022 22:18:00 +0100 Subject: [PATCH 4/5] reduce cyclic complexity --- pkg/runner/job_executor.go | 36 +++++++++++++++++------------------- 1 file changed, 17 insertions(+), 19 deletions(-) diff --git a/pkg/runner/job_executor.go b/pkg/runner/job_executor.go index 0b85969591c..59b8198d053 100644 --- a/pkg/runner/job_executor.go +++ b/pkg/runner/job_executor.go @@ -95,27 +95,13 @@ func newJobExecutor(info jobInfo, sf stepFactory, rc *RunContext) common.Executo } postExecutor = postExecutor.Finally(func(ctx context.Context) error { - logger := common.Logger(ctx) jobError := common.JobError(ctx) - if jobError != nil { - if rc.Config.AutoRemove { - err := info.stopContainer()(ctx) - if err != nil { - return err - } - } - info.result("failure") - logger.WithField("jobResult", "failure").Infof("\U0001F3C1 Job failed") - } else { - err := info.stopContainer()(ctx) - if err != nil { - return err - } - info.result("success") - logger.WithField("jobResult", "success").Infof("\U0001F3C1 Job succeeded") + var err error + if rc.Config.AutoRemove || jobError == nil { + err = info.stopContainer()(ctx) } - - return nil + setJobResult(ctx, info, rc, jobError == nil) + return err }) pipeline := make([]common.Executor, 0) @@ -137,6 +123,18 @@ func newJobExecutor(info jobInfo, sf stepFactory, rc *RunContext) common.Executo Finally(info.closeContainer())) } +func setJobResult(ctx context.Context, info jobInfo, rc *RunContext, success bool) { + logger := common.Logger(ctx) + jobResult := "success" + jobResultMessage := "succeeded" + if !success { + jobResult = "failure" + jobResultMessage = "failed" + } + info.result(jobResult) + logger.WithField("jobResult", jobResult).Infof("\U0001F3C1 Job %s", jobResultMessage) +} + func useStepLogger(rc *RunContext, stepModel *model.Step, stage stepStage, executor common.Executor) common.Executor { return func(ctx context.Context) error { ctx = withStepLogger(ctx, stepModel.ID, rc.ExprEval.Interpolate(ctx, stepModel.String()), stage.String()) From 3960deee18fbd91f0b46847ab547de00ff18fcba Mon Sep 17 00:00:00 2001 From: Christopher Homberger Date: Fri, 25 Nov 2022 10:51:50 +0100 Subject: [PATCH 5/5] fix: always allow 1 min for stopping and removing the runner, even if we were cancelled --- pkg/runner/job_executor.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pkg/runner/job_executor.go b/pkg/runner/job_executor.go index 59b8198d053..b445708552f 100644 --- a/pkg/runner/job_executor.go +++ b/pkg/runner/job_executor.go @@ -98,6 +98,9 @@ func newJobExecutor(info jobInfo, sf stepFactory, rc *RunContext) common.Executo jobError := common.JobError(ctx) var err error if rc.Config.AutoRemove || jobError == nil { + // always allow 1 min for stopping and removing the runner, even if we were cancelled + ctx, cancel := context.WithTimeout(common.WithLogger(context.Background(), common.Logger(ctx)), time.Minute) + defer cancel() err = info.stopContainer()(ctx) } setJobResult(ctx, info, rc, jobError == nil) @@ -114,7 +117,7 @@ func newJobExecutor(info jobInfo, sf stepFactory, rc *RunContext) common.Executo if ctx.Err() == context.Canceled { // in case of an aborted run, we still should execute the // post steps to allow cleanup. - ctx, cancel = context.WithTimeout(WithJobLogger(context.Background(), rc.Run.JobID, rc.String(), rc.Config, &rc.Masks, rc.Matrix), 5*time.Minute) + ctx, cancel = context.WithTimeout(common.WithLogger(context.Background(), common.Logger(ctx)), 5*time.Minute) defer cancel() } return postExecutor(ctx)