From c7ed4a5dcc2302766d3ec56900b2a43eb23fb647 Mon Sep 17 00:00:00 2001 From: Yvonnick Esnault Date: Thu, 7 Dec 2023 10:16:54 +0000 Subject: [PATCH 1/7] fix(hatchery): do not add same job into workersStarter Signed-off-by: Yvonnick Esnault --- engine/hatchery/serve.go | 24 +++++ sdk/hatchery/hatchery.go | 25 +++--- sdk/hatchery/hatchery_test.go | 95 +++++++++++++++++--- sdk/hatchery/mock_hatchery/interface_mock.go | 76 ++++++++++++++++ sdk/hatchery/starter.go | 2 + sdk/hatchery/types.go | 3 + 6 files changed, 201 insertions(+), 24 deletions(-) diff --git a/engine/hatchery/serve.go b/engine/hatchery/serve.go index 73d4629c3e..04df8806fc 100644 --- a/engine/hatchery/serve.go +++ b/engine/hatchery/serve.go @@ -32,6 +32,8 @@ type Common struct { Clientv2 cdsclient.HatcheryServiceClient mapServiceNextLineNumberMutex sync.Mutex mapServiceNextLineNumber map[string]int64 + mapSpawnJobRequest map[string]bool + mapSpawnJobRequestMutex *sync.Mutex } func (c *Common) MaxHeartbeat() int { @@ -65,6 +67,25 @@ func (c *Common) GetGoRoutines() *sdk.GoRoutines { return c.GoRoutines } +func (c *Common) SetJobInPendingWorkerCreation(id string) { + c.mapSpawnJobRequestMutex.Lock() + c.mapSpawnJobRequest[id] = true + c.mapSpawnJobRequestMutex.Unlock() +} + +func (c *Common) RemoveJobFromPendingWorkerCreation(id string) { + c.mapSpawnJobRequestMutex.Lock() + delete(c.mapSpawnJobRequest, id) + c.mapSpawnJobRequestMutex.Unlock() +} + +func (c *Common) IsJobAlreadyPendingWorkerCreation(id string) bool { + c.mapSpawnJobRequestMutex.Lock() + res := c.mapSpawnJobRequest[id] + c.mapSpawnJobRequestMutex.Unlock() + return res +} + // CommonServe start the HatcheryLocal server func (c *Common) CommonServe(ctx context.Context, h hatchery.Interface) error { log.Info(ctx, "%s> Starting service %s (%s)...", c.Name(), h.Configuration().Name, sdk.VERSION) @@ -231,6 +252,9 @@ func (c *Common) Init(ctx context.Context, h hatchery.Interface) error { } } + c.mapSpawnJobRequest = make(map[string]bool) + c.mapSpawnJobRequestMutex = new(sync.Mutex) + return c.initServiceLogger(ctx) } diff --git a/sdk/hatchery/hatchery.go b/sdk/hatchery/hatchery.go index 407f81de03..165ddb279c 100644 --- a/sdk/hatchery/hatchery.go +++ b/sdk/hatchery/hatchery.go @@ -26,7 +26,6 @@ var ( defaultMaxProvisioning = 10 models []sdk.Model defaultMaxAttemptsNumberBeforeFailure = 5 - CacheSpawnIDsTTL = 10 * time.Second CacheNbAttemptsIDsTTL = 1 * time.Hour ) @@ -86,9 +85,6 @@ func Create(ctx context.Context, h Interface) error { v2Runjobs := make(chan sdk.V2WorkflowRunJob, h.Configuration().Provision.MaxConcurrentProvisioning) errs := make(chan error, 1) - // Create a cache to keep in memory the jobID processed in the last 10s. - cacheSpawnIDs := cache.New(CacheSpawnIDsTTL, 2*CacheSpawnIDsTTL) - // Create a cache to only process each jobID only a number of attempts before force to fail the job cacheNbAttemptsIDs := &CacheNbAttemptsJobIDs{ cache: cache.New(CacheNbAttemptsIDsTTL, 2*CacheNbAttemptsIDsTTL), @@ -154,7 +150,7 @@ func Create(ctx context.Context, h Interface) error { log.Error(ctx, "error on h.WorkerModelsEnabled(): %v", errwm) } case j := <-v2Runjobs: - if err := handleJobV2(ctx, h, j, cacheNbAttemptsIDs, workersStartChan); err != nil { + if err := handleJobV2(h, j, cacheNbAttemptsIDs, workersStartChan); err != nil { log.ErrorWithStackTrace(ctx, err) } case j := <-wjobs: @@ -216,15 +212,12 @@ func Create(ctx context.Context, h Interface) error { } //Check if the jobs is concerned by a pending worker creation - if _, exist := cacheSpawnIDs.Get(strconv.FormatInt(j.ID, 10)); exist { + if h.IsJobAlreadyPendingWorkerCreation(strconv.FormatInt(j.ID, 10)) { log.Debug(currentCtx, "job %d already spawned in previous routine", j.ID) - endTrace("already spawned") + endTrace("already in worker creation process") continue } - //Before doing anything, push in cache - cacheSpawnIDs.SetDefault(strconv.FormatInt(j.ID, 10), j.ID) - //Check bookedBy current hatchery if j.BookedBy.ID != 0 { log.Debug(currentCtx, "hatchery> job %d is already booked", j.ID) @@ -270,7 +263,6 @@ func Create(ctx context.Context, h Interface) error { log.Debug(currentCtx, "cannot launch this job because it does not contains a region prerequisite and IgnoreJobWithNoRegion=true in hatchery configuration") canTakeJob = false } else if isWithModels { - // Test ascode model modelPath := strings.Split(jobModel, "/") if len(modelPath) >= 5 { @@ -349,6 +341,7 @@ func Create(ctx context.Context, h Interface) error { } logStepInfo(currentCtx, "processed", j.Queued) + h.SetJobInPendingWorkerCreation(strconv.FormatInt(j.ID, 10)) workersStartChan <- workerRequest case <-chanRegister: if err := workerRegister(ctx, hWithModels, workersStartChan); err != nil { @@ -360,7 +353,7 @@ func Create(ctx context.Context, h Interface) error { return nil } -func handleJobV2(ctx context.Context, h Interface, j sdk.V2WorkflowRunJob, cacheAttempts *CacheNbAttemptsJobIDs, workersStartChan chan<- workerStarterRequest) error { +func handleJobV2(h Interface, j sdk.V2WorkflowRunJob, cacheAttempts *CacheNbAttemptsJobIDs, workersStartChan chan<- workerStarterRequest) error { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute) ctx = telemetry.New(ctx, h, "hatchery.V2JobReceive", trace.AlwaysSample(), trace.SpanKindServer) ctx, end := telemetry.Span(ctx, "hatchery.V2JobReceive", telemetry.Tag(telemetry.TagWorkflow, j.WorkflowName), @@ -403,6 +396,13 @@ func handleJobV2(ctx context.Context, h Interface, j sdk.V2WorkflowRunJob, cache endTrace("no capacities") } + //Check if the jobs is concerned by a pending worker creation + if h.IsJobAlreadyPendingWorkerCreation(j.ID) { + log.Debug(ctx, "job %d already spawned in previous routine", j.ID) + endTrace("already in worker creation process") + return nil + } + workerRequest := workerStarterRequest{ ctx: ctx, cancel: cancel, @@ -453,6 +453,7 @@ func handleJobV2(ctx context.Context, h Interface, j sdk.V2WorkflowRunJob, cache } logStepInfo(ctx, "processed", j.Queued) + h.SetJobInPendingWorkerCreation(j.ID) workersStartChan <- workerRequest return nil } diff --git a/sdk/hatchery/hatchery_test.go b/sdk/hatchery/hatchery_test.go index abf06e1fc3..0a02821398 100644 --- a/sdk/hatchery/hatchery_test.go +++ b/sdk/hatchery/hatchery_test.go @@ -19,6 +19,80 @@ import ( "github.com/ovh/cds/sdk/jws" ) +func TestCreateOneJob(t *testing.T) { + log.Factory = log.NewTestingWrapper(t) + ctx := context.TODO() + ctx, cancel := context.WithTimeout(ctx, 7*time.Second) + defer cancel() + ctrl1 := gomock.NewController(t) + ctrl2 := gomock.NewController(t) + + t.Cleanup(func() { + ctrl1.Finish() + ctrl2.Finish() + }) + + mockHatchery := mock_hatchery.NewMockInterface(ctrl1) + mockCDSClient := mock_cdsclient.NewMockInterface(ctrl2) + + grtn := sdk.NewGoRoutines(ctx) + hatcheryConfig := service.HatcheryCommonConfiguration{ + Name: t.Name(), + } + hatcheryConfig.Provision.MaxWorker = 1 + + hatcheryConfig.Provision.MaxAttemptsNumberBeforeFailure = 2 // decrease this value to speedup the test + + mockHatchery.EXPECT().Name().Return(t.Name()).AnyTimes() + mockHatchery.EXPECT().Type().Return(sdk.TypeHatchery).AnyTimes() + mockHatchery.EXPECT().Service().Return(&sdk.Service{}).AnyTimes() + mockHatchery.EXPECT().InitHatchery(gomock.Any()).Return(nil) + mockHatchery.EXPECT().Configuration().Return(hatcheryConfig).AnyTimes() + mockHatchery.EXPECT().GetGoRoutines().Return(grtn).AnyTimes() + mockHatchery.EXPECT().CDSClient().Return(mockCDSClient).AnyTimes() + mockHatchery.EXPECT().CDSClientV2().Return(nil).AnyTimes() + mockCDSClient.EXPECT().QueuePolling(gomock.Any(), grtn, gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, goRoutines *sdk.GoRoutines, jobs chan<- sdk.WorkflowNodeJobRun, errs chan<- error, delay time.Duration, ms ...cdsclient.RequestModifier) error { + j := sdk.WorkflowNodeJobRun{ + ProjectID: 1, + ID: 777, + WorkflowNodeRunID: 1, + Status: sdk.StatusWaiting, + Job: sdk.ExecutedJob{ + Job: sdk.Job{}, + }, + Start: time.Now(), + } + + jobs <- j // Send the job a first time, it will trigger the first call on SpawnWorker + + <-ctx.Done() + return ctx.Err() + }, + ) + + mockHatchery.EXPECT().IsJobAlreadyPendingWorkerCreation(gomock.Any()).Times(1) + mockHatchery.EXPECT().SetJobInPendingWorkerCreation(gomock.Any()).Times(1) + mockHatchery.EXPECT().RemoveJobFromPendingWorkerCreation(gomock.Any()).Times(1) + + // This calls are expected for each job received in the channel + mockCDSClient.EXPECT().WorkerList(gomock.Any()).Return(nil, nil).AnyTimes() + mockHatchery.EXPECT().WorkersStarted(gomock.Any()).Return(nil, nil).AnyTimes() + mockHatchery.EXPECT().CanSpawn(gomock.Any(), gomock.Any(), "777", gomock.Any()).Return(true).AnyTimes() + mockCDSClient.EXPECT().QueueJobBook(gomock.Any(), "777").Return(sdk.WorkflowNodeJobRunBooked{}, nil).AnyTimes() + mockCDSClient.EXPECT().QueueJobSendSpawnInfo(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() + privateKey, err := jws.NewRandomRSAKey() + require.NoError(t, err) + mockHatchery.EXPECT().GetPrivateKey().Return(privateKey).AnyTimes() + + // Call to SpawnWorker regarding what append in "QueuePolling" + mockHatchery.EXPECT().SpawnWorker(gomock.Any(), gomock.Any()).Return(nil).Times(1) + + hatchery.Create(ctx, mockHatchery) + + <-ctx.Done() +} + func TestCreate(t *testing.T) { log.Factory = log.NewTestingWrapper(t) ctx := context.TODO() @@ -41,7 +115,6 @@ func TestCreate(t *testing.T) { } hatcheryConfig.Provision.MaxWorker = 1 - hatchery.CacheSpawnIDsTTL = 2 * time.Second // decrease this cache TTL to speedup the test hatcheryConfig.Provision.MaxAttemptsNumberBeforeFailure = 2 // decrease this value to speedup the test mockHatchery.EXPECT().Name().Return(t.Name()).AnyTimes() @@ -67,22 +140,26 @@ func TestCreate(t *testing.T) { jobs <- j // Send the job a first time, it will trigger the first call on SpawnWorker time.Sleep(1 * time.Second) // Wait - jobs <- j // This one must be ignored with a log "already spawned in previous routine" + jobs <- j // This will start the workerStarter, but failed on book in the real life time.Sleep(2 * time.Second) // Wait - jobs <- j // This will trigger a second call on SpawnWorker + jobs <- j // This will trigger a second call on SpawnWorker should fail the job (nbAttempts: > 2) and call QueueSendResult time.Sleep(3 * time.Second) // Wait - jobs <- j // This shoud not trigger the call on SpawnWorker but should fail the job + jobs <- j // This shoud not trigger the call on SpawnWorker but should fail the job (nbAttempts: > 2) and call QueueSendResult <-ctx.Done() return ctx.Err() }, ) + mockHatchery.EXPECT().IsJobAlreadyPendingWorkerCreation(gomock.Any()).Times(4) + mockHatchery.EXPECT().SetJobInPendingWorkerCreation(gomock.Any()).Times(2) + mockHatchery.EXPECT().RemoveJobFromPendingWorkerCreation(gomock.Any()).Times(2) + // This calls are expected for each job received in the channel mockCDSClient.EXPECT().WorkerList(gomock.Any()).Return(nil, nil).AnyTimes() mockHatchery.EXPECT().WorkersStarted(gomock.Any()).Return(nil, nil).AnyTimes() mockHatchery.EXPECT().CanSpawn(gomock.Any(), gomock.Any(), "666", gomock.Any()).Return(true).AnyTimes() - mockCDSClient.EXPECT().QueueJobBook(gomock.Any(), "666").Return(sdk.WorkflowNodeJobRunBooked{}, nil).AnyTimes() + mockCDSClient.EXPECT().QueueJobBook(gomock.Any(), "666").Return(sdk.WorkflowNodeJobRunBooked{}, nil).Times(2) mockCDSClient.EXPECT().QueueJobSendSpawnInfo(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() privateKey, err := jws.NewRandomRSAKey() require.NoError(t, err) @@ -92,7 +169,7 @@ func TestCreate(t *testing.T) { mockHatchery.EXPECT().SpawnWorker(gomock.Any(), gomock.Any()).Return(nil).Times(2) // Expecing a call to QueueSendResult - mockCDSClient.EXPECT().QueueSendResult(gomock.Any(), int64(666), gomock.Any()).Return(nil) + mockCDSClient.EXPECT().QueueSendResult(gomock.Any(), int64(666), gomock.Any()).Return(nil).Times(2) hatchery.Create(ctx, mockHatchery) @@ -100,12 +177,6 @@ func TestCreate(t *testing.T) { } -func getMockLogger() *logrus.Logger { - log := logrus.New() - log.AddHook(&HookMock{}) - return log -} - type HookMock struct{} func (h *HookMock) Levels() []logrus.Level { diff --git a/sdk/hatchery/mock_hatchery/interface_mock.go b/sdk/hatchery/mock_hatchery/interface_mock.go index 55095d7968..829147314d 100644 --- a/sdk/hatchery/mock_hatchery/interface_mock.go +++ b/sdk/hatchery/mock_hatchery/interface_mock.go @@ -151,6 +151,20 @@ func (mr *MockInterfaceMockRecorder) InitHatchery(ctx interface{}) *gomock.Call return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InitHatchery", reflect.TypeOf((*MockInterface)(nil).InitHatchery), ctx) } +// IsJobAlreadyPendingWorkerCreation mocks base method. +func (m *MockInterface) IsJobAlreadyPendingWorkerCreation(id string) bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "IsJobAlreadyPendingWorkerCreation", id) + ret0, _ := ret[0].(bool) + return ret0 +} + +// IsJobAlreadyPendingWorkerCreation indicates an expected call of IsJobAlreadyPendingWorkerCreation. +func (mr *MockInterfaceMockRecorder) IsJobAlreadyPendingWorkerCreation(id interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsJobAlreadyPendingWorkerCreation", reflect.TypeOf((*MockInterface)(nil).IsJobAlreadyPendingWorkerCreation), id) +} + // Name mocks base method. func (m *MockInterface) Name() string { m.ctrl.T.Helper() @@ -165,6 +179,18 @@ func (mr *MockInterfaceMockRecorder) Name() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Name", reflect.TypeOf((*MockInterface)(nil).Name)) } +// RemoveJobFromPendingWorkerCreation mocks base method. +func (m *MockInterface) RemoveJobFromPendingWorkerCreation(id string) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "RemoveJobFromPendingWorkerCreation", id) +} + +// RemoveJobFromPendingWorkerCreation indicates an expected call of RemoveJobFromPendingWorkerCreation. +func (mr *MockInterfaceMockRecorder) RemoveJobFromPendingWorkerCreation(id interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RemoveJobFromPendingWorkerCreation", reflect.TypeOf((*MockInterface)(nil).RemoveJobFromPendingWorkerCreation), id) +} + // Serve mocks base method. func (m *MockInterface) Serve(ctx context.Context) error { m.ctrl.T.Helper() @@ -193,6 +219,18 @@ func (mr *MockInterfaceMockRecorder) Service() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Service", reflect.TypeOf((*MockInterface)(nil).Service)) } +// SetJobInPendingWorkerCreation mocks base method. +func (m *MockInterface) SetJobInPendingWorkerCreation(id string) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "SetJobInPendingWorkerCreation", id) +} + +// SetJobInPendingWorkerCreation indicates an expected call of SetJobInPendingWorkerCreation. +func (mr *MockInterfaceMockRecorder) SetJobInPendingWorkerCreation(id interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetJobInPendingWorkerCreation", reflect.TypeOf((*MockInterface)(nil).SetJobInPendingWorkerCreation), id) +} + // SpawnWorker mocks base method. func (m *MockInterface) SpawnWorker(ctx context.Context, spawnArgs hatchery.SpawnArguments) error { m.ctrl.T.Helper() @@ -371,6 +409,20 @@ func (mr *MockInterfaceWithModelsMockRecorder) InitHatchery(ctx interface{}) *go return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InitHatchery", reflect.TypeOf((*MockInterfaceWithModels)(nil).InitHatchery), ctx) } +// IsJobAlreadyPendingWorkerCreation mocks base method. +func (m *MockInterfaceWithModels) IsJobAlreadyPendingWorkerCreation(id string) bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "IsJobAlreadyPendingWorkerCreation", id) + ret0, _ := ret[0].(bool) + return ret0 +} + +// IsJobAlreadyPendingWorkerCreation indicates an expected call of IsJobAlreadyPendingWorkerCreation. +func (mr *MockInterfaceWithModelsMockRecorder) IsJobAlreadyPendingWorkerCreation(id interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsJobAlreadyPendingWorkerCreation", reflect.TypeOf((*MockInterfaceWithModels)(nil).IsJobAlreadyPendingWorkerCreation), id) +} + // ModelType mocks base method. func (m *MockInterfaceWithModels) ModelType() string { m.ctrl.T.Helper() @@ -413,6 +465,18 @@ func (mr *MockInterfaceWithModelsMockRecorder) NeedRegistration(ctx, model inter return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NeedRegistration", reflect.TypeOf((*MockInterfaceWithModels)(nil).NeedRegistration), ctx, model) } +// RemoveJobFromPendingWorkerCreation mocks base method. +func (m *MockInterfaceWithModels) RemoveJobFromPendingWorkerCreation(id string) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "RemoveJobFromPendingWorkerCreation", id) +} + +// RemoveJobFromPendingWorkerCreation indicates an expected call of RemoveJobFromPendingWorkerCreation. +func (mr *MockInterfaceWithModelsMockRecorder) RemoveJobFromPendingWorkerCreation(id interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RemoveJobFromPendingWorkerCreation", reflect.TypeOf((*MockInterfaceWithModels)(nil).RemoveJobFromPendingWorkerCreation), id) +} + // Serve mocks base method. func (m *MockInterfaceWithModels) Serve(ctx context.Context) error { m.ctrl.T.Helper() @@ -441,6 +505,18 @@ func (mr *MockInterfaceWithModelsMockRecorder) Service() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Service", reflect.TypeOf((*MockInterfaceWithModels)(nil).Service)) } +// SetJobInPendingWorkerCreation mocks base method. +func (m *MockInterfaceWithModels) SetJobInPendingWorkerCreation(id string) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "SetJobInPendingWorkerCreation", id) +} + +// SetJobInPendingWorkerCreation indicates an expected call of SetJobInPendingWorkerCreation. +func (mr *MockInterfaceWithModelsMockRecorder) SetJobInPendingWorkerCreation(id interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetJobInPendingWorkerCreation", reflect.TypeOf((*MockInterfaceWithModels)(nil).SetJobInPendingWorkerCreation), id) +} + // SpawnWorker mocks base method. func (m *MockInterfaceWithModels) SpawnWorker(ctx context.Context, spawnArgs hatchery.SpawnArguments) error { m.ctrl.T.Helper() diff --git a/sdk/hatchery/starter.go b/sdk/hatchery/starter.go index a21e415028..ab2db0ec26 100644 --- a/sdk/hatchery/starter.go +++ b/sdk/hatchery/starter.go @@ -122,6 +122,8 @@ func spawnWorkerForJob(ctx context.Context, h Interface, j workerStarterRequest) logStepInfo(ctx, "starting-worker", j.queued) + h.RemoveJobFromPendingWorkerCreation(j.id) + maxProv := h.Configuration().Provision.MaxConcurrentProvisioning if maxProv < 1 { maxProv = defaultMaxProvisioning diff --git a/sdk/hatchery/types.go b/sdk/hatchery/types.go index 08d0f5a2d5..8266a526a4 100644 --- a/sdk/hatchery/types.go +++ b/sdk/hatchery/types.go @@ -171,6 +171,9 @@ type Interface interface { Serve(ctx context.Context) error GetPrivateKey() *rsa.PrivateKey GetGoRoutines() *sdk.GoRoutines + SetJobInPendingWorkerCreation(id string) + IsJobAlreadyPendingWorkerCreation(id string) bool + RemoveJobFromPendingWorkerCreation(id string) GetRegion() string } From b8b8822378d42f1e010fe7604bd8387f3e73ae4c Mon Sep 17 00:00:00 2001 From: Yvonnick Esnault Date: Thu, 7 Dec 2023 11:17:21 +0000 Subject: [PATCH 2/7] feat: cr Signed-off-by: Yvonnick Esnault --- engine/hatchery/serve.go | 26 +---- sdk/cdsclient/client_queue.go | 7 +- sdk/cdsclient/client_queue_V2.go | 2 +- sdk/cdsclient/interface.go | 4 +- .../mock_cdsclient/interface_mock.go | 56 +++++----- sdk/hatchery.go | 30 +++++ sdk/hatchery/hatchery.go | 11 +- sdk/hatchery/mock_hatchery/interface_mock.go | 104 +++++------------- sdk/hatchery/starter.go | 2 +- sdk/hatchery/types.go | 4 +- 10 files changed, 107 insertions(+), 139 deletions(-) diff --git a/engine/hatchery/serve.go b/engine/hatchery/serve.go index 04df8806fc..516dd0dabb 100644 --- a/engine/hatchery/serve.go +++ b/engine/hatchery/serve.go @@ -32,8 +32,7 @@ type Common struct { Clientv2 cdsclient.HatcheryServiceClient mapServiceNextLineNumberMutex sync.Mutex mapServiceNextLineNumber map[string]int64 - mapSpawnJobRequest map[string]bool - mapSpawnJobRequestMutex *sync.Mutex + mapPendingWorkerCreation *sdk.HatcheryPendingWorkerCreation } func (c *Common) MaxHeartbeat() int { @@ -67,23 +66,8 @@ func (c *Common) GetGoRoutines() *sdk.GoRoutines { return c.GoRoutines } -func (c *Common) SetJobInPendingWorkerCreation(id string) { - c.mapSpawnJobRequestMutex.Lock() - c.mapSpawnJobRequest[id] = true - c.mapSpawnJobRequestMutex.Unlock() -} - -func (c *Common) RemoveJobFromPendingWorkerCreation(id string) { - c.mapSpawnJobRequestMutex.Lock() - delete(c.mapSpawnJobRequest, id) - c.mapSpawnJobRequestMutex.Unlock() -} - -func (c *Common) IsJobAlreadyPendingWorkerCreation(id string) bool { - c.mapSpawnJobRequestMutex.Lock() - res := c.mapSpawnJobRequest[id] - c.mapSpawnJobRequestMutex.Unlock() - return res +func (c *Common) GetMapPendingWorkerCreation() *sdk.HatcheryPendingWorkerCreation { + return c.mapPendingWorkerCreation } // CommonServe start the HatcheryLocal server @@ -252,8 +236,8 @@ func (c *Common) Init(ctx context.Context, h hatchery.Interface) error { } } - c.mapSpawnJobRequest = make(map[string]bool) - c.mapSpawnJobRequestMutex = new(sync.Mutex) + c.mapPendingWorkerCreation = &sdk.HatcheryPendingWorkerCreation{} + c.mapPendingWorkerCreation.Init() return c.initServiceLogger(ctx) } diff --git a/sdk/cdsclient/client_queue.go b/sdk/cdsclient/client_queue.go index 96d2edbe4d..5ef934b205 100644 --- a/sdk/cdsclient/client_queue.go +++ b/sdk/cdsclient/client_queue.go @@ -42,7 +42,7 @@ func shrinkQueue(queue *sdk.WorkflowQueue, nbJobsToKeep int) time.Time { return t0 } -func (c *client) QueuePolling(ctx context.Context, goRoutines *sdk.GoRoutines, jobs chan<- sdk.WorkflowNodeJobRun, errs chan<- error, delay time.Duration, ms ...RequestModifier) error { +func (c *client) QueuePolling(ctx context.Context, goRoutines *sdk.GoRoutines, pendingWorkerCreation *sdk.HatcheryPendingWorkerCreation, jobs chan<- sdk.WorkflowNodeJobRun, errs chan<- error, delay time.Duration, ms ...RequestModifier) error { jobsTicker := time.NewTicker(delay) // This goroutine call the SSE route @@ -117,6 +117,11 @@ func (c *client) QueuePolling(ctx context.Context, goRoutines *sdk.GoRoutines, j shrinkQueue(&queue, cap(jobs)) for _, j := range queue { + id := strconv.FormatInt(j.ID, 10) + if pendingWorkerCreation.IsJobAlreadyPendingWorkerCreation(id) { + continue + } + pendingWorkerCreation.SetJobInPendingWorkerCreation(id) jobs <- j } } diff --git a/sdk/cdsclient/client_queue_V2.go b/sdk/cdsclient/client_queue_V2.go index 71dace3bfe..1ea07d89a4 100644 --- a/sdk/cdsclient/client_queue_V2.go +++ b/sdk/cdsclient/client_queue_V2.go @@ -103,7 +103,7 @@ func (c *client) V2QueueGetJobRun(ctx context.Context, regionName, id string) (* return &job, nil } -func (c *client) V2QueuePolling(ctx context.Context, regionName string, goRoutines *sdk.GoRoutines, jobs chan<- sdk.V2WorkflowRunJob, errs chan<- error, delay time.Duration, ms ...RequestModifier) error { +func (c *client) V2QueuePolling(ctx context.Context, regionName string, goRoutines *sdk.GoRoutines, pendingWorkerCreation *sdk.HatcheryPendingWorkerCreation, jobs chan<- sdk.V2WorkflowRunJob, errs chan<- error, delay time.Duration, ms ...RequestModifier) error { jobsTicker := time.NewTicker(delay) // This goroutine call the Websocket diff --git a/sdk/cdsclient/interface.go b/sdk/cdsclient/interface.go index 640b277e73..f851c25890 100644 --- a/sdk/cdsclient/interface.go +++ b/sdk/cdsclient/interface.go @@ -295,7 +295,7 @@ type ProjectVariablesClient interface { type V2QueueClient interface { V2QueueGetJobRun(ctx context.Context, regionName string, id string) (*sdk.V2WorkflowRunJob, error) - V2QueuePolling(ctx context.Context, region string, goRoutines *sdk.GoRoutines, jobs chan<- sdk.V2WorkflowRunJob, errs chan<- error, delay time.Duration, ms ...RequestModifier) error + V2QueuePolling(ctx context.Context, region string, goRoutines *sdk.GoRoutines, pendingWorkerCreation *sdk.HatcheryPendingWorkerCreation, jobs chan<- sdk.V2WorkflowRunJob, errs chan<- error, delay time.Duration, ms ...RequestModifier) error V2QueueJobResult(ctx context.Context, region string, jobRunID string, result sdk.V2WorkflowRunJobResult) error V2QueueJobRunResultGet(ctx context.Context, regionName string, jobRunID string, runResultID string) (*sdk.V2WorkflowRunResult, error) V2QueueJobRunResultsGet(ctx context.Context, regionName string, jobRunID string) ([]sdk.V2WorkflowRunResult, error) @@ -311,7 +311,7 @@ type V2QueueClient interface { type QueueClient interface { QueueWorkflowNodeJobRun(mods ...RequestModifier) ([]sdk.WorkflowNodeJobRun, error) QueueCountWorkflowNodeJobRun(since *time.Time, until *time.Time, modelType string) (sdk.WorkflowNodeJobRunCount, error) - QueuePolling(ctx context.Context, goRoutines *sdk.GoRoutines, jobs chan<- sdk.WorkflowNodeJobRun, errs chan<- error, delay time.Duration, ms ...RequestModifier) error + QueuePolling(ctx context.Context, goRoutines *sdk.GoRoutines, pendingWorkerCreation *sdk.HatcheryPendingWorkerCreation, jobs chan<- sdk.WorkflowNodeJobRun, errs chan<- error, delay time.Duration, ms ...RequestModifier) error QueueTakeJob(ctx context.Context, job sdk.WorkflowNodeJobRun) (*sdk.WorkflowNodeJobRunData, error) QueueJobBook(ctx context.Context, id string) (sdk.WorkflowNodeJobRunBooked, error) QueueJobRelease(ctx context.Context, id string) error diff --git a/sdk/cdsclient/mock_cdsclient/interface_mock.go b/sdk/cdsclient/mock_cdsclient/interface_mock.go index 7f818f0f40..4d0b8533d7 100644 --- a/sdk/cdsclient/mock_cdsclient/interface_mock.go +++ b/sdk/cdsclient/mock_cdsclient/interface_mock.go @@ -2820,9 +2820,9 @@ func (mr *MockHatcheryServiceClientMockRecorder) V2QueueJobStepUpdate(ctx, regio } // V2QueuePolling mocks base method. -func (m *MockHatcheryServiceClient) V2QueuePolling(ctx context.Context, region string, goRoutines *sdk.GoRoutines, jobs chan<- sdk.V2WorkflowRunJob, errs chan<- error, delay time.Duration, ms ...cdsclient.RequestModifier) error { +func (m *MockHatcheryServiceClient) V2QueuePolling(ctx context.Context, region string, goRoutines *sdk.GoRoutines, pendingWorkerCreation *sdk.HatcheryPendingWorkerCreation, jobs chan<- sdk.V2WorkflowRunJob, errs chan<- error, delay time.Duration, ms ...cdsclient.RequestModifier) error { m.ctrl.T.Helper() - varargs := []interface{}{ctx, region, goRoutines, jobs, errs, delay} + varargs := []interface{}{ctx, region, goRoutines, pendingWorkerCreation, jobs, errs, delay} for _, a := range ms { varargs = append(varargs, a) } @@ -2832,9 +2832,9 @@ func (m *MockHatcheryServiceClient) V2QueuePolling(ctx context.Context, region s } // V2QueuePolling indicates an expected call of V2QueuePolling. -func (mr *MockHatcheryServiceClientMockRecorder) V2QueuePolling(ctx, region, goRoutines, jobs, errs, delay interface{}, ms ...interface{}) *gomock.Call { +func (mr *MockHatcheryServiceClientMockRecorder) V2QueuePolling(ctx, region, goRoutines, pendingWorkerCreation, jobs, errs, delay interface{}, ms ...interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - varargs := append([]interface{}{ctx, region, goRoutines, jobs, errs, delay}, ms...) + varargs := append([]interface{}{ctx, region, goRoutines, pendingWorkerCreation, jobs, errs, delay}, ms...) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "V2QueuePolling", reflect.TypeOf((*MockHatcheryServiceClient)(nil).V2QueuePolling), varargs...) } @@ -3934,9 +3934,9 @@ func (mr *MockV2QueueClientMockRecorder) V2QueueJobStepUpdate(ctx, regionName, i } // V2QueuePolling mocks base method. -func (m *MockV2QueueClient) V2QueuePolling(ctx context.Context, region string, goRoutines *sdk.GoRoutines, jobs chan<- sdk.V2WorkflowRunJob, errs chan<- error, delay time.Duration, ms ...cdsclient.RequestModifier) error { +func (m *MockV2QueueClient) V2QueuePolling(ctx context.Context, region string, goRoutines *sdk.GoRoutines, pendingWorkerCreation *sdk.HatcheryPendingWorkerCreation, jobs chan<- sdk.V2WorkflowRunJob, errs chan<- error, delay time.Duration, ms ...cdsclient.RequestModifier) error { m.ctrl.T.Helper() - varargs := []interface{}{ctx, region, goRoutines, jobs, errs, delay} + varargs := []interface{}{ctx, region, goRoutines, pendingWorkerCreation, jobs, errs, delay} for _, a := range ms { varargs = append(varargs, a) } @@ -3946,9 +3946,9 @@ func (m *MockV2QueueClient) V2QueuePolling(ctx context.Context, region string, g } // V2QueuePolling indicates an expected call of V2QueuePolling. -func (mr *MockV2QueueClientMockRecorder) V2QueuePolling(ctx, region, goRoutines, jobs, errs, delay interface{}, ms ...interface{}) *gomock.Call { +func (mr *MockV2QueueClientMockRecorder) V2QueuePolling(ctx, region, goRoutines, pendingWorkerCreation, jobs, errs, delay interface{}, ms ...interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - varargs := append([]interface{}{ctx, region, goRoutines, jobs, errs, delay}, ms...) + varargs := append([]interface{}{ctx, region, goRoutines, pendingWorkerCreation, jobs, errs, delay}, ms...) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "V2QueuePolling", reflect.TypeOf((*MockV2QueueClient)(nil).V2QueuePolling), varargs...) } @@ -4120,9 +4120,9 @@ func (mr *MockQueueClientMockRecorder) QueueJobTag(ctx, jobID, tags interface{}) } // QueuePolling mocks base method. -func (m *MockQueueClient) QueuePolling(ctx context.Context, goRoutines *sdk.GoRoutines, jobs chan<- sdk.WorkflowNodeJobRun, errs chan<- error, delay time.Duration, ms ...cdsclient.RequestModifier) error { +func (m *MockQueueClient) QueuePolling(ctx context.Context, goRoutines *sdk.GoRoutines, pendingWorkerCreation *sdk.HatcheryPendingWorkerCreation, jobs chan<- sdk.WorkflowNodeJobRun, errs chan<- error, delay time.Duration, ms ...cdsclient.RequestModifier) error { m.ctrl.T.Helper() - varargs := []interface{}{ctx, goRoutines, jobs, errs, delay} + varargs := []interface{}{ctx, goRoutines, pendingWorkerCreation, jobs, errs, delay} for _, a := range ms { varargs = append(varargs, a) } @@ -4132,9 +4132,9 @@ func (m *MockQueueClient) QueuePolling(ctx context.Context, goRoutines *sdk.GoRo } // QueuePolling indicates an expected call of QueuePolling. -func (mr *MockQueueClientMockRecorder) QueuePolling(ctx, goRoutines, jobs, errs, delay interface{}, ms ...interface{}) *gomock.Call { +func (mr *MockQueueClientMockRecorder) QueuePolling(ctx, goRoutines, pendingWorkerCreation, jobs, errs, delay interface{}, ms ...interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - varargs := append([]interface{}{ctx, goRoutines, jobs, errs, delay}, ms...) + varargs := append([]interface{}{ctx, goRoutines, pendingWorkerCreation, jobs, errs, delay}, ms...) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "QueuePolling", reflect.TypeOf((*MockQueueClient)(nil).QueuePolling), varargs...) } @@ -8892,9 +8892,9 @@ func (mr *MockInterfaceMockRecorder) QueueJobTag(ctx, jobID, tags interface{}) * } // QueuePolling mocks base method. -func (m *MockInterface) QueuePolling(ctx context.Context, goRoutines *sdk.GoRoutines, jobs chan<- sdk.WorkflowNodeJobRun, errs chan<- error, delay time.Duration, ms ...cdsclient.RequestModifier) error { +func (m *MockInterface) QueuePolling(ctx context.Context, goRoutines *sdk.GoRoutines, pendingWorkerCreation *sdk.HatcheryPendingWorkerCreation, jobs chan<- sdk.WorkflowNodeJobRun, errs chan<- error, delay time.Duration, ms ...cdsclient.RequestModifier) error { m.ctrl.T.Helper() - varargs := []interface{}{ctx, goRoutines, jobs, errs, delay} + varargs := []interface{}{ctx, goRoutines, pendingWorkerCreation, jobs, errs, delay} for _, a := range ms { varargs = append(varargs, a) } @@ -8904,9 +8904,9 @@ func (m *MockInterface) QueuePolling(ctx context.Context, goRoutines *sdk.GoRout } // QueuePolling indicates an expected call of QueuePolling. -func (mr *MockInterfaceMockRecorder) QueuePolling(ctx, goRoutines, jobs, errs, delay interface{}, ms ...interface{}) *gomock.Call { +func (mr *MockInterfaceMockRecorder) QueuePolling(ctx, goRoutines, pendingWorkerCreation, jobs, errs, delay interface{}, ms ...interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - varargs := append([]interface{}{ctx, goRoutines, jobs, errs, delay}, ms...) + varargs := append([]interface{}{ctx, goRoutines, pendingWorkerCreation, jobs, errs, delay}, ms...) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "QueuePolling", reflect.TypeOf((*MockInterface)(nil).QueuePolling), varargs...) } @@ -9901,9 +9901,9 @@ func (mr *MockInterfaceMockRecorder) V2QueueJobStepUpdate(ctx, regionName, id, s } // V2QueuePolling mocks base method. -func (m *MockInterface) V2QueuePolling(ctx context.Context, region string, goRoutines *sdk.GoRoutines, jobs chan<- sdk.V2WorkflowRunJob, errs chan<- error, delay time.Duration, ms ...cdsclient.RequestModifier) error { +func (m *MockInterface) V2QueuePolling(ctx context.Context, region string, goRoutines *sdk.GoRoutines, pendingWorkerCreation *sdk.HatcheryPendingWorkerCreation, jobs chan<- sdk.V2WorkflowRunJob, errs chan<- error, delay time.Duration, ms ...cdsclient.RequestModifier) error { m.ctrl.T.Helper() - varargs := []interface{}{ctx, region, goRoutines, jobs, errs, delay} + varargs := []interface{}{ctx, region, goRoutines, pendingWorkerCreation, jobs, errs, delay} for _, a := range ms { varargs = append(varargs, a) } @@ -9913,9 +9913,9 @@ func (m *MockInterface) V2QueuePolling(ctx context.Context, region string, goRou } // V2QueuePolling indicates an expected call of V2QueuePolling. -func (mr *MockInterfaceMockRecorder) V2QueuePolling(ctx, region, goRoutines, jobs, errs, delay interface{}, ms ...interface{}) *gomock.Call { +func (mr *MockInterfaceMockRecorder) V2QueuePolling(ctx, region, goRoutines, pendingWorkerCreation, jobs, errs, delay interface{}, ms ...interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - varargs := append([]interface{}{ctx, region, goRoutines, jobs, errs, delay}, ms...) + varargs := append([]interface{}{ctx, region, goRoutines, pendingWorkerCreation, jobs, errs, delay}, ms...) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "V2QueuePolling", reflect.TypeOf((*MockInterface)(nil).V2QueuePolling), varargs...) } @@ -11458,9 +11458,9 @@ func (mr *MockV2WorkerInterfaceMockRecorder) V2QueueJobStepUpdate(ctx, regionNam } // V2QueuePolling mocks base method. -func (m *MockV2WorkerInterface) V2QueuePolling(ctx context.Context, region string, goRoutines *sdk.GoRoutines, jobs chan<- sdk.V2WorkflowRunJob, errs chan<- error, delay time.Duration, ms ...cdsclient.RequestModifier) error { +func (m *MockV2WorkerInterface) V2QueuePolling(ctx context.Context, region string, goRoutines *sdk.GoRoutines, pendingWorkerCreation *sdk.HatcheryPendingWorkerCreation, jobs chan<- sdk.V2WorkflowRunJob, errs chan<- error, delay time.Duration, ms ...cdsclient.RequestModifier) error { m.ctrl.T.Helper() - varargs := []interface{}{ctx, region, goRoutines, jobs, errs, delay} + varargs := []interface{}{ctx, region, goRoutines, pendingWorkerCreation, jobs, errs, delay} for _, a := range ms { varargs = append(varargs, a) } @@ -11470,9 +11470,9 @@ func (m *MockV2WorkerInterface) V2QueuePolling(ctx context.Context, region strin } // V2QueuePolling indicates an expected call of V2QueuePolling. -func (mr *MockV2WorkerInterfaceMockRecorder) V2QueuePolling(ctx, region, goRoutines, jobs, errs, delay interface{}, ms ...interface{}) *gomock.Call { +func (mr *MockV2WorkerInterfaceMockRecorder) V2QueuePolling(ctx, region, goRoutines, pendingWorkerCreation, jobs, errs, delay interface{}, ms ...interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - varargs := append([]interface{}{ctx, region, goRoutines, jobs, errs, delay}, ms...) + varargs := append([]interface{}{ctx, region, goRoutines, pendingWorkerCreation, jobs, errs, delay}, ms...) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "V2QueuePolling", reflect.TypeOf((*MockV2WorkerInterface)(nil).V2QueuePolling), varargs...) } @@ -11890,9 +11890,9 @@ func (mr *MockWorkerInterfaceMockRecorder) QueueJobTag(ctx, jobID, tags interfac } // QueuePolling mocks base method. -func (m *MockWorkerInterface) QueuePolling(ctx context.Context, goRoutines *sdk.GoRoutines, jobs chan<- sdk.WorkflowNodeJobRun, errs chan<- error, delay time.Duration, ms ...cdsclient.RequestModifier) error { +func (m *MockWorkerInterface) QueuePolling(ctx context.Context, goRoutines *sdk.GoRoutines, pendingWorkerCreation *sdk.HatcheryPendingWorkerCreation, jobs chan<- sdk.WorkflowNodeJobRun, errs chan<- error, delay time.Duration, ms ...cdsclient.RequestModifier) error { m.ctrl.T.Helper() - varargs := []interface{}{ctx, goRoutines, jobs, errs, delay} + varargs := []interface{}{ctx, goRoutines, pendingWorkerCreation, jobs, errs, delay} for _, a := range ms { varargs = append(varargs, a) } @@ -11902,9 +11902,9 @@ func (m *MockWorkerInterface) QueuePolling(ctx context.Context, goRoutines *sdk. } // QueuePolling indicates an expected call of QueuePolling. -func (mr *MockWorkerInterfaceMockRecorder) QueuePolling(ctx, goRoutines, jobs, errs, delay interface{}, ms ...interface{}) *gomock.Call { +func (mr *MockWorkerInterfaceMockRecorder) QueuePolling(ctx, goRoutines, pendingWorkerCreation, jobs, errs, delay interface{}, ms ...interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - varargs := append([]interface{}{ctx, goRoutines, jobs, errs, delay}, ms...) + varargs := append([]interface{}{ctx, goRoutines, pendingWorkerCreation, jobs, errs, delay}, ms...) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "QueuePolling", reflect.TypeOf((*MockWorkerInterface)(nil).QueuePolling), varargs...) } diff --git a/sdk/hatchery.go b/sdk/hatchery.go index caa8a8c744..5817b742a8 100644 --- a/sdk/hatchery.go +++ b/sdk/hatchery.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "strconv" + "sync" "time" ) @@ -46,6 +47,35 @@ type Hatchery struct { Token string `json:"token,omitempty" db:"-" cli:"token,omitempty"` } +type HatcheryPendingWorkerCreation struct { + mapSpawnJobRequest map[string]bool + mapSpawnJobRequestMutex *sync.Mutex +} + +func (c *HatcheryPendingWorkerCreation) Init() { + c.mapSpawnJobRequest = make(map[string]bool) + c.mapSpawnJobRequestMutex = new(sync.Mutex) +} + +func (c *HatcheryPendingWorkerCreation) SetJobInPendingWorkerCreation(id string) { + c.mapSpawnJobRequestMutex.Lock() + c.mapSpawnJobRequest[id] = true + c.mapSpawnJobRequestMutex.Unlock() +} + +func (c *HatcheryPendingWorkerCreation) RemoveJobFromPendingWorkerCreation(id string) { + c.mapSpawnJobRequestMutex.Lock() + delete(c.mapSpawnJobRequest, id) + c.mapSpawnJobRequestMutex.Unlock() +} + +func (c *HatcheryPendingWorkerCreation) IsJobAlreadyPendingWorkerCreation(id string) bool { + c.mapSpawnJobRequestMutex.Lock() + res := c.mapSpawnJobRequest[id] + c.mapSpawnJobRequestMutex.Unlock() + return res +} + type HatcheryConfig map[string]interface{} func (hc HatcheryConfig) Value() (driver.Value, error) { diff --git a/sdk/hatchery/hatchery.go b/sdk/hatchery/hatchery.go index 165ddb279c..99b403cdce 100644 --- a/sdk/hatchery/hatchery.go +++ b/sdk/hatchery/hatchery.go @@ -94,7 +94,7 @@ func Create(ctx context.Context, h Interface) error { h.GetGoRoutines().Run(ctx, "V2QueuePolling", func(ctx context.Context) { log.Debug(ctx, "starting v2 queue polling") - if err := h.CDSClientV2().V2QueuePolling(ctx, h.GetRegion(), h.GetGoRoutines(), v2Runjobs, errs, 20*time.Second); err != nil { + if err := h.CDSClientV2().V2QueuePolling(ctx, h.GetRegion(), h.GetGoRoutines(), h.GetMapPendingWorkerCreation(), v2Runjobs, errs, 20*time.Second); err != nil { log.Error(ctx, "V2 Queues polling stopped: %v", err) } }) @@ -116,7 +116,7 @@ func Create(ctx context.Context, h Interface) error { ms = append(ms, cdsclient.Region(regions...)) } - if err := h.CDSClient().QueuePolling(ctx, h.GetGoRoutines(), wjobs, errs, 20*time.Second, ms...); err != nil { + if err := h.CDSClient().QueuePolling(ctx, h.GetGoRoutines(), h.GetMapPendingWorkerCreation(), wjobs, errs, 20*time.Second, ms...); err != nil { log.Error(ctx, "Queues polling stopped: %v", err) } }) @@ -212,7 +212,7 @@ func Create(ctx context.Context, h Interface) error { } //Check if the jobs is concerned by a pending worker creation - if h.IsJobAlreadyPendingWorkerCreation(strconv.FormatInt(j.ID, 10)) { + if h.GetMapPendingWorkerCreation().IsJobAlreadyPendingWorkerCreation(strconv.FormatInt(j.ID, 10)) { log.Debug(currentCtx, "job %d already spawned in previous routine", j.ID) endTrace("already in worker creation process") continue @@ -341,7 +341,6 @@ func Create(ctx context.Context, h Interface) error { } logStepInfo(currentCtx, "processed", j.Queued) - h.SetJobInPendingWorkerCreation(strconv.FormatInt(j.ID, 10)) workersStartChan <- workerRequest case <-chanRegister: if err := workerRegister(ctx, hWithModels, workersStartChan); err != nil { @@ -397,7 +396,7 @@ func handleJobV2(h Interface, j sdk.V2WorkflowRunJob, cacheAttempts *CacheNbAtte } //Check if the jobs is concerned by a pending worker creation - if h.IsJobAlreadyPendingWorkerCreation(j.ID) { + if h.GetMapPendingWorkerCreation().IsJobAlreadyPendingWorkerCreation(j.ID) { log.Debug(ctx, "job %d already spawned in previous routine", j.ID) endTrace("already in worker creation process") return nil @@ -453,7 +452,7 @@ func handleJobV2(h Interface, j sdk.V2WorkflowRunJob, cacheAttempts *CacheNbAtte } logStepInfo(ctx, "processed", j.Queued) - h.SetJobInPendingWorkerCreation(j.ID) + h.GetMapPendingWorkerCreation().SetJobInPendingWorkerCreation(j.ID) workersStartChan <- workerRequest return nil } diff --git a/sdk/hatchery/mock_hatchery/interface_mock.go b/sdk/hatchery/mock_hatchery/interface_mock.go index 829147314d..6b43451a57 100644 --- a/sdk/hatchery/mock_hatchery/interface_mock.go +++ b/sdk/hatchery/mock_hatchery/interface_mock.go @@ -109,6 +109,20 @@ func (mr *MockInterfaceMockRecorder) GetGoRoutines() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetGoRoutines", reflect.TypeOf((*MockInterface)(nil).GetGoRoutines)) } +// GetMapPendingWorkerCreation mocks base method. +func (m *MockInterface) GetMapPendingWorkerCreation() *sdk.HatcheryPendingWorkerCreation { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetMapPendingWorkerCreation") + ret0, _ := ret[0].(*sdk.HatcheryPendingWorkerCreation) + return ret0 +} + +// GetMapPendingWorkerCreation indicates an expected call of GetMapPendingWorkerCreation. +func (mr *MockInterfaceMockRecorder) GetMapPendingWorkerCreation() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetMapPendingWorkerCreation", reflect.TypeOf((*MockInterface)(nil).GetMapPendingWorkerCreation)) +} + // GetPrivateKey mocks base method. func (m *MockInterface) GetPrivateKey() *rsa.PrivateKey { m.ctrl.T.Helper() @@ -151,20 +165,6 @@ func (mr *MockInterfaceMockRecorder) InitHatchery(ctx interface{}) *gomock.Call return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InitHatchery", reflect.TypeOf((*MockInterface)(nil).InitHatchery), ctx) } -// IsJobAlreadyPendingWorkerCreation mocks base method. -func (m *MockInterface) IsJobAlreadyPendingWorkerCreation(id string) bool { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "IsJobAlreadyPendingWorkerCreation", id) - ret0, _ := ret[0].(bool) - return ret0 -} - -// IsJobAlreadyPendingWorkerCreation indicates an expected call of IsJobAlreadyPendingWorkerCreation. -func (mr *MockInterfaceMockRecorder) IsJobAlreadyPendingWorkerCreation(id interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsJobAlreadyPendingWorkerCreation", reflect.TypeOf((*MockInterface)(nil).IsJobAlreadyPendingWorkerCreation), id) -} - // Name mocks base method. func (m *MockInterface) Name() string { m.ctrl.T.Helper() @@ -179,18 +179,6 @@ func (mr *MockInterfaceMockRecorder) Name() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Name", reflect.TypeOf((*MockInterface)(nil).Name)) } -// RemoveJobFromPendingWorkerCreation mocks base method. -func (m *MockInterface) RemoveJobFromPendingWorkerCreation(id string) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "RemoveJobFromPendingWorkerCreation", id) -} - -// RemoveJobFromPendingWorkerCreation indicates an expected call of RemoveJobFromPendingWorkerCreation. -func (mr *MockInterfaceMockRecorder) RemoveJobFromPendingWorkerCreation(id interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RemoveJobFromPendingWorkerCreation", reflect.TypeOf((*MockInterface)(nil).RemoveJobFromPendingWorkerCreation), id) -} - // Serve mocks base method. func (m *MockInterface) Serve(ctx context.Context) error { m.ctrl.T.Helper() @@ -219,18 +207,6 @@ func (mr *MockInterfaceMockRecorder) Service() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Service", reflect.TypeOf((*MockInterface)(nil).Service)) } -// SetJobInPendingWorkerCreation mocks base method. -func (m *MockInterface) SetJobInPendingWorkerCreation(id string) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "SetJobInPendingWorkerCreation", id) -} - -// SetJobInPendingWorkerCreation indicates an expected call of SetJobInPendingWorkerCreation. -func (mr *MockInterfaceMockRecorder) SetJobInPendingWorkerCreation(id interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetJobInPendingWorkerCreation", reflect.TypeOf((*MockInterface)(nil).SetJobInPendingWorkerCreation), id) -} - // SpawnWorker mocks base method. func (m *MockInterface) SpawnWorker(ctx context.Context, spawnArgs hatchery.SpawnArguments) error { m.ctrl.T.Helper() @@ -367,6 +343,20 @@ func (mr *MockInterfaceWithModelsMockRecorder) GetGoRoutines() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetGoRoutines", reflect.TypeOf((*MockInterfaceWithModels)(nil).GetGoRoutines)) } +// GetMapPendingWorkerCreation mocks base method. +func (m *MockInterfaceWithModels) GetMapPendingWorkerCreation() *sdk.HatcheryPendingWorkerCreation { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetMapPendingWorkerCreation") + ret0, _ := ret[0].(*sdk.HatcheryPendingWorkerCreation) + return ret0 +} + +// GetMapPendingWorkerCreation indicates an expected call of GetMapPendingWorkerCreation. +func (mr *MockInterfaceWithModelsMockRecorder) GetMapPendingWorkerCreation() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetMapPendingWorkerCreation", reflect.TypeOf((*MockInterfaceWithModels)(nil).GetMapPendingWorkerCreation)) +} + // GetPrivateKey mocks base method. func (m *MockInterfaceWithModels) GetPrivateKey() *rsa.PrivateKey { m.ctrl.T.Helper() @@ -409,20 +399,6 @@ func (mr *MockInterfaceWithModelsMockRecorder) InitHatchery(ctx interface{}) *go return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InitHatchery", reflect.TypeOf((*MockInterfaceWithModels)(nil).InitHatchery), ctx) } -// IsJobAlreadyPendingWorkerCreation mocks base method. -func (m *MockInterfaceWithModels) IsJobAlreadyPendingWorkerCreation(id string) bool { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "IsJobAlreadyPendingWorkerCreation", id) - ret0, _ := ret[0].(bool) - return ret0 -} - -// IsJobAlreadyPendingWorkerCreation indicates an expected call of IsJobAlreadyPendingWorkerCreation. -func (mr *MockInterfaceWithModelsMockRecorder) IsJobAlreadyPendingWorkerCreation(id interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsJobAlreadyPendingWorkerCreation", reflect.TypeOf((*MockInterfaceWithModels)(nil).IsJobAlreadyPendingWorkerCreation), id) -} - // ModelType mocks base method. func (m *MockInterfaceWithModels) ModelType() string { m.ctrl.T.Helper() @@ -465,18 +441,6 @@ func (mr *MockInterfaceWithModelsMockRecorder) NeedRegistration(ctx, model inter return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NeedRegistration", reflect.TypeOf((*MockInterfaceWithModels)(nil).NeedRegistration), ctx, model) } -// RemoveJobFromPendingWorkerCreation mocks base method. -func (m *MockInterfaceWithModels) RemoveJobFromPendingWorkerCreation(id string) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "RemoveJobFromPendingWorkerCreation", id) -} - -// RemoveJobFromPendingWorkerCreation indicates an expected call of RemoveJobFromPendingWorkerCreation. -func (mr *MockInterfaceWithModelsMockRecorder) RemoveJobFromPendingWorkerCreation(id interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RemoveJobFromPendingWorkerCreation", reflect.TypeOf((*MockInterfaceWithModels)(nil).RemoveJobFromPendingWorkerCreation), id) -} - // Serve mocks base method. func (m *MockInterfaceWithModels) Serve(ctx context.Context) error { m.ctrl.T.Helper() @@ -505,18 +469,6 @@ func (mr *MockInterfaceWithModelsMockRecorder) Service() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Service", reflect.TypeOf((*MockInterfaceWithModels)(nil).Service)) } -// SetJobInPendingWorkerCreation mocks base method. -func (m *MockInterfaceWithModels) SetJobInPendingWorkerCreation(id string) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "SetJobInPendingWorkerCreation", id) -} - -// SetJobInPendingWorkerCreation indicates an expected call of SetJobInPendingWorkerCreation. -func (mr *MockInterfaceWithModelsMockRecorder) SetJobInPendingWorkerCreation(id interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetJobInPendingWorkerCreation", reflect.TypeOf((*MockInterfaceWithModels)(nil).SetJobInPendingWorkerCreation), id) -} - // SpawnWorker mocks base method. func (m *MockInterfaceWithModels) SpawnWorker(ctx context.Context, spawnArgs hatchery.SpawnArguments) error { m.ctrl.T.Helper() diff --git a/sdk/hatchery/starter.go b/sdk/hatchery/starter.go index ab2db0ec26..10f7a6218d 100644 --- a/sdk/hatchery/starter.go +++ b/sdk/hatchery/starter.go @@ -122,7 +122,7 @@ func spawnWorkerForJob(ctx context.Context, h Interface, j workerStarterRequest) logStepInfo(ctx, "starting-worker", j.queued) - h.RemoveJobFromPendingWorkerCreation(j.id) + h.GetMapPendingWorkerCreation().RemoveJobFromPendingWorkerCreation(j.id) maxProv := h.Configuration().Provision.MaxConcurrentProvisioning if maxProv < 1 { diff --git a/sdk/hatchery/types.go b/sdk/hatchery/types.go index 8266a526a4..f5f4f20a64 100644 --- a/sdk/hatchery/types.go +++ b/sdk/hatchery/types.go @@ -171,9 +171,7 @@ type Interface interface { Serve(ctx context.Context) error GetPrivateKey() *rsa.PrivateKey GetGoRoutines() *sdk.GoRoutines - SetJobInPendingWorkerCreation(id string) - IsJobAlreadyPendingWorkerCreation(id string) bool - RemoveJobFromPendingWorkerCreation(id string) + GetMapPendingWorkerCreation() *sdk.HatcheryPendingWorkerCreation GetRegion() string } From 37cae4ce8b5266eb2a1b95d1ca34c51354c05703 Mon Sep 17 00:00:00 2001 From: Yvonnick Esnault Date: Thu, 7 Dec 2023 11:21:27 +0000 Subject: [PATCH 3/7] fix: cr Signed-off-by: Yvonnick Esnault --- sdk/hatchery/hatchery.go | 16 +--------------- 1 file changed, 1 insertion(+), 15 deletions(-) diff --git a/sdk/hatchery/hatchery.go b/sdk/hatchery/hatchery.go index 99b403cdce..b5948a810b 100644 --- a/sdk/hatchery/hatchery.go +++ b/sdk/hatchery/hatchery.go @@ -211,13 +211,6 @@ func Create(ctx context.Context, h Interface) error { stats.Record(currentCtx, GetMetrics().JobsWebsocket.M(1)) } - //Check if the jobs is concerned by a pending worker creation - if h.GetMapPendingWorkerCreation().IsJobAlreadyPendingWorkerCreation(strconv.FormatInt(j.ID, 10)) { - log.Debug(currentCtx, "job %d already spawned in previous routine", j.ID) - endTrace("already in worker creation process") - continue - } - //Check bookedBy current hatchery if j.BookedBy.ID != 0 { log.Debug(currentCtx, "hatchery> job %d is already booked", j.ID) @@ -352,7 +345,7 @@ func Create(ctx context.Context, h Interface) error { return nil } -func handleJobV2(h Interface, j sdk.V2WorkflowRunJob, cacheAttempts *CacheNbAttemptsJobIDs, workersStartChan chan<- workerStarterRequest) error { +func handleJobV2(_ context.Context, h Interface, j sdk.V2WorkflowRunJob, cacheAttempts *CacheNbAttemptsJobIDs, workersStartChan chan<- workerStarterRequest) error { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute) ctx = telemetry.New(ctx, h, "hatchery.V2JobReceive", trace.AlwaysSample(), trace.SpanKindServer) ctx, end := telemetry.Span(ctx, "hatchery.V2JobReceive", telemetry.Tag(telemetry.TagWorkflow, j.WorkflowName), @@ -395,13 +388,6 @@ func handleJobV2(h Interface, j sdk.V2WorkflowRunJob, cacheAttempts *CacheNbAtte endTrace("no capacities") } - //Check if the jobs is concerned by a pending worker creation - if h.GetMapPendingWorkerCreation().IsJobAlreadyPendingWorkerCreation(j.ID) { - log.Debug(ctx, "job %d already spawned in previous routine", j.ID) - endTrace("already in worker creation process") - return nil - } - workerRequest := workerStarterRequest{ ctx: ctx, cancel: cancel, From d46957f72f4ce4902d490aadcf5b124488a3245c Mon Sep 17 00:00:00 2001 From: Yvonnick Esnault Date: Thu, 7 Dec 2023 11:21:54 +0000 Subject: [PATCH 4/7] fix: cr Signed-off-by: Yvonnick Esnault --- sdk/hatchery/hatchery.go | 1 - 1 file changed, 1 deletion(-) diff --git a/sdk/hatchery/hatchery.go b/sdk/hatchery/hatchery.go index b5948a810b..1942964381 100644 --- a/sdk/hatchery/hatchery.go +++ b/sdk/hatchery/hatchery.go @@ -438,7 +438,6 @@ func handleJobV2(_ context.Context, h Interface, j sdk.V2WorkflowRunJob, cacheAt } logStepInfo(ctx, "processed", j.Queued) - h.GetMapPendingWorkerCreation().SetJobInPendingWorkerCreation(j.ID) workersStartChan <- workerRequest return nil } From 974b626ff1e6e0562382b8c23b847f028865d78f Mon Sep 17 00:00:00 2001 From: Yvonnick Esnault Date: Thu, 7 Dec 2023 12:18:50 +0000 Subject: [PATCH 5/7] fix: cr Signed-off-by: Yvonnick Esnault --- engine/hatchery/serve.go | 7 ++++--- sdk/cdsclient/client_queue.go | 13 ++++++++++--- sdk/cdsclient/client_queue_V2.go | 17 ++++++++++++++--- sdk/hatchery/hatchery.go | 2 +- sdk/hatchery/hatchery_test.go | 20 ++++++++++---------- 5 files changed, 39 insertions(+), 20 deletions(-) diff --git a/engine/hatchery/serve.go b/engine/hatchery/serve.go index 516dd0dabb..c36032d376 100644 --- a/engine/hatchery/serve.go +++ b/engine/hatchery/serve.go @@ -67,6 +67,10 @@ func (c *Common) GetGoRoutines() *sdk.GoRoutines { } func (c *Common) GetMapPendingWorkerCreation() *sdk.HatcheryPendingWorkerCreation { + if c.mapPendingWorkerCreation == nil { + c.mapPendingWorkerCreation = &sdk.HatcheryPendingWorkerCreation{} + c.mapPendingWorkerCreation.Init() + } return c.mapPendingWorkerCreation } @@ -236,9 +240,6 @@ func (c *Common) Init(ctx context.Context, h hatchery.Interface) error { } } - c.mapPendingWorkerCreation = &sdk.HatcheryPendingWorkerCreation{} - c.mapPendingWorkerCreation.Init() - return c.initServiceLogger(ctx) } diff --git a/sdk/cdsclient/client_queue.go b/sdk/cdsclient/client_queue.go index 5ef934b205..293217a7d7 100644 --- a/sdk/cdsclient/client_queue.go +++ b/sdk/cdsclient/client_queue.go @@ -10,6 +10,7 @@ import ( "time" "github.com/ovh/cds/sdk" + "github.com/rockbears/log" ) // shrinkQueue is used to shrink the polled queue 200% of the channel capacity (l) @@ -115,13 +116,19 @@ func (c *client) QueuePolling(ctx context.Context, goRoutines *sdk.GoRoutines, p fmt.Println("Jobs Queue size: ", len(queue)) } - shrinkQueue(&queue, cap(jobs)) - for _, j := range queue { - id := strconv.FormatInt(j.ID, 10) + queueFiltered := sdk.WorkflowQueue{} + for _, job := range queue { + id := strconv.FormatInt(job.ID, 10) if pendingWorkerCreation.IsJobAlreadyPendingWorkerCreation(id) { + log.Debug(ctx, "skipping job %s", id) continue } pendingWorkerCreation.SetJobInPendingWorkerCreation(id) + queueFiltered = append(queueFiltered, job) + } + + shrinkQueue(&queueFiltered, cap(jobs)) + for _, j := range queueFiltered { jobs <- j } } diff --git a/sdk/cdsclient/client_queue_V2.go b/sdk/cdsclient/client_queue_V2.go index 1ea07d89a4..0cc26ca4f9 100644 --- a/sdk/cdsclient/client_queue_V2.go +++ b/sdk/cdsclient/client_queue_V2.go @@ -7,6 +7,7 @@ import ( "time" "github.com/ovh/cds/sdk" + "github.com/rockbears/log" ) func (c *client) V2QueueJobStepUpdate(ctx context.Context, regionName string, jobRunID string, stepsStatus sdk.JobStepsStatus) error { @@ -161,12 +162,22 @@ func (c *client) V2QueuePolling(ctx context.Context, regionName string, goRoutin fmt.Println("Jobs Queue size: ", len(queue)) } + queueFiltered := []sdk.V2WorkflowRunJob{} + for _, job := range queue { + if pendingWorkerCreation.IsJobAlreadyPendingWorkerCreation(job.ID) { + log.Debug(ctx, "skipping job %s", job.ID) + continue + } + pendingWorkerCreation.SetJobInPendingWorkerCreation(job.ID) + queueFiltered = append(queueFiltered, job) + } + max := cap(jobs) * 2 - if len(queue) < max { - max = len(queue) + if len(queueFiltered) < max { + max = len(queueFiltered) } for i := 0; i < max; i++ { - jobs <- queue[i] + jobs <- queueFiltered[i] } } diff --git a/sdk/hatchery/hatchery.go b/sdk/hatchery/hatchery.go index 1942964381..5ecad137c0 100644 --- a/sdk/hatchery/hatchery.go +++ b/sdk/hatchery/hatchery.go @@ -150,7 +150,7 @@ func Create(ctx context.Context, h Interface) error { log.Error(ctx, "error on h.WorkerModelsEnabled(): %v", errwm) } case j := <-v2Runjobs: - if err := handleJobV2(h, j, cacheNbAttemptsIDs, workersStartChan); err != nil { + if err := handleJobV2(ctx, h, j, cacheNbAttemptsIDs, workersStartChan); err != nil { log.ErrorWithStackTrace(ctx, err) } case j := <-wjobs: diff --git a/sdk/hatchery/hatchery_test.go b/sdk/hatchery/hatchery_test.go index 0a02821398..d9a577d897 100644 --- a/sdk/hatchery/hatchery_test.go +++ b/sdk/hatchery/hatchery_test.go @@ -51,8 +51,8 @@ func TestCreateOneJob(t *testing.T) { mockHatchery.EXPECT().GetGoRoutines().Return(grtn).AnyTimes() mockHatchery.EXPECT().CDSClient().Return(mockCDSClient).AnyTimes() mockHatchery.EXPECT().CDSClientV2().Return(nil).AnyTimes() - mockCDSClient.EXPECT().QueuePolling(gomock.Any(), grtn, gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( - func(ctx context.Context, goRoutines *sdk.GoRoutines, jobs chan<- sdk.WorkflowNodeJobRun, errs chan<- error, delay time.Duration, ms ...cdsclient.RequestModifier) error { + mockCDSClient.EXPECT().QueuePolling(gomock.Any(), grtn, gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, goRoutines *sdk.GoRoutines, pendingWorkerCreation *sdk.HatcheryPendingWorkerCreation, jobs chan<- sdk.WorkflowNodeJobRun, errs chan<- error, delay time.Duration, ms ...cdsclient.RequestModifier) error { j := sdk.WorkflowNodeJobRun{ ProjectID: 1, ID: 777, @@ -71,9 +71,9 @@ func TestCreateOneJob(t *testing.T) { }, ) - mockHatchery.EXPECT().IsJobAlreadyPendingWorkerCreation(gomock.Any()).Times(1) - mockHatchery.EXPECT().SetJobInPendingWorkerCreation(gomock.Any()).Times(1) - mockHatchery.EXPECT().RemoveJobFromPendingWorkerCreation(gomock.Any()).Times(1) + m := &sdk.HatcheryPendingWorkerCreation{} + m.Init() + mockHatchery.EXPECT().GetMapPendingWorkerCreation().Return(m).Times(2) // two calls: call to QueuePolling and RemoveJobFromPendingWorkerCreation() in spawnWorkerForJob // This calls are expected for each job received in the channel mockCDSClient.EXPECT().WorkerList(gomock.Any()).Return(nil, nil).AnyTimes() @@ -125,8 +125,8 @@ func TestCreate(t *testing.T) { mockHatchery.EXPECT().GetGoRoutines().Return(grtn).AnyTimes() mockHatchery.EXPECT().CDSClient().Return(mockCDSClient).AnyTimes() mockHatchery.EXPECT().CDSClientV2().Return(nil).AnyTimes() - mockCDSClient.EXPECT().QueuePolling(gomock.Any(), grtn, gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( - func(ctx context.Context, goRoutines *sdk.GoRoutines, jobs chan<- sdk.WorkflowNodeJobRun, errs chan<- error, delay time.Duration, ms ...cdsclient.RequestModifier) error { + mockCDSClient.EXPECT().QueuePolling(gomock.Any(), grtn, gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, goRoutines *sdk.GoRoutines, pendingWorkerCreation *sdk.HatcheryPendingWorkerCreation, jobs chan<- sdk.WorkflowNodeJobRun, errs chan<- error, delay time.Duration, ms ...cdsclient.RequestModifier) error { j := sdk.WorkflowNodeJobRun{ ProjectID: 1, ID: 666, @@ -151,9 +151,9 @@ func TestCreate(t *testing.T) { }, ) - mockHatchery.EXPECT().IsJobAlreadyPendingWorkerCreation(gomock.Any()).Times(4) - mockHatchery.EXPECT().SetJobInPendingWorkerCreation(gomock.Any()).Times(2) - mockHatchery.EXPECT().RemoveJobFromPendingWorkerCreation(gomock.Any()).Times(2) + m := &sdk.HatcheryPendingWorkerCreation{} + m.Init() + mockHatchery.EXPECT().GetMapPendingWorkerCreation().Return(m).Times(3) // Thred calls: call to QueuePolling and two RemoveJobFromPendingWorkerCreation() in spawnWorkerForJob // This calls are expected for each job received in the channel mockCDSClient.EXPECT().WorkerList(gomock.Any()).Return(nil, nil).AnyTimes() From be6e9496a8baaabb3c5a1838aceb39e29ea14058 Mon Sep 17 00:00:00 2001 From: Yvonnick Esnault Date: Thu, 7 Dec 2023 12:21:15 +0000 Subject: [PATCH 6/7] fix: cr Signed-off-by: Yvonnick Esnault --- sdk/hatchery.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/sdk/hatchery.go b/sdk/hatchery.go index 5817b742a8..a265eaed95 100644 --- a/sdk/hatchery.go +++ b/sdk/hatchery.go @@ -48,18 +48,18 @@ type Hatchery struct { } type HatcheryPendingWorkerCreation struct { - mapSpawnJobRequest map[string]bool + mapSpawnJobRequest map[string]struct{} mapSpawnJobRequestMutex *sync.Mutex } func (c *HatcheryPendingWorkerCreation) Init() { - c.mapSpawnJobRequest = make(map[string]bool) + c.mapSpawnJobRequest = make(map[string]struct{}) c.mapSpawnJobRequestMutex = new(sync.Mutex) } func (c *HatcheryPendingWorkerCreation) SetJobInPendingWorkerCreation(id string) { c.mapSpawnJobRequestMutex.Lock() - c.mapSpawnJobRequest[id] = true + c.mapSpawnJobRequest[id] = struct{}{} c.mapSpawnJobRequestMutex.Unlock() } @@ -71,9 +71,9 @@ func (c *HatcheryPendingWorkerCreation) RemoveJobFromPendingWorkerCreation(id st func (c *HatcheryPendingWorkerCreation) IsJobAlreadyPendingWorkerCreation(id string) bool { c.mapSpawnJobRequestMutex.Lock() - res := c.mapSpawnJobRequest[id] + _, has := c.mapSpawnJobRequest[id] c.mapSpawnJobRequestMutex.Unlock() - return res + return has } type HatcheryConfig map[string]interface{} From 8922ad1bb5a09241132d1fcc02414de8b1ae485d Mon Sep 17 00:00:00 2001 From: Yvonnick Esnault Date: Thu, 7 Dec 2023 14:36:27 +0000 Subject: [PATCH 7/7] fix: cr Signed-off-by: Yvonnick Esnault --- sdk/cdsclient/client_queue.go | 8 ++++++++ sdk/cdsclient/client_queue_V2.go | 6 +++++- 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/sdk/cdsclient/client_queue.go b/sdk/cdsclient/client_queue.go index 293217a7d7..e95fd43238 100644 --- a/sdk/cdsclient/client_queue.go +++ b/sdk/cdsclient/client_queue.go @@ -87,6 +87,14 @@ func (c *client) QueuePolling(ctx context.Context, goRoutines *sdk.GoRoutines, p // push the job in the channel if job.Status == sdk.StatusWaiting && job.BookedBy.Name == "" { job.Header["WS"] = "true" + + id := strconv.FormatInt(job.ID, 10) + if pendingWorkerCreation.IsJobAlreadyPendingWorkerCreation(id) { + log.Debug(ctx, "skipping job %s", id) + continue + } + pendingWorkerCreation.SetJobInPendingWorkerCreation(id) + jobs <- *job } } diff --git a/sdk/cdsclient/client_queue_V2.go b/sdk/cdsclient/client_queue_V2.go index 0cc26ca4f9..53be1e346f 100644 --- a/sdk/cdsclient/client_queue_V2.go +++ b/sdk/cdsclient/client_queue_V2.go @@ -136,6 +136,11 @@ func (c *client) V2QueuePolling(ctx context.Context, regionName string, goRoutin } // push the job in the channel if j.Status == sdk.StatusWaiting { + if pendingWorkerCreation.IsJobAlreadyPendingWorkerCreation(wsEvent.Event.JobRunID) { + log.Debug(ctx, "skipping job %s", wsEvent.Event.JobRunID) + continue + } + pendingWorkerCreation.SetJobInPendingWorkerCreation(wsEvent.Event.JobRunID) jobs <- *j } case <-jobsTicker.C: @@ -179,7 +184,6 @@ func (c *client) V2QueuePolling(ctx context.Context, regionName string, goRoutin for i := 0; i < max; i++ { jobs <- queueFiltered[i] } - } } }