Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make groupVersionResource listing dynamic for namespace controller #35947

Merged
merged 1 commit into from
Nov 21, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
25 changes: 22 additions & 3 deletions cmd/kube-controller-manager/app/controllermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,11 +369,30 @@ func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, rootCl
// Find the list of namespaced resources via discovery that the namespace controller must manage
namespaceKubeClient := client("namespace-controller")
namespaceClientPool := dynamic.NewClientPool(restclient.AddUserAgent(kubeconfig, "namespace-controller"), restMapper, dynamic.LegacyAPIPathResolverFunc)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dont you need to change the RESTMapper as well?
dynamic client has a NewDiscoveryRESTMapper method: https://github.com/kubernetes/kubernetes/blob/master/pkg/client/typed/dynamic/dynamic_util.go#L37

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so? We recreate a new dynamic client for each GroupVersionResource

https://github.com/kubernetes/kubernetes/blob/master/pkg/controller/namespace/namespace_controller_utils.go#L282

Which I think should be capable of handling it w/o a new mapper? But I have to admit this part of the codebase is pretty opaque...

groupVersionResources, err := namespaceKubeClient.Discovery().ServerPreferredNamespacedResources()
// TODO: consider using a list-watch + cache here rather than polling
var gvrFn func() ([]unversioned.GroupVersionResource, error)
rsrcs, err := namespaceKubeClient.Discovery().ServerResources()
if err != nil {
glog.Fatalf("Failed to get supported resources from server: %v", err)
glog.Fatalf("Failed to get group version resources: %v", err)
}
for _, rsrcList := range rsrcs {
for ix := range rsrcList.APIResources {
rsrc := &rsrcList.APIResources[ix]
if rsrc.Kind == "ThirdPartyResource" {
gvrFn = namespaceKubeClient.Discovery().ServerPreferredNamespacedResources
}
}
}
if gvrFn == nil {
gvr, err := namespaceKubeClient.Discovery().ServerPreferredNamespacedResources()
if err != nil {
glog.Fatalf("Failed to get resources: %v", err)
}
gvrFn = func() ([]unversioned.GroupVersionResource, error) {
return gvr, nil
}
}
namespaceController := namespacecontroller.NewNamespaceController(namespaceKubeClient, namespaceClientPool, groupVersionResources, s.NamespaceSyncPeriod.Duration, api.FinalizerKubernetes)
namespaceController := namespacecontroller.NewNamespaceController(namespaceKubeClient, namespaceClientPool, gvrFn, s.NamespaceSyncPeriod.Duration, api.FinalizerKubernetes)
go namespaceController.Run(int(s.ConcurrentNamespaceSyncs), wait.NeverStop)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))

Expand Down
14 changes: 7 additions & 7 deletions pkg/controller/namespace/namespace_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ type NamespaceController struct {
controller *cache.Controller
// namespaces that have been queued up for processing by workers
queue workqueue.RateLimitingInterface
// list of preferred group versions and their corresponding resource set for namespace deletion
groupVersionResources []unversioned.GroupVersionResource
// function to list of preferred group versions and their corresponding resource set for namespace deletion
groupVersionResourcesFn func() ([]unversioned.GroupVersionResource, error)
// opCache is a cache to remember if a particular operation is not supported to aid dynamic client.
opCache *operationNotSupportedCache
// finalizerToken is the finalizer token managed by this controller
Expand All @@ -59,7 +59,7 @@ type NamespaceController struct {
func NewNamespaceController(
kubeClient clientset.Interface,
clientPool dynamic.ClientPool,
groupVersionResources []unversioned.GroupVersionResource,
groupVersionResourcesFn func() ([]unversioned.GroupVersionResource, error),
resyncPeriod time.Duration,
finalizerToken api.FinalizerName) *NamespaceController {

Expand All @@ -86,9 +86,9 @@ func NewNamespaceController(
kubeClient: kubeClient,
clientPool: clientPool,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "namespace"),
groupVersionResources: groupVersionResources,
opCache: opCache,
finalizerToken: finalizerToken,
groupVersionResourcesFn: groupVersionResourcesFn,
opCache: opCache,
finalizerToken: finalizerToken,
}

if kubeClient != nil && kubeClient.Core().RESTClient().GetRateLimiter() != nil {
Expand Down Expand Up @@ -191,7 +191,7 @@ func (nm *NamespaceController) syncNamespaceFromKey(key string) (err error) {
return err
}
namespace := obj.(*api.Namespace)
return syncNamespace(nm.kubeClient, nm.clientPool, nm.opCache, nm.groupVersionResources, namespace, nm.finalizerToken)
return syncNamespace(nm.kubeClient, nm.clientPool, nm.opCache, nm.groupVersionResourcesFn, namespace, nm.finalizerToken)
}

// Run starts observing the system with the specified number of workers.
Expand Down
21 changes: 19 additions & 2 deletions pkg/controller/namespace/namespace_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ func testSyncNamespaceThatIsTerminating(t *testing.T, versions *unversioned.APIV
testNamespace *api.Namespace
kubeClientActionSet sets.String
dynamicClientActionSet sets.String
gvrError error
}{
"pending-finalize": {
testNamespace: testNamespacePendingFinalize,
Expand All @@ -148,6 +149,15 @@ func testSyncNamespaceThatIsTerminating(t *testing.T, versions *unversioned.APIV
),
dynamicClientActionSet: sets.NewString(),
},
"groupVersionResourceErr": {
testNamespace: testNamespaceFinalizeComplete,
kubeClientActionSet: sets.NewString(
strings.Join([]string{"get", "namespaces", ""}, "-"),
strings.Join([]string{"delete", "namespaces", ""}, "-"),
),
dynamicClientActionSet: sets.NewString(),
gvrError: fmt.Errorf("test error"),
},
}

for scenario, testInput := range scenarios {
Expand All @@ -158,7 +168,11 @@ func testSyncNamespaceThatIsTerminating(t *testing.T, versions *unversioned.APIV
mockClient := fake.NewSimpleClientset(testInput.testNamespace)
clientPool := dynamic.NewClientPool(clientConfig, registered.RESTMapper(), dynamic.LegacyAPIPathResolverFunc)

err := syncNamespace(mockClient, clientPool, &operationNotSupportedCache{m: make(map[operationKey]bool)}, groupVersionResources, testInput.testNamespace, api.FinalizerKubernetes)
fn := func() ([]unversioned.GroupVersionResource, error) {
return groupVersionResources, nil
}

err := syncNamespace(mockClient, clientPool, &operationNotSupportedCache{m: make(map[operationKey]bool)}, fn, testInput.testNamespace, api.FinalizerKubernetes)
if err != nil {
t.Errorf("scenario %s - Unexpected error when synching namespace %v", scenario, err)
}
Expand Down Expand Up @@ -227,7 +241,10 @@ func TestSyncNamespaceThatIsActive(t *testing.T) {
Phase: api.NamespaceActive,
},
}
err := syncNamespace(mockClient, nil, &operationNotSupportedCache{m: make(map[operationKey]bool)}, testGroupVersionResources(), testNamespace, api.FinalizerKubernetes)
fn := func() ([]unversioned.GroupVersionResource, error) {
return testGroupVersionResources(), nil
}
err := syncNamespace(mockClient, nil, &operationNotSupportedCache{m: make(map[operationKey]bool)}, fn, testNamespace, api.FinalizerKubernetes)
if err != nil {
t.Errorf("Unexpected error when synching namespace %v", err)
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/controller/namespace/namespace_controller_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ func syncNamespace(
kubeClient clientset.Interface,
clientPool dynamic.ClientPool,
opCache *operationNotSupportedCache,
groupVersionResources []unversioned.GroupVersionResource,
groupVersionResourcesFn func() ([]unversioned.GroupVersionResource, error),
namespace *api.Namespace,
finalizerToken api.FinalizerName,
) error {
Expand Down Expand Up @@ -422,6 +422,10 @@ func syncNamespace(
}

// there may still be content for us to remove
groupVersionResources, err := groupVersionResourcesFn()
if err != nil {
return err
}
estimate, err := deleteAllContent(kubeClient, clientPool, opCache, groupVersionResources, namespace.Name, *namespace.DeletionTimestamp)
if err != nil {
return err
Expand Down
7 changes: 2 additions & 5 deletions test/e2e_node/services/namespace_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,8 @@ func (n *NamespaceController) Start() error {
return err
}
clientPool := dynamic.NewClientPool(config, registered.RESTMapper(), dynamic.LegacyAPIPathResolverFunc)
resources, err := client.Discovery().ServerPreferredNamespacedResources()
if err != nil {
return err
}
nc := namespacecontroller.NewNamespaceController(client, clientPool, resources, ncResyncPeriod, api.FinalizerKubernetes)
gvrFn := client.Discovery().ServerPreferredNamespacedResources
nc := namespacecontroller.NewNamespaceController(client, clientPool, gvrFn, ncResyncPeriod, api.FinalizerKubernetes)
go nc.Run(ncConcurrency, n.stopCh)
return nil
}
Expand Down