forked from evergreen-ci/evergreen
/
s3_get.go
254 lines (210 loc) · 7.24 KB
/
s3_get.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
package command
import (
"context"
"net/http"
"os"
"path/filepath"
"time"
"github.com/aws/aws-sdk-go/aws/endpoints"
"github.com/evergreen-ci/evergreen/model"
"github.com/evergreen-ci/evergreen/rest/client"
"github.com/evergreen-ci/evergreen/util"
"github.com/evergreen-ci/pail"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
)
// A plugin command to fetch a resource from an s3 bucket and download it to
// the local machine.
type s3get struct {
// AwsKey and AwsSecret are the user's credentials for
// authenticating interactions with s3.
AwsKey string `mapstructure:"aws_key" plugin:"expand"`
AwsSecret string `mapstructure:"aws_secret" plugin:"expand"`
// RemoteFile is the filepath of the file to get, within its bucket
RemoteFile string `mapstructure:"remote_file" plugin:"expand"`
// Bucket is the s3 bucket holding the desired file
Bucket string `mapstructure:"bucket" plugin:"expand"`
// BuildVariants stores a list of MCI build variants to run the command for.
// If the list is empty, it runs for all build variants.
BuildVariants []string `mapstructure:"build_variants" plugin:"expand"`
// Only one of these two should be specified. local_file indicates that the
// s3 resource should be downloaded as-is to the specified file, and
// extract_to indicates that the remote resource is a .tgz file to be
// downloaded to the specified directory.
LocalFile string `mapstructure:"local_file" plugin:"expand"`
ExtractTo string `mapstructure:"extract_to" plugin:"expand"`
// bucket is the pail-backed S3 handle, populated lazily by
// createPailBucket during Execute (nil until then).
bucket pail.Bucket
// base provides the shared command bookkeeping (embedded from this
// package's other commands).
base
}
// s3GetFactory constructs a fresh, zero-valued s3.get command instance.
func s3GetFactory() Command {
	return &s3get{}
}
// Name returns the registered identifier for this command.
func (c *s3get) Name() string {
	return "s3.get"
}
// ParseParams decodes the raw project-file params into the command struct
// and then checks that the decoded values form a valid configuration.
func (c *s3get) ParseParams(params map[string]interface{}) error {
	err := mapstructure.Decode(params, c)
	if err != nil {
		return errors.Wrapf(err, "error decoding %v params", c.Name())
	}

	// reject configurations that are structurally invalid up front
	err = c.validateParams()
	if err != nil {
		return errors.Wrapf(err, "error validating %v params", c.Name())
	}

	return nil
}
// validateParams verifies that the required credentials, remote file, and
// bucket name are present, and that exactly one of local_file / extract_to
// is set.
func (c *s3get) validateParams() error {
	// required scalar fields
	switch {
	case c.AwsKey == "":
		return errors.New("aws_key cannot be blank")
	case c.AwsSecret == "":
		return errors.New("aws_secret cannot be blank")
	case c.RemoteFile == "":
		return errors.New("remote_file cannot be blank")
	}

	// the bucket name must satisfy S3 naming rules
	if err := validateS3BucketName(c.Bucket); err != nil {
		return errors.Wrapf(err, "%v is an invalid bucket name", c.Bucket)
	}

	// local_file and extract_to are mutually exclusive...
	if c.LocalFile != "" && c.ExtractTo != "" {
		return errors.New("cannot specify both local_file and extract_to directory")
	}
	// ...but one of them is mandatory
	if c.LocalFile == "" && c.ExtractTo == "" {
		return errors.New("must specify either local_file or extract_to")
	}

	return nil
}
// shouldRunForVariant reports whether this command applies to the given
// build variant: an empty build_variants list means "run everywhere";
// otherwise the variant must appear in the list.
func (c *s3get) shouldRunForVariant(buildVariantName string) bool {
	return len(c.BuildVariants) == 0 ||
		util.StringSliceContains(c.BuildVariants, buildVariantName)
}
// expandParams applies the task config's expansions to every
// `plugin:"expand"`-tagged field of the command.
func (c *s3get) expandParams(conf *model.TaskConfig) error {
	err := util.ExpandValues(c, conf.Expansions)
	return err
}
// Implementation of Execute. Expands the parameters, validates them, sets up
// the S3 bucket handle, resolves the local destination path, and then fetches
// the resource from s3 (with retries) in a goroutine so the caller's context
// can interrupt it.
func (c *s3get) Execute(ctx context.Context,
	comm client.Communicator, logger client.LoggerProducer, conf *model.TaskConfig) error {

	// expand necessary params
	if err := c.expandParams(conf); err != nil {
		return err
	}

	// validate the params
	if err := c.validateParams(); err != nil {
		return errors.Wrap(err, "expanded params are not valid")
	}

	// create pail bucket
	client := util.GetHTTPClient()
	defer util.PutHTTPClient(client)
	err := c.createPailBucket(client)
	if err != nil {
		return errors.Wrap(err, "problem connecting to s3")
	}

	// Fix: derive the cancellable context from the caller's ctx instead of
	// context.Background(). Previously the caller's cancellation (e.g. task
	// abort) could never reach the bucket check, the retry loop, or the
	// select below, because they all watched a fresh Background context.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	if err := c.bucket.Check(ctx); err != nil {
		return errors.Wrap(err, "invalid pail bucket")
	}

	if !c.shouldRunForVariant(conf.BuildVariant.Name) {
		logger.Task().Infof("Skipping S3 get of remote file %v for variant %v",
			c.RemoteFile, conf.BuildVariant.Name)
		return nil
	}

	// if the local file or extract_to is a relative path, join it to the
	// working dir
	if c.LocalFile != "" {
		if !filepath.IsAbs(c.LocalFile) {
			c.LocalFile = filepath.Join(conf.WorkDir, c.LocalFile)
		}
		if err := createEnclosingDirectoryIfNeeded(c.LocalFile); err != nil {
			return errors.WithStack(err)
		}
	}
	if c.ExtractTo != "" {
		if !filepath.IsAbs(c.ExtractTo) {
			c.ExtractTo = filepath.Join(conf.WorkDir, c.ExtractTo)
		}
		if err := createEnclosingDirectoryIfNeeded(c.ExtractTo); err != nil {
			return errors.WithStack(err)
		}
	}

	// run the download in the background so a context cancellation can
	// terminate the command promptly
	errChan := make(chan error)
	go func() {
		errChan <- errors.WithStack(c.getWithRetry(ctx, logger))
	}()

	select {
	case err := <-errChan:
		return errors.WithStack(err)
	case <-ctx.Done():
		logger.Execution().Info("Received signal to terminate execution of S3 Get Command")
		return nil
	}
}
// getWithRetry wraps get() with a bounded retry loop: up to maxS3OpAttempts
// tries, backing off between attempts, and aborting early if the context is
// cancelled.
func (c *s3get) getWithRetry(ctx context.Context, logger client.LoggerProducer) error {
	backoffCounter := getS3OpBackoff()
	timer := time.NewTimer(0) // fires immediately for the first attempt
	defer timer.Stop()

	for attempt := 1; attempt <= maxS3OpAttempts; attempt++ {
		logger.Task().Infof("fetching %s from s3 bucket %s (attempt %d of %d)",
			c.RemoteFile, c.Bucket, attempt, maxS3OpAttempts)

		select {
		case <-ctx.Done():
			return errors.New("s3 get operation aborted")
		case <-timer.C:
			if err := errors.WithStack(c.get(ctx)); err != nil {
				// log the failure and schedule the next attempt
				logger.Execution().Errorf("problem getting %s from s3 bucket, retrying. [%v]",
					c.RemoteFile, err)
				timer.Reset(backoffCounter.Duration())
				continue
			}
			return nil
		}
	}

	return errors.Errorf("S3 get failed after %d attempts", maxS3OpAttempts)
}
// Fetch the specified resource from s3: either download it verbatim to
// LocalFile (replacing any existing file), or stream it through the tarball
// extractor into ExtractTo.
func (c *s3get) get(ctx context.Context) error {
	// either untar the remote, or just write to a file
	if c.LocalFile != "" {
		// remove the file, if it exists
		exists, err := util.FileExists(c.LocalFile)
		if err != nil {
			return errors.Wrapf(err, "error checking existence of local file %v",
				c.LocalFile)
		}
		if exists {
			if err := os.RemoveAll(c.LocalFile); err != nil {
				return errors.Wrapf(err, "error clearing local file %v", c.LocalFile)
			}
		}

		// download to local file
		return errors.Wrapf(c.bucket.Download(ctx, c.RemoteFile, c.LocalFile),
			"error downloading %s to %s", c.RemoteFile, c.LocalFile)
	}

	reader, err := c.bucket.Reader(ctx, c.RemoteFile)
	if err != nil {
		return errors.WithStack(err)
	}
	// Fix: the reader was previously never closed, leaking the underlying
	// S3 response stream on every extract. pail's Bucket.Reader returns an
	// io.ReadCloser.
	defer reader.Close()

	if err := util.ExtractTarball(ctx, reader, c.ExtractTo, []string{}); err != nil {
		return errors.Wrapf(err, "problem extracting %s from archive", c.RemoteFile)
	}

	return nil
}
// createPailBucket builds the pail S3 bucket handle (us-east-1) from the
// command's credentials and stores it on the receiver for later use.
func (c *s3get) createPailBucket(client *http.Client) error {
	var err error
	c.bucket, err = pail.NewS3BucketWithHTTPClient(client, pail.S3Options{
		Credentials: pail.CreateAWSCredentials(c.AwsKey, c.AwsSecret, ""),
		Region:      endpoints.UsEast1RegionID,
		Name:        c.Bucket,
	})
	return err
}