Always check connectivity to overlay network
When kube-apiserver can no longer connect to the overlay network
where the OAuth pods reside, it remains active but cannot handle
requests properly. In that case kube-apiserver should be marked
as not ready by one of the readyz checks. The current checks
verify connectivity only once, after kube-apiserver starts.
Modify the checks to constantly monitor the connection to the
overlay network.
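
In short, the fix replaces a one-shot startup gate with a continuously refreshed readiness flag. Below is a condensed, self-contained sketch of that pattern, assuming a hypothetical probeOverlayEndpoint stand-in and a placeholder address; it mirrors the mechanism in the diff that follows, not the commit's exact code.

package main

import (
	"errors"
	"fmt"
	"net"
	"net/http"
	"sync/atomic"
	"time"
)

// readiness holds "" when the last probe succeeded, else an error message.
var readiness atomic.Value

// check is a readyz-style handler: it reports the most recent probe result
// rather than a one-shot "done" signal, so readiness can flip back to failing.
func check(req *http.Request) error {
	if msg := readiness.Load().(string); len(msg) > 0 {
		return errors.New(msg)
	}
	return nil
}

// monitor probes immediately, then re-probes on a fixed period, updating the
// shared flag each time, until stopCh is closed.
func monitor(stopCh <-chan struct{}) {
	for {
		if err := probeOverlayEndpoint(); err != nil {
			readiness.Store(fmt.Sprintf("overlay endpoint not reachable: %v", err))
		} else {
			readiness.Store("")
		}
		select {
		case <-stopCh:
			return
		case <-time.After(10 * time.Second):
		}
	}
}

// probeOverlayEndpoint is a stand-in for dialing an aggregated API endpoint;
// any successful contact counts as reachable. The address is a placeholder.
func probeOverlayEndpoint() error {
	conn, err := net.DialTimeout("tcp", "10.0.0.1:8443", time.Second)
	if err == nil {
		conn.Close()
	}
	return err
}

func main() {
	readiness.Store("waiting for endpoint verification") // fail closed until the first probe
	stopCh := make(chan struct{})
	go monitor(stopCh)

	time.Sleep(25 * time.Second) // let a couple of probes run
	fmt.Println("readyz:", check(nil))
	close(stopCh)
}

Storing a message (rather than closing a channel, as the old code did) is what lets the check fail again after it has once succeeded.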
michalskalski committed Feb 7, 2022
1 parent 42a86de commit 3965bae
Showing 2 changed files with 64 additions and 84 deletions.
5 changes: 3 additions & 2 deletions openshift-kube-apiserver/openshiftkubeapiserver/patch.go
@@ -100,8 +100,9 @@ func OpenShiftKubeAPIServerConfigPatch(genericConfig *genericapiserver.Config, k
 	}
 	// END HANDLER CHAIN
 
-	openshiftAPIServiceReachabilityCheck := newOpenshiftAPIServiceReachabilityCheck()
-	oauthAPIServiceReachabilityCheck := newOAuthPIServiceReachabilityCheck()
+	apiServerIP := os.Getenv("HOST_IP")
+	openshiftAPIServiceReachabilityCheck := newOpenshiftAPIServiceReachabilityCheck(apiServerIP)
+	oauthAPIServiceReachabilityCheck := newOAuthPIServiceReachabilityCheck(apiServerIP)
 	genericConfig.ReadyzChecks = append(genericConfig.ReadyzChecks, openshiftAPIServiceReachabilityCheck, oauthAPIServiceReachabilityCheck)
 
 	genericConfig.AddPostStartHookOrDie("openshift.io-startkubeinformers", func(context genericapiserver.PostStartHookContext) error {
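
The new apiServerIP comes from the HOST_IP environment variable, which this patch assumes is already present in the kube-apiserver pod's environment; how it gets injected is outside the diff. A minimal sketch of the lookup, with a hypothetical ParseIP guard added purely for illustration:

package main

import (
	"log"
	"net"
	"os"
)

func main() {
	// The commit reads HOST_IP verbatim; the validity check below is an
	// illustrative addition, not part of the actual patch.
	apiServerIP := os.Getenv("HOST_IP")
	if net.ParseIP(apiServerIP) == nil {
		log.Printf("HOST_IP unset or invalid (%q); the bootstrap-detection comparison would never match", apiServerIP)
	}
	log.Printf("using apiServerIP=%q for readyz bootstrap detection", apiServerIP)
}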
143 changes: 61 additions & 82 deletions openshift-kube-apiserver/openshiftkubeapiserver/sdn_readyz_wait.go
@@ -7,6 +7,7 @@ import (
 	"net"
 	"net/http"
 	"net/http/httputil"
+	"sync/atomic"
 	"time"
 
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -18,73 +19,55 @@ import (
 	"k8s.io/klog/v2"
 )
 
-func newOpenshiftAPIServiceReachabilityCheck() *aggregatedAPIServiceAvailabilityCheck {
-	return newAggregatedAPIServiceReachabilityCheck("openshift-apiserver", "api")
+func newOpenshiftAPIServiceReachabilityCheck(apiSeverIP string) *aggregatedAPIServiceAvailabilityCheck {
+	return newAggregatedAPIServiceReachabilityCheck("openshift-apiserver", "api", apiSeverIP)
 }
 
-func newOAuthPIServiceReachabilityCheck() *aggregatedAPIServiceAvailabilityCheck {
-	return newAggregatedAPIServiceReachabilityCheck("openshift-oauth-apiserver", "api")
+func newOAuthPIServiceReachabilityCheck(apiServerIP string) *aggregatedAPIServiceAvailabilityCheck {
+	return newAggregatedAPIServiceReachabilityCheck("openshift-oauth-apiserver", "api", apiServerIP)
 }
 
-// if the API service is not found, then this check returns quickly.
-// if the endpoint is not accessible within 60 seconds, we report ready no matter what
-// otherwise, wait for up to 60 seconds to be able to reach the apiserver
-func newAggregatedAPIServiceReachabilityCheck(namespace, service string) *aggregatedAPIServiceAvailabilityCheck {
-	return &aggregatedAPIServiceAvailabilityCheck{
-		done:        make(chan struct{}),
+func newAggregatedAPIServiceReachabilityCheck(namespace, service, apiServerIP string) *aggregatedAPIServiceAvailabilityCheck {
+	check := &aggregatedAPIServiceAvailabilityCheck{
 		namespace:   namespace,
 		serviceName: service,
+		ip:          apiServerIP,
 	}
+	check.readyzErrorMessage.Store("waiting for endpoint verification")
+	return check
 }
 
 type aggregatedAPIServiceAvailabilityCheck struct {
-	// done indicates that this check is complete (success or failure) and the check should return true
-	done chan struct{}
-
+	// readyzErrorMessage is not empty string when api endpoint can't be reach
+	readyzErrorMessage atomic.Value
 	// namespace is the namespace hosting the service for the aggregated api
 	namespace string
 	// serviceName is used to get a list of endpoints to directly dial
 	serviceName string
+	// IP address of apiserver where check is run
+	ip string
 }
 
 func (c *aggregatedAPIServiceAvailabilityCheck) Name() string {
 	return fmt.Sprintf("%s-%s-available", c.serviceName, c.namespace)
 }
 
 func (c *aggregatedAPIServiceAvailabilityCheck) Check(req *http.Request) error {
-	select {
-	case <-c.done:
-		return nil
-	default:
-		return fmt.Errorf("check is not yet complete")
+	if errMsg := c.readyzErrorMessage.Load().(string); len(errMsg) > 0 {
+		return fmt.Errorf(errMsg)
 	}
+	return nil
 }
 
 func (c *aggregatedAPIServiceAvailabilityCheck) checkForConnection(context genericapiserver.PostStartHookContext) {
 	defer utilruntime.HandleCrash()
 
-	reachedAggregatedAPIServer := make(chan struct{})
-	noAggregatedAPIServer := make(chan struct{})
-	waitUntilCh := make(chan struct{})
-	defer func() {
-		close(waitUntilCh) // this stops the endpoint check
-		close(c.done)      // once this method is done, the ready check should return true
-	}()
-	start := time.Now()
-
 	kubeClient, err := kubernetes.NewForConfig(context.LoopbackClientConfig)
 	if err != nil {
 		// shouldn't happen. this means the loopback config didn't work.
 		panic(err)
 	}
 
-	// Start a thread which repeatedly tries to connect to any aggregated apiserver endpoint.
-	// 1. if the aggregated apiserver endpoint doesn't exist, logs a warning and reports ready
-	// 2. if a connection cannot be made, after 60 seconds logs an error and reports ready -- this avoids a rebootstrapping cycle
-	// 3. as soon as a connection can be made, logs a time to be ready and reports ready.
-	go func() {
-		defer utilruntime.HandleCrash()
-
+	go wait.Until(func() {
 		client := http.Client{
 			Transport: &http.Transport{
 				// since any http return code satisfies us, we don't bother to send credentials.
@@ -93,54 +76,50 @@ func (c *aggregatedAPIServiceAvailabilityCheck) checkForConnection(context gener
 			},
 			Timeout: 1 * time.Second, // these should all be very fast. if none work, we continue anyway.
 		}
-
-		wait.PollImmediateUntil(1*time.Second, func() (bool, error) {
-			ctx := gocontext.TODO()
-			openshiftEndpoints, err := kubeClient.CoreV1().Endpoints(c.namespace).Get(ctx, c.serviceName, metav1.GetOptions{})
-			if apierrors.IsNotFound(err) {
-				// if we have no aggregated apiserver endpoint, we have no reason to wait
-				klog.Warningf("%s.%s.svc endpoints were not found", c.serviceName, c.namespace)
-				close(noAggregatedAPIServer)
-				return true, nil
-			}
-			if err != nil {
-				utilruntime.HandleError(err)
-				return false, nil
+		ctx := gocontext.TODO()
+		kubeApiEndpoints, err := kubeClient.CoreV1().Endpoints("openshift-kube-apiserver").Get(ctx, "apiserver", metav1.GetOptions{})
+		if apierrors.IsNotFound(err) {
+			// bootstrap case
+			klog.Warningf("apiserver.openshift-kube-apiserver.svc endpoints were not found")
+			c.readyzErrorMessage.Store("")
+			return
+		}
+		otherEndpointExist := false
+		for _, subset := range kubeApiEndpoints.Subsets {
+			for _, address := range subset.Addresses {
+				if address.IP != c.ip {
+					otherEndpointExist = true
+					break
+				}
 			}
-			for _, subset := range openshiftEndpoints.Subsets {
-				for _, address := range subset.Addresses {
-					url := fmt.Sprintf("https://%v", net.JoinHostPort(address.IP, "8443"))
-					resp, err := client.Get(url)
-					if err == nil { // any http response is fine. it means that we made contact
-						response, dumpErr := httputil.DumpResponse(resp, true)
-						klog.V(4).Infof("reached to connect to %q: %v\n%v", url, dumpErr, string(response))
-						close(reachedAggregatedAPIServer)
-						resp.Body.Close()
-						return true, nil
-					}
-					klog.V(2).Infof("failed to connect to %q: %v", url, err)
-				}
+		}
+		if !otherEndpointExist {
+			// bootstrap case
+			klog.V(2).Infof("Only %s registered as apiserver endpoint, set ready for this check", c.ip)
+			c.readyzErrorMessage.Store("")
+			return
+		}
+		openshiftEndpoints, err := kubeClient.CoreV1().Endpoints(c.namespace).Get(ctx, c.serviceName, metav1.GetOptions{})
+		if apierrors.IsNotFound(err) {
+			// if we have no aggregated apiserver endpoint, we have no reason to wait
+			klog.Warningf("%s.%s.svc endpoints were not found", c.serviceName, c.namespace)
+			c.readyzErrorMessage.Store("")
+			return
+		}
+		for _, subset := range openshiftEndpoints.Subsets {
+			for _, address := range subset.Addresses {
+				url := fmt.Sprintf("https://%v", net.JoinHostPort(address.IP, "8443"))
+				resp, err := client.Get(url)
+				if err == nil { // any http response is fine. it means that we made contact
+					response, dumpErr := httputil.DumpResponse(resp, true)
+					klog.V(4).Infof("reached to connect to %q: %v\n%v", url, dumpErr, string(response))
+					c.readyzErrorMessage.Store("")
+					resp.Body.Close()
+					return
+				}
+				klog.V(2).Infof("failed to connect to %q: %v", url, err)
 			}
-
-			return false, nil
-		}, waitUntilCh)
-	}()
-
-	select {
-	case <-time.After(60 * time.Second):
-		// if we timeout, always return ok so that we can start from a case where all kube-apiservers are down and the SDN isn't coming up
-		utilruntime.HandleError(fmt.Errorf("%s never reached apiserver", c.Name()))
-		return
-	case <-context.StopCh:
-		utilruntime.HandleError(fmt.Errorf("%s interrupted", c.Name()))
-		return
-	case <-noAggregatedAPIServer:
-		utilruntime.HandleError(fmt.Errorf("%s did not find an %s endpoint", c.Name(), c.namespace))
-		return
-
-	case <-reachedAggregatedAPIServer:
-		end := time.Now()
-		klog.Infof("reached %s via SDN after %v milliseconds", c.namespace, end.Sub(start).Milliseconds())
-		return
-	}
+		}
+		c.readyzErrorMessage.Store(fmt.Sprintf("%s not reachable", c.Name()))
+	}, 10*time.Second, context.StopCh)
 }
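
For reference, wait.Until from k8s.io/apimachinery runs the given func immediately and then again each period after it completes, until the stop channel closes; that is what turns the old one-shot probe into a continuous monitor. A standalone sketch with a stand-in probe body:

package main

import (
	"log"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	stopCh := make(chan struct{})

	// wait.Until invokes the func right away, then every 10s after each run
	// completes, until stopCh is closed -- the cadence used by the new check.
	go wait.Until(func() {
		log.Println("probing aggregated API endpoints...") // stand-in for the real probe
	}, 10*time.Second, stopCh)

	time.Sleep(35 * time.Second) // observe a few iterations
	close(stopCh)
	time.Sleep(time.Second) // give the loop a moment to exit
}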
