Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch federated secret controller to use NamespacedName #36019

Merged
merged 2 commits into from
Nov 5, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions federation/pkg/federation-controller/secret/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ go_library(
"//pkg/client/record:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/runtime:go_default_library",
"//pkg/types:go_default_library",
"//pkg/util/flowcontrol:go_default_library",
"//pkg/watch:go_default_library",
"//vendor:github.com/golang/glog",
Expand All @@ -45,6 +46,7 @@ go_test(
"//pkg/client/clientset_generated/release_1_5:go_default_library",
"//pkg/client/clientset_generated/release_1_5/fake:go_default_library",
"//pkg/runtime:go_default_library",
"//pkg/types:go_default_library",
"//pkg/util/wait:go_default_library",
"//vendor:github.com/stretchr/testify/assert",
],
Expand Down
43 changes: 16 additions & 27 deletions federation/pkg/federation-controller/secret/secret_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ limitations under the License.
package secret

import (
"fmt"
"time"

federation_api "k8s.io/kubernetes/federation/apis/federation/v1beta1"
Expand All @@ -31,6 +30,7 @@ import (
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/controller"
pkg_runtime "k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/flowcontrol"
"k8s.io/kubernetes/pkg/watch"

Expand Down Expand Up @@ -173,41 +173,30 @@ func (secretcontroller *SecretController) Run(stopChan <-chan struct{}) {
secretcontroller.secretFederatedInformer.Stop()
}()
secretcontroller.secretDeliverer.StartWithHandler(func(item *util.DelayingDelivererItem) {
secret := item.Value.(*secretItem)
secretcontroller.reconcileSecret(secret.namespace, secret.name)
secret := item.Value.(*types.NamespacedName)
secretcontroller.reconcileSecret(*secret)
})
secretcontroller.clusterDeliverer.StartWithHandler(func(_ *util.DelayingDelivererItem) {
secretcontroller.reconcileSecretsOnClusterChange()
})
util.StartBackoffGC(secretcontroller.secretBackoff, stopChan)
}

func getSecretKey(namespace, name string) string {
return fmt.Sprintf("%s/%s", namespace, name)
}

// Internal structure for data in delaying deliverer.
type secretItem struct {
namespace string
name string
}

func (secretcontroller *SecretController) deliverSecretObj(obj interface{}, delay time.Duration, failed bool) {
secret := obj.(*api_v1.Secret)
secretcontroller.deliverSecret(secret.Namespace, secret.Name, delay, failed)
secretcontroller.deliverSecret(types.NamespacedName{Namespace: secret.Namespace, Name: secret.Name}, delay, failed)
}

// Adds backoff to delay if this delivery is related to some failure. Resets backoff if there was no failure.
func (secretcontroller *SecretController) deliverSecret(namespace string, name string, delay time.Duration, failed bool) {
key := getSecretKey(namespace, name)
func (secretcontroller *SecretController) deliverSecret(secret types.NamespacedName, delay time.Duration, failed bool) {
key := secret.String()
if failed {
secretcontroller.secretBackoff.Next(key, time.Now())
delay = delay + secretcontroller.secretBackoff.Get(key)
} else {
secretcontroller.secretBackoff.Reset(key)
}
secretcontroller.secretDeliverer.DeliverAfter(key,
&secretItem{namespace: namespace, name: name}, delay)
secretcontroller.secretDeliverer.DeliverAfter(key, &secret, delay)
}

// Check whether all data stores are in sync. False is returned if any of the informer/stores is not yet
Expand Down Expand Up @@ -235,22 +224,22 @@ func (secretcontroller *SecretController) reconcileSecretsOnClusterChange() {
}
for _, obj := range secretcontroller.secretInformerStore.List() {
secret := obj.(*api_v1.Secret)
secretcontroller.deliverSecret(secret.Namespace, secret.Name, secretcontroller.smallDelay, false)
secretcontroller.deliverSecret(types.NamespacedName{Namespace: secret.Namespace, Name: secret.Name}, secretcontroller.smallDelay, false)
}
}

func (secretcontroller *SecretController) reconcileSecret(namespace string, secretName string) {
func (secretcontroller *SecretController) reconcileSecret(secret types.NamespacedName) {

if !secretcontroller.isSynced() {
secretcontroller.deliverSecret(namespace, secretName, secretcontroller.clusterAvailableDelay, false)
secretcontroller.deliverSecret(secret, secretcontroller.clusterAvailableDelay, false)
return
}

key := getSecretKey(namespace, secretName)
key := secret.String()
baseSecretObj, exist, err := secretcontroller.secretInformerStore.GetByKey(key)
if err != nil {
glog.Errorf("Failed to query main secret store for %v: %v", key, err)
secretcontroller.deliverSecret(namespace, secretName, 0, true)
secretcontroller.deliverSecret(secret, 0, true)
return
}

Expand All @@ -263,7 +252,7 @@ func (secretcontroller *SecretController) reconcileSecret(namespace string, secr
clusters, err := secretcontroller.secretFederatedInformer.GetReadyClusters()
if err != nil {
glog.Errorf("Failed to get cluster list: %v", err)
secretcontroller.deliverSecret(namespace, secretName, secretcontroller.clusterAvailableDelay, false)
secretcontroller.deliverSecret(secret, secretcontroller.clusterAvailableDelay, false)
return
}

Expand All @@ -272,7 +261,7 @@ func (secretcontroller *SecretController) reconcileSecret(namespace string, secr
clusterSecretObj, found, err := secretcontroller.secretFederatedInformer.GetTargetStore().GetByKey(cluster.Name, key)
if err != nil {
glog.Errorf("Failed to get %s from %s: %v", key, cluster.Name, err)
secretcontroller.deliverSecret(namespace, secretName, 0, true)
secretcontroller.deliverSecret(secret, 0, true)
return
}

Expand Down Expand Up @@ -320,10 +309,10 @@ func (secretcontroller *SecretController) reconcileSecret(namespace string, secr

if err != nil {
glog.Errorf("Failed to execute updates for %s: %v", key, err)
secretcontroller.deliverSecret(namespace, secretName, 0, true)
secretcontroller.deliverSecret(secret, 0, true)
return
}

// Evertyhing is in order but lets be double sure
secretcontroller.deliverSecret(namespace, secretName, secretcontroller.secretReviewDelay, false)
secretcontroller.deliverSecret(secret, secretcontroller.secretReviewDelay, false)
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5"
fake_kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5/fake"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/wait"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -100,7 +101,7 @@ func TestSecretController(t *testing.T) {
// Wait for the secret to appear in the informer store
err := WaitForStoreUpdate(
secretController.secretFederatedInformer.GetTargetStore(),
cluster1.Name, getSecretKey(secret1.Namespace, secret1.Name), wait.ForeverTestTimeout)
cluster1.Name, types.NamespacedName{Namespace: secret1.Namespace, Name: secret1.Name}.String(), wait.ForeverTestTimeout)
assert.Nil(t, err, "secret should have appeared in the informer store")

// Test update federated secret.
Expand Down