Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding cascading deletion support to federation replicaset and deployments #36476

Merged
merged 3 commits into from Nov 10, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions federation/pkg/federation-controller/deployment/BUILD
Expand Up @@ -19,16 +19,19 @@ go_library(
"//federation/apis/federation/v1beta1:go_default_library",
"//federation/client/clientset_generated/federation_release_1_5:go_default_library",
"//federation/pkg/federation-controller/util:go_default_library",
"//federation/pkg/federation-controller/util/deletionhelper:go_default_library",
"//federation/pkg/federation-controller/util/eventsink:go_default_library",
"//federation/pkg/federation-controller/util/planner:go_default_library",
"//federation/pkg/federation-controller/util/podanalyzer:go_default_library",
"//pkg/api:go_default_library",
"//pkg/api/errors:go_default_library",
"//pkg/api/v1:go_default_library",
"//pkg/apis/extensions/v1beta1:go_default_library",
"//pkg/client/cache:go_default_library",
"//pkg/client/clientset_generated/release_1_5:go_default_library",
"//pkg/client/record:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/conversion:go_default_library",
"//pkg/runtime:go_default_library",
"//pkg/util/flowcontrol:go_default_library",
"//pkg/util/wait:go_default_library",
Expand Down
Expand Up @@ -29,16 +29,19 @@ import (
fedv1 "k8s.io/kubernetes/federation/apis/federation/v1beta1"
fedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_5"
fedutil "k8s.io/kubernetes/federation/pkg/federation-controller/util"
"k8s.io/kubernetes/federation/pkg/federation-controller/util/deletionhelper"
"k8s.io/kubernetes/federation/pkg/federation-controller/util/eventsink"
"k8s.io/kubernetes/federation/pkg/federation-controller/util/planner"
"k8s.io/kubernetes/federation/pkg/federation-controller/util/podanalyzer"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
apiv1 "k8s.io/kubernetes/pkg/api/v1"
extensionsv1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
"k8s.io/kubernetes/pkg/client/cache"
kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/conversion"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/flowcontrol"
"k8s.io/kubernetes/pkg/util/wait"
Expand Down Expand Up @@ -92,6 +95,8 @@ type DeploymentController struct {
deploymentBackoff *flowcontrol.Backoff
eventRecorder record.EventRecorder

deletionHelper *deletionhelper.DeletionHelper

defaultPlanner *planner.Planner
}

Expand Down Expand Up @@ -202,9 +207,72 @@ func NewDeploymentController(federationClient fedclientset.Interface) *Deploymen
return err
})

fdc.deletionHelper = deletionhelper.NewDeletionHelper(
fdc.hasFinalizerFunc,
fdc.removeFinalizerFunc,
fdc.addFinalizerFunc,
// objNameFunc
func(obj runtime.Object) string {
deployment := obj.(*extensionsv1.Deployment)
return deployment.Name
},
updateTimeout,
fdc.eventRecorder,
fdc.fedDeploymentInformer,
fdc.fedUpdater,
)

return fdc
}

// Returns true if the given object has the given finalizer in its ObjectMeta.
func (fdc *DeploymentController) hasFinalizerFunc(obj runtime.Object, finalizer string) bool {
deployment := obj.(*extensionsv1.Deployment)
for i := range deployment.ObjectMeta.Finalizers {
if string(deployment.ObjectMeta.Finalizers[i]) == finalizer {
return true
}
}
return false
}

// Removes the finalizer from the given objects ObjectMeta.
// Assumes that the given object is a deployment.
func (fdc *DeploymentController) removeFinalizerFunc(obj runtime.Object, finalizer string) (runtime.Object, error) {
deployment := obj.(*extensionsv1.Deployment)
newFinalizers := []string{}
hasFinalizer := false
for i := range deployment.ObjectMeta.Finalizers {
if string(deployment.ObjectMeta.Finalizers[i]) != finalizer {
newFinalizers = append(newFinalizers, deployment.ObjectMeta.Finalizers[i])
} else {
hasFinalizer = true
}
}
if !hasFinalizer {
// Nothing to do.
return obj, nil
}
deployment.ObjectMeta.Finalizers = newFinalizers
deployment, err := fdc.fedClient.Extensions().Deployments(deployment.Namespace).Update(deployment)
if err != nil {
return nil, fmt.Errorf("failed to remove finalizer %s from deployment %s: %v", finalizer, deployment.Name, err)
}
return deployment, nil
}

// Adds the given finalizer to the given objects ObjectMeta.
// Assumes that the given object is a deployment.
func (fdc *DeploymentController) addFinalizerFunc(obj runtime.Object, finalizer string) (runtime.Object, error) {
deployment := obj.(*extensionsv1.Deployment)
deployment.ObjectMeta.Finalizers = append(deployment.ObjectMeta.Finalizers, finalizer)
deployment, err := fdc.fedClient.Extensions().Deployments(deployment.Namespace).Update(deployment)
if err != nil {
return nil, fmt.Errorf("failed to add finalizer %s to deployment %s: %v", finalizer, deployment.Name, err)
}
return deployment, nil
}

func (fdc *DeploymentController) Run(workers int, stopCh <-chan struct{}) {
go fdc.deploymentController.Run(stopCh)
fdc.fedDeploymentInformer.Start()
Expand Down Expand Up @@ -414,15 +482,43 @@ func (fdc *DeploymentController) reconcileDeployment(key string) (reconciliation
startTime := time.Now()
defer glog.V(4).Infof("Finished reconcile deployment %q (%v)", key, time.Now().Sub(startTime))

obj, exists, err := fdc.deploymentStore.GetByKey(key)
objFromStore, exists, err := fdc.deploymentStore.GetByKey(key)
if err != nil {
return statusError, err
}
if !exists {
// don't delete local deployments for now. Do not reconcile it anymore.
return statusAllOk, nil
}
fd := obj.(*extensionsv1.Deployment)
obj, err := conversion.NewCloner().DeepCopy(objFromStore)
fd, ok := obj.(*extensionsv1.Deployment)
if err != nil || !ok {
glog.Errorf("Error in retrieving obj from store: %v, %v", ok, err)
return statusError, err
}

if fd.DeletionTimestamp != nil {
if err := fdc.delete(fd); err != nil {
glog.Errorf("Failed to delete %s: %v", fd.Name, err)
fdc.eventRecorder.Eventf(fd, api.EventTypeNormal, "DeleteFailed",
"Deployment delete failed: %v", err)
return statusError, err
}
return statusAllOk, nil
}

glog.V(3).Infof("Ensuring delete object from underlying clusters finalizer for deployment: %s",
fd.Name)
// Add the required finalizers before creating a deployment in underlying clusters.
updatedDeploymentObj, err := fdc.deletionHelper.EnsureFinalizers(fd)
if err != nil {
glog.Errorf("Failed to ensure delete object from underlying clusters finalizer in deployment %s: %v",
fd.Name, err)
return statusError, err
}
fd = updatedDeploymentObj.(*extensionsv1.Deployment)

glog.V(3).Infof("Syncing deployment %s in underlying clusters", fd.Name)

clusters, err := fdc.fedDeploymentInformer.GetReadyClusters()
if err != nil {
Expand Down Expand Up @@ -541,3 +637,23 @@ func (fdc *DeploymentController) reconcileDeploymentsOnClusterChange() {
fdc.deliverDeploymentByKey(key, 0, false)
}
}

// delete deletes the given deployment or returns error if the deletion was not complete.
func (fdc *DeploymentController) delete(deployment *extensionsv1.Deployment) error {
glog.V(3).Infof("Handling deletion of deployment: %v", *deployment)
_, err := fdc.deletionHelper.HandleObjectInUnderlyingClusters(deployment)
if err != nil {
return err
}

err = fdc.fedClient.Extensions().Deployments(deployment.Namespace).Delete(deployment.Name, nil)
if err != nil {
// Its all good if the error is not found error. That means it is deleted already and we do not have to do anything.
// This is expected when we are processing an update as a result of deployment finalizer deletion.
// The process that deleted the last finalizer is also going to delete the deployment and we do not have to do anything.
if !errors.IsNotFound(err) {
return fmt.Errorf("failed to delete deployment: %v", err)
}
}
return nil
}
3 changes: 3 additions & 0 deletions federation/pkg/federation-controller/replicaset/BUILD
Expand Up @@ -19,16 +19,19 @@ go_library(
"//federation/apis/federation/v1beta1:go_default_library",
"//federation/client/clientset_generated/federation_release_1_5:go_default_library",
"//federation/pkg/federation-controller/util:go_default_library",
"//federation/pkg/federation-controller/util/deletionhelper:go_default_library",
"//federation/pkg/federation-controller/util/eventsink:go_default_library",
"//federation/pkg/federation-controller/util/planner:go_default_library",
"//federation/pkg/federation-controller/util/podanalyzer:go_default_library",
"//pkg/api:go_default_library",
"//pkg/api/errors:go_default_library",
"//pkg/api/v1:go_default_library",
"//pkg/apis/extensions/v1beta1:go_default_library",
"//pkg/client/cache:go_default_library",
"//pkg/client/clientset_generated/release_1_5:go_default_library",
"//pkg/client/record:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/conversion:go_default_library",
"//pkg/runtime:go_default_library",
"//pkg/util/flowcontrol:go_default_library",
"//pkg/util/wait:go_default_library",
Expand Down
Expand Up @@ -29,16 +29,19 @@ import (
fedv1 "k8s.io/kubernetes/federation/apis/federation/v1beta1"
fedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_5"
fedutil "k8s.io/kubernetes/federation/pkg/federation-controller/util"
"k8s.io/kubernetes/federation/pkg/federation-controller/util/deletionhelper"
"k8s.io/kubernetes/federation/pkg/federation-controller/util/eventsink"
"k8s.io/kubernetes/federation/pkg/federation-controller/util/planner"
"k8s.io/kubernetes/federation/pkg/federation-controller/util/podanalyzer"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
apiv1 "k8s.io/kubernetes/pkg/api/v1"
extensionsv1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
"k8s.io/kubernetes/pkg/client/cache"
kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/conversion"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/flowcontrol"
"k8s.io/kubernetes/pkg/util/wait"
Expand Down Expand Up @@ -94,6 +97,8 @@ type ReplicaSetController struct {
// For events
eventRecorder record.EventRecorder

deletionHelper *deletionhelper.DeletionHelper

defaultPlanner *planner.Planner
}

Expand Down Expand Up @@ -205,9 +210,72 @@ func NewReplicaSetController(federationClient fedclientset.Interface) *ReplicaSe
return err
})

frsc.deletionHelper = deletionhelper.NewDeletionHelper(
frsc.hasFinalizerFunc,
frsc.removeFinalizerFunc,
frsc.addFinalizerFunc,
// objNameFunc
func(obj runtime.Object) string {
replicaset := obj.(*extensionsv1.ReplicaSet)
return replicaset.Name
},
updateTimeout,
frsc.eventRecorder,
frsc.fedReplicaSetInformer,
frsc.fedUpdater,
)

return frsc
}

// Returns true if the given object has the given finalizer in its ObjectMeta.
func (frsc *ReplicaSetController) hasFinalizerFunc(obj runtime.Object, finalizer string) bool {
replicaset := obj.(*extensionsv1.ReplicaSet)
for i := range replicaset.ObjectMeta.Finalizers {
if string(replicaset.ObjectMeta.Finalizers[i]) == finalizer {
return true
}
}
return false
}

// Removes the finalizer from the given objects ObjectMeta.
// Assumes that the given object is a replicaset.
func (frsc *ReplicaSetController) removeFinalizerFunc(obj runtime.Object, finalizer string) (runtime.Object, error) {
replicaset := obj.(*extensionsv1.ReplicaSet)
newFinalizers := []string{}
hasFinalizer := false
for i := range replicaset.ObjectMeta.Finalizers {
if string(replicaset.ObjectMeta.Finalizers[i]) != finalizer {
newFinalizers = append(newFinalizers, replicaset.ObjectMeta.Finalizers[i])
} else {
hasFinalizer = true
}
}
if !hasFinalizer {
// Nothing to do.
return obj, nil
}
replicaset.ObjectMeta.Finalizers = newFinalizers
replicaset, err := frsc.fedClient.Extensions().ReplicaSets(replicaset.Namespace).Update(replicaset)
if err != nil {
return nil, fmt.Errorf("failed to remove finalizer %s from replicaset %s: %v", finalizer, replicaset.Name, err)
}
return replicaset, nil
}

// Adds the given finalizer to the given objects ObjectMeta.
// Assumes that the given object is a replicaset.
func (frsc *ReplicaSetController) addFinalizerFunc(obj runtime.Object, finalizer string) (runtime.Object, error) {
replicaset := obj.(*extensionsv1.ReplicaSet)
replicaset.ObjectMeta.Finalizers = append(replicaset.ObjectMeta.Finalizers, finalizer)
replicaset, err := frsc.fedClient.Extensions().ReplicaSets(replicaset.Namespace).Update(replicaset)
if err != nil {
return nil, fmt.Errorf("failed to add finalizer %s to replicaset %s: %v", finalizer, replicaset.Name, err)
}
return replicaset, nil
}

func (frsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) {
go frsc.replicaSetController.Run(stopCh)
frsc.fedReplicaSetInformer.Start()
Expand Down Expand Up @@ -413,15 +481,45 @@ func (frsc *ReplicaSetController) reconcileReplicaSet(key string) (reconciliatio
startTime := time.Now()
defer glog.V(4).Infof("Finished reconcile replicaset %q (%v)", key, time.Now().Sub(startTime))

obj, exists, err := frsc.replicaSetStore.Indexer.GetByKey(key)
objFromStore, exists, err := frsc.replicaSetStore.Indexer.GetByKey(key)
if err != nil {
return statusError, err
}
if !exists {
// don't delete local replicasets for now. Do not reconcile it anymore.
return statusAllOk, nil
}
frs := obj.(*extensionsv1.ReplicaSet)
obj, err := conversion.NewCloner().DeepCopy(objFromStore)
frs, ok := obj.(*extensionsv1.ReplicaSet)
if err != nil || !ok {
glog.Errorf("Error in retrieving obj from store: %v, %v", ok, err)
frsc.deliverReplicaSetByKey(key, 0, true)
return statusError, err
}
if frs.DeletionTimestamp != nil {
if err := frsc.delete(frs); err != nil {
glog.Errorf("Failed to delete %s: %v", frs, err)
frsc.eventRecorder.Eventf(frs, api.EventTypeNormal, "DeleteFailed",
"ReplicaSet delete failed: %v", err)
frsc.deliverReplicaSetByKey(key, 0, true)
return statusError, err
}
return statusAllOk, nil
}

glog.V(3).Infof("Ensuring delete object from underlying clusters finalizer for replicaset: %s",
frs.Name)
// Add the required finalizers before creating a replicaset in underlying clusters.
updatedRsObj, err := frsc.deletionHelper.EnsureFinalizers(frs)
if err != nil {
glog.Errorf("Failed to ensure delete object from underlying clusters finalizer in replicaset %s: %v",
frs.Name, err)
frsc.deliverReplicaSetByKey(key, 0, false)
return statusError, err
}
frs = updatedRsObj.(*extensionsv1.ReplicaSet)

glog.V(3).Infof("Syncing replicaset %s in underlying clusters", frs.Name)

clusters, err := frsc.fedReplicaSetInformer.GetReadyClusters()
if err != nil {
Expand Down Expand Up @@ -535,3 +633,23 @@ func (frsc *ReplicaSetController) reconcileReplicaSetsOnClusterChange() {
frsc.deliverReplicaSetByKey(key, 0, false)
}
}

// delete deletes the given replicaset or returns error if the deletion was not complete.
func (frsc *ReplicaSetController) delete(replicaset *extensionsv1.ReplicaSet) error {
glog.V(3).Infof("Handling deletion of replicaset: %v", *replicaset)
_, err := frsc.deletionHelper.HandleObjectInUnderlyingClusters(replicaset)
if err != nil {
return err
}

err = frsc.fedClient.Extensions().ReplicaSets(replicaset.Namespace).Delete(replicaset.Name, nil)
if err != nil {
// Its all good if the error is not found error. That means it is deleted already and we do not have to do anything.
// This is expected when we are processing an update as a result of replicaset finalizer deletion.
// The process that deleted the last finalizer is also going to delete the replicaset and we do not have to do anything.
if !errors.IsNotFound(err) {
return fmt.Errorf("failed to delete replicaset: %v", err)
}
}
return nil
}