This repository was archived by the owner on Apr 17, 2019. It is now read-only.
Merged
Changes from all commits
11 changes: 2 additions & 9 deletions cluster-autoscaler/cluster_autoscaler.go
@@ -252,7 +252,6 @@ func run(_ <-chan struct{}) {
loopStart := time.Now()
updateLastTime("main")

// TODO: remove once switched to all nodes.
readyNodes, err := readyNodeLister.List()
if err != nil {
glog.Errorf("Failed to list ready nodes: %v", err)
@@ -316,12 +315,6 @@ func run(_ <-chan struct{}) {
continue
}

// TODO: remove once all of the unready node handling elements are in place.
if err := CheckGroupsAndNodes(readyNodes, autoscalingContext.CloudProvider); err != nil {
glog.Warningf("Cluster is not ready for autoscaling: %v", err)
continue
}

allUnschedulablePods, err := unschedulablePodLister.List()
if err != nil {
glog.Errorf("Failed to list unscheduled pods: %v", err)
Contributor:
Isn't method CheckGroupsAndNodes unused? If yes, can you remove it?

Contributor Author:
Done.

@@ -405,7 +398,7 @@ func run(_ <-chan struct{}) {
glog.V(4).Infof("Calculating unneeded nodes")

scaleDown.CleanUp(time.Now())
err := scaleDown.UpdateUnneededNodes(readyNodes, allScheduled, time.Now())
err := scaleDown.UpdateUnneededNodes(allNodes, allScheduled, time.Now())
if err != nil {
glog.Warningf("Failed to scale down: %v", err)
continue
@@ -424,7 +417,7 @@

scaleDownStart := time.Now()
updateLastTime("scaledown")
result, err := scaleDown.TryToScaleDown(readyNodes, allScheduled)
result, err := scaleDown.TryToScaleDown(allNodes, allScheduled)
updateDuration("scaledown", scaleDownStart)

// TODO: revisit result handling
17 changes: 2 additions & 15 deletions cluster-autoscaler/clusterstate/clusterstate.go
@@ -17,13 +17,13 @@ limitations under the License.
package clusterstate

import (
"fmt"
"reflect"
"sync"
"time"

"k8s.io/contrib/cluster-autoscaler/cloudprovider"
"k8s.io/contrib/cluster-autoscaler/utils/deletetaint"
kube_util "k8s.io/contrib/cluster-autoscaler/utils/kubernetes"

apiv1 "k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/util/sets"
@@ -322,7 +322,7 @@ func (csr *ClusterStateRegistry) updateReadinessStats(currentTime time.Time) {

for _, node := range csr.nodes {
nodeGroup, errNg := csr.cloudProvider.NodeGroupForNode(node)
ready, _, errReady := GetReadinessState(node)
ready, _, errReady := kube_util.GetReadinessState(node)

// Node is most likely not autoscaled, however check the errors.
if reflect.ValueOf(nodeGroup).IsNil() {
@@ -399,19 +399,6 @@ func (csr *ClusterStateRegistry) GetUnregisteredNodes() []UnregisteredNode {
return result
}

// GetReadinessState gets readiness state for the node
func GetReadinessState(node *apiv1.Node) (isNodeReady bool, lastTransitionTime time.Time, err error) {
for _, condition := range node.Status.Conditions {
if condition.Type == apiv1.NodeReady {
if condition.Status == apiv1.ConditionTrue {
return true, condition.LastTransitionTime.Time, nil
}
return false, condition.LastTransitionTime.Time, nil
}
}
return false, time.Time{}, fmt.Errorf("NodeReady condition for %s not found", node.Name)
}

func isNodeNotStarted(node *apiv1.Node) bool {
for _, condition := range node.Status.Conditions {
if condition.Type == apiv1.NodeReady &&
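Editor's note: the helper removed above is not dropped outright. The new kube_util import in this file and the kube_util.GetReadinessState call sites in clusterstate.go and scale_down.go indicate it now lives in cluster-autoscaler/utils/kubernetes. A minimal sketch of the relocated function, assuming its body is carried over unchanged (the target file name is a guess):

// utils/kubernetes/ready.go (assumed location within the utils/kubernetes package)
package kubernetes

import (
	"fmt"
	"time"

	apiv1 "k8s.io/kubernetes/pkg/api/v1"
)

// GetReadinessState gets readiness state for the node.
func GetReadinessState(node *apiv1.Node) (isNodeReady bool, lastTransitionTime time.Time, err error) {
	for _, condition := range node.Status.Conditions {
		if condition.Type == apiv1.NodeReady {
			// Report the condition status together with its last transition time.
			return condition.Status == apiv1.ConditionTrue, condition.LastTransitionTime.Time, nil
		}
	}
	return false, time.Time{}, fmt.Errorf("NodeReady condition for %s not found", node.Name)
}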
5 changes: 5 additions & 0 deletions cluster-autoscaler/estimator/binpacking_estimator_test.go
@@ -18,8 +18,10 @@ package estimator

import (
"testing"
"time"

"k8s.io/contrib/cluster-autoscaler/simulator"
. "k8s.io/contrib/cluster-autoscaler/utils/test"
"k8s.io/kubernetes/pkg/api/resource"
apiv1 "k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
@@ -48,6 +50,7 @@ func TestBinpackingEstimate(t *testing.T) {
},
}
node.Status.Allocatable = node.Status.Capacity
SetNodeReadyState(node, true, time.Time{})

nodeInfo := schedulercache.NewNodeInfo()
nodeInfo.SetNode(node)
@@ -76,6 +79,7 @@ func TestBinpackingEstimateComingNodes(t *testing.T) {
},
}
node.Status.Allocatable = node.Status.Capacity
SetNodeReadyState(node, true, time.Time{})

nodeInfo := schedulercache.NewNodeInfo()
nodeInfo.SetNode(node)
@@ -109,6 +113,7 @@ func TestBinpackingEstimateWithPorts(t *testing.T) {
},
}
node.Status.Allocatable = node.Status.Capacity
SetNodeReadyState(node, true, time.Time{})

nodeInfo := schedulercache.NewNodeInfo()
nodeInfo.SetNode(node)
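Editor's note: the test changes in this PR lean on a SetNodeReadyState helper dot-imported from cluster-autoscaler/utils/test. Its implementation is not part of this diff; the sketch below is a hypothetical version consistent with how it is called (node, ready flag, last transition time), and the condition-time wrapper type and its import path are assumptions tied to the vendored Kubernetes version:

// utils/test (sketch only; package and import paths assumed)
package test

import (
	"time"

	apiv1 "k8s.io/kubernetes/pkg/api/v1"
	metav1 "k8s.io/kubernetes/pkg/apis/meta/v1" // assumed vendored path for the Time wrapper
)

// SetNodeReadyState records a NodeReady condition with the given status and
// last transition time on the node, so readiness checks in the autoscaler see it.
func SetNodeReadyState(node *apiv1.Node, ready bool, lastTransition time.Time) {
	status := apiv1.ConditionFalse
	if ready {
		status = apiv1.ConditionTrue
	}
	node.Status.Conditions = append(node.Status.Conditions, apiv1.NodeCondition{
		Type:               apiv1.NodeReady,
		Status:             status,
		LastTransitionTime: metav1.Time{Time: lastTransition},
	})
}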
4 changes: 4 additions & 0 deletions cluster-autoscaler/expander/waste/waste_test.go
@@ -18,6 +18,9 @@ package waste

import (
"testing"
"time"

. "k8s.io/contrib/cluster-autoscaler/utils/test"

"github.com/stretchr/testify/assert"
"k8s.io/contrib/cluster-autoscaler/expander"
@@ -51,6 +54,7 @@ func makeNodeInfo(cpu int64, memory int64, pods int64) *schedulercache.NodeInfo
},
}
node.Status.Allocatable = node.Status.Capacity
SetNodeReadyState(node, true, time.Time{})

nodeInfo := schedulercache.NewNodeInfo()
nodeInfo.SetNode(node)
3 changes: 2 additions & 1 deletion cluster-autoscaler/scale_down.go
@@ -26,6 +26,7 @@ import (
"k8s.io/contrib/cluster-autoscaler/clusterstate"
"k8s.io/contrib/cluster-autoscaler/simulator"
"k8s.io/contrib/cluster-autoscaler/utils/deletetaint"
kube_util "k8s.io/contrib/cluster-autoscaler/utils/kubernetes"

"k8s.io/kubernetes/pkg/api/errors"
apiv1 "k8s.io/kubernetes/pkg/api/v1"
@@ -153,7 +154,7 @@ func (sd *ScaleDown) TryToScaleDown(nodes []*apiv1.Node, pods []*apiv1.Pod) (Sca

glog.V(2).Infof("%s was unneeded for %s", node.Name, now.Sub(val).String())

ready, _, _ := clusterstate.GetReadinessState(node)
ready, _, _ := kube_util.GetReadinessState(node)

// Check how long the node was underutilized.
if ready && !val.Add(sd.context.ScaleDownUnneededTime).Before(now) {
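Editor's note: the hunk above shows only the ready-node branch of the unneeded-time check. The test comments below ("N1 is unready so it requires a bigger unneeded time") imply that unready nodes are held to a longer grace period before scale-down; a rough, hypothetical sketch of that branch, where the threshold name and value are placeholders not visible in this diff:

ready, _, _ := kube_util.GetReadinessState(node)

// Ready nodes only need to have been unneeded for ScaleDownUnneededTime.
if ready && !val.Add(sd.context.ScaleDownUnneededTime).Before(now) {
	continue
}

// Unready nodes get a longer window before removal; unreadyGracePeriod is a
// placeholder name and value for illustration, not the real implementation.
unreadyGracePeriod := 20 * time.Minute
if !ready && !val.Add(unreadyGracePeriod).Before(now) {
	continue
}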
87 changes: 86 additions & 1 deletion cluster-autoscaler/scale_down_test.go
Expand Up @@ -63,6 +63,11 @@ func TestFindUnneededNodes(t *testing.T) {
n3 := BuildTestNode("n3", 1000, 10)
n4 := BuildTestNode("n4", 10000, 10)

SetNodeReadyState(n1, true, time.Time{})
SetNodeReadyState(n2, true, time.Time{})
SetNodeReadyState(n3, true, time.Time{})
SetNodeReadyState(n4, true, time.Time{})

context := AutoscalingContext{
PredicateChecker: simulator.NewTestPredicateChecker(),
ScaleDownUtilizationThreshold: 0.35,
@@ -94,6 +99,7 @@ func TestDrainNode(t *testing.T) {
p1 := BuildTestPod("p1", 100, 0)
p2 := BuildTestPod("p2", 300, 0)
n1 := BuildTestNode("n1", 1000, 1000)
SetNodeReadyState(n1, true, time.Time{})

fakeClient.Fake.AddReactor("list", "pods", func(action core.Action) (bool, runtime.Object, error) {
return true, &apiv1.PodList{Items: []apiv1.Pod{*p1, *p2}}, nil
@@ -204,7 +210,7 @@ func TestScaleDown(t *testing.T) {
assert.Equal(t, n1.Name, getStringFromChan(updatedNodes))
}

func TestNoScaleDown(t *testing.T) {
func TestNoScaleDownUnready(t *testing.T) {
fakeClient := &fake.Clientset{}
n1 := BuildTestNode("n1", 1000, 1000)
SetNodeReadyState(n1, false, time.Now().Add(-3*time.Minute))
@@ -249,6 +255,8 @@ func TestNoScaleDown(t *testing.T) {
MaxGratefulTerminationSec: 60,
ClusterStateRegistry: clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}),
}

// N1 is unready so it requires a bigger unneeded time.
scaleDown := NewScaleDown(context)
scaleDown.UpdateUnneededNodes([]*apiv1.Node{n1, n2}, []*apiv1.Pod{p2}, time.Now().Add(-5*time.Minute))
result, err := scaleDown.TryToScaleDown([]*apiv1.Node{n1, n2}, []*apiv1.Pod{p2})
@@ -266,6 +274,7 @@ func TestNoScaleDown(t *testing.T) {
provider.AddNode("ng1", n1)
provider.AddNode("ng1", n2)

// N1 has been unready for 2 hours, ok to delete.
context.CloudProvider = provider
scaleDown = NewScaleDown(context)
scaleDown.UpdateUnneededNodes([]*apiv1.Node{n1, n2}, []*apiv1.Pod{p2}, time.Now().Add(-2*time.Hour))
Expand All @@ -275,6 +284,82 @@ func TestNoScaleDown(t *testing.T) {
assert.Equal(t, n1.Name, getStringFromChan(deletedNodes))
}

func TestScaleDownNoMove(t *testing.T) {
fakeClient := &fake.Clientset{}

job := batchv1.Job{
ObjectMeta: apiv1.ObjectMeta{
Name: "job",
Namespace: "default",
SelfLink: "/apivs/extensions/v1beta1/namespaces/default/jobs/job",
},
}
n1 := BuildTestNode("n1", 1000, 1000)
SetNodeReadyState(n1, true, time.Time{})

// N2 is unready so no pods can be moved there.
n2 := BuildTestNode("n2", 1000, 1000)
SetNodeReadyState(n2, false, time.Time{})

p1 := BuildTestPod("p1", 100, 0)
p1.Annotations = map[string]string{
"kubernetes.io/created-by": RefJSON(&job),
}

p2 := BuildTestPod("p2", 800, 0)
p1.Spec.NodeName = "n1"
Contributor:
nit: move 2 lines up

p2.Spec.NodeName = "n2"

fakeClient.Fake.AddReactor("list", "pods", func(action core.Action) (bool, runtime.Object, error) {
return true, &apiv1.PodList{Items: []apiv1.Pod{*p1, *p2}}, nil
})
fakeClient.Fake.AddReactor("get", "pods", func(action core.Action) (bool, runtime.Object, error) {
return true, nil, errors.NewNotFound(apiv1.Resource("pod"), "whatever")
})
fakeClient.Fake.AddReactor("get", "nodes", func(action core.Action) (bool, runtime.Object, error) {
getAction := action.(core.GetAction)
switch getAction.GetName() {
case n1.Name:
return true, n1, nil
case n2.Name:
return true, n2, nil
}
return true, nil, fmt.Errorf("Wrong node: %v", getAction.GetName())
})
fakeClient.Fake.AddReactor("delete", "pods", func(action core.Action) (bool, runtime.Object, error) {
t.FailNow()
return false, nil, nil
})
fakeClient.Fake.AddReactor("update", "nodes", func(action core.Action) (bool, runtime.Object, error) {
t.FailNow()
return false, nil, nil
})
provider := testprovider.NewTestCloudProvider(nil, func(nodeGroup string, node string) error {
t.FailNow()
return nil
})
provider.AddNodeGroup("ng1", 1, 10, 2)
provider.AddNode("ng1", n1)
provider.AddNode("ng1", n2)
assert.NotNil(t, provider)

context := &AutoscalingContext{
PredicateChecker: simulator.NewTestPredicateChecker(),
CloudProvider: provider,
ClientSet: fakeClient,
Recorder: createEventRecorder(fakeClient),
ScaleDownUtilizationThreshold: 0.5,
ScaleDownUnneededTime: time.Minute,
MaxGratefulTerminationSec: 60,
ClusterStateRegistry: clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}),
}
scaleDown := NewScaleDown(context)
scaleDown.UpdateUnneededNodes([]*apiv1.Node{n1, n2}, []*apiv1.Pod{p1, p2}, time.Now().Add(-5*time.Minute))
result, err := scaleDown.TryToScaleDown([]*apiv1.Node{n1, n2}, []*apiv1.Pod{p1, p2})
assert.NoError(t, err)
assert.Equal(t, ScaleDownNoUnneeded, result)
}

func getStringFromChan(c chan string) string {
select {
case val := <-c:
17 changes: 16 additions & 1 deletion cluster-autoscaler/simulator/cluster_test.go
Expand Up @@ -21,7 +21,6 @@ import (
"time"

. "k8s.io/contrib/cluster-autoscaler/utils/test"

apiv1 "k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
@@ -35,6 +34,7 @@ func TestUtilization(t *testing.T) {

nodeInfo := schedulercache.NewNodeInfo(pod, pod, pod2)
node := BuildTestNode("node1", 2000, 2000000)
SetNodeReadyState(node, true, time.Time{})

utilization, err := CalculateUtilization(node, nodeInfo)
assert.NoError(t, err)
@@ -56,7 +56,9 @@ func TestFindPlaceAllOk(t *testing.T) {
"n2": schedulercache.NewNodeInfo(),
}
node1 := BuildTestNode("n1", 1000, 2000000)
SetNodeReadyState(node1, true, time.Time{})
node2 := BuildTestNode("n2", 1000, 2000000)
SetNodeReadyState(node2, true, time.Time{})
nodeInfos["n1"].SetNode(node1)
nodeInfos["n2"].SetNode(node2)

@@ -90,7 +92,11 @@ func TestFindPlaceAllBas(t *testing.T) {
}
nodebad := BuildTestNode("nbad", 1000, 2000000)
node1 := BuildTestNode("n1", 1000, 2000000)
SetNodeReadyState(node1, true, time.Time{})

node2 := BuildTestNode("n2", 1000, 2000000)
SetNodeReadyState(node2, true, time.Time{})

nodeInfos["n1"].SetNode(node1)
nodeInfos["n2"].SetNode(node2)
nodeInfos["nbad"].SetNode(nodebad)
@@ -120,7 +126,11 @@ func TestFindNone(t *testing.T) {
"n2": schedulercache.NewNodeInfo(),
}
node1 := BuildTestNode("n1", 1000, 2000000)
SetNodeReadyState(node1, true, time.Time{})

node2 := BuildTestNode("n2", 1000, 2000000)
SetNodeReadyState(node2, true, time.Time{})

nodeInfos["n1"].SetNode(node1)
nodeInfos["n2"].SetNode(node2)

@@ -166,6 +176,11 @@ func TestFindEmptyNodes(t *testing.T) {
node3 := BuildTestNode("n3", 1000, 2000000)
node4 := BuildTestNode("n4", 1000, 2000000)

SetNodeReadyState(node1, true, time.Time{})
SetNodeReadyState(node2, true, time.Time{})
SetNodeReadyState(node3, true, time.Time{})
SetNodeReadyState(node4, true, time.Time{})

emptyNodes := FindEmptyNodesToRemove([]*apiv1.Node{node1, node2, node3, node4}, []*apiv1.Pod{pod1, pod2})
assert.Equal(t, []*apiv1.Node{node2, node3, node4}, emptyNodes)
}