/
config.go
133 lines (122 loc) · 3.91 KB
/
config.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
/*
Copyright © 2018 the InMAP authors.
This file is part of InMAP.
InMAP is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
InMAP is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with InMAP. If not, see <http://www.gnu.org/licenses/>.
*/
package cloud
import (
"context"
"fmt"
"net/url"
"path/filepath"
"strings"
"github.com/spatialmodel/inmap/cloud/cloudrpc"
"github.com/spf13/pflag"
)
// jobOutputAddresses returns the locations of where the output files of the job
// with the given name, belonging to the given user, with the given command arguments,
// will be stored.
func (c *Client) jobOutputAddresses(ctx context.Context, name string, cmd []string) (map[string]string, error) {
outputFiles := make(map[string]struct{})
for _, f := range c.outputFileArgs {
outputFiles[f] = struct{}{}
}
user, err := getUser(ctx)
if err != nil {
return nil, err
}
o := make(map[string]string)
execCmd, _, err := c.root.Find(cmd[1:])
if err != nil {
return nil, fmt.Errorf("cloud: couldn't find command %v: %v", cmd[1:], err)
}
flags := execCmd.InheritedFlags()
flags.AddFlagSet(execCmd.LocalFlags())
flags.VisitAll(func(f *pflag.Flag) {
if _, ok := outputFiles[f.Name]; ok { // Is this an output file?
ext := filepath.Ext(f.Value.String())
o[f.Name] = fmt.Sprintf("%s/%s/%s/%s%s", c.bucketName, user, name, strings.Replace(f.Name, ".", "_", -1), ext)
}
})
return o, nil
}
func (c *Client) checkOutputs(ctx context.Context, name string, cmd []string) error {
addrs, err := c.jobOutputAddresses(ctx, name, cmd)
if err != nil {
return err
}
bucket, err := OpenBucket(ctx, c.bucketName)
if err != nil {
return fmt.Errorf("cloud: opening bucket %s: %v", c.bucketName, err)
}
for _, addr := range addrs {
for _, fname := range expandShp(addr) {
url, err := url.Parse(fname)
if err != nil {
return fmt.Errorf("cloud: parsing URL %s: %v", fname, err)
}
key := strings.TrimLeft(url.Path, "/")
r, err := bucket.NewReader(ctx, key, nil)
if err != nil {
return fmt.Errorf("cloud: opening reader for `%s`: %v", key, err)
}
if r.Size() == 0 {
return fmt.Errorf("cloud: output file `%s` is zero-length: %v", key, err)
}
r.Close()
}
}
return nil
}
// setOutputPaths changes the paths of the output files in the given
// job specification so that they match
// the locations where the files should be stored.
func (c *Client) setOutputPaths(ctx context.Context, job *cloudrpc.JobSpec) error {
addrs, err := c.jobOutputAddresses(ctx, job.Name, job.Cmd)
if err != nil {
return err
}
for i, arg := range job.Args {
if addr, ok := addrs[strings.TrimLeft(arg, "--")]; ok {
job.Args[i+1] = addr
}
}
return nil
}
// stageInputs stages the input data in blob storage and replaces the input
// file locations with the actual locations of the staged input files.
func (c *Client) stageInputs(ctx context.Context, job *cloudrpc.JobSpec) error {
bucket, err := OpenBucket(ctx, c.bucketName)
if err != nil {
return err
}
url, err := url.Parse(c.bucketName)
if err != nil {
return fmt.Errorf("inmap/cloud: staging inputs: %v", err)
}
user, err := getUser(ctx)
if err != nil {
return err
}
for fname, data := range job.FileData {
filePath := strings.TrimPrefix(url.Path+"/"+user+"/"+job.Name+"/"+fname, "/")
if err := writeBlob(ctx, bucket, filePath, data); err != nil {
return err
}
for i, arg := range job.Args {
if strings.Contains(arg, fname) {
job.Args[i] = strings.Replace(arg, fname, url.Scheme+"://"+url.Hostname()+"/"+filePath, -1)
}
}
}
return nil
}