diff --git a/cluster/gce/config-test.sh b/cluster/gce/config-test.sh index 064da568fb56e..e199042d8d1ee 100755 --- a/cluster/gce/config-test.sh +++ b/cluster/gce/config-test.sh @@ -61,10 +61,11 @@ SERVICE_CLUSTER_IP_RANGE="10.0.0.0/16" # formerly PORTAL_NET ENABLE_CLUSTER_MONITORING="${KUBE_ENABLE_CLUSTER_MONITORING:-influxdb}" TEST_CLUSTER_LOG_LEVEL="${TEST_CLUSTER_LOG_LEVEL:---v=4}" +TEST_CLUSTER_RESYNC_PERIOD="${TEST_CLUSTER_RESYNC_PERIOD:---min-resync-period=3m}" KUBELET_TEST_ARGS="--max-pods=100 $TEST_CLUSTER_LOG_LEVEL" APISERVER_TEST_ARGS="--runtime-config=extensions/v1beta1 ${TEST_CLUSTER_LOG_LEVEL}" -CONTROLLER_MANAGER_TEST_ARGS="${TEST_CLUSTER_LOG_LEVEL}" +CONTROLLER_MANAGER_TEST_ARGS="${TEST_CLUSTER_LOG_LEVEL} ${TEST_CLUSTER_RESYNC_PERIOD}" SCHEDULER_TEST_ARGS="${TEST_CLUSTER_LOG_LEVEL}" KUBEPROXY_TEST_ARGS="${TEST_CLUSTER_LOG_LEVEL}" diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go index c09b56a4c8150..51278165b58b5 100644 --- a/cmd/integration/integration.go +++ b/cmd/integration/integration.go @@ -43,6 +43,7 @@ import ( "k8s.io/kubernetes/pkg/apiserver" "k8s.io/kubernetes/pkg/client/record" client "k8s.io/kubernetes/pkg/client/unversioned" + "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/endpoint" "k8s.io/kubernetes/pkg/controller/node" replicationControllerPkg "k8s.io/kubernetes/pkg/controller/replication" @@ -192,14 +193,13 @@ func startComponents(firstManifestURL, secondManifestURL string) (string, string eventBroadcaster.StartRecordingToSink(cl.Events("")) scheduler.New(schedulerConfig).Run() - endpoints := endpointcontroller.NewEndpointController(cl) // ensure the service endpoints are sync'd several times within the window that the integration tests wait - go endpoints.Run(3, util.NeverStop) - - controllerManager := replicationControllerPkg.NewReplicationManager(cl, replicationControllerPkg.BurstReplicas) + go endpointcontroller.NewEndpointController(cl, controller.NoResyncPeriodFunc). + Run(3, util.NeverStop) // TODO: Write an integration test for the replication controllers watch. - go controllerManager.Run(3, util.NeverStop) + go replicationControllerPkg.NewReplicationManager(cl, controller.NoResyncPeriodFunc, replicationControllerPkg.BurstReplicas). 
+ Run(3, util.NeverStop) nodeController := nodecontroller.NewNodeController(nil, cl, 5*time.Minute, util.NewFakeRateLimiter(), 40*time.Second, 60*time.Second, 5*time.Second, nil, false) diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index fe3ff40c6e617..8ea48deebbaa4 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -25,6 +25,7 @@ package app import ( "fmt" "io/ioutil" + "math/rand" "net" "net/http" "net/http/pprof" @@ -79,6 +80,7 @@ type CMServer struct { TerminatedPodGCThreshold int HorizontalPodAutoscalerSyncPeriod time.Duration DeploymentControllerSyncPeriod time.Duration + MinResyncPeriod time.Duration RegisterRetryCount int NodeMonitorGracePeriod time.Duration NodeStartupGracePeriod time.Duration @@ -115,6 +117,7 @@ func NewCMServer() *CMServer { PVClaimBinderSyncPeriod: 10 * time.Second, HorizontalPodAutoscalerSyncPeriod: 30 * time.Second, DeploymentControllerSyncPeriod: 30 * time.Second, + MinResyncPeriod: 12 * time.Hour, RegisterRetryCount: 10, PodEvictionTimeout: 5 * time.Minute, ClusterName: "kubernetes", @@ -157,6 +160,7 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) { fs.DurationVar(&s.ResourceQuotaSyncPeriod, "resource-quota-sync-period", s.ResourceQuotaSyncPeriod, "The period for syncing quota usage status in the system") fs.DurationVar(&s.NamespaceSyncPeriod, "namespace-sync-period", s.NamespaceSyncPeriod, "The period for syncing namespace life-cycle updates") fs.DurationVar(&s.PVClaimBinderSyncPeriod, "pvclaimbinder-sync-period", s.PVClaimBinderSyncPeriod, "The period for syncing persistent volumes and persistent volume claims") + fs.DurationVar(&s.MinResyncPeriod, "min-resync-period", s.MinResyncPeriod, "The resync period in reflectors will be random between MinResyncPeriod and 2*MinResyncPeriod") fs.StringVar(&s.VolumeConfigFlags.PersistentVolumeRecyclerPodTemplateFilePathNFS, "pv-recycler-pod-template-filepath-nfs", s.VolumeConfigFlags.PersistentVolumeRecyclerPodTemplateFilePathNFS, "The file path to a pod definition used as a template for NFS persistent volume recycling") fs.IntVar(&s.VolumeConfigFlags.PersistentVolumeRecyclerMinimumTimeoutNFS, "pv-recycler-minimum-timeout-nfs", s.VolumeConfigFlags.PersistentVolumeRecyclerMinimumTimeoutNFS, "The minimum ActiveDeadlineSeconds to use for an NFS Recycler pod") fs.IntVar(&s.VolumeConfigFlags.PersistentVolumeRecyclerIncrementTimeoutNFS, "pv-recycler-increment-timeout-nfs", s.VolumeConfigFlags.PersistentVolumeRecyclerIncrementTimeoutNFS, "the increment of time added per Gi to ActiveDeadlineSeconds for an NFS scrubber pod") @@ -190,6 +194,11 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) { fs.StringVar(&s.RootCAFile, "root-ca-file", s.RootCAFile, "If set, this root certificate authority will be included in service account's token secret. This must be a valid PEM-encoded CA bundle.") } +func (s *CMServer) resyncPeriod() time.Duration { + factor := rand.Float64() + 1 + return time.Duration(float64(s.MinResyncPeriod.Nanoseconds()) * factor) +} + // Run runs the CMServer. This should never exit. 
func (s *CMServer) Run(_ []string) error { if s.Kubeconfig == "" && s.Master == "" { @@ -230,20 +239,14 @@ func (s *CMServer) Run(_ []string) error { glog.Fatal(server.ListenAndServe()) }() - endpoints := endpointcontroller.NewEndpointController(kubeClient) - go endpoints.Run(s.ConcurrentEndpointSyncs, util.NeverStop) - - controllerManager := replicationControllerPkg.NewReplicationManager(kubeClient, replicationControllerPkg.BurstReplicas) - go controllerManager.Run(s.ConcurrentRCSyncs, util.NeverStop) - - go daemon.NewDaemonSetsController(kubeClient). - Run(s.ConcurrentDSCSyncs, util.NeverStop) + go endpointcontroller.NewEndpointController(kubeClient, s.resyncPeriod). + Run(s.ConcurrentEndpointSyncs, util.NeverStop) - go job.NewJobController(kubeClient). - Run(s.ConcurrentJobSyncs, util.NeverStop) + go replicationControllerPkg.NewReplicationManager(kubeClient, s.resyncPeriod, replicationControllerPkg.BurstReplicas). + Run(s.ConcurrentRCSyncs, util.NeverStop) if s.TerminatedPodGCThreshold > 0 { - go gc.New(kubeClient, s.TerminatedPodGCThreshold). + go gc.New(kubeClient, s.resyncPeriod, s.TerminatedPodGCThreshold). Run(util.NeverStop) } @@ -273,8 +276,7 @@ func (s *CMServer) Run(_ []string) error { } } - resourceQuotaController := resourcequotacontroller.NewResourceQuotaController(kubeClient) - resourceQuotaController.Run(s.ResourceQuotaSyncPeriod) + resourcequotacontroller.NewResourceQuotaController(kubeClient).Run(s.ResourceQuotaSyncPeriod) versionStrings, err := client.ServerAPIVersions(kubeconfig) if err != nil { @@ -302,13 +304,13 @@ func (s *CMServer) Run(_ []string) error { if containsResource(resources, "daemonsets") { glog.Infof("Starting daemon set controller") - go daemon.NewDaemonSetsController(kubeClient). + go daemon.NewDaemonSetsController(kubeClient, s.resyncPeriod). Run(s.ConcurrentDSCSyncs, util.NeverStop) } if containsResource(resources, "jobs") { glog.Infof("Starting job controller") - go job.NewJobController(kubeClient). + go job.NewJobController(kubeClient, s.resyncPeriod). Run(s.ConcurrentJobSyncs, util.NeverStop) } diff --git a/contrib/mesos/pkg/controllermanager/controllermanager.go b/contrib/mesos/pkg/controllermanager/controllermanager.go index a19105d0fbbaa..c14cfec712f00 100644 --- a/contrib/mesos/pkg/controllermanager/controllermanager.go +++ b/contrib/mesos/pkg/controllermanager/controllermanager.go @@ -19,9 +19,11 @@ package controllermanager import ( "fmt" "io/ioutil" + "math/rand" "net" "net/http" "strconv" + "time" "k8s.io/kubernetes/cmd/kube-controller-manager/app" "k8s.io/kubernetes/pkg/api/unversioned" @@ -73,6 +75,11 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) { fs.BoolVar(&s.UseHostPortEndpoints, "host_port_endpoints", s.UseHostPortEndpoints, "Map service endpoints to hostIP:hostPort instead of podIP:containerPort. Default true.") } +func (s *CMServer) resyncPeriod() time.Duration { + factor := rand.Float64() + 1 + return time.Duration(float64(time.Hour) * 12.0 * factor) +} + func (s *CMServer) Run(_ []string) error { if s.Kubeconfig == "" && s.Master == "" { glog.Warningf("Neither --kubeconfig nor --master was specified. Using default API client. 
This might not work.") @@ -112,10 +119,10 @@ func (s *CMServer) Run(_ []string) error { endpoints := s.createEndpointController(kubeClient) go endpoints.Run(s.ConcurrentEndpointSyncs, util.NeverStop) - controllerManager := replicationcontroller.NewReplicationManager(kubeClient, replicationcontroller.BurstReplicas) - go controllerManager.Run(s.ConcurrentRCSyncs, util.NeverStop) + go replicationcontroller.NewReplicationManager(kubeClient, s.resyncPeriod, replicationcontroller.BurstReplicas). + Run(s.ConcurrentRCSyncs, util.NeverStop) - go daemon.NewDaemonSetsController(kubeClient). + go daemon.NewDaemonSetsController(kubeClient, s.resyncPeriod). Run(s.ConcurrentDSCSyncs, util.NeverStop) //TODO(jdef) should eventually support more cloud providers here @@ -203,6 +210,6 @@ func (s *CMServer) createEndpointController(client *client.Client) kmendpoint.En return kmendpoint.NewEndpointController(client) } glog.V(2).Infof("Creating podIP:containerPort endpoint controller") - stockEndpointController := kendpoint.NewEndpointController(client) + stockEndpointController := kendpoint.NewEndpointController(client, s.resyncPeriod) return stockEndpointController } diff --git a/contrib/mesos/pkg/service/endpoints_controller.go b/contrib/mesos/pkg/service/endpoints_controller.go index 575b5e68db65d..e5968c18ad8cd 100644 --- a/contrib/mesos/pkg/service/endpoints_controller.go +++ b/contrib/mesos/pkg/service/endpoints_controller.go @@ -85,7 +85,7 @@ func NewEndpointController(client *client.Client) *endpointController { }, }, &api.Pod{}, - kservice.PodRelistPeriod, + 5*time.Minute, framework.ResourceEventHandlerFuncs{ AddFunc: e.addPod, UpdateFunc: e.updatePod, diff --git a/hack/jenkins/e2e.sh b/hack/jenkins/e2e.sh index 7ceb422d218c5..fceef250fda78 100755 --- a/hack/jenkins/e2e.sh +++ b/hack/jenkins/e2e.sh @@ -348,6 +348,8 @@ case ${JOB_NAME} in NUM_MINIONS="100" # Reduce logs verbosity TEST_CLUSTER_LOG_LEVEL="--v=1" + # Increase resync period to simulate production + TEST_CLUSTER_RESYNC_PERIOD="--min-resync-period=12h" ;; # Runs tests on GCE soak cluster. diff --git a/hack/verify-flags/known-flags.txt b/hack/verify-flags/known-flags.txt index 19fb39f33d988..f26626fc85e30 100644 --- a/hack/verify-flags/known-flags.txt +++ b/hack/verify-flags/known-flags.txt @@ -181,6 +181,7 @@ minion-max-log-size minion-path-override min-pr-number min-request-timeout +min-resync-period namespace-sync-period network-plugin network-plugin-dir diff --git a/pkg/controller/controller_utils.go b/pkg/controller/controller_utils.go index aa8a3cb22f1f9..2601b5b9c8bfa 100644 --- a/pkg/controller/controller_utils.go +++ b/pkg/controller/controller_utils.go @@ -54,6 +54,13 @@ var ( KeyFunc = framework.DeletionHandlingMetaNamespaceKeyFunc ) +type ResyncPeriodFunc func() time.Duration + +// Returns 0 for resyncPeriod in case resyncing is not needed. +func NoResyncPeriodFunc() time.Duration { + return 0 +} + // Expectations are a way for controllers to tell the controller manager what they expect. eg: // ControllerExpectations: { // controller1: expects 2 adds in 2 minutes diff --git a/pkg/controller/daemon/controller.go b/pkg/controller/daemon/controller.go index 70e3ff68fa18a..1d371ee046b5f 100644 --- a/pkg/controller/daemon/controller.go +++ b/pkg/controller/daemon/controller.go @@ -41,12 +41,6 @@ const ( // Daemon sets will periodically check that their daemon pods are running as expected. FullDaemonSetResyncPeriod = 30 * time.Second // TODO: Figure out if this time seems reasonable. - // Nodes don't need relisting. 
- FullNodeResyncPeriod = 0 - - // Daemon pods don't need relisting. - FullDaemonPodResyncPeriod = 0 - // We must avoid counting pods until the pod store has synced. If it hasn't synced, to // avoid a hot loop, we'll wait this long between checks. PodStoreSyncedPollPeriod = 100 * time.Millisecond @@ -85,7 +79,7 @@ type DaemonSetsController struct { queue *workqueue.Type } -func NewDaemonSetsController(kubeClient client.Interface) *DaemonSetsController { +func NewDaemonSetsController(kubeClient client.Interface, resyncPeriod controller.ResyncPeriodFunc) *DaemonSetsController { eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(glog.Infof) eventBroadcaster.StartRecordingToSink(kubeClient.Events("")) @@ -110,6 +104,7 @@ func NewDaemonSetsController(kubeClient client.Interface) *DaemonSetsController }, }, &extensions.DaemonSet{}, + // TODO: Can we have much longer period here? FullDaemonSetResyncPeriod, framework.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { @@ -141,7 +136,7 @@ func NewDaemonSetsController(kubeClient client.Interface) *DaemonSetsController }, }, &api.Pod{}, - FullDaemonPodResyncPeriod, + resyncPeriod(), framework.ResourceEventHandlerFuncs{ AddFunc: dsc.addPod, UpdateFunc: dsc.updatePod, @@ -159,7 +154,7 @@ func NewDaemonSetsController(kubeClient client.Interface) *DaemonSetsController }, }, &api.Node{}, - FullNodeResyncPeriod, + resyncPeriod(), framework.ResourceEventHandlerFuncs{ AddFunc: dsc.addNode, UpdateFunc: dsc.updateNode, diff --git a/pkg/controller/daemon/controller_test.go b/pkg/controller/daemon/controller_test.go index fb70085d66613..56adeb531b77c 100644 --- a/pkg/controller/daemon/controller_test.go +++ b/pkg/controller/daemon/controller_test.go @@ -130,7 +130,7 @@ func addPods(podStore cache.Store, nodeName string, label map[string]string, num func newTestController() (*DaemonSetsController, *controller.FakePodControl) { client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Default.GroupAndVersion()}) - manager := NewDaemonSetsController(client) + manager := NewDaemonSetsController(client, controller.NoResyncPeriodFunc) manager.podStoreSynced = alwaysReady podControl := &controller.FakePodControl{} manager.podControl = podControl diff --git a/pkg/controller/endpoint/endpoints_controller.go b/pkg/controller/endpoint/endpoints_controller.go index f2332e885ee5e..60ceb6a8fe771 100644 --- a/pkg/controller/endpoint/endpoints_controller.go +++ b/pkg/controller/endpoint/endpoints_controller.go @@ -28,6 +28,7 @@ import ( "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/client/cache" client "k8s.io/kubernetes/pkg/client/unversioned" + "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/framework" "k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/labels" @@ -45,11 +46,6 @@ const ( // often. Higher numbers = lower CPU/network load; lower numbers = // shorter amount of time before a mistaken endpoint is corrected. FullServiceResyncPeriod = 30 * time.Second - - // We'll keep pod watches open up to this long. In the unlikely case - // that a watch misdelivers info about a pod, it'll take this long for - // that mistake to be rectified. - PodRelistPeriod = 5 * time.Minute ) var ( @@ -57,7 +53,7 @@ var ( ) // NewEndpointController returns a new *EndpointController. 
-func NewEndpointController(client *client.Client) *EndpointController { +func NewEndpointController(client *client.Client, resyncPeriod controller.ResyncPeriodFunc) *EndpointController { e := &EndpointController{ client: client, queue: workqueue.New(), @@ -73,6 +69,7 @@ func NewEndpointController(client *client.Client) *EndpointController { }, }, &api.Service{}, + // TODO: Can we have much longer period here? FullServiceResyncPeriod, framework.ResourceEventHandlerFuncs{ AddFunc: e.enqueueService, @@ -93,7 +90,7 @@ func NewEndpointController(client *client.Client) *EndpointController { }, }, &api.Pod{}, - PodRelistPeriod, + resyncPeriod(), framework.ResourceEventHandlerFuncs{ AddFunc: e.addPod, UpdateFunc: e.updatePod, diff --git a/pkg/controller/endpoint/endpoints_controller_test.go b/pkg/controller/endpoint/endpoints_controller_test.go index 8ec61c4d048c3..2d81629f51b81 100644 --- a/pkg/controller/endpoint/endpoints_controller_test.go +++ b/pkg/controller/endpoint/endpoints_controller_test.go @@ -29,6 +29,7 @@ import ( "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/client/cache" client "k8s.io/kubernetes/pkg/client/unversioned" + "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/util" ) @@ -188,7 +189,7 @@ func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) { }}) defer testServer.Close() client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Default.Version()}) - endpoints := NewEndpointController(client) + endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc) endpoints.serviceStore.Store.Add(&api.Service{ ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns}, Spec: api.ServiceSpec{Ports: []api.ServicePort{{Port: 80}}}, @@ -220,7 +221,7 @@ func TestCheckLeftoverEndpoints(t *testing.T) { }}) defer testServer.Close() client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Default.Version()}) - endpoints := NewEndpointController(client) + endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc) endpoints.checkLeftoverEndpoints() if e, a := 1, endpoints.queue.Len(); e != a { @@ -248,7 +249,7 @@ func TestSyncEndpointsProtocolTCP(t *testing.T) { }}) defer testServer.Close() client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Default.Version()}) - endpoints := NewEndpointController(client) + endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc) endpoints.serviceStore.Store.Add(&api.Service{ ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns}, Spec: api.ServiceSpec{ @@ -276,7 +277,7 @@ func TestSyncEndpointsProtocolUDP(t *testing.T) { }}) defer testServer.Close() client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Default.Version()}) - endpoints := NewEndpointController(client) + endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc) endpoints.serviceStore.Store.Add(&api.Service{ ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns}, Spec: api.ServiceSpec{ @@ -301,7 +302,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) { }}) defer testServer.Close() client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Default.Version()}) - endpoints := NewEndpointController(client) + endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc) addPods(endpoints.podStore.Store, ns, 1, 1, 0) endpoints.serviceStore.Store.Add(&api.Service{ ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns}, @@ -338,7 +339,7 
@@ func TestSyncEndpointsItemsEmptySelectorSelectsAllNotReady(t *testing.T) { }}) defer testServer.Close() client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Default.Version()}) - endpoints := NewEndpointController(client) + endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc) addPods(endpoints.podStore.Store, ns, 0, 1, 1) endpoints.serviceStore.Store.Add(&api.Service{ ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns}, @@ -375,7 +376,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAllMixed(t *testing.T) { }}) defer testServer.Close() client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Default.Version()}) - endpoints := NewEndpointController(client) + endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc) addPods(endpoints.podStore.Store, ns, 1, 1, 1) endpoints.serviceStore.Store.Add(&api.Service{ ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns}, @@ -416,7 +417,7 @@ func TestSyncEndpointsItemsPreexisting(t *testing.T) { }}) defer testServer.Close() client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Default.Version()}) - endpoints := NewEndpointController(client) + endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc) addPods(endpoints.podStore.Store, ns, 1, 1, 0) endpoints.serviceStore.Store.Add(&api.Service{ ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns}, @@ -456,7 +457,7 @@ func TestSyncEndpointsItemsPreexistingIdentical(t *testing.T) { }}) defer testServer.Close() client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Default.Version()}) - endpoints := NewEndpointController(client) + endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc) addPods(endpoints.podStore.Store, api.NamespaceDefault, 1, 1, 0) endpoints.serviceStore.Store.Add(&api.Service{ ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault}, @@ -475,7 +476,7 @@ func TestSyncEndpointsItems(t *testing.T) { serverResponse{http.StatusOK, &api.Endpoints{}}) defer testServer.Close() client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Default.Version()}) - endpoints := NewEndpointController(client) + endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc) addPods(endpoints.podStore.Store, ns, 3, 2, 0) addPods(endpoints.podStore.Store, "blah", 5, 2, 0) // make sure these aren't found! 
endpoints.serviceStore.Store.Add(&api.Service{ @@ -517,7 +518,7 @@ func TestSyncEndpointsItemsWithLabels(t *testing.T) { serverResponse{http.StatusOK, &api.Endpoints{}}) defer testServer.Close() client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Default.Version()}) - endpoints := NewEndpointController(client) + endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc) addPods(endpoints.podStore.Store, ns, 3, 2, 0) serviceLabels := map[string]string{"foo": "bar"} endpoints.serviceStore.Store.Add(&api.Service{ @@ -577,7 +578,7 @@ func TestSyncEndpointsItemsPreexistingLabelsChange(t *testing.T) { }}) defer testServer.Close() client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Default.Version()}) - endpoints := NewEndpointController(client) + endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc) addPods(endpoints.podStore.Store, ns, 1, 1, 0) serviceLabels := map[string]string{"baz": "blah"} endpoints.serviceStore.Store.Add(&api.Service{ diff --git a/pkg/controller/gc/gc_controller.go b/pkg/controller/gc/gc_controller.go index bec6958d23644..0ca5c4c4420b4 100644 --- a/pkg/controller/gc/gc_controller.go +++ b/pkg/controller/gc/gc_controller.go @@ -37,8 +37,7 @@ import ( ) const ( - fullResyncPeriod = 0 - gcCheckPeriod = 20 * time.Second + gcCheckPeriod = 20 * time.Second ) type GCController struct { @@ -49,7 +48,7 @@ type GCController struct { threshold int } -func New(kubeClient client.Interface, threshold int) *GCController { +func New(kubeClient client.Interface, resyncPeriod controller.ResyncPeriodFunc, threshold int) *GCController { eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(glog.Infof) eventBroadcaster.StartRecordingToSink(kubeClient.Events("")) @@ -75,7 +74,7 @@ func New(kubeClient client.Interface, threshold int) *GCController { }, }, &api.Pod{}, - fullResyncPeriod, + resyncPeriod(), framework.ResourceEventHandlerFuncs{}, ) return gcc diff --git a/pkg/controller/gc/gc_controller_test.go b/pkg/controller/gc/gc_controller_test.go index e8ee1ffd42465..631ab16aa2321 100644 --- a/pkg/controller/gc/gc_controller_test.go +++ b/pkg/controller/gc/gc_controller_test.go @@ -24,6 +24,7 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/client/unversioned/testclient" + "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/util/sets" ) @@ -98,7 +99,7 @@ func TestGC(t *testing.T) { for i, test := range testCases { client := testclient.NewSimpleFake() - gcc := New(client, test.threshold) + gcc := New(client, controller.NoResyncPeriodFunc, test.threshold) fake := &FakePodControl{} gcc.podControl = fake diff --git a/pkg/controller/job/controller.go b/pkg/controller/job/controller.go index b3a304181627a..2debc2d3e6583 100644 --- a/pkg/controller/job/controller.go +++ b/pkg/controller/job/controller.go @@ -68,7 +68,7 @@ type JobController struct { queue *workqueue.Type } -func NewJobController(kubeClient client.Interface) *JobController { +func NewJobController(kubeClient client.Interface, resyncPeriod controller.ResyncPeriodFunc) *JobController { eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(glog.Infof) eventBroadcaster.StartRecordingToSink(kubeClient.Events("")) @@ -93,6 +93,7 @@ func NewJobController(kubeClient client.Interface) *JobController { }, }, &extensions.Job{}, + // TODO: Can we have much longer period here? 
replicationcontroller.FullControllerResyncPeriod, framework.ResourceEventHandlerFuncs{ AddFunc: jm.enqueueController, @@ -119,7 +120,7 @@ func NewJobController(kubeClient client.Interface) *JobController { }, }, &api.Pod{}, - replicationcontroller.PodRelistPeriod, + resyncPeriod(), framework.ResourceEventHandlerFuncs{ AddFunc: jm.addPod, UpdateFunc: jm.updatePod, diff --git a/pkg/controller/job/controller_test.go b/pkg/controller/job/controller_test.go index edc3c16e78f55..363e75fa39c56 100644 --- a/pkg/controller/job/controller_test.go +++ b/pkg/controller/job/controller_test.go @@ -169,7 +169,7 @@ func TestControllerSyncJob(t *testing.T) { for name, tc := range testCases { // job manager setup client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Default.GroupAndVersion()}) - manager := NewJobController(client) + manager := NewJobController(client, controller.NoResyncPeriodFunc) fakePodControl := controller.FakePodControl{Err: tc.podControllerError} manager.podControl = &fakePodControl manager.podStoreSynced = alwaysReady @@ -233,7 +233,7 @@ func TestControllerSyncJob(t *testing.T) { func TestSyncJobDeleted(t *testing.T) { client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Default.GroupAndVersion()}) - manager := NewJobController(client) + manager := NewJobController(client, controller.NoResyncPeriodFunc) fakePodControl := controller.FakePodControl{} manager.podControl = &fakePodControl manager.podStoreSynced = alwaysReady @@ -253,7 +253,7 @@ func TestSyncJobDeleted(t *testing.T) { func TestSyncJobUpdateRequeue(t *testing.T) { client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Default.GroupAndVersion()}) - manager := NewJobController(client) + manager := NewJobController(client, controller.NoResyncPeriodFunc) fakePodControl := controller.FakePodControl{} manager.podControl = &fakePodControl manager.podStoreSynced = alwaysReady @@ -283,7 +283,7 @@ func TestSyncJobUpdateRequeue(t *testing.T) { func TestJobPodLookup(t *testing.T) { client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Default.GroupAndVersion()}) - manager := NewJobController(client) + manager := NewJobController(client, controller.NoResyncPeriodFunc) manager.podStoreSynced = alwaysReady testCases := []struct { job *extensions.Job @@ -373,7 +373,7 @@ func (fe FakeJobExpectations) SatisfiedExpectations(controllerKey string) bool { // and checking expectations. 
func TestSyncJobExpectations(t *testing.T) { client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Default.GroupAndVersion()}) - manager := NewJobController(client) + manager := NewJobController(client, controller.NoResyncPeriodFunc) fakePodControl := controller.FakePodControl{} manager.podControl = &fakePodControl manager.podStoreSynced = alwaysReady @@ -410,7 +410,7 @@ func TestWatchJobs(t *testing.T) { fakeWatch := watch.NewFake() client := &testclient.Fake{} client.AddWatchReactor("*", testclient.DefaultWatchReactor(fakeWatch, nil)) - manager := NewJobController(client) + manager := NewJobController(client, controller.NoResyncPeriodFunc) manager.podStoreSynced = alwaysReady var testJob extensions.Job @@ -473,7 +473,8 @@ func TestWatchPods(t *testing.T) { fakeWatch := watch.NewFake() client := &testclient.Fake{} client.AddWatchReactor("*", testclient.DefaultWatchReactor(fakeWatch, nil)) - manager := NewJobController(client) + manager := NewJobController(client, controller.NoResyncPeriodFunc) + manager.podStoreSynced = alwaysReady // Put one job and one pod into the store diff --git a/pkg/controller/namespace/namespace_controller.go b/pkg/controller/namespace/namespace_controller.go index 602c39b0860ab..b6e82837e30de 100644 --- a/pkg/controller/namespace/namespace_controller.go +++ b/pkg/controller/namespace/namespace_controller.go @@ -55,6 +55,7 @@ func NewNamespaceController(kubeClient client.Interface, versions *unversioned.A }, }, &api.Namespace{}, + // TODO: Can we have much longer period here? resyncPeriod, framework.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { diff --git a/pkg/controller/node/nodecontroller.go b/pkg/controller/node/nodecontroller.go index 9bd1cee188830..3e2be50906b30 100644 --- a/pkg/controller/node/nodecontroller.go +++ b/pkg/controller/node/nodecontroller.go @@ -20,20 +20,26 @@ import ( "errors" "fmt" "net" + "strings" "sync" "time" "github.com/golang/glog" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/client/cache" "k8s.io/kubernetes/pkg/client/record" client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/cloudprovider" + "k8s.io/kubernetes/pkg/controller" + "k8s.io/kubernetes/pkg/controller/framework" "k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/labels" + "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/sets" + "k8s.io/kubernetes/pkg/watch" ) var ( @@ -98,6 +104,14 @@ type NodeController struct { // The maximum duration before a pod evicted from a node can be forcefully terminated. maximumGracePeriod time.Duration recorder record.EventRecorder + // Pod framework and store + podController *framework.Controller + podStore cache.StoreToPodLister + // Node framework and store + nodeController *framework.Controller + nodeStore cache.StoreToNodeLister + + forcefullyDeletePod func(*api.Pod) } // NewNodeController returns a new node controller to sync instances from cloudprovider. 
@@ -124,7 +138,8 @@ func NewNodeController( glog.Fatal("NodeController: Must specify clusterCIDR if allocateNodeCIDRs == true.") } evictorLock := sync.Mutex{} - return &NodeController{ + + nc := &NodeController{ cloud: cloud, knownNodeSet: make(sets.String), kubeClient: kubeClient, @@ -142,11 +157,45 @@ func NewNodeController( now: unversioned.Now, clusterCIDR: clusterCIDR, allocateNodeCIDRs: allocateNodeCIDRs, + forcefullyDeletePod: func(p *api.Pod) { forcefullyDeletePod(kubeClient, p) }, } + + nc.podStore.Store, nc.podController = framework.NewInformer( + &cache.ListWatch{ + ListFunc: func() (runtime.Object, error) { + return nc.kubeClient.Pods(api.NamespaceAll).List(labels.Everything(), fields.Everything()) + }, + WatchFunc: func(rv string) (watch.Interface, error) { + return nc.kubeClient.Pods(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), rv) + }, + }, + &api.Pod{}, + controller.NoResyncPeriodFunc(), + framework.ResourceEventHandlerFuncs{ + AddFunc: nc.maybeDeleteTerminatingPod, + UpdateFunc: func(_, obj interface{}) { nc.maybeDeleteTerminatingPod(obj) }, + }, + ) + nc.nodeStore.Store, nc.nodeController = framework.NewInformer( + &cache.ListWatch{ + ListFunc: func() (runtime.Object, error) { + return nc.kubeClient.Nodes().List(labels.Everything(), fields.Everything()) + }, + WatchFunc: func(rv string) (watch.Interface, error) { + return nc.kubeClient.Nodes().Watch(labels.Everything(), fields.Everything(), rv) + }, + }, + &api.Node{}, + controller.NoResyncPeriodFunc(), + framework.ResourceEventHandlerFuncs{}, + ) + return nc } // Run starts an asynchronous loop that monitors the status of cluster nodes. func (nc *NodeController) Run(period time.Duration) { + go nc.nodeController.Run(util.NeverStop) + go nc.podController.Run(util.NeverStop) // Incorporate the results of node status pushed from kubelet to master. go util.Until(func() { if err := nc.monitorNodeStatus(); err != nil { @@ -175,6 +224,7 @@ func (nc *NodeController) Run(period time.Duration) { util.HandleError(fmt.Errorf("unable to evict node %q: %v", value.Value, err)) return false, 0 } + if remaining { nc.terminationEvictor.Add(value.Value) } @@ -237,6 +287,61 @@ func (nc *NodeController) getCondition(status *api.NodeStatus, conditionType api return nil } +// maybeDeleteTerminatingPod non-gracefully deletes pods that are terminating +// that should not be gracefully terminated. +func (nc *NodeController) maybeDeleteTerminatingPod(obj interface{}) { + pod, ok := obj.(*api.Pod) + if !ok { + return + } + + // consider only terminating pods + if pod.DeletionTimestamp == nil { + return + } + + // delete terminating pods that have not yet been scheduled + if len(pod.Spec.NodeName) == 0 { + nc.forcefullyDeletePod(pod) + return + } + + nodeObj, found, err := nc.nodeStore.GetByKey(pod.Spec.NodeName) + if err != nil { + // this can only happen if the Store.KeyFunc has a problem creating + // a key for the pod. If it happens once, it will happen again so + // don't bother requeuing the pod. 
+ util.HandleError(err) + return + } + + // delete terminating pods that have been scheduled on + // nonexistant nodes + if !found { + nc.forcefullyDeletePod(pod) + return + } + + // delete terminating pods that have been scheduled on + // nodes that do not support graceful termination + // TODO(mikedanese): this can be removed when we no longer + // guarantee backwards compatibility of master API to kubelets with + // versions less than 1.1.0 + node := nodeObj.(*api.Node) + if strings.HasPrefix(node.Status.NodeInfo.KubeletVersion, "v1.0") { + nc.forcefullyDeletePod(pod) + return + } +} + +func forcefullyDeletePod(c client.Interface, pod *api.Pod) { + var zero int64 + err := c.Pods(pod.Namespace).Delete(pod.Name, &api.DeleteOptions{GracePeriodSeconds: &zero}) + if err != nil { + util.HandleError(err) + } +} + // monitorNodeStatus verifies node status are constantly updated by kubelet, and if not, // post "NodeReady==ConditionUnknown". It also evicts all pods if node is not ready or // not reachable for a long period of time. diff --git a/pkg/controller/node/nodecontroller_test.go b/pkg/controller/node/nodecontroller_test.go index f87321410d8c1..d8433d15b5cfd 100644 --- a/pkg/controller/node/nodecontroller_test.go +++ b/pkg/controller/node/nodecontroller_test.go @@ -27,6 +27,7 @@ import ( apierrors "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/resource" "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/client/cache" client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/client/unversioned/testclient" "k8s.io/kubernetes/pkg/fields" @@ -646,6 +647,111 @@ func TestNodeDeletion(t *testing.T) { } } +func TestCheckPod(t *testing.T) { + + tcs := []struct { + pod api.Pod + prune bool + }{ + + { + pod: api.Pod{ + ObjectMeta: api.ObjectMeta{DeletionTimestamp: nil}, + Spec: api.PodSpec{NodeName: "new"}, + }, + prune: false, + }, + { + pod: api.Pod{ + ObjectMeta: api.ObjectMeta{DeletionTimestamp: nil}, + Spec: api.PodSpec{NodeName: "old"}, + }, + prune: false, + }, + { + pod: api.Pod{ + ObjectMeta: api.ObjectMeta{DeletionTimestamp: nil}, + Spec: api.PodSpec{NodeName: ""}, + }, + prune: false, + }, + { + pod: api.Pod{ + ObjectMeta: api.ObjectMeta{DeletionTimestamp: nil}, + Spec: api.PodSpec{NodeName: "nonexistant"}, + }, + prune: false, + }, + { + pod: api.Pod{ + ObjectMeta: api.ObjectMeta{DeletionTimestamp: &unversioned.Time{}}, + Spec: api.PodSpec{NodeName: "new"}, + }, + prune: false, + }, + { + pod: api.Pod{ + ObjectMeta: api.ObjectMeta{DeletionTimestamp: &unversioned.Time{}}, + Spec: api.PodSpec{NodeName: "old"}, + }, + prune: true, + }, + { + pod: api.Pod{ + ObjectMeta: api.ObjectMeta{DeletionTimestamp: &unversioned.Time{}}, + Spec: api.PodSpec{NodeName: ""}, + }, + prune: true, + }, + { + pod: api.Pod{ + ObjectMeta: api.ObjectMeta{DeletionTimestamp: &unversioned.Time{}}, + Spec: api.PodSpec{NodeName: "nonexistant"}, + }, + prune: true, + }, + } + + nc := NewNodeController(nil, nil, 0, nil, 0, 0, 0, nil, false) + nc.nodeStore.Store = cache.NewStore(cache.MetaNamespaceKeyFunc) + nc.nodeStore.Store.Add(&api.Node{ + ObjectMeta: api.ObjectMeta{ + Name: "new", + }, + Status: api.NodeStatus{ + NodeInfo: api.NodeSystemInfo{ + KubeletVersion: "v1.1.0", + }, + }, + }) + nc.nodeStore.Store.Add(&api.Node{ + ObjectMeta: api.ObjectMeta{ + Name: "old", + }, + Status: api.NodeStatus{ + NodeInfo: api.NodeSystemInfo{ + KubeletVersion: "v1.0.0", + }, + }, + }) + + for i, tc := range tcs { + var deleteCalls int + nc.forcefullyDeletePod = func(_ *api.Pod) { + 
deleteCalls++ + } + + nc.maybeDeleteTerminatingPod(&tc.pod) + + if tc.prune && deleteCalls != 1 { + t.Errorf("[%v] expected number of delete calls to be 1 but got %v", i, deleteCalls) + } + if !tc.prune && deleteCalls != 0 { + t.Errorf("[%v] expected number of delete calls to be 0 but got %v", i, deleteCalls) + } + } +} + func newNode(name string) *api.Node { return &api.Node{ ObjectMeta: api.ObjectMeta{Name: name}, diff --git a/pkg/controller/persistentvolume/persistentvolume_claim_binder_controller.go b/pkg/controller/persistentvolume/persistentvolume_claim_binder_controller.go index d106baa64d377..230bff06c9746 100644 --- a/pkg/controller/persistentvolume/persistentvolume_claim_binder_controller.go +++ b/pkg/controller/persistentvolume/persistentvolume_claim_binder_controller.go @@ -63,6 +63,7 @@ func NewPersistentVolumeClaimBinder(kubeClient client.Interface, syncPeriod time }, }, &api.PersistentVolume{}, + // TODO: Can we have much longer period here? syncPeriod, framework.ResourceEventHandlerFuncs{ AddFunc: binder.addVolume, @@ -80,6 +81,7 @@ func NewPersistentVolumeClaimBinder(kubeClient client.Interface, syncPeriod time }, }, &api.PersistentVolumeClaim{}, + // TODO: Can we have much longer period here? syncPeriod, framework.ResourceEventHandlerFuncs{ AddFunc: binder.addClaim, diff --git a/pkg/controller/persistentvolume/persistentvolume_recycler_controller.go b/pkg/controller/persistentvolume/persistentvolume_recycler_controller.go index af1f6243d9814..65de821adee74 100644 --- a/pkg/controller/persistentvolume/persistentvolume_recycler_controller.go +++ b/pkg/controller/persistentvolume/persistentvolume_recycler_controller.go @@ -71,6 +71,7 @@ func NewPersistentVolumeRecycler(kubeClient client.Interface, syncPeriod time.Du }, }, &api.PersistentVolume{}, + // TODO: Can we have much longer period here? syncPeriod, framework.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { diff --git a/pkg/controller/replication/replication_controller.go b/pkg/controller/replication/replication_controller.go index c912588b2c8f8..6cf0ef815323b 100644 --- a/pkg/controller/replication/replication_controller.go +++ b/pkg/controller/replication/replication_controller.go @@ -43,12 +43,6 @@ const ( // happens based on contents in local pod storage. FullControllerResyncPeriod = 30 * time.Second - // If a watch misdelivers info about a pod, it'll take at least this long - // to rectify the number of replicas. Note that dropped deletes are only - // rectified after the expectation times out because we don't know the - // final resting state of the pod. - PodRelistPeriod = 5 * time.Minute - // Realistic value of the burstReplica field for the replication manager based off // performance requirements for kubernetes 1.0. BurstReplicas = 500 @@ -95,7 +89,7 @@ type ReplicationManager struct { } // NewReplicationManager creates a new ReplicationManager. -func NewReplicationManager(kubeClient client.Interface, burstReplicas int) *ReplicationManager { +func NewReplicationManager(kubeClient client.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int) *ReplicationManager { eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(glog.Infof) eventBroadcaster.StartRecordingToSink(kubeClient.Events("")) @@ -121,6 +115,7 @@ func NewReplicationManager(kubeClient client.Interface, burstReplicas int) *Repl }, }, &api.ReplicationController{}, + // TODO: Can we have much longer period here? 
FullControllerResyncPeriod, framework.ResourceEventHandlerFuncs{ AddFunc: rm.enqueueController, @@ -161,7 +156,7 @@ func NewReplicationManager(kubeClient client.Interface, burstReplicas int) *Repl }, }, &api.Pod{}, - PodRelistPeriod, + resyncPeriod(), framework.ResourceEventHandlerFuncs{ AddFunc: rm.addPod, // This invokes the rc for every pod change, eg: host assignment. Though this might seem like overkill diff --git a/pkg/controller/replication/replication_controller_test.go b/pkg/controller/replication/replication_controller_test.go index 7207c97a2a971..48b8665c1c1ae 100644 --- a/pkg/controller/replication/replication_controller_test.go +++ b/pkg/controller/replication/replication_controller_test.go @@ -135,7 +135,7 @@ type serverResponse struct { func TestSyncReplicationControllerDoesNothing(t *testing.T) { client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Default.Version()}) fakePodControl := controller.FakePodControl{} - manager := NewReplicationManager(client, BurstReplicas) + manager := NewReplicationManager(client, controller.NoResyncPeriodFunc, BurstReplicas) manager.podStoreSynced = alwaysReady // 2 running pods, a controller with 2 replicas, sync is a no-op @@ -151,7 +151,7 @@ func TestSyncReplicationControllerDoesNothing(t *testing.T) { func TestSyncReplicationControllerDeletes(t *testing.T) { client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Default.Version()}) fakePodControl := controller.FakePodControl{} - manager := NewReplicationManager(client, BurstReplicas) + manager := NewReplicationManager(client, controller.NoResyncPeriodFunc, BurstReplicas) manager.podStoreSynced = alwaysReady manager.podControl = &fakePodControl @@ -167,7 +167,7 @@ func TestSyncReplicationControllerDeletes(t *testing.T) { func TestDeleteFinalStateUnknown(t *testing.T) { client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Default.Version()}) fakePodControl := controller.FakePodControl{} - manager := NewReplicationManager(client, BurstReplicas) + manager := NewReplicationManager(client, controller.NoResyncPeriodFunc, BurstReplicas) manager.podStoreSynced = alwaysReady manager.podControl = &fakePodControl @@ -199,7 +199,7 @@ func TestDeleteFinalStateUnknown(t *testing.T) { func TestSyncReplicationControllerCreates(t *testing.T) { client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Default.Version()}) - manager := NewReplicationManager(client, BurstReplicas) + manager := NewReplicationManager(client, controller.NoResyncPeriodFunc, BurstReplicas) manager.podStoreSynced = alwaysReady // A controller with 2 replicas and no pods in the store, 2 creates expected @@ -221,7 +221,7 @@ func TestStatusUpdatesWithoutReplicasChange(t *testing.T) { testServer := httptest.NewServer(&fakeHandler) defer testServer.Close() client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Default.Version()}) - manager := NewReplicationManager(client, BurstReplicas) + manager := NewReplicationManager(client, controller.NoResyncPeriodFunc, BurstReplicas) manager.podStoreSynced = alwaysReady // Steady state for the replication controller, no Status.Replicas updates expected @@ -263,7 +263,7 @@ func TestControllerUpdateReplicas(t *testing.T) { defer testServer.Close() client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Default.Version()}) - manager := NewReplicationManager(client, BurstReplicas) + manager := NewReplicationManager(client, controller.NoResyncPeriodFunc, BurstReplicas) manager.podStoreSynced = 
alwaysReady // Insufficient number of pods in the system, and Status.Replicas is wrong; @@ -303,7 +303,7 @@ func TestSyncReplicationControllerDormancy(t *testing.T) { client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Default.Version()}) fakePodControl := controller.FakePodControl{} - manager := NewReplicationManager(client, BurstReplicas) + manager := NewReplicationManager(client, controller.NoResyncPeriodFunc, BurstReplicas) manager.podStoreSynced = alwaysReady manager.podControl = &fakePodControl @@ -349,7 +349,7 @@ func TestSyncReplicationControllerDormancy(t *testing.T) { } func TestPodControllerLookup(t *testing.T) { - manager := NewReplicationManager(client.NewOrDie(&client.Config{Host: "", Version: testapi.Default.Version()}), BurstReplicas) + manager := NewReplicationManager(client.NewOrDie(&client.Config{Host: "", Version: testapi.Default.Version()}), controller.NoResyncPeriodFunc, BurstReplicas) manager.podStoreSynced = alwaysReady testCases := []struct { inRCs []*api.ReplicationController @@ -417,7 +417,7 @@ func TestWatchControllers(t *testing.T) { fakeWatch := watch.NewFake() client := &testclient.Fake{} client.AddWatchReactor("*", testclient.DefaultWatchReactor(fakeWatch, nil)) - manager := NewReplicationManager(client, BurstReplicas) + manager := NewReplicationManager(client, controller.NoResyncPeriodFunc, BurstReplicas) manager.podStoreSynced = alwaysReady var testControllerSpec api.ReplicationController @@ -460,7 +460,7 @@ func TestWatchPods(t *testing.T) { fakeWatch := watch.NewFake() client := &testclient.Fake{} client.AddWatchReactor("*", testclient.DefaultWatchReactor(fakeWatch, nil)) - manager := NewReplicationManager(client, BurstReplicas) + manager := NewReplicationManager(client, controller.NoResyncPeriodFunc, BurstReplicas) manager.podStoreSynced = alwaysReady // Put one rc and one pod into the controller's stores @@ -505,7 +505,7 @@ func TestUpdatePods(t *testing.T) { fakeWatch := watch.NewFake() client := &testclient.Fake{} client.AddWatchReactor("*", testclient.DefaultWatchReactor(fakeWatch, nil)) - manager := NewReplicationManager(client, BurstReplicas) + manager := NewReplicationManager(client, controller.NoResyncPeriodFunc, BurstReplicas) manager.podStoreSynced = alwaysReady received := make(chan string) @@ -564,7 +564,7 @@ func TestControllerUpdateRequeue(t *testing.T) { defer testServer.Close() client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Default.Version()}) - manager := NewReplicationManager(client, BurstReplicas) + manager := NewReplicationManager(client, controller.NoResyncPeriodFunc, BurstReplicas) manager.podStoreSynced = alwaysReady rc := newReplicationController(1) @@ -645,7 +645,7 @@ func TestControllerUpdateStatusWithFailure(t *testing.T) { func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) { client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Default.Version()}) fakePodControl := controller.FakePodControl{} - manager := NewReplicationManager(client, burstReplicas) + manager := NewReplicationManager(client, controller.NoResyncPeriodFunc, burstReplicas) manager.podStoreSynced = alwaysReady manager.podControl = &fakePodControl @@ -765,7 +765,7 @@ func (fe FakeRCExpectations) SatisfiedExpectations(controllerKey string) bool { func TestRCSyncExpectations(t *testing.T) { client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Default.Version()}) fakePodControl := controller.FakePodControl{} - manager := 
NewReplicationManager(client, 2) + manager := NewReplicationManager(client, controller.NoResyncPeriodFunc, 2) manager.podStoreSynced = alwaysReady manager.podControl = &fakePodControl @@ -789,7 +789,7 @@ func TestRCSyncExpectations(t *testing.T) { func TestDeleteControllerAndExpectations(t *testing.T) { client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Default.Version()}) - manager := NewReplicationManager(client, 10) + manager := NewReplicationManager(client, controller.NoResyncPeriodFunc, 10) manager.podStoreSynced = alwaysReady rc := newReplicationController(1) @@ -832,7 +832,7 @@ func TestDeleteControllerAndExpectations(t *testing.T) { func TestRCManagerNotReady(t *testing.T) { client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Default.Version()}) fakePodControl := controller.FakePodControl{} - manager := NewReplicationManager(client, 2) + manager := NewReplicationManager(client, controller.NoResyncPeriodFunc, 2) manager.podControl = &fakePodControl manager.podStoreSynced = func() bool { return false } @@ -870,7 +870,7 @@ func TestOverlappingRCs(t *testing.T) { client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Default.Version()}) for i := 0; i < 5; i++ { - manager := NewReplicationManager(client, 10) + manager := NewReplicationManager(client, controller.NoResyncPeriodFunc, 10) manager.podStoreSynced = alwaysReady // Create 10 rcs, shuffled them randomly and insert them into the rc manager's store diff --git a/pkg/registry/pod/strategy.go b/pkg/registry/pod/strategy.go index cc721099a527c..a7b801096c8ae 100644 --- a/pkg/registry/pod/strategy.go +++ b/pkg/registry/pod/strategy.go @@ -109,6 +109,10 @@ func (podStrategy) CheckGracefulDelete(obj runtime.Object, options *api.DeleteOp if len(pod.Spec.NodeName) == 0 { period = 0 } + // if the pod is already terminated, delete immediately + if pod.Status.Phase == api.PodFailed || pod.Status.Phase == api.PodSucceeded { + period = 0 + } // ensure the options and the pod are in sync options.GracePeriodSeconds = &period return true diff --git a/pkg/registry/pod/strategy_test.go b/pkg/registry/pod/strategy_test.go new file mode 100644 index 0000000000000..fec29d108175a --- /dev/null +++ b/pkg/registry/pod/strategy_test.go @@ -0,0 +1,78 @@ +/* +Copyright 2014 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package pod + +import ( + "testing" + + "k8s.io/kubernetes/pkg/api" +) + +func TestCheckGracefulDelete(t *testing.T) { + defaultGracePeriod := int64(30) + tcs := []struct { + in *api.Pod + gracePeriod int64 + }{ + { + in: &api.Pod{ + Spec: api.PodSpec{NodeName: "something"}, + Status: api.PodStatus{Phase: api.PodPending}, + }, + + gracePeriod: defaultGracePeriod, + }, + { + in: &api.Pod{ + Spec: api.PodSpec{NodeName: "something"}, + Status: api.PodStatus{Phase: api.PodFailed}, + }, + gracePeriod: 0, + }, + { + in: &api.Pod{ + Spec: api.PodSpec{}, + Status: api.PodStatus{Phase: api.PodPending}, + }, + gracePeriod: 0, + }, + { + in: &api.Pod{ + Spec: api.PodSpec{}, + Status: api.PodStatus{Phase: api.PodSucceeded}, + }, + gracePeriod: 0, + }, + { + in: &api.Pod{ + Spec: api.PodSpec{}, + Status: api.PodStatus{}, + }, + gracePeriod: 0, + }, + } + for _, tc := range tcs { + out := &api.DeleteOptions{GracePeriodSeconds: &defaultGracePeriod} + Strategy.CheckGracefulDelete(tc.in, out) + if out.GracePeriodSeconds == nil { + t.Errorf("out grace period was nil but supposed to be %v", tc.gracePeriod) + } + if *(out.GracePeriodSeconds) != tc.gracePeriod { + t.Errorf("out grace period was %v but was expected to be %v", *out, tc.gracePeriod) + } + } +} diff --git a/test/integration/framework/master_utils.go b/test/integration/framework/master_utils.go index ad80581a2a34c..73fb19ee5be03 100644 --- a/test/integration/framework/master_utils.go +++ b/test/integration/framework/master_utils.go @@ -32,6 +32,7 @@ import ( "k8s.io/kubernetes/pkg/apiserver" "k8s.io/kubernetes/pkg/client/record" client "k8s.io/kubernetes/pkg/client/unversioned" + "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/replication" "k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/kubectl" @@ -97,7 +98,7 @@ func NewMasterComponents(c *Config) *MasterComponents { } restClient := client.NewOrDie(&client.Config{Host: s.URL, Version: testapi.Default.Version(), QPS: c.QPS, Burst: c.Burst}) rcStopCh := make(chan struct{}) - controllerManager := replicationcontroller.NewReplicationManager(restClient, c.Burst) + controllerManager := replicationcontroller.NewReplicationManager(restClient, controller.NoResyncPeriodFunc, c.Burst) // TODO: Support events once we can cleanly shutdown an event recorder. controllerManager.SetEventRecorder(&record.FakeRecorder{})
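
A few illustrative notes on the change, not part of the patch itself. The heart of it is the jittered reflector resync: kube-controller-manager now takes --min-resync-period (default 12h) and hands each controller a function returning a random duration between that minimum and twice the minimum, so informers do not all relist from the apiserver at the same moment. Below is a minimal, self-contained sketch of that calculation; jitteredResyncPeriod is an illustrative name, the patch implements it as CMServer.resyncPeriod.

package main

import (
	"fmt"
	"math/rand"
	"time"
)

// jitteredResyncPeriod returns a duration uniformly distributed in
// [min, 2*min), mirroring the factor := rand.Float64() + 1 logic in the patch.
func jitteredResyncPeriod(min time.Duration) time.Duration {
	factor := rand.Float64() + 1 // factor is in [1, 2)
	return time.Duration(float64(min.Nanoseconds()) * factor)
}

func main() {
	min := 12 * time.Hour // default for --min-resync-period
	for i := 0; i < 3; i++ {
		// Each call yields a new period; the controllers call the function
		// once per informer, so every informer ends up on its own schedule.
		fmt.Println(jitteredResyncPeriod(min))
	}
}

Because the function is invoked at informer construction time (resyncPeriod() is what gets passed to framework.NewInformer), the jitter is fixed per informer for the life of the process rather than re-drawn on every relist.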
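
The constructor signatures change accordingly: NewEndpointController, NewReplicationManager, NewDaemonSetsController, NewJobController and gc.New now accept a controller.ResyncPeriodFunc instead of relying on package-level constants such as PodRelistPeriod. Production callers pass the jittered function; tests and cmd/integration pass controller.NoResyncPeriodFunc, which returns 0 and disables periodic relisting. A hedged sketch of the calling convention follows; startReplicationManager and the worker count are illustrative, not part of the patch.

package example

import (
	client "k8s.io/kubernetes/pkg/client/unversioned"
	"k8s.io/kubernetes/pkg/controller"
	replicationcontroller "k8s.io/kubernetes/pkg/controller/replication"
	"k8s.io/kubernetes/pkg/util"
)

// startReplicationManager shows the two ways callers now supply a resync
// period: the controller manager passes its jittered s.resyncPeriod, while
// unit tests and the integration binary pass controller.NoResyncPeriodFunc.
func startReplicationManager(kubeClient client.Interface, resyncFn controller.ResyncPeriodFunc) {
	go replicationcontroller.NewReplicationManager(kubeClient, resyncFn, replicationcontroller.BurstReplicas).
		Run(5, util.NeverStop)
}

Note that the resync constants for the primary objects (FullControllerResyncPeriod, FullServiceResyncPeriod, FullDaemonSetResyncPeriod) stay in place for now; only the pod and node relists pick up the configurable period, which is why those constants gain "TODO: Can we have much longer period here?" comments.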
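
The node controller part is only loosely tied to resync: it now runs its own pod and node informers (with no resync period) and force-deletes terminating pods that can never finish graceful termination. Here is a sketch of that decision as a pure predicate; shouldForceDelete is an illustrative name, the patch implements the same checks inline in maybeDeleteTerminatingPod and looks the node up in nc.nodeStore.

package example

import (
	"strings"

	"k8s.io/kubernetes/pkg/api"
)

// shouldForceDelete reports whether a pod should be deleted with a zero
// grace period. node is nil when the pod's node is not in the store.
func shouldForceDelete(pod *api.Pod, node *api.Node) bool {
	if pod.DeletionTimestamp == nil {
		return false // not terminating, leave it alone
	}
	if len(pod.Spec.NodeName) == 0 {
		return true // terminating but never scheduled
	}
	if node == nil {
		return true // scheduled onto a node that no longer exists
	}
	// kubelets older than v1.1.0 do not honor graceful termination
	return strings.HasPrefix(node.Status.NodeInfo.KubeletVersion, "v1.0")
}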
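
Finally, pkg/registry/pod/strategy.go now treats already-terminated pods like unscheduled ones: a delete request for a pod in PodFailed or PodSucceeded is rewritten to a zero grace period. A rough restatement of the rule is below; effectiveGracePeriod is a hypothetical helper, not the registry code, and it ignores the TerminationGracePeriodSeconds handling that precedes these checks.

package example

import "k8s.io/kubernetes/pkg/api"

// effectiveGracePeriod returns the grace period CheckGracefulDelete would
// leave in the delete options for the given pod and requested period.
func effectiveGracePeriod(pod *api.Pod, requested int64) int64 {
	if len(pod.Spec.NodeName) == 0 {
		return 0 // never scheduled, nothing to drain
	}
	if pod.Status.Phase == api.PodFailed || pod.Status.Phase == api.PodSucceeded {
		return 0 // already terminated, delete immediately
	}
	return requested
}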