diff --git a/cluster-autoscaler/Makefile b/cluster-autoscaler/Makefile
index 6094b314c3..d626dc6701 100644
--- a/cluster-autoscaler/Makefile
+++ b/cluster-autoscaler/Makefile
@@ -16,6 +16,6 @@ test-unit: clean deps build
 	$(ENVVAR) godep go test --test.short -race ./... $(FLAGS)
 
 clean:
-	rm -f clusterautoscaler
+	rm -f cluster-autoscaler
 
 .PHONY: all deps build test-unit clean
diff --git a/cluster-autoscaler/cluster_autoscaler.go b/cluster-autoscaler/cluster_autoscaler.go
index e5a4186adf..5012cdb058 100644
--- a/cluster-autoscaler/cluster_autoscaler.go
+++ b/cluster-autoscaler/cluster_autoscaler.go
@@ -22,11 +22,7 @@ import (
 	"time"
 
 	"k8s.io/contrib/cluster-autoscaler/config"
-	kube_api "k8s.io/kubernetes/pkg/api"
-	"k8s.io/kubernetes/pkg/client/cache"
 	kube_client "k8s.io/kubernetes/pkg/client/unversioned"
-	"k8s.io/kubernetes/pkg/fields"
-	"k8s.io/kubernetes/pkg/labels"
 
 	"github.com/golang/glog"
 )
@@ -53,27 +49,57 @@ func main() {
 	}
 	kubeClient := kube_client.NewOrDie(kubeConfig)
-
-	// watch unscheduled pods
-	selector := fields.ParseSelectorOrDie("spec.nodeName==" + "" + ",status.phase!=" +
-		string(kube_api.PodSucceeded) + ",status.phase!=" + string(kube_api.PodFailed))
-	podListWatch := cache.NewListWatchFromClient(kubeClient, "pods", kube_api.NamespaceAll, selector)
-	podListener := &cache.StoreToPodLister{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)}
-	podReflector := cache.NewReflector(podListWatch, &kube_api.Pod{}, podListener.Store, time.Hour)
-	podReflector.Run()
+	unscheduledPodLister := NewUnscheduledPodLister(kubeClient)
+	nodeLister := NewNodeLister(kubeClient)
 
 	for {
 		select {
 		case <-time.After(time.Minute):
 			{
-				pods, err := podListener.List(labels.Everything())
+				pods, err := unscheduledPodLister.List()
 				if err != nil {
 					glog.Errorf("Failed to list pods: %v", err)
-					break
+					continue
+				}
+				if len(pods) == 0 {
+					glog.V(1).Info("No unscheduled pods")
+					continue
 				}
 				for _, pod := range pods {
-					glog.Infof("Pod %s/%s is not scheduled", pod.Namespace, pod.Name)
+					glog.V(1).Infof("Pod %s/%s is not scheduled", pod.Namespace, pod.Name)
+				}
+
+				nodes, err := nodeLister.List()
+				if err != nil {
+					glog.Errorf("Failed to list nodes: %v", err)
+					continue
+				}
+				if len(nodes) == 0 {
+					glog.Errorf("No nodes in the cluster")
+					continue
+				}
+
+				// TODO: Check if all nodes are present.
+
+				// Check whether the scheduler tried to schedule the pods after the newest node was added.
+				newestNode := GetNewestNode(nodes)
+				if newestNode == nil {
+					glog.Errorf("No newest node")
+					continue
+				}
+				oldestSchedulingTrial := GetOldestFailedSchedulingTrail(pods)
+				if oldestSchedulingTrial == nil {
+					glog.Errorf("No oldest unscheduled trial")
+					continue
+				}
+
+				// TODO: Find a better way to check if all pods were checked after the newest node
+				// was added.
+				if newestNode.CreationTimestamp.After(oldestSchedulingTrial.Add(-1 * time.Minute)) {
+					// Let's give the scheduler another chance.
+					glog.V(1).Infof("One of the pods has not been tried after adding %s", newestNode.Name)
+					continue
+				}
 				}
 			}
 		}
 	}
 }
diff --git a/cluster-autoscaler/utils.go b/cluster-autoscaler/utils.go
new file mode 100644
index 0000000000..1d9f966e74
--- /dev/null
+++ b/cluster-autoscaler/utils.go
@@ -0,0 +1,107 @@
+/*
+Copyright 2016 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package main
+
+import (
+	"time"
+
+	kube_api "k8s.io/kubernetes/pkg/api"
+	"k8s.io/kubernetes/pkg/client/cache"
+	kube_client "k8s.io/kubernetes/pkg/client/unversioned"
+	"k8s.io/kubernetes/pkg/fields"
+	"k8s.io/kubernetes/pkg/labels"
+)
+
+// UnscheduledPodLister lists unscheduled pods.
+type UnscheduledPodLister struct {
+	podLister *cache.StoreToPodLister
+}
+
+// List returns all unscheduled pods.
+func (unscheduledPodLister *UnscheduledPodLister) List() ([]*kube_api.Pod, error) {
+	//TODO: Extra filter based on pod condition.
+	return unscheduledPodLister.podLister.List(labels.Everything())
+}
+
+// NewUnscheduledPodLister returns a lister providing pods that failed to be scheduled.
+func NewUnscheduledPodLister(kubeClient *kube_client.Client) *UnscheduledPodLister {
+	// watch unscheduled pods
+	selector := fields.ParseSelectorOrDie("spec.nodeName==" + "" + ",status.phase!=" +
+		string(kube_api.PodSucceeded) + ",status.phase!=" + string(kube_api.PodFailed))
+	podListWatch := cache.NewListWatchFromClient(kubeClient, "pods", kube_api.NamespaceAll, selector)
+	podLister := &cache.StoreToPodLister{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)}
+	podReflector := cache.NewReflector(podListWatch, &kube_api.Pod{}, podLister.Store, time.Hour)
+	podReflector.Run()
+
+	return &UnscheduledPodLister{
+		podLister: podLister,
+	}
+}
+
+// ReadyNodeLister lists ready nodes.
+type ReadyNodeLister struct {
+	nodeLister *cache.StoreToNodeLister
+}
+
+// List returns ready nodes.
+func (readyNodeLister *ReadyNodeLister) List() ([]kube_api.Node, error) {
+	nodes, err := readyNodeLister.nodeLister.List()
+	if err != nil {
+		return []kube_api.Node{}, err
+	}
+	readyNodes := make([]kube_api.Node, 0, len(nodes.Items))
+	for _, node := range nodes.Items {
+		for _, condition := range node.Status.Conditions {
+			if condition.Type == kube_api.NodeReady && condition.Status == kube_api.ConditionTrue {
+				readyNodes = append(readyNodes, node)
+				break
+			}
+		}
+	}
+	return readyNodes, nil
+}
+
+// NewNodeLister builds a node lister.
+func NewNodeLister(kubeClient *kube_client.Client) *ReadyNodeLister {
+	listWatcher := cache.NewListWatchFromClient(kubeClient, "nodes", kube_api.NamespaceAll, fields.Everything())
+	nodeLister := &cache.StoreToNodeLister{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)}
+	reflector := cache.NewReflector(listWatcher, &kube_api.Node{}, nodeLister.Store, time.Hour)
+	reflector.Run()
+	return &ReadyNodeLister{
+		nodeLister: nodeLister,
+	}
+}
+
+// GetNewestNode returns the newest node from the given list.
+func GetNewestNode(nodes []kube_api.Node) *kube_api.Node {
+	var result *kube_api.Node
+	for i, node := range nodes {
+		if result == nil || node.CreationTimestamp.After(result.CreationTimestamp.Time) {
+			result = &(nodes[i])
+		}
+	}
+	return result
+}
+
+// GetOldestFailedSchedulingTrail returns the oldest time when a pod from the given list failed to
+// be scheduled.
+func GetOldestFailedSchedulingTrail(pods []*kube_api.Pod) *time.Time {
+	// Dummy implementation.
+	//TODO: Implement once pod condition is there.
+	now := time.Now()
+	return &now
+}
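
GetOldestFailedSchedulingTrail above is still a dummy, and its TODO says the real value should come from the pod condition. Below is a minimal sketch of one possible follow-up, assuming the scheduler marks pods it cannot place with a PodScheduled condition set to ConditionFalse and that the condition's LastTransitionTime approximates the last failed scheduling attempt; the helper name is hypothetical, it is not part of this change, and it relies only on the time and kube_api imports already present in utils.go.

// Hypothetical follow-up for the TODO above (not part of this change): derive the
// oldest failed-scheduling time from the pods' PodScheduled conditions.
func getOldestFailedSchedulingTrialFromConditions(pods []*kube_api.Pod) *time.Time {
	var oldest *time.Time
	for _, pod := range pods {
		for _, condition := range pod.Status.Conditions {
			// Assumes the scheduler sets PodScheduled=False on pods it failed to place.
			if condition.Type == kube_api.PodScheduled && condition.Status == kube_api.ConditionFalse {
				t := condition.LastTransitionTime.Time
				if oldest == nil || t.Before(*oldest) {
					oldest = &t
				}
			}
		}
	}
	// nil means no pod carries a failed-scheduling condition yet.
	return oldest
}

A nil result would keep the behaviour in main unchanged: it logs an error and retries on the next one-minute iteration.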