Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding cascading deletion support to federated namespaces #34648

Merged
merged 1 commit into from
Oct 27, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 3 additions & 1 deletion federation/pkg/federation-controller/namespace/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ go_library(
"//federation/apis/federation/v1beta1:go_default_library",
"//federation/client/clientset_generated/federation_release_1_5:go_default_library",
"//federation/pkg/federation-controller/util:go_default_library",
"//federation/pkg/federation-controller/util/deletionhelper:go_default_library",
"//federation/pkg/federation-controller/util/eventsink:go_default_library",
"//pkg/api:go_default_library",
"//pkg/api/errors:go_default_library",
Expand All @@ -28,7 +29,6 @@ go_library(
"//pkg/controller:go_default_library",
"//pkg/runtime:go_default_library",
"//pkg/util/flowcontrol:go_default_library",
"//pkg/util/sets:go_default_library",
"//pkg/watch:go_default_library",
"//vendor:github.com/golang/glog",
],
Expand All @@ -42,6 +42,8 @@ go_test(
deps = [
"//federation/apis/federation/v1beta1:go_default_library",
"//federation/client/clientset_generated/federation_release_1_5/fake:go_default_library",
"//federation/pkg/federation-controller/util:go_default_library",
"//federation/pkg/federation-controller/util/deletionhelper:go_default_library",
"//federation/pkg/federation-controller/util/test:go_default_library",
"//pkg/api/unversioned:go_default_library",
"//pkg/api/v1:go_default_library",
Expand Down
216 changes: 168 additions & 48 deletions federation/pkg/federation-controller/namespace/namespace_controller.go

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (

federation_api "k8s.io/kubernetes/federation/apis/federation/v1beta1"
fake_fedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_5/fake"
"k8s.io/kubernetes/federation/pkg/federation-controller/util"
"k8s.io/kubernetes/federation/pkg/federation-controller/util/deletionhelper"
. "k8s.io/kubernetes/federation/pkg/federation-controller/util/test"
"k8s.io/kubernetes/pkg/api/unversioned"
api_v1 "k8s.io/kubernetes/pkg/api/v1"
Expand All @@ -44,12 +46,16 @@ func TestNamespaceController(t *testing.T) {
Name: "test-namespace",
SelfLink: "/api/v1/namespaces/test-namespace",
},
Spec: api_v1.NamespaceSpec{
Finalizers: []api_v1.FinalizerName{api_v1.FinalizerKubernetes},
},
}

fakeClient := &fake_fedclientset.Clientset{}
RegisterFakeList("clusters", &fakeClient.Fake, &federation_api.ClusterList{Items: []federation_api.Cluster{*cluster1}})
RegisterFakeList("namespaces", &fakeClient.Fake, &api_v1.NamespaceList{Items: []api_v1.Namespace{}})
namespaceWatch := RegisterFakeWatch("namespaces", &fakeClient.Fake)
namespaceCreateChan := RegisterFakeCopyOnCreate("namespaces", &fakeClient.Fake, namespaceWatch)
clusterWatch := RegisterFakeWatch("clusters", &fakeClient.Fake)

cluster1Client := &fake_kubeclientset.Clientset{}
Expand Down Expand Up @@ -87,8 +93,7 @@ func TestNamespaceController(t *testing.T) {
secretDeleteChan := RegisterDeleteCollection(&fakeClient.Fake, "secrets")

namespaceController := NewNamespaceController(fakeClient)
informer := ToFederatedInformerForTestOnly(namespaceController.namespaceFederatedInformer)
informer.SetClientFactory(func(cluster *federation_api.Cluster) (kubeclientset.Interface, error) {
informerClientFactory := func(cluster *federation_api.Cluster) (kubeclientset.Interface, error) {
switch cluster.Name {
case cluster1.Name:
return cluster1Client, nil
Expand All @@ -97,7 +102,8 @@ func TestNamespaceController(t *testing.T) {
default:
return nil, fmt.Errorf("Unknown cluster")
}
})
}
setClientFactory(namespaceController.namespaceFederatedInformer, informerClientFactory)
namespaceController.clusterAvailableDelay = time.Second
namespaceController.namespaceReviewDelay = 50 * time.Millisecond
namespaceController.smallDelay = 20 * time.Millisecond
Expand All @@ -108,11 +114,19 @@ func TestNamespaceController(t *testing.T) {

// Test add federated namespace.
namespaceWatch.Add(&ns1)
// Verify that the DeleteFromUnderlyingClusters finalizer is added to the namespace.
// Note: finalize invokes the create action in Fake client.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That does sound like a bug? Could you open an issue?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixing in #35478

// TODO: Seems like a bug. Should invoke update. Fix it.
updatedNamespace := GetNamespaceFromChan(namespaceCreateChan)
assert.True(t, namespaceController.hasFinalizerFunc(updatedNamespace, deletionhelper.FinalizerDeleteFromUnderlyingClusters))
ns1 = *updatedNamespace

// Verify that the namespace is created in underlying cluster1.
createdNamespace := GetNamespaceFromChan(cluster1CreateChan)
assert.NotNil(t, createdNamespace)
assert.Equal(t, ns1.Name, createdNamespace.Name)

// Wait for the secret to appear in the informer store
// Wait for the namespace to appear in the informer store
err := WaitForStoreUpdate(
namespaceController.namespaceFederatedInformer.GetTargetStore(),
cluster1.Name, ns1.Name, wait.ForeverTestTimeout)
Expand All @@ -123,7 +137,7 @@ func TestNamespaceController(t *testing.T) {
"A": "B",
}
namespaceWatch.Modify(&ns1)
updatedNamespace := GetNamespaceFromChan(cluster1UpdateChan)
updatedNamespace = GetNamespaceFromChan(cluster1UpdateChan)
assert.NotNil(t, updatedNamespace)
assert.Equal(t, ns1.Name, updatedNamespace.Name)
// assert.Contains(t, updatedNamespace.Annotations, "A")
Expand All @@ -135,6 +149,10 @@ func TestNamespaceController(t *testing.T) {
assert.Equal(t, ns1.Name, createdNamespace2.Name)
// assert.Contains(t, createdNamespace2.Annotations, "A")

// Delete the namespace with orphan finalizer (let namespaces
// in underlying clusters be as is).
// TODO: Add a test without orphan finalizer.
ns1.ObjectMeta.Finalizers = append(ns1.ObjectMeta.Finalizers, api_v1.FinalizerOrphan)
ns1.DeletionTimestamp = &unversioned.Time{Time: time.Now()}
namespaceWatch.Modify(&ns1)
assert.Equal(t, ns1.Name, GetStringFromChan(nsDeleteChan))
Expand All @@ -145,6 +163,11 @@ func TestNamespaceController(t *testing.T) {
close(stop)
}

func setClientFactory(informer util.FederatedInformer, informerClientFactory func(*federation_api.Cluster) (kubeclientset.Interface, error)) {
testInformer := ToFederatedInformerForTestOnly(informer)
testInformer.SetClientFactory(informerClientFactory)
}

func RegisterDeleteCollection(client *core.Fake, resource string) chan string {
deleteChan := make(chan string, 100)
client.AddReactor("delete-collection", resource, func(action core.Action) (bool, runtime.Object, error) {
Expand All @@ -169,7 +192,7 @@ func GetStringFromChan(c chan string) string {
case str := <-c:
return str
case <-time.After(5 * time.Second):
return ""
return "timedout"
}
}

Expand Down
25 changes: 25 additions & 0 deletions federation/pkg/federation-controller/util/deletionhelper/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package(default_visibility = ["//visibility:public"])

licenses(["notice"])

load(
"@io_bazel_rules_go//go:def.bzl",
"go_binary",
"go_library",
"go_test",
"cgo_library",
)

go_library(
name = "go_default_library",
srcs = ["deletion_helper.go"],
tags = ["automanaged"],
deps = [
"//federation/pkg/federation-controller/util:go_default_library",
"//pkg/api:go_default_library",
"//pkg/api/v1:go_default_library",
"//pkg/client/record:go_default_library",
"//pkg/runtime:go_default_library",
"//vendor:github.com/golang/glog",
],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package to help federation controllers to delete federated resources from
// underlying clusters when the resource is deleted from federation control
// plane.
package deletionhelper

import (
"fmt"
"strings"
"time"

"k8s.io/kubernetes/federation/pkg/federation-controller/util"
"k8s.io/kubernetes/pkg/api"
api_v1 "k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/runtime"

"github.com/golang/glog"
)

const (
// Add this finalizer to a federation resource if the resource should be
// deleted from all underlying clusters before being deleted from
// federation control plane.
// This is ignored if FinalizerOrphan is also present on the resource.
// In that case, both finalizers are removed from the resource and the
// resource is deleted from federation control plane without affecting
// the underlying clusters.
FinalizerDeleteFromUnderlyingClusters string = "federation.kubernetes.io/delete-from-underlying-clusters"
)

type HasFinalizerFunc func(runtime.Object, string) bool
type RemoveFinalizerFunc func(runtime.Object, string) (runtime.Object, error)
type AddFinalizerFunc func(runtime.Object, string) (runtime.Object, error)
type ObjNameFunc func(runtime.Object) string

type DeletionHelper struct {
hasFinalizerFunc HasFinalizerFunc
removeFinalizerFunc RemoveFinalizerFunc
addFinalizerFunc AddFinalizerFunc
objNameFunc ObjNameFunc
updateTimeout time.Duration
eventRecorder record.EventRecorder
informer util.FederatedInformer
updater util.FederatedUpdater
}

func NewDeletionHelper(
hasFinalizerFunc HasFinalizerFunc, removeFinalizerFunc RemoveFinalizerFunc,
addFinalizerFunc AddFinalizerFunc, objNameFunc ObjNameFunc,
updateTimeout time.Duration, eventRecorder record.EventRecorder,
informer util.FederatedInformer,
updater util.FederatedUpdater) *DeletionHelper {
return &DeletionHelper{
hasFinalizerFunc: hasFinalizerFunc,
removeFinalizerFunc: removeFinalizerFunc,
addFinalizerFunc: addFinalizerFunc,
objNameFunc: objNameFunc,
updateTimeout: updateTimeout,
eventRecorder: eventRecorder,
informer: informer,
updater: updater,
}
}

// Ensures that the given object has the required finalizer to ensure that
// objects are deleted in underlying clusters when this object is deleted
// from federation control plane.
// This method should be called before creating objects in underlying clusters.
func (dh *DeletionHelper) EnsureDeleteFromUnderlyingClustersFinalizer(obj runtime.Object) (
runtime.Object, error) {
if dh.hasFinalizerFunc(obj, FinalizerDeleteFromUnderlyingClusters) {
return obj, nil
}
return dh.addFinalizerFunc(obj, FinalizerDeleteFromUnderlyingClusters)
}

// Deletes the resources corresponding to the given federated resource from
// all underlying clusters, unless it has the FinalizerOrphan finalizer.
// Removes FinalizerOrphan and FinalizerDeleteFromUnderlyingClusters finalizers
// when done.
// Callers are expected to keep calling this (with appropriate backoff) until
// it succeeds.
func (dh *DeletionHelper) HandleObjectInUnderlyingClusters(obj runtime.Object) (
runtime.Object, error) {
objName := dh.objNameFunc(obj)
glog.V(2).Infof("Handling deletion of federated dependents for object: %s", objName)
if !dh.hasFinalizerFunc(obj, FinalizerDeleteFromUnderlyingClusters) {
glog.V(2).Infof("obj does not have %s finalizer. Nothing to do", FinalizerDeleteFromUnderlyingClusters)
return obj, nil
}
hasOrphanFinalizer := dh.hasFinalizerFunc(obj, api_v1.FinalizerOrphan)
if hasOrphanFinalizer {
glog.V(3).Infof("Found finalizer orphan. Nothing to do, just remove the finalizer")
// If the obj has FinalizerOrphan finalizer, then we need to orphan the
// corresponding objects in underlying clusters.
// Just remove both the finalizers in that case.
obj, err := dh.removeFinalizerFunc(obj, api_v1.FinalizerOrphan)
if err != nil {
return obj, err
}
return dh.removeFinalizerFunc(obj, FinalizerDeleteFromUnderlyingClusters)
}

// Else, we need to delete the obj from all underlying clusters.
unreadyClusters, err := dh.informer.GetUnreadyClusters()
if err != nil {
return nil, fmt.Errorf("failed to get a list of unready clusters: %v", err)
}
// TODO: Handle the case when cluster resource is watched after this is executed.
// This can happen if a namespace is deleted before its creation had been
// observed in all underlying clusters.
clusterNsObjs, err := dh.informer.GetTargetStore().GetFromAllClusters(objName)
if err != nil {
return nil, fmt.Errorf("failed to get object %s from underlying clusters: %v", objName, err)
}
operations := make([]util.FederatedOperation, 0)
for _, clusterNsObj := range clusterNsObjs {
operations = append(operations, util.FederatedOperation{
Type: util.OperationTypeDelete,
ClusterName: clusterNsObj.ClusterName,
Obj: clusterNsObj.Object.(runtime.Object),
})
}
err = dh.updater.UpdateWithOnError(operations, dh.updateTimeout, func(op util.FederatedOperation, operror error) {
objName := dh.objNameFunc(op.Obj)
dh.eventRecorder.Eventf(obj, api.EventTypeNormal, "DeleteInClusterFailed",
"Failed to delete obj %s in cluster %s: %v", objName, op.ClusterName, operror)
})
if err != nil {
return nil, fmt.Errorf("failed to execute updates for obj %s: %v", objName, err)
}
if len(operations) > 0 {
// We have deleted a bunch of resources.
// Wait for the store to observe all the deletions.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is the "wait" is implemented?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wait is implemented by not returning success here.
Since len(operations) > 0 we return here without removing the finalizer. Next time this function is called, we will again first compute if we want to do any operations in underlying clusters. If at any time, len(operation) = 0, then we proceed. Else we wait till len(operations) == 0.
Caller is expected to keep calling HandleObjectInUnderlyingClusters until it returns success (err == nil).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. I didn't notice you called deliverNamespace to process the namespace again in the future. So it looks good.

var clusterNames []string
for _, op := range operations {
clusterNames = append(clusterNames, op.ClusterName)
}
return nil, fmt.Errorf("waiting for object %s to be deleted from clusters: %s", objName, strings.Join(clusterNames, ", "))
}

// We have now deleted the object from all *ready* clusters.
// But still need to wait for clusters that are not ready to ensure that
// the object has been deleted from *all* clusters.
if len(unreadyClusters) != 0 {
var clusterNames []string
for _, cluster := range unreadyClusters {
clusterNames = append(clusterNames, cluster.Name)
}
return nil, fmt.Errorf("waiting for clusters %s to become ready to verify that obj %s has been deleted", strings.Join(clusterNames, ", "), objName)
}

// All done. Just remove the finalizer.
return dh.removeFinalizerFunc(obj, FinalizerDeleteFromUnderlyingClusters)
}
24 changes: 24 additions & 0 deletions federation/pkg/federation-controller/util/federated_informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ type FederationView interface {
// GetClientsetForCluster returns a clientset for the cluster, if present.
GetClientsetForCluster(clusterName string) (kubeclientset.Interface, error)

// GetUnreadyClusters returns a list of all clusters that are not ready yet.
GetUnreadyClusters() ([]*federation_api.Cluster, error)

// GetReadyClusers returns all clusters for which the sub-informers are run.
GetReadyClusters() ([]*federation_api.Cluster, error)

Expand Down Expand Up @@ -260,6 +263,9 @@ type federatedInformerImpl struct {
clientFactory func(*federation_api.Cluster) (kubeclientset.Interface, error)
}

// *federatedInformerImpl implements FederatedInformer interface.
var _ FederatedInformer = &federatedInformerImpl{}

type federatedStoreImpl struct {
federatedInformer *federatedInformerImpl
}
Expand Down Expand Up @@ -313,6 +319,24 @@ func (f *federatedInformerImpl) getClientsetForClusterUnlocked(clusterName strin
return nil, fmt.Errorf("cluster %q not found", clusterName)
}

func (f *federatedInformerImpl) GetUnreadyClusters() ([]*federation_api.Cluster, error) {
f.Lock()
defer f.Unlock()

items := f.clusterInformer.store.List()
result := make([]*federation_api.Cluster, 0, len(items))
for _, item := range items {
if cluster, ok := item.(*federation_api.Cluster); ok {
if !isClusterReady(cluster) {
result = append(result, cluster)
}
} else {
return nil, fmt.Errorf("wrong data in FederatedInformerImpl cluster store: %v", item)
}
}
return result, nil
}

// GetReadyClusers returns all clusters for which the sub-informers are run.
func (f *federatedInformerImpl) GetReadyClusters() ([]*federation_api.Cluster, error) {
f.Lock()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,21 @@ import (
type fakeFederationView struct {
}

func (f fakeFederationView) GetClientsetForCluster(clusterName string) (kubeclientset.Interface, error) {
// Verify that fakeFederationView implements FederationView interface
var _ FederationView = &fakeFederationView{}

func (f *fakeFederationView) GetClientsetForCluster(clusterName string) (kubeclientset.Interface, error) {
return &fake_kubeclientset.Clientset{}, nil
}

func (f *fakeFederationView) GetReadyClusters() ([]*federation_api.Cluster, error) {
return []*federation_api.Cluster{}, nil
}

func (f *fakeFederationView) GetUnreadyClusters() ([]*federation_api.Cluster, error) {
return []*federation_api.Cluster{}, nil
}

func (f *fakeFederationView) GetReadyCluster(name string) (*federation_api.Cluster, bool, error) {
return nil, false, nil
}
Expand Down