Skip to content

Commit

Permalink
apiextensions: wait for complete discovery endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
sttts committed Mar 17, 2020
1 parent 84dc704 commit 34f5737
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
s.GenericAPIServer.Handler.NonGoRestfulMux.Handle("/apis", crdHandler)
s.GenericAPIServer.Handler.NonGoRestfulMux.HandlePrefix("/apis/", crdHandler)

crdController := NewDiscoveryController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), versionDiscoveryHandler, groupDiscoveryHandler)
discoveryController := NewDiscoveryController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), versionDiscoveryHandler, groupDiscoveryHandler)
namingController := status.NewNamingConditionController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), crdClient.ApiextensionsV1())
nonStructuralSchemaController := nonstructuralschema.NewConditionController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), crdClient.ApiextensionsV1())
apiApprovalController := apiapproval.NewKubernetesAPIApprovalPolicyConformantConditionController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), crdClient.ApiextensionsV1())
Expand All @@ -231,12 +231,19 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
go openapiController.Run(s.GenericAPIServer.StaticOpenAPISpec, s.GenericAPIServer.OpenAPIVersionedService, context.StopCh)
}

go crdController.Run(context.StopCh)
go namingController.Run(context.StopCh)
go establishingController.Run(context.StopCh)
go nonStructuralSchemaController.Run(5, context.StopCh)
go apiApprovalController.Run(5, context.StopCh)
go finalizingController.Run(5, context.StopCh)

discoverySyncedCh := make(chan struct{})
go discoveryController.Run(context.StopCh, discoverySyncedCh)
select {
case <-context.StopCh:
case <-discoverySyncedCh:
}

return nil
})
// we don't want to report healthy until we can handle all CRDs that have already been registered. Waiting for the informer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func sortGroupDiscoveryByKubeAwareVersion(gd []metav1.GroupVersionForDiscovery)
})
}

func (c *DiscoveryController) Run(stopCh <-chan struct{}) {
func (c *DiscoveryController) Run(stopCh <-chan struct{}, synchedCh chan<- struct{}) {
defer utilruntime.HandleCrash()
defer c.queue.ShutDown()
defer klog.Infof("Shutting down DiscoveryController")
Expand All @@ -213,6 +213,31 @@ func (c *DiscoveryController) Run(stopCh <-chan struct{}) {
return
}

// initially sync all group versions to make sure we serve complete discovery
if err := wait.PollImmediateUntil(time.Second, func() (bool, error) {
crds, err := c.crdLister.List(labels.Everything())
if err != nil {
utilruntime.HandleError(fmt.Errorf("failed to initially list CRDs: %v", err))
return false, nil
}
for _, crd := range crds {
for _, v := range crd.Spec.Versions {
gv := schema.GroupVersion{crd.Spec.Group, v.Name}
if err := c.sync(gv); err != nil {
utilruntime.HandleError(fmt.Errorf("failed to initially sync CRD version %v: %v", gv, err))
return false, nil
}
}
}
return true, nil
}, stopCh); err == wait.ErrWaitTimeout {
utilruntime.HandleError(fmt.Errorf("timed out waiting for discovery endpoint to initialize"))
return
} else if err != nil {
panic(fmt.Errorf("unexpected error: %v", err))
}
close(synchedCh)

// only start one worker thread since its a slow moving API
go wait.Until(c.runWorker, time.Second, stopCh)

Expand Down

0 comments on commit 34f5737

Please sign in to comment.