Skip to content

Commit

Permalink
Add verbosity to diagnose mis-signaling
Browse files Browse the repository at this point in the history
Signed-off-by: Dmitrii Okunev <xaionaro@meta.com>
  • Loading branch information
xaionaro committed May 25, 2023
1 parent 9a7a576 commit b48cf54
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 13 deletions.
5 changes: 4 additions & 1 deletion pkg/jobmanager/jobmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,10 @@ func (jm *JobManager) Run(ctx context.Context, resumeJobs bool) error {
}

apiCtx, apiCancel := context.WithCancel(ctx)
jm.apiCancel = apiCancel
jm.apiCancel = func() {
logger.FromCtx(ctx).Debugf("cancelling API context")
apiCancel()
}

errCh := make(chan error, 1)
go func() {
Expand Down
29 changes: 22 additions & 7 deletions pkg/jobmanager/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"time"

"github.com/facebookincubator/go-belt/beltctx"
"github.com/facebookincubator/go-belt/tool/experimental/errmon"
"github.com/facebookincubator/go-belt/tool/experimental/metrics"
"github.com/facebookincubator/go-belt/tool/logger"
"github.com/linuxboot/contest/pkg/api"
Expand Down Expand Up @@ -84,9 +85,19 @@ func (jm *JobManager) start(ev *api.Event) *api.EventResponse {
func (jm *JobManager) startJob(ctx context.Context, j *job.Job, resumeState *job.PauseEventPayload) {
jm.jobsMu.Lock()
defer jm.jobsMu.Unlock()
jobCtx, jobCancel := context.WithCancel(ctx)
jobCtx := ctx
jobCtx, jobCancel := context.WithCancel(jobCtx)
jobCtx, jobPause := signaling.WithSignal(jobCtx, signals.Paused)
jm.jobs[j.ID] = &jobInfo{job: j, pause: jobPause, cancel: jobCancel}
jm.jobs[j.ID] = &jobInfo{
job: j,
pause: func() {
logger.FromCtx(ctx).Debugf("pausing job")
jobPause()
},
cancel: func() {
logger.FromCtx(ctx).Debugf("cancelling job context")
jobCancel()
}}
go jm.runJob(jobCtx, j, resumeState)
}

Expand Down Expand Up @@ -117,11 +128,13 @@ func (jm *JobManager) runJob(ctx context.Context, j *job.Job, resumeState *job.P
logger.FromCtx(ctx).Debugf("Job %d: runner finished, err %v", j.ID, err)
switch err {
case context.Canceled:
_ = jm.emitEvent(ctx, j.ID, job.EventJobCancelled)
_err := jm.emitEvent(ctx, j.ID, job.EventJobCancelled)
errmon.ObserveErrorCtx(ctx, _err)
return
case signals.Paused:
if err := jm.emitEventPayload(ctx, j.ID, job.EventJobPaused, resumeState); err != nil {
_ = jm.emitErrEvent(ctx, j.ID, job.EventJobPauseFailed, fmt.Errorf("Job %+v failed pausing: %v", j, err))
_err := jm.emitErrEvent(ctx, j.ID, job.EventJobPauseFailed, fmt.Errorf("job %+v failed pausing: %w", j, err))
errmon.ObserveErrorCtx(ctx, _err)
} else {
logger.FromCtx(ctx).Infof("Successfully paused job %d (run %d, %d targets)", j.ID, resumeState.RunID, len(resumeState.Targets))
logger.FromCtx(ctx).Debugf("Job %d pause state: %+v", j.ID, resumeState)
Expand All @@ -131,17 +144,19 @@ func (jm *JobManager) runJob(ctx context.Context, j *job.Job, resumeState *job.P
select {
case <-signaling.Until(ctx, signals.Paused):
// We were asked to pause but failed to do so.
pauseErr := fmt.Errorf("Job %+v failed pausing: %v", j, err)
pauseErr := fmt.Errorf("job %+v failed pausing: %w", j, err)
logger.FromCtx(ctx).Errorf("%v", pauseErr)
_ = jm.emitErrEvent(ctx, j.ID, job.EventJobPauseFailed, pauseErr)
_err := jm.emitErrEvent(ctx, j.ID, job.EventJobPauseFailed, pauseErr)
errmon.ObserveErrorCtx(ctx, _err)
return
default:
}
logger.FromCtx(ctx).Infof("Job %d finished", j.ID)
// at this point it is safe to emit the job status event. Note: this is
// checking `err` from the `jm.jobRunner.Run()` call above.
if err != nil {
_ = jm.emitErrEvent(ctx, j.ID, job.EventJobFailed, fmt.Errorf("Job %d failed after %s: %w", j.ID, duration, err))
_err := jm.emitErrEvent(ctx, j.ID, job.EventJobFailed, fmt.Errorf("job %d failed after %s: %w", j.ID, duration, err))
errmon.ObserveErrorCtx(ctx, _err)
} else {
logger.FromCtx(ctx).Infof("Job %+v completed after %s", j, duration)
err = jm.emitEvent(ctx, j.ID, job.EventJobCompleted)
Expand Down
6 changes: 5 additions & 1 deletion pkg/runner/step_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,11 @@ func (ss *stepState) Run(ctx context.Context) error {
return nil
}

stepCtx, cancel := context.WithCancel(ctx)
stepCtx, stepCtxCancel := context.WithCancel(ctx)
cancel := func() {
logger.FromCtx(stepCtx).Debugf("cancelling step context")
stepCtxCancel()
}
stepCtx = beltctx.WithField(stepCtx, "step_index", strconv.Itoa(ss.stepIndex))
stepCtx = beltctx.WithField(stepCtx, "step_label", ss.sb.TestStepLabel)

Expand Down
11 changes: 8 additions & 3 deletions pkg/runner/test_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,10 @@ func (tr *TestRunner) Run(

// Peel off contexts used for steps and target handlers.
runCtx, runCancel := context.WithCancel(ctx)
defer runCancel()
defer func() {
logger.FromCtx(ctx).Debugf("run finished, cancelling the context")
runCancel()
}()

var targetStates map[string]*targetState

Expand Down Expand Up @@ -231,7 +234,9 @@ func (tr *TestRunner) Run(
if !ok {
return resultErr
}
logger.FromCtx(ctx).Tracef("got event from targetErrors: %v", runErr)
case runErr = <-stepsErrorsCh:
logger.FromCtx(ctx).Tracef("got event from stepsErrorsCh: %v", runErr)
}
if runErr != nil && runErr != signals.Paused && resultErr == nil {
resultErr = runErr
Expand Down Expand Up @@ -348,7 +353,7 @@ func (tr *TestRunner) waitSteps(ctx context.Context) ([]json.RawMessage, error)
if err != nil {
stepsNeverReturned = append(stepsNeverReturned, ss.GetTestStepLabel())
ss.SetError(ctx, &cerrors.ErrTestStepsNeverReturned{StepNames: []string{ss.GetTestStepLabel()}})
// Stop step context, this will help release the reader.
logger.FromCtx(ctx).Debugf("stopping step context, this will help release the reader")
ss.ForceStop()
} else if resultErr == nil && result.Err != nil && result.Err != signals.Paused {
resultErr = result.Err
Expand Down Expand Up @@ -481,7 +486,7 @@ loop:

case <-ctx.Done():
err = ctx.Err()
logger.FromCtx(ctx).Debugf("Canceled target context during waiting for target result")
logger.FromCtx(ctx).Debugf("Canceled target context during waiting for target result: %v", err)
}

case signals.Paused:
Expand Down
5 changes: 4 additions & 1 deletion tests/integ/jobmanager/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/linuxboot/contest/pkg/target"
"github.com/linuxboot/contest/pkg/types"

"github.com/facebookincubator/go-belt/beltctx"
"github.com/facebookincubator/go-belt/tool/logger"
"github.com/linuxboot/contest/plugins/reporters/targetsuccess"
"github.com/linuxboot/contest/plugins/targetlocker/inmemory"
Expand Down Expand Up @@ -355,7 +356,9 @@ func (suite *TestJobManagerSuite) initJobManager(instanceTag string) {
require.NoError(suite.T(), err)

suite.jm = jm
suite.jmCtx, suite.jmCancel = context.WithCancel(logging.WithBelt(context.Background(), logger.LevelDebug))
ctx := logging.WithBelt(context.Background(), logger.LevelTrace)
ctx = beltctx.WithField(ctx, "integ-test", suite.T().Name())
suite.jmCtx, suite.jmCancel = context.WithCancel(ctx)
suite.jmCtx, suite.jmPause = signaling.WithSignal(suite.jmCtx, signals.Paused)
}

Expand Down

0 comments on commit b48cf54

Please sign in to comment.