
Merge pull request #1 from IvanovOleg/master
Added support for Azure Blob Storage
maorfr committed Nov 13, 2018
2 parents f396167 + fe1c55a commit cef955d
Showing 4 changed files with 324 additions and 2 deletions.
22 changes: 21 additions & 1 deletion README.md
@@ -39,6 +39,22 @@ skbn cp \
--dst k8s://<namespace>/<podName>/<containerName>/<path>
```

### Copy files from Kubernetes to Azure Blob Storage

```
skbn cp \
--src k8s://<namespace>/<podName>/<containerName>/<path> \
--dst abs://<account>/<container>/<path>
```

### Copy files from Azure Blob Storage to Kubernetes

```
skbn cp \
--src abs://<account>/<container>/<path> \
--dst k8s://<namespace>/<podName>/<containerName>/<path>
```

## Credentials


@@ -54,7 +70,11 @@ Skbn tries to get credentials in the following order:

Skbn uses the default AWS [credentials chain](https://docs.aws.amazon.com/sdk-for-go/v1/developer-guide/configuring-sdk.html).

### Azure Blob Storage

Skbn uses the `AZURE_STORAGE_ACCOUNT` and `AZURE_STORAGE_ACCESS_KEY` environment variables.
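
For example, a minimal Go sketch (illustrative, not part of skbn) of the precondition these variables must satisfy; the actual credential setup happens in `getNewPipeline` in `pkg/skbn/abs.go`:

```
package main

import (
	"log"
	"os"
)

func main() {
	// skbn reads both variables at runtime; transfers fail if either is empty.
	if os.Getenv("AZURE_STORAGE_ACCOUNT") == "" || os.Getenv("AZURE_STORAGE_ACCESS_KEY") == "" {
		log.Fatal("set AZURE_STORAGE_ACCOUNT and AZURE_STORAGE_ACCESS_KEY before running skbn")
	}
}
```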

## Examples

1. [In-cluster example](https://github.com/nuvo/skbn/tree/master/examples/in-cluster)
2. [Code example](https://github.com/nuvo/skbn/tree/master/examples/code)
9 changes: 8 additions & 1 deletion glide.yaml
@@ -6,6 +6,14 @@ import:
- aws
- aws/session
- service/s3
- package: github.com/Azure/azure-pipeline-go
version: "v0.1.8"
subpackages:
- pipeline
- package: github.com/Azure/azure-storage-blob-go
version: "0.3.0"
subpackages:
- azblob
- package: github.com/spf13/cobra
version: v0.0.3
- package: k8s.io/api
@@ -22,4 +30,3 @@ import:
- kubernetes
- rest
- tools/clientcmd

245 changes: 245 additions & 0 deletions pkg/skbn/abs.go
@@ -0,0 +1,245 @@
package skbn

import (
"bytes"
"context"
"errors"
"fmt"
"net/url"
"os"
"path/filepath"
"strings"
"time"

"github.com/Azure/azure-pipeline-go/pipeline"
"github.com/Azure/azure-storage-blob-go/azblob"
)

var err error

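// validateAbsPath checks that the path split on "/" is not empty; ABS paths have the form <account>/<container>/<path>.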
func validateAbsPath(pathSplit []string) error {
if len(pathSplit) >= 1 {
return nil
}
return fmt.Errorf("illegal path: %s", filepath.Join(pathSplit...))
}

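// initAbsVariables splits the path into the storage account name, container name, and blob path.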
func initAbsVariables(split []string) (string, string, string) {
account := split[0]
container := split[1]
path := filepath.Join(split[2:]...)

return account, container, path
}

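// getNewPipeline builds an azblob request pipeline using shared key credentials read from the
// AZURE_STORAGE_ACCOUNT and AZURE_STORAGE_ACCESS_KEY environment variables, with an exponential retry policy.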
func getNewPipeline() (pipeline.Pipeline, error) {
accountName, accountKey := os.Getenv("AZURE_STORAGE_ACCOUNT"), os.Getenv("AZURE_STORAGE_ACCESS_KEY")

if len(accountName) == 0 || len(accountKey) == 0 {
err := errors.New("Either the AZURE_STORAGE_ACCOUNT or AZURE_STORAGE_ACCESS_KEY environment variable is not set")
return nil, err
}

credential, err := azblob.NewSharedKeyCredential(accountName, accountKey)

if err != nil {
return nil, err
}

po := azblob.PipelineOptions{
Retry: azblob.RetryOptions{
Policy: azblob.RetryPolicyExponential,
MaxTries: 3,
TryTimeout: time.Second * 3,
RetryDelay: time.Second * 1,
MaxRetryDelay: time.Second * 3,
},
}

pl := azblob.NewPipeline(credential, po)

return pl, nil
}

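// getServiceURL returns the service-level URL for the given storage account.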
func getServiceURL(pl pipeline.Pipeline, accountName string) (azblob.ServiceURL, error) {
URL, err := url.Parse(
fmt.Sprintf("https://%s.blob.core.windows.net/", accountName))

if err != nil {
return azblob.ServiceURL{}, err
}

surl := azblob.NewServiceURL(*URL, pl)
return surl, nil
}

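// getContainerURL returns the URL of the named container in the given storage account.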
func getContainerURL(pl pipeline.Pipeline, accountName string, containerName string) (azblob.ContainerURL, error) {
URL, err := url.Parse(
fmt.Sprintf("https://%s.blob.core.windows.net/%s", accountName, containerName))

if err != nil {
return azblob.ContainerURL{}, err
}

curl := azblob.NewContainerURL(*URL, pl)
return curl, nil
}

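// getBlobURL returns a block blob URL for the named blob inside the container.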
func getBlobURL(curl azblob.ContainerURL, blob string) azblob.BlockBlobURL {
return curl.NewBlockBlobURL(blob)
}

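// listContainers returns the containers of the storage account (first segment of results).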
func listContainers(ctx context.Context, surl azblob.ServiceURL) ([]azblob.ContainerItem, error) {
lc, err := surl.ListContainersSegment(ctx, azblob.Marker{}, azblob.ListContainersSegmentOptions{})

if err != nil {
return nil, err
}

return lc.ContainerItems, nil
}

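// containerExists reports whether a container with the given name appears in the list.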
func containerExists(list []azblob.ContainerItem, containerName string) bool {
exists := false
for _, v := range list {
if containerName == v.Name {
exists = true
}
}
return exists
}

// GetClientToAbs checks the connection to Azure Blob Storage and returns the tested client (pipeline)
func GetClientToAbs(ctx context.Context, path string) (pipeline.Pipeline, error) {
pSplit := strings.Split(path, "/")
a, c, _ := initAbsVariables(pSplit)
pl, err := getNewPipeline()

if err != nil {
return nil, err
}

su, err := getServiceURL(pl, a)

if err != nil {
return nil, err
}

lc, err := listContainers(ctx, su)

if err != nil {
return nil, err
}

if !containerExists(lc, c) {
err := errors.New("Azure Blob Storage container doesn't exist")
return nil, err
}

return pl, nil
}

// GetListOfFilesFromAbs gets the list of files in path from Azure Blob Storage (recursive)
func GetListOfFilesFromAbs(ctx context.Context, iClient interface{}, path string) ([]string, error) {
pSplit := strings.Split(path, "/")

if err := validateAbsPath(pSplit); err != nil {
return nil, err
}

a, c, p := initAbsVariables(pSplit)
pl := iClient.(pipeline.Pipeline)
cu, err := getContainerURL(pl, a, c)

if err != nil {
return nil, err
}

bl := []string{}

for marker := (azblob.Marker{}); marker.NotDone(); {
listBlob, err := cu.ListBlobsFlatSegment(ctx, marker, azblob.ListBlobsSegmentOptions{})

if err != nil {
return nil, err
}

marker = listBlob.NextMarker

for _, blobInfo := range listBlob.Segment.BlobItems {
if strings.Contains(blobInfo.Name, p) {
bl = append(bl, strings.Replace(blobInfo.Name, p, "", 1))
}
}
}

return bl, nil
}

// DownloadFromAbs downloads a single file from Azure Blob Storage
func DownloadFromAbs(ctx context.Context, iClient interface{}, path string) ([]byte, error) {
pSplit := strings.Split(path, "/")

if err := validateAbsPath(pSplit); err != nil {
return nil, err
}

a, c, p := initAbsVariables(pSplit)
pl := iClient.(pipeline.Pipeline)
cu, err := getContainerURL(pl, a, c)

if err != nil {
return nil, err
}

bu := getBlobURL(cu, p)
dr, err := bu.Download(ctx, 0, azblob.CountToEnd, azblob.BlobAccessConditions{}, false)

if err != nil {
return nil, err
}

bs := dr.Body(azblob.RetryReaderOptions{MaxRetryRequests: 20})
dd := bytes.Buffer{}
_, err = dd.ReadFrom(bs)

if err != nil {
return nil, err
}

return dd.Bytes(), nil
}

// UploadToAbs uploads a single file to Azure Blob Storage
func UploadToAbs(ctx context.Context, iClient interface{}, toPath, fromPath string, buffer []byte) error {
pSplit := strings.Split(toPath, "/")

if err := validateAbsPath(pSplit); err != nil {
return err
}

if len(pSplit) == 1 {
_, fn := filepath.Split(fromPath)
pSplit = append(pSplit, fn)
}

a, c, p := initAbsVariables(pSplit)
pl := iClient.(pipeline.Pipeline)
cu, err := getContainerURL(pl, a, c)

if err != nil {
return err
}

bu := getBlobURL(cu, p)

_, err = azblob.UploadBufferToBlockBlob(ctx, buffer, bu, azblob.UploadToBlockBlobOptions{
BlockSize: 4 * 1024 * 1024,
Parallelism: 16})

if err != nil {
return err
}

return nil
}
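
Taken together, these functions can also be driven directly from Go rather than through the CLI. A minimal sketch under assumed values: the account `myaccount`, container `mycontainer`, and prefix `backups/` are placeholders, and the import path follows the repository layout.

```
package main

import (
	"context"
	"log"

	"github.com/nuvo/skbn/pkg/skbn"
)

func main() {
	ctx := context.Background()

	// ABS paths are given without the abs:// prefix: <account>/<container>/<path>.
	prefix := "myaccount/mycontainer/backups/"

	// Verify credentials and that the container exists; the returned client is an azblob pipeline.
	client, err := skbn.GetClientToAbs(ctx, prefix)
	if err != nil {
		log.Fatal(err)
	}

	// List blobs under the prefix (names are returned relative to the prefix).
	files, err := skbn.GetListOfFilesFromAbs(ctx, client, prefix)
	if err != nil {
		log.Fatal(err)
	}

	for _, f := range files {
		// Download each blob and re-upload it under a copied/ prefix in the same container.
		data, err := skbn.DownloadFromAbs(ctx, client, prefix+f)
		if err != nil {
			log.Fatal(err)
		}
		if err := skbn.UploadToAbs(ctx, client, "myaccount/mycontainer/copied/"+f, "", data); err != nil {
			log.Fatal(err)
		}
		log.Printf("copied %s (%d bytes)", f, len(data))
	}
}
```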