Skip to content

Commit

Permalink
Remove RunInContainer interface in Kuberlete Runtime interface
Browse files Browse the repository at this point in the history
  • Loading branch information
feiskyer committed May 1, 2016
1 parent ae5065a commit 831203c
Show file tree
Hide file tree
Showing 12 changed files with 64 additions and 100 deletions.
3 changes: 0 additions & 3 deletions pkg/kubelet/container/runtime.go
Expand Up @@ -112,9 +112,6 @@ type ContainerAttacher interface {

// CommandRunner encapsulates the command runner interfaces for testability.
type ContainerCommandRunner interface {
// TODO(vmarmol): Merge RunInContainer and ExecInContainer.
// Runs the command in the container of the specified pod.
RunInContainer(containerID ContainerID, cmd []string) ([]byte, error)
// Runs the command in the container of the specified pod using nsenter.
// Attaches the processes stdin, stdout, and stderr. Optionally uses a
// tty.
Expand Down
8 changes: 0 additions & 8 deletions pkg/kubelet/container/testing/fake_runtime.go
Expand Up @@ -276,14 +276,6 @@ func (f *FakeRuntime) AttachContainer(containerID ContainerID, stdin io.Reader,
return f.Err
}

func (f *FakeRuntime) RunInContainer(containerID ContainerID, cmd []string) ([]byte, error) {
f.Lock()
defer f.Unlock()

f.CalledFunctions = append(f.CalledFunctions, "RunInContainer")
return []byte{}, f.Err
}

func (f *FakeRuntime) GetContainerLogs(pod *api.Pod, containerID ContainerID, logOptions *api.PodLogOptions, stdout, stderr io.Writer) (err error) {
f.Lock()
defer f.Unlock()
Expand Down
5 changes: 0 additions & 5 deletions pkg/kubelet/container/testing/runtime_mock.go
Expand Up @@ -98,11 +98,6 @@ func (r *Mock) AttachContainer(containerID ContainerID, stdin io.Reader, stdout,
return args.Error(0)
}

func (r *Mock) RunInContainer(containerID ContainerID, cmd []string) ([]byte, error) {
args := r.Called(containerID, cmd)
return args.Get(0).([]byte), args.Error(1)
}

func (r *Mock) GetContainerLogs(pod *api.Pod, containerID ContainerID, logOptions *api.PodLogOptions, stdout, stderr io.Writer) (err error) {
args := r.Called(pod, containerID, logOptions, stdout, stderr)
return args.Error(0)
Expand Down
47 changes: 0 additions & 47 deletions pkg/kubelet/dockertools/manager.go
Expand Up @@ -982,53 +982,6 @@ func (dm *DockerManager) defaultSecurityOpt() ([]string, error) {
return nil, nil
}

// RunInContainer run the command inside the container identified by containerID
func (dm *DockerManager) RunInContainer(containerID kubecontainer.ContainerID, cmd []string) ([]byte, error) {
glog.V(2).Infof("Using docker native exec to run cmd %+v inside container %s", cmd, containerID)
createOpts := dockertypes.ExecConfig{
Cmd: cmd,
AttachStdin: false,
AttachStdout: true,
AttachStderr: true,
Tty: false,
}
execObj, err := dm.client.CreateExec(containerID.ID, createOpts)
if err != nil {
return nil, fmt.Errorf("failed to run in container - Exec setup failed - %v", err)
}
var buf bytes.Buffer
startOpts := dockertypes.ExecStartCheck{Detach: false, Tty: false}
streamOpts := StreamOptions{
OutputStream: &buf,
ErrorStream: &buf,
RawTerminal: false,
}
err = dm.client.StartExec(execObj.ID, startOpts, streamOpts)
if err != nil {
glog.V(2).Infof("StartExec With error: %v", err)
return nil, err
}
ticker := time.NewTicker(2 * time.Second)
defer ticker.Stop()
for {
inspect, err2 := dm.client.InspectExec(execObj.ID)
if err2 != nil {
glog.V(2).Infof("InspectExec %s failed with error: %+v", execObj.ID, err2)
return buf.Bytes(), err2
}
if !inspect.Running {
if inspect.ExitCode != 0 {
glog.V(2).Infof("InspectExec %s exit with result %+v", execObj.ID, inspect)
err = &dockerExitError{inspect}
}
break
}
<-ticker.C
}

return buf.Bytes(), err
}

type dockerExitError struct {
Inspect *dockertypes.ContainerExecInspect
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kubelet/dockertools/manager_test.go
Expand Up @@ -477,7 +477,7 @@ func TestKillContainerInPodWithPreStop(t *testing.T) {
if err := fakeDocker.AssertStopped([]string{containerToKill.ID}); err != nil {
t.Errorf("container was not stopped correctly: %v", err)
}
verifyCalls(t, fakeDocker, []string{"list", "create_exec", "start_exec", "stop"})
verifyCalls(t, fakeDocker, []string{"list", "inspect_container", "create_exec", "start_exec", "stop"})
if !reflect.DeepEqual(expectedCmd, fakeDocker.execCmd) {
t.Errorf("expected: %v, got %v", expectedCmd, fakeDocker.execCmd)
}
Expand Down
11 changes: 10 additions & 1 deletion pkg/kubelet/kubelet.go
Expand Up @@ -64,6 +64,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/status"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/kubelet/util/format"
"k8s.io/kubernetes/pkg/kubelet/util/ioutils"
"k8s.io/kubernetes/pkg/kubelet/util/queue"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/securitycontext"
Expand Down Expand Up @@ -3456,7 +3457,15 @@ func (kl *Kubelet) RunInContainer(podFullName string, podUID types.UID, containe
if container == nil {
return nil, fmt.Errorf("container not found (%q)", containerName)
}
return kl.runner.RunInContainer(container.ID, cmd)

var buffer bytes.Buffer
output := ioutils.WriteCloserWrapper(&buffer)
err = kl.runner.ExecInContainer(container.ID, cmd, nil, output, output, false)
if err != nil {
return nil, err
}

return buffer.Bytes(), nil
}

// ExecInContainer executes a command in a container, connecting the supplied
Expand Down
6 changes: 0 additions & 6 deletions pkg/kubelet/kubelet_test.go
Expand Up @@ -1026,12 +1026,6 @@ type fakeContainerCommandRunner struct {
Stream io.ReadWriteCloser
}

func (f *fakeContainerCommandRunner) RunInContainer(id kubecontainer.ContainerID, cmd []string) ([]byte, error) {
f.Cmd = cmd
f.ID = id
return []byte{}, f.E
}

func (f *fakeContainerCommandRunner) ExecInContainer(id kubecontainer.ContainerID, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool) error {
f.Cmd = cmd
f.ID = id
Expand Down
6 changes: 5 additions & 1 deletion pkg/kubelet/lifecycle/handlers.go
Expand Up @@ -21,10 +21,12 @@ import (
"net"
"strconv"

"bytes"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/kubelet/util/ioutils"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/intstr"
)
Expand All @@ -50,7 +52,9 @@ func NewHandlerRunner(httpGetter kubetypes.HttpGetter, commandRunner kubecontain
func (hr *HandlerRunner) Run(containerID kubecontainer.ContainerID, pod *api.Pod, container *api.Container, handler *api.Handler) error {
switch {
case handler.Exec != nil:
_, err := hr.commandRunner.RunInContainer(containerID, handler.Exec.Command)
var buffer bytes.Buffer
output := ioutils.WriteCloserWrapper(&buffer)
err := hr.commandRunner.ExecInContainer(containerID, handler.Exec.Command, nil, output, output, false)
return err
case handler.HTTPGet != nil:
return hr.runHTTPHandler(pod, container, handler)
Expand Down
6 changes: 1 addition & 5 deletions pkg/kubelet/lifecycle/handlers_test.go
Expand Up @@ -77,13 +77,9 @@ type fakeContainerCommandRunner struct {
ID kubecontainer.ContainerID
}

func (f *fakeContainerCommandRunner) RunInContainer(id kubecontainer.ContainerID, cmd []string) ([]byte, error) {
func (f *fakeContainerCommandRunner) ExecInContainer(id kubecontainer.ContainerID, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool) error {
f.Cmd = cmd
f.ID = id
return []byte{}, nil
}

func (f *fakeContainerCommandRunner) ExecInContainer(id kubecontainer.ContainerID, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool) error {
return nil
}

Expand Down
11 changes: 10 additions & 1 deletion pkg/kubelet/prober/prober.go
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package prober

import (
"bytes"
"fmt"
"net"
"net/http"
Expand All @@ -30,6 +31,7 @@ import (
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/prober/results"
"k8s.io/kubernetes/pkg/kubelet/util/format"
"k8s.io/kubernetes/pkg/kubelet/util/ioutils"
"k8s.io/kubernetes/pkg/probe"
execprobe "k8s.io/kubernetes/pkg/probe/exec"
httprobe "k8s.io/kubernetes/pkg/probe/http"
Expand Down Expand Up @@ -219,7 +221,14 @@ type execInContainer struct {

func (p *prober) newExecInContainer(container api.Container, containerID kubecontainer.ContainerID, cmd []string) exec.Cmd {
return execInContainer{func() ([]byte, error) {
return p.runner.RunInContainer(containerID, cmd)
var buffer bytes.Buffer
output := ioutils.WriteCloserWrapper(&buffer)
err := p.runner.ExecInContainer(containerID, cmd, nil, output, output, false)
if err != nil {
return nil, err
}

return buffer.Bytes(), nil
}}
}

Expand Down
22 changes: 0 additions & 22 deletions pkg/kubelet/rkt/rkt.go
Expand Up @@ -1463,28 +1463,6 @@ func (r *Runtime) GarbageCollect(gcPolicy kubecontainer.ContainerGCPolicy) error
return nil
}

// Note: In rkt, the container ID is in the form of "UUID:appName", where
// appName is the container name.
// TODO(yifan): If the rkt is using lkvm as the stage1 image, then this function will fail.
func (r *Runtime) RunInContainer(containerID kubecontainer.ContainerID, cmd []string) ([]byte, error) {
glog.V(4).Infof("Rkt running in container.")

id, err := parseContainerID(containerID)
if err != nil {
return nil, err
}
args := append([]string{}, "enter", fmt.Sprintf("--app=%s", id.appName), id.uuid)
args = append(args, cmd...)

result, err := r.buildCommand(args...).CombinedOutput()
if err != nil {
if exitErr, ok := err.(*exec.ExitError); ok {
err = &rktExitError{exitErr}
}
}
return result, err
}

// rktExitError implemets /pkg/util/exec.ExitError interface.
type rktExitError struct{ *exec.ExitError }

Expand Down
37 changes: 37 additions & 0 deletions pkg/kubelet/util/ioutils/ioutils.go
@@ -0,0 +1,37 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package ioutils

import "io"

// writeCloserWrapper represents a WriteCloser whose closer operation is noop.
type writeCloserWrapper struct {
Writer io.Writer
}

func (w *writeCloserWrapper) Write(buf []byte) (int, error) {
return w.Writer.Write(buf)
}

func (w *writeCloserWrapper) Close() error {
return nil
}

// WriteCloserWrapper returns a writeCloserWrapper.
func WriteCloserWrapper(w io.Writer) io.WriteCloser {
return &writeCloserWrapper{w}
}

0 comments on commit 831203c

Please sign in to comment.