Skip to content

Commit

Permalink
Merge pull request #86139 from jasimmons/pr_normalize_probes_lifecycl…
Browse files Browse the repository at this point in the history
…e_handlers

Normalize HTTP  lifecycle handlers with HTTP probers
  • Loading branch information
k8s-ci-robot committed Oct 20, 2022
2 parents b21efbb + dfaaa14 commit ad26b31
Show file tree
Hide file tree
Showing 22 changed files with 1,148 additions and 249 deletions.
8 changes: 8 additions & 0 deletions pkg/features/kube_features.go
Expand Up @@ -192,6 +192,12 @@ const (
// Enables container Checkpoint support in the kubelet
ContainerCheckpoint featuregate.Feature = "ContainerCheckpoint"

// owner: @bhcleek @wzshiming
// GA: v1.25
//
// Normalize HttpGet URL and Header passing for lifecycle handlers with probers.
ConsistentHTTPGetHandlers featuregate.Feature = "ConsistentHTTPGetHandlers"

// owner: @jiahuif
// alpha: v1.21
// beta: v1.22
Expand Down Expand Up @@ -842,6 +848,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS

ContainerCheckpoint: {Default: false, PreRelease: featuregate.Alpha},

ConsistentHTTPGetHandlers: {Default: true, PreRelease: featuregate.GA},

ControllerManagerLeaderMigration: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.26

CronJobTimeZone: {Default: true, PreRelease: featuregate.Beta},
Expand Down
18 changes: 16 additions & 2 deletions pkg/kubelet/kubelet.go
Expand Up @@ -110,6 +110,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/util/queue"
"k8s.io/kubernetes/pkg/kubelet/util/sliceutils"
"k8s.io/kubernetes/pkg/kubelet/volumemanager"
httpprobe "k8s.io/kubernetes/pkg/probe/http"
"k8s.io/kubernetes/pkg/security/apparmor"
"k8s.io/kubernetes/pkg/util/oom"
"k8s.io/kubernetes/pkg/volume"
Expand Down Expand Up @@ -475,7 +476,20 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
clusterDNS = append(clusterDNS, ip)
}
}
httpClient := &http.Client{}

// A TLS transport is needed to make HTTPS-based container lifecycle requests,
// but we do not have the information necessary to do TLS verification.
//
// This client must not be modified to include credentials, because it is
// critical that credentials not leak from the client to arbitrary hosts.
insecureContainerLifecycleHTTPClient := &http.Client{}
if utilfeature.DefaultFeatureGate.Enabled(features.ConsistentHTTPGetHandlers) {
insecureTLSTransport := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}
insecureContainerLifecycleHTTPClient.Transport = insecureTLSTransport
insecureContainerLifecycleHTTPClient.CheckRedirect = httpprobe.RedirectChecker(false)
}

klet := &Kubelet{
hostname: hostname,
Expand Down Expand Up @@ -625,7 +639,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
klet.podWorkers,
kubeDeps.OSInterface,
klet,
httpClient,
insecureContainerLifecycleHTTPClient,
imageBackOff,
kubeCfg.SerializeImagePulls,
float32(kubeCfg.RegistryPullQPS),
Expand Down
6 changes: 3 additions & 3 deletions pkg/kubelet/kuberuntime/fake_kuberuntime_manager.go
Expand Up @@ -46,12 +46,12 @@ const (
)

type fakeHTTP struct {
url string
req *http.Request
err error
}

func (f *fakeHTTP) Get(url string) (*http.Response, error) {
f.url = url
func (f *fakeHTTP) Do(req *http.Request) (*http.Response, error) {
f.req = req
return nil, f.err
}

Expand Down
7 changes: 7 additions & 0 deletions pkg/kubelet/kuberuntime/helpers_test.go
Expand Up @@ -23,11 +23,18 @@ import (

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
runtimetesting "k8s.io/cri-api/pkg/apis/testing"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
)

type podStatusProviderFunc func(uid types.UID, name, namespace string) (*kubecontainer.PodStatus, error)

func (f podStatusProviderFunc) GetPodStatus(uid types.UID, name, namespace string) (*kubecontainer.PodStatus, error) {
return f(uid, name, namespace)
}

func TestIsInitContainerFailed(t *testing.T) {
tests := []struct {
status *kubecontainer.Status
Expand Down
8 changes: 5 additions & 3 deletions pkg/kubelet/kuberuntime/kuberuntime_container.go
Expand Up @@ -281,7 +281,8 @@ func (m *kubeGenericRuntimeManager) startContainer(podSandboxID string, podSandb
if handlerErr != nil {
klog.ErrorS(handlerErr, "Failed to execute PostStartHook", "pod", klog.KObj(pod),
"podUID", pod.UID, "containerName", container.Name, "containerID", kubeContainerID.String())
m.recordContainerEvent(pod, container, kubeContainerID.ID, v1.EventTypeWarning, events.FailedPostStartHook, msg)
// do not record the message in the event so that secrets won't leak from the server.
m.recordContainerEvent(pod, container, kubeContainerID.ID, v1.EventTypeWarning, events.FailedPostStartHook, "PostStartHook failed")
if err := m.killContainer(pod, kubeContainerID, container.Name, "FailedPostStartHook", reasonFailedPostStartHook, nil); err != nil {
klog.ErrorS(err, "Failed to kill container", "pod", klog.KObj(pod),
"podUID", pod.UID, "containerName", container.Name, "containerID", kubeContainerID.String())
Expand Down Expand Up @@ -586,10 +587,11 @@ func (m *kubeGenericRuntimeManager) executePreStopHook(pod *v1.Pod, containerID
go func() {
defer close(done)
defer utilruntime.HandleCrash()
if msg, err := m.runner.Run(containerID, pod, containerSpec, containerSpec.Lifecycle.PreStop); err != nil {
if _, err := m.runner.Run(containerID, pod, containerSpec, containerSpec.Lifecycle.PreStop); err != nil {
klog.ErrorS(err, "PreStop hook failed", "pod", klog.KObj(pod), "podUID", pod.UID,
"containerName", containerSpec.Name, "containerID", containerID.String())
m.recordContainerEvent(pod, containerSpec, containerID.ID, v1.EventTypeWarning, events.FailedPreStopHook, msg)
// do not record the message in the event so that secrets won't leak from the server.
m.recordContainerEvent(pod, containerSpec, containerID.ID, v1.EventTypeWarning, events.FailedPreStopHook, "PreStopHook failed")
}
}()

Expand Down
49 changes: 39 additions & 10 deletions pkg/kubelet/kuberuntime/kuberuntime_container_test.go
Expand Up @@ -28,9 +28,14 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"

v1 "k8s.io/api/core/v1"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
"k8s.io/kubernetes/pkg/features"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
containertest "k8s.io/kubernetes/pkg/kubelet/container/testing"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
Expand Down Expand Up @@ -276,11 +281,21 @@ func TestLifeCycleHook(t *testing.T) {

fakeRunner := &containertest.FakeContainerCommandRunner{}
fakeHTTP := &fakeHTTP{}
fakePodStatusProvider := podStatusProviderFunc(func(uid types.UID, name, namespace string) (*kubecontainer.PodStatus, error) {
return &kubecontainer.PodStatus{
ID: uid,
Name: name,
Namespace: namespace,
IPs: []string{
"127.0.0.1",
},
}, nil
})

lcHanlder := lifecycle.NewHandlerRunner(
fakeHTTP,
fakeRunner,
nil)
fakePodStatusProvider)

m.runner = lcHanlder

Expand All @@ -295,13 +310,27 @@ func TestLifeCycleHook(t *testing.T) {

// Configured and working HTTP hook
t.Run("PreStop-HTTPGet", func(t *testing.T) {
defer func() { fakeHTTP.url = "" }()
testPod.Spec.Containers[0].Lifecycle = httpLifeCycle
m.killContainer(testPod, cID, "foo", "testKill", "", &gracePeriod)

if !strings.Contains(fakeHTTP.url, httpLifeCycle.PreStop.HTTPGet.Host) {
t.Errorf("HTTP Prestop hook was not invoked")
}
t.Run("inconsistent", func(t *testing.T) {
defer func() { fakeHTTP.req = nil }()
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentHTTPGetHandlers, false)()
httpLifeCycle.PreStop.HTTPGet.Port = intstr.IntOrString{}
testPod.Spec.Containers[0].Lifecycle = httpLifeCycle
m.killContainer(testPod, cID, "foo", "testKill", "", &gracePeriod)

if fakeHTTP.req == nil || !strings.Contains(fakeHTTP.req.URL.String(), httpLifeCycle.PreStop.HTTPGet.Host) {
t.Errorf("HTTP Prestop hook was not invoked")
}
})
t.Run("consistent", func(t *testing.T) {
defer func() { fakeHTTP.req = nil }()
httpLifeCycle.PreStop.HTTPGet.Port = intstr.FromInt(80)
testPod.Spec.Containers[0].Lifecycle = httpLifeCycle
m.killContainer(testPod, cID, "foo", "testKill", "", &gracePeriod)

if fakeHTTP.req == nil || !strings.Contains(fakeHTTP.req.URL.String(), httpLifeCycle.PreStop.HTTPGet.Host) {
t.Errorf("HTTP Prestop hook was not invoked")
}
})
})

// When there is no time to run PreStopHook
Expand All @@ -313,8 +342,8 @@ func TestLifeCycleHook(t *testing.T) {

m.killContainer(testPod, cID, "foo", "testKill", "", &gracePeriodLocal)

if strings.Contains(fakeHTTP.url, httpLifeCycle.PreStop.HTTPGet.Host) {
t.Errorf("HTTP Should not execute when gracePeriod is 0")
if fakeHTTP.req != nil {
t.Errorf("HTTP Prestop hook Should not execute when gracePeriod is 0")
}
})

Expand Down
4 changes: 2 additions & 2 deletions pkg/kubelet/kuberuntime/kuberuntime_manager.go
Expand Up @@ -176,7 +176,7 @@ func NewKubeGenericRuntimeManager(
podStateProvider podStateProvider,
osInterface kubecontainer.OSInterface,
runtimeHelper kubecontainer.RuntimeHelper,
httpClient types.HTTPGetter,
insecureContainerLifecycleHTTPClient types.HTTPDoer,
imageBackOff *flowcontrol.Backoff,
serializeImagePulls bool,
imagePullQPS float32,
Expand Down Expand Up @@ -265,7 +265,7 @@ func NewKubeGenericRuntimeManager(
serializeImagePulls,
imagePullQPS,
imagePullBurst)
kubeRuntimeManager.runner = lifecycle.NewHandlerRunner(httpClient, kubeRuntimeManager, kubeRuntimeManager)
kubeRuntimeManager.runner = lifecycle.NewHandlerRunner(insecureContainerLifecycleHTTPClient, kubeRuntimeManager, kubeRuntimeManager)
kubeRuntimeManager.containerGC = newContainerGC(runtimeService, podStateProvider, kubeRuntimeManager)
kubeRuntimeManager.podStateProvider = podStateProvider

Expand Down
96 changes: 78 additions & 18 deletions pkg/kubelet/lifecycle/handlers.go
Expand Up @@ -17,28 +17,35 @@ limitations under the License.
package lifecycle

import (
"context"
"errors"
"fmt"
"io"
"net"
"net/http"
"net/url"
"strconv"
"strings"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/features"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/kubelet/util/format"
httpprobe "k8s.io/kubernetes/pkg/probe/http"
"k8s.io/kubernetes/pkg/security/apparmor"
utilio "k8s.io/utils/io"
)

const (
maxRespBodyLength = 10 * 1 << 10 // 10KB
)

type handlerRunner struct {
httpGetter kubetypes.HTTPGetter
httpDoer kubetypes.HTTPDoer
commandRunner kubecontainer.CommandRunner
containerManager podStatusProvider
}
Expand All @@ -48,9 +55,9 @@ type podStatusProvider interface {
}

// NewHandlerRunner returns a configured lifecycle handler for a container.
func NewHandlerRunner(httpGetter kubetypes.HTTPGetter, commandRunner kubecontainer.CommandRunner, containerManager podStatusProvider) kubecontainer.HandlerRunner {
func NewHandlerRunner(httpDoer kubetypes.HTTPDoer, commandRunner kubecontainer.CommandRunner, containerManager podStatusProvider) kubecontainer.HandlerRunner {
return &handlerRunner{
httpGetter: httpGetter,
httpDoer: httpDoer,
commandRunner: commandRunner,
containerManager: containerManager,
}
Expand All @@ -68,9 +75,10 @@ func (hr *handlerRunner) Run(containerID kubecontainer.ContainerID, pod *v1.Pod,
}
return msg, err
case handler.HTTPGet != nil:
msg, err := hr.runHTTPHandler(pod, container, handler)
err := hr.runHTTPHandler(pod, container, handler)
var msg string
if err != nil {
msg = fmt.Sprintf("HTTP lifecycle hook (%s) for Container %q in Pod %q failed - error: %v, message: %q", handler.HTTPGet.Path, container.Name, format.Pod(pod), err, msg)
msg = fmt.Sprintf("HTTP lifecycle hook (%s) for Container %q in Pod %q failed - error: %v", handler.HTTPGet.Path, container.Name, format.Pod(pod), err)
klog.V(1).ErrorS(err, "HTTP lifecycle hook for Container in Pod failed", "path", handler.HTTPGet.Path, "containerName", container.Name, "pod", klog.KObj(pod))
}
return msg, err
Expand Down Expand Up @@ -105,44 +113,85 @@ func resolvePort(portReference intstr.IntOrString, container *v1.Container) (int
return -1, fmt.Errorf("couldn't find port: %v in %v", portReference, container)
}

func (hr *handlerRunner) runHTTPHandler(pod *v1.Pod, container *v1.Container, handler *v1.LifecycleHandler) (string, error) {
func (hr *handlerRunner) runHTTPHandler(pod *v1.Pod, container *v1.Container, handler *v1.LifecycleHandler) error {
host := handler.HTTPGet.Host
podIP := host
if len(host) == 0 {
status, err := hr.containerManager.GetPodStatus(pod.UID, pod.Name, pod.Namespace)
if err != nil {
klog.ErrorS(err, "Unable to get pod info, event handlers may be invalid.", "pod", klog.KObj(pod))
return "", err
return err
}
if len(status.IPs) == 0 {
return "", fmt.Errorf("failed to find networking container: %v", status)
return fmt.Errorf("failed to find networking container: %v", status)
}
host = status.IPs[0]
podIP = host
}

if utilfeature.DefaultFeatureGate.Enabled(features.ConsistentHTTPGetHandlers) {
req, err := httpprobe.NewRequestForHTTPGetAction(handler.HTTPGet, container, podIP, "lifecycle")
if err != nil {
return err
}
resp, err := hr.httpDoer.Do(req)
discardHTTPRespBody(resp)

if isHTTPResponseError(err) {
// TODO: emit an event about the fallback
// TODO: increment a metric about the fallback
klog.V(1).ErrorS(err, "HTTPS request to lifecycle hook got HTTP response, retrying with HTTP.", "pod", klog.KObj(pod), "host", req.URL.Host)

req := req.Clone(context.Background())
req.URL.Scheme = "http"
req.Header.Del("Authorization")
resp, httpErr := hr.httpDoer.Do(req)

// clear err since the fallback succeeded
if httpErr == nil {
err = nil
}
discardHTTPRespBody(resp)
}
return err
}

// Deprecated code path.
var port int
if handler.HTTPGet.Port.Type == intstr.String && len(handler.HTTPGet.Port.StrVal) == 0 {
port = 80
} else {
var err error
port, err = resolvePort(handler.HTTPGet.Port, container)
if err != nil {
return "", err
return err
}
}

url := fmt.Sprintf("http://%s/%s", net.JoinHostPort(host, strconv.Itoa(port)), handler.HTTPGet.Path)
resp, err := hr.httpGetter.Get(url)
return getHTTPRespBody(resp), err
req, err := http.NewRequest(http.MethodGet, url, nil)
if err != nil {
return err
}
resp, err := hr.httpDoer.Do(req)

discardHTTPRespBody(resp)
return err
}

func getHTTPRespBody(resp *http.Response) string {
func discardHTTPRespBody(resp *http.Response) {
if resp == nil {
return ""
return
}

// Ensure the response body is fully read and closed
// before we reconnect, so that we reuse the same TCP
// connection.
defer resp.Body.Close()
bytes, err := utilio.ReadAtMost(resp.Body, maxRespBodyLength)
if err == nil || err == utilio.ErrLimitReached {
return string(bytes)

if resp.ContentLength <= maxRespBodyLength {
io.Copy(io.Discard, &io.LimitedReader{R: resp.Body, N: maxRespBodyLength})
}
return ""
}

// NewAppArmorAdmitHandler returns a PodAdmitHandler which is used to evaluate
Expand Down Expand Up @@ -173,3 +222,14 @@ func (a *appArmorAdmitHandler) Admit(attrs *PodAdmitAttributes) PodAdmitResult {
Message: fmt.Sprintf("Cannot enforce AppArmor: %v", err),
}
}

func isHTTPResponseError(err error) bool {
if err == nil {
return false
}
urlErr := &url.Error{}
if !errors.As(err, &urlErr) {
return false
}
return strings.Contains(urlErr.Err.Error(), "server gave HTTP response to HTTPS client")
}

0 comments on commit ad26b31

Please sign in to comment.