/
s3.go
130 lines (112 loc) · 3.03 KB
/
s3.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package fs
import (
"context"
"io"
"net/http"
"os"
"path"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3iface"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
)
// S3Config is the configuration for a S3-compatible storage provider
type S3Config struct {
// S3 Bucket to store files
Bucket string `toml:"bucket"`
// Region of the S3 service
Region string `toml:"region"`
// EndpointURL is an HTTP endpoint of the S3 API
EndpointURL string `toml:"endpoint_url"`
// Prefix is a prefix (subfolder) to use to build key names
Prefix string `toml:"prefix"`
}
// S3 implements file storage for S3-compatible providers.
type S3 struct {
api s3iface.S3API
uploader *s3manager.Uploader
bucket string
prefix string
}
func NewS3(c S3Config) (*S3, error) {
cfg := aws.NewConfig().
WithEndpoint(c.EndpointURL).
WithRegion(c.Region).
WithLogger(s3logger{}).
WithLogLevel(aws.LogDebug)
sess, err := session.NewSessionWithOptions(session.Options{Config: *cfg})
if err != nil {
return nil, errors.Wrap(err, "failed to initialize S3 session")
}
return &S3{
api: s3.New(sess),
uploader: s3manager.NewUploader(sess),
bucket: c.Bucket,
prefix: c.Prefix,
}, nil
}
func (s *S3) Open(_name string) (http.File, error) {
return nil, errors.New("serving files from S3 is not supported")
}
func (s *S3) Delete(ctx context.Context, name string) error {
key := s.buildKey(name)
_, err := s.api.DeleteObjectWithContext(ctx, &s3.DeleteObjectInput{
Bucket: &s.bucket,
Key: &key,
})
return err
}
func (s *S3) Create(ctx context.Context, name string, reader io.Reader) (int64, error) {
key := s.buildKey(name)
logger := log.WithField("key", key)
logger.Infof("uploading file to %s", s.bucket)
r := &readerWithN{Reader: reader}
_, err := s.uploader.UploadWithContext(ctx, &s3manager.UploadInput{
Bucket: &s.bucket,
Key: &key,
Body: r,
})
if err != nil {
return 0, errors.Wrap(err, "failed to upload file")
}
logger.Debugf("written %d bytes", r.n)
return int64(r.n), nil
}
func (s *S3) Size(ctx context.Context, name string) (int64, error) {
key := s.buildKey(name)
logger := log.WithField("key", key)
logger.Debugf("getting file size from %s", s.bucket)
resp, err := s.api.HeadObjectWithContext(ctx, &s3.HeadObjectInput{
Bucket: &s.bucket,
Key: &key,
})
if err != nil {
if awsErr, ok := err.(awserr.Error); ok {
if awsErr.Code() == "NotFound" {
return 0, os.ErrNotExist
}
}
return 0, errors.Wrap(err, "failed to get file size")
}
return *resp.ContentLength, nil
}
func (s *S3) buildKey(name string) string {
return path.Join(s.prefix, name)
}
type readerWithN struct {
io.Reader
n int
}
func (r *readerWithN) Read(p []byte) (n int, err error) {
n, err = r.Reader.Read(p)
r.n += n
return
}
type s3logger struct{}
func (s s3logger) Log(args ...interface{}) {
log.Debug(args...)
}