// s3.go — S3 storage helpers for the storage package.
package storage
import (
"bytes"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/Kasita-Inc/gadget/errors"
"github.com/Kasita-Inc/gadget/log"
)
// publicACL is the default canned ACL applied to objects written by a Bucket.
var publicACL = "public-read"
// Bucket identifies a single S3 object (bucket name + key) and the ACL
// applied when writing it. It is the receiver for the read/write/list helpers
// in this file.
// NOTE(review): the previous comment mentioned an "in memory cache", but no
// caching is visible in this type or its methods.
type Bucket struct {
	// bucket is the S3 bucket name used for every operation.
	bucket string
	// key is the object key read from and written to.
	key string
	// ACL Amazon S3 access control lists value
	ACL string
}
// NewS3 returns a Bucket targeting the given S3 bucket and object key,
// with the ACL defaulted to public-read.
func NewS3(bucket, key string) *Bucket {
	b := &Bucket{ACL: publicACL}
	b.bucket = bucket
	b.key = key
	return b
}
// newSession creates an AWS session with shared config (~/.aws/config,
// AWS_* environment) enabled, wrapping any error with a stack trace.
// The local is named sess so it does not shadow the session package.
func newSession() (*session.Session, errors.TracerError) {
	sess, err := session.NewSessionWithOptions(session.Options{
		SharedConfigState: session.SharedConfigEnable,
	})
	return sess, errors.Wrap(err)
}
// ReadObject downloads the configured bucket/key from S3 and returns its
// contents as a byte slice. Failures are logged and returned wrapped.
func (b *Bucket) ReadObject() ([]byte, errors.TracerError) {
	sess, tracerErr := newSession()
	if tracerErr != nil {
		return nil, tracerErr
	}
	buf := aws.NewWriteAtBuffer([]byte{})
	input := &s3.GetObjectInput{
		Bucket: aws.String(b.bucket),
		Key:    aws.String(b.key),
	}
	if _, err := s3manager.NewDownloader(sess).Download(buf, input); err != nil {
		log.Errorf("Issue loading from S3, %s/%s (%s)", b.bucket, b.key, err)
		return nil, errors.Wrap(err)
	}
	return buf.Bytes(), nil
}
// WriteObject uploads p to the configured bucket/key, applying the Bucket's
// ACL. Failures are logged and returned wrapped.
func (b *Bucket) WriteObject(p []byte) errors.TracerError {
	sess, tracerErr := newSession()
	if tracerErr != nil {
		return tracerErr
	}
	_, err := s3manager.NewUploader(sess).Upload(&s3manager.UploadInput{
		Bucket: &b.bucket,
		Key:    &b.key,
		Body:   bytes.NewReader(p),
		ACL:    &b.ACL,
	})
	if err != nil {
		log.Errorf("Issue writing to S3, %s/%s (%s)", b.bucket, b.key, err)
		return errors.Wrap(err)
	}
	return nil
}
// List returns the objects in the bucket whose keys match prefix, starting
// lexicographically after startAfter (a single ListObjectsV2 page).
func (b *Bucket) List(prefix string, startAfter string) (*s3.ListObjectsV2Output, errors.TracerError) {
	sess, tracerErr := newSession()
	if tracerErr != nil {
		return nil, tracerErr
	}
	output, err := s3.New(sess).ListObjectsV2(&s3.ListObjectsV2Input{
		Bucket:     aws.String(b.bucket),
		Prefix:     aws.String(prefix),
		StartAfter: aws.String(startAfter),
	})
	if err != nil {
		return nil, errors.Wrap(err)
	}
	return output, nil
}
// PruneByPrefix will remove objects over maxObjects from an S3 bucket with the given prefix
func PruneByPrefix(bucket string, prefix string, maxObjects int) {
svc := s3.New(session.New())
input := &s3.ListObjectsV2Input{
Bucket: aws.String(bucket),
Prefix: aws.String(prefix),
}
objects, err := svc.ListObjectsV2(input)
if nil != err {
return
}
if len(objects.Contents) > maxObjects {
maxIdx := len(objects.Contents) - maxObjects
for _, obj := range objects.Contents[:maxIdx] {
delInp := &s3.DeleteObjectInput{
Bucket: aws.String(bucket),
Key: obj.Key,
}
_, err = svc.DeleteObject(delInp)
if nil != err {
log.Errorf("prune failed for %s/%s\n%#v", bucket, *obj.Key, err)
}
}
}
}