Skip to content

Commit

Permalink
feat: expose job concurrency policy to the API (#2680)
Browse files Browse the repository at this point in the history
Co-authored-by: Wilson Júnior <wilsonpjunior@gmail.com>
  • Loading branch information
arthurcgc and wpjunior committed Mar 20, 2024
1 parent 2fe8cb8 commit 358ce9c
Show file tree
Hide file tree
Showing 8 changed files with 117 additions and 22 deletions.
9 changes: 6 additions & 3 deletions api/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type inputJob struct {
Manual bool `json:"manual"` // creates a cronjob with the suspended attr + label tsuru.io/job-manual = true + "invalid" schedule
Trigger bool `json:"trigger"` // Trigger means the client wants to forcefully run a job
ActiveDeadlineSeconds *int64 `json:"activeDeadlineSeconds,omitempty"`
ConcurrencyPolicy *string `json:"concurrencyPolicy,omitempty"`
}

func getJob(ctx stdContext.Context, name string) (*jobTypes.Job, error) {
Expand Down Expand Up @@ -352,6 +353,7 @@ func updateJob(w http.ResponseWriter, r *http.Request, t auth.Token) (err error)
Pool: ij.Pool,
Metadata: ij.Metadata,
Spec: jobTypes.JobSpec{
ConcurrencyPolicy: ij.ConcurrencyPolicy,
Schedule: ij.Schedule,
Container: ij.Container,
Manual: ij.Manual,
Expand Down Expand Up @@ -430,9 +432,10 @@ func createJob(w http.ResponseWriter, r *http.Request, t auth.Token) (err error)
Metadata: ij.Metadata,
DeployOptions: ij.DeployOptions,
Spec: jobTypes.JobSpec{
Manual: ij.Manual,
Schedule: ij.Schedule,
Container: ij.Container,
ConcurrencyPolicy: ij.ConcurrencyPolicy,
Manual: ij.Manual,
Schedule: ij.Schedule,
Container: ij.Container,
},
}
if ij.ActiveDeadlineSeconds != nil && *ij.ActiveDeadlineSeconds >= 0 {
Expand Down
19 changes: 9 additions & 10 deletions api/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,9 @@ func (s *S) TestCreateFullyFeaturedCronjob(c *check.C) {
OriginalImageSrc: "busybox:1.28",
Command: []string{"/bin/sh", "-c", "echo Hello!"},
},
Schedule: "* * * * *",
Manual: false,
Schedule: "* * * * *",
Manual: false,
ConcurrencyPolicy: func() *string { s := "Allow"; return &s }(),
}
var buffer bytes.Buffer
err := json.NewEncoder(&buffer).Encode(j)
Expand Down Expand Up @@ -247,14 +248,12 @@ func (s *S) TestCreateFullyFeaturedCronjob(c *check.C) {
OriginalImageSrc: "busybox:1.28",
Command: []string{"/bin/sh", "-c", "echo Hello!"},
},
Schedule: "* * * * *",
Manual: false,
ServiceEnvs: []bindTypes.ServiceEnvVar{},
Envs: []bindTypes.EnvVar{},
ActiveDeadlineSeconds: func() *int64 {
v := int64(0)
return &v
}(),
Schedule: "* * * * *",
Manual: false,
ServiceEnvs: []bindTypes.ServiceEnvVar{},
Envs: []bindTypes.EnvVar{},
ActiveDeadlineSeconds: func() *int64 { v := int64(0); return &v }(),
ConcurrencyPolicy: func() *string { s := "Allow"; return &s }(),
},
}
c.Assert(gotJob, check.DeepEquals, expectedJob)
Expand Down
6 changes: 6 additions & 0 deletions job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -631,5 +631,11 @@ func validateJob(ctx context.Context, j *jobTypes.Job) error {
return &tsuruErrors.ValidationError{Message: jobTypes.ErrInvalidSchedule.Error()}
}
}
if j.Spec.ConcurrencyPolicy != nil {
allowedValues := []string{"Allow", "Forbid", "Replace"}
if !set.FromSlice(allowedValues).Includes(*j.Spec.ConcurrencyPolicy) {
return &tsuruErrors.ValidationError{Message: jobTypes.ErrInvalidConcurrencyPolicy.Error()}
}
}
return nil
}
44 changes: 44 additions & 0 deletions job/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -852,6 +852,50 @@ func (s *S) TestUpdateJob(c *check.C) {
},
expectedErr: nil,
},
{
name: "update concurrency policy",
oldJob: jobTypes.Job{
Name: "some-job",
TeamOwner: s.team.Name,
Pool: s.Pool,
Teams: []string{s.team.Name},
Plan: *s.defaultPlan,
Owner: s.user.Email,
Spec: jobTypes.JobSpec{
Schedule: "* * * * *",
Container: jobTypes.ContainerInfo{
OriginalImageSrc: "alpine:v1",
Command: []string{"echo", "hello!"},
},
},
},
newJob: jobTypes.Job{
Name: "some-job",
Spec: jobTypes.JobSpec{
ConcurrencyPolicy: func() *string { s := "Forbid"; return &s }(),
},
},
expectedJob: jobTypes.Job{
Name: "some-job",
TeamOwner: s.team.Name,
Plan: *s.defaultPlan,
Owner: s.user.Email,
Pool: s.Pool,
Teams: []string{s.team.Name},
Metadata: app.Metadata{Labels: []app.MetadataItem{}, Annotations: []app.MetadataItem{}},
Spec: jobTypes.JobSpec{
Schedule: "* * * * *",
Container: jobTypes.ContainerInfo{
OriginalImageSrc: "alpine:v1",
InternalRegistryImage: "fake.registry.io/job-some-job:latest",
Command: []string{"echo", "hello!"},
},
ConcurrencyPolicy: func() *string { s := "Forbid"; return &s }(),
ServiceEnvs: []bind.ServiceEnvVar{}, Envs: []bind.EnvVar{},
ActiveDeadlineSeconds: func() *int64 { i := int64(0); return &i }(),
},
},
},
}
for _, t := range updateTests {
if t.beforeFunc != nil {
Expand Down
6 changes: 6 additions & 0 deletions provision/kubernetes/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,11 @@ func ensureCronjob(ctx context.Context, client *ClusterClient, job *jobTypes.Job
return errors.WithStack(err)
}

concurrencyPolicy := ""
if job.Spec.ConcurrencyPolicy != nil {
concurrencyPolicy = *job.Spec.ConcurrencyPolicy
}

cronjob := &batchv1.CronJob{
ObjectMeta: metav1.ObjectMeta{
Name: job.Name,
Expand All @@ -143,6 +148,7 @@ func ensureCronjob(ctx context.Context, client *ClusterClient, job *jobTypes.Job
JobTemplate: batchv1.JobTemplateSpec{
Spec: jobSpec,
},
ConcurrencyPolicy: batchv1.ConcurrencyPolicy(concurrencyPolicy),
},
}

Expand Down
39 changes: 37 additions & 2 deletions provision/kubernetes/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,39 @@ func (s *S) TestProvisionerCreateCronJob(c *check.C) {
c.Assert(err, check.IsNil)
},
},
{
name: "create cronjob with concurrency policy set to forbid",
jobName: "myjob",
namespace: "default",
scenario: func() {
cj := jobTypes.Job{
Name: "myjob",
TeamOwner: s.team.Name,
Pool: "test-default",
Spec: jobTypes.JobSpec{
ConcurrencyPolicy: func() *string { r := "Forbid"; return &r }(),
Schedule: "* * * * *",
Container: jobTypes.ContainerInfo{
OriginalImageSrc: "ubuntu:latest",
Command: []string{"echo", "hello world"},
},
},
}
err := s.p.EnsureJob(context.TODO(), &cj)
waitCron()
c.Assert(err, check.IsNil)
},
assertion: func(c *check.C, gotCron *batchv1.CronJob) {
c.Assert(gotCron.Spec.ConcurrencyPolicy, check.DeepEquals, batchv1.ForbidConcurrent)
},
teardown: func() {
err := s.p.DestroyJob(context.TODO(), &jobTypes.Job{
Name: "myjob",
Pool: "test-default",
})
c.Assert(err, check.IsNil)
},
},
}
for _, tt := range tests {
tt.scenario()
Expand Down Expand Up @@ -312,6 +345,7 @@ func (s *S) TestProvisionerUpdateCronJob(c *check.C) {
},
Spec: jobTypes.JobSpec{
Schedule: "* * * * *",
ConcurrencyPolicy: func() *string { r := "Forbid"; return &r }(),
Parallelism: func() *int32 { r := int32(2); return &r }(),
Completions: func() *int32 { r := int32(1); return &r }(),
ActiveDeadlineSeconds: func() *int64 { r := int64(0); return &r }(),
Expand Down Expand Up @@ -349,8 +383,9 @@ func (s *S) TestProvisionerUpdateCronJob(c *check.C) {
Annotations: map[string]string{"annotation2": "value4"},
},
Spec: batchv1.CronJobSpec{
Schedule: "* * * * *",
Suspend: func() *bool { r := false; return &r }(),
Schedule: "* * * * *",
ConcurrencyPolicy: batchv1.ForbidConcurrent,
Suspend: func() *bool { r := false; return &r }(),
JobTemplate: batchv1.JobTemplateSpec{
Spec: batchv1.JobSpec{
TTLSecondsAfterFinished: func() *int32 { defaultTTL := int32(86400); return &defaultTTL }(),
Expand Down
15 changes: 8 additions & 7 deletions types/job/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,14 @@ import (
)

var (
ErrJobNotFound = errors.New("Job not found")
ErrJobUnitNotFound = errors.New("Job unit not found")
MaxAttempts = 5
ErrMaxAttemptsReached = fmt.Errorf("Unable to generate unique job name: max attempts reached (%d)", MaxAttempts)
ErrJobAlreadyExists = errors.New("a job with the same name already exists")
ErrInvalidSchedule = errors.New("invalid schedule")
ErrInvalidDeployKind = errors.New("invalid deploy kind")
ErrJobNotFound = errors.New("Job not found")
ErrJobUnitNotFound = errors.New("Job unit not found")
MaxAttempts = 5
ErrMaxAttemptsReached = fmt.Errorf("Unable to generate unique job name: max attempts reached (%d)", MaxAttempts)
ErrJobAlreadyExists = errors.New("a job with the same name already exists")
ErrInvalidSchedule = errors.New("invalid schedule")
ErrInvalidConcurrencyPolicy = errors.New("invalid concurrency policy, allowed values are: Allow, Forbid, Replace")
ErrInvalidDeployKind = errors.New("invalid deploy kind")
)

type JobCreationError struct {
Expand Down
1 change: 1 addition & 0 deletions types/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type ContainerInfo struct {
}

type JobSpec struct {
ConcurrencyPolicy *string `json:"concurrentPolicy,omitempty"`
Completions *int32 `json:"completions,omitempty"`
Parallelism *int32 `json:"parallelism,omitempty"`
ActiveDeadlineSeconds *int64 `json:"activeDeadlineSeconds,omitempty"`
Expand Down

0 comments on commit 358ce9c

Please sign in to comment.