Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

controller: scale proportionally before rolling out new templates #32995

Merged
merged 2 commits into from
Oct 6, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
102 changes: 36 additions & 66 deletions pkg/controller/deployment/deployment_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/api/unversioned"
exp "k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/client/testing/core"
Expand All @@ -38,76 +38,47 @@ var (
noTimestamp = unversioned.Time{}
)

func rs(name string, replicas int, selector map[string]string, timestamp unversioned.Time) *exp.ReplicaSet {
return &exp.ReplicaSet{
func rs(name string, replicas int, selector map[string]string, timestamp unversioned.Time) *extensions.ReplicaSet {
return &extensions.ReplicaSet{
ObjectMeta: api.ObjectMeta{
Name: name,
CreationTimestamp: timestamp,
Namespace: api.NamespaceDefault,
},
Spec: exp.ReplicaSetSpec{
Spec: extensions.ReplicaSetSpec{
Replicas: int32(replicas),
Selector: &unversioned.LabelSelector{MatchLabels: selector},
Template: api.PodTemplateSpec{},
},
}
}

func newRSWithStatus(name string, specReplicas, statusReplicas int, selector map[string]string) *exp.ReplicaSet {
func newRSWithStatus(name string, specReplicas, statusReplicas int, selector map[string]string) *extensions.ReplicaSet {
rs := rs(name, specReplicas, selector, noTimestamp)
rs.Status = exp.ReplicaSetStatus{
rs.Status = extensions.ReplicaSetStatus{
Replicas: int32(statusReplicas),
}
return rs
}

func deployment(name string, replicas int, maxSurge, maxUnavailable intstr.IntOrString, selector map[string]string) exp.Deployment {
return exp.Deployment{
func newDeployment(name string, replicas int, revisionHistoryLimit *int32, maxSurge, maxUnavailable *intstr.IntOrString, selector map[string]string) *extensions.Deployment {
d := extensions.Deployment{
TypeMeta: unversioned.TypeMeta{APIVersion: testapi.Default.GroupVersion().String()},
ObjectMeta: api.ObjectMeta{
UID: uuid.NewUUID(),
Name: name,
Namespace: api.NamespaceDefault,
},
Spec: exp.DeploymentSpec{
Replicas: int32(replicas),
Selector: &unversioned.LabelSelector{MatchLabels: selector},
Strategy: exp.DeploymentStrategy{
Type: exp.RollingUpdateDeploymentStrategyType,
RollingUpdate: &exp.RollingUpdateDeployment{
MaxSurge: maxSurge,
MaxUnavailable: maxUnavailable,
},
},
},
}
}

func newDeployment(replicas int, revisionHistoryLimit *int) *exp.Deployment {
var v *int32
if revisionHistoryLimit != nil {
v = new(int32)
*v = int32(*revisionHistoryLimit)
}
d := exp.Deployment{
TypeMeta: unversioned.TypeMeta{APIVersion: testapi.Default.GroupVersion().String()},
ObjectMeta: api.ObjectMeta{
UID: uuid.NewUUID(),
Name: "foobar",
Namespace: api.NamespaceDefault,
ResourceVersion: "18",
},
Spec: exp.DeploymentSpec{
Strategy: exp.DeploymentStrategy{
Type: exp.RollingUpdateDeploymentStrategyType,
RollingUpdate: &exp.RollingUpdateDeployment{},
Spec: extensions.DeploymentSpec{
Strategy: extensions.DeploymentStrategy{
Type: extensions.RollingUpdateDeploymentStrategyType,
RollingUpdate: &extensions.RollingUpdateDeployment{},
},
Replicas: int32(replicas),
Selector: &unversioned.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}},
Selector: &unversioned.LabelSelector{MatchLabels: selector},
Template: api.PodTemplateSpec{
ObjectMeta: api.ObjectMeta{
Labels: map[string]string{
"name": "foo",
"type": "production",
},
Labels: selector,
},
Spec: api.PodSpec{
Containers: []api.Container{
Expand All @@ -117,33 +88,32 @@ func newDeployment(replicas int, revisionHistoryLimit *int) *exp.Deployment {
},
},
},
RevisionHistoryLimit: v,
RevisionHistoryLimit: revisionHistoryLimit,
},
}
if maxSurge != nil {
d.Spec.Strategy.RollingUpdate.MaxSurge = *maxSurge
}
if maxUnavailable != nil {
d.Spec.Strategy.RollingUpdate.MaxUnavailable = *maxUnavailable
}
return &d
}

// TODO: Consolidate all deployment helpers into one.
func newDeploymentEnhanced(replicas int, maxSurge intstr.IntOrString) *exp.Deployment {
d := newDeployment(replicas, nil)
d.Spec.Strategy.RollingUpdate.MaxSurge = maxSurge
return d
}

func newReplicaSet(d *exp.Deployment, name string, replicas int) *exp.ReplicaSet {
return &exp.ReplicaSet{
func newReplicaSet(d *extensions.Deployment, name string, replicas int) *extensions.ReplicaSet {
return &extensions.ReplicaSet{
ObjectMeta: api.ObjectMeta{
Name: name,
Namespace: api.NamespaceDefault,
},
Spec: exp.ReplicaSetSpec{
Spec: extensions.ReplicaSetSpec{
Replicas: int32(replicas),
Template: d.Spec.Template,
},
}
}

func getKey(d *exp.Deployment, t *testing.T) string {
func getKey(d *extensions.Deployment, t *testing.T) string {
if key, err := controller.KeyFunc(d); err != nil {
t.Errorf("Unexpected error getting key for deployment %v: %v", d.Name, err)
return ""
Expand All @@ -157,8 +127,8 @@ type fixture struct {

client *fake.Clientset
// Objects to put in the store.
dLister []*exp.Deployment
rsLister []*exp.ReplicaSet
dLister []*extensions.Deployment
rsLister []*extensions.ReplicaSet
podLister []*api.Pod

// Actions expected to happen on the client. Objects from here are also
Expand All @@ -167,21 +137,21 @@ type fixture struct {
objects []runtime.Object
}

func (f *fixture) expectUpdateDeploymentAction(d *exp.Deployment) {
func (f *fixture) expectUpdateDeploymentAction(d *extensions.Deployment) {
f.actions = append(f.actions, core.NewUpdateAction(unversioned.GroupVersionResource{Resource: "deployments"}, d.Namespace, d))
}

func (f *fixture) expectUpdateDeploymentStatusAction(d *exp.Deployment) {
func (f *fixture) expectUpdateDeploymentStatusAction(d *extensions.Deployment) {
action := core.NewUpdateAction(unversioned.GroupVersionResource{Resource: "deployments"}, d.Namespace, d)
action.Subresource = "status"
f.actions = append(f.actions, action)
}

func (f *fixture) expectCreateRSAction(rs *exp.ReplicaSet) {
func (f *fixture) expectCreateRSAction(rs *extensions.ReplicaSet) {
f.actions = append(f.actions, core.NewCreateAction(unversioned.GroupVersionResource{Resource: "replicasets"}, rs.Namespace, rs))
}

func (f *fixture) expectUpdateRSAction(rs *exp.ReplicaSet) {
func (f *fixture) expectUpdateRSAction(rs *extensions.ReplicaSet) {
f.actions = append(f.actions, core.NewUpdateAction(unversioned.GroupVersionResource{Resource: "replicasets"}, rs.Namespace, rs))
}

Expand Down Expand Up @@ -239,7 +209,7 @@ func (f *fixture) run(deploymentName string) {
func TestSyncDeploymentCreatesReplicaSet(t *testing.T) {
f := newFixture(t)

d := newDeployment(1, nil)
d := newDeployment("foo", 1, nil, nil, nil, map[string]string{"foo": "bar"})
f.dLister = append(f.dLister, d)
f.objects = append(f.objects, d)

Expand All @@ -255,7 +225,7 @@ func TestSyncDeploymentCreatesReplicaSet(t *testing.T) {
func TestSyncDeploymentDontDoAnythingDuringDeletion(t *testing.T) {
f := newFixture(t)

d := newDeployment(1, nil)
d := newDeployment("foo", 1, nil, nil, nil, map[string]string{"foo": "bar"})
now := unversioned.Now()
d.DeletionTimestamp = &now
f.dLister = append(f.dLister, d)
Expand All @@ -272,7 +242,7 @@ func TestDeploymentController_dontSyncDeploymentsWithEmptyPodSelector(t *testing
controller.rsListerSynced = alwaysReady
controller.podListerSynced = alwaysReady

d := newDeployment(1, nil)
d := newDeployment("foo", 1, nil, nil, nil, map[string]string{"foo": "bar"})
empty := unversioned.LabelSelector{}
d.Spec.Selector = &empty
controller.dLister.Indexer.Add(d)
Expand Down
31 changes: 19 additions & 12 deletions pkg/controller/deployment/rolling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,18 +80,20 @@ func TestDeploymentController_reconcileNewReplicaSet(t *testing.T) {
},
}

for i, test := range tests {
for i := range tests {
test := tests[i]
t.Logf("executing scenario %d", i)
newRS := rs("foo-v2", test.newReplicas, nil, noTimestamp)
oldRS := rs("foo-v2", test.oldReplicas, nil, noTimestamp)
allRSs := []*exp.ReplicaSet{newRS, oldRS}
deployment := deployment("foo", test.deploymentReplicas, test.maxSurge, intstr.FromInt(0), nil)
maxUnavailable := intstr.FromInt(0)
deployment := newDeployment("foo", test.deploymentReplicas, nil, &test.maxSurge, &maxUnavailable, map[string]string{"foo": "bar"})
fake := fake.Clientset{}
controller := &DeploymentController{
client: &fake,
eventRecorder: &record.FakeRecorder{},
}
scaled, err := controller.reconcileNewReplicaSet(allRSs, newRS, &deployment)
scaled, err := controller.reconcileNewReplicaSet(allRSs, newRS, deployment)
if err != nil {
t.Errorf("unexpected error: %v", err)
continue
Expand Down Expand Up @@ -178,7 +180,8 @@ func TestDeploymentController_reconcileOldReplicaSets(t *testing.T) {
scaleExpected: false,
},
}
for i, test := range tests {
for i := range tests {
test := tests[i]
t.Logf("executing scenario %d", i)

newSelector := map[string]string{"foo": "new"}
Expand All @@ -187,8 +190,8 @@ func TestDeploymentController_reconcileOldReplicaSets(t *testing.T) {
oldRS := rs("foo-old", test.oldReplicas, oldSelector, noTimestamp)
oldRSs := []*exp.ReplicaSet{oldRS}
allRSs := []*exp.ReplicaSet{oldRS, newRS}

deployment := deployment("foo", test.deploymentReplicas, intstr.FromInt(0), test.maxUnavailable, newSelector)
maxSurge := intstr.FromInt(0)
deployment := newDeployment("foo", test.deploymentReplicas, nil, &maxSurge, &test.maxUnavailable, newSelector)
fakeClientset := fake.Clientset{}
fakeClientset.AddReactor("list", "pods", func(action core.Action) (handled bool, ret runtime.Object, err error) {
switch action.(type) {
Expand Down Expand Up @@ -267,7 +270,7 @@ func TestDeploymentController_reconcileOldReplicaSets(t *testing.T) {
eventRecorder: &record.FakeRecorder{},
}

scaled, err := controller.reconcileOldReplicaSets(allRSs, oldRSs, newRS, &deployment)
scaled, err := controller.reconcileOldReplicaSets(allRSs, oldRSs, newRS, deployment)
if err != nil {
t.Errorf("unexpected error: %v", err)
continue
Expand Down Expand Up @@ -325,7 +328,9 @@ func TestDeploymentController_cleanupUnhealthyReplicas(t *testing.T) {
t.Logf("executing scenario %d", i)
oldRS := rs("foo-v2", test.oldReplicas, nil, noTimestamp)
oldRSs := []*exp.ReplicaSet{oldRS}
deployment := deployment("foo", 10, intstr.FromInt(2), intstr.FromInt(2), nil)
maxSurge := intstr.FromInt(2)
maxUnavailable := intstr.FromInt(2)
deployment := newDeployment("foo", 10, nil, &maxSurge, &maxUnavailable, nil)
fakeClientset := fake.Clientset{}
fakeClientset.AddReactor("list", "pods", func(action core.Action) (handled bool, ret runtime.Object, err error) {
switch action.(type) {
Expand Down Expand Up @@ -370,7 +375,7 @@ func TestDeploymentController_cleanupUnhealthyReplicas(t *testing.T) {
client: &fakeClientset,
eventRecorder: &record.FakeRecorder{},
}
_, cleanupCount, err := controller.cleanupUnhealthyReplicas(oldRSs, &deployment, 0, int32(test.maxCleanupCount))
_, cleanupCount, err := controller.cleanupUnhealthyReplicas(oldRSs, deployment, 0, int32(test.maxCleanupCount))
if err != nil {
t.Errorf("unexpected error: %v", err)
continue
Expand Down Expand Up @@ -436,12 +441,14 @@ func TestDeploymentController_scaleDownOldReplicaSetsForRollingUpdate(t *testing
},
}

for i, test := range tests {
for i := range tests {
test := tests[i]
t.Logf("executing scenario %d", i)
oldRS := rs("foo-v2", test.oldReplicas, nil, noTimestamp)
allRSs := []*exp.ReplicaSet{oldRS}
oldRSs := []*exp.ReplicaSet{oldRS}
deployment := deployment("foo", test.deploymentReplicas, intstr.FromInt(0), test.maxUnavailable, map[string]string{"foo": "bar"})
maxSurge := intstr.FromInt(0)
deployment := newDeployment("foo", test.deploymentReplicas, nil, &maxSurge, &test.maxUnavailable, map[string]string{"foo": "bar"})
fakeClientset := fake.Clientset{}
fakeClientset.AddReactor("list", "pods", func(action core.Action) (handled bool, ret runtime.Object, err error) {
switch action.(type) {
Expand Down Expand Up @@ -471,7 +478,7 @@ func TestDeploymentController_scaleDownOldReplicaSetsForRollingUpdate(t *testing
client: &fakeClientset,
eventRecorder: &record.FakeRecorder{},
}
scaled, err := controller.scaleDownOldReplicaSetsForRollingUpdate(allRSs, oldRSs, &deployment)
scaled, err := controller.scaleDownOldReplicaSetsForRollingUpdate(allRSs, oldRSs, deployment)
if !test.errorExpected && err != nil {
t.Errorf("unexpected error: %v", err)
continue
Expand Down
16 changes: 0 additions & 16 deletions pkg/controller/deployment/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -534,22 +534,6 @@ func (dc *DeploymentController) isScalingEvent(d *extensions.Deployment) (bool,
if err != nil {
return false, err
}
// If there is no new replica set matching this deployment and the deployment isn't paused
// then there is a new rollout that waits to happen
if newRS == nil && !d.Spec.Paused {
// Update all active replicas sets to the new deployment size. SetReplicasAnnotations makes
// sure that we will update only replica sets that don't have the current size of the deployment.
maxSurge := deploymentutil.MaxSurge(*d)
for _, rs := range controller.FilterActiveReplicaSets(oldRSs) {
if updated := deploymentutil.SetReplicasAnnotations(rs, d.Spec.Replicas, d.Spec.Replicas+maxSurge); updated {
if _, err := dc.client.Extensions().ReplicaSets(rs.Namespace).Update(rs); err != nil {
glog.Infof("Cannot update annotations for replica set %q: %v", rs.Name, err)
return false, err
}
}
}
return false, nil
}
allRSs := append(oldRSs, newRS)
for _, rs := range controller.FilterActiveReplicaSets(allRSs) {
desired, ok := deploymentutil.GetDesiredReplicasAnnotation(rs)
Expand Down