-
Notifications
You must be signed in to change notification settings - Fork 40
/
uploader.go
216 lines (188 loc) · 6.71 KB
/
uploader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
// FIXME: golangci-lint
// nolint:govet,revive
package files
import (
"fmt"
"os"
"os/exec"
"path/filepath"
"strings"
"time"
log "github.com/sirupsen/logrus"
"github.com/redhatinsights/edge-api/config"
)
// Uploader abstracts where EdgeAPI publishes artifacts: a single local file via
// UploadFile, or a whole repository directory via UploadRepo. The
// implementations in this package return the resulting location (a local path
// or an S3 URL) on success.
type Uploader interface {
	// UploadFile publishes the local file fname at uploadPath.
	UploadFile(fname, uploadPath string) (string, error)
	// UploadRepo publishes the repository directory src for account using acl.
	UploadRepo(src, account, acl string) (string, error)
}
// NewUploader selects the Uploader implementation from the EdgeAPI
// configuration. When the config's Local flag is set it returns a LocalUploader
// rooted at /tmp; otherwise it returns an S3Uploader backed by a new S3 client.
func NewUploader(log log.FieldLogger) Uploader {
	if config.Get().Local {
		return &LocalUploader{log: log, BaseDir: "/tmp"}
	}
	return NewS3Uploader(log, GetNewS3Client())
}
// S3Uploader defines the mechanism to upload data to S3: it uploads single
// files and whole repository trees to a bucket through an S3ClientInterface.
type S3Uploader struct {
// Client performs the object writes (PutObject).
Client S3ClientInterface
// Bucket is the destination bucket name; NewS3Uploader fills it from the
// config's BucketName.
Bucket string
// log receives progress and error messages for uploads.
log log.FieldLogger
}
// LocalUploader isn't actually an uploader, but it implements the Uploader
// interface so the workflow can run to completion on a local machine without
// S3.
type LocalUploader struct {
// BaseDir is the local directory standing in for the bucket; NewUploader sets
// it to /tmp.
BaseDir string
// log receives debug output (e.g. from UploadFile).
log log.FieldLogger
}
// UploadRepo is the offline stand-in for a repository upload. It copies and
// deletes nothing and returns src itself as the "uploaded" location, which
// satisfies the Uploader interface for development without S3. account and acl
// are ignored.
//
// It returns an error unless src is u.BaseDir or a path inside it. The check
// works on cleaned path components rather than raw string prefixes, so
// siblings such as "/tmpevil" and escapes such as "/tmp/../etc" are rejected
// when BaseDir is "/tmp".
func (u *LocalUploader) UploadRepo(src string, account string, acl string) (string, error) {
	// Rel cleans both paths. It fails when exactly one of them is relative,
	// and it yields ".." or a "../"-prefixed path when src lies outside BaseDir.
	rel, err := filepath.Rel(u.BaseDir, src)
	if err != nil || rel == ".." || strings.HasPrefix(rel, ".."+string(filepath.Separator)) {
		return "", fmt.Errorf("invalid folder to upload on local uploader")
	}
	return src, nil
}
// UploadFile copies the local file fname to uploadPath beneath u.BaseDir using
// cp. It stands in for an S3 upload during offline development and satisfies
// the Uploader interface. It returns the destination path.
//
// uploadPath is joined to BaseDir as given, so callers must pass trusted
// values: a path containing ".." can resolve outside BaseDir.
func (u *LocalUploader) UploadFile(fname string, uploadPath string) (string, error) {
	destfile := filepath.Join(u.BaseDir, uploadPath)
	u.log.WithFields(log.Fields{"fname": fname, "destfile": destfile}).Debug("Copying fname to destfile")
	cmd := exec.Command("cp", fname, destfile) //#nosec G204 - This uploadPath variable is actually controlled by the calling method
	// cp explains failures only on stderr. Capture that output so the returned
	// error says more than a bare "exit status 1".
	if out, err := cmd.CombinedOutput(); err != nil {
		if msg := strings.TrimSpace(string(out)); msg != "" {
			return "", fmt.Errorf("copying %q to %q: %w: %s", fname, destfile, err, msg)
		}
		return "", fmt.Errorf("copying %q to %q: %w", fname, destfile, err)
	}
	return destfile, nil
}
// NewS3Uploader builds an S3Uploader that writes through client into the
// bucket named by the EdgeAPI configuration's BucketName.
func NewS3Uploader(log log.FieldLogger, client S3ClientInterface) *S3Uploader {
	bucket := config.Get().BucketName
	return &S3Uploader{
		log:    log,
		Client: client,
		Bucket: bucket,
	}
}
// uploadDetails contains all details required to upload one file to a
// destination. UploadRepo creates one per file and hands it to the worker
// goroutines through the upload queue.
type uploadDetails struct {
// fileName is the local path of the file to upload.
fileName string
// uploadPath is the object key in the bucket (UploadRepo prefixes it with the account).
uploadPath string
// uploader performs the upload via UploadFileWithACL.
uploader *S3Uploader
// done receives a value from the worker once it has finished processing this file.
done chan bool
// count is the file's position in UploadRepo's walk order; it is only used in log fields.
count int
}
// worker uploads the files it receives from uploadQueue until the queue is
// closed and drained.
//
// Each file is tried up to cfg.RepoFileUploadAttempts times (always at least
// once), sleeping cfg.RepoFileUploadDelay seconds between failed attempts. The
// outcome is sent on the file's done channel: true when an attempt succeeded,
// false when every attempt failed.
func (u *S3Uploader) worker(uploadQueue chan *uploadDetails, acl string, cfg *config.EdgeConfig) {
	retryDelay := time.Duration(cfg.RepoFileUploadDelay) * time.Second
	maxAttempts := int(cfg.RepoFileUploadAttempts)
	if maxAttempts < 1 {
		// A non-positive setting used to skip the upload entirely while still
		// reporting the file as done.
		maxAttempts = 1
	}
	for p := range uploadQueue {
		uploaded := false
		for attempt := 1; attempt <= maxAttempts; attempt++ {
			fileURL, err := p.uploader.UploadFileWithACL(p.fileName, p.uploadPath, acl)
			if err != nil {
				// UploadFileWithACL returns "" on error, so log the local file name.
				u.log.WithFields(log.Fields{"fname": p.fileName, "attempt": attempt, "count": p.count, "error": err.Error()}).Error("Error uploading file")
				if attempt < maxAttempts {
					time.Sleep(retryDelay)
				}
				continue
			}
			entry := u.log.WithFields(log.Fields{"fname": fileURL, "attempt": attempt, "count": p.count})
			if attempt > 1 {
				// Log at Info so the resolution of an earlier error is visible.
				entry.Info("File was uploaded successfully")
			} else {
				entry.Trace("File was uploaded successfully")
			}
			uploaded = true
			break
		}
		p.done <- uploaded
	}
}

// UploadRepo uploads the repo to a backing object storage bucket.
//
// Every regular file under src is uploaded to
// bucket/$account/<path with cfg.RepoTempPath trimmed> with ACL acl
// ("private" when empty, or e.g. "public-read"). The uploads run on
// cfg.UploadWorkers goroutines (at least one). It returns the https URL of the
// uploaded repo. It returns an error if src cannot be walked or if any file
// still fails after its retries.
func (u *S3Uploader) UploadRepo(src string, account string, acl string) (string, error) {
	cfg := config.Get()
	if acl == "" {
		acl = "private"
	}

	// Attach the per-call log fields to a copy of the uploader instead of
	// reassigning u.log. u may be shared by concurrent UploadRepo calls, so
	// reassigning its logger was a data race and leaked one call's fields
	// into the next.
	scoped := *u
	scoped.log = u.log.WithFields(log.Fields{"src": src, "account": account})
	scoped.log.Info("Uploading repo")

	var uploadDetailsList []*uploadDetails
	if err := filepath.Walk(src, func(path string, info os.FileInfo, err error) error {
		if err != nil {
			// info may be nil when err != nil. Abort instead of dereferencing
			// it or silently uploading a partial repo.
			return err
		}
		if info.IsDir() {
			return nil
		}
		uploadDetailsList = append(uploadDetailsList, &uploadDetails{
			fileName:   path,
			uploadPath: fmt.Sprintf("%s/%s", account, strings.TrimPrefix(path, cfg.RepoTempPath)),
			uploader:   &scoped,
			// Buffered so a worker never blocks until the in-order collection
			// loop below reaches this file.
			done:  make(chan bool, 1),
			count: len(uploadDetailsList),
		})
		return nil
	}); err != nil {
		scoped.log.WithField("error", err.Error()).Error("Error walking directory")
		return "", err
	}
	scoped.log.WithField("fileCount", len(uploadDetailsList)).Debug("Files are being uploaded....")

	// The queue holds every file, so it can be filled and closed up front.
	// Workers exit once it is drained.
	uploadQueue := make(chan *uploadDetails, len(uploadDetailsList))
	for _, details := range uploadDetailsList {
		uploadQueue <- details
	}
	close(uploadQueue)
	scoped.log.Debug("Channel is closed...")

	numberOfWorkers := cfg.UploadWorkers
	if numberOfWorkers < 1 {
		// With no workers nothing drains the queue, and the wait below would
		// block forever.
		numberOfWorkers = 1
	}
	for i := 0; i < numberOfWorkers; i++ {
		go scoped.worker(uploadQueue, acl, cfg)
	}

	failed := 0
	for i, details := range uploadDetailsList {
		if ok := <-details.done; !ok {
			failed++
		}
		scoped.log.WithField("index", i).Trace("File is done")
	}
	if failed > 0 {
		err := fmt.Errorf("uploading repo %q: %d of %d files failed", src, failed, len(uploadDetailsList))
		scoped.log.WithField("error", err.Error()).Error("Error uploading repo")
		return "", err
	}

	s3URL := fmt.Sprintf("https://%s.s3.%s.amazonaws.com/%s/%s", u.Bucket, cfg.BucketRegion, account, strings.TrimPrefix(src, cfg.RepoTempPath))
	scoped.log.WithField("s3URL", s3URL).Info("Files are done uploading...")
	return s3URL, nil
}
// UploadFileWithACL uploads the local file fname to the object key uploadPath
// in u.Bucket, using the supplied ACL ("private" when acl is empty). It returns
// the object's https URL.
//
// The file is closed on every path after it has been opened, including when
// the upload fails. Open errors are wrapped with %w, so callers can still use
// errors.Is on them (e.g. os.ErrNotExist).
func (u *S3Uploader) UploadFileWithACL(fname string, uploadPath string, acl string) (string, error) {
	f, err := os.Open(filepath.Clean(fname))
	if err != nil {
		return "", fmt.Errorf("failed to open file %q, %w", fname, err)
	}
	if acl == "" {
		acl = "private"
	}
	// Upload the file to S3, then close it unconditionally. Returning early
	// on a PutObject error used to leak the file descriptor.
	_, err = u.Client.PutObject(f, u.Bucket, uploadPath, acl)
	closeErr := f.Close()
	if err != nil {
		u.log.WithField("error", err.Error()).Error("Error uploading to AWS S3")
		return "", err
	}
	if closeErr != nil {
		u.log.WithField("error", closeErr.Error()).Error("Error closing file")
		return "", closeErr
	}
	region := config.Get().BucketRegion
	s3URL := fmt.Sprintf("https://%s.s3.%s.amazonaws.com/%s", u.Bucket, region, uploadPath)
	return s3URL, nil
}
// UploadFile uploads the local file fname to uploadPath in the S3 bucket with
// the "private" ACL. It delegates to UploadFileWithACL and returns that
// method's URL and error.
func (u *S3Uploader) UploadFile(fname string, uploadPath string) (string, error) {
	const defaultACL = "private"
	return u.UploadFileWithACL(fname, uploadPath, defaultACL)
}