Skip to content

Commit

Permalink
Merge 94ef27b into f62a3c3
Browse files Browse the repository at this point in the history
  • Loading branch information
cam-a committed Sep 9, 2018
2 parents f62a3c3 + 94ef27b commit a825d94
Show file tree
Hide file tree
Showing 11 changed files with 413 additions and 161 deletions.
200 changes: 151 additions & 49 deletions cmd/uplink/cmd/cp.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,107 +4,209 @@
package cmd

import (
"context"
"fmt"
"io"
"net/url"
"os"
"path/filepath"
"strings"
"time"

"github.com/minio/minio/pkg/hash"
"github.com/spf13/cobra"
"github.com/zeebo/errs"

"storj.io/storj/pkg/cfgstruct"
"storj.io/storj/pkg/paths"
"storj.io/storj/pkg/process"
"storj.io/storj/pkg/storage/buckets"
"storj.io/storj/pkg/storage/objects"
"storj.io/storj/pkg/utils"
)

var (
cpCfg Config
cpCmd = &cobra.Command{
func init() {
addCmd(&cobra.Command{
Use: "cp",
Short: "A brief description of your command",
RunE: copy,
Short: "Copies a local file or Storj object to another location locally or in Storj",
RunE: copyMain,
})
}

func cleanAbsPath(path string) string {
prefix := strings.HasSuffix(path, "/")
path = filepath.Join("/", path)
if !strings.HasSuffix(path, "/") && prefix {
path += "/"
}
)
return path
}

func init() {
RootCmd.AddCommand(cpCmd)
cfgstruct.Bind(cpCmd.Flags(), &cpCfg, cfgstruct.ConfDir(defaultConfDir))
cpCmd.Flags().String("config", filepath.Join(defaultConfDir, "config.yaml"), "path to configuration")
// upload uploads args[0] from local machine to s3 compatible object args[1]
func upload(ctx context.Context, bs buckets.Store, srcFile string, destObj *url.URL) error {
if destObj.Scheme == "" {
return fmt.Errorf("Invalid destination")
}

destObj.Path = cleanAbsPath(destObj.Path)
// if object name not specified, default to filename
if strings.HasSuffix(destObj.Path, "/") {
destObj.Path = filepath.Join(destObj.Path, filepath.Base(srcFile))
}

f, err := os.Open(srcFile)
if err != nil {
return err
}
defer utils.LogClose(f)

o, err := bs.GetObjectStore(ctx, destObj.Host)
if err != nil {
return err
}

meta := objects.SerializableMeta{}
expTime := time.Time{}

_, err = o.Put(ctx, paths.New(destObj.Path), f, meta, expTime)
if err != nil {
return err
}

fmt.Printf("Created: %s\n", destObj.Path)

return nil
}

func copy(cmd *cobra.Command, args []string) (err error) {
ctx := process.Ctx(cmd)
// download downloads s3 compatible object args[0] to args[1] on local machine
func download(ctx context.Context, bs buckets.Store, srcObj *url.URL, destFile string) error {
if srcObj.Scheme == "" {
return fmt.Errorf("Invalid source")
}

if len(args) == 0 {
return errs.New("No file specified for copy")
o, err := bs.GetObjectStore(ctx, srcObj.Host)
if err != nil {
return err
}

if len(args) == 1 {
return errs.New("No destination specified")
if fi, err := os.Stat(destFile); err == nil && fi.IsDir() {
destFile = filepath.Join(destFile, filepath.Base(srcObj.Path))
}

so, err := getStorjObjects(ctx, cpCfg)
f, err := os.Create(destFile)
if err != nil {
return err
}
defer utils.LogClose(f)

u, err := url.Parse(args[0])
rr, _, err := o.Get(ctx, paths.New(srcObj.Path))
if err != nil {
return err
}
defer utils.LogClose(rr)

if u.Scheme == "" {
f, err := os.Open(args[0])
if err != nil {
return err
}
r, err := rr.Range(ctx, 0, rr.Size())
if err != nil {
return err
}
defer utils.LogClose(r)

fi, err := f.Stat()
if err != nil {
return err
}
_, err = io.Copy(f, r)
if err != nil {
return err
}

fr, err := hash.NewReader(f, fi.Size(), "", "")
if err != nil {
return err
}
fmt.Printf("Downloaded %s to %s\n", srcObj.Path, destFile)

defer func() { _ = f.Close() }()
return nil
}

u, err = url.Parse(args[1])
if err != nil {
return err
}
// copy copies s3 compatible object args[0] to s3 compatible object args[1]
func copy(ctx context.Context, bs buckets.Store, srcObj *url.URL, destObj *url.URL) error {
o, err := bs.GetObjectStore(ctx, srcObj.Host)
if err != nil {
return err
}

oi, err := so.PutObject(ctx, u.Host, u.Path, fr, nil)
rr, _, err := o.Get(ctx, paths.New(srcObj.Path))
if err != nil {
return err
}
defer utils.LogClose(rr)

r, err := rr.Range(ctx, 0, rr.Size())
if err != nil {
return err
}
defer utils.LogClose(r)

if destObj.Host != srcObj.Host {
o, err = bs.GetObjectStore(ctx, destObj.Host)
if err != nil {
return err
}
}

fmt.Println("Bucket:", oi.Bucket)
fmt.Println("Object:", oi.Name)
meta := objects.SerializableMeta{}
expTime := time.Time{}

return nil
destObj.Path = cleanAbsPath(destObj.Path)
// if destination object name not specified, default to source object name
if strings.HasSuffix(destObj.Path, "/") {
destObj.Path = filepath.Join(destObj.Path, filepath.Base(srcObj.Path))
}

oi, err := so.GetObjectInfo(ctx, u.Host, u.Path)
_, err = o.Put(ctx, paths.New(destObj.Path), r, meta, expTime)
if err != nil {
return err
}

f, err := os.Create(args[1])
fmt.Printf("%s copied to %s\n", srcObj.Host+srcObj.Path, destObj.Host+destObj.Path)

return nil
}

// copyMain is the function executed when cpCmd is called
func copyMain(cmd *cobra.Command, args []string) (err error) {
if len(args) == 0 {
return fmt.Errorf("No object specified for copy")
}
if len(args) == 1 {
return fmt.Errorf("No destination specified")
}

ctx := process.Ctx(cmd)

u0, err := utils.ParseURL(args[0])
if err != nil {
return err
}

defer func() { _ = f.Close() }()
u1, err := utils.ParseURL(args[1])
if err != nil {
return err
}

err = so.GetObject(ctx, oi.Bucket, oi.Name, 0, oi.Size, f, oi.ETag)
bs, err := cfg.BucketStore(ctx)
if err != nil {
return err
}

fmt.Printf("Downloaded %s to %s", oi.Bucket+oi.Name, args[1])
// if uploading
if u0.Scheme == "" {
if u1.Host == "" {
return fmt.Errorf("No bucket specified. Please use format sj://bucket/")
}

return nil
return upload(ctx, bs, args[0], u1)
}

// if downloading
if u1.Scheme == "" {
if u0.Host == "" {
return fmt.Errorf("No bucket specified. Please use format sj://bucket/")
}

return download(ctx, bs, u0, args[1])
}

// if copying from one remote location to another
return copy(ctx, bs, u0, u1)
}

0 comments on commit a825d94

Please sign in to comment.