Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding cadcading deletion support for federated secrets #36296

Merged
merged 2 commits into from
Nov 8, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 5 additions & 0 deletions federation/pkg/federation-controller/secret/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@ go_library(
"//federation/apis/federation/v1beta1:go_default_library",
"//federation/client/clientset_generated/federation_release_1_5:go_default_library",
"//federation/pkg/federation-controller/util:go_default_library",
"//federation/pkg/federation-controller/util/deletionhelper:go_default_library",
"//federation/pkg/federation-controller/util/eventsink:go_default_library",
"//pkg/api:go_default_library",
"//pkg/api/errors:go_default_library",
"//pkg/api/v1:go_default_library",
"//pkg/client/cache:go_default_library",
"//pkg/client/clientset_generated/release_1_5:go_default_library",
"//pkg/client/record:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/conversion:go_default_library",
"//pkg/runtime:go_default_library",
"//pkg/types:go_default_library",
"//pkg/util/flowcontrol:go_default_library",
Expand All @@ -41,6 +44,8 @@ go_test(
deps = [
"//federation/apis/federation/v1beta1:go_default_library",
"//federation/client/clientset_generated/federation_release_1_5/fake:go_default_library",
"//federation/pkg/federation-controller/util:go_default_library",
"//federation/pkg/federation-controller/util/deletionhelper:go_default_library",
"//federation/pkg/federation-controller/util/test:go_default_library",
"//pkg/api/v1:go_default_library",
"//pkg/client/clientset_generated/release_1_5:go_default_library",
Expand Down
127 changes: 124 additions & 3 deletions federation/pkg/federation-controller/secret/secret_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,22 @@ limitations under the License.
package secret

import (
"fmt"
"time"

federation_api "k8s.io/kubernetes/federation/apis/federation/v1beta1"
federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_5"
"k8s.io/kubernetes/federation/pkg/federation-controller/util"
"k8s.io/kubernetes/federation/pkg/federation-controller/util/deletionhelper"
"k8s.io/kubernetes/federation/pkg/federation-controller/util/eventsink"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
api_v1 "k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/client/cache"
kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/conversion"
pkg_runtime "k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/flowcontrol"
Expand Down Expand Up @@ -69,6 +73,8 @@ type SecretController struct {
// For events
eventRecorder record.EventRecorder

deletionHelper *deletionhelper.DeletionHelper

secretReviewDelay time.Duration
clusterAvailableDelay time.Duration
smallDelay time.Duration
Expand Down Expand Up @@ -162,9 +168,73 @@ func NewSecretController(client federationclientset.Interface) *SecretController
err := client.Core().Secrets(secret.Namespace).Delete(secret.Name, &api_v1.DeleteOptions{})
return err
})

secretcontroller.deletionHelper = deletionhelper.NewDeletionHelper(
secretcontroller.hasFinalizerFunc,
secretcontroller.removeFinalizerFunc,
secretcontroller.addFinalizerFunc,
// objNameFunc
func(obj pkg_runtime.Object) string {
secret := obj.(*api_v1.Secret)
return secret.Name
},
secretcontroller.updateTimeout,
secretcontroller.eventRecorder,
secretcontroller.secretFederatedInformer,
secretcontroller.federatedUpdater,
)

return secretcontroller
}

// Returns true if the given object has the given finalizer in its ObjectMeta.
func (secretcontroller *SecretController) hasFinalizerFunc(obj pkg_runtime.Object, finalizer string) bool {
secret := obj.(*api_v1.Secret)
for i := range secret.ObjectMeta.Finalizers {
if string(secret.ObjectMeta.Finalizers[i]) == finalizer {
return true
}
}
return false
}

// Removes the finalizer from the given objects ObjectMeta.
// Assumes that the given object is a secret.
func (secretcontroller *SecretController) removeFinalizerFunc(obj pkg_runtime.Object, finalizer string) (pkg_runtime.Object, error) {
secret := obj.(*api_v1.Secret)
newFinalizers := []string{}
hasFinalizer := false
for i := range secret.ObjectMeta.Finalizers {
if string(secret.ObjectMeta.Finalizers[i]) != finalizer {
newFinalizers = append(newFinalizers, secret.ObjectMeta.Finalizers[i])
} else {
hasFinalizer = true
}
}
if !hasFinalizer {
// Nothing to do.
return obj, nil
}
secret.ObjectMeta.Finalizers = newFinalizers
secret, err := secretcontroller.federatedApiClient.Core().Secrets(secret.Namespace).Update(secret)
if err != nil {
return nil, fmt.Errorf("failed to remove finalizer %s from secret %s: %v", finalizer, secret.Name, err)
}
return secret, nil
}

// Adds the given finalizer to the given objects ObjectMeta.
// Assumes that the given object is a secret.
func (secretcontroller *SecretController) addFinalizerFunc(obj pkg_runtime.Object, finalizer string) (pkg_runtime.Object, error) {
secret := obj.(*api_v1.Secret)
secret.ObjectMeta.Finalizers = append(secret.ObjectMeta.Finalizers, finalizer)
secret, err := secretcontroller.federatedApiClient.Core().Secrets(secret.Namespace).Update(secret)
if err != nil {
return nil, fmt.Errorf("failed to add finalizer %s to secret %s: %v", finalizer, secret.Name, err)
}
return secret, nil
}

func (secretcontroller *SecretController) Run(stopChan <-chan struct{}) {
go secretcontroller.secretInformerController.Run(stopChan)
secretcontroller.secretFederatedInformer.Start()
Expand Down Expand Up @@ -229,14 +299,13 @@ func (secretcontroller *SecretController) reconcileSecretsOnClusterChange() {
}

func (secretcontroller *SecretController) reconcileSecret(secret types.NamespacedName) {

if !secretcontroller.isSynced() {
secretcontroller.deliverSecret(secret, secretcontroller.clusterAvailableDelay, false)
return
}

key := secret.String()
baseSecretObj, exist, err := secretcontroller.secretInformerStore.GetByKey(key)
baseSecretObjFromStore, exist, err := secretcontroller.secretInformerStore.GetByKey(key)
if err != nil {
glog.Errorf("Failed to query main secret store for %v: %v", key, err)
secretcontroller.deliverSecret(secret, 0, true)
Expand All @@ -247,7 +316,39 @@ func (secretcontroller *SecretController) reconcileSecret(secret types.Namespace
// Not federated secret, ignoring.
return
}
baseSecret := baseSecretObj.(*api_v1.Secret)

// Create a copy before modifying the obj to prevent race condition with
// other readers of obj from store.
baseSecretObj, err := conversion.NewCloner().DeepCopy(baseSecretObjFromStore)
baseSecret, ok := baseSecretObj.(*api_v1.Secret)
if err != nil || !ok {
glog.Errorf("Error in retrieving obj from store: %v, %v", ok, err)
secretcontroller.deliverSecret(secret, 0, true)
return
}
if baseSecret.DeletionTimestamp != nil {
if err := secretcontroller.delete(baseSecret); err != nil {
glog.Errorf("Failed to delete %s: %v", secret, err)
secretcontroller.eventRecorder.Eventf(baseSecret, api.EventTypeNormal, "DeleteFailed",
"Secret delete failed: %v", err)
secretcontroller.deliverSecret(secret, 0, true)
}
return
}

glog.V(3).Infof("Ensuring delete object from underlying clusters finalizer for secret: %s",
baseSecret.Name)
// Add the required finalizers before creating a secret in underlying clusters.
updatedSecretObj, err := secretcontroller.deletionHelper.EnsureFinalizers(baseSecret)
if err != nil {
glog.Errorf("Failed to ensure delete object from underlying clusters finalizer in secret %s: %v",
baseSecret.Name, err)
secretcontroller.deliverSecret(secret, 0, false)
return
}
baseSecret = updatedSecretObj.(*api_v1.Secret)

glog.V(3).Infof("Syncing secret %s in underlying clusters", baseSecret.Name)

clusters, err := secretcontroller.secretFederatedInformer.GetReadyClusters()
if err != nil {
Expand Down Expand Up @@ -316,3 +417,23 @@ func (secretcontroller *SecretController) reconcileSecret(secret types.Namespace
// Evertyhing is in order but lets be double sure
secretcontroller.deliverSecret(secret, secretcontroller.secretReviewDelay, false)
}

// delete deletes the given secret or returns error if the deletion was not complete.
func (secretcontroller *SecretController) delete(secret *api_v1.Secret) error {
glog.V(3).Infof("Handling deletion of secret: %v", *secret)
_, err := secretcontroller.deletionHelper.HandleObjectInUnderlyingClusters(secret)
if err != nil {
return err
}

err = secretcontroller.federatedApiClient.Core().Secrets(secret.Namespace).Delete(secret.Name, nil)
if err != nil {
// Its all good if the error is not found error. That means it is deleted already and we do not have to do anything.
// This is expected when we are processing an update as a result of secret finalizer deletion.
// The process that deleted the last finalizer is also going to delete the secret and we do not have to do anything.
if !errors.IsNotFound(err) {
return fmt.Errorf("failed to delete secret: %v", err)
}
}
return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (

federation_api "k8s.io/kubernetes/federation/apis/federation/v1beta1"
fake_fedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_5/fake"
"k8s.io/kubernetes/federation/pkg/federation-controller/util"
"k8s.io/kubernetes/federation/pkg/federation-controller/util/deletionhelper"
. "k8s.io/kubernetes/federation/pkg/federation-controller/util/test"
api_v1 "k8s.io/kubernetes/pkg/api/v1"
kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5"
Expand All @@ -43,6 +45,7 @@ func TestSecretController(t *testing.T) {
RegisterFakeList("clusters", &fakeClient.Fake, &federation_api.ClusterList{Items: []federation_api.Cluster{*cluster1}})
RegisterFakeList("secrets", &fakeClient.Fake, &api_v1.SecretList{Items: []api_v1.Secret{}})
secretWatch := RegisterFakeWatch("secrets", &fakeClient.Fake)
secretUpdateChan := RegisterFakeCopyOnUpdate("secrets", &fakeClient.Fake, secretWatch)
clusterWatch := RegisterFakeWatch("clusters", &fakeClient.Fake)

cluster1Client := &fake_kubeclientset.Clientset{}
Expand All @@ -57,8 +60,7 @@ func TestSecretController(t *testing.T) {
cluster2CreateChan := RegisterFakeCopyOnCreate("secrets", &cluster2Client.Fake, cluster2Watch)

secretController := NewSecretController(fakeClient)
informer := ToFederatedInformerForTestOnly(secretController.secretFederatedInformer)
informer.SetClientFactory(func(cluster *federation_api.Cluster) (kubeclientset.Interface, error) {
informerClientFactory := func(cluster *federation_api.Cluster) (kubeclientset.Interface, error) {
switch cluster.Name {
case cluster1.Name:
return cluster1Client, nil
Expand All @@ -67,7 +69,8 @@ func TestSecretController(t *testing.T) {
default:
return nil, fmt.Errorf("Unknown cluster")
}
})
}
setClientFactory(secretController.secretFederatedInformer, informerClientFactory)

secretController.clusterAvailableDelay = time.Second
secretController.secretReviewDelay = 50 * time.Millisecond
Expand All @@ -92,11 +95,20 @@ func TestSecretController(t *testing.T) {

// Test add federated secret.
secretWatch.Add(&secret1)
// There should be 2 updates to add both the finalizers.
updatedSecret := GetSecretFromChan(secretUpdateChan)
assert.True(t, secretController.hasFinalizerFunc(updatedSecret, deletionhelper.FinalizerDeleteFromUnderlyingClusters))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check for both finalizers.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

updatedSecret = GetSecretFromChan(secretUpdateChan)
assert.True(t, secretController.hasFinalizerFunc(updatedSecret, api_v1.FinalizerOrphan))
secret1 = *updatedSecret

// Verify that the secret is created in underlying cluster1.
createdSecret := GetSecretFromChan(cluster1CreateChan)
assert.NotNil(t, createdSecret)
assert.Equal(t, secret1.Namespace, createdSecret.Namespace)
assert.Equal(t, secret1.Name, createdSecret.Name)
assert.True(t, secretsEqual(secret1, *createdSecret))
assert.True(t, secretsEqual(secret1, *createdSecret),
fmt.Sprintf("expected: %v, actual: %v", secret1, *createdSecret))

// Wait for the secret to appear in the informer store
err := WaitForStoreUpdate(
Expand All @@ -109,11 +121,12 @@ func TestSecretController(t *testing.T) {
"A": "B",
}
secretWatch.Modify(&secret1)
updatedSecret := GetSecretFromChan(cluster1UpdateChan)
updatedSecret = GetSecretFromChan(cluster1UpdateChan)
assert.NotNil(t, updatedSecret)
assert.Equal(t, secret1.Name, updatedSecret.Name)
assert.Equal(t, secret1.Namespace, updatedSecret.Namespace)
assert.True(t, secretsEqual(secret1, *updatedSecret))
assert.True(t, secretsEqual(secret1, *updatedSecret),
fmt.Sprintf("expected: %v, actual: %v", secret1, *updatedSecret))

// Test update federated secret.
secret1.Data = map[string][]byte{
Expand All @@ -124,22 +137,33 @@ func TestSecretController(t *testing.T) {
assert.NotNil(t, updatedSecret)
assert.Equal(t, secret1.Name, updatedSecret.Name)
assert.Equal(t, secret1.Namespace, updatedSecret.Namespace)
assert.True(t, secretsEqual(secret1, *updatedSecret2))
assert.True(t, secretsEqual(secret1, *updatedSecret2),
fmt.Sprintf("expected: %v, actual: %v", secret1, *updatedSecret2))

// Test add cluster
clusterWatch.Add(cluster2)
createdSecret2 := GetSecretFromChan(cluster2CreateChan)
assert.NotNil(t, createdSecret2)
assert.Equal(t, secret1.Name, createdSecret2.Name)
assert.Equal(t, secret1.Namespace, createdSecret2.Namespace)
assert.True(t, secretsEqual(secret1, *createdSecret2))
assert.True(t, secretsEqual(secret1, *createdSecret2),
fmt.Sprintf("expected: %v, actual: %v", secret1, *createdSecret2))

close(stop)
}

func setClientFactory(informer util.FederatedInformer, informerClientFactory func(*federation_api.Cluster) (kubeclientset.Interface, error)) {
testInformer := ToFederatedInformerForTestOnly(informer)
testInformer.SetClientFactory(informerClientFactory)
}

func secretsEqual(a, b api_v1.Secret) bool {
// Clear the SelfLink and ObjectMeta.Finalizers since they will be different
// in resoure in federation control plane and resource in underlying cluster.
a.SelfLink = ""
b.SelfLink = ""
a.ObjectMeta.Finalizers = []string{}
b.ObjectMeta.Finalizers = []string{}
return reflect.DeepEqual(a, b)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (dh *DeletionHelper) HandleObjectInUnderlyingClusters(obj runtime.Object) (
}
hasOrphanFinalizer := dh.hasFinalizerFunc(obj, api_v1.FinalizerOrphan)
if hasOrphanFinalizer {
glog.V(3).Infof("Found finalizer orphan. Nothing to do, just remove the finalizer")
glog.V(2).Infof("Found finalizer orphan. Nothing to do, just remove the finalizer")
// If the obj has FinalizerOrphan finalizer, then we need to orphan the
// corresponding objects in underlying clusters.
// Just remove both the finalizers in that case.
Expand All @@ -133,6 +133,7 @@ func (dh *DeletionHelper) HandleObjectInUnderlyingClusters(obj runtime.Object) (
return dh.removeFinalizerFunc(obj, FinalizerDeleteFromUnderlyingClusters)
}

glog.V(2).Infof("Deleting obj %s from underlying clusters", objName)
// Else, we need to delete the obj from all underlying clusters.
unreadyClusters, err := dh.informer.GetUnreadyClusters()
if err != nil {
Expand All @@ -141,7 +142,9 @@ func (dh *DeletionHelper) HandleObjectInUnderlyingClusters(obj runtime.Object) (
// TODO: Handle the case when cluster resource is watched after this is executed.
// This can happen if a namespace is deleted before its creation had been
// observed in all underlying clusters.
clusterNsObjs, err := dh.informer.GetTargetStore().GetFromAllClusters(objName)
storeKey := dh.informer.GetTargetStore().GetKeyFor(obj)
clusterNsObjs, err := dh.informer.GetTargetStore().GetFromAllClusters(storeKey)
glog.V(3).Infof("Found %d objects in underlying clusters", len(clusterNsObjs))
if err != nil {
return nil, fmt.Errorf("failed to get object %s from underlying clusters: %v", objName, err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ type FederatedReadOnlyStore interface {
// Returns all items from a cluster.
ListFromCluster(clusterName string) ([]interface{}, error)

// GetKeyFor returns the key under which the item would be put in the store.
GetKeyFor(item interface{}) string

// GetByKey returns the item stored under the given key in the specified cluster (if exist).
GetByKey(clusterName string, key string) (interface{}, bool, error)

Expand Down
4 changes: 2 additions & 2 deletions federation/pkg/federation-controller/util/test/test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,12 +197,12 @@ func RegisterFakeCopyOnUpdate(resource string, client *core.Fake, watcher *Watch
}

// GetObjectFromChan tries to get an api object from the given channel
// within a reasonable time (1 min).
// within a reasonable time.
func GetObjectFromChan(c chan runtime.Object) runtime.Object {
select {
case obj := <-c:
return obj
case <-time.After(20 * time.Second):
case <-time.After(wait.ForeverTestTimeout):
pprof.Lookup("goroutine").WriteTo(os.Stderr, 1)
return nil
}
Expand Down