Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement streaming CRI methods in dockershim #35661

Merged
merged 1 commit into from Oct 29, 2016
Merged
Changes from all commits
Commits
File filter...
Filter file types
Jump to…
Jump to file or symbol
Failed to load files and symbols.

Always

Just for now

@@ -17,7 +17,7 @@ limitations under the License.
package api

import (
"io"
"time"

runtimeApi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
)
@@ -43,8 +43,13 @@ type ContainerManager interface {
ListContainers(filter *runtimeApi.ContainerFilter) ([]*runtimeApi.Container, error)
// ContainerStatus returns the status of the container.
ContainerStatus(containerID string) (*runtimeApi.ContainerStatus, error)
// Exec executes a command in the container.
Exec(containerID string, cmd []string, tty bool, stdin io.Reader, stdout, stderr io.WriteCloser) error
// ExecSync executes a command in the container, and returns the stdout output.
// If command exits with a non-zero exit code, an error is returned.
ExecSync(containerID string, cmd []string, timeout time.Duration) (stdout []byte, stderr []byte, err error)

This comment has been minimized.

Copy link
@feiskyer

feiskyer Oct 27, 2016

Member

exit code is also needed here, some applications may check exit code instead of error messages.

This comment has been minimized.

Copy link
@feiskyer

feiskyer Oct 27, 2016

Member

Update: exit code is not required here because it is handled by utilexec.ExitError

This comment has been minimized.

Copy link
@timstclair
// Exec prepares a streaming endpoint to execute a command in the container, and returns the address.
Exec(*runtimeApi.ExecRequest) (*runtimeApi.ExecResponse, error)

This comment has been minimized.

Copy link
@feiskyer

feiskyer Oct 27, 2016

Member

just returns string (indicates the url) instead of ExecResponse here? (same with Attach and PortForward)

This comment has been minimized.

Copy link
@timstclair

timstclair Oct 27, 2016

Author

I see these methods delegating entirely to the server library (see implementation). Using the request / response here makes it easier to simply pass through to the library. I'm open to changing it, but I think we should change the library implementation to mirror this API in that case.

// Attach prepares a streaming endpoint to attach to a running container, and returns the address.
Attach(req *runtimeApi.AttachRequest) (*runtimeApi.AttachResponse, error)
}

// PodSandboxManager contains methods for operating on PodSandboxes. The methods
@@ -63,6 +68,8 @@ type PodSandboxManager interface {
PodSandboxStatus(podSandboxID string) (*runtimeApi.PodSandboxStatus, error)
// ListPodSandbox returns a list of Sandbox.
ListPodSandbox(filter *runtimeApi.PodSandboxFilter) ([]*runtimeApi.PodSandbox, error)
// PortForward prepares a streaming endpoint to forward ports from a PodSandbox, and returns the address.
PortForward(*runtimeApi.PortForwardRequest) (*runtimeApi.PortForwardResponse, error)
}

// RuntimeService interface should be implemented by a container runtime.
@@ -18,7 +18,6 @@ package testing

import (
"fmt"
"io"
"reflect"
"sync"
"time"
@@ -214,6 +213,14 @@ func (r *FakeRuntimeService) ListPodSandbox(filter *runtimeApi.PodSandboxFilter)
return result, nil
}

func (r *FakeRuntimeService) PortForward(*runtimeApi.PortForwardRequest) (*runtimeApi.PortForwardResponse, error) {
r.Lock()
defer r.Unlock()

r.Called = append(r.Called, "PortForward")
return &runtimeApi.PortForwardResponse{}, nil
}

func (r *FakeRuntimeService) CreateContainer(podSandboxID string, config *runtimeApi.ContainerConfig, sandboxConfig *runtimeApi.PodSandboxConfig) (string, error) {
r.Lock()
defer r.Unlock()
@@ -349,12 +356,28 @@ func (r *FakeRuntimeService) ContainerStatus(containerID string) (*runtimeApi.Co
return &status, nil
}

func (r *FakeRuntimeService) Exec(containerID string, cmd []string, tty bool, stdin io.Reader, stdout, stderr io.WriteCloser) error {
func (r *FakeRuntimeService) ExecSync(containerID string, cmd []string, timeout time.Duration) (stdout []byte, stderr []byte, err error) {
r.Lock()
defer r.Unlock()

r.Called = append(r.Called, "ExecSync")
return nil, nil, nil
}

func (r *FakeRuntimeService) Exec(*runtimeApi.ExecRequest) (*runtimeApi.ExecResponse, error) {
r.Lock()
defer r.Unlock()

r.Called = append(r.Called, "Exec")
return nil
return &runtimeApi.ExecResponse{}, nil
}

func (r *FakeRuntimeService) Attach(req *runtimeApi.AttachRequest) (*runtimeApi.AttachResponse, error) {
r.Lock()
defer r.Unlock()

r.Called = append(r.Called, "Attach")
return &runtimeApi.AttachResponse{}, nil
}

func (r *FakeRuntimeService) UpdateRuntimeConfig(runtimeCOnfig *runtimeApi.RuntimeConfig) error {
@@ -19,6 +19,7 @@ go_library(
"docker_image.go",
"docker_sandbox.go",
"docker_service.go",
"docker_streaming.go",
"helpers.go",
"legacy.go",
"naming.go",
@@ -32,7 +33,9 @@ go_library(
"//pkg/kubelet/dockertools:go_default_library",
"//pkg/kubelet/leaky:go_default_library",
"//pkg/kubelet/qos:go_default_library",
"//pkg/kubelet/server/streaming:go_default_library",
"//pkg/kubelet/types:go_default_library",
"//pkg/kubelet/util/ioutils:go_default_library",
"//pkg/util/term:go_default_library",
"//vendor:github.com/docker/engine-api/types",
"//vendor:github.com/docker/engine-api/types/container",
@@ -18,7 +18,6 @@ package dockershim

import (
"fmt"
"io"
"os"
"path/filepath"
"time"
@@ -388,10 +387,3 @@ func (ds *dockerService) ContainerStatus(containerID string) (*runtimeApi.Contai
Annotations: annotations,
}, nil
}

// Exec execute a command in the container.
// TODO: Need to handle terminal resizing before implementing this function.
// https://github.com/kubernetes/kubernetes/issues/29579.
func (ds *dockerService) Exec(containerID string, cmd []string, tty bool, stdin io.Reader, stdout, stderr io.WriteCloser) error {
return fmt.Errorf("not implemented")
}
@@ -25,6 +25,7 @@ import (
runtimeApi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/dockertools"
"k8s.io/kubernetes/pkg/kubelet/server/streaming"
"k8s.io/kubernetes/pkg/util/term"
)

@@ -55,13 +56,27 @@ const (
var internalLabelKeys []string = []string{containerTypeLabelKey, containerLogPathLabelKey, sandboxIDLabelKey}

// NOTE: Anything passed to DockerService should be eventually handled in another way when we switch to running the shim as a different process.
func NewDockerService(client dockertools.DockerInterface, seccompProfileRoot string, podSandboxImage string) DockerService {
return &dockerService{
func NewDockerService(client dockertools.DockerInterface, seccompProfileRoot string, podSandboxImage string, streamingConfig *streaming.Config) (DockerService, error) {
ds := &dockerService{
seccompProfileRoot: seccompProfileRoot,
client: dockertools.NewInstrumentedDockerInterface(client),
os: kubecontainer.RealOS{},
podSandboxImage: podSandboxImage,
streamingRuntime: &streamingRuntime{
client: client,
// Only the native exec handling is supported for now.
// TODO(#35747) - Either deprecate nsenter exec handling, or add support for it here.
execHandler: &dockertools.NativeExecHandler{},
},
}
if streamingConfig != nil {
var err error
ds.streamingServer, err = streaming.NewServer(*streamingConfig, ds.streamingRuntime)
if err != nil {
return nil, err
}
}
return ds, nil
}

// DockerService is an interface that embeds both the new RuntimeService and
@@ -78,18 +93,18 @@ type DockerService interface {
type DockerLegacyService interface {
// Supporting legacy methods for docker.
GetContainerLogs(pod *api.Pod, containerID kubecontainer.ContainerID, logOptions *api.PodLogOptions, stdout, stderr io.Writer) (err error)
kubecontainer.ContainerAttacher
PortForward(sandboxID string, port uint16, stream io.ReadWriteCloser) error

// TODO: Remove this once exec is properly defined in CRI.
ExecInContainer(containerID kubecontainer.ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error
LegacyExec(containerID kubecontainer.ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error
LegacyAttach(id kubecontainer.ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error
LegacyPortForward(sandboxID string, port uint16, stream io.ReadWriteCloser) error
}

type dockerService struct {
seccompProfileRoot string
client dockertools.DockerInterface
os kubecontainer.OSInterface
podSandboxImage string
streamingRuntime *streamingRuntime
streamingServer streaming.Server
}

// Version returns the runtime name, runtime version and runtime API version
@@ -0,0 +1,149 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package dockershim

import (
"bytes"
"fmt"
"io"
"math"
"time"

dockertypes "github.com/docker/engine-api/types"
runtimeapi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
"k8s.io/kubernetes/pkg/kubelet/dockertools"
"k8s.io/kubernetes/pkg/kubelet/server/streaming"
"k8s.io/kubernetes/pkg/kubelet/util/ioutils"
"k8s.io/kubernetes/pkg/util/term"
)

type streamingRuntime struct {
client dockertools.DockerInterface
execHandler dockertools.ExecHandler
}

var _ streaming.Runtime = &streamingRuntime{}

func (r *streamingRuntime) Exec(containerID string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan term.Size) error {
return r.exec(containerID, cmd, in, out, err, tty, resize, 0)
}

// Internal version of Exec adds a timeout.
func (r *streamingRuntime) exec(containerID string, cmd []string, in io.Reader, out, errw io.WriteCloser, tty bool, resize <-chan term.Size, timeout time.Duration) error {
container, err := checkContainerStatus(r.client, containerID)
if err != nil {
return err
}

// TODO(timstclair): Clean this up once PR#33366 merges.

This comment has been minimized.

Copy link
@yujuhong

yujuhong Oct 27, 2016

Member

#33366 doesn't implement the timeout, are you going to move code below to ExecInContainer?

This comment has been minimized.

Copy link
@timstclair

timstclair Oct 28, 2016

Author

Oh yeah, I was thinking d82bc85 was part of #33366. I don't think this method should be responsible for underlying handling of the timeout, so I can implement it in the native exec handler in a separate PR. /cc @rhcarvalho

This comment has been minimized.

Copy link
@rhcarvalho

rhcarvalho Oct 31, 2016

Contributor

#27956 has the code implementing the timeout, though we are not able to implement proper timeouts using the Docker API.

To speed up the discussion of timeout in the func signature vs. what respects the timeout, we extracted #33366 to do the former, then we can bring the actual implementations.

FYI this is an effort to fix #26895.

if timeout <= 0 {
// Run until command exits.
return r.execHandler.ExecInContainer(r.client, container, cmd, in, out, errw, tty, resize)
}

errCh := make(chan error)
go func() {
errCh <- r.execHandler.ExecInContainer(r.client, container, cmd, in, out, errw, tty, resize)
}()

select {
case err := <-errCh:
return err
case <-time.After(timeout):
return streaming.ErrorTimeout("exec", timeout)

This comment has been minimized.

Copy link
@rhcarvalho

rhcarvalho Oct 31, 2016

Contributor

@timstclair this code path leaks a goroutine, because the send operation in line 60 will never succeed without a receive. This is a common pattern where we make errCh buffered, with size 1.

errCh := make(chan error, 1)

And if we expect frequent calls (and I think we do, probes are executed often), then https://golang.org/pkg/time/#After is not very efficient because we don't cleanup until the timer fires (even though we might return before the timeout...)

This comment has been minimized.

Copy link
@feiskyer

feiskyer Oct 31, 2016

Member

@rhcarvalho File an issue for the bug?

This comment has been minimized.

Copy link
@timstclair

timstclair Oct 31, 2016

Author

Good catch, I'll send a pr to fix it.

}
}

func (r *streamingRuntime) Attach(containerID string, in io.Reader, out, errw io.WriteCloser, resize <-chan term.Size) error {
container, err := checkContainerStatus(r.client, containerID)
if err != nil {
return err
}

tty := container.Config.Tty
return dockertools.AttachContainer(r.client, containerID, in, out, errw, tty, resize)
}

func (r *streamingRuntime) PortForward(podSandboxID string, port int32, stream io.ReadWriteCloser) error {
if port < 0 || port > math.MaxUint16 {
return fmt.Errorf("invalid port %d", port)
}
return dockertools.PortForward(r.client, podSandboxID, uint16(port), stream)
}

// ExecSync executes a command in the container, and returns the stdout output.
// If command exits with a non-zero exit code, an error is returned.
func (ds *dockerService) ExecSync(containerID string, cmd []string, timeout time.Duration) (stdout []byte, stderr []byte, err error) {
var stdoutBuffer, stderrBuffer bytes.Buffer
err = ds.streamingRuntime.exec(containerID, cmd,
nil, // in
ioutils.WriteCloserWrapper(&stdoutBuffer),
ioutils.WriteCloserWrapper(&stderrBuffer),
false, // tty
nil, // resize
timeout)
return stdoutBuffer.Bytes(), stderrBuffer.Bytes(), err
}

// Exec prepares a streaming endpoint to execute a command in the container, and returns the address.
func (ds *dockerService) Exec(req *runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error) {
if ds.streamingServer == nil {
return nil, streaming.ErrorStreamingDisabled("exec")
}
_, err := checkContainerStatus(ds.client, req.GetContainerId())
if err != nil {
return nil, err
}
return ds.streamingServer.GetExec(req)
}

// Attach prepares a streaming endpoint to attach to a running container, and returns the address.
func (ds *dockerService) Attach(req *runtimeapi.AttachRequest) (*runtimeapi.AttachResponse, error) {
if ds.streamingServer == nil {
return nil, streaming.ErrorStreamingDisabled("attach")
}
container, err := checkContainerStatus(ds.client, req.GetContainerId())
if err != nil {
return nil, err
}
tty := container.Config.Tty
return ds.streamingServer.GetAttach(req, tty)

This comment has been minimized.

Copy link
@yujuhong

yujuhong Oct 27, 2016

Member

Hm...I checked the code again, GetAttach embeds the tty option in its URL, but serveAttach never uses. Seems redundant, no?

This comment has been minimized.

Copy link
@timstclair

timstclair Oct 28, 2016

Author

It's used to establish the resize stream, here:

if opts.tty && handler.supportsTerminalResizing() {

}

// PortForward prepares a streaming endpoint to forward ports from a PodSandbox, and returns the address.
func (ds *dockerService) PortForward(req *runtimeapi.PortForwardRequest) (*runtimeapi.PortForwardResponse, error) {
if ds.streamingServer == nil {
return nil, streaming.ErrorStreamingDisabled("port forward")
}
_, err := checkContainerStatus(ds.client, req.GetPodSandboxId())
if err != nil {
return nil, err
}
// TODO(timstclair): Verify that ports are exposed.
return ds.streamingServer.GetPortForward(req)
}

func checkContainerStatus(client dockertools.DockerInterface, containerID string) (*dockertypes.ContainerJSON, error) {
container, err := client.InspectContainer(containerID)
if err != nil {
return nil, err
}
if !container.State.Running {
return nil, fmt.Errorf("container not running (%s)", container.ID)
}
return container, nil
}
@@ -17,7 +17,6 @@ limitations under the License.
package dockershim

import (
"fmt"
"io"

"k8s.io/kubernetes/pkg/api"
@@ -31,8 +30,8 @@ import (
// directly.

// TODO: implement the methods in this file.
func (ds *dockerService) AttachContainer(id kubecontainer.ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) (err error) {
return dockertools.AttachContainer(ds.client, id, stdin, stdout, stderr, tty, resize)
func (ds *dockerService) LegacyAttach(id kubecontainer.ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) (err error) {
return ds.streamingRuntime.Attach(id.ID, stdin, stdout, stderr, resize)
}

func (ds *dockerService) GetContainerLogs(pod *api.Pod, containerID kubecontainer.ContainerID, logOptions *api.PodLogOptions, stdout, stderr io.Writer) (err error) {
@@ -43,19 +42,10 @@ func (ds *dockerService) GetContainerLogs(pod *api.Pod, containerID kubecontaine
return dockertools.GetContainerLogs(ds.client, pod, containerID, logOptions, stdout, stderr, container.Config.Tty)
}

func (ds *dockerService) PortForward(sandboxID string, port uint16, stream io.ReadWriteCloser) error {
return dockertools.PortForward(ds.client, sandboxID, port, stream)
func (ds *dockerService) LegacyPortForward(sandboxID string, port uint16, stream io.ReadWriteCloser) error {
return ds.streamingRuntime.PortForward(sandboxID, int32(port), stream)
}

func (ds *dockerService) ExecInContainer(containerID kubecontainer.ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error {
container, err := ds.client.InspectContainer(containerID.ID)
if err != nil {
return err
}
if !container.State.Running {
return fmt.Errorf("container not running (%s)", container.ID)
}

handler := &dockertools.NativeExecHandler{}
return handler.ExecInContainer(ds.client, container, cmd, stdin, stdout, stderr, tty, resize)
func (ds *dockerService) LegacyExec(containerID kubecontainer.ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error {
return ds.streamingRuntime.Exec(containerID.ID, cmd, stdin, stdout, stderr, tty, resize)
}
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.