Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Federation] Add integration test for secrets #42025

Merged
merged 4 commits into from
Apr 6, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions federation/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ filegroup(
"//federation/pkg/dnsprovider:all-srcs",
"//federation/pkg/federation-controller:all-srcs",
"//federation/pkg/kubefed:all-srcs",
"//federation/pkg/typeadapters:all-srcs",
"//federation/registry/cluster:all-srcs",
],
tags = ["automanaged"],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,17 +156,16 @@ func Run(s *options.CMServer) error {
}

func StartControllers(s *options.CMServer, restClientCfg *restclient.Config) error {
glog.Infof("Loading client config for cluster controller %q", "cluster-controller")
ccClientset := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, "cluster-controller"))
glog.Infof("Running cluster controller")
stopChan := wait.NeverStop
minimizeLatency := false

discoveryClient := discovery.NewDiscoveryClientForConfigOrDie(restClientCfg)
serverResources, err := discoveryClient.ServerResources()
if err != nil {
glog.Fatalf("Could not find resources from API Server: %v", err)
}

go clustercontroller.NewclusterController(ccClientset, s.ClusterMonitorPeriod.Duration).Run()
clustercontroller.StartClusterController(restClientCfg, stopChan, s.ClusterMonitorPeriod.Duration)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a nice refactor, but I wonder if it would be better to put it in another PR: it makes this PR a little larger than necessary, and occludes the main purpose I think.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be enough to put it in a separate commit? PRs have a lot of overhead and this change is trivial enough that I'm not sure it's worth breaking out.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, that SGTM.


if controllerEnabled(s.Controllers, serverResources, servicecontroller.ControllerName, servicecontroller.RequiredResources, true) {
dns, err := dnsprovider.InitDnsProvider(s.DnsProvider, s.DnsConfigFile)
Expand All @@ -191,9 +190,7 @@ func StartControllers(s *options.CMServer, restClientCfg *restclient.Config) err
}

if controllerEnabled(s.Controllers, serverResources, secretcontroller.ControllerName, secretcontroller.RequiredResources, true) {
secretcontrollerClientset := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, "secret-controller"))
secretcontroller := secretcontroller.NewSecretController(secretcontrollerClientset)
secretcontroller.Run(wait.NeverStop)
secretcontroller.StartSecretController(restClientCfg, stopChan, minimizeLatency)
}

if controllerEnabled(s.Controllers, serverResources, configmapcontroller.ControllerName, configmapcontroller.RequiredResources, true) {
Expand Down
20 changes: 15 additions & 5 deletions federation/pkg/federation-controller/cluster/clustercontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
federationv1beta1 "k8s.io/kubernetes/federation/apis/federation/v1beta1"
clustercache "k8s.io/kubernetes/federation/client/cache"
Expand All @@ -53,8 +54,17 @@ type ClusterController struct {
clusterStore clustercache.StoreToClusterLister
}

// NewclusterController returns a new cluster controller
func NewclusterController(federationClient federationclientset.Interface, clusterMonitorPeriod time.Duration) *ClusterController {
// StartClusterController starts a new cluster controller
func StartClusterController(config *restclient.Config, stopChan <-chan struct{}, clusterMonitorPeriod time.Duration) {
restclient.AddUserAgent(config, "cluster-controller")
client := federationclientset.NewForConfigOrDie(config)
controller := newClusterController(client, clusterMonitorPeriod)
glog.Infof("Starting cluster controller")
controller.Run(stopChan)
}

// newClusterController returns a new cluster controller
func newClusterController(federationClient federationclientset.Interface, clusterMonitorPeriod time.Duration) *ClusterController {
cc := &ClusterController{
knownClusterSet: make(sets.String),
federationClient: federationClient,
Expand Down Expand Up @@ -112,15 +122,15 @@ func (cc *ClusterController) addToClusterSet(obj interface{}) {
}

// Run begins watching and syncing.
func (cc *ClusterController) Run() {
func (cc *ClusterController) Run(stopChan <-chan struct{}) {
defer utilruntime.HandleCrash()
go cc.clusterController.Run(wait.NeverStop)
go cc.clusterController.Run(stopChan)
// monitor cluster status periodically, in phase 1 we just get the health state from "/healthz"
go wait.Until(func() {
if err := cc.UpdateClusterStatus(); err != nil {
glog.Errorf("Error monitoring cluster status: %v", err)
}
}, cc.clusterMonitorPeriod, wait.NeverStop)
}, cc.clusterMonitorPeriod, stopChan)
}

func (cc *ClusterController) GetClusterClient(cluster *federationv1beta1.Cluster) (*ClusterClient, error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func TestUpdateClusterStatusOK(t *testing.T) {
}
}

manager := NewclusterController(federationClientSet, 5)
manager := newClusterController(federationClientSet, 5)
err = manager.UpdateClusterStatus()
if err != nil {
t.Errorf("Failed to Update Cluster Status: %v", err)
Expand Down
1 change: 1 addition & 0 deletions federation/pkg/federation-controller/secret/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ go_library(
"//vendor:k8s.io/apimachinery/pkg/types",
"//vendor:k8s.io/apimachinery/pkg/watch",
"//vendor:k8s.io/client-go/pkg/api/v1",
"//vendor:k8s.io/client-go/rest",
"//vendor:k8s.io/client-go/tools/cache",
"//vendor:k8s.io/client-go/tools/record",
"//vendor:k8s.io/client-go/util/flowcontrol",
Expand Down
25 changes: 23 additions & 2 deletions federation/pkg/federation-controller/secret/secret_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch"
clientv1 "k8s.io/client-go/pkg/api/v1"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/flowcontrol"
Expand Down Expand Up @@ -88,8 +89,20 @@ type SecretController struct {
updateTimeout time.Duration
}

// NewSecretController returns a new secret controller
func NewSecretController(client federationclientset.Interface) *SecretController {
// StartSecretController starts a new secret controller
func StartSecretController(config *restclient.Config, stopChan <-chan struct{}, minimizeLatency bool) {
restclient.AddUserAgent(config, "secret-controller")
client := federationclientset.NewForConfigOrDie(config)
controller := newSecretController(client)
if minimizeLatency {
controller.minimizeLatency()
}
glog.Infof("Starting Secret controller")
controller.Run(stopChan)
}

// newSecretController returns a new secret controller
func newSecretController(client federationclientset.Interface) *SecretController {
broadcaster := record.NewBroadcaster()
broadcaster.StartRecordingToSink(eventsink.NewFederatedEventSink(client))
recorder := broadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: "federated-secrets-controller"})
Expand Down Expand Up @@ -190,6 +203,14 @@ func NewSecretController(client federationclientset.Interface) *SecretController
return secretcontroller
}

// minimizeLatency reduces delays and timeouts to make the controller more responsive (useful for testing).
func (secretcontroller *SecretController) minimizeLatency() {
secretcontroller.clusterAvailableDelay = time.Second
secretcontroller.secretReviewDelay = 50 * time.Millisecond
secretcontroller.smallDelay = 20 * time.Millisecond
secretcontroller.updateTimeout = 5 * time.Second
}

// Returns true if the given object has the given finalizer in its ObjectMeta.
func (secretcontroller *SecretController) hasFinalizerFunc(obj pkgruntime.Object, finalizer string) bool {
secret := obj.(*apiv1.Secret)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func TestSecretController(t *testing.T) {
RegisterFakeList(secrets, &cluster2Client.Fake, &apiv1.SecretList{Items: []apiv1.Secret{}})
cluster2CreateChan := RegisterFakeCopyOnCreate(secrets, &cluster2Client.Fake, cluster2Watch)

secretController := NewSecretController(fakeClient)
secretController := newSecretController(fakeClient)
informerClientFactory := func(cluster *federationapi.Cluster) (kubeclientset.Interface, error) {
switch cluster.Name {
case cluster1.Name:
Expand All @@ -79,10 +79,7 @@ func TestSecretController(t *testing.T) {
}
setClientFactory(secretController.secretFederatedInformer, informerClientFactory)

secretController.clusterAvailableDelay = time.Second
secretController.secretReviewDelay = 50 * time.Millisecond
secretController.smallDelay = 20 * time.Millisecond
secretController.updateTimeout = 5 * time.Second
secretController.minimizeLatency()

stop := make(chan struct{})
secretController.Run(stop)
Expand Down
42 changes: 42 additions & 0 deletions federation/pkg/typeadapters/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package(default_visibility = ["//visibility:public"])

licenses(["notice"])

load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
)

go_library(
name = "go_default_library",
srcs = [
"adapter.go",
"secret.go",
],
tags = ["automanaged"],
deps = [
"//federation/client/clientset_generated/federation_clientset:go_default_library",
"//federation/pkg/federation-controller/util:go_default_library",
"//pkg/api/v1:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apimachinery/pkg/runtime",
"//vendor:k8s.io/apimachinery/pkg/types",
],
)

filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)

filegroup(
name = "all-srcs",
srcs = [
":package-srcs",
"//federation/pkg/typeadapters/crudtester:all-srcs",
],
tags = ["automanaged"],
)
48 changes: 48 additions & 0 deletions federation/pkg/typeadapters/adapter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package typeadapters

import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
pkgruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
)

// FederatedTypeAdapter defines operations for interacting with a
// federated type. Code written to this interface can then target any
// type for which an implementation of this interface exists.
type FederatedTypeAdapter interface {
SetClient(client federationclientset.Interface)

Kind() string
Equivalent(obj1, obj2 pkgruntime.Object) bool
ObjectMeta(obj pkgruntime.Object) *metav1.ObjectMeta
NamespacedName(obj pkgruntime.Object) types.NamespacedName

// Fed* operations target the federation control plane
FedCreate(obj pkgruntime.Object) (pkgruntime.Object, error)
FedGet(namespacedName types.NamespacedName) (pkgruntime.Object, error)
FedUpdate(obj pkgruntime.Object) (pkgruntime.Object, error)
FedDelete(namespacedName types.NamespacedName, options *metav1.DeleteOptions) error

// The following operations are intended to target a cluster that is a member of a federation
ClusterGet(client clientset.Interface, namespacedName types.NamespacedName) (pkgruntime.Object, error)

NewTestObject(namespace string) pkgruntime.Object
}
35 changes: 35 additions & 0 deletions federation/pkg/typeadapters/crudtester/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package(default_visibility = ["//visibility:public"])

licenses(["notice"])

load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
)

go_library(
name = "go_default_library",
srcs = ["crudtester.go"],
tags = ["automanaged"],
deps = [
"//federation/pkg/typeadapters:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library",
"//vendor:k8s.io/apimachinery/pkg/api/errors",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apimachinery/pkg/runtime",
"//vendor:k8s.io/apimachinery/pkg/util/wait",
],
)

filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)

filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
)