From 9c32bfb0aead58d46a1500466b2915d00dcc3d9f Mon Sep 17 00:00:00 2001 From: Salvatore Cirone <48855938+aka-somix@users.noreply.github.com> Date: Fri, 31 Mar 2023 17:51:50 +0200 Subject: [PATCH] Add support for Attach API functionality (#1090) Co-authored-by: Pablo Borrelli --- .../internal/provider/mock/mock.go | 7 + internal/kubernetes/remotecommand/attach.go | 79 ++++++++++ node/api/attach.go | 139 ++++++++++++++++++ node/api/server.go | 13 +- node/nodeutil/provider.go | 11 +- 5 files changed, 244 insertions(+), 5 deletions(-) create mode 100644 internal/kubernetes/remotecommand/attach.go create mode 100644 node/api/attach.go diff --git a/cmd/virtual-kubelet/internal/provider/mock/mock.go b/cmd/virtual-kubelet/internal/provider/mock/mock.go index c5b914df8..de4757117 100644 --- a/cmd/virtual-kubelet/internal/provider/mock/mock.go +++ b/cmd/virtual-kubelet/internal/provider/mock/mock.go @@ -301,6 +301,13 @@ func (p *MockProvider) RunInContainer(ctx context.Context, namespace, name, cont return nil } +// AttachToContainer attaches to the executing process of a container in the pod, copying data +// between in/out/err and the container's stdin/stdout/stderr. +func (p *MockProvider) AttachToContainer(ctx context.Context, namespace, name, container string, attach api.AttachIO) error { + log.G(ctx).Infof("receive AttachToContainer %q", container) + return nil +} + // GetPodStatus returns the status of a pod by name that is "running". // returns nil if a pod by that name is not found. func (p *MockProvider) GetPodStatus(ctx context.Context, namespace, name string) (*v1.PodStatus, error) { diff --git a/internal/kubernetes/remotecommand/attach.go b/internal/kubernetes/remotecommand/attach.go new file mode 100644 index 000000000..660af9db6 --- /dev/null +++ b/internal/kubernetes/remotecommand/attach.go @@ -0,0 +1,79 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package remotecommand + +import ( + "fmt" + "io" + "net/http" + "time" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + remotecommandconsts "k8s.io/apimachinery/pkg/util/remotecommand" + "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/client-go/tools/remotecommand" + utilexec "k8s.io/utils/exec" +) + +// Attacher knows how to attach to a container in a pod. +type Attacher interface { + // AttachToContainer attaches to a container in the pod, copying data + // between in/out/err and the container's stdin/stdout/stderr. + AttachToContainer(name string, uid types.UID, container string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error +} + +// ServeAttach handles requests to attach to a container. After +// creating/receiving the required streams, it delegates the actual attachment +// to the attacher. +func ServeAttach(w http.ResponseWriter, req *http.Request, attacher Attacher, podName string, uid types.UID, container string, streamOpts *Options, idleTimeout, streamCreationTimeout time.Duration, supportedProtocols []string) { + ctx, ok := createStreams(req, w, streamOpts, supportedProtocols, idleTimeout, streamCreationTimeout) + if !ok { + // error is handled by createStreams + return + } + defer ctx.conn.Close() + + err := attacher.AttachToContainer(podName, uid, container, ctx.stdinStream, ctx.stdoutStream, ctx.stderrStream, ctx.tty, ctx.resizeChan, 0) + if err != nil { + if exitErr, ok := err.(utilexec.ExitError); ok && exitErr.Exited() { + rc := exitErr.ExitStatus() + ctx.writeStatus(&apierrors.StatusError{ErrStatus: metav1.Status{ + Status: metav1.StatusFailure, + Reason: remotecommandconsts.NonZeroExitCodeReason, + Details: &metav1.StatusDetails{ + Causes: []metav1.StatusCause{ + { + Type: remotecommandconsts.ExitCodeCauseType, + Message: fmt.Sprintf("%d", rc), + }, + }, + }, + Message: fmt.Sprintf("command terminated with non-zero exit code: %v", exitErr), + }}) + return + } + err = fmt.Errorf("error attaching to container: %v", err) + runtime.HandleError(err) + ctx.writeStatus(apierrors.NewInternalError(err)) + return + } + ctx.writeStatus(&apierrors.StatusError{ErrStatus: metav1.Status{ + Status: metav1.StatusSuccess, + }}) +} diff --git a/node/api/attach.go b/node/api/attach.go new file mode 100644 index 000000000..a2a6b13fe --- /dev/null +++ b/node/api/attach.go @@ -0,0 +1,139 @@ +// Copyright © 2017 The virtual-kubelet authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package api + +import ( + "context" + "io" + "net/http" + "strings" + "time" + + "github.com/gorilla/mux" + "github.com/virtual-kubelet/virtual-kubelet/errdefs" + "github.com/virtual-kubelet/virtual-kubelet/internal/kubernetes/remotecommand" + "k8s.io/apimachinery/pkg/types" + remoteutils "k8s.io/client-go/tools/remotecommand" +) + +// ContainerAttachHandlerFunc defines the handler function used for "execing" into a +// container in a pod. +type ContainerAttachHandlerFunc func(ctx context.Context, namespace, podName, containerName string, attach AttachIO) error + +// HandleContainerAttach makes an http handler func from a Provider which execs a command in a pod's container +// Note that this handler currently depends on gorrilla/mux to get url parts as variables. +// TODO(@cpuguy83): don't force gorilla/mux on consumers of this function +func HandleContainerAttach(h ContainerAttachHandlerFunc, opts ...ContainerExecHandlerOption) http.HandlerFunc { + if h == nil { + return NotImplemented + } + + var cfg ContainerExecHandlerConfig + for _, o := range opts { + o(&cfg) + } + + if cfg.StreamIdleTimeout == 0 { + cfg.StreamIdleTimeout = 30 * time.Second + } + if cfg.StreamCreationTimeout == 0 { + cfg.StreamCreationTimeout = 30 * time.Second + } + + return handleError(func(w http.ResponseWriter, req *http.Request) error { + vars := mux.Vars(req) + + namespace := vars["namespace"] + pod := vars["pod"] + container := vars["container"] + + supportedStreamProtocols := strings.Split(req.Header.Get("X-Stream-Protocol-Version"), ",") + + streamOpts, err := getExecOptions(req) + if err != nil { + return errdefs.AsInvalidInput(err) + } + + ctx, cancel := context.WithCancel(req.Context()) + defer cancel() + + attach := &containerAttachContext{ctx: ctx, h: h, pod: pod, namespace: namespace, container: container} + remotecommand.ServeAttach( + w, + req, + attach, + "", + "", + container, + streamOpts, + cfg.StreamIdleTimeout, + cfg.StreamCreationTimeout, + supportedStreamProtocols, + ) + + return nil + }) +} + +type containerAttachContext struct { + h ContainerAttachHandlerFunc + namespace, pod, container string + ctx context.Context +} + +// AttachToContainer Implements remotecommand.Attacher +// This is called by remotecommand.ServeAttach +func (c *containerAttachContext) AttachToContainer(name string, uid types.UID, container string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remoteutils.TerminalSize, timeout time.Duration) error { + + eio := &execIO{ + tty: tty, + stdin: in, + stdout: out, + stderr: err, + } + + if tty { + eio.chResize = make(chan TermSize) + } + + ctx, cancel := context.WithCancel(c.ctx) + defer cancel() + + if tty { + go func() { + send := func(s remoteutils.TerminalSize) bool { + select { + case eio.chResize <- TermSize{Width: s.Width, Height: s.Height}: + return false + case <-ctx.Done(): + return true + } + } + + for { + select { + case s := <-resize: + if send(s) { + return + } + case <-ctx.Done(): + return + } + } + }() + } + + return c.h(c.ctx, c.namespace, c.pod, c.container, eio) +} diff --git a/node/api/server.go b/node/api/server.go index f0e130f6e..daa357bbe 100644 --- a/node/api/server.go +++ b/node/api/server.go @@ -34,8 +34,9 @@ type ServeMux interface { } type PodHandlerConfig struct { //nolint:golint - RunInContainer ContainerExecHandlerFunc - GetContainerLogs ContainerLogsHandlerFunc + RunInContainer ContainerExecHandlerFunc + AttachToContainer ContainerAttachHandlerFunc + GetContainerLogs ContainerLogsHandlerFunc // GetPods is meant to enumerate the pods that the provider knows about GetPods PodListerFunc // GetPodsFromKubernetes is meant to enumerate the pods that the node is meant to be running @@ -68,6 +69,14 @@ func PodHandler(p PodHandlerConfig, debug bool) http.Handler { WithExecStreamIdleTimeout(p.StreamIdleTimeout), ), ).Methods("POST", "GET") + r.HandleFunc( + "/attach/{namespace}/{pod}/{container}", + HandleContainerAttach( + p.AttachToContainer, + WithExecStreamCreationTimeout(p.StreamCreationTimeout), + WithExecStreamIdleTimeout(p.StreamIdleTimeout), + ), + ).Methods("POST", "GET") if p.GetStatsSummary != nil { f := HandlePodStatsSummary(p.GetStatsSummary) diff --git a/node/nodeutil/provider.go b/node/nodeutil/provider.go index 6cd707b70..73aab23a1 100644 --- a/node/nodeutil/provider.go +++ b/node/nodeutil/provider.go @@ -28,6 +28,10 @@ type Provider interface { // between in/out/err and the container's stdin/stdout/stderr. RunInContainer(ctx context.Context, namespace, podName, containerName string, cmd []string, attach api.AttachIO) error + // AttachToContainer attaches to the executing process of a container in the pod, copying data + // between in/out/err and the container's stdin/stdout/stderr. + AttachToContainer(ctx context.Context, namespace, podName, containerName string, attach api.AttachIO) error + // GetStatsSummary gets the stats for the node, including running pods GetStatsSummary(context.Context) (*statsv1alpha1.Summary, error) @@ -58,9 +62,10 @@ func AttachProviderRoutes(mux api.ServeMux) NodeOpt { return func(cfg *NodeConfig) error { cfg.routeAttacher = func(p Provider, cfg NodeConfig, pods corev1listers.PodLister) { mux.Handle("/", api.PodHandler(api.PodHandlerConfig{ - RunInContainer: p.RunInContainer, - GetContainerLogs: p.GetContainerLogs, - GetPods: p.GetPods, + RunInContainer: p.RunInContainer, + AttachToContainer: p.AttachToContainer, + GetContainerLogs: p.GetContainerLogs, + GetPods: p.GetPods, GetPodsFromKubernetes: func(context.Context) ([]*v1.Pod, error) { return pods.List(labels.Everything()) },