Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[1.16] Cherry pick of #92644: Wait for all informers to sync in /readyz. #92695

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
21 changes: 14 additions & 7 deletions staging/src/k8s.io/apiserver/pkg/server/config.go
Expand Up @@ -559,13 +559,20 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G
}

genericApiServerHookName := "generic-apiserver-start-informers"
if c.SharedInformerFactory != nil && !s.isPostStartHookRegistered(genericApiServerHookName) {
err := s.AddPostStartHook(genericApiServerHookName, func(context PostStartHookContext) error {
c.SharedInformerFactory.Start(context.StopCh)
return nil
})
if err != nil {
return nil, err
if c.SharedInformerFactory != nil {
if !s.isPostStartHookRegistered(genericApiServerHookName) {
err := s.AddPostStartHook(genericApiServerHookName, func(context PostStartHookContext) error {
c.SharedInformerFactory.Start(context.StopCh)
return nil
})
if err != nil {
return nil, err
}
// TODO: Once we get rid of /healthz consider changing this to post-start-hook.
err = s.addReadyzChecks(healthz.NewInformerSyncHealthz(c.SharedInformerFactory))
if err != nil {
return nil, err
}
}
}

Expand Down
5 changes: 3 additions & 2 deletions staging/src/k8s.io/apiserver/pkg/server/config_test.go
Expand Up @@ -158,6 +158,7 @@ func TestNewWithDelegate(t *testing.T) {
"/metrics",
"/readyz",
"/readyz/delegate-health",
"/readyz/informer-sync",
"/readyz/log",
"/readyz/ping",
"/readyz/poststarthook/delegate-post-start-hook",
Expand Down Expand Up @@ -233,10 +234,10 @@ func checkExpectedPathsAtRoot(url string, expectedPaths []string, t *testing.T)
pathset.Insert(p.(string))
}
expectedset := sets.NewString(expectedPaths...)
for _, p := range pathset.Difference(expectedset) {
for p := range pathset.Difference(expectedset) {
t.Errorf("Got %v path, which we did not expect", p)
}
for _, p := range expectedset.Difference(pathset) {
for p := range expectedset.Difference(pathset) {
t.Errorf(" Expected %v path which we did not get", p)
}
})
Expand Down
1 change: 1 addition & 0 deletions staging/src/k8s.io/apiserver/pkg/server/healthz/BUILD
Expand Up @@ -27,6 +27,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/server/httplog:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
)
Expand Down
34 changes: 34 additions & 0 deletions staging/src/k8s.io/apiserver/pkg/server/healthz/healthz.go
Expand Up @@ -28,6 +28,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/server/httplog"
"k8s.io/client-go/informers"
"k8s.io/klog"
)

Expand Down Expand Up @@ -80,6 +81,39 @@ func (l *log) Check(_ *http.Request) error {
return fmt.Errorf("logging blocked")
}

type informerSync struct {
sharedInformerFactory informers.SharedInformerFactory
}

var _ HealthChecker = &informerSync{}

// NewInformerSyncHealthz returns a new HealthChecker that will pass only if all informers in the given sharedInformerFactory sync.
func NewInformerSyncHealthz(sharedInformerFactory informers.SharedInformerFactory) HealthChecker {
return &informerSync{
sharedInformerFactory: sharedInformerFactory,
}
}

func (i *informerSync) Name() string {
return "informer-sync"
}

func (i *informerSync) Check(_ *http.Request) error {
stopCh := make(chan struct{})
// Close stopCh to force checking if informers are synced now.
close(stopCh)

var informersByStarted map[bool][]string
for informerType, started := range i.sharedInformerFactory.WaitForCacheSync(stopCh) {
informersByStarted[started] = append(informersByStarted[started], informerType.String())
}

if notStarted := informersByStarted[false]; len(notStarted) > 0 {
return fmt.Errorf("%d informers not started yet: %v", len(notStarted), notStarted)
}
return nil
}

// NamedCheck returns a healthz checker for the given name and function.
func NamedCheck(name string, check func(r *http.Request) error) HealthChecker {
return &healthzCheck{name, check}
Expand Down
18 changes: 11 additions & 7 deletions test/integration/master/kube_apiserver_test.go
Expand Up @@ -96,11 +96,15 @@ func TestRun(t *testing.T) {
}
}

func endpointReturnsStatusOK(client *kubernetes.Clientset, path string) bool {
res := client.CoreV1().RESTClient().Get().AbsPath(path).Do()
func endpointReturnsStatusOK(client *kubernetes.Clientset, path string) (bool, error) {
res := client.CoreV1().RESTClient().Get().RequestURI(path).Do()
var status int
res.StatusCode(&status)
return status == http.StatusOK
_, err := res.Raw()
if err != nil {
return false, err
}
return status == http.StatusOK, nil
}

func TestLivezAndReadyz(t *testing.T) {
Expand All @@ -111,11 +115,11 @@ func TestLivezAndReadyz(t *testing.T) {
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if !endpointReturnsStatusOK(client, "/livez") {
t.Fatalf("livez should be healthy")
if statusOK, err := endpointReturnsStatusOK(client, "/livez"); err != nil || !statusOK {
t.Fatalf("livez should be healthy, got %v and error %v", statusOK, err)
}
if !endpointReturnsStatusOK(client, "/readyz") {
t.Fatalf("readyz should be healthy")
if statusOK, err := endpointReturnsStatusOK(client, "/readyz"); err != nil || !statusOK {
t.Fatalf("readyz should be healthy, got %v and error %v", statusOK, err)
}
}

Expand Down