Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix flaky e2e: Use expectation model for deployment's new rc creation #19433

Merged
merged 2 commits into from
Jan 14, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
64 changes: 32 additions & 32 deletions pkg/controller/controller_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,16 +76,16 @@ func StaticResyncPeriodFunc(resyncPeriod time.Duration) ResyncPeriodFunc {
// }
//
// Implementation:
// PodExpectation = pair of atomic counters to track pod creation/deletion
// ControllerExpectationsStore = TTLStore + a PodExpectation per controller
// ControlleeExpectation = pair of atomic counters to track controllee's creation/deletion
// ControllerExpectationsStore = TTLStore + a ControlleeExpectation per controller
//
// * Once set expectations can only be lowered
// * A controller isn't synced till its expectations are either fulfilled, or expire
// * Controllers that don't set expectations will get woken up for every matching pod
// * Controllers that don't set expectations will get woken up for every matching controllee

// ExpKeyFunc to parse out the key from a PodExpectation
// ExpKeyFunc to parse out the key from a ControlleeExpectation
var ExpKeyFunc = func(obj interface{}) (string, error) {
if e, ok := obj.(*PodExpectations); ok {
if e, ok := obj.(*ControlleeExpectations); ok {
return e.key, nil
}
return "", fmt.Errorf("Could not find key for obj %#v", obj)
Expand All @@ -96,7 +96,7 @@ var ExpKeyFunc = func(obj interface{}) (string, error) {
// Warning: if using KeyFunc it is not safe to use a single ControllerExpectationsInterface with different
// types of controllers, because the keys might conflict across types.
type ControllerExpectationsInterface interface {
GetExpectations(controllerKey string) (*PodExpectations, bool, error)
GetExpectations(controllerKey string) (*ControlleeExpectations, bool, error)
SatisfiedExpectations(controllerKey string) bool
DeleteExpectations(controllerKey string)
SetExpectations(controllerKey string, add, del int) error
Expand All @@ -111,43 +111,43 @@ type ControllerExpectations struct {
cache.Store
}

// GetExpectations returns the PodExpectations of the given controller.
func (r *ControllerExpectations) GetExpectations(controllerKey string) (*PodExpectations, bool, error) {
if podExp, exists, err := r.GetByKey(controllerKey); err == nil && exists {
return podExp.(*PodExpectations), true, nil
// GetExpectations returns the ControlleeExpectations of the given controller.
func (r *ControllerExpectations) GetExpectations(controllerKey string) (*ControlleeExpectations, bool, error) {
if exp, exists, err := r.GetByKey(controllerKey); err == nil && exists {
return exp.(*ControlleeExpectations), true, nil
} else {
return nil, false, err
}
}

// DeleteExpectations deletes the expectations of the given controller from the TTLStore.
func (r *ControllerExpectations) DeleteExpectations(controllerKey string) {
if podExp, exists, err := r.GetByKey(controllerKey); err == nil && exists {
if err := r.Delete(podExp); err != nil {
if exp, exists, err := r.GetByKey(controllerKey); err == nil && exists {
if err := r.Delete(exp); err != nil {
glog.V(2).Infof("Error deleting expectations for controller %v: %v", controllerKey, err)
}
}
}

// SatisfiedExpectations returns true if the required adds/dels for the given controller have been observed.
// Add/del counts are established by the controller at sync time, and updated as pods are observed by the controller
// Add/del counts are established by the controller at sync time, and updated as controllees are observed by the controller
// manager.
func (r *ControllerExpectations) SatisfiedExpectations(controllerKey string) bool {
if podExp, exists, err := r.GetExpectations(controllerKey); exists {
if podExp.Fulfilled() {
if exp, exists, err := r.GetExpectations(controllerKey); exists {
if exp.Fulfilled() {
return true
} else {
glog.V(4).Infof("Controller still waiting on expectations %#v", podExp)
glog.V(4).Infof("Controller still waiting on expectations %#v", exp)
return false
}
} else if err != nil {
glog.V(2).Infof("Error encountered while checking expectations %#v, forcing sync", err)
} else {
// When a new controller is created, it doesn't have expectations.
// When it doesn't see expected watch events for > TTL, the expectations expire.
// - In this case it wakes up, creates/deletes pods, and sets expectations again.
// When it has satisfied expectations and no pods need to be created/destroyed > TTL, the expectations expire.
// - In this case it continues without setting expectations till it needs to create/delete pods.
// - In this case it wakes up, creates/deletes controllees, and sets expectations again.
// When it has satisfied expectations and no controllees need to be created/destroyed > TTL, the expectations expire.
// - In this case it continues without setting expectations till it needs to create/delete controllees.
glog.V(4).Infof("Controller %v either never recorded expectations, or the ttl expired.", controllerKey)
}
// Trigger a sync if we either encountered and error (which shouldn't happen since we're
Expand All @@ -157,9 +157,9 @@ func (r *ControllerExpectations) SatisfiedExpectations(controllerKey string) boo

// SetExpectations registers new expectations for the given controller. Forgets existing expectations.
func (r *ControllerExpectations) SetExpectations(controllerKey string, add, del int) error {
podExp := &PodExpectations{add: int64(add), del: int64(del), key: controllerKey}
glog.V(4).Infof("Setting expectations %+v", podExp)
return r.Add(podExp)
exp := &ControlleeExpectations{add: int64(add), del: int64(del), key: controllerKey}
glog.V(4).Infof("Setting expectations %+v", exp)
return r.Add(exp)
}

func (r *ControllerExpectations) ExpectCreations(controllerKey string, adds int) error {
Expand All @@ -172,10 +172,10 @@ func (r *ControllerExpectations) ExpectDeletions(controllerKey string, dels int)

// Decrements the expectation counts of the given controller.
func (r *ControllerExpectations) lowerExpectations(controllerKey string, add, del int) {
if podExp, exists, err := r.GetExpectations(controllerKey); err == nil && exists {
podExp.Seen(int64(add), int64(del))
if exp, exists, err := r.GetExpectations(controllerKey); err == nil && exists {
exp.Seen(int64(add), int64(del))
// The expectations might've been modified since the update on the previous line.
glog.V(4).Infof("Lowering expectations %+v", podExp)
glog.V(4).Infof("Lowering expectations %+v", exp)
}
}

Expand All @@ -194,31 +194,31 @@ type Expectations interface {
Fulfilled() bool
}

// PodExpectations track pod creates/deletes.
type PodExpectations struct {
// ControlleeExpectations track controllee creates/deletes.
type ControlleeExpectations struct {
add int64
del int64
key string
}

// Seen decrements the add and del counters.
func (e *PodExpectations) Seen(add, del int64) {
func (e *ControlleeExpectations) Seen(add, del int64) {
atomic.AddInt64(&e.add, -add)
atomic.AddInt64(&e.del, -del)
}

// Fulfilled returns true if this expectation has been fulfilled.
func (e *PodExpectations) Fulfilled() bool {
func (e *ControlleeExpectations) Fulfilled() bool {
// TODO: think about why this line being atomic doesn't matter
return atomic.LoadInt64(&e.add) <= 0 && atomic.LoadInt64(&e.del) <= 0
}

// GetExpectations returns the add and del expectations of the pod.
func (e *PodExpectations) GetExpectations() (int64, int64) {
// GetExpectations returns the add and del expectations of the controllee.
func (e *ControlleeExpectations) GetExpectations() (int64, int64) {
return atomic.LoadInt64(&e.add), atomic.LoadInt64(&e.del)
}

// NewControllerExpectations returns a store for PodExpectations.
// NewControllerExpectations returns a store for ControlleeExpectations.
func NewControllerExpectations() *ControllerExpectations {
return &ControllerExpectations{cache.NewTTLStore(ExpKeyFunc, ExpectationsTimeout)}
}
Expand Down
53 changes: 41 additions & 12 deletions pkg/controller/deployment/deployment_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,11 @@ type DeploymentController struct {
podStoreSynced func() bool

// A TTLCache of pod creates/deletes each deployment expects to see
expectations controller.ControllerExpectationsInterface
podExpectations controller.ControllerExpectationsInterface

// A TTLCache of rc creates/deletes each deployment expects to see
// TODO: make expectation model understand (rc) updates (besides adds and deletes)
rcExpectations controller.ControllerExpectationsInterface
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggest also renaming the variables in controller_utils that indicate that expectations are only used for pods

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


// Deployments that need to be synced
queue *workqueue.Type
Expand All @@ -91,11 +95,12 @@ func NewDeploymentController(client client.Interface, resyncPeriod controller.Re
eventBroadcaster.StartRecordingToSink(client.Events(""))

dc := &DeploymentController{
client: client,
expClient: client.Extensions(),
eventRecorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "deployment-controller"}),
queue: workqueue.New(),
expectations: controller.NewControllerExpectations(),
client: client,
expClient: client.Extensions(),
eventRecorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "deployment-controller"}),
queue: workqueue.New(),
podExpectations: controller.NewControllerExpectations(),
rcExpectations: controller.NewControllerExpectations(),
}

dc.dStore.Store, dc.dController = framework.NewInformer(
Expand Down Expand Up @@ -192,6 +197,12 @@ func (dc *DeploymentController) addRC(obj interface{}) {
rc := obj.(*api.ReplicationController)
glog.V(4).Infof("Replication controller %s added.", rc.Name)
if d := dc.getDeploymentForRC(rc); rc != nil {
dKey, err := controller.KeyFunc(d)
if err != nil {
glog.Errorf("Couldn't get key for deployment controller %#v: %v", d, err)
return
}
dc.rcExpectations.CreationObserved(dKey)
dc.enqueueDeployment(d)
}
}
Expand Down Expand Up @@ -329,7 +340,7 @@ func (dc *DeploymentController) deletePod(obj interface{}) {
glog.Errorf("Couldn't get key for deployment controller %#v: %v", d, err)
return
}
dc.expectations.DeletionObserved(dKey)
dc.podExpectations.DeletionObserved(dKey)
}
}

Expand Down Expand Up @@ -384,15 +395,16 @@ func (dc *DeploymentController) syncDeployment(key string) error {
}
if !exists {
glog.Infof("Deployment has been deleted %v", key)
dc.expectations.DeleteExpectations(key)
dc.podExpectations.DeleteExpectations(key)
dc.rcExpectations.DeleteExpectations(key)
return nil
}
d := *obj.(*extensions.Deployment)
if !dc.rcStoreSynced() {
// Sleep so we give the rc reflector goroutine a chance to run.
time.Sleep(RcStoreSyncedPollPeriod)
glog.Infof("Waiting for rc controller to sync, requeuing deployment %s", d.Name)
dc.enqueueDeployment(d)
dc.enqueueDeployment(&d)
return nil
}

Expand Down Expand Up @@ -475,13 +487,29 @@ func (dc *DeploymentController) getNewRC(deployment extensions.Deployment) (*api
if err != nil || existingNewRC != nil {
return existingNewRC, err
}
// Check the rc expectations of deployment before creating a new rc
dKey, err := controller.KeyFunc(&deployment)
if err != nil {
return nil, fmt.Errorf("couldn't get key for deployment %#v: %v", deployment, err)
}
if !dc.rcExpectations.SatisfiedExpectations(dKey) {
dc.enqueueDeployment(&deployment)
return nil, fmt.Errorf("RC expectations not met yet before getting new RC\n")
}
// new RC does not exist, create one.
namespace := deployment.ObjectMeta.Namespace
podTemplateSpecHash := deploymentutil.GetPodTemplateSpecHash(deployment.Spec.Template)
newRCTemplate := deploymentutil.GetNewRCTemplate(deployment)
// Add podTemplateHash label to selector.
newRCSelector := deploymentutil.CloneAndAddLabel(deployment.Spec.Selector, deployment.Spec.UniqueLabelKey, podTemplateSpecHash)

// Set RC expectations (1 rc should be created)
dKey, err = controller.KeyFunc(&deployment)
if err != nil {
return nil, fmt.Errorf("couldn't get key for deployment controller %#v: %v", deployment, err)
}
dc.rcExpectations.ExpectCreations(dKey, 1)
// Create new RC
newRC := api.ReplicationController{
ObjectMeta: api.ObjectMeta{
GenerateName: deployment.Name + "-",
Expand All @@ -495,6 +523,7 @@ func (dc *DeploymentController) getNewRC(deployment extensions.Deployment) (*api
}
createdRC, err := dc.client.ReplicationControllers(namespace).Create(&newRC)
if err != nil {
dc.rcExpectations.DeleteExpectations(dKey)
return nil, fmt.Errorf("error creating replication controller: %v", err)
}
return createdRC, nil
Expand Down Expand Up @@ -556,8 +585,8 @@ func (dc *DeploymentController) reconcileOldRCs(allRCs []*api.ReplicationControl
if err != nil {
return false, fmt.Errorf("Couldn't get key for deployment %#v: %v", deployment, err)
}
if expectationsCheck && !dc.expectations.SatisfiedExpectations(dKey) {
fmt.Printf("Expectations not met yet before reconciling old RCs\n")
if expectationsCheck && !dc.podExpectations.SatisfiedExpectations(dKey) {
glog.V(4).Infof("Pod expectations not met yet before reconciling old RCs\n")
return false, nil
}
// Find the number of ready pods.
Expand Down Expand Up @@ -593,7 +622,7 @@ func (dc *DeploymentController) reconcileOldRCs(allRCs []*api.ReplicationControl
return false, fmt.Errorf("Couldn't get key for deployment %#v: %v", deployment, err)
}
if expectationsCheck {
dc.expectations.ExpectDeletions(dKey, scaleDownCount)
dc.podExpectations.ExpectDeletions(dKey, scaleDownCount)
}
}
return true, err
Expand Down