refactor: move autoremove into the jobexecutor #1463

Merged
merged 6 commits on Dec 6, 2022
35 changes: 21 additions & 14 deletions pkg/runner/job_executor.go
@@ -95,21 +95,16 @@ func newJobExecutor(info jobInfo, sf stepFactory, rc *RunContext) common.Executo
}

postExecutor = postExecutor.Finally(func(ctx context.Context) error {
logger := common.Logger(ctx)
jobError := common.JobError(ctx)
if jobError != nil {
info.result("failure")
logger.WithField("jobResult", "failure").Infof("\U0001F3C1 Job failed")
} else {
err := info.stopContainer()(ctx)
if err != nil {
return err
}
info.result("success")
logger.WithField("jobResult", "success").Infof("\U0001F3C1 Job succeeded")
var err error
if rc.Config.AutoRemove || jobError == nil {
// always allow 1 min for stopping and removing the runner, even if we were cancelled
ctx, cancel := context.WithTimeout(common.WithLogger(context.Background(), common.Logger(ctx)), time.Minute)
defer cancel()
err = info.stopContainer()(ctx)
Member
This will fail if the job was cancelled earlier (e.g. by an OS signal).
Please make sure that a fallback context is used in that case.

Contributor Author
I changed the code to allow Docker 1 min for stopping / removing the runner. It should be safe now.

Additionally, I now reuse the existing job logger instead of creating a new one. I always want to use my own logger provided to the RunContext executor.

Do we want to add a forceCancel context as a field on the context, so we can force-quit act by pressing Ctrl+C twice? This could be the parent context of the context with a timeout.

This needs to be tested manually; I don't think we have tests for cancelling act.
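
(For illustration only, not part of the diff: a minimal, self-contained sketch of the cleanup pattern described here. The one-minute budget is derived from a fresh background context so the teardown survives cancellation of the run context; plain logrus stands in for act's `common.WithLogger` / `common.Logger`, and `stopRunner` is a hypothetical stand-in for `info.stopContainer()`.)

```go
package main

import (
	"context"
	"time"

	log "github.com/sirupsen/logrus"
)

// stopRunner is a hypothetical stand-in for info.stopContainer() in act.
func stopRunner(ctx context.Context) error {
	log.Info("stopping and removing the job container")
	return ctx.Err()
}

// cleanup gives the container teardown its own one-minute budget, even if the
// run context (ctx) was already cancelled, by deriving the timeout from a
// fresh background context instead of ctx.
func cleanup(ctx context.Context) error {
	cleanupCtx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()
	// In act the existing job logger is carried over via
	// common.WithLogger(context.Background(), common.Logger(ctx)).
	return stopRunner(cleanupCtx)
}

func main() {
	runCtx, cancelRun := context.WithCancel(context.Background())
	cancelRun() // simulate the job being cancelled, e.g. by an OS signal

	if err := cleanup(runCtx); err != nil {
		log.Errorf("cleanup failed: %v", err)
	}
}
```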

Member
> Do we want to add a forceCancel context as a field on the context, so we can force-quit act by pressing Ctrl+C twice? This could be the parent context of the context with a timeout.

That was the case before: the BackgroundContext used as a fallback was cancelable as well. That would then cancel/stop the cleanup.

Contributor Author
> That was the case before: the BackgroundContext used as a fallback was cancelable as well. That would then cancel/stop the cleanup.

I want both: a normal cancel with cleanup, and additionally the option to send another cancellation signal that aborts the cleanup and quits as soon as possible.
Previously we had one or the other, but never both at the same time.
For example, consider this scenario:

  • Start a long-running job
  • Cancel the normal context while it executes the main stage
  • A new context for the post executors is created
  • The job has a long-running post step
  • Cancel again, this time via the force-cancel context, because I don't want to wait; otherwise I would have to open the task manager and kill act
  • act stops before hitting the 5 min timeout

Your CI system could simply never fire the force-cancel context, so you have an opt-out.
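
(Not part of this PR; just a rough, hypothetical sketch of the force-cancel idea discussed above. A first Ctrl+C cancels the run but still lets cleanup use its timeout; a second Ctrl+C cancels the force-cancel parent context and aborts the cleanup as well. Names such as `forceCancelCtx`, `runJob` and `runPostSteps` are illustrative, not act APIs.)

```go
package main

import (
	"context"
	"os"
	"os/signal"
	"time"
)

func runJob(ctx context.Context) {
	// placeholder: the real job would run here and stop once ctx is cancelled
	<-ctx.Done()
}

func runPostSteps(ctx context.Context) {
	// placeholder: post steps / container removal, bounded by ctx
	select {
	case <-time.After(2 * time.Second): // pretend cleanup takes a moment
	case <-ctx.Done(): // aborted by the force-cancel context or the timeout
	}
}

func main() {
	// forceCancelCtx is cancelled on the second Ctrl+C; it is the parent of the
	// fallback/cleanup context, so a second signal aborts the cleanup too.
	forceCancelCtx, forceCancel := context.WithCancel(context.Background())
	runCtx, cancelRun := context.WithCancel(forceCancelCtx)

	sig := make(chan os.Signal, 1)
	signal.Notify(sig, os.Interrupt)
	go func() {
		<-sig // first Ctrl+C: cancel the run, cleanup still gets its timeout
		cancelRun()
		<-sig // second Ctrl+C: abort the cleanup and quit as soon as possible
		forceCancel()
	}()

	runJob(runCtx)

	// The post executors get a fallback context derived from forceCancelCtx,
	// so they survive the first cancel but not the second.
	cleanupCtx, cancel := context.WithTimeout(forceCancelCtx, 5*time.Minute)
	defer cancel()
	runPostSteps(cleanupCtx)
}
```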

Member
Understood, and I'm fine with that.
I just thought that it worked before, as a second cancellation was respected by act (if I remember correctly).

}

return nil
setJobResult(ctx, info, rc, jobError == nil)
return err
})

pipeline := make([]common.Executor, 0)
@@ -122,7 +117,7 @@ func newJobExecutor(info jobInfo, sf stepFactory, rc *RunContext) common.Executo
if ctx.Err() == context.Canceled {
// in case of an aborted run, we still should execute the
// post steps to allow cleanup.
ctx, cancel = context.WithTimeout(WithJobLogger(context.Background(), rc.Run.JobID, rc.String(), rc.Config, &rc.Masks, rc.Matrix), 5*time.Minute)
ctx, cancel = context.WithTimeout(common.WithLogger(context.Background(), common.Logger(ctx)), 5*time.Minute)
defer cancel()
}
return postExecutor(ctx)
@@ -131,6 +126,18 @@ func newJobExecutor(info jobInfo, sf stepFactory, rc *RunContext) common.Executo
Finally(info.closeContainer()))
}

func setJobResult(ctx context.Context, info jobInfo, rc *RunContext, success bool) {
logger := common.Logger(ctx)
jobResult := "success"
jobResultMessage := "succeeded"
if !success {
jobResult = "failure"
jobResultMessage = "failed"
}
info.result(jobResult)
logger.WithField("jobResult", jobResult).Infof("\U0001F3C1 Job %s", jobResultMessage)
}

func useStepLogger(rc *RunContext, stepModel *model.Step, stage stepStage, executor common.Executor) common.Executor {
return func(ctx context.Context) error {
ctx = withStepLogger(ctx, stepModel.ID, rc.ExprEval.Interpolate(ctx, stepModel.String()), stage.String())
29 changes: 2 additions & 27 deletions pkg/runner/runner.go
@@ -4,7 +4,6 @@ import (
"context"
"fmt"
"os"
"time"

log "github.com/sirupsen/logrus"

@@ -77,18 +76,15 @@ func New(runnerConfig *Config) (Runner, error) {
}

// NewPlanExecutor ...
//
//nolint:gocyclo
func (runner *runnerImpl) NewPlanExecutor(plan *model.Plan) common.Executor {
maxJobNameLen := 0

stagePipeline := make([]common.Executor, 0)
for i := range plan.Stages {
s := i
stage := plan.Stages[i]
stagePipeline = append(stagePipeline, func(ctx context.Context) error {
pipeline := make([]common.Executor, 0)
for r, run := range stage.Runs {
for _, run := range stage.Runs {
stageExecutor := make([]common.Executor, 0)
job := run.Job()

@@ -123,29 +119,8 @@ func (runner *runnerImpl) NewPlanExecutor(plan *model.Plan) common.Executor {
maxJobNameLen = len(rc.String())
}
stageExecutor = append(stageExecutor, func(ctx context.Context) error {
logger := common.Logger(ctx)
jobName := fmt.Sprintf("%-*s", maxJobNameLen, rc.String())
return rc.Executor().Finally(func(ctx context.Context) error {
isLastRunningContainer := func(currentStage int, currentRun int) bool {
return currentStage == len(plan.Stages)-1 && currentRun == len(stage.Runs)-1
}

if runner.config.AutoRemove && isLastRunningContainer(s, r) {
var cancel context.CancelFunc
if ctx.Err() == context.Canceled {
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
}

log.Infof("Cleaning up container for job %s", rc.JobName)

if err := rc.stopJobContainer()(ctx); err != nil {
logger.Errorf("Error while cleaning container: %v", err)
}
}

return nil
})(common.WithJobErrorContainer(WithJobLogger(ctx, rc.Run.JobID, jobName, rc.Config, &rc.Masks, matrix)))
return rc.Executor()(common.WithJobErrorContainer(WithJobLogger(ctx, rc.Run.JobID, jobName, rc.Config, &rc.Masks, matrix)))
})
}
pipeline = append(pipeline, common.NewParallelExecutor(maxParallel, stageExecutor...))