Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use indirect streaming path for remote CRI shim #36253

Merged
merged 2 commits into from
Nov 10, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/kubelet/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ go_library(
"//pkg/kubelet/server:go_default_library",
"//pkg/kubelet/server/remotecommand:go_default_library",
"//pkg/kubelet/server/stats:go_default_library",
"//pkg/kubelet/server/streaming:go_default_library",
"//pkg/kubelet/status:go_default_library",
"//pkg/kubelet/sysctl:go_default_library",
"//pkg/kubelet/types:go_default_library",
Expand Down
1 change: 0 additions & 1 deletion pkg/kubelet/dockershim/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ go_library(
"docker_service.go",
"docker_streaming.go",
"helpers.go",
"legacy.go",
"naming.go",
"security_context.go",
],
Expand Down
28 changes: 13 additions & 15 deletions pkg/kubelet/dockershim/docker_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package dockershim

import (
"fmt"
"io"
"net/http"

"github.com/golang/glog"
"github.com/golang/protobuf/proto"
Expand All @@ -33,7 +33,6 @@ import (
"k8s.io/kubernetes/pkg/kubelet/network/cni"
"k8s.io/kubernetes/pkg/kubelet/network/kubenet"
"k8s.io/kubernetes/pkg/kubelet/server/streaming"
"k8s.io/kubernetes/pkg/util/term"
)

const (
Expand Down Expand Up @@ -132,23 +131,14 @@ func NewDockerService(client dockertools.DockerInterface, seccompProfileRoot str
return ds, nil
}

// DockerService is an interface that embeds both the new RuntimeService and
// ImageService interfaces, while including DockerLegacyService for backward
// compatibility.
// DockerService is an interface that embeds the new RuntimeService and
// ImageService interfaces.
type DockerService interface {
internalApi.RuntimeService
internalApi.ImageManagerService
DockerLegacyService
Start() error
}

// DockerLegacyService is an interface that embeds all legacy methods for
// backward compatibility.
type DockerLegacyService interface {
// Supporting legacy methods for docker.
LegacyExec(containerID kubecontainer.ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error
LegacyAttach(id kubecontainer.ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error
LegacyPortForward(sandboxID string, port uint16, stream io.ReadWriteCloser) error
// For serving streaming calls.
http.Handler
}

type dockerService struct {
Expand Down Expand Up @@ -249,3 +239,11 @@ func (ds *dockerService) Status() (*runtimeApi.RuntimeStatus, error) {
}
return &runtimeApi.RuntimeStatus{Conditions: conditions}, nil
}

func (ds *dockerService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if ds.streamingServer != nil {
ds.streamingServer.ServeHTTP(w, r)
} else {
http.NotFound(w, r)
}
}
41 changes: 0 additions & 41 deletions pkg/kubelet/dockershim/legacy.go

This file was deleted.

7 changes: 3 additions & 4 deletions pkg/kubelet/dockershim/remote/docker_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ limitations under the License.
package remote

import (
"fmt"
"time"

"golang.org/x/net/context"
Expand Down Expand Up @@ -165,15 +164,15 @@ func (d *dockerService) ExecSync(ctx context.Context, r *runtimeApi.ExecSyncRequ
}

func (d *dockerService) Exec(ctx context.Context, r *runtimeApi.ExecRequest) (*runtimeApi.ExecResponse, error) {
return nil, fmt.Errorf("not implemented")
return d.runtimeService.Exec(r)
}

func (d *dockerService) Attach(ctx context.Context, r *runtimeApi.AttachRequest) (*runtimeApi.AttachResponse, error) {
return nil, fmt.Errorf("not implemented")
return d.runtimeService.Attach(r)
}

func (d *dockerService) PortForward(ctx context.Context, r *runtimeApi.PortForwardRequest) (*runtimeApi.PortForwardResponse, error) {
return nil, fmt.Errorf("not implemented")
return d.runtimeService.PortForward(r)
}

func (d *dockerService) UpdateRuntimeConfig(ctx context.Context, r *runtimeApi.UpdateRuntimeConfigRequest) (*runtimeApi.UpdateRuntimeConfigResponse, error) {
Expand Down
36 changes: 27 additions & 9 deletions pkg/kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"net"
"net/http"
"net/url"
"os"
"path"
"sort"
Expand Down Expand Up @@ -62,6 +63,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/rkt"
"k8s.io/kubernetes/pkg/kubelet/server"
"k8s.io/kubernetes/pkg/kubelet/server/stats"
"k8s.io/kubernetes/pkg/kubelet/server/streaming"
"k8s.io/kubernetes/pkg/kubelet/status"
"k8s.io/kubernetes/pkg/kubelet/sysctl"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
Expand Down Expand Up @@ -527,8 +529,9 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub

switch kubeCfg.ContainerRuntime {
case "docker":
streamingConfig := getStreamingConfig(kubeCfg, kubeDeps)
// Use the new CRI shim for docker.
ds, err := dockershim.NewDockerService(klet.dockerClient, kubeCfg.SeccompProfileRoot, kubeCfg.PodInfraContainerImage, nil, &pluginSettings, kubeCfg.RuntimeCgroups)
ds, err := dockershim.NewDockerService(klet.dockerClient, kubeCfg.SeccompProfileRoot, kubeCfg.PodInfraContainerImage, streamingConfig, &pluginSettings, kubeCfg.RuntimeCgroups)
if err != nil {
return nil, err
}
Expand All @@ -538,6 +541,7 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub
return nil, err
}

klet.criHandler = ds
rs := ds.(internalApi.RuntimeService)
is := ds.(internalApi.ImageManagerService)
// This is an internal knob to switch between grpc and non-grpc
Expand Down Expand Up @@ -567,13 +571,7 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub
// functions in CRI.
// TODO: Remove this hack after CRI is fully implemented.
// TODO: Move the instrumented interface wrapping into kuberuntime.
runtimeService = &struct {
internalApi.RuntimeService
dockershim.DockerLegacyService
}{
RuntimeService: kuberuntime.NewInstrumentedRuntimeService(rs),
DockerLegacyService: ds,
}
runtimeService = kuberuntime.NewInstrumentedRuntimeService(rs)
imageService = is
case "remote":
runtimeService, imageService, err = getRuntimeAndImageServices(kubeCfg)
Expand Down Expand Up @@ -1080,6 +1078,9 @@ type Kubelet struct {

// The AppArmor validator for checking whether AppArmor is supported.
appArmorValidator apparmor.Validator

// The handler serving CRI streaming calls (exec/attach/port-forward).
criHandler http.Handler
}

// setupDataDirs creates:
Expand Down Expand Up @@ -2070,7 +2071,7 @@ func (kl *Kubelet) ResyncInterval() time.Duration {

// ListenAndServe runs the kubelet HTTP server.
func (kl *Kubelet) ListenAndServe(address net.IP, port uint, tlsOptions *server.TLSOptions, auth server.AuthInterface, enableDebuggingHandlers bool) {
server.ListenAndServeKubeletServer(kl, kl.resourceAnalyzer, address, port, tlsOptions, auth, enableDebuggingHandlers, kl.containerRuntime)
server.ListenAndServeKubeletServer(kl, kl.resourceAnalyzer, address, port, tlsOptions, auth, enableDebuggingHandlers, kl.containerRuntime, kl.criHandler)
}

// ListenAndServeReadOnly runs the kubelet HTTP server in read-only mode.
Expand Down Expand Up @@ -2136,3 +2137,20 @@ func ParseReservation(kubeReserved, systemReserved utilconfig.ConfigurationMap)
}
return reservation, nil
}

// Gets the streaming server configuration to use with in-process CRI shims.
func getStreamingConfig(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *KubeletDeps) *streaming.Config {
config := &streaming.Config{
// Use a relative redirect (no scheme or host).
BaseURL: &url.URL{
Path: "/cri/",
Copy link
Member

@liggitt liggitt Nov 5, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this work with the redirect following code in the apiserver? I don't remember relative URL resolution

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. The relative URL is resolved here: https://github.com/kubernetes/kubernetes/pull/34987/files#diff-4b9b9659e8f0abce3dab96b924f6b107R312
I also verified this works manually.

},
StreamIdleTimeout: kubeCfg.StreamingConnectionIdleTimeout.Duration,
StreamCreationTimeout: streaming.DefaultConfig.StreamCreationTimeout,
SupportedProtocols: streaming.DefaultConfig.SupportedProtocols,
}
if kubeDeps.TLSOptions != nil {
config.TLSConfig = kubeDeps.TLSOptions.Config
}
return config
}
2 changes: 0 additions & 2 deletions pkg/kubelet/kuberuntime/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ go_library(
"//pkg/kubelet/api:go_default_library",
"//pkg/kubelet/api/v1alpha1/runtime:go_default_library",
"//pkg/kubelet/container:go_default_library",
"//pkg/kubelet/dockershim:go_default_library",
"//pkg/kubelet/dockertools:go_default_library",
"//pkg/kubelet/events:go_default_library",
"//pkg/kubelet/images:go_default_library",
Expand All @@ -56,7 +55,6 @@ go_library(
"//pkg/util/runtime:go_default_library",
"//pkg/util/selinux:go_default_library",
"//pkg/util/sets:go_default_library",
"//pkg/util/term:go_default_library",
"//vendor:github.com/coreos/go-semver/semver",
"//vendor:github.com/docker/docker/pkg/jsonlog",
"//vendor:github.com/fsnotify/fsnotify",
Expand Down
26 changes: 0 additions & 26 deletions pkg/kubelet/kuberuntime/kuberuntime_container.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"k8s.io/kubernetes/pkg/api/unversioned"
runtimeApi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/dockershim"
"k8s.io/kubernetes/pkg/kubelet/events"
"k8s.io/kubernetes/pkg/kubelet/qos"
"k8s.io/kubernetes/pkg/kubelet/types"
Expand All @@ -42,7 +41,6 @@ import (
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/selinux"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/term"
)

// startContainer starts a container and returns a message indicates why it is failed on error.
Expand Down Expand Up @@ -653,17 +651,6 @@ func findNextInitContainerToRun(pod *api.Pod, podStatus *kubecontainer.PodStatus
return nil, &pod.Spec.InitContainers[0], false
}

// AttachContainer attaches to the container's console
// TODO: Remove this method once the indirect streaming path is fully functional.
func (m *kubeGenericRuntimeManager) AttachContainer(id kubecontainer.ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) (err error) {
// Use `docker attach` directly for in-process docker integration for
// now to unblock other tests.
if ds, ok := m.runtimeService.(dockershim.DockerLegacyService); ok {
return ds.LegacyAttach(id, stdin, stdout, stderr, tty, resize)
}
return fmt.Errorf("not implemented")
}

// GetContainerLogs returns logs of a specific container.
func (m *kubeGenericRuntimeManager) GetContainerLogs(pod *api.Pod, containerID kubecontainer.ContainerID, logOptions *api.PodLogOptions, stdout, stderr io.Writer) (err error) {
status, err := m.runtimeService.ContainerStatus(containerID.ID)
Expand Down Expand Up @@ -714,19 +701,6 @@ func (m *kubeGenericRuntimeManager) RunInContainer(id kubecontainer.ContainerID,
return append(stdout, stderr...), err
}

// Runs the command in the container of the specified pod using nsenter.
// Attaches the processes stdin, stdout, and stderr. Optionally uses a
// tty.
// TODO: Remove this method once the indirect streaming path is fully functional.
func (m *kubeGenericRuntimeManager) ExecInContainer(containerID kubecontainer.ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size, timeout time.Duration) error {
// Use `docker exec` directly for in-process docker integration for
// now to unblock other tests.
if ds, ok := m.runtimeService.(dockershim.DockerLegacyService); ok {
return ds.LegacyExec(containerID, cmd, stdin, stdout, stderr, tty, resize)
}
return fmt.Errorf("not implemented")
}

// removeContainer removes the container and the container logs.
// Notice that we remove the container logs first, so that container will not be removed if
// container logs are failed to be removed, and kubelet will retry this later. This guarantees
Expand Down
22 changes: 0 additions & 22 deletions pkg/kubelet/kuberuntime/kuberuntime_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package kuberuntime
import (
"errors"
"fmt"
"io"
"os"
"time"

Expand All @@ -33,7 +32,6 @@ import (
internalApi "k8s.io/kubernetes/pkg/kubelet/api"
runtimeApi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/dockershim"
"k8s.io/kubernetes/pkg/kubelet/events"
"k8s.io/kubernetes/pkg/kubelet/images"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
Expand Down Expand Up @@ -114,8 +112,6 @@ type KubeGenericRuntime interface {
kubecontainer.Runtime
kubecontainer.IndirectStreamingRuntime
kubecontainer.ContainerCommandRunner
// TODO(timstclair): Remove this once the indirect path is fully functional.
kubecontainer.DirectStreamingRuntime
}

// NewKubeGenericRuntimeManager creates a new kubeGenericRuntimeManager
Expand Down Expand Up @@ -919,24 +915,6 @@ func (m *kubeGenericRuntimeManager) GetPodContainerID(pod *kubecontainer.Pod) (k
return pod.Sandboxes[0].ID, nil
}

// Forward the specified port from the specified pod to the stream.
// TODO: Remove this method once the indirect streaming path is fully functional.
func (m *kubeGenericRuntimeManager) PortForward(pod *kubecontainer.Pod, port uint16, stream io.ReadWriteCloser) error {
formattedPod := kubecontainer.FormatPod(pod)
if len(pod.Sandboxes) == 0 {
glog.Errorf("No sandboxes are found for pod %q", formattedPod)
return fmt.Errorf("sandbox for pod %q not found", formattedPod)
}

// Use docker portforward directly for in-process docker integration
// now to unblock other tests.
if ds, ok := m.runtimeService.(dockershim.DockerLegacyService); ok {
return ds.LegacyPortForward(pod.Sandboxes[0].ID.ID, port, stream)
}

return fmt.Errorf("not implemented")
}

// UpdatePodCIDR is just a passthrough method to update the runtimeConfig of the shim
// with the podCIDR supplied by the kubelet.
func (m *kubeGenericRuntimeManager) UpdatePodCIDR(podCIDR string) error {
Expand Down
1 change: 1 addition & 0 deletions pkg/kubelet/server/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ go_test(
"//pkg/util/httpstream/spdy:go_default_library",
"//pkg/util/sets:go_default_library",
"//pkg/util/term:go_default_library",
"//pkg/util/testing:go_default_library",
"//pkg/volume:go_default_library",
"//vendor:github.com/google/cadvisor/info/v1",
"//vendor:github.com/google/cadvisor/info/v2",
Expand Down