Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

apiextensions: wait for complete discovery endpoint #89145

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -207,7 +207,7 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
s.GenericAPIServer.Handler.NonGoRestfulMux.Handle("/apis", crdHandler)
s.GenericAPIServer.Handler.NonGoRestfulMux.HandlePrefix("/apis/", crdHandler)

crdController := NewDiscoveryController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), versionDiscoveryHandler, groupDiscoveryHandler)
discoveryController := NewDiscoveryController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), versionDiscoveryHandler, groupDiscoveryHandler)
namingController := status.NewNamingConditionController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), crdClient.ApiextensionsV1())
nonStructuralSchemaController := nonstructuralschema.NewConditionController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), crdClient.ApiextensionsV1())
apiApprovalController := apiapproval.NewKubernetesAPIApprovalPolicyConformantConditionController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), crdClient.ApiextensionsV1())
Expand All @@ -231,12 +231,19 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
go openapiController.Run(s.GenericAPIServer.StaticOpenAPISpec, s.GenericAPIServer.OpenAPIVersionedService, context.StopCh)
}

go crdController.Run(context.StopCh)
go namingController.Run(context.StopCh)
go establishingController.Run(context.StopCh)
go nonStructuralSchemaController.Run(5, context.StopCh)
go apiApprovalController.Run(5, context.StopCh)
go finalizingController.Run(5, context.StopCh)

discoverySyncedCh := make(chan struct{})
go discoveryController.Run(context.StopCh, discoverySyncedCh)
select {
case <-context.StopCh:
case <-discoverySyncedCh:
}

return nil
})
// we don't want to report healthy until we can handle all CRDs that have already been registered. Waiting for the informer
Expand Down
Expand Up @@ -201,7 +201,7 @@ func sortGroupDiscoveryByKubeAwareVersion(gd []metav1.GroupVersionForDiscovery)
})
}

func (c *DiscoveryController) Run(stopCh <-chan struct{}) {
func (c *DiscoveryController) Run(stopCh <-chan struct{}, synchedCh chan<- struct{}) {
defer utilruntime.HandleCrash()
defer c.queue.ShutDown()
defer klog.Infof("Shutting down DiscoveryController")
Expand All @@ -213,6 +213,31 @@ func (c *DiscoveryController) Run(stopCh <-chan struct{}) {
return
}

// initially sync all group versions to make sure we serve complete discovery
if err := wait.PollImmediateUntil(time.Second, func() (bool, error) {
crds, err := c.crdLister.List(labels.Everything())
if err != nil {
utilruntime.HandleError(fmt.Errorf("failed to initially list CRDs: %v", err))
return false, nil
}
for _, crd := range crds {
for _, v := range crd.Spec.Versions {
gv := schema.GroupVersion{crd.Spec.Group, v.Name}
if err := c.sync(gv); err != nil {
utilruntime.HandleError(fmt.Errorf("failed to initially sync CRD version %v: %v", gv, err))
return false, nil
}
}
}
return true, nil
}, stopCh); err == wait.ErrWaitTimeout {
utilruntime.HandleError(fmt.Errorf("timed out waiting for discovery endpoint to initialize"))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

panic on these please. Based on the code it should never happen, but I don't want to ever silently not run this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, timeout happens only when stopCh closed, so we can simply error

return
} else if err != nil {
panic(fmt.Errorf("unexpected error: %v", err))
}
close(synchedCh)

// only start one worker thread since its a slow moving API
go wait.Until(c.runWorker, time.Second, stopCh)

Expand Down