Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(hatchery): do not add same job into workersStarter #6722

Merged
merged 7 commits into from
Dec 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions engine/hatchery/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type Common struct {
Clientv2 cdsclient.HatcheryServiceClient
mapServiceNextLineNumberMutex sync.Mutex
mapServiceNextLineNumber map[string]int64
mapPendingWorkerCreation *sdk.HatcheryPendingWorkerCreation
}

func (c *Common) MaxHeartbeat() int {
Expand Down Expand Up @@ -65,6 +66,14 @@ func (c *Common) GetGoRoutines() *sdk.GoRoutines {
return c.GoRoutines
}

func (c *Common) GetMapPendingWorkerCreation() *sdk.HatcheryPendingWorkerCreation {
if c.mapPendingWorkerCreation == nil {
c.mapPendingWorkerCreation = &sdk.HatcheryPendingWorkerCreation{}
c.mapPendingWorkerCreation.Init()
}
return c.mapPendingWorkerCreation
}

// CommonServe start the HatcheryLocal server
func (c *Common) CommonServe(ctx context.Context, h hatchery.Interface) error {
log.Info(ctx, "%s> Starting service %s (%s)...", c.Name(), h.Configuration().Name, sdk.VERSION)
Expand Down
26 changes: 23 additions & 3 deletions sdk/cdsclient/client_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/ovh/cds/sdk"
"github.com/rockbears/log"
)

// shrinkQueue is used to shrink the polled queue 200% of the channel capacity (l)
Expand Down Expand Up @@ -42,7 +43,7 @@ func shrinkQueue(queue *sdk.WorkflowQueue, nbJobsToKeep int) time.Time {
return t0
}

func (c *client) QueuePolling(ctx context.Context, goRoutines *sdk.GoRoutines, jobs chan<- sdk.WorkflowNodeJobRun, errs chan<- error, delay time.Duration, ms ...RequestModifier) error {
func (c *client) QueuePolling(ctx context.Context, goRoutines *sdk.GoRoutines, pendingWorkerCreation *sdk.HatcheryPendingWorkerCreation, jobs chan<- sdk.WorkflowNodeJobRun, errs chan<- error, delay time.Duration, ms ...RequestModifier) error {
jobsTicker := time.NewTicker(delay)

// This goroutine call the SSE route
Expand Down Expand Up @@ -86,6 +87,14 @@ func (c *client) QueuePolling(ctx context.Context, goRoutines *sdk.GoRoutines, j
// push the job in the channel
if job.Status == sdk.StatusWaiting && job.BookedBy.Name == "" {
job.Header["WS"] = "true"

id := strconv.FormatInt(job.ID, 10)
if pendingWorkerCreation.IsJobAlreadyPendingWorkerCreation(id) {
log.Debug(ctx, "skipping job %s", id)
continue
}
pendingWorkerCreation.SetJobInPendingWorkerCreation(id)

jobs <- *job
}
}
Expand Down Expand Up @@ -115,8 +124,19 @@ func (c *client) QueuePolling(ctx context.Context, goRoutines *sdk.GoRoutines, j
fmt.Println("Jobs Queue size: ", len(queue))
}

shrinkQueue(&queue, cap(jobs))
for _, j := range queue {
queueFiltered := sdk.WorkflowQueue{}
for _, job := range queue {
id := strconv.FormatInt(job.ID, 10)
if pendingWorkerCreation.IsJobAlreadyPendingWorkerCreation(id) {
log.Debug(ctx, "skipping job %s", id)
continue
}
pendingWorkerCreation.SetJobInPendingWorkerCreation(id)
queueFiltered = append(queueFiltered, job)
}

shrinkQueue(&queueFiltered, cap(jobs))
for _, j := range queueFiltered {
jobs <- j
}
}
Expand Down
25 changes: 20 additions & 5 deletions sdk/cdsclient/client_queue_V2.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/ovh/cds/sdk"
"github.com/rockbears/log"
)

func (c *client) V2QueueJobStepUpdate(ctx context.Context, regionName string, jobRunID string, stepsStatus sdk.JobStepsStatus) error {
Expand Down Expand Up @@ -103,7 +104,7 @@ func (c *client) V2QueueGetJobRun(ctx context.Context, regionName, id string) (*
return &job, nil
}

func (c *client) V2QueuePolling(ctx context.Context, regionName string, goRoutines *sdk.GoRoutines, jobs chan<- sdk.V2WorkflowRunJob, errs chan<- error, delay time.Duration, ms ...RequestModifier) error {
func (c *client) V2QueuePolling(ctx context.Context, regionName string, goRoutines *sdk.GoRoutines, pendingWorkerCreation *sdk.HatcheryPendingWorkerCreation, jobs chan<- sdk.V2WorkflowRunJob, errs chan<- error, delay time.Duration, ms ...RequestModifier) error {
jobsTicker := time.NewTicker(delay)

// This goroutine call the Websocket
Expand Down Expand Up @@ -135,6 +136,11 @@ func (c *client) V2QueuePolling(ctx context.Context, regionName string, goRoutin
}
// push the job in the channel
if j.Status == sdk.StatusWaiting {
if pendingWorkerCreation.IsJobAlreadyPendingWorkerCreation(wsEvent.Event.JobRunID) {
log.Debug(ctx, "skipping job %s", wsEvent.Event.JobRunID)
continue
}
pendingWorkerCreation.SetJobInPendingWorkerCreation(wsEvent.Event.JobRunID)
jobs <- *j
}
case <-jobsTicker.C:
Expand All @@ -161,14 +167,23 @@ func (c *client) V2QueuePolling(ctx context.Context, regionName string, goRoutin
fmt.Println("Jobs Queue size: ", len(queue))
}

queueFiltered := []sdk.V2WorkflowRunJob{}
for _, job := range queue {
if pendingWorkerCreation.IsJobAlreadyPendingWorkerCreation(job.ID) {
log.Debug(ctx, "skipping job %s", job.ID)
continue
}
pendingWorkerCreation.SetJobInPendingWorkerCreation(job.ID)
queueFiltered = append(queueFiltered, job)
}

max := cap(jobs) * 2
if len(queue) < max {
max = len(queue)
if len(queueFiltered) < max {
max = len(queueFiltered)
}
for i := 0; i < max; i++ {
jobs <- queue[i]
jobs <- queueFiltered[i]
}

}
}
}
Expand Down
4 changes: 2 additions & 2 deletions sdk/cdsclient/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ type ProjectVariablesClient interface {

type V2QueueClient interface {
V2QueueGetJobRun(ctx context.Context, regionName string, id string) (*sdk.V2WorkflowRunJob, error)
V2QueuePolling(ctx context.Context, region string, goRoutines *sdk.GoRoutines, jobs chan<- sdk.V2WorkflowRunJob, errs chan<- error, delay time.Duration, ms ...RequestModifier) error
V2QueuePolling(ctx context.Context, region string, goRoutines *sdk.GoRoutines, pendingWorkerCreation *sdk.HatcheryPendingWorkerCreation, jobs chan<- sdk.V2WorkflowRunJob, errs chan<- error, delay time.Duration, ms ...RequestModifier) error
V2QueueJobResult(ctx context.Context, region string, jobRunID string, result sdk.V2WorkflowRunJobResult) error
V2QueueJobRunResultGet(ctx context.Context, regionName string, jobRunID string, runResultID string) (*sdk.V2WorkflowRunResult, error)
V2QueueJobRunResultsGet(ctx context.Context, regionName string, jobRunID string) ([]sdk.V2WorkflowRunResult, error)
Expand All @@ -311,7 +311,7 @@ type V2QueueClient interface {
type QueueClient interface {
QueueWorkflowNodeJobRun(mods ...RequestModifier) ([]sdk.WorkflowNodeJobRun, error)
QueueCountWorkflowNodeJobRun(since *time.Time, until *time.Time, modelType string) (sdk.WorkflowNodeJobRunCount, error)
QueuePolling(ctx context.Context, goRoutines *sdk.GoRoutines, jobs chan<- sdk.WorkflowNodeJobRun, errs chan<- error, delay time.Duration, ms ...RequestModifier) error
QueuePolling(ctx context.Context, goRoutines *sdk.GoRoutines, pendingWorkerCreation *sdk.HatcheryPendingWorkerCreation, jobs chan<- sdk.WorkflowNodeJobRun, errs chan<- error, delay time.Duration, ms ...RequestModifier) error
QueueTakeJob(ctx context.Context, job sdk.WorkflowNodeJobRun) (*sdk.WorkflowNodeJobRunData, error)
QueueJobBook(ctx context.Context, id string) (sdk.WorkflowNodeJobRunBooked, error)
QueueJobRelease(ctx context.Context, id string) error
Expand Down
56 changes: 28 additions & 28 deletions sdk/cdsclient/mock_cdsclient/interface_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

30 changes: 30 additions & 0 deletions sdk/hatchery.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"strconv"
"sync"
"time"
)

Expand Down Expand Up @@ -46,6 +47,35 @@ type Hatchery struct {
Token string `json:"token,omitempty" db:"-" cli:"token,omitempty"`
}

type HatcheryPendingWorkerCreation struct {
mapSpawnJobRequest map[string]struct{}
mapSpawnJobRequestMutex *sync.Mutex
}

func (c *HatcheryPendingWorkerCreation) Init() {
c.mapSpawnJobRequest = make(map[string]struct{})
c.mapSpawnJobRequestMutex = new(sync.Mutex)
}

func (c *HatcheryPendingWorkerCreation) SetJobInPendingWorkerCreation(id string) {
c.mapSpawnJobRequestMutex.Lock()
c.mapSpawnJobRequest[id] = struct{}{}
c.mapSpawnJobRequestMutex.Unlock()
}

func (c *HatcheryPendingWorkerCreation) RemoveJobFromPendingWorkerCreation(id string) {
c.mapSpawnJobRequestMutex.Lock()
delete(c.mapSpawnJobRequest, id)
c.mapSpawnJobRequestMutex.Unlock()
}

func (c *HatcheryPendingWorkerCreation) IsJobAlreadyPendingWorkerCreation(id string) bool {
c.mapSpawnJobRequestMutex.Lock()
_, has := c.mapSpawnJobRequest[id]
c.mapSpawnJobRequestMutex.Unlock()
return has
}

type HatcheryConfig map[string]interface{}

func (hc HatcheryConfig) Value() (driver.Value, error) {
Expand Down
Loading