-
Notifications
You must be signed in to change notification settings - Fork 351
/
external_recordings.go
96 lines (83 loc) · 2.37 KB
/
external_recordings.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
package simulator
import (
"io/ioutil"
"os"
"path/filepath"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/treeverse/lakefs/pkg/logging"
)
// EtagExtension is the extension (without the leading dot) of the sidecar
// file that stores the ETag of a downloaded recording next to the recording.
const EtagExtension = "etag"
// externalRecordDownloader fetches recordings from S3 through an
// s3manager.Downloader.
type externalRecordDownloader struct {
// downloader performs the transfers; its S3 client is also used for
// HeadObject requests.
downloader *s3manager.Downloader
}
// NewExternalRecordDownloader returns a downloader bound to the given AWS
// region that accesses S3 with anonymous credentials.
//
// The AWS session is created through session.Must, so a failure to create
// the session panics instead of being returned as an error.
func NewExternalRecordDownloader(region string) *externalRecordDownloader {
	// Anonymous credentials: requests are sent unsigned.
	cfg := &aws.Config{
		Region:      aws.String(region),
		Credentials: credentials.AnonymousCredentials,
	}
	awsSession := session.Must(session.NewSession(cfg))

	// The downloader is built on the session with its default options.
	return &externalRecordDownloader{
		downloader: s3manager.NewDownloader(awsSession),
	}
}
// getEtagFileName returns the name of the sidecar file holding the ETag for
// the recording stored at path: the path itself followed by a dot and
// EtagExtension.
func getEtagFileName(path string) string {
	const extensionSeparator = "."
	suffix := extensionSeparator + EtagExtension
	return path + suffix
}
// getLocalEtag returns the ETag recorded in the sidecar file of the recording
// at path.
//
// A missing sidecar file is not an error: it yields an empty ETag and a nil
// error. Any other read failure is returned unchanged.
func getLocalEtag(path string) (string, error) {
	content, readErr := ioutil.ReadFile(getEtagFileName(path))
	switch {
	case readErr == nil:
		return string(content), nil
	case os.IsNotExist(readErr):
		// Nothing has been recorded for this path yet.
		return "", nil
	default:
		return "", readErr
	}
}
// DownloadRecording makes sure the S3 object bucket/key is stored locally at
// destination.
//
// The ETag reported by a HeadObject request is compared with the ETag
// recorded in the sidecar file next to destination (see getEtagFileName);
// when they are equal nothing is downloaded and nil is returned. Otherwise
// the parent directory of destination is created if needed, the object is
// written to destination (truncating any existing file) and the new ETag is
// stored in the sidecar file.
//
// The sidecar file is written only after the recording has been downloaded
// and its file closed successfully, so a failed or partially flushed download
// is never marked as up to date. The first error encountered (reading the
// local ETag, HeadObject, creating the directory or file, downloading,
// closing, or writing the sidecar file) is returned as is.
func (d *externalRecordDownloader) DownloadRecording(bucket, key, destination string) error {
	etag, err := getLocalEtag(destination)
	if err != nil {
		return err
	}

	headObject, err := d.downloader.S3.HeadObject(&s3.HeadObjectInput{
		Bucket: aws.String(bucket),
		Key:    aws.String(key),
	})
	if err != nil {
		return err
	}

	s3Etag := aws.StringValue(headObject.ETag)
	if s3Etag == etag {
		// The local copy was recorded with the same ETag.
		return nil
	}

	logging.Default().WithFields(logging.Fields{"bucket": bucket, "key": key, "destination": destination}).Info("download Recording")

	// Make sure the target folder exists; report the failure here rather than
	// letting it surface indirectly from os.Create.
	dir := filepath.Dir(destination)
	if err := os.MkdirAll(dir, os.ModePerm); err != nil {
		return err
	}

	f, err := os.Create(destination)
	if err != nil {
		return err
	}

	// Write the contents of the S3 object to the file.
	n, err := d.downloader.Download(f, &s3.GetObjectInput{
		Bucket: aws.String(bucket),
		Key:    aws.String(key),
	})
	// Close before recording the ETag: a close error can mean the data never
	// reached the disk, in which case the ETag must not be stored.
	closeErr := f.Close()
	if err != nil {
		return err
	}
	if closeErr != nil {
		return closeErr
	}

	logging.Default().WithFields(logging.Fields{"file": destination, "bytes": n}).Info("file downloaded")

	// Record the ETag so the next call can skip an unchanged object.
	etagFileName := getEtagFileName(destination)
	return ioutil.WriteFile(etagFileName, []byte(s3Etag), 0644) //nolint:gosec
}