Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

convert TPR controller to posthook instead of disable flag #34979

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
11 changes: 1 addition & 10 deletions pkg/master/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,6 @@ type Config struct {
Tunneler genericapiserver.Tunneler
EnableUISupport bool
EnableLogsSupport bool

disableThirdPartyControllerForTesting bool
}

// EndpointReconcilerConfig holds the endpoint reconciler and endpoint reconciliation interval to be
Expand All @@ -130,8 +128,6 @@ type Master struct {
thirdPartyResources map[string]*thirdPartyEntry
// protects the map
thirdPartyResourcesLock sync.RWMutex
// Useful for reliable testing. Shouldn't be used otherwise.
disableThirdPartyControllerForTesting bool

// nodeClient is used to back the tunneler
nodeClient coreclient.NodeInterface
Expand Down Expand Up @@ -205,8 +201,6 @@ func (c completedConfig) New() (*Master, error) {
GenericAPIServer: s,
deleteCollectionWorkers: c.DeleteCollectionWorkers,
nodeClient: coreclient.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig).Nodes(),

disableThirdPartyControllerForTesting: c.disableThirdPartyControllerForTesting,
}

restOptionsFactory := restOptionsFactory{
Expand Down Expand Up @@ -246,10 +240,7 @@ func (c completedConfig) New() (*Master, error) {
c.RESTStorageProviders[autoscaling.GroupName] = autoscalingrest.RESTStorageProvider{}
c.RESTStorageProviders[batch.GroupName] = batchrest.RESTStorageProvider{}
c.RESTStorageProviders[certificates.GroupName] = certificatesrest.RESTStorageProvider{}
c.RESTStorageProviders[extensions.GroupName] = extensionsrest.RESTStorageProvider{
ResourceInterface: m,
DisableThirdPartyControllerForTesting: m.disableThirdPartyControllerForTesting,
}
c.RESTStorageProviders[extensions.GroupName] = extensionsrest.RESTStorageProvider{ResourceInterface: m}
c.RESTStorageProviders[policy.GroupName] = policyrest.RESTStorageProvider{}
c.RESTStorageProviders[rbac.GroupName] = &rbacrest.RESTStorageProvider{AuthorizerRBACSuperUser: c.GenericConfig.AuthorizerRBACSuperUser}
c.RESTStorageProviders[storage.GroupName] = storagerest.RESTStorageProvider{}
Expand Down
8 changes: 0 additions & 8 deletions pkg/master/master_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,14 +98,6 @@ func setUp(t *testing.T) (*Master, *etcdtesting.EtcdTestServer, Config, *assert.
config.GenericConfig.LoopbackClientConfig = &restclient.Config{APIPath: "/api", ContentConfig: restclient.ContentConfig{NegotiatedSerializer: api.Codecs}}
config.EnableCoreControllers = false

// TODO: this is kind of hacky. The trouble is that the sync loop
// runs in a go-routine and there is no way to validate in the test
// that the sync routine has actually run. The right answer here
// is probably to add some sort of callback that we can register
// to validate that it's actually been run, but for now we don't
// run the sync routine and register types manually.
config.disableThirdPartyControllerForTesting = true

master, err := config.Complete().New()
if err != nil {
t.Fatal(err)
Expand Down
40 changes: 27 additions & 13 deletions pkg/registry/extensions/rest/storage_extensions.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package rest

import (
"fmt"
"time"

"github.com/golang/glog"
Expand All @@ -25,6 +26,7 @@ import (
"k8s.io/kubernetes/pkg/api/rest"
"k8s.io/kubernetes/pkg/apis/extensions"
extensionsapiv1beta1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
extensionsclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/extensions/unversioned"
"k8s.io/kubernetes/pkg/genericapiserver"
horizontalpodautoscaleretcd "k8s.io/kubernetes/pkg/registry/autoscaling/horizontalpodautoscaler/etcd"
jobetcd "k8s.io/kubernetes/pkg/registry/batch/job/etcd"
Expand All @@ -36,12 +38,12 @@ import (
pspetcd "k8s.io/kubernetes/pkg/registry/extensions/podsecuritypolicy/etcd"
replicasetetcd "k8s.io/kubernetes/pkg/registry/extensions/replicaset/etcd"
thirdpartyresourceetcd "k8s.io/kubernetes/pkg/registry/extensions/thirdpartyresource/etcd"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/wait"
)

type RESTStorageProvider struct {
ResourceInterface ResourceInterface
DisableThirdPartyControllerForTesting bool
ResourceInterface ResourceInterface
}

var _ genericapiserver.RESTStorageProvider = &RESTStorageProvider{}
Expand Down Expand Up @@ -73,17 +75,6 @@ func (p RESTStorageProvider) v1beta1Storage(apiResourceConfigSource genericapise
}
if apiResourceConfigSource.ResourceEnabled(version.WithResource("thirdpartyresources")) {
thirdPartyResourceStorage := thirdpartyresourceetcd.NewREST(restOptionsGetter(extensions.Resource("thirdpartyresources")))
thirdPartyControl := ThirdPartyController{
master: p.ResourceInterface,
thirdPartyResourceRegistry: thirdPartyResourceStorage,
}
if !p.DisableThirdPartyControllerForTesting {
go wait.Forever(func() {
if err := thirdPartyControl.SyncResources(); err != nil {
glog.Warningf("third party resource sync failed: %v", err)
}
}, 10*time.Second)
}
storage["thirdpartyresources"] = thirdPartyResourceStorage
}

Expand Down Expand Up @@ -126,3 +117,26 @@ func (p RESTStorageProvider) v1beta1Storage(apiResourceConfigSource genericapise

return storage
}

func (p RESTStorageProvider) PostStartHook() (string, genericapiserver.PostStartHookFunc, error) {
return "extensions/third-party-resources", p.postStartHookFunc, nil
}
func (p RESTStorageProvider) postStartHookFunc(hookContext genericapiserver.PostStartHookContext) error {
clientset, err := extensionsclient.NewForConfig(hookContext.LoopbackClientConfig)
if err != nil {
utilruntime.HandleError(fmt.Errorf("unable to initialize clusterroles: %v", err))
return nil
}

thirdPartyControl := ThirdPartyController{
master: p.ResourceInterface,
client: clientset,
}
go wait.Forever(func() {
if err := thirdPartyControl.SyncResources(); err != nil {
glog.Warningf("third party resource sync failed: %v", err)
}
}, 10*time.Second)

return nil
}
8 changes: 4 additions & 4 deletions pkg/registry/extensions/rest/thirdparty_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (

"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/apis/extensions"
thirdpartyresourceetcd "k8s.io/kubernetes/pkg/registry/extensions/thirdpartyresource/etcd"
extensionsclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/extensions/unversioned"
"k8s.io/kubernetes/pkg/registry/extensions/thirdpartyresourcedata"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/sets"
Expand All @@ -47,8 +47,8 @@ const thirdpartyprefix = "/apis"
// ThirdPartyController is a control loop that knows how to synchronize ThirdPartyResource objects with
// RESTful resources which are present in the API server.
type ThirdPartyController struct {
master ResourceInterface
thirdPartyResourceRegistry *thirdpartyresourceetcd.REST
master ResourceInterface
client extensionsclient.ThirdPartyResourcesGetter
}

// SyncOneResource synchronizes a single resource with RESTful resources on the master
Expand All @@ -68,7 +68,7 @@ func (t *ThirdPartyController) SyncOneResource(rsrc *extensions.ThirdPartyResour

// Synchronize all resources with RESTful resources on the master
func (t *ThirdPartyController) SyncResources() error {
list, err := t.thirdPartyResourceRegistry.List(api.NewDefaultContext(), nil)
list, err := t.client.ThirdPartyResources().List(api.ListOptions{})
if err != nil {
return err
}
Expand Down