From 13326de3e8d86386abc4e89ada132d24d6490be3 Mon Sep 17 00:00:00 2001 From: Tomas Nozicka Date: Fri, 26 May 2017 13:46:04 +0200 Subject: [PATCH] Add DC controllerRef to RC --- pkg/controller/controller_ref_manager.go | 175 ++++++++++++++++++ .../deploymentconfig_controller.go | 76 +++++++- .../controller/deploymentconfig/factory.go | 5 + pkg/deploy/util/util.go | 24 ++- 4 files changed, 271 insertions(+), 9 deletions(-) create mode 100644 pkg/controller/controller_ref_manager.go diff --git a/pkg/controller/controller_ref_manager.go b/pkg/controller/controller_ref_manager.go new file mode 100644 index 000000000000..96c50b38e82f --- /dev/null +++ b/pkg/controller/controller_ref_manager.go @@ -0,0 +1,175 @@ +package controller + +import ( + "fmt" + + "github.com/golang/glog" + kerrors "k8s.io/apimachinery/pkg/api/errors" + kmetav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + klabels "k8s.io/apimachinery/pkg/labels" + kschema "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + kutilerrors "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/client-go/tools/record" + kapi "k8s.io/kubernetes/pkg/api" + kclientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" + kcontroller "k8s.io/kubernetes/pkg/controller" +) + +// RCControlInterface is an interface that knows how to add or delete +// ReplicationControllers, as well as increment or decrement them. It is used +// by the DeploymentConfig controller to ease testing of actions that it takes. +type RCControlInterface interface { + PatchReplicationController(namespace, name string, data []byte) error +} + +// RealRCControl is the default implementation of RCControlInterface. 
+type RealRCControl struct { + KubeClient kclientset.Interface + Recorder record.EventRecorder +} + +// To make sure RealRCControl implements RCControlInterface +var _ RCControlInterface = &RealRCControl{} + +// PatchReplicationController executes a strategic merge patch contained in 'data' on RC specified by 'namespace' and 'name' +func (r RealRCControl) PatchReplicationController(namespace, name string, data []byte) error { + _, err := r.KubeClient.Core().ReplicationControllers(namespace).Patch(name, types.StrategicMergePatchType, data) + return err +} + +type RCControllerRefManager struct { + kcontroller.BaseControllerRefManager + controllerKind kschema.GroupVersionKind + rcControl RCControlInterface +} + +// NewRCControllerRefManager returns a RCControllerRefManager that exposes +// methods to manage the controllerRef of ReplicationControllers. +// +// The CanAdopt() function can be used to perform a potentially expensive check +// (such as a live GET from the API server) prior to the first adoption. +// It will only be called (at most once) if an adoption is actually attempted. +// If CanAdopt() returns a non-nil error, all adoptions will fail. +// +// NOTE: Once CanAdopt() is called, it will not be called again by the same +// RCControllerRefManager instance. Create a new instance if it +// makes sense to check CanAdopt() again (e.g. in a different sync pass). +func NewRCControllerRefManager( + rcControl RCControlInterface, + controller kmetav1.Object, + selector klabels.Selector, + controllerKind kschema.GroupVersionKind, + canAdopt func() error, +) *RCControllerRefManager { + return &RCControllerRefManager{ + BaseControllerRefManager: kcontroller.BaseControllerRefManager{ + Controller: controller, + Selector: selector, + CanAdoptFunc: canAdopt, + }, + controllerKind: controllerKind, + rcControl: rcControl, + } +} + +// ClaimReplicationController tries to take ownership of a ReplicationController. 
+// +// It will reconcile the following: +// * Adopt the ReplicationController if it's an orphan. +// * Release owned ReplicationController if the selector no longer matches. +// +// A non-nil error is returned if some form of reconciliation was attempted and +// failed. Usually, controllers should try again later in case reconciliation +// is still needed. +// +// If the error is nil, either the reconciliation succeeded, or no +// reconciliation was necessary. The returned boolean indicates whether you now +// own the object. +func (m *RCControllerRefManager) ClaimReplicationController(rc *kapi.ReplicationController) (bool, error) { + match := func(obj kmetav1.Object) bool { + return m.Selector.Matches(klabels.Set(obj.GetLabels())) + } + adopt := func(obj kmetav1.Object) error { + return m.AdoptReplicationController(obj.(*kapi.ReplicationController)) + } + release := func(obj kmetav1.Object) error { + return m.ReleaseReplicationController(obj.(*kapi.ReplicationController)) + } + + return m.ClaimObject(rc, match, adopt, release) +} + +// ClaimReplicationControllers tries to take ownership of a list of ReplicationControllers. +// +// It will reconcile the following: +// * Adopt orphans if the selector matches. +// * Release owned objects if the selector no longer matches. +// +// A non-nil error is returned if some form of reconciliation was attempted and +// failed. Usually, controllers should try again later in case reconciliation +// is still needed. +// +// If the error is nil, either the reconciliation succeeded, or no +// reconciliation was necessary. The list of ReplicationControllers that you now own is +// returned. 
+func (m *RCControllerRefManager) ClaimReplicationControllers(rcs []*kapi.ReplicationController) ([]*kapi.ReplicationController, error) { + var claimed []*kapi.ReplicationController + var errlist []error + + for _, rc := range rcs { + ok, err := m.ClaimReplicationController(rc) + if err != nil { + errlist = append(errlist, err) + continue + } + if ok { + claimed = append(claimed, rc) + } + } + return claimed, kutilerrors.NewAggregate(errlist) +} + +// AdoptReplicationController sends a patch to take control of the ReplicationController. It returns the error if +// the patching fails. +func (m *RCControllerRefManager) AdoptReplicationController(rs *kapi.ReplicationController) error { + if err := m.CanAdopt(); err != nil { + return fmt.Errorf("can't adopt ReplicationController %s/%s (%s): %v", rs.Namespace, rs.Name, rs.UID, err) + } + // Note that ValidateOwnerReferences() will reject this patch if another + // OwnerReference exists with controller=true. + addControllerPatch := fmt.Sprintf( + `{"metadata":{ + "ownerReferences":[{"apiVersion":"%s","kind":"%s","name":"%s","uid":"%s","controller":true,"blockOwnerDeletion":true}], + "uid":"%s", + "finalizers": ["%s"] + } + }`, + m.controllerKind.GroupVersion(), m.controllerKind.Kind, + m.Controller.GetName(), m.Controller.GetUID(), rs.UID, + kmetav1.FinalizerDeleteDependents) + return m.rcControl.PatchReplicationController(rs.Namespace, rs.Name, []byte(addControllerPatch)) +} + +// ReleaseReplicationController sends a patch to free the ReplicationController from the control of the Deployment controller. +// It returns the error if the patching fails. 404 and 422 errors are ignored. 
+func (m *RCControllerRefManager) ReleaseReplicationController(rc *kapi.ReplicationController) error { + glog.V(4).Infof("patching ReplicationController %s/%s to remove its controllerRef to %s/%s:%s", + rc.Namespace, rc.Name, m.controllerKind.GroupVersion(), m.controllerKind.Kind, m.Controller.GetName()) + deleteOwnerRefPatch := fmt.Sprintf(`{"metadata":{"ownerReferences":[{"$patch":"delete","uid":"%s"}],"uid":"%s"}}`, m.Controller.GetUID(), rc.UID) + err := m.rcControl.PatchReplicationController(rc.Namespace, rc.Name, []byte(deleteOwnerRefPatch)) + if err != nil { + if kerrors.IsNotFound(err) { + // If the ReplicationController no longer exists, ignore it. + return nil + } + if kerrors.IsInvalid(err) { + // Invalid error will be returned in two cases: 1. the ReplicationController + // has no owner reference, 2. the uid of the ReplicationController doesn't + // match, which means the ReplicationController is deleted and then recreated. + // In both cases, the error can be ignored. + return nil + } + } + return err +} diff --git a/pkg/deploy/controller/deploymentconfig/deploymentconfig_controller.go b/pkg/deploy/controller/deploymentconfig/deploymentconfig_controller.go index d4fd2d07675c..646a2e9cdda4 100644 --- a/pkg/deploy/controller/deploymentconfig/deploymentconfig_controller.go +++ b/pkg/deploy/controller/deploymentconfig/deploymentconfig_controller.go @@ -8,6 +8,7 @@ import ( kapierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" kutilerrors "k8s.io/apimachinery/pkg/util/errors" utilruntime "k8s.io/apimachinery/pkg/util/runtime" @@ -17,9 +18,11 @@ import ( kcoreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion" kcorelisters "k8s.io/kubernetes/pkg/client/listers/core/internalversion" "k8s.io/kubernetes/pkg/client/retry" + kcontroller "k8s.io/kubernetes/pkg/controller" osclient 
"github.com/openshift/origin/pkg/client" oscache "github.com/openshift/origin/pkg/client/cache" + oscontroller "github.com/openshift/origin/pkg/controller" deployapi "github.com/openshift/origin/pkg/deploy/api" deployutil "github.com/openshift/origin/pkg/deploy/util" ) @@ -68,6 +71,8 @@ type DeploymentConfigController struct { rcLister kcorelisters.ReplicationControllerLister // rcListerSynced makes sure the rc shared informer is synced before reconcling any deployment config. rcListerSynced func() bool + // rcControl is used for adopting/releasing replication controllers. + rcControl oscontroller.RCControlInterface // codec is used to build deployments from configs. codec runtime.Codec @@ -84,11 +89,29 @@ func (c *DeploymentConfigController) Handle(config *deployapi.DeploymentConfig) return c.updateStatus(config, []*kapi.ReplicationController{}) } - // Find all deployments owned by the deployment config. + // List all ReplicationControllers to find also those we own but that no longer match our selector. + // They will be orphaned by ClaimReplicationControllers(). + rcList, err := c.rcLister.ReplicationControllers(config.Namespace).List(labels.Everything()) + if err != nil { + return fmt.Errorf("error while deploymentConfigController listing replication controllers: %v", err) + } selector := deployutil.ConfigSelector(config.Name) - existingDeployments, err := c.rcLister.ReplicationControllers(config.Namespace).List(selector) + // If any adoptions are attempted, we should first recheck for deletion with + // an uncached quorum read sometime after listing ReplicationControllers (see Kubernetes #42639). 
+ canAdoptFunc := kcontroller.RecheckDeletionTimestamp(func() (metav1.Object, error) { + fresh, err := c.dn.DeploymentConfigs(config.Namespace).Get(config.Name, metav1.GetOptions{}) + if err != nil { + return nil, err + } + if fresh.UID != config.UID { + return nil, fmt.Errorf("original DeploymentConfig %v/%v is gone: got uid %v, wanted %v", config.Namespace, config.Name, fresh.UID, config.UID) + } + return fresh, nil + }) + cm := oscontroller.NewRCControllerRefManager(c.rcControl, config, selector, deployutil.ControllerKind, canAdoptFunc) + existingDeployments, err := cm.ClaimReplicationControllers(rcList) if err != nil { - return err + return fmt.Errorf("error while deploymentConfigController claiming replication controllers: %v", err) } // In case the deployment config has been marked for deletion, merely update its status with @@ -125,6 +148,15 @@ func (c *DeploymentConfigController) Handle(config *deployapi.DeploymentConfig) if err != nil { return err } + // We need to make sure we own that RC or adopt it if possible + isOurs, err := cm.ClaimReplicationController(rc) + if err != nil { + return fmt.Errorf("error while deploymentConfigController claiming the replication controller %s/%s: %v", rc.Namespace, rc.Name, err) + } + if !isOurs { + return nil + } + copied, err := deployutil.DeploymentDeepCopy(rc) if err != nil { return err @@ -157,7 +189,7 @@ func (c *DeploymentConfigController) Handle(config *deployapi.DeploymentConfig) return c.updateStatus(config, existingDeployments) } - return c.reconcileDeployments(existingDeployments, config) + return c.reconcileDeployments(existingDeployments, config, cm) } // If the config is paused we shouldn't create new deployments for it. if config.Spec.Paused { @@ -177,10 +209,26 @@ func (c *DeploymentConfigController) Handle(config *deployapi.DeploymentConfig) } created, err := c.rn.ReplicationControllers(config.Namespace).Create(deployment) if err != nil { - // If the deployment was already created, just move on. 
The cache could be - // stale, or another process could have already handled this update. + // We need to find out if our controller owns that deployment and report error if not if kapierrors.IsAlreadyExists(err) { - return c.updateStatus(config, existingDeployments) + rc, err := c.rcLister.ReplicationControllers(deployment.Namespace).Get(deployment.Name) + if err != nil { + return fmt.Errorf("error while deploymentConfigController getting the replication controller %s/%s: %v", rc.Namespace, rc.Name, err) + } + // We need to make sure we own that RC or adopt it if possible + isOurs, err := cm.ClaimReplicationController(rc) + if err != nil { + return fmt.Errorf("error while deploymentConfigController claiming the replication controller: %v", err) + } + if isOurs { + // If the deployment was already created, just move on. The cache could be + // stale, or another process could have already handled this update. + return c.updateStatus(config, existingDeployments) + } else { + err = fmt.Errorf("replication controller %s already exists and deployment config is not allowed to claim it.", deployment.Name) + c.recorder.Eventf(config, kapi.EventTypeWarning, "DeploymentCreationFailed", "Couldn't deploy version %d: %v", config.Status.LatestVersion, err) + return c.updateStatus(config, existingDeployments) + } } c.recorder.Eventf(config, kapi.EventTypeWarning, "DeploymentCreationFailed", "Couldn't deploy version %d: %s", config.Status.LatestVersion, err) // We don't care about this error since we need to report the create failure. @@ -208,7 +256,7 @@ func (c *DeploymentConfigController) Handle(config *deployapi.DeploymentConfig) // successful deployment, not necessarily the latest in terms of the config // version. The active deployment replica count should follow the config, and // all other deployments should be scaled to zero. 
-func (c *DeploymentConfigController) reconcileDeployments(existingDeployments []*kapi.ReplicationController, config *deployapi.DeploymentConfig) error { +func (c *DeploymentConfigController) reconcileDeployments(existingDeployments []*kapi.ReplicationController, config *deployapi.DeploymentConfig, cm *oscontroller.RCControllerRefManager) error { activeDeployment := deployutil.ActiveDeployment(existingDeployments) // Reconcile deployments. The active deployment follows the config, and all @@ -239,6 +287,18 @@ func (c *DeploymentConfigController) reconcileDeployments(existingDeployments [] if err != nil { return err } + // We need to make sure we own that RC or adopt it if possible + isOurs, err := cm.ClaimReplicationController(rc) + if err != nil { + return fmt.Errorf("error while deploymentConfigController claiming the replication controller %s/%s: %v", rc.Namespace, rc.Name, err) + } + if !isOurs { + return fmt.Errorf("deployment config %s/%s (%v) no longer owns replication controller %s/%s (%v)", + config.Namespace, config.Name, config.UID, + deployment.Namespace, deployment.Name, deployment.UID, + ) + } + copied, err = deployutil.DeploymentDeepCopy(rc) if err != nil { return err diff --git a/pkg/deploy/controller/deploymentconfig/factory.go b/pkg/deploy/controller/deploymentconfig/factory.go index d4d2a26fe256..85843a3b29ce 100644 --- a/pkg/deploy/controller/deploymentconfig/factory.go +++ b/pkg/deploy/controller/deploymentconfig/factory.go @@ -20,6 +20,7 @@ import ( kcontroller "k8s.io/kubernetes/pkg/controller" osclient "github.com/openshift/origin/pkg/client" + oscontroller "github.com/openshift/origin/pkg/controller" deployapi "github.com/openshift/origin/pkg/deploy/api" ) @@ -51,6 +52,10 @@ func NewDeploymentConfigController( rcLister: rcInformer.Lister(), rcListerSynced: rcInformer.Informer().HasSynced, + rcControl: oscontroller.RealRCControl{ + KubeClient: internalKubeClientset, + Recorder: recorder, + }, recorder: recorder, codec: codec, diff --git 
a/pkg/deploy/util/util.go b/pkg/deploy/util/util.go index 2177a41b2e7b..487a6ff5d790 100644 --- a/pkg/deploy/util/util.go +++ b/pkg/deploy/util/util.go @@ -18,10 +18,16 @@ import ( kcoreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion" kdeplutil "k8s.io/kubernetes/pkg/controller/deployment/util" + osapiv1 "github.com/openshift/origin/pkg/api/v1" deployapi "github.com/openshift/origin/pkg/deploy/api" "github.com/openshift/origin/pkg/util/namer" ) +var ( + // ControllerKind contains the schema.GroupVersionKind for this controller type. + ControllerKind = osapiv1.SchemeGroupVersion.WithKind("DeploymentConfig") +) + // NewDeploymentCondition creates a new deployment condition. func NewDeploymentCondition(condType deployapi.DeploymentConditionType, status api.ConditionStatus, reason deployapi.DeploymentConditionReason, message string) *deployapi.DeploymentCondition { return &deployapi.DeploymentCondition{ @@ -228,6 +234,19 @@ func EncodeDeploymentConfig(config *deployapi.DeploymentConfig, codec runtime.Co return string(bytes[:]), nil } +func NewControllerRef(config *deployapi.DeploymentConfig) *metav1.OwnerReference { + blockOwnerDeletion := true + isController := true + return &metav1.OwnerReference{ + APIVersion: ControllerKind.Version, + Kind: ControllerKind.Kind, + Name: config.Name, + UID: config.UID, + BlockOwnerDeletion: &blockOwnerDeletion, + Controller: &isController, + } +} + // MakeDeployment creates a deployment represented as a ReplicationController and based on the given // DeploymentConfig. The controller replica count will be zero. 
func MakeDeployment(config *deployapi.DeploymentConfig, codec runtime.Codec) (*api.ReplicationController, error) { @@ -279,6 +298,7 @@ func MakeDeployment(config *deployapi.DeploymentConfig, codec runtime.Codec) (*a podAnnotations[deployapi.DeploymentConfigAnnotation] = config.Name podAnnotations[deployapi.DeploymentVersionAnnotation] = strconv.FormatInt(config.Status.LatestVersion, 10) + controllerRef := NewControllerRef(config) deployment := &api.ReplicationController{ ObjectMeta: metav1.ObjectMeta{ Name: deploymentName, @@ -292,7 +312,9 @@ func MakeDeployment(config *deployapi.DeploymentConfig, codec runtime.Codec) (*a deployapi.DesiredReplicasAnnotation: strconv.Itoa(int(config.Spec.Replicas)), deployapi.DeploymentReplicasAnnotation: strconv.Itoa(0), }, - Labels: controllerLabels, + Labels: controllerLabels, + OwnerReferences: []metav1.OwnerReference{*controllerRef}, + Finalizers: []string{metav1.FinalizerDeleteDependents}, }, Spec: api.ReplicationControllerSpec{ // The deployment should be inactive initially