/
data.go
151 lines (127 loc) · 3.28 KB
/
data.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
//
// command/data.go
//
// Copyright (c) 2016-2017 Junpei Kawamoto
//
// This file is part of Roadie.
//
// Roadie is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Roadie is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with Roadie. If not, see <http://www.gnu.org/licenses/>.
//
package command
import (
"fmt"
"net/url"
"path"
"path/filepath"
"runtime"
"sync"
"golang.org/x/sync/errgroup"
"github.com/jkawamoto/roadie/cloud"
"github.com/jkawamoto/roadie/script"
"github.com/urfave/cli"
)
// optDataPut defines options for data put command.
type optDataPut struct {
	// Metadata is embedded so that run can reach the storage manager,
	// context, spinner, and standard output it needs.
	*Metadata
	// Filename is a path for an actual file or a glob pattern.
	// It is expanded with filepath.Glob; if nothing matches, run fails.
	Filename string
	// StoredName is a path where the file to be stored.
	// If it is empty, each file is stored under its own base name.
	// If Filename matches exactly one file, StoredName is used as the stored
	// name as is. If Filename is a glob pattern matching several files,
	// matched files will be uploaded in a directory named by this argument.
	StoredName string
}
// run defines process of data put command.
//
// It expands o.Filename as a glob pattern and uploads every matched file to
// the data section of the cloud storage concurrently. The stored name of each
// file is decided as follows:
//
//   - if o.StoredName is empty, the base name of the file is used;
//   - if exactly one file matched, o.StoredName is used as is;
//   - otherwise the file is stored as o.StoredName/<base name>.
//
// When every upload has succeeded, one "<local path> -> <URL>" line per file
// is written to o.Stdout. It returns an error if the pattern is malformed, no
// file matches, the storage manager cannot be obtained, any upload fails, or
// the context is canceled before every file has been scheduled.
func (o *optDataPut) run() (err error) {

	filenames, err := filepath.Glob(o.Filename)
	if err != nil {
		return
	} else if len(filenames) == 0 {
		return fmt.Errorf("no files matching %q", o.Filename)
	}

	service, err := o.StorageManager()
	if err != nil {
		return
	}
	storage := cloud.NewStorage(service, o.Spinner.Writer)

	// Bound the number of concurrent uploads. The limit must be at least one:
	// on a single CPU machine NumCPU()-1 is zero, which would create an
	// unbuffered channel, and the send in the loop below would block forever
	// because nothing receives from it until an upload has been started.
	limit := runtime.NumCPU() - 1
	if limit < 1 {
		limit = 1
	}
	semaphore := make(chan struct{}, limit)

	wg, ctx := errgroup.WithContext(o.Context)
	var outputLock sync.Mutex
	var outputs []string

	// cancelErr remembers why scheduling stopped early. It has to be captured
	// inside the loop since wg.Wait always cancels ctx before returning.
	var cancelErr error

schedule:
	for _, target := range filenames {
		select {
		case <-ctx.Done():
			cancelErr = ctx.Err()
			break schedule
		case semaphore <- struct{}{}:
			target := target // each goroutine needs its own copy.
			wg.Go(func() error {
				defer func() { <-semaphore }()

				var output string
				switch {
				case o.StoredName == "":
					output = filepath.Base(target)
				case len(filenames) == 1:
					output = o.StoredName
				default:
					output = path.Join(o.StoredName, filepath.Base(target))
				}

				var (
					loc *url.URL
					err error
				)
				if loc, err = createURL(script.DataPrefix, output); err != nil {
					return err
				}
				if err = storage.UploadFile(ctx, loc, target); err != nil {
					return err
				}

				outputLock.Lock()
				defer outputLock.Unlock()
				outputs = append(outputs, fmt.Sprintf("%v -> %v", target, loc))
				return nil
			})
		}
	}

	// Always wait for the uploads already started so that none of them keeps
	// running after this function returns, and prefer the error which caused
	// the cancellation over the generic context error.
	if err = wg.Wait(); err == nil {
		err = cancelErr
	}
	if err != nil {
		return
	}

	for _, v := range outputs {
		fmt.Fprintln(o.Stdout, v)
	}
	return

}
// CmdDataPut uploads a given file.
//
// It expects one or two arguments: the path (or glob pattern) of the files to
// be uploaded and, optionally, the name under which they are stored. With any
// other number of arguments, it prints a message and shows the subcommand
// help. If the upload fails, it returns an exit error with status code 2.
func CmdDataPut(c *cli.Context) (err error) {

	// Check the number of arguments.
	n := c.NArg()
	if n < 1 || n > 2 {
		fmt.Printf("expected 1 or 2 arguments. (%d given)\n", n)
		return cli.ShowSubcommandHelp(c)
	}

	// Get metadata.
	m, err := getMetadata(c)
	if err != nil {
		return
	}

	// Set up options.
	opt := &optDataPut{
		Metadata: m,
		Filename: c.Args().First(),
		// Stored names are slash separated regardless of the local OS.
		StoredName: filepath.ToSlash(c.Args().Get(1)),
	}

	// Execute the command. Assign to the named result with "=" instead of
	// declaring a new err with ":=", otherwise the error is lost and the
	// command reports success even though the upload failed.
	if err = opt.run(); err != nil {
		err = cli.NewExitError(err.Error(), 2)
	}
	return

}