Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wait for clean old RSs statuses in the middle of Recreate rollouts #43963

Merged
merged 2 commits into from
Apr 6, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/controller/deployment/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ go_test(
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apimachinery/pkg/runtime",
"//vendor:k8s.io/apimachinery/pkg/runtime/schema",
"//vendor:k8s.io/apimachinery/pkg/types",
"//vendor:k8s.io/apimachinery/pkg/util/intstr",
"//vendor:k8s.io/apimachinery/pkg/util/uuid",
"//vendor:k8s.io/client-go/testing",
Expand Down
11 changes: 7 additions & 4 deletions pkg/controller/deployment/deployment_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"time"

"github.com/golang/glog"

"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
Expand Down Expand Up @@ -537,10 +538,8 @@ func (dc *DeploymentController) getPodMapForDeployment(d *extensions.Deployment,
podMap[rs.UID] = &v1.PodList{}
}
for _, pod := range pods {
// Ignore inactive Pods since that's what ReplicaSet does.
if !controller.IsPodActive(pod) {
continue
}
// Do not ignore inactive Pods because Recreate Deployments need to verify that no
// Pods from older versions are running before spinning up new Pods.
controllerRef := controller.GetControllerOf(pod)
if controllerRef == nil {
continue
Expand Down Expand Up @@ -614,6 +613,10 @@ func (dc *DeploymentController) syncDeployment(key string) error {
return err
}
// List all Pods owned by this Deployment, grouped by their ReplicaSet.
// Current uses of the podMap are:
//
// * check if a Pod is labeled correctly with the pod-template-hash label.
// * check that no old Pods are running in the middle of Recreate Deployments.
podMap, err := dc.getPodMapForDeployment(d, rsList)
if err != nil {
return err
Expand Down
11 changes: 7 additions & 4 deletions pkg/controller/deployment/deployment_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -560,18 +560,21 @@ func TestGetPodMapForReplicaSets(t *testing.T) {
for _, podList := range podMap {
podCount += len(podList.Items)
}
if got, want := podCount, 2; got != want {
if got, want := podCount, 3; got != want {
t.Errorf("podCount = %v, want %v", got, want)
}

if got, want := len(podMap), 2; got != want {
t.Errorf("len(podMap) = %v, want %v", got, want)
}
if got, want := len(podMap[rs1.UID].Items), 1; got != want {
if got, want := len(podMap[rs1.UID].Items), 2; got != want {
t.Errorf("len(podMap[rs1]) = %v, want %v", got, want)
}
if got, want := podMap[rs1.UID].Items[0].Name, "rs1-pod"; got != want {
t.Errorf("podMap[rs1] = [%v], want [%v]", got, want)
expect := map[string]struct{}{"rs1-pod": {}, "pod4": {}}
for _, pod := range podMap[rs1.UID].Items {
if _, ok := expect[pod.Name]; !ok {
t.Errorf("unexpected pod name for rs1: %s", pod.Name)
}
}
if got, want := len(podMap[rs2.UID].Items), 1; got != want {
t.Errorf("len(podMap[rs2]) = %v, want %v", got, want)
Expand Down
35 changes: 21 additions & 14 deletions pkg/controller/deployment/recreate.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"k8s.io/kubernetes/pkg/api/v1"
extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/deployment/util"
)

// rolloutRecreate implements the logic for recreating a replica set.
Expand All @@ -43,18 +44,12 @@ func (dc *DeploymentController) rolloutRecreate(d *extensions.Deployment, rsList
return dc.syncRolloutStatus(allRSs, newRS, d)
}

newStatus := calculateStatus(allRSs, newRS, d)
// Do not process a deployment when it has old pods running.
if newStatus.UpdatedReplicas == 0 {
for _, podList := range podMap {
if len(podList.Items) > 0 {
return dc.syncRolloutStatus(allRSs, newRS, d)
}
}
if oldPodsRunning(newRS, oldRSs, podMap) {
return dc.syncRolloutStatus(allRSs, newRS, d)
}

// If we need to create a new RS, create it now.
// TODO: Create a new RS without re-listing all RSs.
if newRS == nil {
newRS, oldRSs, err = dc.getAllReplicaSetsAndSyncRevision(d, rsList, podMap, true)
if err != nil {
Expand All @@ -64,14 +59,9 @@ func (dc *DeploymentController) rolloutRecreate(d *extensions.Deployment, rsList
}

// scale up new replica set.
scaledUp, err := dc.scaleUpNewReplicaSetForRecreate(newRS, d)
if err != nil {
if _, err := dc.scaleUpNewReplicaSetForRecreate(newRS, d); err != nil {
return err
}
if scaledUp {
// Update DeploymentStatus.
return dc.syncRolloutStatus(allRSs, newRS, d)
}

// Sync deployment status.
return dc.syncRolloutStatus(allRSs, newRS, d)
Expand All @@ -98,6 +88,23 @@ func (dc *DeploymentController) scaleDownOldReplicaSetsForRecreate(oldRSs []*ext
return scaled, nil
}

// oldPodsRunning returns whether there are old pods running or any of the old ReplicaSets thinks that it runs pods.
func oldPodsRunning(newRS *extensions.ReplicaSet, oldRSs []*extensions.ReplicaSet, podMap map[types.UID]*v1.PodList) bool {
if oldPods := util.GetActualReplicaCountForReplicaSets(oldRSs); oldPods > 0 {
return true
}
for rsUID, podList := range podMap {
// If the pods belong to the new ReplicaSet, ignore.
if newRS != nil && newRS.UID == rsUID {
continue
}
if len(podList.Items) > 0 {
return true
}
}
return false
}

// scaleUpNewReplicaSetForRecreate scales up new replica set when deployment strategy is "Recreate".
func (dc *DeploymentController) scaleUpNewReplicaSetForRecreate(newRS *extensions.ReplicaSet, deployment *extensions.Deployment) (bool, error) {
scaled, _, err := dc.scaleReplicaSetAndRecordEvent(newRS, *(deployment.Spec.Replicas), deployment)
Expand Down
61 changes: 61 additions & 0 deletions pkg/controller/deployment/recreate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ import (
"testing"

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions"
Expand Down Expand Up @@ -82,3 +84,62 @@ func TestScaleDownOldReplicaSets(t *testing.T) {
}
}
}

func TestOldPodsRunning(t *testing.T) {
tests := []struct {
name string

newRS *extensions.ReplicaSet
oldRSs []*extensions.ReplicaSet
podMap map[types.UID]*v1.PodList

expected bool
}{
{
name: "no old RSs",
expected: false,
},
{
name: "old RSs with running pods",
oldRSs: []*extensions.ReplicaSet{rsWithUID("some-uid"), rsWithUID("other-uid")},
podMap: podMapWithUIDs([]string{"some-uid", "other-uid"}),
expected: true,
},
{
name: "old RSs without pods but with non-zero status replicas",
oldRSs: []*extensions.ReplicaSet{newRSWithStatus("rs-blabla", 0, 1, nil)},
expected: true,
},
{
name: "old RSs without pods or non-zero status replicas",
oldRSs: []*extensions.ReplicaSet{newRSWithStatus("rs-blabla", 0, 0, nil)},
expected: false,
},
}

for _, test := range tests {
if expected, got := test.expected, oldPodsRunning(test.newRS, test.oldRSs, test.podMap); expected != got {
t.Errorf("%s: expected %t, got %t", test.name, expected, got)
}
}
}

func rsWithUID(uid string) *extensions.ReplicaSet {
d := newDeployment("foo", 1, nil, nil, nil, map[string]string{"foo": "bar"})
rs := newReplicaSet(d, fmt.Sprintf("foo-%s", uid), 0)
rs.UID = types.UID(uid)
return rs
}

func podMapWithUIDs(uids []string) map[types.UID]*v1.PodList {
podMap := make(map[types.UID]*v1.PodList)
for _, uid := range uids {
podMap[types.UID(uid)] = &v1.PodList{
Items: []v1.Pod{
{ /* supposedly a pod */ },
{ /* supposedly another pod pod */ },
},
}
}
return podMap
}
3 changes: 1 addition & 2 deletions pkg/controller/deployment/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func (dc *DeploymentController) getAllReplicaSetsAndSyncRevision(d *extensions.D
// rsList should come from getReplicaSetsForDeployment(d).
// podMap should come from getPodMapForDeployment(d, rsList).
func (dc *DeploymentController) rsAndPodsWithHashKeySynced(d *extensions.Deployment, rsList []*extensions.ReplicaSet, podMap map[types.UID]*v1.PodList) ([]*extensions.ReplicaSet, error) {
syncedRSList := []*extensions.ReplicaSet{}
var syncedRSList []*extensions.ReplicaSet
for _, rs := range rsList {
// Add pod-template-hash information if it's not in the RS.
// Otherwise, new RS produced by Deployment will overlap with pre-existing ones
Expand Down Expand Up @@ -515,7 +515,6 @@ func (dc *DeploymentController) cleanupDeployment(oldRSs []*extensions.ReplicaSe
glog.V(4).Infof("Looking to cleanup old replica sets for deployment %q", deployment.Name)

var errList []error
// TODO: This should be parallelized.
for i := int32(0); i < diff; i++ {
rs := cleanableRSes[i]
// Avoid delete replica set with non-zero replica counts
Expand Down
7 changes: 5 additions & 2 deletions pkg/controller/deployment/util/deployment_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,7 @@ func rsListFromClient(c clientset.Interface) rsListFunc {
if err != nil {
return nil, err
}
ret := []*extensions.ReplicaSet{}
var ret []*extensions.ReplicaSet
for i := range rsList.Items {
ret = append(ret, &rsList.Items[i])
}
Expand Down Expand Up @@ -827,9 +827,12 @@ func WaitForPodsHashPopulated(c extensionslisters.ReplicaSetLister, desiredGener
}

// LabelPodsWithHash labels all pods in the given podList with the new hash label.
// The returned bool value can be used to tell if all pods are actually labeled.
func LabelPodsWithHash(podList *v1.PodList, c clientset.Interface, podLister corelisters.PodLister, namespace, name, hash string) error {
for _, pod := range podList.Items {
// Ignore inactive Pods.
if !controller.IsPodActive(&pod) {
continue
}
// Only label the pod that doesn't already have the new hash
if pod.Labels[extensions.DefaultDeploymentUniqueLabelKey] != hash {
_, err := UpdatePodWithRetries(c.Core().Pods(namespace), podLister, pod.Namespace, pod.Name,
Expand Down
2 changes: 1 addition & 1 deletion test/e2e/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ var _ = framework.KubeDescribe("Deployment", func() {
It("RollingUpdateDeployment should delete old pods and create new ones", func() {
testRollingUpdateDeployment(f)
})
It("RecreateDeployment should delete old pods and create new ones [Flaky]", func() {
It("RecreateDeployment should delete old pods and create new ones", func() {
testRecreateDeployment(f)
})
It("deployment should delete old replica sets", func() {
Expand Down