Skip to content

Commit

Permalink
#3 add use of buffered pipe
Browse files Browse the repository at this point in the history
  • Loading branch information
maorfr committed Dec 3, 2018
1 parent 6deea46 commit 21dd376
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 75 deletions.
25 changes: 15 additions & 10 deletions pkg/skbn/abs.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/Azure/azure-pipeline-go/pipeline"
"github.com/Azure/azure-storage-blob-go/azblob"
nio "gopkg.in/djherbis/nio.v2"
)

var err error
Expand Down Expand Up @@ -160,37 +161,40 @@ func GetListOfFilesFromAbs(ctx context.Context, iClient interface{}, path string
}

// DownloadFromAbs downloads a single file from azure blob storage
func DownloadFromAbs(ctx context.Context, iClient interface{}, path string) ([]byte, error) {
func DownloadFromAbs(ctx context.Context, iClient interface{}, path string, pw *nio.PipeWriter) error {
pSplit := strings.Split(path, "/")

if err := validateAbsPath(pSplit); err != nil {
return nil, err
return err
}
a, c, p := initAbsVariables(pSplit)
pl := iClient.(pipeline.Pipeline)
cu, err := getContainerURL(pl, a, c)
if err != nil {
return nil, err
return err
}

bu := getBlobURL(cu, p)
dr, err := bu.Download(ctx, 0, azblob.CountToEnd, azblob.BlobAccessConditions{}, false)
if err != nil {
return nil, err
return err
}

bs := dr.Body(azblob.RetryReaderOptions{MaxRetryRequests: 20})
dd := bytes.Buffer{}
_, err = dd.ReadFrom(bs)
if err != nil {
return nil, err
return err
}

return dd.Bytes(), nil
// this is a workaround
// we do not want to save the entire file to memory
pw.Write(dd.Bytes())
return nil
}

// UploadToAbs uploads a single file to azure blob storage
func UploadToAbs(ctx context.Context, iClient interface{}, toPath, fromPath string, buffer []byte) error {
func UploadToAbs(ctx context.Context, iClient interface{}, toPath, fromPath string, pr *nio.PipeReader) error {
pSplit := strings.Split(toPath, "/")
if err := validateAbsPath(pSplit); err != nil {
return err
Expand All @@ -210,9 +214,10 @@ func UploadToAbs(ctx context.Context, iClient interface{}, toPath, fromPath stri

bu := getBlobURL(cu, p)

_, err = azblob.UploadBufferToBlockBlob(ctx, buffer, bu, azblob.UploadToBlockBlobOptions{
BlockSize: 4 * 1024 * 1024,
Parallelism: 16})
_, err = azblob.UploadStreamToBlockBlob(ctx, pr, bu, azblob.UploadStreamToBlockBlobOptions{
BufferSize: 4 * 1024 * 1024,
MaxBuffers: 16,
})
if err != nil {
return err
}
Expand Down
45 changes: 23 additions & 22 deletions pkg/skbn/kube.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"strings"

"github.com/nuvo/skbn/pkg/utils"
nio "gopkg.in/djherbis/nio.v2"

core_v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -73,7 +74,8 @@ func GetListOfFilesFromK8s(iClient interface{}, path, findType, findName string)
for attempt < attempts {
attempt++

output, stderr, err := Exec(client, namespace, podName, containerName, command, nil)
output := new(bytes.Buffer)
stderr, err := Exec(client, namespace, podName, containerName, command, nil, output)
if len(stderr) != 0 {
if attempt == attempts {
return nil, fmt.Errorf("STDERR: " + (string)(stderr))
Expand All @@ -89,7 +91,7 @@ func GetListOfFilesFromK8s(iClient interface{}, path, findType, findName string)
continue
}

lines := strings.Split((string)(output), "\n")
lines := strings.Split((string)(output.Bytes()), "\n")
var outLines []string
for _, line := range lines {
if line != "" {
Expand All @@ -104,11 +106,11 @@ func GetListOfFilesFromK8s(iClient interface{}, path, findType, findName string)
}

// DownloadFromK8s downloads a single file from Kubernetes
func DownloadFromK8s(iClient interface{}, path string) ([]byte, error) {
func DownloadFromK8s(iClient interface{}, path string, pw *nio.PipeWriter) error {
client := *iClient.(*K8sClient)
pSplit := strings.Split(path, "/")
if err := validateK8sPath(pSplit); err != nil {
return nil, err
return err
}
namespace, podName, containerName, pathToCopy := initK8sVariables(pSplit)
command := []string{"cat", pathToCopy}
Expand All @@ -118,26 +120,26 @@ func DownloadFromK8s(iClient interface{}, path string) ([]byte, error) {
for attempt < attempts {
attempt++

stdout, stderr, err := Exec(client, namespace, podName, containerName, command, nil)
stderr, err := Exec(client, namespace, podName, containerName, command, nil, pw)
if attempt == attempts {
if len(stderr) != 0 {
return stdout, fmt.Errorf("STDERR: " + (string)(stderr))
return fmt.Errorf("STDERR: " + (string)(stderr))
}
if err != nil {
return stdout, err
return err
}
}
if err == nil {
return stdout, nil
return nil
}
utils.Sleep(attempt)
}

return nil, nil
return nil
}

// UploadToK8s uploads a single file to Kubernetes
func UploadToK8s(iClient interface{}, toPath, fromPath string, buffer []byte) error {
func UploadToK8s(iClient interface{}, toPath, fromPath string, pr *nio.PipeReader) error {
client := *iClient.(*K8sClient)
pSplit := strings.Split(toPath, "/")
if err := validateK8sPath(pSplit); err != nil {
Expand All @@ -155,7 +157,7 @@ func UploadToK8s(iClient interface{}, toPath, fromPath string, buffer []byte) er
attempt++
dir, _ := filepath.Split(pathToCopy)
command := []string{"mkdir", "-p", dir}
_, stderr, err := Exec(client, namespace, podName, containerName, command, nil)
stderr, err := Exec(client, namespace, podName, containerName, command, nil, nil)

if len(stderr) != 0 {
if attempt == attempts {
Expand All @@ -173,7 +175,7 @@ func UploadToK8s(iClient interface{}, toPath, fromPath string, buffer []byte) er
}

command = []string{"touch", pathToCopy}
_, stderr, err = Exec(client, namespace, podName, containerName, command, nil)
stderr, err = Exec(client, namespace, podName, containerName, command, nil, nil)

if len(stderr) != 0 {
if attempt == attempts {
Expand All @@ -191,8 +193,7 @@ func UploadToK8s(iClient interface{}, toPath, fromPath string, buffer []byte) er
}

command = []string{"cp", "/dev/stdin", pathToCopy}
stdin := bytes.NewReader(buffer)
_, stderr, err = Exec(client, namespace, podName, containerName, command, readerWrapper{stdin})
stderr, err = Exec(client, namespace, podName, containerName, command, readerWrapper{pr}, nil)

if len(stderr) != 0 {
if attempt == attempts {
Expand Down Expand Up @@ -223,7 +224,7 @@ func (r readerWrapper) Read(p []byte) (int, error) {
}

// Exec executes a command in a given container
func Exec(client K8sClient, namespace, podName, containerName string, command []string, stdin io.Reader) ([]byte, []byte, error) {
func Exec(client K8sClient, namespace, podName, containerName string, command []string, stdin io.Reader, stdout io.Writer) ([]byte, error) {
clientset, config := client.ClientSet, client.Config

req := clientset.Core().RESTClient().Post().
Expand All @@ -233,36 +234,36 @@ func Exec(client K8sClient, namespace, podName, containerName string, command []
SubResource("exec")
scheme := runtime.NewScheme()
if err := core_v1.AddToScheme(scheme); err != nil {
return nil, nil, fmt.Errorf("error adding to scheme: %v", err)
return nil, fmt.Errorf("error adding to scheme: %v", err)
}

parameterCodec := runtime.NewParameterCodec(scheme)
req.VersionedParams(&core_v1.PodExecOptions{
Command: command,
Container: containerName,
Stdin: stdin != nil,
Stdout: true,
Stdout: stdout != nil,
Stderr: true,
TTY: false,
}, parameterCodec)

exec, err := remotecommand.NewSPDYExecutor(config, "POST", req.URL())
if err != nil {
return nil, nil, fmt.Errorf("error while creating Executor: %v", err)
return nil, fmt.Errorf("error while creating Executor: %v", err)
}

var stdout, stderr bytes.Buffer
var stderr bytes.Buffer
err = exec.Stream(remotecommand.StreamOptions{
Stdin: stdin,
Stdout: &stdout,
Stdout: stdout,
Stderr: &stderr,
Tty: false,
})
if err != nil {
return nil, nil, fmt.Errorf("error in Stream: %v", err)
return nil, fmt.Errorf("error in Stream: %v", err)
}

return stdout.Bytes(), stderr.Bytes(), nil
return stderr.Bytes(), nil
}

func validateK8sPath(pathSplit []string) error {
Expand Down
19 changes: 11 additions & 8 deletions pkg/skbn/s3.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package skbn

import (
"bytes"
"fmt"
"os"
"path/filepath"
"strings"

"github.com/nuvo/skbn/pkg/utils"
nio "gopkg.in/djherbis/nio.v2"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
Expand Down Expand Up @@ -90,11 +90,11 @@ func GetListOfFilesFromS3(iClient interface{}, path string) ([]string, error) {
}

// DownloadFromS3 downloads a single file from S3
func DownloadFromS3(iClient interface{}, path string) ([]byte, error) {
func DownloadFromS3(iClient interface{}, path string, pw *nio.PipeWriter) error {
s := iClient.(*session.Session)
pSplit := strings.Split(path, "/")
if err := validateS3Path(pSplit); err != nil {
return nil, err
return err
}
bucket, s3Path := initS3Variables(pSplit)

Expand All @@ -113,20 +113,23 @@ func DownloadFromS3(iClient interface{}, path string) ([]byte, error) {
})
if err != nil {
if attempt == attempts {
return nil, err
return err
}
utils.Sleep(attempt)
continue
}

return buffer.Bytes(), nil
// this is a workaround
// we do not want to save the entire file to memory
pw.Write(buffer.Bytes())
return nil
}

return nil, nil
return nil
}

// UploadToS3 uploads a single file to S3
func UploadToS3(iClient interface{}, toPath, fromPath string, buffer []byte) error {
func UploadToS3(iClient interface{}, toPath, fromPath string, pr *nio.PipeReader) error {
s := iClient.(*session.Session)
pSplit := strings.Split(toPath, "/")
if err := validateS3Path(pSplit); err != nil {
Expand All @@ -148,7 +151,7 @@ func UploadToS3(iClient interface{}, toPath, fromPath string, buffer []byte) err
_, err := uploader.Upload(&s3manager.UploadInput{
Bucket: aws.String(bucket),
Key: aws.String(s3Path),
Body: bytes.NewReader(buffer),
Body: pr,
})
if err != nil {
if attempt == attempts {
Expand Down
Loading

0 comments on commit 21dd376

Please sign in to comment.