Skip to content

Commit

Permalink
Add DC controllerRef to RC
Browse files Browse the repository at this point in the history
  • Loading branch information
tnozicka committed May 30, 2017
1 parent eeaf7bc commit 13326de
Show file tree
Hide file tree
Showing 4 changed files with 271 additions and 9 deletions.
175 changes: 175 additions & 0 deletions pkg/controller/controller_ref_manager.go
@@ -0,0 +1,175 @@
package controller

import (
"fmt"

"github.com/golang/glog"
kerrors "k8s.io/apimachinery/pkg/api/errors"
kmetav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
klabels "k8s.io/apimachinery/pkg/labels"
kschema "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
kutilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/client-go/tools/record"
kapi "k8s.io/kubernetes/pkg/api"
kclientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
kcontroller "k8s.io/kubernetes/pkg/controller"
)

// RCControlInterface is an interface that knows how to patch
// ReplicationControllers. It is used by the DeploymentConfig controller
// to ease testing of the actions that it takes.
type RCControlInterface interface {
	PatchReplicationController(namespace, name string, data []byte) error
}

// RealRCControl is the default implementation of RCControlInterface,
// issuing patches through a real Kubernetes clientset.
type RealRCControl struct {
	// KubeClient is used to send the PATCH request to the API server.
	KubeClient kclientset.Interface
	// Recorder is available for emitting events about RC changes.
	// NOTE(review): no method on RealRCControl currently records events — confirm it is needed.
	Recorder record.EventRecorder
}

// Compile-time check that RealRCControl implements RCControlInterface.
var _ RCControlInterface = &RealRCControl{}

// PatchReplicationController executes a strategic merge patch contained in
// 'data' on the RC specified by 'namespace' and 'name'. The patched object
// returned by the API server is discarded; only the error is reported.
func (r RealRCControl) PatchReplicationController(namespace, name string, data []byte) error {
	_, err := r.KubeClient.Core().ReplicationControllers(namespace).Patch(name, types.StrategicMergePatchType, data)
	return err
}

// RCControllerRefManager manages the controllerRef (ownerReferences entry
// with controller=true) of ReplicationControllers on behalf of one owner.
type RCControllerRefManager struct {
	kcontroller.BaseControllerRefManager
	// controllerKind is the GroupVersionKind of the owning controller; it is
	// written into the ownerReference of adopted ReplicationControllers.
	controllerKind kschema.GroupVersionKind
	// rcControl is used to send the adopt/release patches to the API server.
	rcControl RCControlInterface
}

// NewRCControllerRefManager builds an RCControllerRefManager that adopts and
// releases ReplicationControllers on behalf of the given controller object.
//
// canAdopt may perform a potentially expensive check (such as a live GET from
// the API server) before the first adoption; it is invoked at most once per
// manager instance, and a non-nil error from it makes all adoptions fail.
// Create a fresh manager (e.g. per sync pass) to re-run the check.
func NewRCControllerRefManager(
	rcControl RCControlInterface,
	controller kmetav1.Object,
	selector klabels.Selector,
	controllerKind kschema.GroupVersionKind,
	canAdopt func() error,
) *RCControllerRefManager {
	m := &RCControllerRefManager{
		controllerKind: controllerKind,
		rcControl:      rcControl,
	}
	m.Controller = controller
	m.Selector = selector
	m.CanAdoptFunc = canAdopt
	return m
}

// ClaimReplicationController attempts to take ownership of one
// ReplicationController, reconciling as follows:
//   - adopt it if it is an orphan and matches the selector;
//   - release it if we own it but the selector no longer matches.
//
// A non-nil error means reconciliation was attempted and failed; callers
// should retry later. On a nil error the returned bool reports whether the
// object is now owned by us.
func (m *RCControllerRefManager) ClaimReplicationController(rc *kapi.ReplicationController) (bool, error) {
	return m.ClaimObject(rc,
		func(obj kmetav1.Object) bool {
			return m.Selector.Matches(klabels.Set(obj.GetLabels()))
		},
		func(obj kmetav1.Object) error {
			return m.AdoptReplicationController(obj.(*kapi.ReplicationController))
		},
		func(obj kmetav1.Object) error {
			return m.ReleaseReplicationController(obj.(*kapi.ReplicationController))
		},
	)
}

// ClaimReplicationControllers applies ClaimReplicationController to every
// element of rcs: orphans matching the selector are adopted, owned objects
// that stopped matching are released.
//
// It returns the subset of rcs that we own after reconciliation, together
// with an aggregate of all per-object errors (nil when everything
// succeeded). Callers should retry later on a non-nil error.
func (m *RCControllerRefManager) ClaimReplicationControllers(rcs []*kapi.ReplicationController) ([]*kapi.ReplicationController, error) {
	var owned []*kapi.ReplicationController
	var errs []error

	for _, rc := range rcs {
		isOurs, err := m.ClaimReplicationController(rc)
		switch {
		case err != nil:
			errs = append(errs, err)
		case isOurs:
			owned = append(owned, rc)
		}
	}
	return owned, kutilerrors.NewAggregate(errs)
}

// AdoptReplicationController sends a patch that installs this manager's
// controller as the controlling ownerReference of the given
// ReplicationController, and adds the foregroundDeletion finalizer.
// It returns an error if the adoption pre-check (CanAdopt) or the patch
// fails.
func (m *RCControllerRefManager) AdoptReplicationController(rc *kapi.ReplicationController) error {
	// Renamed parameter from `rs` to `rc` for consistency with the other
	// methods in this file (this code was adapted from the ReplicaSet
	// controller-ref manager).
	if err := m.CanAdopt(); err != nil {
		return fmt.Errorf("can't adopt ReplicationController %s/%s (%s): %v", rc.Namespace, rc.Name, rc.UID, err)
	}
	// Note that ValidateOwnerReferences() will reject this patch if another
	// OwnerReference exists with controller=true.
	// Including the RC's own UID in the patch makes it fail if the RC was
	// deleted and recreated (different UID) since we listed it.
	addControllerPatch := fmt.Sprintf(
		`{"metadata":{
			"ownerReferences":[{"apiVersion":"%s","kind":"%s","name":"%s","uid":"%s","controller":true,"blockOwnerDeletion":true}],
			"uid":"%s",
			"finalizers": ["%s"]
			}
		}`,
		m.controllerKind.GroupVersion(), m.controllerKind.Kind,
		m.Controller.GetName(), m.Controller.GetUID(), rc.UID,
		kmetav1.FinalizerDeleteDependents)
	return m.rcControl.PatchReplicationController(rc.Namespace, rc.Name, []byte(addControllerPatch))
}

// ReleaseReplicationController sends a patch that removes this manager's
// controller from the ownerReferences of the given ReplicationController,
// freeing it from our control. NotFound (404) and Invalid (422) responses
// are treated as success; any other patch error is returned.
func (m *RCControllerRefManager) ReleaseReplicationController(rc *kapi.ReplicationController) error {
	glog.V(4).Infof("patching ReplicationController %s/%s to remove its controllerRef to %s/%s:%s",
		rc.Namespace, rc.Name, m.controllerKind.GroupVersion(), m.controllerKind.Kind, m.Controller.GetName())
	patch := fmt.Sprintf(`{"metadata":{"ownerReferences":[{"$patch":"delete","uid":"%s"}],"uid":"%s"}}`, m.Controller.GetUID(), rc.UID)
	err := m.rcControl.PatchReplicationController(rc.Namespace, rc.Name, []byte(patch))
	switch {
	case err == nil:
		return nil
	case kerrors.IsNotFound(err):
		// The ReplicationController no longer exists; nothing to release.
		return nil
	case kerrors.IsInvalid(err):
		// Invalid is returned in two cases: 1. the ReplicationController has
		// no owner reference, 2. its uid doesn't match, meaning it was
		// deleted and then recreated. Both are safe to ignore.
		return nil
	default:
		return err
	}
}
Expand Up @@ -8,6 +8,7 @@ import (

kapierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
kutilerrors "k8s.io/apimachinery/pkg/util/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
Expand All @@ -17,9 +18,11 @@ import (
kcoreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion"
kcorelisters "k8s.io/kubernetes/pkg/client/listers/core/internalversion"
"k8s.io/kubernetes/pkg/client/retry"
kcontroller "k8s.io/kubernetes/pkg/controller"

osclient "github.com/openshift/origin/pkg/client"
oscache "github.com/openshift/origin/pkg/client/cache"
oscontroller "github.com/openshift/origin/pkg/controller"
deployapi "github.com/openshift/origin/pkg/deploy/api"
deployutil "github.com/openshift/origin/pkg/deploy/util"
)
Expand Down Expand Up @@ -68,6 +71,8 @@ type DeploymentConfigController struct {
rcLister kcorelisters.ReplicationControllerLister
// rcListerSynced makes sure the rc shared informer is synced before reconcling any deployment config.
rcListerSynced func() bool
// rcControl is used for adopting/releasing replication controllers.
rcControl oscontroller.RCControlInterface

// codec is used to build deployments from configs.
codec runtime.Codec
Expand All @@ -84,11 +89,29 @@ func (c *DeploymentConfigController) Handle(config *deployapi.DeploymentConfig)
return c.updateStatus(config, []*kapi.ReplicationController{})
}

// Find all deployments owned by the deployment config.
// List all ReplicationControllers to find also those we own but that no longer match our selector.
// They will be orphaned by ClaimReplicationControllers().
rcList, err := c.rcLister.ReplicationControllers(config.Namespace).List(labels.Everything())
if err != nil {
return fmt.Errorf("error while deploymentConfigController listing replication controllers: %v", err)
}
selector := deployutil.ConfigSelector(config.Name)
existingDeployments, err := c.rcLister.ReplicationControllers(config.Namespace).List(selector)
// If any adoptions are attempted, we should first recheck for deletion with
// an uncached quorum read sometime after listing ReplicationControllers (see Kubernetes #42639).
canAdoptFunc := kcontroller.RecheckDeletionTimestamp(func() (metav1.Object, error) {
fresh, err := c.dn.DeploymentConfigs(config.Namespace).Get(config.Name, metav1.GetOptions{})
if err != nil {
return nil, err
}
if fresh.UID != config.UID {
return nil, fmt.Errorf("original DeploymentConfig %v/%v is gone: got uid %v, wanted %v", config.Namespace, config.Name, fresh.UID, config.UID)
}
return fresh, nil
})
cm := oscontroller.NewRCControllerRefManager(c.rcControl, config, selector, deployutil.ControllerKind, canAdoptFunc)
existingDeployments, err := cm.ClaimReplicationControllers(rcList)
if err != nil {
return err
return fmt.Errorf("error while deploymentConfigController claiming replication controllers: %v", err)
}

// In case the deployment config has been marked for deletion, merely update its status with
Expand Down Expand Up @@ -125,6 +148,15 @@ func (c *DeploymentConfigController) Handle(config *deployapi.DeploymentConfig)
if err != nil {
return err
}
// We need to make sure we own that RC or adopt it if possible
isOurs, err := cm.ClaimReplicationController(rc)
if err != nil {
return fmt.Errorf("error while deploymentConfigController claiming the replication controller %s/%s: %v", rc.Namespace, rc.Name, err)
}
if !isOurs {
return nil
}

copied, err := deployutil.DeploymentDeepCopy(rc)
if err != nil {
return err
Expand Down Expand Up @@ -157,7 +189,7 @@ func (c *DeploymentConfigController) Handle(config *deployapi.DeploymentConfig)
return c.updateStatus(config, existingDeployments)
}

return c.reconcileDeployments(existingDeployments, config)
return c.reconcileDeployments(existingDeployments, config, cm)
}
// If the config is paused we shouldn't create new deployments for it.
if config.Spec.Paused {
Expand All @@ -177,10 +209,26 @@ func (c *DeploymentConfigController) Handle(config *deployapi.DeploymentConfig)
}
created, err := c.rn.ReplicationControllers(config.Namespace).Create(deployment)
if err != nil {
// If the deployment was already created, just move on. The cache could be
// stale, or another process could have already handled this update.
// We need to find out if our controller owns that deployment and report error if not
if kapierrors.IsAlreadyExists(err) {
return c.updateStatus(config, existingDeployments)
rc, err := c.rcLister.ReplicationControllers(deployment.Namespace).Get(deployment.Name)
if err != nil {
return fmt.Errorf("error while deploymentConfigController getting the replication controller %s/%s: %v", rc.Namespace, rc.Name, err)
}
// We need to make sure we own that RC or adopt it if possible
isOurs, err := cm.ClaimReplicationController(rc)
if err != nil {
return fmt.Errorf("error while deploymentConfigController claiming the replication controller: %v", err)
}
if isOurs {
// If the deployment was already created, just move on. The cache could be
// stale, or another process could have already handled this update.
return c.updateStatus(config, existingDeployments)
} else {
err = fmt.Errorf("replication controller %s already exists and deployment config is not allowed to claim it.", deployment.Name)
c.recorder.Eventf(config, kapi.EventTypeWarning, "DeploymentCreationFailed", "Couldn't deploy version %d: %v", config.Status.LatestVersion, err)
return c.updateStatus(config, existingDeployments)
}
}
c.recorder.Eventf(config, kapi.EventTypeWarning, "DeploymentCreationFailed", "Couldn't deploy version %d: %s", config.Status.LatestVersion, err)
// We don't care about this error since we need to report the create failure.
Expand Down Expand Up @@ -208,7 +256,7 @@ func (c *DeploymentConfigController) Handle(config *deployapi.DeploymentConfig)
// successful deployment, not necessarily the latest in terms of the config
// version. The active deployment replica count should follow the config, and
// all other deployments should be scaled to zero.
func (c *DeploymentConfigController) reconcileDeployments(existingDeployments []*kapi.ReplicationController, config *deployapi.DeploymentConfig) error {
func (c *DeploymentConfigController) reconcileDeployments(existingDeployments []*kapi.ReplicationController, config *deployapi.DeploymentConfig, cm *oscontroller.RCControllerRefManager) error {
activeDeployment := deployutil.ActiveDeployment(existingDeployments)

// Reconcile deployments. The active deployment follows the config, and all
Expand Down Expand Up @@ -239,6 +287,18 @@ func (c *DeploymentConfigController) reconcileDeployments(existingDeployments []
if err != nil {
return err
}
// We need to make sure we own that RC or adopt it if possible
isOurs, err := cm.ClaimReplicationController(rc)
if err != nil {
return fmt.Errorf("error while deploymentConfigController claiming the replication controller %s/%s: %v", rc.Namespace, rc.Name, err)
}
if !isOurs {
return fmt.Errorf("deployment config %s/%s (%v) no longer owns replication controller %s/%s (%v)",
config.Namespace, config.Name, config.UID,
deployment.Namespace, deployment.Name, deployment.UID,
)
}

copied, err = deployutil.DeploymentDeepCopy(rc)
if err != nil {
return err
Expand Down
5 changes: 5 additions & 0 deletions pkg/deploy/controller/deploymentconfig/factory.go
Expand Up @@ -20,6 +20,7 @@ import (
kcontroller "k8s.io/kubernetes/pkg/controller"

osclient "github.com/openshift/origin/pkg/client"
oscontroller "github.com/openshift/origin/pkg/controller"
deployapi "github.com/openshift/origin/pkg/deploy/api"
)

Expand Down Expand Up @@ -51,6 +52,10 @@ func NewDeploymentConfigController(

rcLister: rcInformer.Lister(),
rcListerSynced: rcInformer.Informer().HasSynced,
rcControl: oscontroller.RealRCControl{
KubeClient: internalKubeClientset,
Recorder: recorder,
},

recorder: recorder,
codec: codec,
Expand Down
24 changes: 23 additions & 1 deletion pkg/deploy/util/util.go
Expand Up @@ -18,10 +18,16 @@ import (
kcoreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion"
kdeplutil "k8s.io/kubernetes/pkg/controller/deployment/util"

osapiv1 "github.com/openshift/origin/pkg/api/v1"
deployapi "github.com/openshift/origin/pkg/deploy/api"
"github.com/openshift/origin/pkg/util/namer"
)

var (
// ControllerKind contains the schema.GroupVersionKind for this controller type.
ControllerKind = osapiv1.SchemeGroupVersion.WithKind("DeploymentConfig")
)

// NewDeploymentCondition creates a new deployment condition.
func NewDeploymentCondition(condType deployapi.DeploymentConditionType, status api.ConditionStatus, reason deployapi.DeploymentConditionReason, message string) *deployapi.DeploymentCondition {
return &deployapi.DeploymentCondition{
Expand Down Expand Up @@ -228,6 +234,19 @@ func EncodeDeploymentConfig(config *deployapi.DeploymentConfig, codec runtime.Co
return string(bytes[:]), nil
}

// NewControllerRef returns an OwnerReference pointing to the given
// DeploymentConfig, marked as the managing controller and blocking owner
// deletion, suitable for placing on ReplicationControllers it creates.
func NewControllerRef(config *deployapi.DeploymentConfig) *metav1.OwnerReference {
	blockOwnerDeletion := true
	isController := true
	return &metav1.OwnerReference{
		// Use the full group/version string rather than ControllerKind.Version:
		// the bare Version drops the API group, so the reference would be wrong
		// for any kind outside the legacy ("" group) API. It also keeps this
		// consistent with the adopt patch, which uses controllerKind.GroupVersion().
		APIVersion:         ControllerKind.GroupVersion().String(),
		Kind:               ControllerKind.Kind,
		Name:               config.Name,
		UID:                config.UID,
		BlockOwnerDeletion: &blockOwnerDeletion,
		Controller:         &isController,
	}
}

// MakeDeployment creates a deployment represented as a ReplicationController and based on the given
// DeploymentConfig. The controller replica count will be zero.
func MakeDeployment(config *deployapi.DeploymentConfig, codec runtime.Codec) (*api.ReplicationController, error) {
Expand Down Expand Up @@ -279,6 +298,7 @@ func MakeDeployment(config *deployapi.DeploymentConfig, codec runtime.Codec) (*a
podAnnotations[deployapi.DeploymentConfigAnnotation] = config.Name
podAnnotations[deployapi.DeploymentVersionAnnotation] = strconv.FormatInt(config.Status.LatestVersion, 10)

controllerRef := NewControllerRef(config)
deployment := &api.ReplicationController{
ObjectMeta: metav1.ObjectMeta{
Name: deploymentName,
Expand All @@ -292,7 +312,9 @@ func MakeDeployment(config *deployapi.DeploymentConfig, codec runtime.Codec) (*a
deployapi.DesiredReplicasAnnotation: strconv.Itoa(int(config.Spec.Replicas)),
deployapi.DeploymentReplicasAnnotation: strconv.Itoa(0),
},
Labels: controllerLabels,
Labels: controllerLabels,
OwnerReferences: []metav1.OwnerReference{*controllerRef},
Finalizers: []string{metav1.FinalizerDeleteDependents},
},
Spec: api.ReplicationControllerSpec{
// The deployment should be inactive initially
Expand Down

0 comments on commit 13326de

Please sign in to comment.