Skip to content
Permalink
Browse files

Merge pull request #82687 from dims/automated-cherry-pick-of-#82514-u…

…pstream-release-1.16-take-2

Automated cherry pick of #82514: Exec probes should not be unbounded
  • Loading branch information...
k8s-ci-robot committed Oct 1, 2019
2 parents 2f76f5e + aefa3d0 commit d647ddbd755faf07169599a625faf302ffc34458
@@ -42,6 +42,8 @@ type streamingRuntime struct {

var _ streaming.Runtime = &streamingRuntime{}

const maxMsgSize = 1024 * 1024 * 16

func (r *streamingRuntime) Exec(containerID string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error {
return r.exec(containerID, cmd, in, out, err, tty, resize, 0)
}
@@ -78,8 +80,8 @@ func (ds *dockerService) ExecSync(_ context.Context, req *runtimeapi.ExecSyncReq
var stdoutBuffer, stderrBuffer bytes.Buffer
err := ds.streamingRuntime.exec(req.ContainerId, req.Cmd,
nil, // in
ioutils.WriteCloserWrapper(&stdoutBuffer),
ioutils.WriteCloserWrapper(&stderrBuffer),
ioutils.WriteCloserWrapper(ioutils.LimitWriter(&stdoutBuffer, maxMsgSize)),
ioutils.WriteCloserWrapper(ioutils.LimitWriter(&stderrBuffer, maxMsgSize)),
false, // tty
nil, // resize
timeout)
@@ -58,6 +58,7 @@ go_test(
"//pkg/kubelet/prober/results:go_default_library",
"//pkg/kubelet/status:go_default_library",
"//pkg/kubelet/status/testing:go_default_library",
"//pkg/kubelet/util/ioutils:go_default_library",
"//pkg/probe:go_default_library",
"//pkg/probe/exec:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
@@ -252,63 +252,68 @@ func formatURL(scheme string, host string, port int, path string) *url.URL {
type execInContainer struct {
// run executes a command in a container. Combined stdout and stderr output is always returned. An
// error is returned if one occurred.
run func() ([]byte, error)
run func() ([]byte, error)
writer io.Writer
}

func (pb *prober) newExecInContainer(container v1.Container, containerID kubecontainer.ContainerID, cmd []string, timeout time.Duration) exec.Cmd {
return execInContainer{func() ([]byte, error) {
return &execInContainer{run: func() ([]byte, error) {
return pb.runner.RunInContainer(containerID, cmd, timeout)
}}
}

func (eic execInContainer) Run() error {
return fmt.Errorf("unimplemented")
func (eic *execInContainer) Run() error {
return nil
}

func (eic execInContainer) CombinedOutput() ([]byte, error) {
func (eic *execInContainer) CombinedOutput() ([]byte, error) {
return eic.run()
}

func (eic execInContainer) Output() ([]byte, error) {
func (eic *execInContainer) Output() ([]byte, error) {
return nil, fmt.Errorf("unimplemented")
}

func (eic execInContainer) SetDir(dir string) {
func (eic *execInContainer) SetDir(dir string) {
//unimplemented
}

func (eic execInContainer) SetStdin(in io.Reader) {
func (eic *execInContainer) SetStdin(in io.Reader) {
//unimplemented
}

func (eic execInContainer) SetStdout(out io.Writer) {
//unimplemented
func (eic *execInContainer) SetStdout(out io.Writer) {
eic.writer = out
}

func (eic execInContainer) SetStderr(out io.Writer) {
//unimplemented
func (eic *execInContainer) SetStderr(out io.Writer) {
eic.writer = out
}

func (eic execInContainer) SetEnv(env []string) {
func (eic *execInContainer) SetEnv(env []string) {
//unimplemented
}

func (eic execInContainer) Stop() {
func (eic *execInContainer) Stop() {
//unimplemented
}

func (eic execInContainer) Start() error {
return fmt.Errorf("unimplemented")
func (eic *execInContainer) Start() error {
data, err := eic.run()
if eic.writer != nil {
eic.writer.Write(data)
}
return err
}

func (eic execInContainer) Wait() error {
return fmt.Errorf("unimplemented")
func (eic *execInContainer) Wait() error {
return nil
}

func (eic execInContainer) StdoutPipe() (io.ReadCloser, error) {
func (eic *execInContainer) StdoutPipe() (io.ReadCloser, error) {
return nil, fmt.Errorf("unimplemented")
}

func (eic execInContainer) StderrPipe() (io.ReadCloser, error) {
func (eic *execInContainer) StderrPipe() (io.ReadCloser, error) {
return nil, fmt.Errorf("unimplemented")
}
@@ -17,10 +17,12 @@ limitations under the License.
package prober

import (
"bytes"
"errors"
"fmt"
"net/http"
"reflect"
"strings"
"testing"

"k8s.io/api/core/v1"
@@ -29,6 +31,7 @@ import (
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
containertest "k8s.io/kubernetes/pkg/kubelet/container/testing"
"k8s.io/kubernetes/pkg/kubelet/prober/results"
"k8s.io/kubernetes/pkg/kubelet/util/ioutils"
"k8s.io/kubernetes/pkg/probe"
execprobe "k8s.io/kubernetes/pkg/probe/exec"
)
@@ -329,23 +332,38 @@ func TestProbe(t *testing.T) {
}

func TestNewExecInContainer(t *testing.T) {
limit := 1024
tenKilobyte := strings.Repeat("logs-123", 128*10)

tests := []struct {
name string
err error
name string
stdout string
expected string
err error
}{
{
name: "no error",
err: nil,
name: "no error",
stdout: "foo",
expected: "foo",
err: nil,
},
{
name: "no error",
stdout: tenKilobyte,
expected: tenKilobyte[0:limit],
err: nil,
},
{
name: "error - make sure we get output",
err: errors.New("bad"),
name: "error - make sure we get output",
stdout: "foo",
expected: "foo",
err: errors.New("bad"),
},
}

for _, test := range tests {
runner := &containertest.FakeContainerCommandRunner{
Stdout: "foo",
Stdout: test.stdout,
Err: test.err,
}
prober := &prober{
@@ -357,15 +375,24 @@ func TestNewExecInContainer(t *testing.T) {
cmd := []string{"/foo", "bar"}
exec := prober.newExecInContainer(container, containerID, cmd, 0)

actualOutput, err := exec.CombinedOutput()
var dataBuffer bytes.Buffer
writer := ioutils.LimitWriter(&dataBuffer, int64(limit))
exec.SetStderr(writer)
exec.SetStdout(writer)
err := exec.Start()
if err == nil {
err = exec.Wait()
}
actualOutput := dataBuffer.Bytes()

if e, a := containerID, runner.ContainerID; e != a {
t.Errorf("%s: container id: expected %v, got %v", test.name, e, a)
}
if e, a := cmd, runner.Cmd; !reflect.DeepEqual(e, a) {
t.Errorf("%s: cmd: expected %v, got %v", test.name, e, a)
}
// this isn't 100% foolproof as a bug in a real ContainerCommandRunner where it fails to copy to stdout/stderr wouldn't be caught by this test
if e, a := "foo", string(actualOutput); e != a {
if e, a := test.expected, string(actualOutput); e != a {
t.Errorf("%s: output: expected %q, got %q", test.name, e, a)
}
if e, a := fmt.Sprintf("%v", test.err), fmt.Sprintf("%v", err); e != a {
@@ -1,9 +1,6 @@
package(default_visibility = ["//visibility:public"])

load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
)
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "go_default_library",
@@ -23,3 +20,10 @@ filegroup(
srcs = [":package-srcs"],
tags = ["automanaged"],
)

go_test(
name = "go_default_test",
srcs = ["ioutils_test.go"],
embed = [":go_default_library"],
deps = ["//vendor/github.com/stretchr/testify/assert:go_default_library"],
)
@@ -35,3 +35,36 @@ func (w *writeCloserWrapper) Close() error {
func WriteCloserWrapper(w io.Writer) io.WriteCloser {
return &writeCloserWrapper{w}
}

// LimitWriter is a copy of the standard library ioutils.LimitReader,
// applied to the writer interface.
// LimitWriter returns a Writer that writes to w
// but stops with EOF after n bytes.
// The underlying implementation is a *LimitedWriter.
func LimitWriter(w io.Writer, n int64) io.Writer { return &LimitedWriter{w, n} }

// A LimitedWriter writes to W but limits the amount of
// data returned to just N bytes. Each call to Write
// updates N to reflect the new amount remaining.
// Write returns EOF when N <= 0 or when the underlying W returns EOF.
type LimitedWriter struct {
W io.Writer // underlying writer
N int64 // max bytes remaining
}

func (l *LimitedWriter) Write(p []byte) (n int, err error) {
if l.N <= 0 {
return 0, io.ErrShortWrite
}
truncated := false
if int64(len(p)) > l.N {
p = p[0:l.N]
truncated = true
}
n, err = l.W.Write(p)
l.N -= int64(n)
if err == nil && truncated {
err = io.ErrShortWrite
}
return
}
@@ -0,0 +1,93 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package ioutils

import (
"bytes"
"fmt"
"math/rand"
"testing"

"github.com/stretchr/testify/assert"
)

func TestLimitWriter(t *testing.T) {
r := rand.New(rand.NewSource(1234)) // Fixed source to prevent flakes.

tests := []struct {
inputSize, limit, writeSize int64
}{
// Single write tests
{100, 101, 100},
{100, 100, 100},
{100, 99, 100},
{1, 1, 1},
{100, 10, 100},
{100, 0, 100},
{100, -1, 100},
// Multi write tests
{100, 101, 10},
{100, 100, 10},
{100, 99, 10},
{100, 10, 10},
{100, 0, 10},
{100, -1, 10},
}

for _, test := range tests {
t.Run(fmt.Sprintf("inputSize=%d limit=%d writes=%d", test.inputSize, test.limit, test.writeSize), func(t *testing.T) {
input := make([]byte, test.inputSize)
r.Read(input)
output := &bytes.Buffer{}
w := LimitWriter(output, test.limit)

var (
err error
written int64
n int
)
for written < test.inputSize && err == nil {
n, err = w.Write(input[written : written+test.writeSize])
written += int64(n)
}

expectWritten := bounded(0, test.inputSize, test.limit)
assert.EqualValues(t, expectWritten, written)
if expectWritten <= 0 {
assert.Empty(t, output)
} else {
assert.Equal(t, input[:expectWritten], output.Bytes())
}

if test.limit < test.inputSize {
assert.Error(t, err)
} else {
assert.NoError(t, err)
}
})
}
}

func bounded(min, val, max int64) int64 {
if max < val {
val = max
}
if val < min {
val = min
}
return val
}
@@ -11,6 +11,7 @@ go_library(
srcs = ["exec.go"],
importpath = "k8s.io/kubernetes/pkg/probe/exec",
deps = [
"//pkg/kubelet/util/ioutils:go_default_library",
"//pkg/probe:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
"//vendor/k8s.io/utils/exec:go_default_library",

0 comments on commit d647ddb

Please sign in to comment.
You can’t perform that action at this time.