Enable batch/v1beta1.CronJobs by default #51465

Merged
merged 1 commit on Aug 30, 2017
6 changes: 2 additions & 4 deletions cmd/kube-apiserver/app/server.go
@@ -51,7 +51,6 @@ import (
"k8s.io/apiserver/pkg/server/options/encryptionconfig"
serverstorage "k8s.io/apiserver/pkg/server/storage"
aggregatorapiserver "k8s.io/kube-aggregator/pkg/apiserver"
- //aggregatorinformers "k8s.io/kube-aggregator/pkg/client/informers/internalversion"
openapi "k8s.io/kube-openapi/pkg/common"

"k8s.io/apiserver/pkg/storage/etcd3/preflight"
@@ -548,9 +547,8 @@ func BuildStorageFactory(s *options.ServerRunOptions) (*serverstorage.DefaultSto
storageFactory, err := kubeapiserver.NewStorageFactory(
s.Etcd.StorageConfig, s.Etcd.DefaultStorageMediaType, api.Codecs,
serverstorage.NewDefaultResourceEncodingConfig(api.Registry), storageGroupsToEncodingVersion,
- // FIXME: this GroupVersionResource override should be configurable
- // TODO we need to update this to batch/v1beta1 when it's enabled by default
- []schema.GroupVersionResource{batch.Resource("cronjobs").WithVersion("v2alpha1")},
+ // FIXME (soltysh): this GroupVersionResource override should be configurable
+ []schema.GroupVersionResource{batch.Resource("cronjobs").WithVersion("v1beta1")},
master.DefaultAPIResourceConfigSource(), s.APIEnablement.RuntimeConfig)
if err != nil {
return nil, fmt.Errorf("error in initializing storage factory: %s", err)
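The override list above is what pins the etcd storage version for CronJobs. A minimal standalone sketch of the idea, using a simplified GroupVersionResource struct in place of the real k8s.io/apimachinery/pkg/runtime/schema type:

```go
package main

import "fmt"

// GroupVersionResource is a simplified stand-in for the type the apiserver
// imports from k8s.io/apimachinery/pkg/runtime/schema.
type GroupVersionResource struct {
	Group, Version, Resource string
}

func main() {
	// With this PR, cronjobs are persisted to etcd at batch/v1beta1 instead
	// of batch/v2alpha1; every other resource keeps its default encoding.
	overrides := []GroupVersionResource{
		{Group: "batch", Version: "v1beta1", Resource: "cronjobs"},
	}
	for _, gvr := range overrides {
		fmt.Printf("storage override: %s/%s, resource %s\n", gvr.Group, gvr.Version, gvr.Resource)
	}
}
```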
1 change: 0 additions & 1 deletion cmd/kube-controller-manager/app/BUILD
@@ -29,7 +29,6 @@ go_library(
"//pkg/apis/authentication/install:go_default_library",
"//pkg/apis/authorization/install:go_default_library",
"//pkg/apis/autoscaling/install:go_default_library",
"//pkg/apis/batch:go_default_library",
"//pkg/apis/batch/install:go_default_library",
"//pkg/apis/certificates/install:go_default_library",
"//pkg/apis/componentconfig:go_default_library",
9 changes: 2 additions & 7 deletions cmd/kube-controller-manager/app/batch.go
@@ -22,8 +22,6 @@ package app

import (
"k8s.io/apimachinery/pkg/runtime/schema"
- clientset "k8s.io/client-go/kubernetes"
- "k8s.io/kubernetes/pkg/apis/batch"
"k8s.io/kubernetes/pkg/controller/cronjob"
"k8s.io/kubernetes/pkg/controller/job"
)
@@ -41,14 +39,11 @@ func startJobController(ctx ControllerContext) (bool, error) {
}

func startCronJobController(ctx ControllerContext) (bool, error) {
- if !ctx.AvailableResources[schema.GroupVersionResource{Group: "batch", Version: "v2alpha1", Resource: "cronjobs"}] {
+ if !ctx.AvailableResources[schema.GroupVersionResource{Group: "batch", Version: "v1beta1", Resource: "cronjobs"}] {
return false, nil
}
- // TODO: this is a temp fix for allowing kubeClient list v2alpha1 sj, should switch to using clientset
- cronjobConfig := ctx.ClientBuilder.ConfigOrDie("cronjob-controller")
- cronjobConfig.ContentConfig.GroupVersion = &schema.GroupVersion{Group: batch.GroupName, Version: "v2alpha1"}
go cronjob.NewCronJobController(
- clientset.NewForConfigOrDie(cronjobConfig),
+ ctx.ClientBuilder.ClientOrDie("cronjob-controller"),
).Run(ctx.Stop)
return true, nil
}
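The controller-manager side mirrors the apiserver change: startCronJobController now gates on batch/v1beta1 being served, and the temporary v2alpha1 client workaround is gone. A self-contained sketch of that discovery-gating pattern, where the map and types are simplified stand-ins for ctx.AvailableResources:

```go
package main

import "fmt"

// GroupVersionResource mirrors the shape of the apimachinery type used as
// the key of ctx.AvailableResources in the real controller manager.
type GroupVersionResource struct {
	Group, Version, Resource string
}

// startCronJobController is a simplified sketch: return (false, nil) when the
// API group/version is not served, so the controller manager skips the
// controller instead of erroring out.
func startCronJobController(available map[GroupVersionResource]bool) (bool, error) {
	gvr := GroupVersionResource{Group: "batch", Version: "v1beta1", Resource: "cronjobs"}
	if !available[gvr] {
		return false, nil // API not served; nothing to do
	}
	// In the real code, cronjob.NewCronJobController(...).Run(ctx.Stop)
	// would be started in a goroutine here.
	return true, nil
}

func main() {
	available := map[GroupVersionResource]bool{
		{Group: "batch", Version: "v1beta1", Resource: "cronjobs"}: true,
	}
	started, err := startCronJobController(available)
	fmt.Println(started, err) // true <nil>
}
```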
18 changes: 8 additions & 10 deletions hack/make-rules/test-cmd-util.sh
@@ -1224,16 +1224,14 @@ run_kubectl_run_tests() {
# Clean up
kubectl delete deployment nginx-apps "${kube_flags[@]}"

- # TODO: enable batch/v1beta1 by default before 1.8 release, after issues
- # with CronJobs existing in multiple versions at once is solved
- # # Pre-Condition: no Job exists
- # kube::test::get_object_assert cronjobs "{{range.items}}{{$id_field}}:{{end}}" ''
- # # Command
- # kubectl run pi --schedule="*/5 * * * *" --generator=cronjob/v1beta1 "--image=$IMAGE_PERL" --restart=OnFailure -- perl -Mbignum=bpi -wle 'print bpi(20)' "${kube_flags[@]}"
- # # Post-Condition: CronJob "pi" is created
- # kube::test::get_object_assert cronjobs "{{range.items}}{{$id_field}}:{{end}}" 'pi:'
- # # Clean up
- # kubectl delete cronjobs pi "${kube_flags[@]}"
+ # Pre-Condition: no Job exists
+ kube::test::get_object_assert cronjob.v1beta1.batch "{{range.items}}{{$id_field}}:{{end}}" ''
+ # Command
+ kubectl run pi --schedule="*/5 * * * *" --generator=cronjob/v1beta1 "--image=$IMAGE_PERL" --restart=OnFailure -- perl -Mbignum=bpi -wle 'print bpi(20)' "${kube_flags[@]}"
+ # Post-Condition: CronJob "pi" is created
+ kube::test::get_object_assert cronjob.v1beta1.batch "{{range.items}}{{$id_field}}:{{end}}" 'pi:'
+ # Clean up
+ kubectl delete cronjob.v1beta1.batch pi "${kube_flags[@]}"

set +o nounset
set +o errexit
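The shell test above drives kubectl, but the same CronJob can be created programmatically through the now-default BatchV1beta1 client. A hedged sketch using client-go; the kubeconfig path and the single-argument Create signature assume a client-go release contemporary with Kubernetes 1.8 (newer releases take a context and CreateOptions):

```go
package main

import (
	"fmt"

	batchv1 "k8s.io/api/batch/v1"
	batchv1beta1 "k8s.io/api/batch/v1beta1"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Assumes a kubeconfig at the default home path; error handling is
	// trimmed to keep the sketch short.
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(config)

	// The same "pi" CronJob the test creates with kubectl run.
	cj := &batchv1beta1.CronJob{
		ObjectMeta: metav1.ObjectMeta{Name: "pi"},
		Spec: batchv1beta1.CronJobSpec{
			Schedule: "*/5 * * * *",
			JobTemplate: batchv1beta1.JobTemplateSpec{
				Spec: batchv1.JobSpec{
					Template: corev1.PodTemplateSpec{
						Spec: corev1.PodSpec{
							Containers: []corev1.Container{{
								Name:    "pi",
								Image:   "perl",
								Command: []string{"perl", "-Mbignum=bpi", "-wle", "print bpi(20)"},
							}},
							RestartPolicy: corev1.RestartPolicyOnFailure,
						},
					},
				},
			},
		},
	}

	created, err := client.BatchV1beta1().CronJobs("default").Create(cj)
	if err != nil {
		panic(err)
	}
	fmt.Printf("created CronJob %s\n", created.Name)
}
```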
2 changes: 1 addition & 1 deletion pkg/apis/batch/v1beta1/defaults_test.go
@@ -34,7 +34,7 @@ func TestSetDefaultCronJob(t *testing.T) {
original *batchv1beta1.CronJob
expected *batchv1beta1.CronJob
}{
"empty batchv2alpha1.CronJob should default batchv2alpha1.ConcurrencyPolicy and Suspend": {
"empty batchv1beta1.CronJob should default batchv1beta1.ConcurrencyPolicy and Suspend": {
original: &batchv1beta1.CronJob{},
expected: &batchv1beta1.CronJob{
Spec: batchv1beta1.CronJobSpec{
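For reference, the defaulting this test exercises can be sketched as follows; this is an illustrative standalone function, not the registered defaulting code from the v1beta1 defaults package:

```go
package main

import (
	"fmt"

	batchv1beta1 "k8s.io/api/batch/v1beta1"
)

// setDefaultsCronJob mirrors the behavior the test above asserts: an empty
// v1beta1 CronJob gets ConcurrencyPolicy=Allow and Suspend=false.
func setDefaultsCronJob(cj *batchv1beta1.CronJob) {
	if cj.Spec.ConcurrencyPolicy == "" {
		cj.Spec.ConcurrencyPolicy = batchv1beta1.AllowConcurrent
	}
	if cj.Spec.Suspend == nil {
		suspend := false
		cj.Spec.Suspend = &suspend
	}
}

func main() {
	cj := &batchv1beta1.CronJob{}
	setDefaultsCronJob(cj)
	fmt.Println(cj.Spec.ConcurrencyPolicy, *cj.Spec.Suspend) // Allow false
}
```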
4 changes: 2 additions & 2 deletions pkg/controller/cronjob/BUILD
@@ -20,7 +20,7 @@ go_library(
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/github.com/robfig/cron:go_default_library",
"//vendor/k8s.io/api/batch/v1:go_default_library",
"//vendor/k8s.io/api/batch/v2alpha1:go_default_library",
"//vendor/k8s.io/api/batch/v1beta1:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
@@ -50,7 +50,7 @@ go_test(
"//pkg/api/install:go_default_library",
"//pkg/apis/batch/install:go_default_library",
"//vendor/k8s.io/api/batch/v1:go_default_library",
"//vendor/k8s.io/api/batch/v2alpha1:go_default_library",
"//vendor/k8s.io/api/batch/v1beta1:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
18 changes: 9 additions & 9 deletions pkg/controller/cronjob/cronjob_controller.go
@@ -36,7 +36,7 @@ import (
"github.com/golang/glog"

batchv1 "k8s.io/api/batch/v1"
batchv2alpha1 "k8s.io/api/batch/v2alpha1"
batchv1beta1 "k8s.io/api/batch/v1beta1"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -56,7 +56,7 @@ import (
// Utilities for dealing with Jobs and CronJobs and time.

// controllerKind contains the schema.GroupVersionKind for this controller type.
- var controllerKind = batchv2alpha1.SchemeGroupVersion.WithKind("CronJob")
+ var controllerKind = batchv1beta1.SchemeGroupVersion.WithKind("CronJob")

type CronJobController struct {
kubeClient clientset.Interface
@@ -116,7 +116,7 @@ func (jm *CronJobController) syncAll() {
js := jl.Items
glog.V(4).Infof("Found %d jobs", len(js))

- sjl, err := jm.kubeClient.BatchV2alpha1().CronJobs(metav1.NamespaceAll).List(metav1.ListOptions{})
+ sjl, err := jm.kubeClient.BatchV1beta1().CronJobs(metav1.NamespaceAll).List(metav1.ListOptions{})
Member:
Will this break HA clusters during upgrades? Not all masters are updated simultaneously: #51049 (comment)

Contributor Author:
I think this is a slightly different use case from the one you're pointing to. There, the issue is upgrading from beta1 to beta2, where both are enabled by default. Here, we are migrating from alpha to beta, in other words from a non-default API to one that is enabled by default.
If I don't change the controller to use the beta version, CronJobs will not work: the beta API will be available by default, but the controller will still rely on the non-default alpha API being turned on.
This will break HA clusters, but only those that explicitly enabled the alpha version. I'll defer to @bgrant0607 or @smarterclayton for the decision, but IMHO this should get in.

Contributor:
We don't support upgrading an HA cluster from alpha to beta without interruption, so someone running a cluster like that has already broken the glass.

if err != nil {
utilruntime.HandleError(fmt.Errorf("can't list CronJobs: %v", err))
return
@@ -134,7 +134,7 @@ func (jm *CronJobController) syncAll() {
}

// cleanupFinishedJobs cleanups finished jobs created by a CronJob
- func cleanupFinishedJobs(sj *batchv2alpha1.CronJob, js []batchv1.Job, jc jobControlInterface,
+ func cleanupFinishedJobs(sj *batchv1beta1.CronJob, js []batchv1.Job, jc jobControlInterface,
sjc sjControlInterface, pc podControlInterface, recorder record.EventRecorder) {
// If neither limits are active, there is no need to do anything.
if sj.Spec.FailedJobsHistoryLimit == nil && sj.Spec.SuccessfulJobsHistoryLimit == nil {
@@ -179,7 +179,7 @@ func cleanupFinishedJobs(sj *batchv2alpha1.CronJob, js []batchv1.Job, jc jobCont
}

// removeOldestJobs removes the oldest jobs from a list of jobs
- func removeOldestJobs(sj *batchv2alpha1.CronJob, js []batchv1.Job, jc jobControlInterface,
+ func removeOldestJobs(sj *batchv1beta1.CronJob, js []batchv1.Job, jc jobControlInterface,
pc podControlInterface, maxJobs int32, recorder record.EventRecorder) {
numToDelete := len(js) - int(maxJobs)
if numToDelete <= 0 {
@@ -200,7 +200,7 @@ func removeOldestJobs(sj *batchv2alpha1.CronJob, js []batchv1.Job, jc jobControl
// All known jobs created by "sj" should be included in "js".
// The current time is passed in to facilitate testing.
// It has no receiver, to facilitate testing.
- func syncOne(sj *batchv2alpha1.CronJob, js []batchv1.Job, now time.Time, jc jobControlInterface, sjc sjControlInterface, pc podControlInterface, recorder record.EventRecorder) {
+ func syncOne(sj *batchv1beta1.CronJob, js []batchv1.Job, now time.Time, jc jobControlInterface, sjc sjControlInterface, pc podControlInterface, recorder record.EventRecorder) {
nameForLog := fmt.Sprintf("%s/%s", sj.Namespace, sj.Name)

childrenJobs := make(map[types.UID]bool)
@@ -284,7 +284,7 @@ func syncOne(sj *batchv2alpha1.CronJob, js []batchv1.Job, now time.Time, jc jobC
// can see easily that there was a missed execution.
return
}
- if sj.Spec.ConcurrencyPolicy == batchv2alpha1.ForbidConcurrent && len(sj.Status.Active) > 0 {
+ if sj.Spec.ConcurrencyPolicy == batchv1beta1.ForbidConcurrent && len(sj.Status.Active) > 0 {
// Regardless which source of information we use for the set of active jobs,
// there is some risk that we won't see an active job when there is one.
// (because we haven't seen the status update to the SJ or the created pod).
@@ -297,7 +297,7 @@ func syncOne(sj *batchv2alpha1.CronJob, js []batchv1.Job, now time.Time, jc jobC
glog.V(4).Infof("Not starting job for %s because of prior execution still running and concurrency policy is Forbid", nameForLog)
return
}
- if sj.Spec.ConcurrencyPolicy == batchv2alpha1.ReplaceConcurrent {
+ if sj.Spec.ConcurrencyPolicy == batchv1beta1.ReplaceConcurrent {
for _, j := range sj.Status.Active {
// TODO: this should be replaced with server side job deletion
// currently this mimics JobReaper from pkg/kubectl/stop.go
@@ -353,7 +353,7 @@ func syncOne(sj *batchv2alpha1.CronJob, js []batchv1.Job, now time.Time, jc jobC
}

// deleteJob reaps a job, deleting the job, the pods and the reference in the active list
- func deleteJob(sj *batchv2alpha1.CronJob, job *batchv1.Job, jc jobControlInterface,
+ func deleteJob(sj *batchv1beta1.CronJob, job *batchv1.Job, jc jobControlInterface,
pc podControlInterface, recorder record.EventRecorder, reason string) bool {
// TODO: this should be replaced with server side job deletion
// currently this mimics JobReaper from pkg/kubectl/stop.go
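The Forbid and Replace branches above are the heart of the controller's concurrency handling. A standalone sketch of just that decision, with local stand-ins for the batch/v1beta1 ConcurrencyPolicy constants:

```go
package main

import "fmt"

// ConcurrencyPolicy mirrors the three batch/v1beta1 values the controller
// branches on; this sketch only illustrates the decision, not the job
// bookkeeping around it.
type ConcurrencyPolicy string

const (
	AllowConcurrent   ConcurrencyPolicy = "Allow"
	ForbidConcurrent  ConcurrencyPolicy = "Forbid"
	ReplaceConcurrent ConcurrencyPolicy = "Replace"
)

// decide reports what a sync step does when a scheduled run fires while
// activeJobs are still running.
func decide(policy ConcurrencyPolicy, activeJobs int) string {
	switch {
	case policy == ForbidConcurrent && activeJobs > 0:
		return "skip: prior execution still running"
	case policy == ReplaceConcurrent && activeJobs > 0:
		return "delete active jobs, then start a new one"
	default:
		return "start a new job"
	}
}

func main() {
	for _, p := range []ConcurrencyPolicy{AllowConcurrent, ForbidConcurrent, ReplaceConcurrent} {
		fmt.Printf("%s with 1 active job -> %s\n", p, decide(p, 1))
	}
}
```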
38 changes: 19 additions & 19 deletions pkg/controller/cronjob/cronjob_controller_test.go
@@ -24,7 +24,7 @@ import (
"time"

batchv1 "k8s.io/api/batch/v1"
batchv2alpha1 "k8s.io/api/batch/v2alpha1"
batchV1beta1 "k8s.io/api/batch/v1beta1"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
@@ -96,19 +96,19 @@ func startTimeStringToTime(startTime string) time.Time {
}

// returns a cronJob with some fields filled in.
- func cronJob() batchv2alpha1.CronJob {
- return batchv2alpha1.CronJob{
+ func cronJob() batchV1beta1.CronJob {
+ return batchV1beta1.CronJob{
ObjectMeta: metav1.ObjectMeta{
Name: "mycronjob",
Namespace: "snazzycats",
UID: types.UID("1a2b3c"),
- SelfLink: "/apis/batch/v2alpha1/namespaces/snazzycats/cronjobs/mycronjob",
+ SelfLink: "/apis/batch/v1beta1/namespaces/snazzycats/cronjobs/mycronjob",
CreationTimestamp: metav1.Time{Time: justBeforeTheHour()},
},
- Spec: batchv2alpha1.CronJobSpec{
+ Spec: batchV1beta1.CronJobSpec{
Schedule: "* * * * ?",
- ConcurrencyPolicy: batchv2alpha1.AllowConcurrent,
- JobTemplate: batchv2alpha1.JobTemplateSpec{
+ ConcurrencyPolicy: batchV1beta1.AllowConcurrent,
+ JobTemplate: batchV1beta1.JobTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{"a": "b"},
Annotations: map[string]string{"x": "y"},
@@ -152,15 +152,15 @@ func newJob(UID string) batchv1.Job {
}

var (
- shortDead int64 = 10
- mediumDead int64 = 2 * 60 * 60
- longDead int64 = 1000000
- noDead int64 = -12345
- A batchv2alpha1.ConcurrencyPolicy = batchv2alpha1.AllowConcurrent
- f batchv2alpha1.ConcurrencyPolicy = batchv2alpha1.ForbidConcurrent
- R batchv2alpha1.ConcurrencyPolicy = batchv2alpha1.ReplaceConcurrent
- T bool = true
- F bool = false
+ shortDead int64 = 10
+ mediumDead int64 = 2 * 60 * 60
+ longDead int64 = 1000000
+ noDead int64 = -12345
+ A batchV1beta1.ConcurrencyPolicy = batchV1beta1.AllowConcurrent
+ f batchV1beta1.ConcurrencyPolicy = batchV1beta1.ForbidConcurrent
+ R batchV1beta1.ConcurrencyPolicy = batchV1beta1.ReplaceConcurrent
+ T bool = true
+ F bool = false
)

func TestSyncOne_RunOrNot(t *testing.T) {
@@ -179,7 +179,7 @@ func TestSyncOne_RunOrNot(t *testing.T) {

testCases := map[string]struct {
// sj spec
- concurrencyPolicy batchv2alpha1.ConcurrencyPolicy
+ concurrencyPolicy batchV1beta1.ConcurrencyPolicy
suspend bool
schedule string
deadline int64
@@ -298,7 +298,7 @@ func TestSyncOne_RunOrNot(t *testing.T) {
if controllerRef == nil {
t.Errorf("%s: expected job to have ControllerRef: %#v", name, job)
} else {
- if got, want := controllerRef.APIVersion, "batch/v2alpha1"; got != want {
+ if got, want := controllerRef.APIVersion, "batch/v1beta1"; got != want {
t.Errorf("%s: controllerRef.APIVersion = %q, want %q", name, got, want)
}
if got, want := controllerRef.Kind, "CronJob"; got != want {
@@ -596,7 +596,7 @@ func TestSyncOne_Status(t *testing.T) {

testCases := map[string]struct {
// sj spec
- concurrencyPolicy batchv2alpha1.ConcurrencyPolicy
+ concurrencyPolicy batchV1beta1.ConcurrencyPolicy
suspend bool
schedule string
deadline int64
16 changes: 8 additions & 8 deletions pkg/controller/cronjob/injection.go
@@ -21,7 +21,7 @@ import (
"sync"

batchv1 "k8s.io/api/batch/v1"
batchv2alpha1 "k8s.io/api/batch/v2alpha1"
batchv1beta1 "k8s.io/api/batch/v1beta1"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
@@ -33,7 +33,7 @@ import (
// sjControlInterface is an interface that knows how to update CronJob status
// created as an interface to allow testing.
type sjControlInterface interface {
- UpdateStatus(sj *batchv2alpha1.CronJob) (*batchv2alpha1.CronJob, error)
+ UpdateStatus(sj *batchv1beta1.CronJob) (*batchv1beta1.CronJob, error)
}

// realSJControl is the default implementation of sjControlInterface.
@@ -43,18 +43,18 @@ type realSJControl struct {

var _ sjControlInterface = &realSJControl{}

- func (c *realSJControl) UpdateStatus(sj *batchv2alpha1.CronJob) (*batchv2alpha1.CronJob, error) {
- return c.KubeClient.BatchV2alpha1().CronJobs(sj.Namespace).UpdateStatus(sj)
+ func (c *realSJControl) UpdateStatus(sj *batchv1beta1.CronJob) (*batchv1beta1.CronJob, error) {
+ return c.KubeClient.BatchV1beta1().CronJobs(sj.Namespace).UpdateStatus(sj)
}

// fakeSJControl is the default implementation of sjControlInterface.
type fakeSJControl struct {
- Updates []batchv2alpha1.CronJob
+ Updates []batchv1beta1.CronJob
}

var _ sjControlInterface = &fakeSJControl{}

- func (c *fakeSJControl) UpdateStatus(sj *batchv2alpha1.CronJob) (*batchv2alpha1.CronJob, error) {
+ func (c *fakeSJControl) UpdateStatus(sj *batchv1beta1.CronJob) (*batchv1beta1.CronJob, error) {
c.Updates = append(c.Updates, *sj)
return sj, nil
}
@@ -85,15 +85,15 @@ type realJobControl struct {

var _ jobControlInterface = &realJobControl{}

- func copyLabels(template *batchv2alpha1.JobTemplateSpec) labels.Set {
+ func copyLabels(template *batchv1beta1.JobTemplateSpec) labels.Set {
l := make(labels.Set)
for k, v := range template.Labels {
l[k] = v
}
return l
}

- func copyAnnotations(template *batchv2alpha1.JobTemplateSpec) labels.Set {
+ func copyAnnotations(template *batchv1beta1.JobTemplateSpec) labels.Set {
a := make(labels.Set)
for k, v := range template.Annotations {
a[k] = v
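The real/fake pair above is a classic interface seam for testing. A self-contained sketch of how such a fake is typically used in a unit test; the CronJob type here is a pared-down stand-in and syncStatus is a hypothetical caller, not code from this PR:

```go
package main

import "fmt"

// CronJob is a pared-down stand-in for batchv1beta1.CronJob, just enough to
// show the interface seam.
type CronJob struct{ Name string }

type sjControlInterface interface {
	UpdateStatus(sj *CronJob) (*CronJob, error)
}

// fakeSJControl records every update instead of calling the API server,
// mirroring the fake in injection.go above.
type fakeSJControl struct{ Updates []CronJob }

func (c *fakeSJControl) UpdateStatus(sj *CronJob) (*CronJob, error) {
	c.Updates = append(c.Updates, *sj)
	return sj, nil
}

// syncStatus stands in for controller code that ends by persisting status.
func syncStatus(sjc sjControlInterface, sj *CronJob) error {
	_, err := sjc.UpdateStatus(sj)
	return err
}

func main() {
	fake := &fakeSJControl{}
	_ = syncStatus(fake, &CronJob{Name: "mycronjob"})
	fmt.Printf("recorded %d status update(s)\n", len(fake.Updates)) // 1
}
```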
10 changes: 5 additions & 5 deletions pkg/controller/cronjob/utils.go
@@ -24,7 +24,7 @@ import (
"github.com/robfig/cron"

batchv1 "k8s.io/api/batch/v1"
batchv2alpha1 "k8s.io/api/batch/v2alpha1"
batchv1beta1 "k8s.io/api/batch/v1beta1"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
@@ -36,7 +36,7 @@ import (

// Utilities for dealing with Jobs and CronJobs and time.

- func inActiveList(sj batchv2alpha1.CronJob, uid types.UID) bool {
+ func inActiveList(sj batchv1beta1.CronJob, uid types.UID) bool {
for _, j := range sj.Status.Active {
if j.UID == uid {
return true
@@ -45,7 +45,7 @@ func inActiveList(sj batchv2alpha1.CronJob, uid types.UID) bool {
return false
}

- func deleteFromActiveList(sj *batchv2alpha1.CronJob, uid types.UID) {
+ func deleteFromActiveList(sj *batchv1beta1.CronJob, uid types.UID) {
if sj == nil {
return
}
@@ -111,7 +111,7 @@ func getNextStartTimeAfter(schedule string, now time.Time) (time.Time, error) {
//
// If there are too many (>100) unstarted times, just give up and return an empty slice.
// If there were missed times prior to the last known start time, then those are not returned.
- func getRecentUnmetScheduleTimes(sj batchv2alpha1.CronJob, now time.Time) ([]time.Time, error) {
+ func getRecentUnmetScheduleTimes(sj batchv1beta1.CronJob, now time.Time) ([]time.Time, error) {
starts := []time.Time{}
sched, err := cron.ParseStandard(sj.Spec.Schedule)
if err != nil {
@@ -170,7 +170,7 @@ }
}

// getJobFromTemplate makes a Job from a CronJob
- func getJobFromTemplate(sj *batchv2alpha1.CronJob, scheduledTime time.Time) (*batchv1.Job, error) {
+ func getJobFromTemplate(sj *batchv1beta1.CronJob, scheduledTime time.Time) (*batchv1.Job, error) {
// TODO: consider adding the following labels:
// nominal-start-time=$RFC_3339_DATE_OF_INTENDED_START -- for user convenience
// scheduled-job-name=$SJ_NAME -- for user convenience
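getRecentUnmetScheduleTimes is the piece that decides which fire times were missed. A simplified, runnable sketch of the same walk using github.com/robfig/cron; the real function also accounts for StartingDeadlineSeconds, the creation timestamp, and LastScheduleTime:

```go
package main

import (
	"fmt"
	"time"

	"github.com/robfig/cron"
)

// unmetTimes walks the schedule forward from the last start time, collecting
// fire times that have already passed, and bails out past a safety cap, as
// the doc comment above describes for >100 unstarted times.
func unmetTimes(spec string, lastStart, now time.Time) ([]time.Time, error) {
	sched, err := cron.ParseStandard(spec)
	if err != nil {
		return nil, fmt.Errorf("unparseable schedule %q: %v", spec, err)
	}
	var starts []time.Time
	for t := sched.Next(lastStart); !t.After(now); t = sched.Next(t) {
		starts = append(starts, t)
		if len(starts) > 100 {
			// Too many missed starts usually means clock skew or a long
			// outage; give up rather than flood the cluster with jobs.
			return nil, fmt.Errorf("too many missed start times")
		}
	}
	return starts, nil
}

func main() {
	now := time.Date(2017, 8, 30, 10, 17, 0, 0, time.UTC)
	lastStart := now.Add(-30 * time.Minute)
	starts, err := unmetTimes("*/10 * * * *", lastStart, now)
	fmt.Println(starts, err) // three missed 10-minute ticks
}
```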