Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KEP-17: Pipe task implementation #1105

Merged
merged 19 commits into from
Dec 5, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package task
package podexec

import (
"archive/tar"
"bytes"
"errors"
"fmt"
"io"
"os"
Expand All @@ -19,6 +20,11 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
)

var (
// ErrCommandFailed is returned for command executions with an exit code > 0
ErrCommandFailed = errors.New("command failed: ")
)

// PodExec defines a command that will be executed in a running Pod.
// RestCfg - The REST configuration for the cluster.
// PodName - The pod name on which to execute the command.
Expand Down Expand Up @@ -98,14 +104,14 @@ func (pe *PodExec) Run() error {

// FileSize fetches the size of a file in a remote pod. It runs `stat -c %s file` command in the
// pod and parses the output.
func FileSize(file string, pod *v1.Pod, restCfg *rest.Config) (int64, error) {
func FileSize(file string, pod *v1.Pod, ctrName string, restCfg *rest.Config) (int64, error) {
stdout := strings.Builder{}

pe := PodExec{
RestCfg: restCfg,
PodName: pod.Name,
PodNamespace: pod.Namespace,
ContainerName: pipePodContainerName,
ContainerName: ctrName,
Args: []string{"stat", "-c", "%s", file},
In: nil,
Out: &stdout,
Expand All @@ -114,7 +120,7 @@ func FileSize(file string, pod *v1.Pod, restCfg *rest.Config) (int64, error) {
}

if err := pe.Run(); err != nil {
return 0, fmt.Errorf("failed to get the size of %s, err: %v, stderr: %s", file, err, stdout.String())
return 0, fmt.Errorf("%wfailed to get the size of %s, err: %v, stderr: %s", ErrCommandFailed, file, err, stdout.String())
}

raw := stdout.String()
Expand All @@ -131,22 +137,22 @@ func FileSize(file string, pod *v1.Pod, restCfg *rest.Config) (int64, error) {
// of the file via the stdout. Locally, the tar file is extracted into the passed afero filesystem where
// it is saved under the same path. Afero filesystem is used to allow the caller downloading and persisting
// of multiple files concurrently (afero filesystem is thread-safe).
func DownloadFile(fs afero.Fs, file string, pod *v1.Pod, restCfg *rest.Config) error {
func DownloadFile(fs afero.Fs, file string, pod *v1.Pod, ctrName string, restCfg *rest.Config) error {
stdout := bytes.Buffer{}
stderr := strings.Builder{}

pe := PodExec{
RestCfg: restCfg,
PodName: pod.Name,
PodNamespace: pod.Namespace,
ContainerName: pipePodContainerName,
ContainerName: ctrName,
Args: []string{"tar", "cf", "-", file},
In: nil,
Out: &stdout,
Err: &stderr,
}
if err := pe.Run(); err != nil {
return fmt.Errorf("failed to copy pipe file. err: %v, stderr: %s", err, stderr.String())
return fmt.Errorf("%wfailed to copy pipe file. err: %v, stderr: %s", ErrCommandFailed, err, stderr.String())
}

if err := untarFile(fs, &stdout, file); err != nil {
Expand Down Expand Up @@ -182,20 +188,22 @@ func untarFile(fs afero.Fs, r io.Reader, fileName string) error {
if err != nil {
return err
}
defer f.Close() // nolint
nfnt marked this conversation as resolved.
Show resolved Hide resolved

// copy over contents
if _, err := io.Copy(f, tr); err != nil {
return err
}

// manually close here after each file operation; deferring would cause each file close
// to wait until all operations have completed.
f.Close() // nolint

default:
fmt.Printf("skipping %s because it is not a regular file or a directory", header.Name)
}
}

return nil
}

// HasCommandFailed returns true if PodExec command returned an exit code > 0
func HasCommandFailed(err error) bool {
return err != nil && errors.Is(err, ErrCommandFailed)
}
36 changes: 21 additions & 15 deletions pkg/engine/task/task_pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"strings"

"github.com/kudobuilder/kudo/pkg/engine/renderer"
"github.com/kudobuilder/kudo/pkg/engine/task/podexec"
"github.com/spf13/afero"
"golang.org/x/sync/errgroup"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -157,34 +158,28 @@ func isRelative(base, file string) bool {
// sharedVolumeName method searches pod volumes for one of the type emptyDir and returns
// its name. Method expects exactly one such volume to exits and will return an error otherwise.
func sharedVolumeName(pod *corev1.Pod) (string, error) {
name := ""
vols := 0
volumes := []string{}
for _, v := range pod.Spec.Volumes {
if v.EmptyDir != nil {
name = v.Name
vols++
volumes = append(volumes, v.Name)
}
}
if name == "" || vols != 1 {
if len(volumes) != 1 {
return "", errors.New("pipe pod should define one emptyDir shared volume where the artifacts are temporary stored")
}
return name, nil
return volumes[0], nil
}

// sharedMountPath method searches pod initContainer volume mounts for one with a passed name.
// It returns the mount path of the volume if found or an error otherwise.
func sharedMountPath(pod *corev1.Pod, volName string) (string, error) {
mountPath := ""
for _, vm := range pod.Spec.InitContainers[0].VolumeMounts {
if vm.Name == volName {
mountPath = vm.MountPath
return vm.MountPath, nil
}
}

if mountPath == "" {
return "", fmt.Errorf("pipe pod should save generated artifacts in %s", volName)
}
return mountPath, nil
return "", fmt.Errorf("pipe pod should save generated artifacts in %s", volName)
}

// validate method validates passed pipe pod. It is expected to:
Expand Down Expand Up @@ -281,16 +276,27 @@ func copyFiles(fs afero.Fs, ff []PipeFile, pod *corev1.Pod, ctx Context) error {
// Check the size of the pipe file first. K87 has a inherent limit on the size of
zen-dog marked this conversation as resolved.
Show resolved Hide resolved
// Secret/ConfigMap, so we avoid unnecessary copying of files that are too big by
// checking its size first.
size, err := FileSize(f.File, pod, restCfg)
size, err := podexec.FileSize(f.File, pod, pipePodContainerName, restCfg)
if err != nil {
return fatalExecutionError(err, pipeTaskError, ctx.Meta)
// Any remote command exit code > 0 is treated as a fatal error since retrying it doesn't make sense
if podexec.HasCommandFailed(err) {
return fatalExecutionError(err, pipeTaskError, ctx.Meta)
}
return err
}

if size > maxPipeFileSize {
return fatalExecutionError(fmt.Errorf("pipe file %s size %d exceeds maximum file size of %d bytes", f.File, size, maxPipeFileSize), pipeTaskError, ctx.Meta)
}

return DownloadFile(fs, f.File, pod, restCfg)
if err = podexec.DownloadFile(fs, f.File, pod, pipePodContainerName, restCfg); err != nil {
// Any remote command exit code > 0 is treated as a fatal error since retrying it doesn't make sense
if podexec.HasCommandFailed(err) {
return fatalExecutionError(err, pipeTaskError, ctx.Meta)
}
return err
}
return nil
})
}

Expand Down