From 4c7e401b068a0e7cd8e7a3b5a4573f39d76695db Mon Sep 17 00:00:00 2001 From: Simon Hardy Date: Wed, 13 Mar 2019 13:43:01 +0100 Subject: [PATCH] add support for gcs --- .gitignore | 4 + Gopkg.lock | 155 +++++++++++++++++++++++++++++++-- Gopkg.toml | 8 ++ README.md | 14 +++ cmd/skbn.go | 2 +- examples/code/example.go | 6 +- pkg/skbn/gcs.go | 180 +++++++++++++++++++++++++++++++++++++++ pkg/skbn/kube.go | 2 +- pkg/skbn/s3.go | 2 +- pkg/skbn/skbn.go | 32 ++++++- 10 files changed, 393 insertions(+), 12 deletions(-) create mode 100644 pkg/skbn/gcs.go diff --git a/.gitignore b/.gitignore index 1488d80..fbb3009 100644 --- a/.gitignore +++ b/.gitignore @@ -15,3 +15,7 @@ test/ # dep/glide vendor/ bin/ + +# IntelliJ IDEA +.idea/ +skbn.iml diff --git a/Gopkg.lock b/Gopkg.lock index f4b19fe..37c709f 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -1,6 +1,22 @@ # This file is autogenerated, do not edit; changes may be undone by the next 'dep ensure'. +[[projects]] + digest = "1:ab38507dcaeb1aec1b6b8b425151c94ad9468812d214e546bf75d77d13f9e9c1" + name = "cloud.google.com/go" + packages = [ + "compute/metadata", + "iam", + "internal", + "internal/optional", + "internal/trace", + "internal/version", + "storage", + ] + pruneopts = "UT" + revision = "f52f9bc132541d2aa914f42100c36d10b1ef7e0c" + version = "v0.37.0" + [[projects]] digest = "1:d2ccb697dc13c8fbffafa37baae97594d5592ae8f7e113471084137315536e2b" name = "github.com/Azure/azure-pipeline-go" @@ -96,10 +112,11 @@ version = "v0.5" [[projects]] - digest = "1:17fe264ee908afc795734e8c4e63db2accabaf57326dbf21763a7d6b86096260" + digest = "1:14834e04828af9e53954f1be45ea7f190c4c26d746009c2ab07c5828595539e9" name = "github.com/golang/protobuf" packages = [ "proto", + "protoc-gen-go/descriptor", "ptypes", "ptypes/any", "ptypes/duration", @@ -124,6 +141,14 @@ pruneopts = "UT" revision = "44d81051d367757e1c7c6a5a86423ece9afcf63c" +[[projects]] + digest = "1:856bd1e35f6da8ce5671a5df09d0e89bf01e9b74b3dabb6d097d39b3813801e1" + name = 
"github.com/googleapis/gax-go" + packages = ["v2"] + pruneopts = "UT" + revision = "c8a15bac9b9fe955bd9f900272f9a306465d28cf" + version = "v2.0.3" + [[projects]] digest = "1:75eb87381d25cc75212f52358df9c3a2719584eaa9685cd510ce28699122f39d" name = "github.com/googleapis/gnostic" @@ -145,6 +170,14 @@ pruneopts = "UT" revision = "787624de3eb7bd915c329cba748687a3b22666a6" +[[projects]] + digest = "1:67474f760e9ac3799f740db2c489e6423a4cde45520673ec123ac831ad849cb8" + name = "github.com/hashicorp/golang-lru" + packages = ["simplelru"] + pruneopts = "UT" + revision = "7087cb70de9f7a8bc0a10c375cb0d2280a8edf9c" + version = "v0.5.1" + [[projects]] digest = "1:3e260afa138eab6492b531a3b3d10ab4cb70512d423faa78b8949dec76e66a21" name = "github.com/imdario/mergo" @@ -224,6 +257,29 @@ revision = "298182f68c66c05229eb03ac171abe6e309ee79a" version = "v1.0.3" +[[projects]] + digest = "1:1af1920a0f0dc25426ba2e57154b9c091ec2ed83be9107abcf83d23c6c9a4194" + name = "go.opencensus.io" + packages = [ + ".", + "exemplar", + "internal", + "internal/tagencoding", + "plugin/ochttp", + "plugin/ochttp/propagation/b3", + "stats", + "stats/internal", + "stats/view", + "tag", + "trace", + "trace/internal", + "trace/propagation", + "trace/tracestate", + ] + pruneopts = "UT" + revision = "f305e5c4e2cf345eba88de13d10de1126fa45a61" + version = "v0.19.1" + [[projects]] digest = "1:3f3a05ae0b95893d90b9b3b5afdb79a9b3d96e4e36e099d841ae602e4aca0da8" name = "golang.org/x/crypto" @@ -233,27 +289,33 @@ [[projects]] branch = "release-branch.go1.10" - digest = "1:bdc99a0ff03b87c7fe65dce8cfd5f7db86d65446850b2c655db7bc740f16aded" + digest = "1:cd65938fd2f89b2c1a7504e875a16a3bc7f5184fd849f4ffe632700b2ce7ab11" name = "golang.org/x/net" packages = [ "context", + "context/ctxhttp", "http2", "http2/hpack", "idna", + "internal/timeseries", "lex/httplex", + "trace", ] pruneopts = "UT" revision = "0ed95abb35c445290478a5348a7b38bb154135fd" [[projects]] - digest = 
"1:9359217acc6040b4be710ce34473acef28023ad39bfafecea34ffaea7f1e1890" + digest = "1:5e9f22cf754ab20a5dff0ae04b12516b112c5b81cd44dccccde148865084d730" name = "golang.org/x/oauth2" packages = [ ".", + "google", "internal", + "jws", + "jwt", ] pruneopts = "UT" - revision = "a6bd8cefa1811bd24b86f8902872e4e8225f74c4" + revision = "e64efc72b421e893cbf63f17ba2221e7d6d0b0f3" [[projects]] branch = "master" @@ -297,13 +359,35 @@ revision = "f51c12702a4d776e4c1fa9b0fabab841babae631" [[projects]] - digest = "1:08206298775e5b462e6c0333f4471b44e63f1a70e42952b6ede4ecc9572281eb" + digest = "1:768c35ec83dd17029060ea581d6ca9fdcaef473ec87e93e4bb750949035f6070" + name = "google.golang.org/api" + packages = [ + "gensupport", + "googleapi", + "googleapi/internal/uritemplates", + "googleapi/transport", + "internal", + "iterator", + "option", + "storage/v1", + "transport/http", + "transport/http/internal/propagation", + ] + pruneopts = "UT" + revision = "19e022d8cf43ce81f046bae8cc18c5397cc7732f" + version = "v0.1.0" + +[[projects]] + digest = "1:d2a8db567a76203e3b41c1f632d86485ffd57f8e650a0d1b19d240671c2fddd7" name = "google.golang.org/appengine" packages = [ + ".", "internal", + "internal/app_identity", "internal/base", "internal/datastore", "internal/log", + "internal/modules", "internal/remote_api", "internal/urlfetch", "urlfetch", @@ -312,6 +396,59 @@ revision = "4a4468ece617fc8205e99368fa2200e9d1fad421" version = "v1.3.0" +[[projects]] + branch = "master" + digest = "1:0e25919d4395a2626f31b9607131bed0ec6407d3579403e503eafc418c05e49f" + name = "google.golang.org/genproto" + packages = [ + "googleapis/api/annotations", + "googleapis/iam/v1", + "googleapis/rpc/code", + "googleapis/rpc/status", + ] + pruneopts = "UT" + revision = "5fe7a883aa19554f42890211544aa549836af7b7" + +[[projects]] + digest = "1:cbc746de4662c66fd24a037501bd65aa0f8ad0bfca0c92576e0abb88864e3741" + name = "google.golang.org/grpc" + packages = [ + ".", + "balancer", + "balancer/base", + "balancer/roundrobin", + 
"binarylog/grpc_binarylog_v1", + "codes", + "connectivity", + "credentials", + "credentials/internal", + "encoding", + "encoding/proto", + "grpclog", + "internal", + "internal/backoff", + "internal/binarylog", + "internal/channelz", + "internal/envconfig", + "internal/grpcrand", + "internal/grpcsync", + "internal/syscall", + "internal/transport", + "keepalive", + "metadata", + "naming", + "peer", + "resolver", + "resolver/dns", + "resolver/passthrough", + "stats", + "status", + "tap", + ] + pruneopts = "UT" + revision = "2fdaae294f38ed9a121193c51ec99fecd3b13eb7" + version = "v1.19.0" + [[projects]] digest = "1:5aebed51f26a49cb77a0c34d1328a0d29839e7a1204b14ff8e26a3df8ec61736" name = "gopkg.in/djherbis/nio.v2" @@ -426,7 +563,7 @@ revision = "2b1284ed4c93a43499e781493253e2ac5959c4fd" [[projects]] - digest = "1:92c7b523fffa5badf467754612745bfa8629c4999a44c0c2521c16e2ad50bd2a" + digest = "1:89965ed41772270efa6a201c54382ca28bba606bfb364fb6cd0c1252c57bf1a2" name = "k8s.io/client-go" packages = [ "discovery", @@ -469,8 +606,10 @@ "pkg/apis/clientauthentication/v1beta1", "pkg/version", "plugin/pkg/client/auth/exec", + "plugin/pkg/client/auth/gcp", "rest", "rest/watch", + "third_party/forked/golang/template", "tools/auth", "tools/clientcmd", "tools/clientcmd/api", @@ -487,6 +626,7 @@ "util/flowcontrol", "util/homedir", "util/integer", + "util/jsonpath", ] pruneopts = "UT" revision = "e64494209f554a6723674bd494d69445fb76a1d4" @@ -511,6 +651,7 @@ analyzer-name = "dep" analyzer-version = 1 input-imports = [ + "cloud.google.com/go/storage", "github.com/Azure/azure-pipeline-go/pipeline", "github.com/Azure/azure-storage-blob-go/azblob", "github.com/aws/aws-sdk-go/aws", @@ -519,10 +660,12 @@ "github.com/aws/aws-sdk-go/service/s3/s3manager", "github.com/djherbis/buffer", "github.com/spf13/cobra", + "google.golang.org/api/iterator", "gopkg.in/djherbis/nio.v2", "k8s.io/api/core/v1", "k8s.io/apimachinery/pkg/runtime", "k8s.io/client-go/kubernetes", + 
"k8s.io/client-go/plugin/pkg/client/auth/gcp", "k8s.io/client-go/rest", "k8s.io/client-go/tools/clientcmd", "k8s.io/client-go/tools/remotecommand", diff --git a/Gopkg.toml b/Gopkg.toml index 4ba0d56..a045da6 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -10,6 +10,14 @@ name = "github.com/aws/aws-sdk-go" version = "1.16.7" +[[override]] + name = "golang.org/x/oauth2" + revision = "e64efc72b421e893cbf63f17ba2221e7d6d0b0f3" + +[[constraint]] + name = "cloud.google.com/go" + version = "0.37.0" + [[constraint]] name = "github.com/djherbis/buffer" version = "1.0.0" diff --git a/README.md b/README.md index 20f324c..755b1e5 100644 --- a/README.md +++ b/README.md @@ -13,6 +13,7 @@ Skbn currently supports the following providers: * AWS S3 * Minio S3 * Azure Blob Storage +* Google Cloud Storage ## Install @@ -67,6 +68,14 @@ skbn cp \ --dst k8s:///// ``` +### Copy files from Kubernetes to Google Cloud Storage + +``` +skbn cp \ + --src k8s:///// \ + --dst gcs:/// +``` + ## Advanced usage ### Copy files from source to destination in parallel @@ -166,6 +175,11 @@ In addition, the `AWS_REGION` environment variable should be set (default is `eu Skbn uses `AZURE_STORAGE_ACCOUNT` and `AZURE_STORAGE_ACCESS_KEY` environment variables for authentication. +### Google Cloud Storage + +Skbn uses Google [Application Default Credentials](https://cloud.google.com/docs/authentication/production). +Basically, it will first look for the `GOOGLE_APPLICATION_CREDENTIALS` environment variable. If it is not defined, it will look for the default service account, or throw an error if none is configured. + ## Examples 1. 
[In-cluster example](/examples/in-cluster) diff --git a/cmd/skbn.go b/cmd/skbn.go index 92d39bb..338bb04 100644 --- a/cmd/skbn.go +++ b/cmd/skbn.go @@ -6,7 +6,7 @@ import ( "log" "os" - "github.com/maorfr/skbn/pkg/skbn" + "skbn/pkg/skbn" "github.com/spf13/cobra" ) diff --git a/examples/code/example.go b/examples/code/example.go index 3ad5d92..8c0bbcb 100644 --- a/examples/code/example.go +++ b/examples/code/example.go @@ -2,8 +2,9 @@ package main import ( "log" + "time" - "github.com/maorfr/skbn/pkg/skbn" + "skbn/pkg/skbn" ) func main() { @@ -12,7 +13,10 @@ func main() { parallel := 0 // all at once bufferSize := 1.0 // 1GB of in memory buffer size + start := time.Now() if err := skbn.Copy(src, dst, parallel, bufferSize); err != nil { log.Fatal(err) } + elapsed := time.Since(start) + log.Printf("Copy execution time: %s", elapsed) }
diff --git a/pkg/skbn/gcs.go b/pkg/skbn/gcs.go
new file mode 100644
index 0000000..af3597c
--- /dev/null
+++ b/pkg/skbn/gcs.go
@@ -0,0 +1,213 @@
+package skbn
+
+import (
+	"context"
+	"fmt"
+	"io"
+	"log"
+	"path"
+	"path/filepath"
+	"strings"
+
+	"skbn/pkg/utils"
+
+	"cloud.google.com/go/storage"
+	"google.golang.org/api/iterator"
+	_ "k8s.io/client-go/plugin/pkg/client/auth/gcp" // blank import, kept for its side effects
+)
+
+// GetClientToGcs creates a GCS client and checks it by reading the attributes
+// of the bucket named by the first element of path. Creation and check are
+// tried up to three times, sleeping between attempts; the last error is
+// returned if none succeeds.
+func GetClientToGcs(ctx context.Context, path string) (*storage.Client, error) {
+	pSplit := strings.Split(path, "/")
+	if err := validateGcsPath(pSplit); err != nil {
+		return nil, err
+	}
+	bucketName, _ := initGcsVariables(pSplit)
+
+	attempts := 3
+	var err error
+	for attempt := 1; attempt <= attempts; attempt++ {
+		if attempt > 1 {
+			utils.Sleep(attempt - 1)
+		}
+
+		var client *storage.Client
+		client, err = storage.NewClient(ctx)
+		if err != nil {
+			continue
+		}
+		if _, err = client.Bucket(bucketName).Attrs(ctx); err == nil {
+			return client, nil
+		}
+		// This client is discarded, so release its resources before retrying.
+		if closeErr := client.Close(); closeErr != nil {
+			log.Println("Error in client.Close()", closeErr)
+		}
+	}
+
+	return nil, err
+}
+
+// GetListOfFilesFromGcs gets list of files in path from GCS (recursive).
+// The returned names have the object prefix taken from path stripped off.
+func GetListOfFilesFromGcs(ctx context.Context, iClient interface{}, path string) ([]string, error) {
+	client := iClient.(*storage.Client)
+	pSplit := strings.Split(path, "/")
+	if err := validateGcsPath(pSplit); err != nil {
+		return nil, err
+	}
+	bucketName, gcsPath := initGcsVariables(pSplit)
+
+	var outLines []string
+	// No delimiter is set, so every object below the prefix is listed.
+	objects := client.Bucket(bucketName).Objects(ctx, &storage.Query{Prefix: gcsPath})
+	for {
+		objectAttrs, err := objects.Next()
+		if err == iterator.Done {
+			break
+		}
+		if err != nil {
+			return nil, err
+		}
+		if strings.HasSuffix(objectAttrs.Name, "/") {
+			continue // don't append directories
+		}
+		outLines = append(outLines, strings.TrimPrefix(objectAttrs.Name, gcsPath))
+	}
+
+	return outLines, nil
+}
+
+// DownloadFromGcs downloads a single file from GCS into writer.
+// It is tried up to three times; a retry resumes at the first byte that has
+// not been written yet, so writer never receives the same data twice.
+func DownloadFromGcs(ctx context.Context, iClient interface{}, path string, writer io.Writer) error {
+	client := iClient.(*storage.Client)
+	pSplit := strings.Split(path, "/")
+	if err := validateGcsPath(pSplit); err != nil {
+		return err
+	}
+	bucketName, gcsPath := initGcsVariables(pSplit)
+	object := client.Bucket(bucketName).Object(gcsPath)
+
+	attempts := 3
+	var written int64 // bytes handed to writer by earlier attempts
+	var err error
+	for attempt := 1; attempt <= attempts; attempt++ {
+		if attempt > 1 {
+			utils.Sleep(attempt - 1)
+		}
+
+		var n int64
+		n, err = copyFromGcsObject(ctx, object, written, writer)
+		written += n
+		if err == nil {
+			return nil
+		}
+	}
+
+	return err
+}
+
+// copyFromGcsObject streams object into writer, starting at byte offset, and
+// returns the number of bytes written.
+func copyFromGcsObject(ctx context.Context, object *storage.ObjectHandle, offset int64, writer io.Writer) (int64, error) {
+	reader, err := object.NewRangeReader(ctx, offset, -1)
+	if err != nil { // e.g. no object found at the path
+		return 0, err
+	}
+	// Deferred only after the error check: reader is nil when opening failed.
+	defer func() {
+		if err := reader.Close(); err != nil {
+			log.Println("Error in reader.Close()", err)
+		}
+	}()
+
+	return io.Copy(writer, reader)
+}
+
+// UploadToGcs uploads a single file to GCS.
+// If toPath names only a bucket, the object is named after the file in
+// fromPath. A failed upload is retried (up to three attempts) only when
+// reader is an io.Seeker, because a plain reader cannot be read again.
+func UploadToGcs(ctx context.Context, iClient interface{}, toPath, fromPath string, reader io.Reader) error {
+	client := iClient.(*storage.Client)
+	pSplit := strings.Split(toPath, "/")
+	if err := validateGcsPath(pSplit); err != nil {
+		return err
+	}
+	if len(pSplit) == 1 {
+		_, fileName := filepath.Split(fromPath)
+		pSplit = append(pSplit, fileName)
+	}
+	bucketName, gcsPath := initGcsVariables(pSplit)
+	object := client.Bucket(bucketName).Object(gcsPath)
+
+	// Remember where the reader starts so that a retry can rewind to it.
+	attempts := 1
+	var start int64
+	seeker, seekable := reader.(io.Seeker)
+	if seekable {
+		if pos, err := seeker.Seek(0, io.SeekCurrent); err == nil {
+			start = pos
+			attempts = 3
+		}
+	}
+
+	var err error
+	for attempt := 1; attempt <= attempts; attempt++ {
+		if attempt > 1 {
+			utils.Sleep(attempt - 1)
+			if _, err = seeker.Seek(start, io.SeekStart); err != nil {
+				return err
+			}
+		}
+		if err = copyToGcsObject(ctx, object, reader); err == nil {
+			return nil
+		}
+	}
+
+	return err
+}
+
+// copyToGcsObject writes everything from reader to object. The object is only
+// saved when the whole copy and the final Close succeed.
+func copyToGcsObject(ctx context.Context, object *storage.ObjectHandle, reader io.Reader) error {
+	// Cancelling the writer's context stops the upload without saving data.
+	uploadCtx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
+	writer := object.NewWriter(uploadCtx)
+	if _, err := io.Copy(writer, reader); err != nil {
+		cancel()
+		// The copy error is the one to report; Close only releases the writer here.
+		if closeErr := writer.Close(); closeErr != nil {
+			log.Println("Error in writer.Close()", closeErr)
+		}
+		return err
+	}
+	// Upload failures are reported by Close, so its error must be returned.
+	return writer.Close()
+}
+
+// validateGcsPath reports an error unless the split path starts with a
+// non-empty bucket name.
+func validateGcsPath(pathSplit []string) error {
+	if len(pathSplit) >= 1 && pathSplit[0] != "" {
+		return nil
+	}
+	return fmt.Errorf("illegal path: %s", strings.Join(pathSplit, "/"))
+}
+
+// initGcsVariables splits a path into the bucket name (first element) and the
+// object path (remaining elements joined with "/", as GCS object names use
+// forward slashes regardless of the local OS).
+func initGcsVariables(split []string) (string, string) {
+	bucket := split[0]
+	objectPath := path.Join(split[1:]...)
+
+	return bucket, objectPath
+}
diff --git a/pkg/skbn/kube.go b/pkg/skbn/kube.go index 6e1006c..b6f9d20 100644 --- a/pkg/skbn/kube.go +++ b/pkg/skbn/kube.go @@ -8,7 +8,7 @@ import ( "path/filepath" "strings" - "github.com/maorfr/skbn/pkg/utils" + "skbn/pkg/utils" core_v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" diff --git a/pkg/skbn/s3.go b/pkg/skbn/s3.go index a1b8573..772e8b2 100644 --- a/pkg/skbn/s3.go +++ b/pkg/skbn/s3.go @@ -8,7 +8,7 @@ import ( "strconv" "strings" - "github.com/maorfr/skbn/pkg/utils" + "skbn/pkg/utils" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" diff --git a/pkg/skbn/skbn.go b/pkg/skbn/skbn.go index 48fdd13..af47c5e 100644 --- a/pkg/skbn/skbn.go +++ b/pkg/skbn/skbn.go @@ -8,7 +8,7 @@ import ( "math" "path/filepath" - "github.com/maorfr/skbn/pkg/utils" + "skbn/pkg/utils" "github.com/djherbis/buffer" "gopkg.in/djherbis/nio.v2" @@ -51,6 +51,7 @@ func TestImplementationsExist(srcPrefix, dstPrefix string) error { case "k8s": case "s3": case "abs": + case "gcs": default: return fmt.Errorf(srcPrefix + " not implemented") } @@ -59,6 +60,7 @@ func TestImplementationsExist(srcPrefix, dstPrefix string) error { case "k8s": case "s3": case "abs": + case "gcs": default: return fmt.Errorf(dstPrefix + " not implemented") } @@ -102,7 +104,6 @@ func GetFromToPaths(srcClient interface{}, srcPrefix, srcPath, dstPath string) ( // PerformCopy performs the actual copy action func PerformCopy(srcClient, dstClient interface{}, srcPrefix, dstPrefix string, fromToPaths []FromToPair, parallel int, bufferSize float64) error { - // Execute in parallel totalFiles := len(fromToPaths) if parallel == 0 { @@ -203,6 +204,12 @@ func GetListOfFiles(client interface{}, prefix, path string) ([]string, error) { return nil, err } relativePaths = paths + case "gcs": + paths, err := GetListOfFilesFromGcs(ctx, client, path) + if err != nil { + return nil, err + } + relativePaths = paths default: return nil, fmt.Errorf(prefix + " not implemented") }
@@ -231,6 +238,11 @@ func Download(srcClient interface{}, srcPrefix, srcPath string, writer io.Writer if err != nil { return err } + case "gcs": + err := DownloadFromGcs(ctx, srcClient, srcPath, writer) + if err != nil { + return err + } default: return fmt.Errorf(srcPrefix + " not implemented") } @@ -259,6 +271,11 @@ func Upload(dstClient interface{}, dstPrefix, dstPath, srcPath string, reader io if err != nil { return err } + case "gcs": + err := UploadToGcs(ctx, dstClient, dstPath, srcPath, reader) + if err != nil { + return err + } default: return fmt.Errorf(dstPrefix + " not implemented") } @@ -301,6 +318,17 @@ func initClient(ctx context.Context, existingClient interface{}, prefix, path, t } newClient = client + case "gcs": + if isTestedAndClientExists(prefix, tested, existingClient) { + newClient = existingClient + break + } + client, err := GetClientToGcs(ctx, path) + if err != nil { + return nil, "", err + } + newClient = client + default: return nil, "", fmt.Errorf(prefix + " not implemented") }