Skip to content

Commit

Permalink
UPSTREAM: <carry>: when only this kube-apiserver can fulfill the kube…
Browse files Browse the repository at this point in the history
…rnetes.default.svc, don't wait for aggregated availability
  • Loading branch information
deads2k authored and sairameshv committed Dec 4, 2023
1 parent c040459 commit aaa8bd5
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 10 deletions.
4 changes: 2 additions & 2 deletions openshift-kube-apiserver/openshiftkubeapiserver/patch.go
Expand Up @@ -108,8 +108,8 @@ func OpenShiftKubeAPIServerConfigPatch(genericConfig *genericapiserver.Config, k
}
// END HANDLER CHAIN

openshiftAPIServiceReachabilityCheck := newOpenshiftAPIServiceReachabilityCheck()
oauthAPIServiceReachabilityCheck := newOAuthPIServiceReachabilityCheck()
openshiftAPIServiceReachabilityCheck := newOpenshiftAPIServiceReachabilityCheck(genericConfig.PublicAddress)
oauthAPIServiceReachabilityCheck := newOAuthPIServiceReachabilityCheck(genericConfig.PublicAddress)
genericConfig.ReadyzChecks = append(genericConfig.ReadyzChecks, openshiftAPIServiceReachabilityCheck, oauthAPIServiceReachabilityCheck)

genericConfig.AddPostStartHookOrDie("openshift.io-startkubeinformers", func(context genericapiserver.PostStartHookContext) error {
Expand Down
47 changes: 39 additions & 8 deletions openshift-kube-apiserver/openshiftkubeapiserver/sdn_readyz_wait.go
Expand Up @@ -18,29 +18,34 @@ import (
"k8s.io/klog/v2"
)

func newOpenshiftAPIServiceReachabilityCheck() *aggregatedAPIServiceAvailabilityCheck {
return newAggregatedAPIServiceReachabilityCheck("openshift-apiserver", "api")
func newOpenshiftAPIServiceReachabilityCheck(ipForKubernetesDefaultService net.IP) *aggregatedAPIServiceAvailabilityCheck {
return newAggregatedAPIServiceReachabilityCheck(ipForKubernetesDefaultService, "openshift-apiserver", "api")
}

func newOAuthPIServiceReachabilityCheck() *aggregatedAPIServiceAvailabilityCheck {
return newAggregatedAPIServiceReachabilityCheck("openshift-oauth-apiserver", "api")
func newOAuthPIServiceReachabilityCheck(ipForKubernetesDefaultService net.IP) *aggregatedAPIServiceAvailabilityCheck {
return newAggregatedAPIServiceReachabilityCheck(ipForKubernetesDefaultService, "openshift-oauth-apiserver", "api")
}

// if the API service is not found, then this check returns quickly.
// if the endpoint is not accessible within 60 seconds, we report ready no matter what
// otherwise, wait for up to 60 seconds to be able to reach the apiserver
func newAggregatedAPIServiceReachabilityCheck(namespace, service string) *aggregatedAPIServiceAvailabilityCheck {
func newAggregatedAPIServiceReachabilityCheck(ipForKubernetesDefaultService net.IP, namespace, service string) *aggregatedAPIServiceAvailabilityCheck {
return &aggregatedAPIServiceAvailabilityCheck{
done: make(chan struct{}),
namespace: namespace,
serviceName: service,
done: make(chan struct{}),
ipForKubernetesDefaultService: ipForKubernetesDefaultService,
namespace: namespace,
serviceName: service,
}
}

type aggregatedAPIServiceAvailabilityCheck struct {
// done indicates that this check is complete (success or failure) and the check should return true
done chan struct{}

// ipForKubernetesDefaultService is used to determine whether this endpoint is the only one for the kubernetes.default.svc
// if so, it will report reachable immediately because honoring some requests is better than honoring no requests.
ipForKubernetesDefaultService net.IP

// namespace is the namespace hosting the service for the aggregated api
namespace string
// serviceName is used to get a list of endpoints to directly dial
Expand Down Expand Up @@ -78,6 +83,32 @@ func (c *aggregatedAPIServiceAvailabilityCheck) checkForConnection(context gener
panic(err)
}

ctx, cancel := gocontext.WithTimeout(gocontext.TODO(), 30*time.Second)
defer cancel()

// if the kubernetes.default.svc needs an endpoint and this is the only apiserver than can fulfill it, then we don't
// wait for reachability. We wait for other conditions, but unreachable apiservers correctly 503 for clients.
kubeEndpoints, err := kubeClient.CoreV1().Endpoints("default").Get(ctx, "kubernetes", metav1.GetOptions{})
switch {
case apierrors.IsNotFound(err):
utilruntime.HandleError(fmt.Errorf("%s did not find a kubernetes.default.svc endpoint", c.Name()))
return
case err != nil:
utilruntime.HandleError(fmt.Errorf("%s unable to read a kubernetes.default.svc endpoint: %w", c.Name(), err))
return
case len(kubeEndpoints.Subsets) == 0:
utilruntime.HandleError(fmt.Errorf("%s did not find any IPs for kubernetes.default.svc endpoint", c.Name()))
return
case len(kubeEndpoints.Subsets[0].Addresses) == 0:
utilruntime.HandleError(fmt.Errorf("%s did not find any IPs for kubernetes.default.svc endpoint", c.Name()))
return
case len(kubeEndpoints.Subsets[0].Addresses) == 1:
if kubeEndpoints.Subsets[0].Addresses[0].IP == c.ipForKubernetesDefaultService.String() {
utilruntime.HandleError(fmt.Errorf("%s only found this kube-apiserver's IP (%v) in kubernetes.default.svc endpoint", c.Name(), c.ipForKubernetesDefaultService))
return
}
}

// Start a thread which repeatedly tries to connect to any aggregated apiserver endpoint.
// 1. if the aggregated apiserver endpoint doesn't exist, logs a warning and reports ready
// 2. if a connection cannot be made, after 60 seconds logs an error and reports ready -- this avoids a rebootstrapping cycle
Expand Down

0 comments on commit aaa8bd5

Please sign in to comment.