Automated cherry pick of #14748 upstream release 1.1 #16920

Merged
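
This cherry-pick backports #14748 to the release-1.1 branch. As the diff below shows, it makes two related fixes to the node controller: NewNodeController now takes separate rate limiters for the pod-deletion and pod-termination evictor queues (both queues previously shared a single limiter), and RateLimitedTimedQueue.Try now ends its pass when the head of the queue is not yet due instead of re-queueing it and spinning. Call sites and tests are updated for the extra parameter.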
2 changes: 1 addition & 1 deletion cmd/integration/integration.go
@@ -201,7 +201,7 @@ func startComponents(firstManifestURL, secondManifestURL string) (string, string
 	go replicationControllerPkg.NewReplicationManager(cl, controller.NoResyncPeriodFunc, replicationControllerPkg.BurstReplicas).
 		Run(3, util.NeverStop)
 
-	nodeController := nodecontroller.NewNodeController(nil, cl, 5*time.Minute, util.NewFakeRateLimiter(),
+	nodeController := nodecontroller.NewNodeController(nil, cl, 5*time.Minute, util.NewFakeRateLimiter(), util.NewFakeRateLimiter(),
 		40*time.Second, 60*time.Second, 5*time.Second, nil, false)
 	nodeController.Run(5 * time.Second)
 	cadvisorInterface := new(cadvisor.Fake)
1 change: 1 addition & 0 deletions cmd/kube-controller-manager/app/controllermanager.go
@@ -281,6 +281,7 @@ func (s *CMServer) Run(_ []string) error {
 
 	nodeController := nodecontroller.NewNodeController(cloud, kubeClient,
 		s.PodEvictionTimeout, util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst),
+		util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst),
 		s.NodeMonitorGracePeriod, s.NodeStartupGracePeriod, s.NodeMonitorPeriod, &s.ClusterCIDR, s.AllocateNodeCIDRs)
 	nodeController.Run(s.NodeSyncPeriod)
 
1 change: 1 addition & 0 deletions contrib/mesos/pkg/controllermanager/controllermanager.go
@@ -136,6 +136,7 @@ func (s *CMServer) Run(_ []string) error {
 
 	nodeController := nodecontroller.NewNodeController(cloud, kubeClient,
 		s.PodEvictionTimeout, util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst),
+		util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst),
 		s.NodeMonitorGracePeriod, s.NodeStartupGracePeriod, s.NodeMonitorPeriod, (*net.IPNet)(&s.ClusterCIDR), s.AllocateNodeCIDRs)
 	nodeController.Run(s.NodeSyncPeriod)
 
11 changes: 6 additions & 5 deletions pkg/controller/node/nodecontroller.go
@@ -119,7 +119,8 @@ func NewNodeController(
 	cloud cloudprovider.Interface,
 	kubeClient client.Interface,
 	podEvictionTimeout time.Duration,
-	podEvictionLimiter util.RateLimiter,
+	deletionEvictionLimiter util.RateLimiter,
+	terminationEvictionLimiter util.RateLimiter,
 	nodeMonitorGracePeriod time.Duration,
 	nodeStartupGracePeriod time.Duration,
 	nodeMonitorPeriod time.Duration,
@@ -147,8 +148,8 @@
 		podEvictionTimeout:     podEvictionTimeout,
 		maximumGracePeriod:     5 * time.Minute,
 		evictorLock:            &evictorLock,
-		podEvictor:             NewRateLimitedTimedQueue(podEvictionLimiter),
-		terminationEvictor:     NewRateLimitedTimedQueue(podEvictionLimiter),
+		podEvictor:             NewRateLimitedTimedQueue(deletionEvictionLimiter),
+		terminationEvictor:     NewRateLimitedTimedQueue(terminationEvictionLimiter),
 		nodeStatusMap:          make(map[string]nodeStatusData),
 		nodeMonitorGracePeriod: nodeMonitorGracePeriod,
 		nodeMonitorPeriod:      nodeMonitorPeriod,
@@ -706,8 +707,8 @@ func (nc *NodeController) deletePods(nodeName string) (bool, error) {
 			continue
 		}
 
-		glog.V(2).Infof("Delete pod %v", pod.Name)
-		nc.recorder.Eventf(&pod, "NodeControllerEviction", "Deleting Pod %s from Node %s", pod.Name, nodeName)
+		glog.V(2).Infof("Starting deletion of pod %v", pod.Name)
+		nc.recorder.Eventf(&pod, "NodeControllerEviction", "Marking for deletion Pod %s from Node %s", pod.Name, nodeName)
 		if err := nc.kubeClient.Pods(pod.Namespace).Delete(pod.Name, nil); err != nil {
 			return false, err
 		}
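
For context on the change above: because podEvictor and terminationEvictor previously wrapped the same RateLimiter, evictions of one kind consumed the token budget of the other. The sketch below illustrates the effect, with golang.org/x/time/rate standing in for pkg/util's token-bucket limiter; the helper, rates, and names are illustrative assumptions, not code from this PR.

package main

import (
	"fmt"

	"golang.org/x/time/rate"
)

// drain pulls up to n items through a limiter, stopping at the first
// denied token, mirroring RateLimitedTimedQueue's CanAccept() check.
func drain(name string, l *rate.Limiter, n int) {
	done := 0
	for i := 0; i < n; i++ {
		if !l.Allow() {
			break
		}
		done++
	}
	fmt.Printf("%s: processed %d of %d\n", name, done, n)
}

func main() {
	// Before the change: both evictors share one bucket, so whichever
	// queue runs first consumes the burst and starves the other.
	shared := rate.NewLimiter(rate.Limit(1), 10) // 1 token/s, burst 10
	drain("deletions (shared)", shared, 10)    // uses the whole burst
	drain("terminations (shared)", shared, 10) // bucket is empty

	// After the change: each evictor has its own bucket and therefore
	// its own full eviction budget.
	drain("deletions (own)", rate.NewLimiter(rate.Limit(1), 10), 10)
	drain("terminations (own)", rate.NewLimiter(rate.Limit(1), 10), 10)
}

With separate buckets, which the second util.NewTokenBucketRateLimiter argument at the call sites provides, each queue drains at its configured QPS regardless of the other.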
8 changes: 4 additions & 4 deletions pkg/controller/node/nodecontroller_test.go
@@ -326,7 +326,7 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
 
 	for _, item := range table {
 		nodeController := NewNodeController(nil, item.fakeNodeHandler,
-			evictionTimeout, util.NewFakeRateLimiter(), testNodeMonitorGracePeriod,
+			evictionTimeout, util.NewFakeRateLimiter(), util.NewFakeRateLimiter(), testNodeMonitorGracePeriod,
 			testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false)
 		nodeController.now = func() unversioned.Time { return fakeNow }
 		if err := nodeController.monitorNodeStatus(); err != nil {
@@ -544,7 +544,7 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) {
 
 	for _, item := range table {
 		nodeController := NewNodeController(nil, item.fakeNodeHandler, 5*time.Minute, util.NewFakeRateLimiter(),
-			testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false)
+			util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false)
 		nodeController.now = func() unversioned.Time { return fakeNow }
 		if err := nodeController.monitorNodeStatus(); err != nil {
 			t.Errorf("unexpected error: %v", err)
@@ -622,7 +622,7 @@ func TestNodeDeletion(t *testing.T) {
 		Fake: testclient.NewSimpleFake(&api.PodList{Items: []api.Pod{*newPod("pod0", "node0"), *newPod("pod1", "node1")}}),
 	}
 
-	nodeController := NewNodeController(nil, fakeNodeHandler, 5*time.Minute, util.NewFakeRateLimiter(),
+	nodeController := NewNodeController(nil, fakeNodeHandler, 5*time.Minute, util.NewFakeRateLimiter(), util.NewFakeRateLimiter(),
 		testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false)
 	nodeController.now = func() unversioned.Time { return fakeNow }
 	if err := nodeController.monitorNodeStatus(); err != nil {
@@ -712,7 +712,7 @@ func TestCheckPod(t *testing.T) {
 		},
 	}
 
-	nc := NewNodeController(nil, nil, 0, nil, 0, 0, 0, nil, false)
+	nc := NewNodeController(nil, nil, 0, nil, nil, 0, 0, 0, nil, false)
 	nc.nodeStore.Store = cache.NewStore(cache.MetaNamespaceKeyFunc)
 	nc.nodeStore.Store.Add(&api.Node{
 		ObjectMeta: api.ObjectMeta{
7 changes: 3 additions & 4 deletions pkg/controller/node/rate_limited_queue.go
@@ -21,6 +21,7 @@ import (
 	"sync"
 	"time"
 
+	"github.com/golang/glog"
 	"k8s.io/kubernetes/pkg/util"
 	"k8s.io/kubernetes/pkg/util/sets"
 )
@@ -163,16 +164,14 @@ func (q *RateLimitedTimedQueue) Try(fn ActionFunc) {
 	for ok {
 		// rate limit the queue checking
 		if !q.limiter.CanAccept() {
+			glog.V(10).Info("Try rate limitted...")
 			// Try again later
 			break
 		}
 
 		now := now()
 		if now.Before(val.ProcessAt) {
-			q.queue.Replace(val)
-			val, ok = q.queue.Head()
-			// we do not sleep here because other values may be added at the front of the queue
-			continue
+			break
 		}
 
 		if ok, wait := fn(val); !ok {
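
The Try change above pairs with the new glog line: previously, a head item whose ProcessAt lay in the future was re-queued and the loop continued, busy-waiting within a single Try call; now the pass simply ends and the item waits for the next periodic invocation. Below is a simplified, self-contained model of the new loop (a plain slice instead of UniqueQueue, no limiter; the names and helper are illustrative, not the PR's code).

package main

import (
	"fmt"
	"time"
)

// item stands in for TimedValue: a payload plus the earliest time it
// may be processed.
type item struct {
	value     string
	processAt time.Time
}

// tryOnce models RateLimitedTimedQueue.Try after this PR: due items
// are handed to fn in order; if fn asks for a retry, the item is
// re-queued at the back with a pushed-out deadline; and on the first
// head that is not yet due, the pass ends (the new `break`) rather
// than spinning on it (the removed `continue`).
func tryOnce(queue []item, now time.Time, fn func(item) (bool, time.Duration)) []item {
	for len(queue) > 0 {
		head := queue[0]
		if now.Before(head.processAt) {
			break // head not due; leave the rest for the next pass
		}
		if ok, wait := fn(head); !ok {
			head.processAt = now.Add(wait)
			queue = append(queue[1:], head)
			continue
		}
		queue = queue[1:]
	}
	return queue
}

func main() {
	now := time.Now()
	q := []item{{"first", now}, {"second", now}, {"third", now}}

	var order []string
	retried := false
	q = tryOnce(q, now, func(it item) (bool, time.Duration) {
		if it.value == "second" && !retried {
			retried = true
			return false, time.Millisecond // defer "second" for later
		}
		order = append(order, it.value)
		return true, 0
	})

	// Mirrors the updated TestTryOrdering expectations below: "second"
	// is deferred past the end of the pass instead of busy-waited on.
	fmt.Println(order, "remaining:", len(q)) // [first third] remaining: 1
}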
4 changes: 2 additions & 2 deletions pkg/controller/node/rate_limited_queue_test.go
@@ -175,10 +175,10 @@ func TestTryOrdering(t *testing.T) {
 		order = append(order, value.Value)
 		return true, 0
 	})
-	if !reflect.DeepEqual(order, []string{"first", "third", "second"}) {
+	if !reflect.DeepEqual(order, []string{"first", "third"}) {
 		t.Fatalf("order was wrong: %v", order)
 	}
-	if count != 4 {
+	if count != 3 {
 		t.Fatalf("unexpected iterations: %d", count)
 	}
 }
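
The updated expectations follow directly from the early break: once "second" is deferred with a future ProcessAt, the pass ends when it reaches the head of the queue again, so only "first" and "third" are processed and the callback runs three times instead of four.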