Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

federation: use generated listers #41927

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 3 additions & 1 deletion federation/pkg/federation-controller/replicaset/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@ go_library(
"//pkg/api/v1:go_default_library",
"//pkg/apis/extensions/v1beta1:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/client/legacylisters:go_default_library",
"//pkg/client/listers/extensions/v1beta1:go_default_library",
"//pkg/controller:go_default_library",
"//vendor:github.com/golang/glog",
"//vendor:k8s.io/apimachinery/pkg/api/errors",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apimachinery/pkg/labels",
"//vendor:k8s.io/apimachinery/pkg/runtime",
"//vendor:k8s.io/apimachinery/pkg/util/runtime",
"//vendor:k8s.io/apimachinery/pkg/util/wait",
"//vendor:k8s.io/apimachinery/pkg/watch",
"//vendor:k8s.io/client-go/pkg/api/v1",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ import (

"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
clientv1 "k8s.io/client-go/pkg/api/v1"
Expand All @@ -47,7 +49,7 @@ import (
apiv1 "k8s.io/kubernetes/pkg/api/v1"
extensionsv1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
"k8s.io/kubernetes/pkg/client/legacylisters"
extensionslisters "k8s.io/kubernetes/pkg/client/listers/extensions/v1beta1"
"k8s.io/kubernetes/pkg/controller"
)

Expand Down Expand Up @@ -84,7 +86,7 @@ type ReplicaSetController struct {
fedClient fedclientset.Interface

replicaSetController cache.Controller
replicaSetStore listers.StoreToReplicaSetLister
replicaSetLister extensionslisters.ReplicaSetLister

fedReplicaSetInformer fedutil.FederatedInformer
fedPodInformer fedutil.FederatedInformer
Expand Down Expand Up @@ -172,7 +174,8 @@ func NewReplicaSetController(federationClient fedclientset.Interface) *ReplicaSe
}
frsc.fedPodInformer = fedutil.NewFederatedInformer(federationClient, podFedInformerFactory, &fedutil.ClusterLifecycleHandlerFuncs{})

frsc.replicaSetStore.Indexer, frsc.replicaSetController = cache.NewIndexerInformer(
var replicaSetIndexer cache.Indexer
replicaSetIndexer, frsc.replicaSetController = cache.NewIndexerInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return frsc.fedClient.Extensions().ReplicaSets(metav1.NamespaceAll).List(options)
Expand All @@ -188,6 +191,7 @@ func NewReplicaSetController(federationClient fedclientset.Interface) *ReplicaSe
),
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
frsc.replicaSetLister = extensionslisters.NewReplicaSetLister(replicaSetIndexer)

frsc.fedUpdater = fedutil.NewFederatedUpdater(frsc.fedReplicaSetInformer,
func(client kubeclientset.Interface, obj runtime.Object) error {
Expand Down Expand Up @@ -349,12 +353,18 @@ func (frsc *ReplicaSetController) deliverLocalReplicaSet(obj interface{}, durati
glog.Errorf("Couldn't get key for object %v: %v", obj, err)
return
}
_, exists, err := frsc.replicaSetStore.Indexer.GetByKey(key)
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
glog.Errorf("Couldn't get federation replicaset %v: %v", key, err)
return
glog.Errorf("Error splitting key for object %v: %v", obj, err)
}
if exists { // ignore replicasets exists only in local k8s
_, err = frsc.replicaSetLister.ReplicaSets(namespace).Get(name)
switch {
case errors.IsNotFound(err):
// do nothing
case err != nil:
glog.Errorf("Couldn't get federation replicaset %v: %v", key, err)
default:
// ReplicaSet exists. Ignore ReplicaSets that exist only in local k8s
frsc.deliverReplicaSetByKey(key, duration, false)
}
}
Expand Down Expand Up @@ -477,14 +487,18 @@ func (frsc *ReplicaSetController) reconcileReplicaSet(key string) (reconciliatio
startTime := time.Now()
defer glog.V(4).Infof("Finished reconcile replicaset %q (%v)", key, time.Now().Sub(startTime))

objFromStore, exists, err := frsc.replicaSetStore.Indexer.GetByKey(key)
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return statusError, err
}
if !exists {
objFromStore, err := frsc.replicaSetLister.ReplicaSets(namespace).Get(name)
if errors.IsNotFound(err) {
// don't delete local replicasets for now. Do not reconcile it anymore.
return statusAllOk, nil
}
if err != nil {
return statusError, err
}
obj, err := api.Scheme.DeepCopy(objFromStore)
frs, ok := obj.(*extensionsv1.ReplicaSet)
if err != nil || !ok {
Expand Down Expand Up @@ -626,7 +640,11 @@ func (frsc *ReplicaSetController) reconcileReplicaSetsOnClusterChange() {
if !frsc.isSynced() {
frsc.clusterDeliverer.DeliverAfter(allClustersKey, nil, clusterAvailableDelay)
}
rss := frsc.replicaSetStore.Indexer.List()
rss, err := frsc.replicaSetLister.List(labels.Everything())
if err != nil {
utilruntime.HandleError(fmt.Errorf("error listing replica sets: %v", err))
return
}
for _, rs := range rss {
key, _ := controller.KeyFunc(rs)
frsc.deliverReplicaSetByKey(key, 0, false)
Expand Down
2 changes: 1 addition & 1 deletion federation/pkg/federation-controller/service/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ go_library(
"//pkg/api:go_default_library",
"//pkg/api/v1:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/client/legacylisters:go_default_library",
"//pkg/client/listers/core/v1:go_default_library",
"//pkg/controller:go_default_library",
"//vendor:github.com/golang/glog",
"//vendor:k8s.io/apimachinery/pkg/api/errors",
Expand Down
15 changes: 10 additions & 5 deletions federation/pkg/federation-controller/service/cluster_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,23 +29,23 @@ import (
v1beta1 "k8s.io/kubernetes/federation/apis/federation/v1beta1"
v1 "k8s.io/kubernetes/pkg/api/v1"
kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
"k8s.io/kubernetes/pkg/client/legacylisters"

"reflect"

"github.com/golang/glog"
"k8s.io/kubernetes/federation/pkg/federation-controller/util"
corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
)

type clusterCache struct {
clientset *kubeclientset.Clientset
cluster *v1beta1.Cluster
// A store of services, populated by the serviceController
serviceStore listers.StoreToServiceLister
serviceStore corelisters.ServiceLister
// Watches changes to all services
serviceController cache.Controller
// A store of endpoint, populated by the serviceController
endpointStore listers.StoreToEndpointsLister
endpointStore corelisters.EndpointsLister
// Watches changes to all endpoints
endpointController cache.Controller
// services that need to be synced
Expand Down Expand Up @@ -91,7 +91,8 @@ func (cc *clusterClientCache) startClusterLW(cluster *v1beta1.Cluster, clusterNa
serviceQueue: workqueue.New(),
endpointQueue: workqueue.New(),
}
cachedClusterClient.endpointStore.Store, cachedClusterClient.endpointController = cache.NewInformer(
var endpointIndexer cache.Indexer
endpointIndexer, cachedClusterClient.endpointController = cache.NewIndexerInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (pkgruntime.Object, error) {
return clientset.Core().Endpoints(metav1.NamespaceAll).List(options)
Expand All @@ -113,9 +114,12 @@ func (cc *clusterClientCache) startClusterLW(cluster *v1beta1.Cluster, clusterNa
cc.enqueueEndpoint(obj, clusterName)
},
},
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
cachedClusterClient.endpointStore = corelisters.NewEndpointsLister(endpointIndexer)

cachedClusterClient.serviceStore.Indexer, cachedClusterClient.serviceController = cache.NewIndexerInformer(
var serviceIndexer cache.Indexer
serviceIndexer, cachedClusterClient.serviceController = cache.NewIndexerInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (pkgruntime.Object, error) {
return clientset.Core().Services(metav1.NamespaceAll).List(options)
Expand Down Expand Up @@ -152,6 +156,7 @@ func (cc *clusterClientCache) startClusterLW(cluster *v1beta1.Cluster, clusterNa
},
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
cachedClusterClient.serviceStore = corelisters.NewServiceLister(serviceIndexer)
cc.clientMap[clusterName] = cachedClusterClient
go cachedClusterClient.serviceController.Run(wait.NeverStop)
go cachedClusterClient.endpointController.Run(wait.NeverStop)
Expand Down
28 changes: 12 additions & 16 deletions federation/pkg/federation-controller/service/endpoint_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ limitations under the License.
package service

import (
"fmt"
"time"

"k8s.io/apimachinery/pkg/api/errors"
cache "k8s.io/client-go/tools/cache"
fedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset"
v1 "k8s.io/kubernetes/pkg/api/v1"
Expand Down Expand Up @@ -81,29 +81,25 @@ func (cc *clusterClientCache) syncEndpoint(key, clusterName string, clusterCache
// here we filtered all non-federation services
return nil
}
endpointInterface, exists, err := clusterCache.endpointStore.GetByKey(key)
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
glog.Errorf("Did not successfully get %v from store: %v, will retry later", key, err)
clusterCache.endpointQueue.Add(key)
return err
}
if exists {
endpoint, ok := endpointInterface.(*v1.Endpoints)
if ok {
glog.V(4).Infof("Found endpoint for federation service %s/%s from cluster %s", endpoint.Namespace, endpoint.Name, clusterName)
err = cc.processEndpointUpdate(cachedService, endpoint, clusterName, serviceController)
} else {
_, ok := endpointInterface.(cache.DeletedFinalStateUnknown)
if !ok {
return fmt.Errorf("Object contained wasn't a service or a deleted key: %+v", endpointInterface)
}
glog.Infof("Found tombstone for %v", key)
err = cc.processEndpointDeletion(cachedService, clusterName, serviceController)
}
} else {
endpoint, err := clusterCache.endpointStore.Endpoints(namespace).Get(name)
switch {
case errors.IsNotFound(err):
// service absence in store means watcher caught the deletion, ensure LB info is cleaned
glog.Infof("Can not get endpoint %v for cluster %s from endpointStore", key, clusterName)
err = cc.processEndpointDeletion(cachedService, clusterName, serviceController)
case err != nil:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could never actually have happened before, rigth?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I honestly don't remember in what situation(s) a tombstone is possible

glog.Errorf("Did not successfully get %v from store: %v, will retry later", key, err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

util.HandleError.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would require sweeping through the entire package and doing a find/replace. I opted not to do that here and would prefer to do it in a follow up if that's ok?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You added new code?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"ish". Before, it just did clusterCache.endpointStore.GetByKey and if that errored, it did the glog.Errorf in question. Now, it's a 2-parter: cache.SplitMetaNamespaceKey followed by clusterCache.endpointStore.Endpoints(namespace).Get(name), both of which could error.

clusterCache.endpointQueue.Add(key)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this case used to return

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@deads2k fixed, ptal

return err
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(no action required) I don't think returning an error from this function makes any sense. The only thing the caller does is log it at Info level, and it's already logged at Error level.

default:
glog.V(4).Infof("Found endpoint for federation service %s/%s from cluster %s", endpoint.Namespace, endpoint.Name, clusterName)
err = cc.processEndpointUpdate(cachedService, endpoint, clusterName, serviceController)
}
if err != nil {
glog.Errorf("Failed to sync service: %+v, put back to service queue", err)
Expand Down
28 changes: 11 additions & 17 deletions federation/pkg/federation-controller/service/service_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ limitations under the License.
package service

import (
"fmt"
"time"

"k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -85,31 +84,26 @@ func (cc *clusterClientCache) syncService(key, clusterName string, clusterCache
// if serviceCache does not exists, that means the service is not created by federation, we should skip it
return nil
}
serviceInterface, exists, err := clusterCache.serviceStore.Indexer.GetByKey(key)
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
glog.Errorf("Did not successfully get %v from store: %v, will retry later", key, err)
clusterCache.serviceQueue.Add(key)
return err
}
var needUpdate, isDeletion bool
if exists {
service, ok := serviceInterface.(*v1.Service)
if ok {
glog.V(4).Infof("Found service for federation service %s/%s from cluster %s", service.Namespace, service.Name, clusterName)
needUpdate = cc.processServiceUpdate(cachedService, service, clusterName)
} else {
_, ok := serviceInterface.(cache.DeletedFinalStateUnknown)
if !ok {
return fmt.Errorf("Object contained wasn't a service or a deleted key: %+v", serviceInterface)
}
glog.Infof("Found tombstone for %v", key)
needUpdate = cc.processServiceDeletion(cachedService, clusterName)
isDeletion = true
}
} else {
service, err := clusterCache.serviceStore.Services(namespace).Get(name)
switch {
case errors.IsNotFound(err):
glog.Infof("Can not get service %v for cluster %s from serviceStore", key, clusterName)
needUpdate = cc.processServiceDeletion(cachedService, clusterName)
isDeletion = true
case err != nil:
glog.Errorf("Did not successfully get %v from store: %v, will retry later", key, err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

util.HandleError

clusterCache.serviceQueue.Add(key)
return err
default:
glog.V(4).Infof("Found service for federation service %s/%s from cluster %s", service.Namespace, service.Name, clusterName)
needUpdate = cc.processServiceUpdate(cachedService, service, clusterName)
}

if needUpdate {
Expand Down
52 changes: 25 additions & 27 deletions federation/pkg/federation-controller/service/servicecontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ import (
"k8s.io/kubernetes/pkg/api"
v1 "k8s.io/kubernetes/pkg/api/v1"
kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
"k8s.io/kubernetes/pkg/client/legacylisters"
corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
"k8s.io/kubernetes/pkg/controller"
)

Expand Down Expand Up @@ -121,7 +121,7 @@ type ServiceController struct {
serviceCache *serviceCache
clusterCache *clusterClientCache
// A store of services, populated by the serviceController
serviceStore listers.StoreToServiceLister
serviceStore corelisters.ServiceLister
// Watches changes to all services
serviceController cache.Controller
federatedInformer fedutil.FederatedInformer
Expand Down Expand Up @@ -180,7 +180,8 @@ func New(federationClient fedclientset.Interface, dns dnsprovider.Interface,
knownClusterSet: make(sets.String),
}
s.clusterDeliverer = util.NewDelayingDeliverer()
s.serviceStore.Indexer, s.serviceController = cache.NewIndexerInformer(
var serviceIndexer cache.Indexer
serviceIndexer, s.serviceController = cache.NewIndexerInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (pkgruntime.Object, error) {
return s.federationClient.Core().Services(metav1.NamespaceAll).List(options)
Expand All @@ -203,6 +204,7 @@ func New(federationClient fedclientset.Interface, dns dnsprovider.Interface,
},
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
s.serviceStore = corelisters.NewServiceLister(serviceIndexer)
s.clusterStore.Store, s.clusterController = cache.NewInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (pkgruntime.Object, error) {
Expand Down Expand Up @@ -938,44 +940,40 @@ func (s *ServiceController) syncService(key string) error {
defer func() {
glog.V(4).Infof("Finished syncing service %q (%v)", key, time.Now().Sub(startTime))
}()
// obj holds the latest service info from apiserver
objFromStore, exists, err := s.serviceStore.Indexer.GetByKey(key)

namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
glog.Errorf("Unable to retrieve service %v from store: %v", key, err)
s.queue.Add(key)
return err
}
if !exists {

service, err := s.serviceStore.Services(namespace).Get(name)
switch {
case errors.IsNotFound(err):
// service absence in store means watcher caught the deletion, ensure LB info is cleaned
glog.Infof("Service has been deleted %v", key)
err, retryDelay = s.processServiceDeletion(key)
}
// Create a copy before modifying the obj to prevent race condition with
// other readers of obj from store.
obj, err := conversion.NewCloner().DeepCopy(objFromStore)
if err != nil {
glog.Errorf("Error in deep copying service %v retrieved from store: %v", key, err)
case err != nil:
glog.Errorf("Unable to retrieve service %v from store: %v", key, err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

handleError everywhere

s.queue.Add(key)
return err
}

if exists {
service, ok := obj.(*v1.Service)
if ok {
cachedService = s.serviceCache.getOrCreate(key)
err, retryDelay = s.processServiceUpdate(cachedService, service, key)
} else {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
return fmt.Errorf("Object contained wasn't a service or a deleted key: %+v", obj)
}
glog.Infof("Found tombstone for %v", key)
err, retryDelay = s.processServiceDeletion(tombstone.Key)
default:
// Create a copy before modifying the obj to prevent race condition with
// other readers of obj from store.
copy, err := conversion.NewCloner().DeepCopy(service)
if err != nil {
glog.Errorf("Error in deep copying service %v retrieved from store: %v", key, err)
s.queue.Add(key)
return err
}
service := copy.(*v1.Service)
cachedService = s.serviceCache.getOrCreate(key)
err, retryDelay = s.processServiceUpdate(cachedService, service, key)
}

if retryDelay != 0 {
s.enqueueService(obj)
s.enqueueService(service)
} else if err != nil {
runtime.HandleError(fmt.Errorf("Failed to process service. Not retrying: %v", err))
}
Expand Down