Skip to content

Commit a5e7e0e

Browse files
authored
fix(api,hatchery): take care of booked JobID (#5156)
* chore: jobID is mandatory on worker (registerOnly false) Signed-off-by: Yvonnick Esnault <yvonnick.esnault@corp.ovh.com>
1 parent 303394d commit a5e7e0e

File tree

12 files changed

+183
-59
lines changed

12 files changed

+183
-59
lines changed

engine/api/worker/registration.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,13 @@ type TakeForm struct {
5959
// RegisterWorker Register new worker
6060
func RegisterWorker(ctx context.Context, db gorp.SqlExecutor, store cache.Store, spawnArgs hatchery.SpawnArguments, hatcheryID int64, consumer *sdk.AuthConsumer, registrationForm sdk.WorkerRegistrationForm) (*sdk.Worker, error) {
6161
if spawnArgs.WorkerName == "" {
62-
return nil, sdk.WithStack(sdk.ErrWrongRequest)
62+
return nil, sdk.NewErrorFrom(sdk.ErrWrongRequest, "unauthorized to register a worker without a name")
6363
}
64+
65+
if !spawnArgs.RegisterOnly && spawnArgs.JobID == 0 {
66+
return nil, sdk.NewErrorFrom(sdk.ErrWrongRequest, "unauthorized to register a worker for a job without a JobID")
67+
}
68+
6469
var model *sdk.Model
6570
if spawnArgs.Model != nil {
6671
// Load Model
@@ -95,6 +100,9 @@ func RegisterWorker(ctx context.Context, db gorp.SqlExecutor, store cache.Store,
95100
if model != nil {
96101
w.ModelID = &spawnArgs.Model.ID
97102
}
103+
if spawnArgs.JobID > 0 {
104+
w.JobRunID = &spawnArgs.JobID
105+
}
98106

99107
w.Uptodate = registrationForm.Version == sdk.VERSION
100108

engine/api/worker/worker_test.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,4 +95,18 @@ func TestRegister(t *testing.T) {
9595
assert.Equal(t, w.ID, wk.ID)
9696
assert.Equal(t, w.ModelID, wk.ModelID)
9797
assert.Equal(t, w.HatcheryID, wk.HatcheryID)
98+
99+
// try to register a worker for a job, without a JobID
100+
w2, err := worker.RegisterWorker(context.TODO(), db, store, hatchery.SpawnArguments{
101+
HatcheryName: h.Name,
102+
Model: m,
103+
WorkerName: sdk.RandomString(10),
104+
}, h.ID, c, sdk.WorkerRegistrationForm{
105+
Arch: runtime.GOARCH,
106+
OS: runtime.GOOS,
107+
BinaryCapabilities: []string{"bash"},
108+
})
109+
110+
test.Error(t, err)
111+
assert.Nil(t, w2)
98112
}

engine/api/worker_test.go

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import (
2020
"github.com/ovh/cds/sdk/log"
2121
)
2222

23-
func RegisterWorker(t *testing.T, api *API, groupID int64, existingWorkerModelName string) (*sdk.Worker, string) {
23+
func RegisterWorker(t *testing.T, api *API, groupID int64, existingWorkerModelName string, jobID int64, registerOnly bool) (*sdk.Worker, string) {
2424
model, err := workermodel.LoadByNameAndGroupID(api.mustDB(), existingWorkerModelName, groupID)
2525
if err != nil {
2626
t.Fatalf("RegisterWorker> Error getting worker model : %s", err)
@@ -42,6 +42,8 @@ func RegisterWorker(t *testing.T, api *API, groupID int64, existingWorkerModelNa
4242
HatcheryName: hSrv.Name,
4343
Model: model,
4444
WorkerName: hSrv.Name + "-worker",
45+
JobID: jobID,
46+
RegisterOnly: registerOnly,
4547
})
4648
test.NoError(t, err)
4749
assert.NotNil(t, hConsumer)
@@ -112,7 +114,7 @@ func TestPostRegisterWorkerHandler(t *testing.T) {
112114

113115
model := LoadOrCreateWorkerModel(t, api, g.ID, "Test1")
114116

115-
_, workerJWT := RegisterWorker(t, api, g.ID, model.Name)
117+
_, workerJWT := RegisterWorker(t, api, g.ID, model.Name, 0, true)
116118

117119
uri := api.Router.GetRoute("POST", api.postRefreshWorkerHandler, nil)
118120
test.NotEmpty(t, uri)
@@ -132,3 +134,44 @@ func TestPostRegisterWorkerHandler(t *testing.T) {
132134
api.Router.Mux.ServeHTTP(rec, req)
133135
assert.Equal(t, 204, rec.Code)
134136
}
137+
138+
// TestPostInvalidRegister tests to register a worker for a job, without a JobID
139+
func TestPostInvalidRegister(t *testing.T) {
140+
api, _, _, end := newTestAPI(t)
141+
defer end()
142+
143+
g := LoadSharedInfraGroup(t, api)
144+
145+
model := LoadOrCreateWorkerModel(t, api, g.ID, "Test2")
146+
147+
hSrv, hPrivKey, hConsumer, _ := assets.InsertHatchery(t, api.mustDB(), *g)
148+
149+
hPubKey, err := jws.ExportPublicKey(hPrivKey)
150+
if err != nil {
151+
t.Fatalf("RegisterWorker> Error exporting public key : %s", err)
152+
}
153+
log.Debug("hatchery public key is %s", string(hPubKey))
154+
155+
jwt, err := hatchery.NewWorkerToken(hSrv.Name, hPrivKey, time.Now().Add(time.Hour), hatchery.SpawnArguments{
156+
HatcheryName: hSrv.Name,
157+
Model: model,
158+
WorkerName: hSrv.Name + "-worker",
159+
JobID: 0,
160+
RegisterOnly: false,
161+
})
162+
test.NoError(t, err)
163+
assert.NotNil(t, hConsumer)
164+
165+
uri := api.Router.GetRoute("POST", api.postRegisterWorkerHandler, nil)
166+
test.NotEmpty(t, uri)
167+
req := assets.NewJWTAuthentifiedRequest(t, jwt, "POST", uri, sdk.WorkerRegistrationForm{
168+
Arch: runtime.GOARCH,
169+
OS: runtime.GOOS,
170+
Version: sdk.VERSION,
171+
})
172+
173+
//Do the request
174+
rec := httptest.NewRecorder()
175+
api.Router.Mux.ServeHTTP(rec, req)
176+
assert.Equal(t, 401, rec.Code)
177+
}

engine/api/workflow_queue.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,10 @@ func (api *API) postTakeWorkflowJobHandler() service.Handler {
6060
return err
6161
}
6262

63+
if wk.JobRunID == nil || *wk.JobRunID != id {
64+
return sdk.NewErrorFrom(sdk.ErrForbidden, "unauthorized to take this job. booked:%d vs asked:%d", wk.JobRunID, id)
65+
}
66+
6367
p, err := project.LoadProjectByNodeJobRunID(ctx, api.mustDB(), api.Cache, id, project.LoadOptions.WithVariables, project.LoadOptions.WithClearKeys)
6468
if err != nil {
6569
return sdk.WrapError(err, "cannot load project by nodeJobRunID: %d", id)

engine/api/workflow_queue_test.go

Lines changed: 65 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -268,13 +268,13 @@ func testGetWorkflowJobAsRegularUser(t *testing.T, api *API, router *Router, jwt
268268

269269
jobs := []sdk.WorkflowNodeJobRun{}
270270
require.NoError(t, json.Unmarshal(rec.Body.Bytes(), &jobs))
271-
assert.True(t, len(jobs) >= 1)
271+
require.True(t, len(jobs) >= 1)
272272

273273
if t.Failed() {
274274
t.FailNow()
275275
}
276276

277-
ctx.job = &jobs[0]
277+
ctx.job = &jobs[len(jobs)-1]
278278
}
279279

280280
func testGetWorkflowJobAsWorker(t *testing.T, api *API, router *Router, ctx *testRunWorkflowCtx) {
@@ -290,7 +290,7 @@ func testGetWorkflowJobAsWorker(t *testing.T, api *API, router *Router, ctx *tes
290290

291291
jobs := []sdk.WorkflowNodeJobRun{}
292292
require.NoError(t, json.Unmarshal(rec.Body.Bytes(), &jobs))
293-
assert.Len(t, jobs, 1)
293+
require.Len(t, jobs, 1)
294294

295295
if t.Failed() {
296296
t.FailNow()
@@ -312,7 +312,7 @@ func testGetWorkflowJobAsHatchery(t *testing.T, api *API, router *Router, ctx *t
312312

313313
jobs := []sdk.WorkflowNodeJobRun{}
314314
require.NoError(t, json.Unmarshal(rec.Body.Bytes(), &jobs))
315-
assert.Len(t, jobs, 1)
315+
require.Len(t, jobs, 1)
316316

317317
if t.Failed() {
318318
t.FailNow()
@@ -327,7 +327,11 @@ func testRegisterWorker(t *testing.T, api *API, router *Router, ctx *testRunWork
327327
t.Fatalf("Error getting group: %+v", err)
328328
}
329329
model := LoadOrCreateWorkerModel(t, api, g.ID, "Test1")
330-
w, workerJWT := RegisterWorker(t, api, g.ID, model.Name)
330+
var jobID int64
331+
if ctx.job != nil {
332+
jobID = ctx.job.ID
333+
}
334+
w, workerJWT := RegisterWorker(t, api, g.ID, model.Name, jobID, jobID == 0)
331335
ctx.workerToken = workerJWT
332336
ctx.worker = w
333337
ctx.model = model
@@ -340,17 +344,31 @@ func testRegisterHatchery(t *testing.T, api *API, router *Router, ctx *testRunWo
340344
}
341345

342346
func TestGetWorkflowJobQueueHandler(t *testing.T) {
343-
api, _, router, end := newTestAPI(t)
347+
api, db, router, end := newTestAPI(t)
344348
defer end()
345349

350+
// delete all existing workers
351+
workers, err := worker.LoadAll(context.TODO(), db)
352+
test.NoError(t, err)
353+
for _, w := range workers {
354+
worker.Delete(db, w.ID)
355+
}
356+
357+
// remove all jobs in queue
358+
filterClean := workflow.NewQueueFilter()
359+
nrj, _ := workflow.LoadNodeJobRunQueue(context.TODO(), db, api.Cache, filterClean)
360+
for _, j := range nrj {
361+
_ = workflow.DeleteNodeJobRuns(db, j.WorkflowNodeRunID)
362+
}
363+
346364
_, jwt := assets.InsertAdminUser(t, api.mustDB())
347365
t.Log("checkin as a user")
348366

349367
ctx := testRunWorkflow(t, api, router)
350368
testGetWorkflowJobAsRegularUser(t, api, router, jwt, &ctx)
351369
assert.NotNil(t, ctx.job)
352370

353-
t.Log("checkin as a worker")
371+
t.Logf("checkin as a worker jobId:%d", ctx.job.ID)
354372

355373
testGetWorkflowJobAsWorker(t, api, router, &ctx)
356374
assert.NotNil(t, ctx.job)
@@ -459,6 +477,46 @@ func Test_postTakeWorkflowJobHandler(t *testing.T) {
459477
assert.NotEmpty(t, run.HatcheryName)
460478
}
461479

480+
func Test_postTakeWorkflowInvalidJobHandler(t *testing.T) {
481+
api, _, router, end := newTestAPI(t)
482+
defer end()
483+
ctx := testRunWorkflow(t, api, router)
484+
testGetWorkflowJobAsWorker(t, api, router, &ctx)
485+
require.NotNil(t, ctx.job)
486+
487+
//Prepare request
488+
vars := map[string]string{
489+
"key": ctx.project.Key,
490+
"permWorkflowName": ctx.workflow.Name,
491+
"id": fmt.Sprintf("%d", ctx.job.ID+1), // invalid job
492+
}
493+
494+
//Register the worker
495+
testRegisterWorker(t, api, router, &ctx)
496+
497+
uri := router.GetRoute("POST", api.postTakeWorkflowJobHandler, vars)
498+
require.NotEmpty(t, uri)
499+
500+
//this call must failed, we try to take a jobID not reserved at worker's registration
501+
req := assets.NewJWTAuthentifiedRequest(t, ctx.workerToken, "POST", uri, nil)
502+
rec := httptest.NewRecorder()
503+
router.Mux.ServeHTTP(rec, req)
504+
require.Equal(t, 403, rec.Code)
505+
506+
//This must be ok, take the jobID reserved
507+
vars2 := map[string]string{
508+
"key": ctx.project.Key,
509+
"permWorkflowName": ctx.workflow.Name,
510+
"id": fmt.Sprintf("%d", ctx.job.ID),
511+
}
512+
uri2 := router.GetRoute("POST", api.postTakeWorkflowJobHandler, vars2)
513+
require.NotEmpty(t, uri2)
514+
req2 := assets.NewJWTAuthentifiedRequest(t, ctx.workerToken, "POST", uri2, nil)
515+
rec2 := httptest.NewRecorder()
516+
router.Mux.ServeHTTP(rec2, req2)
517+
require.Equal(t, 200, rec2.Code)
518+
}
519+
462520
func Test_postBookWorkflowJobHandler(t *testing.T) {
463521
api, _, router, end := newTestAPI(t)
464522
defer end()

engine/hatchery/local/worker_spawn.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,10 @@ const workerCmdTmpl = "{{.WorkerBinary}} --api={{.API}} --token={{.Token}} --log
5252
func (h *HatcheryLocal) SpawnWorker(ctx context.Context, spawnArgs hatchery.SpawnArguments) error {
5353
log.Debug("HatcheryLocal.SpawnWorker> %s want to spawn a worker named %s (jobID = %d)", spawnArgs.HatcheryName, spawnArgs.WorkerName, spawnArgs.JobID)
5454

55+
if spawnArgs.JobID == 0 && !spawnArgs.RegisterOnly {
56+
return sdk.WithStack(fmt.Errorf("no job ID and no register"))
57+
}
58+
5559
// Generate a random string 16 chars length
5660
bs := make([]byte, 16)
5761
if _, err := rand.Read(bs); err != nil {
@@ -79,10 +83,9 @@ func (h *HatcheryLocal) SpawnWorker(ctx context.Context, spawnArgs hatchery.Spaw
7983
GraylogExtraKey: h.Configuration().Provision.WorkerLogsOptions.Graylog.ExtraKey,
8084
GraylogExtraValue: h.Configuration().Provision.WorkerLogsOptions.Graylog.ExtraValue,
8185
WorkerBinary: path.Join(h.BasedirDedicated, h.getWorkerBinaryName()),
86+
WorkflowJobID: spawnArgs.JobID,
8287
}
8388

84-
udataParam.WorkflowJobID = spawnArgs.JobID
85-
8689
tmpl, errt := template.New("cmd").Parse(workerCmdTmpl)
8790
if errt != nil {
8891
return errt

engine/hatchery/marathon/marathon.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,10 @@ func (h *HatcheryMarathon) SpawnWorker(ctx context.Context, spawnArgs hatchery.S
210210
log.Debug("spawnWorker> spawning worker %s (%s)", spawnArgs.Model.Name, spawnArgs.Model.ModelDocker.Image)
211211
}
212212

213+
if spawnArgs.JobID == 0 && !spawnArgs.RegisterOnly {
214+
return sdk.WithStack(fmt.Errorf("no job ID and no register"))
215+
}
216+
213217
// Estimate needed memory, we will set 110% of required memory
214218
memory := int64(h.Config.DefaultMemory)
215219

@@ -228,10 +232,9 @@ func (h *HatcheryMarathon) SpawnWorker(ctx context.Context, spawnArgs hatchery.S
228232
GraylogPort: h.Configuration().Provision.WorkerLogsOptions.Graylog.Port,
229233
GraylogExtraKey: h.Configuration().Provision.WorkerLogsOptions.Graylog.ExtraKey,
230234
GraylogExtraValue: h.Configuration().Provision.WorkerLogsOptions.Graylog.ExtraValue,
235+
WorkflowJobID: spawnArgs.JobID,
231236
}
232237

233-
udataParam.WorkflowJobID = spawnArgs.JobID
234-
235238
tmpl, errt := template.New("cmd").Parse(spawnArgs.Model.ModelDocker.Cmd)
236239
if errt != nil {
237240
return errt

engine/hatchery/openstack/spawn.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@ func (h *HatcheryOpenstack) SpawnWorker(ctx context.Context, spawnArgs hatchery.
2525
log.Debug("spawnWorker> spawning worker %s model:%s", spawnArgs.WorkerName, spawnArgs.Model.Name)
2626
}
2727

28+
if spawnArgs.JobID == 0 && !spawnArgs.RegisterOnly {
29+
return sdk.WithStack(fmt.Errorf("no job ID and no register"))
30+
}
31+
2832
if len(h.getServers(ctx)) == h.Configuration().Provision.MaxWorker {
2933
log.Debug("MaxWorker limit (%d) reached", h.Configuration().Provision.MaxWorker)
3034
return nil

engine/hatchery/vsphere/spawn.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@ type annotation struct {
3030

3131
// SpawnWorker creates a new vm instance
3232
func (h *HatcheryVSphere) SpawnWorker(ctx context.Context, spawnArgs hatchery.SpawnArguments) error {
33+
if spawnArgs.JobID == 0 && !spawnArgs.RegisterOnly {
34+
return sdk.WithStack(fmt.Errorf("no job ID and no register"))
35+
}
36+
3337
var vm *object.VirtualMachine
3438
var errV error
3539
_, errM := h.getModelByName(ctx, spawnArgs.Model.Name)
@@ -196,10 +200,9 @@ func (h *HatcheryVSphere) launchScriptWorker(name string, jobID int64, token str
196200
GraylogPort: h.Configuration().Provision.WorkerLogsOptions.Graylog.Port,
197201
GraylogExtraKey: h.Configuration().Provision.WorkerLogsOptions.Graylog.ExtraKey,
198202
GraylogExtraValue: h.Configuration().Provision.WorkerLogsOptions.Graylog.ExtraValue,
203+
WorkflowJobID: jobID,
199204
}
200205

201-
udataParam.WorkflowJobID = jobID
202-
203206
var buffer bytes.Buffer
204207
if err := tmpl.Execute(&buffer, udataParam); err != nil {
205208
return err

engine/worker/cmd_run.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,10 @@ func runCmd() func(cmd *cobra.Command, args []string) {
3737
// Get the booked job ID
3838
bookedWJobID := FlagInt64(cmd, flagBookedWorkflowJobID)
3939

40+
if bookedWJobID == 0 {
41+
sdk.Exit("flag --booked-workflow-job-id is mandatory")
42+
}
43+
4044
ctx, cancel := context.WithCancel(ctx)
4145
// Gracefully shutdown connections
4246
c := make(chan os.Signal, 1)

0 commit comments

Comments
 (0)