Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

plumb stopch to post start hook index since many of them are starting go funcs #45495

Merged
merged 1 commit into from
May 11, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 4 additions & 4 deletions cmd/kube-apiserver/app/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ func createAggregatorConfig(kubeAPIServerConfig genericapiserver.Config, command

}

func createAggregatorServer(aggregatorConfig *aggregatorapiserver.Config, delegateAPIServer genericapiserver.DelegationTarget, sharedInformers informers.SharedInformerFactory, stopCh <-chan struct{}) (*aggregatorapiserver.APIAggregator, error) {
aggregatorServer, err := aggregatorConfig.Complete().NewWithDelegate(delegateAPIServer, stopCh)
func createAggregatorServer(aggregatorConfig *aggregatorapiserver.Config, delegateAPIServer genericapiserver.DelegationTarget, sharedInformers informers.SharedInformerFactory) (*aggregatorapiserver.APIAggregator, error) {
aggregatorServer, err := aggregatorConfig.Complete().NewWithDelegate(delegateAPIServer)
if err != nil {
return nil, err
}
Expand All @@ -105,8 +105,8 @@ func createAggregatorServer(aggregatorConfig *aggregatorapiserver.Config, delega
tprRegistrationController := thirdparty.NewAutoRegistrationController(sharedInformers.Extensions().InternalVersion().ThirdPartyResources(), autoRegistrationController)

aggregatorServer.GenericAPIServer.AddPostStartHook("kube-apiserver-autoregistration", func(context genericapiserver.PostStartHookContext) error {
go autoRegistrationController.Run(5, stopCh)
go tprRegistrationController.Run(5, stopCh)
go autoRegistrationController.Run(5, context.StopCh)
go tprRegistrationController.Run(5, context.StopCh)
return nil
})
aggregatorServer.GenericAPIServer.AddHealthzChecks(healthz.NamedCheck("autoregister-completion", func(r *http.Request) error {
Expand Down
8 changes: 4 additions & 4 deletions cmd/kube-apiserver/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func Run(runOptions *options.ServerRunOptions, stopCh <-chan struct{}) error {
if err != nil {
return err
}
kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, sharedInformers, stopCh)
kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, sharedInformers)
if err != nil {
return err
}
Expand All @@ -129,7 +129,7 @@ func Run(runOptions *options.ServerRunOptions, stopCh <-chan struct{}) error {
if err != nil {
return err
}
aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, sharedInformers, stopCh)
aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, sharedInformers)
if err != nil {
// we don't need special handling for innerStopCh because the aggregator server doesn't create any go routines
return err
Expand All @@ -138,13 +138,13 @@ func Run(runOptions *options.ServerRunOptions, stopCh <-chan struct{}) error {
}

// CreateKubeAPIServer creates and wires a workable kube-apiserver
func CreateKubeAPIServer(kubeAPIServerConfig *master.Config, sharedInformers informers.SharedInformerFactory, stopCh <-chan struct{}) (*master.Master, error) {
func CreateKubeAPIServer(kubeAPIServerConfig *master.Config, sharedInformers informers.SharedInformerFactory) (*master.Master, error) {
kubeAPIServer, err := kubeAPIServerConfig.Complete().New(genericapiserver.EmptyDelegate)
if err != nil {
return nil, err
}
kubeAPIServer.GenericAPIServer.AddPostStartHook("start-kube-apiserver-informers", func(context genericapiserver.PostStartHookContext) error {
sharedInformers.Start(stopCh)
sharedInformers.Start(context.StopCh)
return nil
})

Expand Down
2 changes: 1 addition & 1 deletion staging/src/k8s.io/apiserver/pkg/server/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func TestNewWithDelegate(t *testing.T) {
stopCh := make(chan struct{})
defer close(stopCh)
wrappingServer.PrepareRun()
wrappingServer.RunPostStartHooks()
wrappingServer.RunPostStartHooks(stopCh)

server := httptest.NewServer(wrappingServer.Handler)
defer server.Close()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}) error {
close(internalStopCh)
}()

s.RunPostStartHooks()
s.RunPostStartHooks(stopCh)

if _, err := systemd.SdNotify(true, "READY=1\n"); err != nil {
glog.Errorf("Unable to send systemd daemon successful start message: %v\n", err)
Expand Down
9 changes: 7 additions & 2 deletions staging/src/k8s.io/apiserver/pkg/server/hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ type PostStartHookFunc func(context PostStartHookContext) error
type PostStartHookContext struct {
// LoopbackClientConfig is a config for a privileged loopback connection to the API server
LoopbackClientConfig *restclient.Config
// StopCh is the channel that will be closed when the server stops
StopCh <-chan struct{}
}

// PostStartHookProvider is an interface in addition to provide a post start hook for the api server
Expand Down Expand Up @@ -89,12 +91,15 @@ func (s *GenericAPIServer) AddPostStartHook(name string, hook PostStartHookFunc)
}

// RunPostStartHooks runs the PostStartHooks for the server
func (s *GenericAPIServer) RunPostStartHooks() {
func (s *GenericAPIServer) RunPostStartHooks(stopCh <-chan struct{}) {
s.postStartHookLock.Lock()
defer s.postStartHookLock.Unlock()
s.postStartHooksCalled = true

context := PostStartHookContext{LoopbackClientConfig: s.LoopbackClientConfig}
context := PostStartHookContext{
LoopbackClientConfig: s.LoopbackClientConfig,
StopCh: stopCh,
}

for hookName, hookEntry := range s.postStartHooks {
go runPostStartHook(hookName, hookEntry, context)
Expand Down
10 changes: 5 additions & 5 deletions staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func (c *Config) SkipComplete() completedConfig {
}

// New returns a new instance of APIAggregator from the given config.
func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.DelegationTarget, stopCh <-chan struct{}) (*APIAggregator, error) {
func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.DelegationTarget) (*APIAggregator, error) {
genericServer, err := c.Config.GenericConfig.SkipComplete().New(delegationTarget) // completion is done in Complete, no need for a second time
if err != nil {
return nil, err
Expand Down Expand Up @@ -185,16 +185,16 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
)

s.GenericAPIServer.AddPostStartHook("start-kube-aggregator-informers", func(context genericapiserver.PostStartHookContext) error {
informerFactory.Start(stopCh)
kubeInformers.Start(stopCh)
informerFactory.Start(context.StopCh)
kubeInformers.Start(context.StopCh)
return nil
})
s.GenericAPIServer.AddPostStartHook("apiservice-registration-controller", func(context genericapiserver.PostStartHookContext) error {
go apiserviceRegistrationController.Run(stopCh)
go apiserviceRegistrationController.Run(context.StopCh)
return nil
})
s.GenericAPIServer.AddPostStartHook("apiservice-status-available-controller", func(context genericapiserver.PostStartHookContext) error {
go availableController.Run(stopCh)
go availableController.Run(context.StopCh)
return nil
})

Expand Down
2 changes: 1 addition & 1 deletion staging/src/k8s.io/kube-aggregator/pkg/cmd/server/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func (o AggregatorOptions) RunAggregator(stopCh <-chan struct{}) error {
return err
}

server, err := config.Complete().NewWithDelegate(genericapiserver.EmptyDelegate, stopCh)
server, err := config.Complete().NewWithDelegate(genericapiserver.EmptyDelegate)
if err != nil {
return err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func (c *Config) SkipComplete() completedConfig {
}

// New returns a new instance of CustomResources from the given config.
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget, stopCh <-chan struct{}) (*CustomResources, error) {
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*CustomResources, error) {
genericServer, err := c.Config.GenericConfig.SkipComplete().New(genericapiserver.EmptyDelegate) // completion is done in Complete, no need for a second time
if err != nil {
return nil, err
Expand Down Expand Up @@ -155,11 +155,11 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget,
customResourceController := NewDiscoveryController(customResourceInformers.Apiextensions().InternalVersion().CustomResources(), versionDiscoveryHandler, groupDiscoveryHandler)

s.GenericAPIServer.AddPostStartHook("start-apiextensions-informers", func(context genericapiserver.PostStartHookContext) error {
customResourceInformers.Start(stopCh)
customResourceInformers.Start(context.StopCh)
return nil
})
s.GenericAPIServer.AddPostStartHook("start-apiextensions-controllers", func(context genericapiserver.PostStartHookContext) error {
go customResourceController.Run(stopCh)
go customResourceController.Run(context.StopCh)
return nil
})

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (o CustomResourcesServerOptions) RunCustomResourcesServer(stopCh <-chan str
return err
}

server, err := config.Complete().New(genericapiserver.EmptyDelegate, stopCh)
server, err := config.Complete().New(genericapiserver.EmptyDelegate)
if err != nil {
return err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,8 @@ func DefaultServerConfig() (*extensionsapiserver.Config, error) {

func StartServer(config *extensionsapiserver.Config) (chan struct{}, clientset.Interface, dynamic.ClientPool, error) {
stopCh := make(chan struct{})
server, err := config.Complete().New(genericapiserver.EmptyDelegate, stopCh)
server, err := config.Complete().New(genericapiserver.EmptyDelegate)
if err != nil {
close(stopCh)
return nil, nil, nil, err
}
go func() {
Expand Down
10 changes: 5 additions & 5 deletions test/integration/apiserver/apiserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,16 @@ import (
"k8s.io/kubernetes/test/integration/framework"
)

func setup(t *testing.T) (*httptest.Server, clientset.Interface) {
func setup(t *testing.T) (*httptest.Server, clientset.Interface, framework.CloseFunc) {
masterConfig := framework.NewIntegrationTestMasterConfig()
masterConfig.EnableCoreControllers = false
_, s := framework.RunAMaster(masterConfig)
_, s, closeFn := framework.RunAMaster(masterConfig)

clientSet, err := clientset.NewForConfig(&restclient.Config{Host: s.URL})
if err != nil {
t.Fatalf("Error in create clientset: %v", err)
}
return s, clientSet
return s, clientSet, closeFn
}

func verifyStatusCode(t *testing.T, verb, URL, body string, expectedStatusCode int) {
Expand Down Expand Up @@ -109,8 +109,8 @@ var cascDel = `

// Tests that the apiserver returns 202 status code as expected.
func Test202StatusCode(t *testing.T) {
s, clientSet := setup(t)
defer s.Close()
s, clientSet, closeFn := setup(t)
defer closeFn()

ns := framework.CreateTestingNamespace("status-code", s, t)
defer framework.DeleteTestingNamespace(ns, s, t)
Expand Down
4 changes: 2 additions & 2 deletions test/integration/apiserver/patch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ import (

// Tests that the apiserver retries non-overlapping conflicts on patches
func TestPatchConflicts(t *testing.T) {
s, clientSet := setup(t)
defer s.Close()
s, clientSet, closeFn := setup(t)
defer closeFn()

ns := framework.CreateTestingNamespace("status-code", s, t)
defer framework.DeleteTestingNamespace(ns, s, t)
Expand Down
12 changes: 6 additions & 6 deletions test/integration/auth/accessreview_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ func TestSubjectAccessReview(t *testing.T) {
masterConfig.GenericConfig.Authenticator = authenticator.RequestFunc(alwaysAlice)
masterConfig.GenericConfig.Authorizer = sarAuthorizer{}
masterConfig.GenericConfig.AdmissionControl = admit.NewAlwaysAdmit()
_, s := framework.RunAMaster(masterConfig)
defer s.Close()
_, s, closeFn := framework.RunAMaster(masterConfig)
defer closeFn()

clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: s.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(api.GroupName).GroupVersion}})

Expand Down Expand Up @@ -151,8 +151,8 @@ func TestSelfSubjectAccessReview(t *testing.T) {
})
masterConfig.GenericConfig.Authorizer = sarAuthorizer{}
masterConfig.GenericConfig.AdmissionControl = admit.NewAlwaysAdmit()
_, s := framework.RunAMaster(masterConfig)
defer s.Close()
_, s, closeFn := framework.RunAMaster(masterConfig)
defer closeFn()

clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: s.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(api.GroupName).GroupVersion}})

Expand Down Expand Up @@ -231,8 +231,8 @@ func TestLocalSubjectAccessReview(t *testing.T) {
masterConfig.GenericConfig.Authenticator = authenticator.RequestFunc(alwaysAlice)
masterConfig.GenericConfig.Authorizer = sarAuthorizer{}
masterConfig.GenericConfig.AdmissionControl = admit.NewAlwaysAdmit()
_, s := framework.RunAMaster(masterConfig)
defer s.Close()
_, s, closeFn := framework.RunAMaster(masterConfig)
defer closeFn()

clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: s.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(api.GroupName).GroupVersion}})

Expand Down
44 changes: 22 additions & 22 deletions test/integration/auth/auth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,8 +403,8 @@ func getTestRequests(namespace string) []struct {
func TestAuthModeAlwaysAllow(t *testing.T) {
// Set up a master
masterConfig := framework.NewIntegrationTestMasterConfig()
_, s := framework.RunAMaster(masterConfig)
defer s.Close()
_, s, closeFn := framework.RunAMaster(masterConfig)
defer closeFn()

ns := framework.CreateTestingNamespace("auth-always-allow", s, t)
defer framework.DeleteTestingNamespace(ns, s, t)
Expand Down Expand Up @@ -501,8 +501,8 @@ func TestAuthModeAlwaysDeny(t *testing.T) {
// Set up a master
masterConfig := framework.NewIntegrationTestMasterConfig()
masterConfig.GenericConfig.Authorizer = authorizerfactory.NewAlwaysDenyAuthorizer()
_, s := framework.RunAMaster(masterConfig)
defer s.Close()
_, s, closeFn := framework.RunAMaster(masterConfig)
defer closeFn()

ns := framework.CreateTestingNamespace("auth-always-deny", s, t)
defer framework.DeleteTestingNamespace(ns, s, t)
Expand Down Expand Up @@ -552,8 +552,8 @@ func TestAliceNotForbiddenOrUnauthorized(t *testing.T) {
masterConfig.GenericConfig.Authenticator = getTestTokenAuth()
masterConfig.GenericConfig.Authorizer = allowAliceAuthorizer{}
masterConfig.GenericConfig.AdmissionControl = admit.NewAlwaysAdmit()
_, s := framework.RunAMaster(masterConfig)
defer s.Close()
_, s, closeFn := framework.RunAMaster(masterConfig)
defer closeFn()

ns := framework.CreateTestingNamespace("auth-alice-not-forbidden", s, t)
defer framework.DeleteTestingNamespace(ns, s, t)
Expand Down Expand Up @@ -621,8 +621,8 @@ func TestBobIsForbidden(t *testing.T) {
masterConfig := framework.NewIntegrationTestMasterConfig()
masterConfig.GenericConfig.Authenticator = getTestTokenAuth()
masterConfig.GenericConfig.Authorizer = allowAliceAuthorizer{}
_, s := framework.RunAMaster(masterConfig)
defer s.Close()
_, s, closeFn := framework.RunAMaster(masterConfig)
defer closeFn()

ns := framework.CreateTestingNamespace("auth-bob-forbidden", s, t)
defer framework.DeleteTestingNamespace(ns, s, t)
Expand Down Expand Up @@ -665,8 +665,8 @@ func TestUnknownUserIsUnauthorized(t *testing.T) {
masterConfig := framework.NewIntegrationTestMasterConfig()
masterConfig.GenericConfig.Authenticator = getTestTokenAuth()
masterConfig.GenericConfig.Authorizer = allowAliceAuthorizer{}
_, s := framework.RunAMaster(masterConfig)
defer s.Close()
_, s, closeFn := framework.RunAMaster(masterConfig)
defer closeFn()

ns := framework.CreateTestingNamespace("auth-unknown-unauthorized", s, t)
defer framework.DeleteTestingNamespace(ns, s, t)
Expand Down Expand Up @@ -727,8 +727,8 @@ func TestImpersonateIsForbidden(t *testing.T) {
masterConfig := framework.NewIntegrationTestMasterConfig()
masterConfig.GenericConfig.Authenticator = getTestTokenAuth()
masterConfig.GenericConfig.Authorizer = impersonateAuthorizer{}
_, s := framework.RunAMaster(masterConfig)
defer s.Close()
_, s, closeFn := framework.RunAMaster(masterConfig)
defer closeFn()

ns := framework.CreateTestingNamespace("auth-impersonate-forbidden", s, t)
defer framework.DeleteTestingNamespace(ns, s, t)
Expand Down Expand Up @@ -874,8 +874,8 @@ func TestAuthorizationAttributeDetermination(t *testing.T) {
masterConfig := framework.NewIntegrationTestMasterConfig()
masterConfig.GenericConfig.Authenticator = getTestTokenAuth()
masterConfig.GenericConfig.Authorizer = trackingAuthorizer
_, s := framework.RunAMaster(masterConfig)
defer s.Close()
_, s, closeFn := framework.RunAMaster(masterConfig)
defer closeFn()

ns := framework.CreateTestingNamespace("auth-attribute-determination", s, t)
defer framework.DeleteTestingNamespace(ns, s, t)
Expand Down Expand Up @@ -940,8 +940,8 @@ func TestNamespaceAuthorization(t *testing.T) {
masterConfig := framework.NewIntegrationTestMasterConfig()
masterConfig.GenericConfig.Authenticator = getTestTokenAuth()
masterConfig.GenericConfig.Authorizer = a
_, s := framework.RunAMaster(masterConfig)
defer s.Close()
_, s, closeFn := framework.RunAMaster(masterConfig)
defer closeFn()

ns := framework.CreateTestingNamespace("auth-namespace", s, t)
defer framework.DeleteTestingNamespace(ns, s, t)
Expand Down Expand Up @@ -1038,8 +1038,8 @@ func TestKindAuthorization(t *testing.T) {
masterConfig := framework.NewIntegrationTestMasterConfig()
masterConfig.GenericConfig.Authenticator = getTestTokenAuth()
masterConfig.GenericConfig.Authorizer = a
_, s := framework.RunAMaster(masterConfig)
defer s.Close()
_, s, closeFn := framework.RunAMaster(masterConfig)
defer closeFn()

ns := framework.CreateTestingNamespace("auth-kind", s, t)
defer framework.DeleteTestingNamespace(ns, s, t)
Expand Down Expand Up @@ -1122,8 +1122,8 @@ func TestReadOnlyAuthorization(t *testing.T) {
masterConfig := framework.NewIntegrationTestMasterConfig()
masterConfig.GenericConfig.Authenticator = getTestTokenAuth()
masterConfig.GenericConfig.Authorizer = a
_, s := framework.RunAMaster(masterConfig)
defer s.Close()
_, s, closeFn := framework.RunAMaster(masterConfig)
defer closeFn()

ns := framework.CreateTestingNamespace("auth-read-only", s, t)
defer framework.DeleteTestingNamespace(ns, s, t)
Expand Down Expand Up @@ -1181,8 +1181,8 @@ func TestWebhookTokenAuthenticator(t *testing.T) {
masterConfig := framework.NewIntegrationTestMasterConfig()
masterConfig.GenericConfig.Authenticator = authenticator
masterConfig.GenericConfig.Authorizer = allowAliceAuthorizer{}
_, s := framework.RunAMaster(masterConfig)
defer s.Close()
_, s, closeFn := framework.RunAMaster(masterConfig)
defer closeFn()

ns := framework.CreateTestingNamespace("auth-webhook-token", s, t)
defer framework.DeleteTestingNamespace(ns, s, t)
Expand Down