Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

change clientset of service controller to versioned #26694

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 6 additions & 6 deletions federation/client/cache/cluster_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package cache

import (
"github.com/golang/glog"
"k8s.io/kubernetes/federation/apis/federation"
"k8s.io/kubernetes/federation/apis/federation/v1alpha1"
kubeCache "k8s.io/kubernetes/pkg/client/cache"
)

Expand All @@ -28,16 +28,16 @@ type StoreToClusterLister struct {
kubeCache.Store
}

func (s *StoreToClusterLister) List() (clusters federation.ClusterList, err error) {
func (s *StoreToClusterLister) List() (clusters v1alpha1.ClusterList, err error) {
for _, m := range s.Store.List() {
clusters.Items = append(clusters.Items, *(m.(*federation.Cluster)))
clusters.Items = append(clusters.Items, *(m.(*v1alpha1.Cluster)))
}
return clusters, nil
}

// ClusterConditionPredicate is a function that indicates whether the given cluster's conditions meet
// some set of criteria defined by the function.
type ClusterConditionPredicate func(cluster federation.Cluster) bool
type ClusterConditionPredicate func(cluster v1alpha1.Cluster) bool

// storeToClusterConditionLister filters and returns nodes matching the given type and status from the store.
type storeToClusterConditionLister struct {
Expand All @@ -51,9 +51,9 @@ func (s *StoreToClusterLister) ClusterCondition(predicate ClusterConditionPredic
}

// List returns a list of clusters that match the conditions defined by the predicate functions in the storeToClusterConditionLister.
func (s storeToClusterConditionLister) List() (clusters federation.ClusterList, err error) {
func (s storeToClusterConditionLister) List() (clusters v1alpha1.ClusterList, err error) {
for _, m := range s.store.List() {
cluster := *m.(*federation.Cluster)
cluster := *m.(*v1alpha1.Cluster)
if s.predicate(cluster) {
clusters.Items = append(clusters.Items, cluster)
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,13 @@ import (
"net/http/pprof"
"strconv"

"k8s.io/kubernetes/federation/cmd/federation-controller-manager/app/options"
"k8s.io/kubernetes/pkg/client/restclient"

internalclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_internalclientset"
federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_3"
"k8s.io/kubernetes/federation/cmd/federation-controller-manager/app/options"
"k8s.io/kubernetes/federation/pkg/dnsprovider"
clustercontroller "k8s.io/kubernetes/federation/pkg/federation-controller/cluster"
servicecontroller "k8s.io/kubernetes/federation/pkg/federation-controller/service"
"k8s.io/kubernetes/federation/pkg/federation-controller/util"
"k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
"k8s.io/kubernetes/pkg/healthz"
"k8s.io/kubernetes/pkg/util/configz"
Expand Down Expand Up @@ -77,7 +76,7 @@ func Run(s *options.CMServer) error {
glog.Errorf("unable to register configz: %s", err)
}
// Create the config to talk to federation-apiserver.
kubeconfigGetter := clustercontroller.KubeconfigGetterForSecret(FederationAPIServerSecretName)
kubeconfigGetter := util.KubeconfigGetterForSecret(FederationAPIServerSecretName)
Copy link
Author

@mfanjie mfanjie Jun 6, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nikhiljindal is this a prerequisite for starting controller manager? do we need always create secret for kubeconfig of federation controller plane? I keep this as is but want to let you know , originally, we can simply run ./federation-controller-manager --v=5 to start controller manager without any preconfiguration, but now I have to create secret for kubeconfig.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes federation-controller-manager needs the kubeconfig of federation-apiserver to be able to talk to it.

We can update this code to try insecure access if secret does not exist, which will ensure that your local setup will work without secret. For clusters on GCE/GKE or any other cloud provider, we will need the secret.

restClientCfg, err := clientcmd.BuildConfigFromKubeconfigGetter(s.Master, kubeconfigGetter)
if err != nil {
return err
Expand Down Expand Up @@ -115,14 +114,14 @@ func Run(s *options.CMServer) error {

func StartControllers(s *options.CMServer, restClientCfg *restclient.Config) error {

federationClientSet := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, "cluster-controller"))
go clustercontroller.NewclusterController(federationClientSet, s.ClusterMonitorPeriod.Duration).Run()
ccClientset := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, "cluster-controller"))
go clustercontroller.NewclusterController(ccClientset, s.ClusterMonitorPeriod.Duration).Run()
dns, err := dnsprovider.InitDnsProvider(s.DnsProvider, s.DnsConfigFile)
if err != nil {
glog.Fatalf("Cloud provider could not be initialized: %v", err)
}
scclientset := internalclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, servicecontroller.UserAgentName))
servicecontroller := servicecontroller.New(scclientset, dns)
scClientset := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, servicecontroller.UserAgentName))
servicecontroller := servicecontroller.New(scClientset, dns)
if err := servicecontroller.Run(s.ConcurrentServiceSyncs, wait.NeverStop); err != nil {
glog.Errorf("Failed to start service controller: %v", err)
}
Expand Down
76 changes: 4 additions & 72 deletions federation/pkg/federation-controller/cluster/cluster_client.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright 2014 The Kubernetes Authors All rights reserved.
Copyright 2016 The Kubernetes Authors All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand All @@ -18,22 +18,17 @@ package cluster

import (
"fmt"
"net"
"os"
"strings"

"github.com/golang/glog"
federation_v1alpha1 "k8s.io/kubernetes/federation/apis/federation/v1alpha1"
"k8s.io/kubernetes/federation/pkg/federation-controller/util"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/api/v1"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/client/typed/discovery"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
clientcmdapi "k8s.io/kubernetes/pkg/client/unversioned/clientcmd/api"
utilnet "k8s.io/kubernetes/pkg/util/net"
"k8s.io/kubernetes/pkg/util/sets"
)

Expand All @@ -44,81 +39,18 @@ const (
KubeconfigSecretDataKey = "kubeconfig"
)

// This is to inject a different kubeconfigGetter in tests.
// We dont use the standard one which calls NewInCluster in tests to avoid having to setup service accounts and mount files with secret tokens.
var KubeconfigGetterForCluster = func(c *federation_v1alpha1.Cluster) clientcmd.KubeconfigGetter {
return func() (*clientcmdapi.Config, error) {
secretRefName := ""
if c.Spec.SecretRef != nil {
secretRefName = c.Spec.SecretRef.Name
} else {
glog.Infof("didnt find secretRef for cluster %s. Trying insecure access", c.Name)
}
return KubeconfigGetterForSecret(secretRefName)()
}
}

// KubeconfigGettterForSecret is used to get the kubeconfig from the given secret.
var KubeconfigGetterForSecret = func(secretName string) clientcmd.KubeconfigGetter {
return func() (*clientcmdapi.Config, error) {
// Get the namespace this is running in from the env variable.
namespace := os.Getenv("POD_NAMESPACE")
if namespace == "" {
return nil, fmt.Errorf("unexpected: POD_NAMESPACE env var returned empty string")
}
// Get a client to talk to the k8s apiserver, to fetch secrets from it.
client, err := client.NewInCluster()
if err != nil {
return nil, fmt.Errorf("error in creating in-cluster client: %s", err)
}
data := []byte{}
if secretName != "" {
secret, err := client.Secrets(namespace).Get(secretName)
if err != nil {
return nil, fmt.Errorf("error in fetching secret: %s", err)
}
ok := false
data, ok = secret.Data[KubeconfigSecretDataKey]
if !ok {
return nil, fmt.Errorf("secret does not have data with key: %s", KubeconfigSecretDataKey)
}
}
return clientcmd.Load(data)
}
}

type ClusterClient struct {
discoveryClient *discovery.DiscoveryClient
kubeClient *clientset.Clientset
}

func NewClusterClientSet(c *federation_v1alpha1.Cluster) (*ClusterClient, error) {
var serverAddress string
hostIP, err := utilnet.ChooseHostInterface()
clusterConfig, err := util.BuildClusterConfig(c)
if err != nil {
return nil, err
}

for _, item := range c.Spec.ServerAddressByClientCIDRs {
_, cidrnet, err := net.ParseCIDR(item.ClientCIDR)
if err != nil {
return nil, err
}
myaddr := net.ParseIP(hostIP.String())
if cidrnet.Contains(myaddr) == true {
serverAddress = item.ServerAddress
break
}
}
var clusterClientSet = ClusterClient{}
if serverAddress != "" {
kubeconfigGetter := KubeconfigGetterForCluster(c)
clusterConfig, err := clientcmd.BuildConfigFromKubeconfigGetter(serverAddress, kubeconfigGetter)
if err != nil {
return nil, err
}
clusterConfig.QPS = KubeAPIQPS
clusterConfig.Burst = KubeAPIBurst
if clusterConfig != nil {
clusterClientSet.discoveryClient = discovery.NewDiscoveryClientForConfigOrDie((restclient.AddUserAgent(clusterConfig, UserAgentName)))
if clusterClientSet.discoveryClient == nil {
return nil, nil
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright 2014 The Kubernetes Authors All rights reserved.
Copyright 2016 The Kubernetes Authors All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand All @@ -21,7 +21,6 @@ import (
"time"

"github.com/golang/glog"
"k8s.io/kubernetes/federation/apis/federation"
federation_v1alpha1 "k8s.io/kubernetes/federation/apis/federation/v1alpha1"
cluster_cache "k8s.io/kubernetes/federation/client/cache"
federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_3"
Expand Down Expand Up @@ -73,7 +72,7 @@ func NewclusterController(federationClient federationclientset.Interface, cluste
return cc.federationClient.Federation().Clusters().Watch(options)
},
},
&federation.Cluster{},
&federation_v1alpha1.Cluster{},
controller.NoResyncPeriodFunc(),
framework.ResourceEventHandlerFuncs{
DeleteFunc: cc.delFromClusterSet,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

federation_v1alpha1 "k8s.io/kubernetes/federation/apis/federation/v1alpha1"
federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_3"
controller_util "k8s.io/kubernetes/federation/pkg/federation-controller/util"
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/api/v1"
Expand Down Expand Up @@ -124,8 +125,8 @@ func TestUpdateClusterStatusOK(t *testing.T) {
federationClientSet := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, "cluster-controller"))

// Override KubeconfigGetterForCluster to avoid having to setup service accounts and mount files with secret tokens.
originalGetter := KubeconfigGetterForCluster
KubeconfigGetterForCluster = func(c *federation_v1alpha1.Cluster) clientcmd.KubeconfigGetter {
originalGetter := controller_util.KubeconfigGetterForCluster
controller_util.KubeconfigGetterForCluster = func(c *federation_v1alpha1.Cluster) clientcmd.KubeconfigGetter {
return func() (*clientcmdapi.Config, error) {
return &clientcmdapi.Config{}, nil
}
Expand All @@ -146,5 +147,5 @@ func TestUpdateClusterStatusOK(t *testing.T) {
}

// Reset KubeconfigGetterForCluster
KubeconfigGetterForCluster = originalGetter
controller_util.KubeconfigGetterForCluster = originalGetter
}
51 changes: 25 additions & 26 deletions federation/pkg/federation-controller/service/cluster_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,26 @@ package service
import (
"sync"

"k8s.io/kubernetes/federation/apis/federation"
v1alpha1 "k8s.io/kubernetes/federation/apis/federation/v1alpha1"
"k8s.io/kubernetes/pkg/api"
v1 "k8s.io/kubernetes/pkg/api/v1"
cache "k8s.io/kubernetes/pkg/client/cache"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
release_1_3 "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_3"
"k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
"k8s.io/kubernetes/pkg/controller/framework"
pkg_runtime "k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/util/workqueue"
"k8s.io/kubernetes/pkg/watch"

"github.com/golang/glog"
"k8s.io/kubernetes/federation/pkg/federation-controller/util"
"reflect"
)

type clusterCache struct {
clientset *clientset.Clientset
cluster *federation.Cluster
clientset *release_1_3.Clientset
cluster *v1alpha1.Cluster
// A store of services, populated by the serviceController
serviceStore cache.StoreToServiceLister
// Watches changes to all services
Expand All @@ -57,7 +58,7 @@ type clusterClientCache struct {
clientMap map[string]*clusterCache
}

func (cc *clusterClientCache) startClusterLW(cluster *federation.Cluster, clusterName string) {
func (cc *clusterClientCache) startClusterLW(cluster *v1alpha1.Cluster, clusterName string) {
cachedClusterClient, ok := cc.clientMap[clusterName]
// only create when no existing cachedClusterClient
if ok {
Expand Down Expand Up @@ -92,13 +93,13 @@ func (cc *clusterClientCache) startClusterLW(cluster *federation.Cluster, cluste
cachedClusterClient.endpointStore.Store, cachedClusterClient.endpointController = framework.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (pkg_runtime.Object, error) {
return clientset.Core().Endpoints(api.NamespaceAll).List(options)
return clientset.Core().Endpoints(v1.NamespaceAll).List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return clientset.Core().Endpoints(api.NamespaceAll).Watch(options)
return clientset.Core().Endpoints(v1.NamespaceAll).Watch(options)
},
},
&api.Endpoints{},
&v1.Endpoints{},
serviceSyncPeriod,
framework.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
Expand All @@ -116,25 +117,25 @@ func (cc *clusterClientCache) startClusterLW(cluster *federation.Cluster, cluste
cachedClusterClient.serviceStore.Store, cachedClusterClient.serviceController = framework.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (pkg_runtime.Object, error) {
return clientset.Core().Services(api.NamespaceAll).List(options)
return clientset.Core().Services(v1.NamespaceAll).List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return clientset.Core().Services(api.NamespaceAll).Watch(options)
return clientset.Core().Services(v1.NamespaceAll).Watch(options)
},
},
&api.Service{},
&v1.Service{},
serviceSyncPeriod,
framework.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
cc.enqueueService(obj, clusterName)
},
UpdateFunc: func(old, cur interface{}) {
oldService, ok := old.(*api.Service)
oldService, ok := old.(*v1.Service)

if !ok {
return
}
curService, ok := cur.(*api.Service)
curService, ok := cur.(*v1.Service)
if !ok {
return
}
Expand All @@ -143,7 +144,7 @@ func (cc *clusterClientCache) startClusterLW(cluster *federation.Cluster, cluste
}
},
DeleteFunc: func(obj interface{}) {
service, _ := obj.(*api.Service)
service, _ := obj.(*v1.Service)
cc.enqueueService(obj, clusterName)
glog.V(2).Infof("Service %s/%s deletion found and enque to service store %s", service.Namespace, service.Name, clusterName)
},
Expand All @@ -161,7 +162,7 @@ func (cc *clusterClientCache) startClusterLW(cluster *federation.Cluster, cluste
// delFromClusterSet delete a cluster from clusterSet and
// delete the corresponding restclient from the map clusterKubeClientMap
func (cc *clusterClientCache) delFromClusterSet(obj interface{}) {
cluster, ok := obj.(*federation.Cluster)
cluster, ok := obj.(*v1alpha1.Cluster)
cc.rwlock.Lock()
defer cc.rwlock.Unlock()
if ok {
Expand All @@ -180,10 +181,10 @@ func (cc *clusterClientCache) delFromClusterSet(obj interface{}) {
// addToClusterSet inserts the new cluster to clusterSet and creates a corresponding
// restclient to map clusterKubeClientMap
func (cc *clusterClientCache) addToClientMap(obj interface{}) {
cluster := obj.(*federation.Cluster)
cluster := obj.(*v1alpha1.Cluster)
cc.rwlock.Lock()
defer cc.rwlock.Unlock()
cluster, ok := obj.(*federation.Cluster)
cluster, ok := obj.(*v1alpha1.Cluster)
if !ok {
return
}
Expand All @@ -195,13 +196,11 @@ func (cc *clusterClientCache) addToClientMap(obj interface{}) {
}
}

func newClusterClientset(c *federation.Cluster) (*clientset.Clientset, error) {
clusterConfig, err := clientcmd.BuildConfigFromFlags(c.Spec.ServerAddressByClientCIDRs[0].ServerAddress, "")
if err != nil {
return nil, err
func newClusterClientset(c *v1alpha1.Cluster) (*release_1_3.Clientset, error) {
clusterConfig, err := util.BuildClusterConfig(c)
if clusterConfig != nil {
clientset := release_1_3.NewForConfigOrDie(restclient.AddUserAgent(clusterConfig, UserAgentName))
return clientset, nil
}
clusterConfig.QPS = KubeAPIQPS
clusterConfig.Burst = KubeAPIBurst
clientset := clientset.NewForConfigOrDie(restclient.AddUserAgent(clusterConfig, UserAgentName))
return clientset, nil
return nil, err
}