Skip to content

Commit

Permalink
feat(api): allow to customize book delay for an hatchery (#6583)
Browse files Browse the repository at this point in the history
* feat(api): allow to customize book delay for an hatchery

* fix: unit test
  • Loading branch information
richardlt committed Jul 10, 2023
1 parent 2041c93 commit 2e886d1
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 15 deletions.
18 changes: 10 additions & 8 deletions engine/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,11 +231,13 @@ type Configuration struct {
Error string `toml:"error" comment:"Help displayed to user on each error. Warning: this message could be view by anonymous user. Markdown accepted." json:"error" default:""`
} `toml:"help" comment:"######################\n 'Help' informations \n######################" json:"help"`
Workflow struct {
MaxRuns int64 `toml:"maxRuns" comment:"Maximum of runs by workflow" json:"maxRuns" default:"255"`
DefaultRetentionPolicy string `toml:"defaultRetentionPolicy" comment:"Default rule for workflow run retention policy, this rule can be overridden on each workflow.\n Example: 'return run_days_before < 365' keeps runs for one year." json:"defaultRetentionPolicy" default:"return run_days_before < 365"`
DisablePurgeDeletion bool `toml:"disablePurgeDeletion" comment:"Allow you to disable the deletion part of the purge. Workflow run will only be marked as delete" json:"disablePurgeDeletion" default:"false"`
TemplateBulkRunnerCount int64 `toml:"templateBulkRunnerCount" comment:"The count of runner that will execute the workflow template bulk operation." json:"templateBulkRunnerCount" default:"10"`
JobDefaultRegion string `toml:"jobDefaultRegion" comment:"The default region where the job will be sent if no one is defined on a job" json:"jobDefaultRegion"`
MaxRuns int64 `toml:"maxRuns" comment:"Maximum of runs by workflow" json:"maxRuns" default:"255"`
DefaultRetentionPolicy string `toml:"defaultRetentionPolicy" comment:"Default rule for workflow run retention policy, this rule can be overridden on each workflow.\n Example: 'return run_days_before < 365' keeps runs for one year." json:"defaultRetentionPolicy" default:"return run_days_before < 365"`
DisablePurgeDeletion bool `toml:"disablePurgeDeletion" comment:"Allow you to disable the deletion part of the purge. Workflow run will only be marked as delete" json:"disablePurgeDeletion" default:"false"`
TemplateBulkRunnerCount int64 `toml:"templateBulkRunnerCount" comment:"The count of runner that will execute the workflow template bulk operation." json:"templateBulkRunnerCount" default:"10"`
JobDefaultRegion string `toml:"jobDefaultRegion" comment:"The default region where the job will be sent if no one is defined on a job" json:"jobDefaultRegion"`
JobDefaultBookDelay int64 `toml:"jobDefaultBookDelay" comment:"The default book delay for a job in queue (in seconds)" json:"jobDefaultBookDelay" default:"120"`
CustomServiceJobBookDelay map[string]int64 `toml:"customServiceJobBookDelay" comment:"Set custom job book delay for given CDS Hatchery (in seconds)" json:"customServiceJobBookDelay" commented:"true"`
} `toml:"workflow" comment:"######################\n 'Workflow' global configuration \n######################" json:"workflow"`
Project struct {
CreationDisabled bool `toml:"creationDisabled" comment:"Disable project creation for CDS non admin users." json:"creationDisabled" default:"false" commented:"true"`
Expand Down Expand Up @@ -670,9 +672,9 @@ func (a *API) Serve(ctx context.Context) error {
if err := a.initWebsocket(event.DefaultPubSubKey); err != nil {
return err
}
if err := a.initHatcheryWebsocket(event.JobQueuedPubSubKey); err != nil {
return err
}
if err := a.initHatcheryWebsocket(event.JobQueuedPubSubKey); err != nil {
return err
}
if err := InitRouterMetrics(ctx, a); err != nil {
log.Error(ctx, "unable to init router metrics: %v", err)
}
Expand Down
19 changes: 14 additions & 5 deletions engine/api/workflow/execute_node_job_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (r *ProcessorReport) addWorkflowNodeRun(nr sdk.WorkflowNodeRun) {
r.nodes = append(r.nodes, nr)
}

//All returns all the objects in the reports
// All returns all the objects in the reports
func (r *ProcessorReport) All() []interface{} {
r.mutex.Lock()
defer r.mutex.Unlock()
Expand Down Expand Up @@ -351,8 +351,8 @@ func LoadDecryptSecrets(ctx context.Context, db gorp.SqlExecutor, wr *sdk.Workfl
return secrets, nil
}

//BookNodeJobRun Book a job for a hatchery
func BookNodeJobRun(ctx context.Context, store cache.Store, id int64, hatchery *sdk.Service) (*sdk.Service, error) {
// BookNodeJobRun Book a job for a hatchery
func BookNodeJobRun(ctx context.Context, store cache.Store, defaultBookDelay int64, customBookDelay map[string]int64, id int64, hatchery *sdk.Service) (*sdk.Service, error) {
k := keyBookJob(id)
h := sdk.Service{}
find, err := store.Get(k, &h)
Expand All @@ -361,7 +361,16 @@ func BookNodeJobRun(ctx context.Context, store cache.Store, id int64, hatchery *
}
if !find {
// job not already booked, book it for 2 min
if err := store.SetWithTTL(k, hatchery, 120); err != nil {
delay := 120
if defaultBookDelay > 0 {
delay = int(defaultBookDelay)
}
if customBookDelay != nil {
if d, ok := customBookDelay[hatchery.Name]; ok {
delay = int(d)
}
}
if err := store.SetWithTTL(k, hatchery, delay); err != nil {
log.Error(ctx, "cannot SetWithTTL: %s: %v", k, err)
}
return nil, nil
Expand All @@ -372,7 +381,7 @@ func BookNodeJobRun(ctx context.Context, store cache.Store, id int64, hatchery *
return &h, sdk.WrapError(sdk.ErrJobAlreadyBooked, "BookNodeJobRun> job %d already booked by %s (%d)", id, h.Name, h.ID)
}

//FreeNodeJobRun Free a job for a hatchery
// FreeNodeJobRun Free a job for a hatchery
func FreeNodeJobRun(ctx context.Context, store cache.Store, id int64) error {
k := keyBookJob(id)
h := sdk.Service{}
Expand Down
2 changes: 1 addition & 1 deletion engine/api/workflow/run_workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -581,7 +581,7 @@ queueRun:
t.Logf("##### work on job : %+v\n", j.Job.Action.Name)

//BookNodeJobRun
_, err = workflow.BookNodeJobRun(context.TODO(), cache, j.ID, &sdk.Service{
_, err = workflow.BookNodeJobRun(context.TODO(), cache, 0, nil, j.ID, &sdk.Service{
CanonicalService: sdk.CanonicalService{
Name: "Hatchery",
ID: 1,
Expand Down
2 changes: 1 addition & 1 deletion engine/api/workflow_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ func (api *API) postBookWorkflowJobHandler() service.Handler {
return err
}

if _, err := workflow.BookNodeJobRun(ctx, api.Cache, id, s); err != nil {
if _, err := workflow.BookNodeJobRun(ctx, api.Cache, api.Config.Workflow.JobDefaultBookDelay, api.Config.Workflow.CustomServiceJobBookDelay, id, s); err != nil {
return sdk.WrapError(err, "job already booked")
}

Expand Down
3 changes: 3 additions & 0 deletions engine/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ func configBootstrap(args []string) Configuration {
conf.API.Database.Schema = "public"
conf.API.HTTP.Port = 8081
conf.API.Auth.AllowedOrganizations = []string{"default"}
conf.API.Workflow.CustomServiceJobBookDelay = map[string]int64{
"my-service": 120,
}
case sdk.TypeUI:
conf.UI = &ui.Configuration{}
conf.UI.Name = "cds-ui-" + namesgenerator.GetRandomNameCDS()
Expand Down

0 comments on commit 2e886d1

Please sign in to comment.