Skip to content
This repository has been archived by the owner on Dec 26, 2023. It is now read-only.

Commit

Permalink
fix: agent race error (#537)
Browse files Browse the repository at this point in the history
Fixes #486
  • Loading branch information
leg100 committed Jul 25, 2023
1 parent 8ffbd2d commit 6b9e6b1
Show file tree
Hide file tree
Showing 8 changed files with 30 additions and 12 deletions.
9 changes: 7 additions & 2 deletions internal/agent/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package agent

import (
"context"
"errors"

"github.com/leg100/otf/internal"
"github.com/leg100/otf/internal/run"
)

Expand All @@ -27,9 +29,12 @@ func (w *worker) Start(ctx context.Context) {
func (w *worker) handle(ctx context.Context, r *run.Run) {
log := w.Logger.WithValues("run", r.ID, "phase", r.Phase())

// Claim run phase
// claim run phase
r, err := w.StartPhase(ctx, r.ID, r.Phase(), run.PhaseStartOptions{AgentID: DefaultID})
if err != nil {
if errors.Is(err, internal.ErrPhaseAlreadyStarted) {
// another agent has already claimed it
return
} else if err != nil {
log.Error(err, "starting phase")
return
}
Expand Down
11 changes: 8 additions & 3 deletions internal/api/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,13 +94,18 @@ func (a *api) startPhase(w http.ResponseWriter, r *http.Request) {
return
}

run, err := a.StartPhase(r.Context(), params.RunID, params.Phase, run.PhaseStartOptions{})
if err != nil {
started, err := a.StartPhase(r.Context(), params.RunID, params.Phase, run.PhaseStartOptions{})
if errors.Is(err, internal.ErrPhaseAlreadyStarted) {
// A bit silly, but OTF uses the teapot status as a unique means of
// informing the agent the phase has been started by another agent.
w.WriteHeader(http.StatusTeapot)
return
} else if err != nil {
Error(w, err)
return
}

a.writeResponse(w, r, run)
a.writeResponse(w, r, started)
}

func (a *api) finishPhase(w http.ResponseWriter, r *http.Request) {
Expand Down
2 changes: 2 additions & 0 deletions internal/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ var (
ErrRunDiscardNotAllowed = errors.New("run was not paused for confirmation or priority; discard not allowed")
ErrRunCancelNotAllowed = errors.New("run was not planning or applying; cancel not allowed")
ErrRunForceCancelNotAllowed = errors.New("run was not planning or applying, has not been canceled non-forcefully, or the cool-off period has not yet passed")
//
ErrPhaseAlreadyStarted = errors.New("phase already started")
)

type (
Expand Down
2 changes: 2 additions & 0 deletions internal/http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,8 @@ func checkResponseCode(r *http.Response) error {
return internal.ErrUnauthorized
case 404:
return internal.ErrResourceNotFound
case 418:
return internal.ErrPhaseAlreadyStarted
}
// Decode the error payload.
var payload struct {
Expand Down
3 changes: 1 addition & 2 deletions internal/run/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,7 @@ func (c *Client) StartPhase(ctx context.Context, id string, phase internal.Phase
}

run := &types.Run{}
err = c.Do(ctx, req, run)
if err != nil {
if err := c.Do(ctx, req, run); err != nil {
return nil, err
}

Expand Down
3 changes: 0 additions & 3 deletions internal/run/phase.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package run

import (
"errors"
"time"

"github.com/leg100/otf/internal"
Expand All @@ -17,8 +16,6 @@ const (
PhaseUnreachable PhaseStatus = "unreachable"
)

var ErrPhaseAlreadyStarted = errors.New("phase already started")

type (
// Phase is a section of work performed by a run.
Phase struct {
Expand Down
2 changes: 1 addition & 1 deletion internal/run/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ func (r *Run) Start(phase internal.PhaseType) error {
r.updateStatus(internal.RunApplying)
r.Apply.UpdateStatus(PhaseRunning)
case internal.RunPlanning, internal.RunApplying:
return ErrPhaseAlreadyStarted
return internal.ErrPhaseAlreadyStarted
default:
return ErrInvalidRunStateTransition
}
Expand Down
10 changes: 9 additions & 1 deletion internal/run/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package run

import (
"context"
"errors"
"fmt"

"github.com/go-logr/logr"
Expand Down Expand Up @@ -289,7 +290,14 @@ func (s *service) StartPhase(ctx context.Context, runID string, phase internal.P
return run.Start(phase)
})
if err != nil {
s.Error(err, "starting "+string(phase), "id", runID, "subject", subject)
// only log error if not an phase already started error - this occurs when
// multiple agents 'race' to start the phase and only one can do so,
// whereas the other agents receive this error which is a legitimate
// error condition and not something that should be reported to the
// user.
if !errors.Is(err, internal.ErrPhaseAlreadyStarted) {
s.Error(err, "starting "+string(phase), "id", runID, "subject", subject)
}
return nil, err
}
s.V(0).Info("started "+string(phase), "id", runID, "subject", subject)
Expand Down

0 comments on commit 6b9e6b1

Please sign in to comment.