/
dataset_upload.go
126 lines (102 loc) · 3.09 KB
/
dataset_upload.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package cmd
import (
"context"
"fmt"
"os"
"os/signal"
"path/filepath"
"syscall"
"github.com/jessevdk/go-flags"
"github.com/pkg/errors"
"github.com/nerdalize/nerd/pkg/transfer"
"github.com/nerdalize/nerd/svc"
"github.com/mitchellh/cli"
homedir "github.com/mitchellh/go-homedir"
)
// DatasetUpload is the command type behind "nerd dataset upload".
type DatasetUpload struct {
	// Name is the name to assign to the dataset, set with --name or -n.
	Name string `long:"name" short:"n" description:"assign a name to the dataset"`

	// command is embedded so its members (the logger, global and advanced
	// options, and the output writer used by Execute) are promoted onto
	// DatasetUpload.
	*command
}
// DatasetUploadFactory builds a single DatasetUpload command wired to the
// given UI and returns a factory that hands out that same instance on every
// call.
func DatasetUploadFactory(ui cli.Ui) cli.CommandFactory {
	upload := new(DatasetUpload)
	upload.command = createCommand(
		ui,
		upload.Execute,
		upload.Description,
		upload.Usage,
		upload,
		&TransferOpts{},
		flags.PassAfterNonOption,
		"nerd dataset upload",
	)

	factory := func() (cli.Command, error) {
		return upload, nil
	}
	return factory
}
// Execute runs the command. It expects exactly one argument, the local path
// to upload. The path is tilde-expanded and made absolute, a dataset is
// created through the transfer manager, and the path is pushed into it.
//
// While the dataset is being created and pushed, an interrupt or SIGTERM
// cancels the context passed to those calls. If the push fails, the dataset
// that was just created is removed again before the error is returned.
func (cmd *DatasetUpload) Execute(args []string) (err error) {
	if len(args) < 1 {
		return errShowUsage(fmt.Sprintf(MessageNotEnoughArguments, 1, ""))
	} else if len(args) > 1 {
		return errShowUsage(fmt.Sprintf(MessageTooManyArguments, 1, ""))
	}

	// Expand tilde for homedir.
	dir, err := homedir.Expand(args[0])
	if err != nil {
		return renderServiceError(err, "failed to expand home directory in dataset local path")
	}

	dir, err = filepath.Abs(dir)
	if err != nil {
		return renderServiceError(err, "failed to turn local path into absolute path")
	}

	// Check that the path exists and can be opened. The handle is only a
	// probe, so release it immediately instead of leaking the descriptor.
	f, err := os.Open(dir)
	if err != nil {
		return errors.Wrap(err, "failed to upload dataset")
	}
	_ = f.Close() // read-only probe; a close error carries no useful information

	deps, err := NewDeps(cmd.Logger(), cmd.globalOpts.KubeOpts)
	if err != nil {
		return renderConfigError(err, "failed to configure")
	}

	kube := svc.NewKube(deps)
	t, ok := cmd.advancedOpts.(*TransferOpts)
	if !ok {
		return renderConfigError(fmt.Errorf("unable to use transfer options"), "failed to configure")
	}

	mgr, sto, sta, err := t.TransferManager(kube)
	if err != nil {
		return errors.Wrap(err, "failed to setup transfer manager")
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Only intercept signals once there is a context to cancel, and restore
	// the default signal behaviour when Execute returns.
	sigCh := make(chan os.Signal, 2)
	signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
	defer signal.Stop(sigCh)

	// Translate the first signal into a cancellation. The ctx.Done case lets
	// the goroutine exit when Execute returns without any signal arriving.
	go func() {
		select {
		case <-sigCh:
			cancel()
		case <-ctx.Done():
		}
	}()

	var h transfer.Handle
	if h, err = mgr.Create(
		ctx,
		cmd.Name,
		*sto,
		*sta,
	); err != nil {
		return renderServiceError(err, "failed to create dataset with name '%s'", cmd.Name)
	}
	defer h.Close()

	err = h.Push(ctx, dir, &progressBarReporter{})
	if err != nil {
		// ctx may already be cancelled at this point, so the removal gets a
		// fresh context of its own.
		if e := mgr.Remove(context.Background(), h.Name()); e != nil {
			return errors.Wrapf(err, "failed to remove dataset: %v", e)
		}
		return renderServiceError(err, "failed to upload dataset")
	}

	cmd.out.Infof("Uploaded dataset: '%s'", h.Name())
	cmd.out.Infof("To run a job with a dataset, use: 'nerd job run'")
	return nil
}
// Description returns the help text for the command. It returns the same
// string as Synopsis.
func (cmd *DatasetUpload) Description() string {
	text := cmd.Synopsis()
	return text
}
// Synopsis returns a one-line description of the command.
func (cmd *DatasetUpload) Synopsis() string {
	const summary = "Upload a dataset to your compute cluster."
	return summary
}
// Usage returns the usage line for the command: the invocation followed by
// its options placeholder and the single positional argument.
func (cmd *DatasetUpload) Usage() string {
	const usage = "nerd dataset upload [OPTIONS] DIR_TO_UPLOAD"
	return usage
}