Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Normalize HTTP lifecycle handlers with HTTP probers #86139

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 8 additions & 0 deletions pkg/features/kube_features.go
Expand Up @@ -192,6 +192,12 @@ const (
// Enables container Checkpoint support in the kubelet
ContainerCheckpoint featuregate.Feature = "ContainerCheckpoint"

// owner: @bhcleek @wzshiming
// GA: v1.25
//
// Normalize HttpGet URL and Header passing for lifecycle handlers with probers.
ConsistentHTTPGetHandlers featuregate.Feature = "ConsistentHTTPGetHandlers"

// owner: @jiahuif
// alpha: v1.21
// beta: v1.22
Expand Down Expand Up @@ -842,6 +848,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS

ContainerCheckpoint: {Default: false, PreRelease: featuregate.Alpha},

ConsistentHTTPGetHandlers: {Default: true, PreRelease: featuregate.GA},

ControllerManagerLeaderMigration: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.26

CronJobTimeZone: {Default: true, PreRelease: featuregate.Beta},
Expand Down
18 changes: 16 additions & 2 deletions pkg/kubelet/kubelet.go
Expand Up @@ -110,6 +110,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/util/queue"
"k8s.io/kubernetes/pkg/kubelet/util/sliceutils"
"k8s.io/kubernetes/pkg/kubelet/volumemanager"
httpprobe "k8s.io/kubernetes/pkg/probe/http"
"k8s.io/kubernetes/pkg/security/apparmor"
"k8s.io/kubernetes/pkg/util/oom"
"k8s.io/kubernetes/pkg/volume"
Expand Down Expand Up @@ -475,7 +476,20 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
clusterDNS = append(clusterDNS, ip)
}
}
httpClient := &http.Client{}
jasimmons marked this conversation as resolved.
Show resolved Hide resolved

// A TLS transport is needed to make HTTPS-based container lifecycle requests,
// but we do not have the information necessary to do TLS verification.
liggitt marked this conversation as resolved.
Show resolved Hide resolved
//
// This client must not be modified to include credentials, because it is
// critical that credentials not leak from the client to arbitrary hosts.
insecureContainerLifecycleHTTPClient := &http.Client{}
if utilfeature.DefaultFeatureGate.Enabled(features.ConsistentHTTPGetHandlers) {
insecureTLSTransport := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}
insecureContainerLifecycleHTTPClient.Transport = insecureTLSTransport
insecureContainerLifecycleHTTPClient.CheckRedirect = httpprobe.RedirectChecker(false)
}

klet := &Kubelet{
hostname: hostname,
Expand Down Expand Up @@ -625,7 +639,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
klet.podWorkers,
kubeDeps.OSInterface,
klet,
httpClient,
insecureContainerLifecycleHTTPClient,
imageBackOff,
kubeCfg.SerializeImagePulls,
float32(kubeCfg.RegistryPullQPS),
Expand Down
6 changes: 3 additions & 3 deletions pkg/kubelet/kuberuntime/fake_kuberuntime_manager.go
Expand Up @@ -46,12 +46,12 @@ const (
)

type fakeHTTP struct {
url string
req *http.Request
err error
}

func (f *fakeHTTP) Get(url string) (*http.Response, error) {
f.url = url
func (f *fakeHTTP) Do(req *http.Request) (*http.Response, error) {
f.req = req
return nil, f.err
}

Expand Down
7 changes: 7 additions & 0 deletions pkg/kubelet/kuberuntime/helpers_test.go
Expand Up @@ -23,11 +23,18 @@ import (

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
runtimetesting "k8s.io/cri-api/pkg/apis/testing"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
)

type podStatusProviderFunc func(uid types.UID, name, namespace string) (*kubecontainer.PodStatus, error)

func (f podStatusProviderFunc) GetPodStatus(uid types.UID, name, namespace string) (*kubecontainer.PodStatus, error) {
return f(uid, name, namespace)
}

func TestIsInitContainerFailed(t *testing.T) {
tests := []struct {
status *kubecontainer.Status
Expand Down
8 changes: 5 additions & 3 deletions pkg/kubelet/kuberuntime/kuberuntime_container.go
Expand Up @@ -281,7 +281,8 @@ func (m *kubeGenericRuntimeManager) startContainer(podSandboxID string, podSandb
if handlerErr != nil {
klog.ErrorS(handlerErr, "Failed to execute PostStartHook", "pod", klog.KObj(pod),
"podUID", pod.UID, "containerName", container.Name, "containerID", kubeContainerID.String())
m.recordContainerEvent(pod, container, kubeContainerID.ID, v1.EventTypeWarning, events.FailedPostStartHook, msg)
// do not record the message in the event so that secrets won't leak from the server.
m.recordContainerEvent(pod, container, kubeContainerID.ID, v1.EventTypeWarning, events.FailedPostStartHook, "PostStartHook failed")
jasimmons marked this conversation as resolved.
Show resolved Hide resolved
if err := m.killContainer(pod, kubeContainerID, container.Name, "FailedPostStartHook", reasonFailedPostStartHook, nil); err != nil {
klog.ErrorS(err, "Failed to kill container", "pod", klog.KObj(pod),
"podUID", pod.UID, "containerName", container.Name, "containerID", kubeContainerID.String())
Expand Down Expand Up @@ -586,10 +587,11 @@ func (m *kubeGenericRuntimeManager) executePreStopHook(pod *v1.Pod, containerID
go func() {
defer close(done)
defer utilruntime.HandleCrash()
if msg, err := m.runner.Run(containerID, pod, containerSpec, containerSpec.Lifecycle.PreStop); err != nil {
if _, err := m.runner.Run(containerID, pod, containerSpec, containerSpec.Lifecycle.PreStop); err != nil {
klog.ErrorS(err, "PreStop hook failed", "pod", klog.KObj(pod), "podUID", pod.UID,
"containerName", containerSpec.Name, "containerID", containerID.String())
m.recordContainerEvent(pod, containerSpec, containerID.ID, v1.EventTypeWarning, events.FailedPreStopHook, msg)
// do not record the message in the event so that secrets won't leak from the server.
m.recordContainerEvent(pod, containerSpec, containerID.ID, v1.EventTypeWarning, events.FailedPreStopHook, "PreStopHook failed")
tallclair marked this conversation as resolved.
Show resolved Hide resolved
}
}()

Expand Down
49 changes: 39 additions & 10 deletions pkg/kubelet/kuberuntime/kuberuntime_container_test.go
Expand Up @@ -28,9 +28,14 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"

v1 "k8s.io/api/core/v1"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
"k8s.io/kubernetes/pkg/features"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
containertest "k8s.io/kubernetes/pkg/kubelet/container/testing"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
Expand Down Expand Up @@ -276,11 +281,21 @@ func TestLifeCycleHook(t *testing.T) {

fakeRunner := &containertest.FakeContainerCommandRunner{}
fakeHTTP := &fakeHTTP{}
fakePodStatusProvider := podStatusProviderFunc(func(uid types.UID, name, namespace string) (*kubecontainer.PodStatus, error) {
return &kubecontainer.PodStatus{
ID: uid,
Name: name,
Namespace: namespace,
IPs: []string{
"127.0.0.1",
},
}, nil
})

lcHanlder := lifecycle.NewHandlerRunner(
fakeHTTP,
fakeRunner,
nil)
fakePodStatusProvider)

m.runner = lcHanlder

Expand All @@ -295,13 +310,27 @@ func TestLifeCycleHook(t *testing.T) {

// Configured and working HTTP hook
t.Run("PreStop-HTTPGet", func(t *testing.T) {
defer func() { fakeHTTP.url = "" }()
testPod.Spec.Containers[0].Lifecycle = httpLifeCycle
m.killContainer(testPod, cID, "foo", "testKill", "", &gracePeriod)

if !strings.Contains(fakeHTTP.url, httpLifeCycle.PreStop.HTTPGet.Host) {
t.Errorf("HTTP Prestop hook was not invoked")
}
t.Run("inconsistent", func(t *testing.T) {
defer func() { fakeHTTP.req = nil }()
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentHTTPGetHandlers, false)()
httpLifeCycle.PreStop.HTTPGet.Port = intstr.IntOrString{}
testPod.Spec.Containers[0].Lifecycle = httpLifeCycle
m.killContainer(testPod, cID, "foo", "testKill", "", &gracePeriod)

if fakeHTTP.req == nil || !strings.Contains(fakeHTTP.req.URL.String(), httpLifeCycle.PreStop.HTTPGet.Host) {
t.Errorf("HTTP Prestop hook was not invoked")
}
})
t.Run("consistent", func(t *testing.T) {
defer func() { fakeHTTP.req = nil }()
httpLifeCycle.PreStop.HTTPGet.Port = intstr.FromInt(80)
testPod.Spec.Containers[0].Lifecycle = httpLifeCycle
m.killContainer(testPod, cID, "foo", "testKill", "", &gracePeriod)

if fakeHTTP.req == nil || !strings.Contains(fakeHTTP.req.URL.String(), httpLifeCycle.PreStop.HTTPGet.Host) {
t.Errorf("HTTP Prestop hook was not invoked")
}
})
})

// When there is no time to run PreStopHook
Expand All @@ -313,8 +342,8 @@ func TestLifeCycleHook(t *testing.T) {

m.killContainer(testPod, cID, "foo", "testKill", "", &gracePeriodLocal)

if strings.Contains(fakeHTTP.url, httpLifeCycle.PreStop.HTTPGet.Host) {
t.Errorf("HTTP Should not execute when gracePeriod is 0")
if fakeHTTP.req != nil {
t.Errorf("HTTP Prestop hook Should not execute when gracePeriod is 0")
}
})

Expand Down
4 changes: 2 additions & 2 deletions pkg/kubelet/kuberuntime/kuberuntime_manager.go
Expand Up @@ -176,7 +176,7 @@ func NewKubeGenericRuntimeManager(
podStateProvider podStateProvider,
osInterface kubecontainer.OSInterface,
runtimeHelper kubecontainer.RuntimeHelper,
httpClient types.HTTPGetter,
insecureContainerLifecycleHTTPClient types.HTTPDoer,
imageBackOff *flowcontrol.Backoff,
serializeImagePulls bool,
imagePullQPS float32,
Expand Down Expand Up @@ -265,7 +265,7 @@ func NewKubeGenericRuntimeManager(
serializeImagePulls,
imagePullQPS,
imagePullBurst)
kubeRuntimeManager.runner = lifecycle.NewHandlerRunner(httpClient, kubeRuntimeManager, kubeRuntimeManager)
kubeRuntimeManager.runner = lifecycle.NewHandlerRunner(insecureContainerLifecycleHTTPClient, kubeRuntimeManager, kubeRuntimeManager)
kubeRuntimeManager.containerGC = newContainerGC(runtimeService, podStateProvider, kubeRuntimeManager)
kubeRuntimeManager.podStateProvider = podStateProvider

Expand Down
96 changes: 78 additions & 18 deletions pkg/kubelet/lifecycle/handlers.go
Expand Up @@ -17,28 +17,35 @@ limitations under the License.
package lifecycle

import (
"context"
"errors"
"fmt"
"io"
"net"
"net/http"
"net/url"
"strconv"
"strings"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/features"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/kubelet/util/format"
httpprobe "k8s.io/kubernetes/pkg/probe/http"
"k8s.io/kubernetes/pkg/security/apparmor"
utilio "k8s.io/utils/io"
)

const (
maxRespBodyLength = 10 * 1 << 10 // 10KB
)

type handlerRunner struct {
httpGetter kubetypes.HTTPGetter
httpDoer kubetypes.HTTPDoer
commandRunner kubecontainer.CommandRunner
containerManager podStatusProvider
}
Expand All @@ -48,9 +55,9 @@ type podStatusProvider interface {
}

// NewHandlerRunner returns a configured lifecycle handler for a container.
func NewHandlerRunner(httpGetter kubetypes.HTTPGetter, commandRunner kubecontainer.CommandRunner, containerManager podStatusProvider) kubecontainer.HandlerRunner {
func NewHandlerRunner(httpDoer kubetypes.HTTPDoer, commandRunner kubecontainer.CommandRunner, containerManager podStatusProvider) kubecontainer.HandlerRunner {
return &handlerRunner{
httpGetter: httpGetter,
httpDoer: httpDoer,
commandRunner: commandRunner,
containerManager: containerManager,
}
Expand All @@ -68,9 +75,10 @@ func (hr *handlerRunner) Run(containerID kubecontainer.ContainerID, pod *v1.Pod,
}
return msg, err
case handler.HTTPGet != nil:
msg, err := hr.runHTTPHandler(pod, container, handler)
err := hr.runHTTPHandler(pod, container, handler)
var msg string
if err != nil {
msg = fmt.Sprintf("HTTP lifecycle hook (%s) for Container %q in Pod %q failed - error: %v, message: %q", handler.HTTPGet.Path, container.Name, format.Pod(pod), err, msg)
msg = fmt.Sprintf("HTTP lifecycle hook (%s) for Container %q in Pod %q failed - error: %v", handler.HTTPGet.Path, container.Name, format.Pod(pod), err)
klog.V(1).ErrorS(err, "HTTP lifecycle hook for Container in Pod failed", "path", handler.HTTPGet.Path, "containerName", container.Name, "pod", klog.KObj(pod))
}
return msg, err
Expand Down Expand Up @@ -105,44 +113,85 @@ func resolvePort(portReference intstr.IntOrString, container *v1.Container) (int
return -1, fmt.Errorf("couldn't find port: %v in %v", portReference, container)
}

func (hr *handlerRunner) runHTTPHandler(pod *v1.Pod, container *v1.Container, handler *v1.LifecycleHandler) (string, error) {
func (hr *handlerRunner) runHTTPHandler(pod *v1.Pod, container *v1.Container, handler *v1.LifecycleHandler) error {
host := handler.HTTPGet.Host
podIP := host
if len(host) == 0 {
status, err := hr.containerManager.GetPodStatus(pod.UID, pod.Name, pod.Namespace)
if err != nil {
klog.ErrorS(err, "Unable to get pod info, event handlers may be invalid.", "pod", klog.KObj(pod))
return "", err
return err
}
if len(status.IPs) == 0 {
return "", fmt.Errorf("failed to find networking container: %v", status)
return fmt.Errorf("failed to find networking container: %v", status)
}
host = status.IPs[0]
podIP = host
}

if utilfeature.DefaultFeatureGate.Enabled(features.ConsistentHTTPGetHandlers) {
req, err := httpprobe.NewRequestForHTTPGetAction(handler.HTTPGet, container, podIP, "lifecycle")
if err != nil {
return err
}
resp, err := hr.httpDoer.Do(req)
liggitt marked this conversation as resolved.
Show resolved Hide resolved
discardHTTPRespBody(resp)

if isHTTPResponseError(err) {
// TODO: emit an event about the fallback
// TODO: increment a metric about the fallback
klog.V(1).ErrorS(err, "HTTPS request to lifecycle hook got HTTP response, retrying with HTTP.", "pod", klog.KObj(pod), "host", req.URL.Host)

req := req.Clone(context.Background())
req.URL.Scheme = "http"
req.Header.Del("Authorization")
resp, httpErr := hr.httpDoer.Do(req)

// clear err since the fallback succeeded
if httpErr == nil {
err = nil
}
discardHTTPRespBody(resp)
}
return err
}

// Deprecated code path.
var port int
if handler.HTTPGet.Port.Type == intstr.String && len(handler.HTTPGet.Port.StrVal) == 0 {
port = 80
liggitt marked this conversation as resolved.
Show resolved Hide resolved
} else {
var err error
port, err = resolvePort(handler.HTTPGet.Port, container)
if err != nil {
return "", err
return err
}
}

url := fmt.Sprintf("http://%s/%s", net.JoinHostPort(host, strconv.Itoa(port)), handler.HTTPGet.Path)
liggitt marked this conversation as resolved.
Show resolved Hide resolved
resp, err := hr.httpGetter.Get(url)
return getHTTPRespBody(resp), err
req, err := http.NewRequest(http.MethodGet, url, nil)
if err != nil {
return err
}
resp, err := hr.httpDoer.Do(req)

discardHTTPRespBody(resp)
return err
}

func getHTTPRespBody(resp *http.Response) string {
func discardHTTPRespBody(resp *http.Response) {
if resp == nil {
return ""
return
}

// Ensure the response body is fully read and closed
// before we reconnect, so that we reuse the same TCP
// connection.
defer resp.Body.Close()
bytes, err := utilio.ReadAtMost(resp.Body, maxRespBodyLength)
if err == nil || err == utilio.ErrLimitReached {
return string(bytes)

if resp.ContentLength <= maxRespBodyLength {
io.Copy(io.Discard, &io.LimitedReader{R: resp.Body, N: maxRespBodyLength})
}
return ""
}

// NewAppArmorAdmitHandler returns a PodAdmitHandler which is used to evaluate
Expand Down Expand Up @@ -173,3 +222,14 @@ func (a *appArmorAdmitHandler) Admit(attrs *PodAdmitAttributes) PodAdmitResult {
Message: fmt.Sprintf("Cannot enforce AppArmor: %v", err),
}
}

func isHTTPResponseError(err error) bool {
if err == nil {
return false
}
urlErr := &url.Error{}
if !errors.As(err, &urlErr) {
return false
}
return strings.Contains(urlErr.Err.Error(), "server gave HTTP response to HTTPS client")
}