forked from chrislusf/gleam
-
Notifications
You must be signed in to change notification settings - Fork 0
/
vfs_s3.go
104 lines (81 loc) · 2.33 KB
/
vfs_s3.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
package filesystem
import (
"fmt"
"io"
"math/rand"
"os"
"strings"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
)
// Option names under which the S3 file system looks up its AWS credentials.
const (
	// AWS_ACCESS_KEY is the option name for the AWS access key ID.
	AWS_ACCESS_KEY OptionName = "aws_access_key"
	// AWS_SECRET_KEY is the option name for the AWS secret access key.
	AWS_SECRET_KEY OptionName = "aws_secret_key"
)
// S3FileSystem is the file system implementation for locations that use the
// s3:// scheme. It carries no state of its own.
type S3FileSystem struct{}
// Accept reports whether fl.Location begins with the s3:// scheme, i.e.
// whether this file system is the one that should handle it.
func (fs *S3FileSystem) Accept(fl *FileLocation) bool {
	const scheme = "s3://"
	location := fl.Location
	return strings.HasPrefix(location, scheme)
}
// Open fetches the S3 object named by fl.Location and returns it as a
// VirtualFile backed by a local temporary copy.
//
// fl.Location must have the form s3://bucket/key. Credentials are read from
// Option[AWS_ACCESS_KEY] and Option[AWS_SECRET_KEY].
//
// On failure the returned VirtualFile is a literal nil. Errors from the AWS
// SDK are returned unwrapped so callers can keep inspecting their type.
func (fs *S3FileSystem) Open(fl *FileLocation) (VirtualFile, error) {
	// Validate the location first so malformed input fails fast, before any
	// AWS session is built.
	bucketName, objectKey, err := splitS3LocationToParts(fl.Location)
	if err != nil {
		return nil, fmt.Errorf("Failed to split S3 location to parts %s: %v", fl.Location, err)
	}

	creds := credentials.NewStaticCredentials(Option[AWS_ACCESS_KEY], Option[AWS_SECRET_KEY], "")
	sess, err := session.NewSession(aws.NewConfig().WithCredentials(creds))
	if err != nil {
		// The error is reported to the caller only; it is not also printed.
		return nil, err
	}

	resp, err := s3.New(sess).GetObject(&s3.GetObjectInput{
		Bucket: aws.String(bucketName), // Required
		Key:    aws.String(objectKey),  // Required
	})
	if err != nil {
		return nil, err
	}

	vf, err := newVirtualFileS3(resp.Body)
	if err != nil {
		if vf != nil {
			// Best-effort cleanup of a partially written local copy; the
			// download error is the one worth reporting.
			_ = vf.Close()
		}
		// Return an untyped nil: handing back a nil *VirtualFileS3 would
		// produce a non-nil VirtualFile interface value.
		return nil, err
	}
	return vf, nil
}
// List is not implemented for S3. It always returns a nil slice together
// with an error saying that listing is unsupported.
func (fs *S3FileSystem) List(fl *FileLocation) (fileLocations []*FileLocation, err error) {
	err = fmt.Errorf("S3 Listing is not supported yet.")
	return nil, err
}
// IsDir reports whether fl names a directory. This implementation never
// inspects fl and answers false for every location.
func (fs *S3FileSystem) IsDir(fl *FileLocation) bool {
	const isDirectory = false
	return isDirectory
}
// splitS3LocationToParts breaks a location of the form s3://bucket/key into
// its bucket name and object key. The key is everything after the first "/"
// that follows the bucket, so it may itself contain slashes.
//
// An error is returned when location does not start with s3:// or when there
// is no "/" separating the bucket from the key (for example "s3://bucket").
func splitS3LocationToParts(location string) (bucketName, objectKey string, err error) {
	const s3Prefix = "s3://"
	if !strings.HasPrefix(location, s3Prefix) {
		return "", "", fmt.Errorf("parameter %s should start with s3://", location)
	}

	parts := strings.SplitN(location[len(s3Prefix):], "/", 2)
	if len(parts) < 2 {
		// No "/" after the bucket: indexing parts[1] here would panic.
		return "", "", fmt.Errorf("parameter %s should have the form s3://bucket/key", location)
	}
	return parts[0], parts[1], nil
}
// VirtualFileS3 is a file handle backed by a local on-disk copy of an S3
// object. The embedded *os.File provides the file operations on that copy.
//
// Field order is significant: the type is built with a positional literal.
type VirtualFileS3 struct {
	*os.File        // open handle to the local copy
	filename string // path of the local copy, deleted again by Close
	size     int64  // number of bytes written to the local copy
}
// newVirtualFileS3 copies everything from readerCloser into a newly created
// file under os.TempDir() and returns that file rewound to its beginning.
//
// readerCloser is always closed before returning, including on failure. If
// creating, filling or rewinding the file fails, the temporary file is
// removed and a nil *VirtualFileS3 is returned together with the error.
func newVirtualFileS3(readerCloser io.ReadCloser) (*VirtualFileS3, error) {
	// The stream is only read here; an error from closing it gives the caller
	// nothing to act on, so it is deliberately ignored.
	defer readerCloser.Close()

	outFile, filename, err := createS3TempFile()
	if err != nil {
		return nil, err
	}

	size, err := io.Copy(outFile, readerCloser)
	if err == nil {
		_, err = outFile.Seek(0, io.SeekStart)
	}
	if err != nil {
		// Do not leave a half-written file or an open descriptor behind.
		outFile.Close()
		os.Remove(filename)
		return nil, err
	}
	return &VirtualFileS3{outFile, filename, size}, nil
}

// createS3TempFile creates a new file named s3_<random number> in
// os.TempDir() and returns it opened for reading and writing, with its path.
//
// The file is opened with O_EXCL so an existing file with the same name is
// never truncated or shared; on a name collision another name is tried. The
// file is created with mode 0600.
func createS3TempFile() (*os.File, string, error) {
	const maxAttempts = 10000

	var lastErr error
	for attempt := 0; attempt < maxAttempts; attempt++ {
		filename := fmt.Sprintf("%s/s3_%d", os.TempDir(), rand.Uint32())
		f, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0600)
		if err == nil {
			return f, filename, nil
		}
		if !os.IsExist(err) {
			return nil, "", err
		}
		lastErr = err
	}
	return nil, "", lastErr
}
// Size returns the byte count stored in the size field when this
// VirtualFileS3 was constructed.
func (vf *VirtualFileS3) Size() int64 {
	byteCount := vf.size
	return byteCount
}
// Close closes the underlying local file and then deletes it from disk.
//
// Removal is attempted even when closing fails, so the temporary file is not
// left behind. If both steps fail, the close error is returned; otherwise the
// result is whichever error occurred, or nil.
func (vf *VirtualFileS3) Close() error {
	closeErr := vf.File.Close()
	removeErr := os.Remove(vf.filename)
	if closeErr != nil {
		return closeErr
	}
	return removeErr
}