diff --git a/engine/hatchery/serve.go b/engine/hatchery/serve.go index 73d4629c3e..c36032d376 100644 --- a/engine/hatchery/serve.go +++ b/engine/hatchery/serve.go @@ -32,6 +32,7 @@ type Common struct { Clientv2 cdsclient.HatcheryServiceClient mapServiceNextLineNumberMutex sync.Mutex mapServiceNextLineNumber map[string]int64 + mapPendingWorkerCreation *sdk.HatcheryPendingWorkerCreation } func (c *Common) MaxHeartbeat() int { @@ -65,6 +66,14 @@ func (c *Common) GetGoRoutines() *sdk.GoRoutines { return c.GoRoutines } +func (c *Common) GetMapPendingWorkerCreation() *sdk.HatcheryPendingWorkerCreation { + if c.mapPendingWorkerCreation == nil { + c.mapPendingWorkerCreation = &sdk.HatcheryPendingWorkerCreation{} + c.mapPendingWorkerCreation.Init() + } + return c.mapPendingWorkerCreation +} + // CommonServe start the HatcheryLocal server func (c *Common) CommonServe(ctx context.Context, h hatchery.Interface) error { log.Info(ctx, "%s> Starting service %s (%s)...", c.Name(), h.Configuration().Name, sdk.VERSION) diff --git a/sdk/cdsclient/client_queue.go b/sdk/cdsclient/client_queue.go index 96d2edbe4d..e95fd43238 100644 --- a/sdk/cdsclient/client_queue.go +++ b/sdk/cdsclient/client_queue.go @@ -10,6 +10,7 @@ import ( "time" "github.com/ovh/cds/sdk" + "github.com/rockbears/log" ) // shrinkQueue is used to shrink the polled queue 200% of the channel capacity (l) @@ -42,7 +43,7 @@ func shrinkQueue(queue *sdk.WorkflowQueue, nbJobsToKeep int) time.Time { return t0 } -func (c *client) QueuePolling(ctx context.Context, goRoutines *sdk.GoRoutines, jobs chan<- sdk.WorkflowNodeJobRun, errs chan<- error, delay time.Duration, ms ...RequestModifier) error { +func (c *client) QueuePolling(ctx context.Context, goRoutines *sdk.GoRoutines, pendingWorkerCreation *sdk.HatcheryPendingWorkerCreation, jobs chan<- sdk.WorkflowNodeJobRun, errs chan<- error, delay time.Duration, ms ...RequestModifier) error { jobsTicker := time.NewTicker(delay) // This goroutine call the SSE route @@ -86,6 +87,14 @@ func (c *client) QueuePolling(ctx context.Context, goRoutines *sdk.GoRoutines, j // push the job in the channel if job.Status == sdk.StatusWaiting && job.BookedBy.Name == "" { job.Header["WS"] = "true" + + id := strconv.FormatInt(job.ID, 10) + if pendingWorkerCreation.IsJobAlreadyPendingWorkerCreation(id) { + log.Debug(ctx, "skipping job %s", id) + continue + } + pendingWorkerCreation.SetJobInPendingWorkerCreation(id) + jobs <- *job } } @@ -115,8 +124,19 @@ func (c *client) QueuePolling(ctx context.Context, goRoutines *sdk.GoRoutines, j fmt.Println("Jobs Queue size: ", len(queue)) } - shrinkQueue(&queue, cap(jobs)) - for _, j := range queue { + queueFiltered := sdk.WorkflowQueue{} + for _, job := range queue { + id := strconv.FormatInt(job.ID, 10) + if pendingWorkerCreation.IsJobAlreadyPendingWorkerCreation(id) { + log.Debug(ctx, "skipping job %s", id) + continue + } + pendingWorkerCreation.SetJobInPendingWorkerCreation(id) + queueFiltered = append(queueFiltered, job) + } + + shrinkQueue(&queueFiltered, cap(jobs)) + for _, j := range queueFiltered { jobs <- j } } diff --git a/sdk/cdsclient/client_queue_V2.go b/sdk/cdsclient/client_queue_V2.go index 71dace3bfe..53be1e346f 100644 --- a/sdk/cdsclient/client_queue_V2.go +++ b/sdk/cdsclient/client_queue_V2.go @@ -7,6 +7,7 @@ import ( "time" "github.com/ovh/cds/sdk" + "github.com/rockbears/log" ) func (c *client) V2QueueJobStepUpdate(ctx context.Context, regionName string, jobRunID string, stepsStatus sdk.JobStepsStatus) error { @@ -103,7 +104,7 @@ func (c *client) V2QueueGetJobRun(ctx context.Context, regionName, id string) (* return &job, nil } -func (c *client) V2QueuePolling(ctx context.Context, regionName string, goRoutines *sdk.GoRoutines, jobs chan<- sdk.V2WorkflowRunJob, errs chan<- error, delay time.Duration, ms ...RequestModifier) error { +func (c *client) V2QueuePolling(ctx context.Context, regionName string, goRoutines *sdk.GoRoutines, pendingWorkerCreation *sdk.HatcheryPendingWorkerCreation, jobs chan<- sdk.V2WorkflowRunJob, errs chan<- error, delay time.Duration, ms ...RequestModifier) error { jobsTicker := time.NewTicker(delay) // This goroutine call the Websocket @@ -135,6 +136,11 @@ func (c *client) V2QueuePolling(ctx context.Context, regionName string, goRoutin } // push the job in the channel if j.Status == sdk.StatusWaiting { + if pendingWorkerCreation.IsJobAlreadyPendingWorkerCreation(wsEvent.Event.JobRunID) { + log.Debug(ctx, "skipping job %s", wsEvent.Event.JobRunID) + continue + } + pendingWorkerCreation.SetJobInPendingWorkerCreation(wsEvent.Event.JobRunID) jobs <- *j } case <-jobsTicker.C: @@ -161,14 +167,23 @@ func (c *client) V2QueuePolling(ctx context.Context, regionName string, goRoutin fmt.Println("Jobs Queue size: ", len(queue)) } + queueFiltered := []sdk.V2WorkflowRunJob{} + for _, job := range queue { + if pendingWorkerCreation.IsJobAlreadyPendingWorkerCreation(job.ID) { + log.Debug(ctx, "skipping job %s", job.ID) + continue + } + pendingWorkerCreation.SetJobInPendingWorkerCreation(job.ID) + queueFiltered = append(queueFiltered, job) + } + max := cap(jobs) * 2 - if len(queue) < max { - max = len(queue) + if len(queueFiltered) < max { + max = len(queueFiltered) } for i := 0; i < max; i++ { - jobs <- queue[i] + jobs <- queueFiltered[i] } - } } } diff --git a/sdk/cdsclient/interface.go b/sdk/cdsclient/interface.go index 640b277e73..f851c25890 100644 --- a/sdk/cdsclient/interface.go +++ b/sdk/cdsclient/interface.go @@ -295,7 +295,7 @@ type ProjectVariablesClient interface { type V2QueueClient interface { V2QueueGetJobRun(ctx context.Context, regionName string, id string) (*sdk.V2WorkflowRunJob, error) - V2QueuePolling(ctx context.Context, region string, goRoutines *sdk.GoRoutines, jobs chan<- sdk.V2WorkflowRunJob, errs chan<- error, delay time.Duration, ms ...RequestModifier) error + V2QueuePolling(ctx context.Context, region string, goRoutines *sdk.GoRoutines, pendingWorkerCreation *sdk.HatcheryPendingWorkerCreation, jobs chan<- sdk.V2WorkflowRunJob, errs chan<- error, delay time.Duration, ms ...RequestModifier) error V2QueueJobResult(ctx context.Context, region string, jobRunID string, result sdk.V2WorkflowRunJobResult) error V2QueueJobRunResultGet(ctx context.Context, regionName string, jobRunID string, runResultID string) (*sdk.V2WorkflowRunResult, error) V2QueueJobRunResultsGet(ctx context.Context, regionName string, jobRunID string) ([]sdk.V2WorkflowRunResult, error) @@ -311,7 +311,7 @@ type V2QueueClient interface { type QueueClient interface { QueueWorkflowNodeJobRun(mods ...RequestModifier) ([]sdk.WorkflowNodeJobRun, error) QueueCountWorkflowNodeJobRun(since *time.Time, until *time.Time, modelType string) (sdk.WorkflowNodeJobRunCount, error) - QueuePolling(ctx context.Context, goRoutines *sdk.GoRoutines, jobs chan<- sdk.WorkflowNodeJobRun, errs chan<- error, delay time.Duration, ms ...RequestModifier) error + QueuePolling(ctx context.Context, goRoutines *sdk.GoRoutines, pendingWorkerCreation *sdk.HatcheryPendingWorkerCreation, jobs chan<- sdk.WorkflowNodeJobRun, errs chan<- error, delay time.Duration, ms ...RequestModifier) error QueueTakeJob(ctx context.Context, job sdk.WorkflowNodeJobRun) (*sdk.WorkflowNodeJobRunData, error) QueueJobBook(ctx context.Context, id string) (sdk.WorkflowNodeJobRunBooked, error) QueueJobRelease(ctx context.Context, id string) error diff --git a/sdk/cdsclient/mock_cdsclient/interface_mock.go b/sdk/cdsclient/mock_cdsclient/interface_mock.go index 7f818f0f40..4d0b8533d7 100644 --- a/sdk/cdsclient/mock_cdsclient/interface_mock.go +++ b/sdk/cdsclient/mock_cdsclient/interface_mock.go @@ -2820,9 +2820,9 @@ func (mr *MockHatcheryServiceClientMockRecorder) V2QueueJobStepUpdate(ctx, regio } // V2QueuePolling mocks base method. -func (m *MockHatcheryServiceClient) V2QueuePolling(ctx context.Context, region string, goRoutines *sdk.GoRoutines, jobs chan<- sdk.V2WorkflowRunJob, errs chan<- error, delay time.Duration, ms ...cdsclient.RequestModifier) error { +func (m *MockHatcheryServiceClient) V2QueuePolling(ctx context.Context, region string, goRoutines *sdk.GoRoutines, pendingWorkerCreation *sdk.HatcheryPendingWorkerCreation, jobs chan<- sdk.V2WorkflowRunJob, errs chan<- error, delay time.Duration, ms ...cdsclient.RequestModifier) error { m.ctrl.T.Helper() - varargs := []interface{}{ctx, region, goRoutines, jobs, errs, delay} + varargs := []interface{}{ctx, region, goRoutines, pendingWorkerCreation, jobs, errs, delay} for _, a := range ms { varargs = append(varargs, a) } @@ -2832,9 +2832,9 @@ func (m *MockHatcheryServiceClient) V2QueuePolling(ctx context.Context, region s } // V2QueuePolling indicates an expected call of V2QueuePolling. -func (mr *MockHatcheryServiceClientMockRecorder) V2QueuePolling(ctx, region, goRoutines, jobs, errs, delay interface{}, ms ...interface{}) *gomock.Call { +func (mr *MockHatcheryServiceClientMockRecorder) V2QueuePolling(ctx, region, goRoutines, pendingWorkerCreation, jobs, errs, delay interface{}, ms ...interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - varargs := append([]interface{}{ctx, region, goRoutines, jobs, errs, delay}, ms...) + varargs := append([]interface{}{ctx, region, goRoutines, pendingWorkerCreation, jobs, errs, delay}, ms...) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "V2QueuePolling", reflect.TypeOf((*MockHatcheryServiceClient)(nil).V2QueuePolling), varargs...) } @@ -3934,9 +3934,9 @@ func (mr *MockV2QueueClientMockRecorder) V2QueueJobStepUpdate(ctx, regionName, i } // V2QueuePolling mocks base method. -func (m *MockV2QueueClient) V2QueuePolling(ctx context.Context, region string, goRoutines *sdk.GoRoutines, jobs chan<- sdk.V2WorkflowRunJob, errs chan<- error, delay time.Duration, ms ...cdsclient.RequestModifier) error { +func (m *MockV2QueueClient) V2QueuePolling(ctx context.Context, region string, goRoutines *sdk.GoRoutines, pendingWorkerCreation *sdk.HatcheryPendingWorkerCreation, jobs chan<- sdk.V2WorkflowRunJob, errs chan<- error, delay time.Duration, ms ...cdsclient.RequestModifier) error { m.ctrl.T.Helper() - varargs := []interface{}{ctx, region, goRoutines, jobs, errs, delay} + varargs := []interface{}{ctx, region, goRoutines, pendingWorkerCreation, jobs, errs, delay} for _, a := range ms { varargs = append(varargs, a) } @@ -3946,9 +3946,9 @@ func (m *MockV2QueueClient) V2QueuePolling(ctx context.Context, region string, g } // V2QueuePolling indicates an expected call of V2QueuePolling. -func (mr *MockV2QueueClientMockRecorder) V2QueuePolling(ctx, region, goRoutines, jobs, errs, delay interface{}, ms ...interface{}) *gomock.Call { +func (mr *MockV2QueueClientMockRecorder) V2QueuePolling(ctx, region, goRoutines, pendingWorkerCreation, jobs, errs, delay interface{}, ms ...interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - varargs := append([]interface{}{ctx, region, goRoutines, jobs, errs, delay}, ms...) + varargs := append([]interface{}{ctx, region, goRoutines, pendingWorkerCreation, jobs, errs, delay}, ms...) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "V2QueuePolling", reflect.TypeOf((*MockV2QueueClient)(nil).V2QueuePolling), varargs...) } @@ -4120,9 +4120,9 @@ func (mr *MockQueueClientMockRecorder) QueueJobTag(ctx, jobID, tags interface{}) } // QueuePolling mocks base method. -func (m *MockQueueClient) QueuePolling(ctx context.Context, goRoutines *sdk.GoRoutines, jobs chan<- sdk.WorkflowNodeJobRun, errs chan<- error, delay time.Duration, ms ...cdsclient.RequestModifier) error { +func (m *MockQueueClient) QueuePolling(ctx context.Context, goRoutines *sdk.GoRoutines, pendingWorkerCreation *sdk.HatcheryPendingWorkerCreation, jobs chan<- sdk.WorkflowNodeJobRun, errs chan<- error, delay time.Duration, ms ...cdsclient.RequestModifier) error { m.ctrl.T.Helper() - varargs := []interface{}{ctx, goRoutines, jobs, errs, delay} + varargs := []interface{}{ctx, goRoutines, pendingWorkerCreation, jobs, errs, delay} for _, a := range ms { varargs = append(varargs, a) } @@ -4132,9 +4132,9 @@ func (m *MockQueueClient) QueuePolling(ctx context.Context, goRoutines *sdk.GoRo } // QueuePolling indicates an expected call of QueuePolling. -func (mr *MockQueueClientMockRecorder) QueuePolling(ctx, goRoutines, jobs, errs, delay interface{}, ms ...interface{}) *gomock.Call { +func (mr *MockQueueClientMockRecorder) QueuePolling(ctx, goRoutines, pendingWorkerCreation, jobs, errs, delay interface{}, ms ...interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - varargs := append([]interface{}{ctx, goRoutines, jobs, errs, delay}, ms...) + varargs := append([]interface{}{ctx, goRoutines, pendingWorkerCreation, jobs, errs, delay}, ms...) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "QueuePolling", reflect.TypeOf((*MockQueueClient)(nil).QueuePolling), varargs...) } @@ -8892,9 +8892,9 @@ func (mr *MockInterfaceMockRecorder) QueueJobTag(ctx, jobID, tags interface{}) * } // QueuePolling mocks base method. -func (m *MockInterface) QueuePolling(ctx context.Context, goRoutines *sdk.GoRoutines, jobs chan<- sdk.WorkflowNodeJobRun, errs chan<- error, delay time.Duration, ms ...cdsclient.RequestModifier) error { +func (m *MockInterface) QueuePolling(ctx context.Context, goRoutines *sdk.GoRoutines, pendingWorkerCreation *sdk.HatcheryPendingWorkerCreation, jobs chan<- sdk.WorkflowNodeJobRun, errs chan<- error, delay time.Duration, ms ...cdsclient.RequestModifier) error { m.ctrl.T.Helper() - varargs := []interface{}{ctx, goRoutines, jobs, errs, delay} + varargs := []interface{}{ctx, goRoutines, pendingWorkerCreation, jobs, errs, delay} for _, a := range ms { varargs = append(varargs, a) } @@ -8904,9 +8904,9 @@ func (m *MockInterface) QueuePolling(ctx context.Context, goRoutines *sdk.GoRout } // QueuePolling indicates an expected call of QueuePolling. -func (mr *MockInterfaceMockRecorder) QueuePolling(ctx, goRoutines, jobs, errs, delay interface{}, ms ...interface{}) *gomock.Call { +func (mr *MockInterfaceMockRecorder) QueuePolling(ctx, goRoutines, pendingWorkerCreation, jobs, errs, delay interface{}, ms ...interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - varargs := append([]interface{}{ctx, goRoutines, jobs, errs, delay}, ms...) + varargs := append([]interface{}{ctx, goRoutines, pendingWorkerCreation, jobs, errs, delay}, ms...) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "QueuePolling", reflect.TypeOf((*MockInterface)(nil).QueuePolling), varargs...) } @@ -9901,9 +9901,9 @@ func (mr *MockInterfaceMockRecorder) V2QueueJobStepUpdate(ctx, regionName, id, s } // V2QueuePolling mocks base method. -func (m *MockInterface) V2QueuePolling(ctx context.Context, region string, goRoutines *sdk.GoRoutines, jobs chan<- sdk.V2WorkflowRunJob, errs chan<- error, delay time.Duration, ms ...cdsclient.RequestModifier) error { +func (m *MockInterface) V2QueuePolling(ctx context.Context, region string, goRoutines *sdk.GoRoutines, pendingWorkerCreation *sdk.HatcheryPendingWorkerCreation, jobs chan<- sdk.V2WorkflowRunJob, errs chan<- error, delay time.Duration, ms ...cdsclient.RequestModifier) error { m.ctrl.T.Helper() - varargs := []interface{}{ctx, region, goRoutines, jobs, errs, delay} + varargs := []interface{}{ctx, region, goRoutines, pendingWorkerCreation, jobs, errs, delay} for _, a := range ms { varargs = append(varargs, a) } @@ -9913,9 +9913,9 @@ func (m *MockInterface) V2QueuePolling(ctx context.Context, region string, goRou } // V2QueuePolling indicates an expected call of V2QueuePolling. -func (mr *MockInterfaceMockRecorder) V2QueuePolling(ctx, region, goRoutines, jobs, errs, delay interface{}, ms ...interface{}) *gomock.Call { +func (mr *MockInterfaceMockRecorder) V2QueuePolling(ctx, region, goRoutines, pendingWorkerCreation, jobs, errs, delay interface{}, ms ...interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - varargs := append([]interface{}{ctx, region, goRoutines, jobs, errs, delay}, ms...) + varargs := append([]interface{}{ctx, region, goRoutines, pendingWorkerCreation, jobs, errs, delay}, ms...) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "V2QueuePolling", reflect.TypeOf((*MockInterface)(nil).V2QueuePolling), varargs...) } @@ -11458,9 +11458,9 @@ func (mr *MockV2WorkerInterfaceMockRecorder) V2QueueJobStepUpdate(ctx, regionNam } // V2QueuePolling mocks base method. -func (m *MockV2WorkerInterface) V2QueuePolling(ctx context.Context, region string, goRoutines *sdk.GoRoutines, jobs chan<- sdk.V2WorkflowRunJob, errs chan<- error, delay time.Duration, ms ...cdsclient.RequestModifier) error { +func (m *MockV2WorkerInterface) V2QueuePolling(ctx context.Context, region string, goRoutines *sdk.GoRoutines, pendingWorkerCreation *sdk.HatcheryPendingWorkerCreation, jobs chan<- sdk.V2WorkflowRunJob, errs chan<- error, delay time.Duration, ms ...cdsclient.RequestModifier) error { m.ctrl.T.Helper() - varargs := []interface{}{ctx, region, goRoutines, jobs, errs, delay} + varargs := []interface{}{ctx, region, goRoutines, pendingWorkerCreation, jobs, errs, delay} for _, a := range ms { varargs = append(varargs, a) } @@ -11470,9 +11470,9 @@ func (m *MockV2WorkerInterface) V2QueuePolling(ctx context.Context, region strin } // V2QueuePolling indicates an expected call of V2QueuePolling. -func (mr *MockV2WorkerInterfaceMockRecorder) V2QueuePolling(ctx, region, goRoutines, jobs, errs, delay interface{}, ms ...interface{}) *gomock.Call { +func (mr *MockV2WorkerInterfaceMockRecorder) V2QueuePolling(ctx, region, goRoutines, pendingWorkerCreation, jobs, errs, delay interface{}, ms ...interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - varargs := append([]interface{}{ctx, region, goRoutines, jobs, errs, delay}, ms...) + varargs := append([]interface{}{ctx, region, goRoutines, pendingWorkerCreation, jobs, errs, delay}, ms...) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "V2QueuePolling", reflect.TypeOf((*MockV2WorkerInterface)(nil).V2QueuePolling), varargs...) } @@ -11890,9 +11890,9 @@ func (mr *MockWorkerInterfaceMockRecorder) QueueJobTag(ctx, jobID, tags interfac } // QueuePolling mocks base method. -func (m *MockWorkerInterface) QueuePolling(ctx context.Context, goRoutines *sdk.GoRoutines, jobs chan<- sdk.WorkflowNodeJobRun, errs chan<- error, delay time.Duration, ms ...cdsclient.RequestModifier) error { +func (m *MockWorkerInterface) QueuePolling(ctx context.Context, goRoutines *sdk.GoRoutines, pendingWorkerCreation *sdk.HatcheryPendingWorkerCreation, jobs chan<- sdk.WorkflowNodeJobRun, errs chan<- error, delay time.Duration, ms ...cdsclient.RequestModifier) error { m.ctrl.T.Helper() - varargs := []interface{}{ctx, goRoutines, jobs, errs, delay} + varargs := []interface{}{ctx, goRoutines, pendingWorkerCreation, jobs, errs, delay} for _, a := range ms { varargs = append(varargs, a) } @@ -11902,9 +11902,9 @@ func (m *MockWorkerInterface) QueuePolling(ctx context.Context, goRoutines *sdk. } // QueuePolling indicates an expected call of QueuePolling. -func (mr *MockWorkerInterfaceMockRecorder) QueuePolling(ctx, goRoutines, jobs, errs, delay interface{}, ms ...interface{}) *gomock.Call { +func (mr *MockWorkerInterfaceMockRecorder) QueuePolling(ctx, goRoutines, pendingWorkerCreation, jobs, errs, delay interface{}, ms ...interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - varargs := append([]interface{}{ctx, goRoutines, jobs, errs, delay}, ms...) + varargs := append([]interface{}{ctx, goRoutines, pendingWorkerCreation, jobs, errs, delay}, ms...) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "QueuePolling", reflect.TypeOf((*MockWorkerInterface)(nil).QueuePolling), varargs...) } diff --git a/sdk/hatchery.go b/sdk/hatchery.go index caa8a8c744..a265eaed95 100644 --- a/sdk/hatchery.go +++ b/sdk/hatchery.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "strconv" + "sync" "time" ) @@ -46,6 +47,35 @@ type Hatchery struct { Token string `json:"token,omitempty" db:"-" cli:"token,omitempty"` } +type HatcheryPendingWorkerCreation struct { + mapSpawnJobRequest map[string]struct{} + mapSpawnJobRequestMutex *sync.Mutex +} + +func (c *HatcheryPendingWorkerCreation) Init() { + c.mapSpawnJobRequest = make(map[string]struct{}) + c.mapSpawnJobRequestMutex = new(sync.Mutex) +} + +func (c *HatcheryPendingWorkerCreation) SetJobInPendingWorkerCreation(id string) { + c.mapSpawnJobRequestMutex.Lock() + c.mapSpawnJobRequest[id] = struct{}{} + c.mapSpawnJobRequestMutex.Unlock() +} + +func (c *HatcheryPendingWorkerCreation) RemoveJobFromPendingWorkerCreation(id string) { + c.mapSpawnJobRequestMutex.Lock() + delete(c.mapSpawnJobRequest, id) + c.mapSpawnJobRequestMutex.Unlock() +} + +func (c *HatcheryPendingWorkerCreation) IsJobAlreadyPendingWorkerCreation(id string) bool { + c.mapSpawnJobRequestMutex.Lock() + _, has := c.mapSpawnJobRequest[id] + c.mapSpawnJobRequestMutex.Unlock() + return has +} + type HatcheryConfig map[string]interface{} func (hc HatcheryConfig) Value() (driver.Value, error) { diff --git a/sdk/hatchery/hatchery.go b/sdk/hatchery/hatchery.go index 407f81de03..5ecad137c0 100644 --- a/sdk/hatchery/hatchery.go +++ b/sdk/hatchery/hatchery.go @@ -26,7 +26,6 @@ var ( defaultMaxProvisioning = 10 models []sdk.Model defaultMaxAttemptsNumberBeforeFailure = 5 - CacheSpawnIDsTTL = 10 * time.Second CacheNbAttemptsIDsTTL = 1 * time.Hour ) @@ -86,9 +85,6 @@ func Create(ctx context.Context, h Interface) error { v2Runjobs := make(chan sdk.V2WorkflowRunJob, h.Configuration().Provision.MaxConcurrentProvisioning) errs := make(chan error, 1) - // Create a cache to keep in memory the jobID processed in the last 10s. - cacheSpawnIDs := cache.New(CacheSpawnIDsTTL, 2*CacheSpawnIDsTTL) - // Create a cache to only process each jobID only a number of attempts before force to fail the job cacheNbAttemptsIDs := &CacheNbAttemptsJobIDs{ cache: cache.New(CacheNbAttemptsIDsTTL, 2*CacheNbAttemptsIDsTTL), @@ -98,7 +94,7 @@ func Create(ctx context.Context, h Interface) error { h.GetGoRoutines().Run(ctx, "V2QueuePolling", func(ctx context.Context) { log.Debug(ctx, "starting v2 queue polling") - if err := h.CDSClientV2().V2QueuePolling(ctx, h.GetRegion(), h.GetGoRoutines(), v2Runjobs, errs, 20*time.Second); err != nil { + if err := h.CDSClientV2().V2QueuePolling(ctx, h.GetRegion(), h.GetGoRoutines(), h.GetMapPendingWorkerCreation(), v2Runjobs, errs, 20*time.Second); err != nil { log.Error(ctx, "V2 Queues polling stopped: %v", err) } }) @@ -120,7 +116,7 @@ func Create(ctx context.Context, h Interface) error { ms = append(ms, cdsclient.Region(regions...)) } - if err := h.CDSClient().QueuePolling(ctx, h.GetGoRoutines(), wjobs, errs, 20*time.Second, ms...); err != nil { + if err := h.CDSClient().QueuePolling(ctx, h.GetGoRoutines(), h.GetMapPendingWorkerCreation(), wjobs, errs, 20*time.Second, ms...); err != nil { log.Error(ctx, "Queues polling stopped: %v", err) } }) @@ -215,16 +211,6 @@ func Create(ctx context.Context, h Interface) error { stats.Record(currentCtx, GetMetrics().JobsWebsocket.M(1)) } - //Check if the jobs is concerned by a pending worker creation - if _, exist := cacheSpawnIDs.Get(strconv.FormatInt(j.ID, 10)); exist { - log.Debug(currentCtx, "job %d already spawned in previous routine", j.ID) - endTrace("already spawned") - continue - } - - //Before doing anything, push in cache - cacheSpawnIDs.SetDefault(strconv.FormatInt(j.ID, 10), j.ID) - //Check bookedBy current hatchery if j.BookedBy.ID != 0 { log.Debug(currentCtx, "hatchery> job %d is already booked", j.ID) @@ -270,7 +256,6 @@ func Create(ctx context.Context, h Interface) error { log.Debug(currentCtx, "cannot launch this job because it does not contains a region prerequisite and IgnoreJobWithNoRegion=true in hatchery configuration") canTakeJob = false } else if isWithModels { - // Test ascode model modelPath := strings.Split(jobModel, "/") if len(modelPath) >= 5 { @@ -360,7 +345,7 @@ func Create(ctx context.Context, h Interface) error { return nil } -func handleJobV2(ctx context.Context, h Interface, j sdk.V2WorkflowRunJob, cacheAttempts *CacheNbAttemptsJobIDs, workersStartChan chan<- workerStarterRequest) error { +func handleJobV2(_ context.Context, h Interface, j sdk.V2WorkflowRunJob, cacheAttempts *CacheNbAttemptsJobIDs, workersStartChan chan<- workerStarterRequest) error { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute) ctx = telemetry.New(ctx, h, "hatchery.V2JobReceive", trace.AlwaysSample(), trace.SpanKindServer) ctx, end := telemetry.Span(ctx, "hatchery.V2JobReceive", telemetry.Tag(telemetry.TagWorkflow, j.WorkflowName), diff --git a/sdk/hatchery/hatchery_test.go b/sdk/hatchery/hatchery_test.go index abf06e1fc3..d9a577d897 100644 --- a/sdk/hatchery/hatchery_test.go +++ b/sdk/hatchery/hatchery_test.go @@ -19,6 +19,80 @@ import ( "github.com/ovh/cds/sdk/jws" ) +func TestCreateOneJob(t *testing.T) { + log.Factory = log.NewTestingWrapper(t) + ctx := context.TODO() + ctx, cancel := context.WithTimeout(ctx, 7*time.Second) + defer cancel() + ctrl1 := gomock.NewController(t) + ctrl2 := gomock.NewController(t) + + t.Cleanup(func() { + ctrl1.Finish() + ctrl2.Finish() + }) + + mockHatchery := mock_hatchery.NewMockInterface(ctrl1) + mockCDSClient := mock_cdsclient.NewMockInterface(ctrl2) + + grtn := sdk.NewGoRoutines(ctx) + hatcheryConfig := service.HatcheryCommonConfiguration{ + Name: t.Name(), + } + hatcheryConfig.Provision.MaxWorker = 1 + + hatcheryConfig.Provision.MaxAttemptsNumberBeforeFailure = 2 // decrease this value to speedup the test + + mockHatchery.EXPECT().Name().Return(t.Name()).AnyTimes() + mockHatchery.EXPECT().Type().Return(sdk.TypeHatchery).AnyTimes() + mockHatchery.EXPECT().Service().Return(&sdk.Service{}).AnyTimes() + mockHatchery.EXPECT().InitHatchery(gomock.Any()).Return(nil) + mockHatchery.EXPECT().Configuration().Return(hatcheryConfig).AnyTimes() + mockHatchery.EXPECT().GetGoRoutines().Return(grtn).AnyTimes() + mockHatchery.EXPECT().CDSClient().Return(mockCDSClient).AnyTimes() + mockHatchery.EXPECT().CDSClientV2().Return(nil).AnyTimes() + mockCDSClient.EXPECT().QueuePolling(gomock.Any(), grtn, gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, goRoutines *sdk.GoRoutines, pendingWorkerCreation *sdk.HatcheryPendingWorkerCreation, jobs chan<- sdk.WorkflowNodeJobRun, errs chan<- error, delay time.Duration, ms ...cdsclient.RequestModifier) error { + j := sdk.WorkflowNodeJobRun{ + ProjectID: 1, + ID: 777, + WorkflowNodeRunID: 1, + Status: sdk.StatusWaiting, + Job: sdk.ExecutedJob{ + Job: sdk.Job{}, + }, + Start: time.Now(), + } + + jobs <- j // Send the job a first time, it will trigger the first call on SpawnWorker + + <-ctx.Done() + return ctx.Err() + }, + ) + + m := &sdk.HatcheryPendingWorkerCreation{} + m.Init() + mockHatchery.EXPECT().GetMapPendingWorkerCreation().Return(m).Times(2) // two calls: call to QueuePolling and RemoveJobFromPendingWorkerCreation() in spawnWorkerForJob + + // This calls are expected for each job received in the channel + mockCDSClient.EXPECT().WorkerList(gomock.Any()).Return(nil, nil).AnyTimes() + mockHatchery.EXPECT().WorkersStarted(gomock.Any()).Return(nil, nil).AnyTimes() + mockHatchery.EXPECT().CanSpawn(gomock.Any(), gomock.Any(), "777", gomock.Any()).Return(true).AnyTimes() + mockCDSClient.EXPECT().QueueJobBook(gomock.Any(), "777").Return(sdk.WorkflowNodeJobRunBooked{}, nil).AnyTimes() + mockCDSClient.EXPECT().QueueJobSendSpawnInfo(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() + privateKey, err := jws.NewRandomRSAKey() + require.NoError(t, err) + mockHatchery.EXPECT().GetPrivateKey().Return(privateKey).AnyTimes() + + // Call to SpawnWorker regarding what append in "QueuePolling" + mockHatchery.EXPECT().SpawnWorker(gomock.Any(), gomock.Any()).Return(nil).Times(1) + + hatchery.Create(ctx, mockHatchery) + + <-ctx.Done() +} + func TestCreate(t *testing.T) { log.Factory = log.NewTestingWrapper(t) ctx := context.TODO() @@ -41,7 +115,6 @@ func TestCreate(t *testing.T) { } hatcheryConfig.Provision.MaxWorker = 1 - hatchery.CacheSpawnIDsTTL = 2 * time.Second // decrease this cache TTL to speedup the test hatcheryConfig.Provision.MaxAttemptsNumberBeforeFailure = 2 // decrease this value to speedup the test mockHatchery.EXPECT().Name().Return(t.Name()).AnyTimes() @@ -52,8 +125,8 @@ func TestCreate(t *testing.T) { mockHatchery.EXPECT().GetGoRoutines().Return(grtn).AnyTimes() mockHatchery.EXPECT().CDSClient().Return(mockCDSClient).AnyTimes() mockHatchery.EXPECT().CDSClientV2().Return(nil).AnyTimes() - mockCDSClient.EXPECT().QueuePolling(gomock.Any(), grtn, gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( - func(ctx context.Context, goRoutines *sdk.GoRoutines, jobs chan<- sdk.WorkflowNodeJobRun, errs chan<- error, delay time.Duration, ms ...cdsclient.RequestModifier) error { + mockCDSClient.EXPECT().QueuePolling(gomock.Any(), grtn, gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, goRoutines *sdk.GoRoutines, pendingWorkerCreation *sdk.HatcheryPendingWorkerCreation, jobs chan<- sdk.WorkflowNodeJobRun, errs chan<- error, delay time.Duration, ms ...cdsclient.RequestModifier) error { j := sdk.WorkflowNodeJobRun{ ProjectID: 1, ID: 666, @@ -67,22 +140,26 @@ func TestCreate(t *testing.T) { jobs <- j // Send the job a first time, it will trigger the first call on SpawnWorker time.Sleep(1 * time.Second) // Wait - jobs <- j // This one must be ignored with a log "already spawned in previous routine" + jobs <- j // This will start the workerStarter, but failed on book in the real life time.Sleep(2 * time.Second) // Wait - jobs <- j // This will trigger a second call on SpawnWorker + jobs <- j // This will trigger a second call on SpawnWorker should fail the job (nbAttempts: > 2) and call QueueSendResult time.Sleep(3 * time.Second) // Wait - jobs <- j // This shoud not trigger the call on SpawnWorker but should fail the job + jobs <- j // This shoud not trigger the call on SpawnWorker but should fail the job (nbAttempts: > 2) and call QueueSendResult <-ctx.Done() return ctx.Err() }, ) + m := &sdk.HatcheryPendingWorkerCreation{} + m.Init() + mockHatchery.EXPECT().GetMapPendingWorkerCreation().Return(m).Times(3) // Thred calls: call to QueuePolling and two RemoveJobFromPendingWorkerCreation() in spawnWorkerForJob + // This calls are expected for each job received in the channel mockCDSClient.EXPECT().WorkerList(gomock.Any()).Return(nil, nil).AnyTimes() mockHatchery.EXPECT().WorkersStarted(gomock.Any()).Return(nil, nil).AnyTimes() mockHatchery.EXPECT().CanSpawn(gomock.Any(), gomock.Any(), "666", gomock.Any()).Return(true).AnyTimes() - mockCDSClient.EXPECT().QueueJobBook(gomock.Any(), "666").Return(sdk.WorkflowNodeJobRunBooked{}, nil).AnyTimes() + mockCDSClient.EXPECT().QueueJobBook(gomock.Any(), "666").Return(sdk.WorkflowNodeJobRunBooked{}, nil).Times(2) mockCDSClient.EXPECT().QueueJobSendSpawnInfo(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() privateKey, err := jws.NewRandomRSAKey() require.NoError(t, err) @@ -92,7 +169,7 @@ func TestCreate(t *testing.T) { mockHatchery.EXPECT().SpawnWorker(gomock.Any(), gomock.Any()).Return(nil).Times(2) // Expecing a call to QueueSendResult - mockCDSClient.EXPECT().QueueSendResult(gomock.Any(), int64(666), gomock.Any()).Return(nil) + mockCDSClient.EXPECT().QueueSendResult(gomock.Any(), int64(666), gomock.Any()).Return(nil).Times(2) hatchery.Create(ctx, mockHatchery) @@ -100,12 +177,6 @@ func TestCreate(t *testing.T) { } -func getMockLogger() *logrus.Logger { - log := logrus.New() - log.AddHook(&HookMock{}) - return log -} - type HookMock struct{} func (h *HookMock) Levels() []logrus.Level { diff --git a/sdk/hatchery/mock_hatchery/interface_mock.go b/sdk/hatchery/mock_hatchery/interface_mock.go index 55095d7968..6b43451a57 100644 --- a/sdk/hatchery/mock_hatchery/interface_mock.go +++ b/sdk/hatchery/mock_hatchery/interface_mock.go @@ -109,6 +109,20 @@ func (mr *MockInterfaceMockRecorder) GetGoRoutines() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetGoRoutines", reflect.TypeOf((*MockInterface)(nil).GetGoRoutines)) } +// GetMapPendingWorkerCreation mocks base method. +func (m *MockInterface) GetMapPendingWorkerCreation() *sdk.HatcheryPendingWorkerCreation { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetMapPendingWorkerCreation") + ret0, _ := ret[0].(*sdk.HatcheryPendingWorkerCreation) + return ret0 +} + +// GetMapPendingWorkerCreation indicates an expected call of GetMapPendingWorkerCreation. +func (mr *MockInterfaceMockRecorder) GetMapPendingWorkerCreation() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetMapPendingWorkerCreation", reflect.TypeOf((*MockInterface)(nil).GetMapPendingWorkerCreation)) +} + // GetPrivateKey mocks base method. func (m *MockInterface) GetPrivateKey() *rsa.PrivateKey { m.ctrl.T.Helper() @@ -329,6 +343,20 @@ func (mr *MockInterfaceWithModelsMockRecorder) GetGoRoutines() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetGoRoutines", reflect.TypeOf((*MockInterfaceWithModels)(nil).GetGoRoutines)) } +// GetMapPendingWorkerCreation mocks base method. +func (m *MockInterfaceWithModels) GetMapPendingWorkerCreation() *sdk.HatcheryPendingWorkerCreation { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetMapPendingWorkerCreation") + ret0, _ := ret[0].(*sdk.HatcheryPendingWorkerCreation) + return ret0 +} + +// GetMapPendingWorkerCreation indicates an expected call of GetMapPendingWorkerCreation. +func (mr *MockInterfaceWithModelsMockRecorder) GetMapPendingWorkerCreation() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetMapPendingWorkerCreation", reflect.TypeOf((*MockInterfaceWithModels)(nil).GetMapPendingWorkerCreation)) +} + // GetPrivateKey mocks base method. func (m *MockInterfaceWithModels) GetPrivateKey() *rsa.PrivateKey { m.ctrl.T.Helper() diff --git a/sdk/hatchery/starter.go b/sdk/hatchery/starter.go index a21e415028..10f7a6218d 100644 --- a/sdk/hatchery/starter.go +++ b/sdk/hatchery/starter.go @@ -122,6 +122,8 @@ func spawnWorkerForJob(ctx context.Context, h Interface, j workerStarterRequest) logStepInfo(ctx, "starting-worker", j.queued) + h.GetMapPendingWorkerCreation().RemoveJobFromPendingWorkerCreation(j.id) + maxProv := h.Configuration().Provision.MaxConcurrentProvisioning if maxProv < 1 { maxProv = defaultMaxProvisioning diff --git a/sdk/hatchery/types.go b/sdk/hatchery/types.go index 08d0f5a2d5..f5f4f20a64 100644 --- a/sdk/hatchery/types.go +++ b/sdk/hatchery/types.go @@ -171,6 +171,7 @@ type Interface interface { Serve(ctx context.Context) error GetPrivateKey() *rsa.PrivateKey GetGoRoutines() *sdk.GoRoutines + GetMapPendingWorkerCreation() *sdk.HatcheryPendingWorkerCreation GetRegion() string }