Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Listing pods only once when getting pods for RS in deployment #27012

Merged
merged 1 commit into from
Jun 11, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
34 changes: 24 additions & 10 deletions pkg/controller/deployment/deployment_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -772,18 +772,21 @@ func (dc *DeploymentController) rsAndPodsWithHashKeySynced(deployment *extension
}
syncedRSList = append(syncedRSList, *syncedRS)
}
syncedPodList, err := deploymentutil.ListPods(deployment,
func(namespace string, options api.ListOptions) (*api.PodList, error) {
podList, err := dc.podStore.Pods(namespace).List(options.LabelSelector)
return &podList, err
})

syncedPodList, err := dc.listPods(deployment)
if err != nil {
return nil, nil, err
}
return syncedRSList, syncedPodList, nil
}

func (dc *DeploymentController) listPods(deployment *extensions.Deployment) (*api.PodList, error) {
return deploymentutil.ListPods(deployment,
func(namespace string, options api.ListOptions) (*api.PodList, error) {
podList, err := dc.podStore.Pods(namespace).List(options.LabelSelector)
return &podList, err
})
}

// addHashKeyToRSAndPods adds pod-template-hash information to the given rs, if it's not already there, with the following steps:
// 1. Add hash label to the rs's pod template, and make sure the controller sees this update so that no orphaned pods will be created
// 2. Add hash label to all pods this rs owns, wait until replicaset controller reports rs.Status.FullyLabeledReplicas equal to the desired number of replicas
Expand Down Expand Up @@ -977,6 +980,14 @@ func (dc *DeploymentController) reconcileNewReplicaSet(allRSs []*extensions.Repl
return scaled, err
}

func (dc *DeploymentController) getAvailablePodsForReplicaSets(deployment *extensions.Deployment, rss []*extensions.ReplicaSet, minReadySeconds int32) (int32, error) {
podList, err := dc.listPods(deployment)
if err != nil {
return 0, err
}
return deploymentutil.CountAvailablePodsForReplicaSets(podList, rss, minReadySeconds)
}

func (dc *DeploymentController) reconcileOldReplicaSets(allRSs []*extensions.ReplicaSet, oldRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, deployment *extensions.Deployment) (bool, error) {
oldPodsCount := deploymentutil.GetReplicaCountForReplicaSets(oldRSs)
if oldPodsCount == 0 {
Expand All @@ -986,7 +997,8 @@ func (dc *DeploymentController) reconcileOldReplicaSets(allRSs []*extensions.Rep

minReadySeconds := deployment.Spec.MinReadySeconds
allPodsCount := deploymentutil.GetReplicaCountForReplicaSets(allRSs)
newRSAvailablePodCount, err := deploymentutil.GetAvailablePodsForReplicaSets(dc.client, []*extensions.ReplicaSet{newRS}, minReadySeconds)
// TODO: use dc.getAvailablePodsForReplicaSets instead
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why can't you switch to it now?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

explained in #27012 (comment)

newRSAvailablePodCount, err := deploymentutil.GetAvailablePodsForReplicaSets(dc.client, deployment, []*extensions.ReplicaSet{newRS}, minReadySeconds)
if err != nil {
return false, fmt.Errorf("could not find available pods: %v", err)
}
Expand Down Expand Up @@ -1068,7 +1080,8 @@ func (dc *DeploymentController) cleanupUnhealthyReplicas(oldRSs []*extensions.Re
// cannot scale down this replica set.
continue
}
readyPodCount, err := deploymentutil.GetAvailablePodsForReplicaSets(dc.client, []*extensions.ReplicaSet{targetRS}, 0)
// TODO: use dc.getAvailablePodsForReplicaSets instead
readyPodCount, err := deploymentutil.GetAvailablePodsForReplicaSets(dc.client, deployment, []*extensions.ReplicaSet{targetRS}, 0)
if err != nil {
return nil, totalScaledDown, fmt.Errorf("could not find available pods: %v", err)
}
Expand Down Expand Up @@ -1104,7 +1117,8 @@ func (dc *DeploymentController) scaleDownOldReplicaSetsForRollingUpdate(allRSs [
minAvailable := deployment.Spec.Replicas - maxUnavailable
minReadySeconds := deployment.Spec.MinReadySeconds
// Find the number of ready pods.
availablePodCount, err := deploymentutil.GetAvailablePodsForReplicaSets(dc.client, allRSs, minReadySeconds)
// TODO: use dc.getAvailablePodsForReplicaSets instead
availablePodCount, err := deploymentutil.GetAvailablePodsForReplicaSets(dc.client, deployment, allRSs, minReadySeconds)
if err != nil {
return 0, fmt.Errorf("could not find available pods: %v", err)
}
Expand Down Expand Up @@ -1220,7 +1234,7 @@ func (dc *DeploymentController) calculateStatus(allRSs []*extensions.ReplicaSet,
totalActualReplicas = deploymentutil.GetActualReplicaCountForReplicaSets(allRSs)
updatedReplicas = deploymentutil.GetActualReplicaCountForReplicaSets([]*extensions.ReplicaSet{newRS})
minReadySeconds := deployment.Spec.MinReadySeconds
availableReplicas, err = deploymentutil.GetAvailablePodsForReplicaSets(dc.client, allRSs, minReadySeconds)
availableReplicas, err = dc.getAvailablePodsForReplicaSets(deployment, allRSs, minReadySeconds)
if err != nil {
err = fmt.Errorf("failed to count available pods: %v", err)
return
Expand Down
13 changes: 6 additions & 7 deletions pkg/controller/deployment/deployment_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,14 @@ func newRSWithStatus(name string, specReplicas, statusReplicas int, selector map
return rs
}

func deployment(name string, replicas int, maxSurge, maxUnavailable intstr.IntOrString) exp.Deployment {
func deployment(name string, replicas int, maxSurge, maxUnavailable intstr.IntOrString, selector map[string]string) exp.Deployment {
return exp.Deployment{
ObjectMeta: api.ObjectMeta{
Name: name,
},
Spec: exp.DeploymentSpec{
Replicas: int32(replicas),
Selector: &unversioned.LabelSelector{MatchLabels: selector},
Strategy: exp.DeploymentStrategy{
Type: exp.RollingUpdateDeploymentStrategyType,
RollingUpdate: &exp.RollingUpdateDeployment{
Expand Down Expand Up @@ -190,7 +191,7 @@ func TestDeploymentController_reconcileNewReplicaSet(t *testing.T) {
newRS := rs("foo-v2", test.newReplicas, nil)
oldRS := rs("foo-v2", test.oldReplicas, nil)
allRSs := []*exp.ReplicaSet{newRS, oldRS}
deployment := deployment("foo", test.deploymentReplicas, test.maxSurge, intstr.FromInt(0))
deployment := deployment("foo", test.deploymentReplicas, test.maxSurge, intstr.FromInt(0), nil)
fake := fake.Clientset{}
controller := &DeploymentController{
client: &fake,
Expand Down Expand Up @@ -293,7 +294,7 @@ func TestDeploymentController_reconcileOldReplicaSets(t *testing.T) {
oldRSs := []*exp.ReplicaSet{oldRS}
allRSs := []*exp.ReplicaSet{oldRS, newRS}

deployment := deployment("foo", test.deploymentReplicas, intstr.FromInt(0), test.maxUnavailable)
deployment := deployment("foo", test.deploymentReplicas, intstr.FromInt(0), test.maxUnavailable, newSelector)
fakeClientset := fake.Clientset{}
fakeClientset.AddReactor("list", "pods", func(action core.Action) (handled bool, ret runtime.Object, err error) {
switch action.(type) {
Expand Down Expand Up @@ -430,7 +431,7 @@ func TestDeploymentController_cleanupUnhealthyReplicas(t *testing.T) {
t.Logf("executing scenario %d", i)
oldRS := rs("foo-v2", test.oldReplicas, nil)
oldRSs := []*exp.ReplicaSet{oldRS}
deployment := deployment("foo", 10, intstr.FromInt(2), intstr.FromInt(2))
deployment := deployment("foo", 10, intstr.FromInt(2), intstr.FromInt(2), nil)
fakeClientset := fake.Clientset{}
fakeClientset.AddReactor("list", "pods", func(action core.Action) (handled bool, ret runtime.Object, err error) {
switch action.(type) {
Expand Down Expand Up @@ -540,7 +541,7 @@ func TestDeploymentController_scaleDownOldReplicaSetsForRollingUpdate(t *testing
oldRS := rs("foo-v2", test.oldReplicas, nil)
allRSs := []*exp.ReplicaSet{oldRS}
oldRSs := []*exp.ReplicaSet{oldRS}
deployment := deployment("foo", test.deploymentReplicas, intstr.FromInt(0), test.maxUnavailable)
deployment := deployment("foo", test.deploymentReplicas, intstr.FromInt(0), test.maxUnavailable, map[string]string{"foo": "bar"})
fakeClientset := fake.Clientset{}
fakeClientset.AddReactor("list", "pods", func(action core.Action) (handled bool, ret runtime.Object, err error) {
switch action.(type) {
Expand Down Expand Up @@ -783,12 +784,10 @@ func TestSyncDeploymentCreatesReplicaSet(t *testing.T) {
// then is updated to 1 replica
rs := newReplicaSet(d, "deploymentrs-4186632231", 0)
updatedRS := newReplicaSet(d, "deploymentrs-4186632231", 1)
opt := newListOptions()

f.expectCreateRSAction(rs)
f.expectUpdateDeploymentAction(d)
f.expectUpdateRSAction(updatedRS)
f.expectListPodAction(rs.Namespace, opt)
f.expectUpdateDeploymentAction(d)

f.run(getKey(d, t))
Expand Down
67 changes: 39 additions & 28 deletions pkg/util/deployment/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
intstrutil "k8s.io/kubernetes/pkg/util/intstr"
labelsutil "k8s.io/kubernetes/pkg/util/labels"
podutil "k8s.io/kubernetes/pkg/util/pod"
rsutil "k8s.io/kubernetes/pkg/util/replicaset"
"k8s.io/kubernetes/pkg/util/wait"
)

Expand Down Expand Up @@ -314,23 +315,42 @@ func GetActualReplicaCountForReplicaSets(replicaSets []*extensions.ReplicaSet) i
return totalReplicaCount
}

// Returns the number of available pods corresponding to the given replica sets.
func GetAvailablePodsForReplicaSets(c clientset.Interface, rss []*extensions.ReplicaSet, minReadySeconds int32) (int32, error) {
allPods, err := GetPodsForReplicaSets(c, rss)
// GetAvailablePodsForReplicaSets returns the number of available pods (listed from clientset) corresponding to the given replica sets.
func GetAvailablePodsForReplicaSets(c clientset.Interface, deployment *extensions.Deployment, rss []*extensions.ReplicaSet, minReadySeconds int32) (int32, error) {
podList, err := listPods(deployment, c)
if err != nil {
return 0, err
}
return CountAvailablePodsForReplicaSets(podList, rss, minReadySeconds)
}

// CountAvailablePodsForReplicaSets returns the number of available pods corresponding to the given pod list and replica sets.
// Note that the input pod list should be the pods targeted by the deployment of input replica sets.
func CountAvailablePodsForReplicaSets(podList *api.PodList, rss []*extensions.ReplicaSet, minReadySeconds int32) (int32, error) {
rsPods, err := filterPodsMatchingReplicaSets(rss, podList)
if err != nil {
return 0, err
}
return countAvailablePods(rsPods, minReadySeconds), nil
}

// GetAvailablePodsForDeployment returns the number of available pods (listed from clientset) corresponding to the given deployment.
func GetAvailablePodsForDeployment(c clientset.Interface, deployment *extensions.Deployment, minReadySeconds int32) (int32, error) {
podList, err := listPods(deployment, c)
if err != nil {
return 0, err
}
return getReadyPodsCount(allPods, minReadySeconds), nil
return countAvailablePods(podList.Items, minReadySeconds), nil
}

func getReadyPodsCount(pods []api.Pod, minReadySeconds int32) int32 {
readyPodCount := int32(0)
func countAvailablePods(pods []api.Pod, minReadySeconds int32) int32 {
availablePodCount := int32(0)
for _, pod := range pods {
if IsPodAvailable(&pod, minReadySeconds) {
readyPodCount++
availablePodCount++
}
}
return readyPodCount
return availablePodCount
}

func IsPodAvailable(pod *api.Pod, minReadySeconds int32) bool {
Expand All @@ -354,29 +374,20 @@ func IsPodAvailable(pod *api.Pod, minReadySeconds int32) bool {
return false
}

func GetPodsForReplicaSets(c clientset.Interface, replicaSets []*extensions.ReplicaSet) ([]api.Pod, error) {
allPods := map[string]api.Pod{}
// filterPodsMatchingReplicaSets filters the given pod list and only return the ones targeted by the input replicasets
func filterPodsMatchingReplicaSets(replicaSets []*extensions.ReplicaSet, podList *api.PodList) ([]api.Pod, error) {
rsPods := []api.Pod{}
for _, rs := range replicaSets {
if rs != nil {
selector, err := unversioned.LabelSelectorAsSelector(rs.Spec.Selector)
if err != nil {
return nil, fmt.Errorf("invalid label selector: %v", err)
}
options := api.ListOptions{LabelSelector: selector}
podList, err := c.Core().Pods(rs.ObjectMeta.Namespace).List(options)
if err != nil {
return nil, fmt.Errorf("error listing pods: %v", err)
}
for _, pod := range podList.Items {
allPods[pod.Name] = pod
}
matchingFunc, err := rsutil.MatchingPodsFunc(rs)
if err != nil {
return nil, err
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you really need to return here? If a replica set has an invalid label selector then isn't it fine to just ignore it and try to update the deployment status anyway?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Kargakis You may be right, but since this keeps the old behavior w.r.t. handling errors and today is the last day to get 1.3 stuff in, I vote that we should do it in a follow up if at all.

}
if matchingFunc == nil {
continue
}
rsPods = append(rsPods, podutil.Filter(podList, matchingFunc)...)
}
requiredPods := []api.Pod{}
for _, pod := range allPods {
requiredPods = append(requiredPods, pod)
}
return requiredPods, nil
return rsPods, nil
}

// Revision returns the revision number of the input replica set
Expand Down
4 changes: 2 additions & 2 deletions pkg/util/deployment/deployment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func newPod(now time.Time, ready bool, beforeSec int) api.Pod {
}
}

func TestGetReadyPodsCount(t *testing.T) {
func TestCountAvailablePods(t *testing.T) {
now := time.Now()
tests := []struct {
pods []api.Pod
Expand Down Expand Up @@ -124,7 +124,7 @@ func TestGetReadyPodsCount(t *testing.T) {
}

for _, test := range tests {
if count := getReadyPodsCount(test.pods, int32(test.minReadySeconds)); int(count) != test.expected {
if count := countAvailablePods(test.pods, int32(test.minReadySeconds)); int(count) != test.expected {
t.Errorf("Pods = %#v, minReadySeconds = %d, expected %d, got %d", test.pods, test.minReadySeconds, test.expected, count)
}
}
Expand Down
11 changes: 11 additions & 0 deletions pkg/util/pod/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,3 +87,14 @@ func UpdatePodWithRetries(podClient unversionedcore.PodInterface, pod *api.Pod,
// if the error is nil and podUpdated is true, the returned pod contains the applied update.
return pod, podUpdated, err
}

// Filter uses the input function f to filter the given pod list, and return the filtered pods
func Filter(podList *api.PodList, f func(api.Pod) bool) []api.Pod {
pods := make([]api.Pod, 0)
for _, p := range podList.Items {
if f(p) {
pods = append(pods, p)
}
}
return pods
}
17 changes: 17 additions & 0 deletions pkg/util/replicaset/replicaset.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@ import (
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apis/extensions"
unversionedextensions "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/extensions/unversioned"
"k8s.io/kubernetes/pkg/labels"
errorsutil "k8s.io/kubernetes/pkg/util/errors"
labelsutil "k8s.io/kubernetes/pkg/util/labels"
podutil "k8s.io/kubernetes/pkg/util/pod"
Expand Down Expand Up @@ -91,3 +93,18 @@ func GetPodTemplateSpecHash(rs extensions.ReplicaSet) string {
Spec: rs.Spec.Template.Spec,
}))
}

// MatchingPodsFunc returns a filter function for pods with matching labels
func MatchingPodsFunc(rs *extensions.ReplicaSet) (func(api.Pod) bool, error) {
if rs == nil {
return nil, nil
}
selector, err := unversioned.LabelSelectorAsSelector(rs.Spec.Selector)
if err != nil {
return nil, fmt.Errorf("invalid label selector: %v", err)
}
return func(pod api.Pod) bool {
podLabelsSelector := labels.Set(pod.ObjectMeta.Labels)
return selector.Matches(podLabelsSelector)
}, nil
}
21 changes: 14 additions & 7 deletions test/e2e/framework/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -2886,18 +2886,18 @@ func WaitForDeploymentStatus(c clientset.Interface, ns, deploymentName string, d
}
}
totalCreated := deploymentutil.GetReplicaCountForReplicaSets(allRSs)
totalAvailable, err := deploymentutil.GetAvailablePodsForReplicaSets(c, allRSs, minReadySeconds)
totalAvailable, err := deploymentutil.GetAvailablePodsForDeployment(c, deployment, minReadySeconds)
if err != nil {
return false, err
}
if totalCreated > maxCreated {
logReplicaSetsOfDeployment(deployment, allOldRSs, newRS)
logPodsOfReplicaSets(c, allRSs, minReadySeconds)
logPodsOfDeployment(c, deployment, minReadySeconds)
return false, fmt.Errorf("total pods created: %d, more than the max allowed: %d", totalCreated, maxCreated)
}
if totalAvailable < minAvailable {
logReplicaSetsOfDeployment(deployment, allOldRSs, newRS)
logPodsOfReplicaSets(c, allRSs, minReadySeconds)
logPodsOfDeployment(c, deployment, minReadySeconds)
return false, fmt.Errorf("total pods available: %d, less than the min required: %d", totalAvailable, minAvailable)
}

Expand All @@ -2913,7 +2913,7 @@ func WaitForDeploymentStatus(c clientset.Interface, ns, deploymentName string, d

if err == wait.ErrWaitTimeout {
logReplicaSetsOfDeployment(deployment, allOldRSs, newRS)
logPodsOfReplicaSets(c, allRSs, minReadySeconds)
logPodsOfDeployment(c, deployment, minReadySeconds)
}
if err != nil {
return fmt.Errorf("error waiting for deployment %s status to match expectation: %v", deploymentName, err)
Expand Down Expand Up @@ -3062,10 +3062,17 @@ func WaitForObservedDeployment(c *clientset.Clientset, ns, deploymentName string
return deploymentutil.WaitForObservedDeployment(func() (*extensions.Deployment, error) { return c.Extensions().Deployments(ns).Get(deploymentName) }, desiredGeneration, Poll, 1*time.Minute)
}

func logPodsOfReplicaSets(c clientset.Interface, rss []*extensions.ReplicaSet, minReadySeconds int32) {
allPods, err := deploymentutil.GetPodsForReplicaSets(c, rss)
func logPodsOfDeployment(c clientset.Interface, deployment *extensions.Deployment, minReadySeconds int32) {
podList, err := deploymentutil.ListPods(deployment,
func(namespace string, options api.ListOptions) (*api.PodList, error) {
return c.Core().Pods(namespace).List(options)
})
if err != nil {
Logf("Failed to list pods of deployment %s: %v", deployment.Name, err)
return
}
if err == nil {
for _, pod := range allPods {
for _, pod := range podList.Items {
availability := "not available"
if deploymentutil.IsPodAvailable(&pod, minReadySeconds) {
availability = "available"
Expand Down