Restore expectation timeout to 5 minutes #22138

Merged 1 commit on Feb 28, 2016

19 changes: 16 additions & 3 deletions pkg/controller/controller_utils.go
@@ -36,7 +36,21 @@ import (
"k8s.io/kubernetes/pkg/util"
)

const CreatedByAnnotation = "kubernetes.io/created-by"
const (
	CreatedByAnnotation = "kubernetes.io/created-by"

	// If a watch drops a delete event for a pod, it'll take this long
	// before a dormant controller waiting for those packets is woken up anyway. It is
	// specifically targeted at the case where some problem prevents an update
	// of expectations; without it, the controller could stay asleep forever. This should
	// be set based on the expected latency of watch events.
	//
	// Currently a controller can service (create *and* observe the watch events for said
	// creation) about 10 pods a second, so it takes about 1 min to service
	// 500 pods. Just creation is limited to 20qps, and watching happens with ~10-30s
	// latency/pod at the scale of 3000 pods over 100 nodes.
	ExpectationsTimeout = 5 * time.Minute
)
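
A quick sanity check on that arithmetic: at roughly 10 pods per second, the cited scale of 3000 pods takes 3000 / 10 = 300 seconds, which is exactly the 5-minute timeout. The same calculation as a standalone Go sketch (illustrative only; the constants come from the comment above, not from the code):

package main

import (
	"fmt"
	"time"
)

func main() {
	// Throughput quoted in the comment: a controller can service
	// (create and observe the watch events for) about 10 pods a second.
	const podsPerSecond = 10
	// Scale quoted in the comment: 3000 pods over 100 nodes.
	const podsAtScale = 3000

	// Time to work through the whole cluster at that rate.
	timeout := time.Duration(podsAtScale/podsPerSecond) * time.Second
	fmt.Println(timeout) // 5m0s, matching ExpectationsTimeout
}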

var (
KeyFunc = framework.DeletionHandlingMetaNamespaceKeyFunc
@@ -150,10 +164,9 @@ func (r *ControllerExpectations) SatisfiedExpectations(controllerKey string) boo

// TODO: Extend ExpirationCache to support explicit expiration.
// TODO: Make this possible to disable in tests.
// TODO: Parameterize timeout.
// TODO: Support injection of clock.
func (exp *ControlleeExpectations) isExpired() bool {
return util.RealClock{}.Since(exp.timestamp) > 10*time.Second
return util.RealClock{}.Since(exp.timestamp) > ExpectationsTimeout
}
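
The TODOs above suggest two follow-ups: parameterize the timeout and inject the clock. A minimal sketch of what that refactor could look like (hypothetical names, not part of this PR; the real ControlleeExpectations also carries add/del counters):

package controller

import "time"

// Clock is the minimal interface isExpired needs; util.RealClock{}
// already satisfies it, and tests could supply a fake.
type Clock interface {
	Since(t time.Time) time.Duration
}

// expiringExpectations is a hypothetical variant with the timeout and
// clock injected instead of hardcoded.
type expiringExpectations struct {
	timestamp time.Time
	timeout   time.Duration // e.g. ExpectationsTimeout in production
	clock     Clock         // e.g. util.RealClock{} in production
}

func (exp *expiringExpectations) isExpired() bool {
	return exp.clock.Since(exp.timestamp) > exp.timeout
}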

// SetExpectations registers new expectations for the given controller. Forgets existing expectations.
191 changes: 51 additions & 140 deletions pkg/controller/deployment/deployment_controller.go

Large diffs are not rendered by default.
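
The deployment_controller.go diff is not shown, but the test changes below reveal its effect on the controller's surface: DeploymentController loses its podExpectations and rsExpectations fields. A before/after sketch reconstructed from those tests (field sets and import paths are assumptions; the real struct has more fields):

package sketch

import (
	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
	"k8s.io/kubernetes/pkg/client/record"
	"k8s.io/kubernetes/pkg/controller"
)

// deploymentControllerBefore mirrors what the old tests constructed.
type deploymentControllerBefore struct {
	client          clientset.Interface
	eventRecorder   record.EventRecorder
	podExpectations *controller.ControllerExpectations
	rsExpectations  *controller.ControllerExpectations
}

// deploymentControllerAfter mirrors the new tests: the deployment
// controller no longer tracks expectations at all; the restored
// 5-minute timeout applies to controllers that still do.
type deploymentControllerAfter struct {
	client        clientset.Interface
	eventRecorder record.EventRecorder
}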

34 changes: 13 additions & 21 deletions pkg/controller/deployment/deployment_controller_test.go
@@ -93,12 +93,10 @@ func TestDeploymentController_reconcileNewReplicaSet(t *testing.T) {
deployment := deployment("foo", test.deploymentReplicas, test.maxSurge, intstr.FromInt(0))
fake := fake.Clientset{}
controller := &DeploymentController{
client: &fake,
eventRecorder: &record.FakeRecorder{},
podExpectations: controller.NewControllerExpectations(),
rsExpectations: controller.NewControllerExpectations(),
client: &fake,
eventRecorder: &record.FakeRecorder{},
}
scaled, err := controller.reconcileNewReplicaSet(allRSs, newRS, deployment)
scaled, err := controller.reconcileNewReplicaSet(allRSs, newRS, &deployment)
if err != nil {
t.Errorf("unexpected error: %v", err)
continue
@@ -269,13 +267,11 @@ func TestDeploymentController_reconcileOldReplicaSets(t *testing.T) {
return false, nil, nil
})
controller := &DeploymentController{
client: &fakeClientset,
eventRecorder: &record.FakeRecorder{},
podExpectations: controller.NewControllerExpectations(),
rsExpectations: controller.NewControllerExpectations(),
client: &fakeClientset,
eventRecorder: &record.FakeRecorder{},
}

scaled, err := controller.reconcileOldReplicaSets(allRSs, oldRSs, newRS, deployment)
scaled, err := controller.reconcileOldReplicaSets(allRSs, oldRSs, newRS, &deployment)
if err != nil {
t.Errorf("unexpected error: %v", err)
continue
@@ -375,12 +371,10 @@ func TestDeploymentController_cleanupUnhealthyReplicas(t *testing.T) {
})

controller := &DeploymentController{
client: &fakeClientset,
eventRecorder: &record.FakeRecorder{},
podExpectations: controller.NewControllerExpectations(),
rsExpectations: controller.NewControllerExpectations(),
client: &fakeClientset,
eventRecorder: &record.FakeRecorder{},
}
cleanupCount, err := controller.cleanupUnhealthyReplicas(oldRSs, deployment, test.maxCleanupCount)
cleanupCount, err := controller.cleanupUnhealthyReplicas(oldRSs, &deployment, test.maxCleanupCount)
if err != nil {
t.Errorf("unexpected error: %v", err)
continue
@@ -464,12 +458,10 @@ func TestDeploymentController_scaleDownOldReplicaSetsForRollingUpdate(t *testing
return false, nil, nil
})
controller := &DeploymentController{
client: &fakeClientset,
eventRecorder: &record.FakeRecorder{},
podExpectations: controller.NewControllerExpectations(),
rsExpectations: controller.NewControllerExpectations(),
client: &fakeClientset,
eventRecorder: &record.FakeRecorder{},
}
scaled, err := controller.scaleDownOldReplicaSetsForRollingUpdate(allRSs, oldRSs, deployment)
scaled, err := controller.scaleDownOldReplicaSetsForRollingUpdate(allRSs, oldRSs, &deployment)
if err != nil {
t.Errorf("unexpected error: %v", err)
continue
@@ -555,7 +547,7 @@ func TestDeploymentController_cleanupOldReplicaSets(t *testing.T) {
}

d := newDeployment(1, &tests[i].revisionHistoryLimit)
controller.cleanupOldReplicaSets(test.oldRSs, *d)
controller.cleanupOldReplicaSets(test.oldRSs, d)

gotDeletions := 0
for _, action := range fake.Actions() {
4 changes: 2 additions & 2 deletions pkg/kubectl/describe.go
@@ -1732,11 +1732,11 @@ func (dd *DeploymentDescriber) Describe(namespace, name string) (string, error)
ru := d.Spec.Strategy.RollingUpdate
fmt.Fprintf(out, "RollingUpdateStrategy:\t%s max unavailable, %s max surge\n", ru.MaxUnavailable.String(), ru.MaxSurge.String())
}
oldRSs, _, err := deploymentutil.GetOldReplicaSets(*d, dd)
oldRSs, _, err := deploymentutil.GetOldReplicaSets(d, dd)
if err == nil {
fmt.Fprintf(out, "OldReplicaSets:\t%s\n", printReplicaSetsByLabels(oldRSs))
}
newRS, err := deploymentutil.GetNewReplicaSet(*d, dd)
newRS, err := deploymentutil.GetNewReplicaSet(d, dd)
if err == nil {
var newRSs []*extensions.ReplicaSet
if newRS != nil {
4 changes: 2 additions & 2 deletions pkg/kubectl/history.go
@@ -67,11 +67,11 @@ func (h *DeploymentHistoryViewer) History(namespace, name string) (HistoryInfo,
if err != nil {
return historyInfo, fmt.Errorf("failed to retrieve deployment %s: %v", name, err)
}
_, allOldRSs, err := deploymentutil.GetOldReplicaSets(*deployment, h.c)
_, allOldRSs, err := deploymentutil.GetOldReplicaSets(deployment, h.c)
if err != nil {
return historyInfo, fmt.Errorf("failed to retrieve old replica sets from deployment %s: %v", name, err)
}
newRS, err := deploymentutil.GetNewReplicaSet(*deployment, h.c)
newRS, err := deploymentutil.GetNewReplicaSet(deployment, h.c)
if err != nil {
return historyInfo, fmt.Errorf("failed to retrieve new replica set from deployment %s: %v", name, err)
}
2 changes: 1 addition & 1 deletion pkg/kubectl/stop_test.go
@@ -512,7 +512,7 @@ func TestDeploymentStop(t *testing.T) {
Replicas: 0,
},
}
template := deploymentutil.GetNewReplicaSetTemplate(deployment)
template := deploymentutil.GetNewReplicaSetTemplate(&deployment)
tests := []struct {
Name string
Objs []runtime.Object
14 changes: 7 additions & 7 deletions pkg/util/deployment/deployment.go
@@ -49,7 +49,7 @@ const (

// GetOldReplicaSets returns the old replica sets targeted by the given Deployment; get PodList and ReplicaSetList from client interface.
// Note that the first set of old replica sets doesn't include the ones with no pods, and the second set of old replica sets includes all old replica sets.
func GetOldReplicaSets(deployment extensions.Deployment, c clientset.Interface) ([]*extensions.ReplicaSet, []*extensions.ReplicaSet, error) {
func GetOldReplicaSets(deployment *extensions.Deployment, c clientset.Interface) ([]*extensions.ReplicaSet, []*extensions.ReplicaSet, error) {
return GetOldReplicaSetsFromLists(deployment, c,
func(namespace string, options api.ListOptions) (*api.PodList, error) {
return c.Core().Pods(namespace).List(options)
@@ -66,7 +66,7 @@ type podListFunc func(string, api.ListOptions) (*api.PodList, error)

// GetOldReplicaSetsFromLists returns two sets of old replica sets targeted by the given Deployment; get PodList and ReplicaSetList with input functions.
// Note that the first set of old replica sets doesn't include the ones with no pods, and the second set of old replica sets includes all old replica sets.
func GetOldReplicaSetsFromLists(deployment extensions.Deployment, c clientset.Interface, getPodList podListFunc, getRSList rsListFunc) ([]*extensions.ReplicaSet, []*extensions.ReplicaSet, error) {
func GetOldReplicaSetsFromLists(deployment *extensions.Deployment, c clientset.Interface, getPodList podListFunc, getRSList rsListFunc) ([]*extensions.ReplicaSet, []*extensions.ReplicaSet, error) {
// Find all pods whose labels match deployment.Spec.Selector, and corresponding replica sets for pods in podList.
// All pods and replica sets are labeled with pod-template-hash to prevent overlapping
// TODO: Right now we list all replica sets and then filter. We should add an API for this.
@@ -109,7 +109,7 @@ func GetOldReplicaSetsFromLists(deployment extensions.Deployment, c clientset.In

// GetNewReplicaSet returns a replica set that matches the intent of the given deployment; get ReplicaSetList from client interface.
// Returns nil if the new replica set doesn't exist yet.
func GetNewReplicaSet(deployment extensions.Deployment, c clientset.Interface) (*extensions.ReplicaSet, error) {
func GetNewReplicaSet(deployment *extensions.Deployment, c clientset.Interface) (*extensions.ReplicaSet, error) {
return GetNewReplicaSetFromList(deployment, c,
func(namespace string, options api.ListOptions) (*api.PodList, error) {
return c.Core().Pods(namespace).List(options)
@@ -122,7 +122,7 @@ func GetNewReplicaSet(deployment extensions.Deployment, c clientset.Interface) (

// GetNewReplicaSetFromList returns a replica set that matches the intent of the given deployment; get ReplicaSetList with the input function.
// Returns nil if the new replica set doesn't exist yet.
func GetNewReplicaSetFromList(deployment extensions.Deployment, c clientset.Interface, getPodList podListFunc, getRSList rsListFunc) (*extensions.ReplicaSet, error) {
func GetNewReplicaSetFromList(deployment *extensions.Deployment, c clientset.Interface, getPodList podListFunc, getRSList rsListFunc) (*extensions.ReplicaSet, error) {
rsList, _, err := rsAndPodsWithHashKeySynced(deployment, c, getRSList, getPodList)
if err != nil {
return nil, fmt.Errorf("error listing ReplicaSets: %v", err)
@@ -140,7 +140,7 @@ func GetNewReplicaSetFromList(deployment extensions.Deployment, c clientset.Inte
}

// rsAndPodsWithHashKeySynced returns the RSs and pods the given deployment targets, with pod-template-hash information synced.
func rsAndPodsWithHashKeySynced(deployment extensions.Deployment, c clientset.Interface, getRSList rsListFunc, getPodList podListFunc) ([]extensions.ReplicaSet, *api.PodList, error) {
func rsAndPodsWithHashKeySynced(deployment *extensions.Deployment, c clientset.Interface, getRSList rsListFunc, getPodList podListFunc) ([]extensions.ReplicaSet, *api.PodList, error) {
namespace := deployment.Namespace
selector, err := unversioned.LabelSelectorAsSelector(deployment.Spec.Selector)
if err != nil {
@@ -173,7 +173,7 @@ func rsAndPodsWithHashKeySynced(deployment extensions.Deployment, c clientset.In
// 1. Add hash label to the rs's pod template, and make sure the controller sees this update so that no orphaned pods will be created
// 2. Add hash label to all pods this rs owns
// 3. Add hash label to the rs's label and selector
func addHashKeyToRSAndPods(deployment extensions.Deployment, c clientset.Interface, rs extensions.ReplicaSet, getPodList podListFunc) (updatedRS *extensions.ReplicaSet, err error) {
func addHashKeyToRSAndPods(deployment *extensions.Deployment, c clientset.Interface, rs extensions.ReplicaSet, getPodList podListFunc) (updatedRS *extensions.ReplicaSet, err error) {
updatedRS = &rs
// If the rs already has the new hash label in its selector, it's done syncing
if labelsutil.SelectorHasLabel(rs.Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey) {
@@ -307,7 +307,7 @@ func updatePodWithRetries(podClient unversionedcore.PodInterface, pod *api.Pod,
}

// Returns the desired PodTemplateSpec for the new ReplicaSet corresponding to the given Deployment.
func GetNewReplicaSetTemplate(deployment extensions.Deployment) api.PodTemplateSpec {
func GetNewReplicaSetTemplate(deployment *extensions.Deployment) api.PodTemplateSpec {
// newRS will have the same template as in deployment spec, plus a unique label in some cases.
newRSTemplate := api.PodTemplateSpec{
ObjectMeta: deployment.Spec.Template.ObjectMeta,
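
The change threading through kubectl, the deployment utilities, and the e2e tests is mechanical: these helpers now take a *extensions.Deployment rather than a by-value copy, so call sites hand over the pointer they already hold (or take the address of a local value, as in the tests). A call-site sketch under the new signature (function name and import paths are assumptions):

package sketch

import (
	"fmt"

	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
	deploymentutil "k8s.io/kubernetes/pkg/util/deployment"
)

// printNewRS shows the post-PR call shape: Get returns a
// *extensions.Deployment, which now flows into the helper unchanged.
func printNewRS(c clientset.Interface, ns, name string) error {
	d, err := c.Extensions().Deployments(ns).Get(name)
	if err != nil {
		return err
	}
	// Before this PR the call was GetNewReplicaSet(*d, c), copying the
	// whole Deployment; now the pointer is passed through directly.
	newRS, err := deploymentutil.GetNewReplicaSet(d, c)
	if err != nil {
		return err
	}
	fmt.Printf("new replica set for %s: %v\n", name, newRS)
	return nil
}
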
6 changes: 3 additions & 3 deletions pkg/util/deployment/deployment_test.go
@@ -167,7 +167,7 @@ func generateRSWithLabel(labels map[string]string, image string) extensions.Repl

// generateRS creates a replica set, with the input deployment's template as its template
func generateRS(deployment extensions.Deployment) extensions.ReplicaSet {
template := GetNewReplicaSetTemplate(deployment)
template := GetNewReplicaSetTemplate(&deployment)
return extensions.ReplicaSet{
ObjectMeta: api.ObjectMeta{
Name: api.SimpleNameGenerator.GenerateName("replicaset"),
@@ -260,7 +260,7 @@ func TestGetNewRC(t *testing.T) {
fakeClient = addListRSReactor(fakeClient, test.objs[1])
fakeClient = addUpdatePodsReactor(fakeClient)
fakeClient = addUpdateRSReactor(fakeClient)
rs, err := GetNewReplicaSet(newDeployment, fakeClient)
rs, err := GetNewReplicaSet(&newDeployment, fakeClient)
if err != nil {
t.Errorf("In test case %s, got unexpected error %v", test.test, err)
}
@@ -348,7 +348,7 @@ func TestGetOldRCs(t *testing.T) {
fakeClient = addListRSReactor(fakeClient, test.objs[1])
fakeClient = addUpdatePodsReactor(fakeClient)
fakeClient = addUpdateRSReactor(fakeClient)
rss, _, err := GetOldReplicaSets(newDeployment, fakeClient)
rss, _, err := GetOldReplicaSets(&newDeployment, fakeClient)
if err != nil {
t.Errorf("In test case %s, got unexpected error %v", test.test, err)
}
20 changes: 10 additions & 10 deletions test/e2e/deployment.go
@@ -142,7 +142,7 @@ func checkDeploymentRevision(c *clientset.Clientset, ns, deploymentName, revisio
deployment, err := c.Extensions().Deployments(ns).Get(deploymentName)
Expect(err).NotTo(HaveOccurred())
// Check revision of the new replica set of this deployment
newRS, err := deploymentutil.GetNewReplicaSet(*deployment, c)
newRS, err := deploymentutil.GetNewReplicaSet(deployment, c)
Expect(err).NotTo(HaveOccurred())
Expect(newRS.Annotations).NotTo(Equal(nil))
Expect(newRS.Annotations[deploymentutil.RevisionAnnotation]).Should(Equal(revision))
@@ -280,7 +280,7 @@ func testRollingUpdateDeployment(f *Framework) {
// There should be 1 old RS (nginx-controller, which is adopted)
deployment, err := c.Extensions().Deployments(ns).Get(deploymentName)
Expect(err).NotTo(HaveOccurred())
_, allOldRSs, err := deploymentutil.GetOldReplicaSets(*deployment, c)
_, allOldRSs, err := deploymentutil.GetOldReplicaSets(deployment, c)
Expect(err).NotTo(HaveOccurred())
Expect(len(allOldRSs)).Should(Equal(1))
// The old RS should contain pod-template-hash in its selector, label, and template label
@@ -340,7 +340,7 @@ func testRollingUpdateDeploymentEvents(f *Framework) {
// There should be 2 events, one to scale up the new ReplicaSet and then to scale down
// the old ReplicaSet.
Expect(len(events.Items)).Should(Equal(2))
newRS, err := deploymentutil.GetNewReplicaSet(*deployment, c)
newRS, err := deploymentutil.GetNewReplicaSet(deployment, c)
Expect(err).NotTo(HaveOccurred())
Expect(newRS).NotTo(Equal(nil))
Expect(events.Items[0].Message).Should(Equal(fmt.Sprintf("Scaled up replica set %s to 1", newRS.Name)))
@@ -395,7 +395,7 @@ func testRecreateDeployment(f *Framework) {
}
// There should be 2 events, one to scale up the new ReplicaSet and then to scale down the old ReplicaSet.
Expect(len(events.Items)).Should(Equal(2))
newRS, err := deploymentutil.GetNewReplicaSet(*deployment, c)
newRS, err := deploymentutil.GetNewReplicaSet(deployment, c)
Expect(err).NotTo(HaveOccurred())
Expect(newRS).NotTo(Equal(nil))
Expect(events.Items[0].Message).Should(Equal(fmt.Sprintf("Scaled down replica set %s to 0", rsName)))
@@ -529,7 +529,7 @@ func testPausedDeployment(f *Framework) {
Expect(err).NotTo(HaveOccurred())

// Verify that there is no latest state realized for the new deployment.
rs, err := deploymentutil.GetNewReplicaSet(*deployment, c)
rs, err := deploymentutil.GetNewReplicaSet(deployment, c)
Expect(err).NotTo(HaveOccurred())
if rs != nil {
err = fmt.Errorf("unexpected new rs/%s for deployment/%s", rs.Name, deployment.Name)
@@ -573,7 +573,7 @@ func testPausedDeployment(f *Framework) {
err = waitForObservedDeployment(c, ns, deploymentName, deployment.Generation)
Expect(err).NotTo(HaveOccurred())

newRS, err := deploymentutil.GetNewReplicaSet(*deployment, c)
newRS, err := deploymentutil.GetNewReplicaSet(deployment, c)
Expect(err).NotTo(HaveOccurred())
Expect(DeleteReplicaSet(unversionedClient, ns, newRS.Name)).NotTo(HaveOccurred())

@@ -584,7 +584,7 @@ func testPausedDeployment(f *Framework) {
err = fmt.Errorf("deployment %q should be paused", deployment.Name)
Expect(err).NotTo(HaveOccurred())
}
shouldBeNil, err := deploymentutil.GetNewReplicaSet(*deployment, c)
shouldBeNil, err := deploymentutil.GetNewReplicaSet(deployment, c)
Expect(err).NotTo(HaveOccurred())
if shouldBeNil != nil {
err = fmt.Errorf("deployment %q shouldn't have a replica set but there is %q", deployment.Name, shouldBeNil.Name)
@@ -848,7 +848,7 @@ func testDeploymentLabelAdopted(f *Framework) {
Logf("deleting deployment %s", deploymentName)
Expect(c.Extensions().Deployments(ns).Delete(deploymentName, nil)).NotTo(HaveOccurred())
// TODO: remove this once we can delete replica sets with deployment
newRS, err := deploymentutil.GetNewReplicaSet(*deployment, c)
newRS, err := deploymentutil.GetNewReplicaSet(deployment, c)
Expect(err).NotTo(HaveOccurred())
Expect(c.Extensions().ReplicaSets(ns).Delete(newRS.Name, nil)).NotTo(HaveOccurred())
}()
@@ -863,12 +863,12 @@ func testDeploymentLabelAdopted(f *Framework) {
// There should be no old RSs (overlapping RS)
deployment, err := c.Extensions().Deployments(ns).Get(deploymentName)
Expect(err).NotTo(HaveOccurred())
oldRSs, allOldRSs, err := deploymentutil.GetOldReplicaSets(*deployment, c)
oldRSs, allOldRSs, err := deploymentutil.GetOldReplicaSets(deployment, c)
Expect(err).NotTo(HaveOccurred())
Expect(len(oldRSs)).Should(Equal(0))
Expect(len(allOldRSs)).Should(Equal(0))
// New RS should contain pod-template-hash in its selector, label, and template label
newRS, err := deploymentutil.GetNewReplicaSet(*deployment, c)
newRS, err := deploymentutil.GetNewReplicaSet(deployment, c)
Expect(err).NotTo(HaveOccurred())
Expect(len(newRS.Labels[extensions.DefaultDeploymentUniqueLabelKey])).Should(BeNumerically(">", 0))
Expect(len(newRS.Spec.Selector.MatchLabels[extensions.DefaultDeploymentUniqueLabelKey])).Should(BeNumerically(">", 0))
6 changes: 3 additions & 3 deletions test/e2e/util.go
@@ -2248,11 +2248,11 @@ func waitForDeploymentStatus(c clientset.Interface, ns, deploymentName string, d
if err != nil {
return false, err
}
oldRSs, allOldRSs, err = deploymentutil.GetOldReplicaSets(*deployment, c)
oldRSs, allOldRSs, err = deploymentutil.GetOldReplicaSets(deployment, c)
if err != nil {
return false, err
}
newRS, err = deploymentutil.GetNewReplicaSet(*deployment, c)
newRS, err = deploymentutil.GetNewReplicaSet(deployment, c)
if err != nil {
return false, err
}
@@ -2318,7 +2318,7 @@ func waitForDeploymentOldRSsNum(c *clientset.Clientset, ns, deploymentName strin
if err != nil {
return false, err
}
_, oldRSs, err := deploymentutil.GetOldReplicaSets(*deployment, c)
_, oldRSs, err := deploymentutil.GetOldReplicaSets(deployment, c)
if err != nil {
return false, err
}