diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go index 5671208259dd..0124af07e94e 100644 --- a/cmd/integration/integration.go +++ b/cmd/integration/integration.go @@ -201,7 +201,7 @@ func startComponents(firstManifestURL, secondManifestURL string) (string, string go replicationControllerPkg.NewReplicationManager(cl, controller.NoResyncPeriodFunc, replicationControllerPkg.BurstReplicas). Run(3, util.NeverStop) - nodeController := nodecontroller.NewNodeController(nil, cl, 5*time.Minute, util.NewFakeRateLimiter(), + nodeController := nodecontroller.NewNodeController(nil, cl, 5*time.Minute, util.NewFakeRateLimiter(), util.NewFakeRateLimiter(), 40*time.Second, 60*time.Second, 5*time.Second, nil, false) nodeController.Run(5 * time.Second) cadvisorInterface := new(cadvisor.Fake) diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index f11799530f76..485f8d121865 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -281,6 +281,7 @@ func (s *CMServer) Run(_ []string) error { nodeController := nodecontroller.NewNodeController(cloud, kubeClient, s.PodEvictionTimeout, util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst), + util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst), s.NodeMonitorGracePeriod, s.NodeStartupGracePeriod, s.NodeMonitorPeriod, &s.ClusterCIDR, s.AllocateNodeCIDRs) nodeController.Run(s.NodeSyncPeriod) diff --git a/contrib/mesos/pkg/controllermanager/controllermanager.go b/contrib/mesos/pkg/controllermanager/controllermanager.go index c14cfec712f0..9d2296471b84 100644 --- a/contrib/mesos/pkg/controllermanager/controllermanager.go +++ b/contrib/mesos/pkg/controllermanager/controllermanager.go @@ -136,6 +136,7 @@ func (s *CMServer) Run(_ []string) error { nodeController := nodecontroller.NewNodeController(cloud, kubeClient, s.PodEvictionTimeout, util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst), + util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst), s.NodeMonitorGracePeriod, s.NodeStartupGracePeriod, s.NodeMonitorPeriod, (*net.IPNet)(&s.ClusterCIDR), s.AllocateNodeCIDRs) nodeController.Run(s.NodeSyncPeriod) diff --git a/pkg/controller/node/nodecontroller.go b/pkg/controller/node/nodecontroller.go index 3e2be50906b3..cef3bd6f5464 100644 --- a/pkg/controller/node/nodecontroller.go +++ b/pkg/controller/node/nodecontroller.go @@ -119,7 +119,8 @@ func NewNodeController( cloud cloudprovider.Interface, kubeClient client.Interface, podEvictionTimeout time.Duration, - podEvictionLimiter util.RateLimiter, + deletionEvictionLimiter util.RateLimiter, + terminationEvictionLimiter util.RateLimiter, nodeMonitorGracePeriod time.Duration, nodeStartupGracePeriod time.Duration, nodeMonitorPeriod time.Duration, @@ -147,8 +148,8 @@ func NewNodeController( podEvictionTimeout: podEvictionTimeout, maximumGracePeriod: 5 * time.Minute, evictorLock: &evictorLock, - podEvictor: NewRateLimitedTimedQueue(podEvictionLimiter), - terminationEvictor: NewRateLimitedTimedQueue(podEvictionLimiter), + podEvictor: NewRateLimitedTimedQueue(deletionEvictionLimiter), + terminationEvictor: NewRateLimitedTimedQueue(terminationEvictionLimiter), nodeStatusMap: make(map[string]nodeStatusData), nodeMonitorGracePeriod: nodeMonitorGracePeriod, nodeMonitorPeriod: nodeMonitorPeriod, @@ -706,8 +707,8 @@ func (nc *NodeController) deletePods(nodeName string) (bool, error) { continue } - glog.V(2).Infof("Delete pod %v", pod.Name) - nc.recorder.Eventf(&pod, "NodeControllerEviction", "Deleting Pod %s from Node %s", pod.Name, nodeName) + glog.V(2).Infof("Starting deletion of pod %v", pod.Name) + nc.recorder.Eventf(&pod, "NodeControllerEviction", "Marking for deletion Pod %s from Node %s", pod.Name, nodeName) if err := nc.kubeClient.Pods(pod.Namespace).Delete(pod.Name, nil); err != nil { return false, err } diff --git a/pkg/controller/node/nodecontroller_test.go b/pkg/controller/node/nodecontroller_test.go index d8433d15b5cf..84e83afeca17 100644 --- a/pkg/controller/node/nodecontroller_test.go +++ b/pkg/controller/node/nodecontroller_test.go @@ -326,7 +326,7 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) { for _, item := range table { nodeController := NewNodeController(nil, item.fakeNodeHandler, - evictionTimeout, util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, + evictionTimeout, util.NewFakeRateLimiter(), util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false) nodeController.now = func() unversioned.Time { return fakeNow } if err := nodeController.monitorNodeStatus(); err != nil { @@ -544,7 +544,7 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) { for _, item := range table { nodeController := NewNodeController(nil, item.fakeNodeHandler, 5*time.Minute, util.NewFakeRateLimiter(), - testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false) + util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false) nodeController.now = func() unversioned.Time { return fakeNow } if err := nodeController.monitorNodeStatus(); err != nil { t.Errorf("unexpected error: %v", err) @@ -622,7 +622,7 @@ func TestNodeDeletion(t *testing.T) { Fake: testclient.NewSimpleFake(&api.PodList{Items: []api.Pod{*newPod("pod0", "node0"), *newPod("pod1", "node1")}}), } - nodeController := NewNodeController(nil, fakeNodeHandler, 5*time.Minute, util.NewFakeRateLimiter(), + nodeController := NewNodeController(nil, fakeNodeHandler, 5*time.Minute, util.NewFakeRateLimiter(), util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false) nodeController.now = func() unversioned.Time { return fakeNow } if err := nodeController.monitorNodeStatus(); err != nil { @@ -712,7 +712,7 @@ func TestCheckPod(t *testing.T) { }, } - nc := NewNodeController(nil, nil, 0, nil, 0, 0, 0, nil, false) + nc := NewNodeController(nil, nil, 0, nil, nil, 0, 0, 0, nil, false) nc.nodeStore.Store = cache.NewStore(cache.MetaNamespaceKeyFunc) nc.nodeStore.Store.Add(&api.Node{ ObjectMeta: api.ObjectMeta{ diff --git a/pkg/controller/node/rate_limited_queue.go b/pkg/controller/node/rate_limited_queue.go index 71950f54a476..1ce719d2021c 100644 --- a/pkg/controller/node/rate_limited_queue.go +++ b/pkg/controller/node/rate_limited_queue.go @@ -21,6 +21,7 @@ import ( "sync" "time" + "github.com/golang/glog" "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/sets" ) @@ -163,16 +164,14 @@ func (q *RateLimitedTimedQueue) Try(fn ActionFunc) { for ok { // rate limit the queue checking if !q.limiter.CanAccept() { + glog.V(10).Info("Try rate limitted...") // Try again later break } now := now() if now.Before(val.ProcessAt) { - q.queue.Replace(val) - val, ok = q.queue.Head() - // we do not sleep here because other values may be added at the front of the queue - continue + break } if ok, wait := fn(val); !ok { diff --git a/pkg/controller/node/rate_limited_queue_test.go b/pkg/controller/node/rate_limited_queue_test.go index 762e82638224..ab25806bd3cc 100644 --- a/pkg/controller/node/rate_limited_queue_test.go +++ b/pkg/controller/node/rate_limited_queue_test.go @@ -175,10 +175,10 @@ func TestTryOrdering(t *testing.T) { order = append(order, value.Value) return true, 0 }) - if !reflect.DeepEqual(order, []string{"first", "third", "second"}) { + if !reflect.DeepEqual(order, []string{"first", "third"}) { t.Fatalf("order was wrong: %v", order) } - if count != 4 { + if count != 3 { t.Fatalf("unexpected iterations: %d", count) } }