-
Notifications
You must be signed in to change notification settings - Fork 0
/
s3_client.go
121 lines (113 loc) · 2.83 KB
/
s3_client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package s3
import (
"bytes"
"fmt"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/tsandl/TYDB/dbserver/leveldb/storage"
"io/ioutil"
"path"
)
// S3Client pairs an AWS SDK S3 service handle with the options it was opened
// with. Values are created by GetS3Client.
type S3Client struct {
// s3Store is the SDK client every request is issued through.
s3Store *s3.S3
// opt holds the options passed to GetS3Client; the methods read Bucket and
// Path from it to address objects.
opt OpenOption
}
// GetS3Client builds an S3Client from opt and checks that the configured
// bucket can be reached.
//
// Static credentials are created from opt.Ak and opt.Sk. The session uses
// opt.Region and opt.Endpoint with path-style addressing and SSL disabled.
// A HeadBucket request against opt.Bucket is issued before returning, so an
// unreachable or inaccessible bucket is reported here rather than on first
// use. opt.Path is normalised with path.Clean before being stored.
//
// It returns a nil client and the underlying error if the credentials cannot
// be retrieved, the session cannot be created, or the HeadBucket call fails.
func GetS3Client(opt OpenOption) (*S3Client, error) {
	creds := credentials.NewStaticCredentials(opt.Ak, opt.Sk, "")
	if _, err := creds.Get(); err != nil {
		return nil, err
	}

	config := &aws.Config{
		Region:           aws.String(opt.Region),
		DisableSSL:       aws.Bool(true),
		S3ForcePathStyle: aws.Bool(true),
		Credentials:      creds,
		Endpoint:         aws.String(opt.Endpoint),
	}

	// session.New is deprecated: it cannot return an error, so a broken
	// session configuration only shows up on the first request. NewSession
	// reports the problem immediately.
	sess, err := session.NewSession(config)
	if err != nil {
		return nil, err
	}
	client := s3.New(sess)

	testBucket := &s3.HeadBucketInput{
		Bucket: aws.String(opt.Bucket),
	}
	if _, err = client.HeadBucket(testBucket); err != nil {
		return nil, err
	}

	opt.Path = path.Clean(opt.Path)
	return &S3Client{s3Store: client, opt: opt}, nil
}
// PutBytes uploads data to the configured bucket under the object key
// "<opt.Path>/<key>" and returns any error reported by the PutObject call.
func (client *S3Client) PutBytes(key string, data []byte) error {
	input := &s3.PutObjectInput{
		Bucket: aws.String(client.opt.Bucket),
		Key:    aws.String(client.opt.Path + "/" + key),
		Body:   bytes.NewReader(data),
	}
	if _, err := client.s3Store.PutObject(input); err != nil {
		return err
	}
	return nil
}
// GetBytes downloads the object stored under "<opt.Path>/<key>" in the
// configured bucket and returns its full contents.
//
// It returns a nil slice and the error if the GetObject call fails. If reading
// the body fails, the bytes read so far are returned together with the read
// error.
func (client *S3Client) GetBytes(key string) ([]byte, error) {
	rsps, err := client.s3Store.GetObject(&s3.GetObjectInput{
		Bucket: aws.String(client.opt.Bucket),
		Key:    aws.String(client.opt.Path + "/" + key),
	})
	if err != nil {
		return nil, err
	}
	// The response body is an io.ReadCloser backed by the HTTP connection;
	// it must be closed or the connection is leaked on every call.
	defer rsps.Body.Close()

	return ioutil.ReadAll(rsps.Body)
}
// Remove deletes the object stored under "<opt.Path>/<key>" in the configured
// bucket and returns any error reported by the DeleteObject call.
func (client *S3Client) Remove(key string) error {
	objectKey := client.opt.Path + "/" + key
	_, err := client.s3Store.DeleteObject(&s3.DeleteObjectInput{
		Bucket: aws.String(client.opt.Bucket),
		Key:    aws.String(objectKey),
	})
	return err
}
// List enumerates the objects under the prefix "<opt.Path>/" (using "/" as
// the delimiter) and returns a descriptor for each one whose base name
// fsParseName recognises. Objects with unrecognised names are skipped.
//
// Every result page is visited. The returned slice is never nil, and is
// returned alongside any error from the paged listing call.
func (client *S3Client) List() ([]storage.FileDesc, error) {
	found := []storage.FileDesc{}

	query := &s3.ListObjectsInput{
		Bucket:    aws.String(client.opt.Bucket),
		Prefix:    aws.String(client.opt.Path + "/"),
		Delimiter: aws.String("/"),
	}

	// collect appends the parsable entries of one page and always asks for
	// the next page.
	collect := func(page *s3.ListObjectsOutput, _ bool) bool {
		for i := range page.Contents {
			_, base := path.Split(*page.Contents[i].Key)
			if fd, ok := fsParseName(base); ok {
				found = append(found, fd)
			}
		}
		return true
	}

	err := client.s3Store.ListObjectsPages(query, collect)
	return found, err
}
// fsParseName converts a file name into a storage.FileDesc.
//
// Two layouts are recognised:
//
//	<num>.<ext>     ext "log" -> TypeJournal, "ldb" or "sst" -> TypeTable,
//	                "tmp" -> TypeTemp; any other ext is rejected
//	MANIFEST-<num>  with nothing following the number -> TypeManifest
//
// ok is false when the name matches neither layout; fd should not be used in
// that case.
func fsParseName(name string) (fd storage.FileDesc, ok bool) {
	var suffix string

	if _, err := fmt.Sscanf(name, "%d.%s", &fd.Num, &suffix); err != nil {
		// Not "<num>.<ext>": the only other accepted layout is a manifest.
		// Exactly one scanned item means the number was read and no text
		// followed it.
		if n, _ := fmt.Sscanf(name, "MANIFEST-%d%s", &fd.Num, &suffix); n != 1 {
			return fd, false
		}
		fd.Type = storage.TypeManifest
		return fd, true
	}

	switch suffix {
	case "log":
		fd.Type = storage.TypeJournal
	case "ldb", "sst":
		fd.Type = storage.TypeTable
	case "tmp":
		fd.Type = storage.TypeTemp
	default:
		return fd, false
	}
	return fd, true
}