-
-
Notifications
You must be signed in to change notification settings - Fork 14
/
source.go
127 lines (106 loc) · 2.67 KB
/
source.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package s3
import (
"context"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/sirupsen/logrus"
"github.com/yunify/qscamel/constants"
"github.com/yunify/qscamel/model"
"github.com/yunify/qscamel/utils"
)
// Reachable implement source.Reachable
func (c *Client) Reachable() bool {
return true
}
// List implement source.List
func (c *Client) List(ctx context.Context, j *model.DirectoryObject, fn func(o model.Object)) (err error) {
cp := utils.Join(c.Path, j.Key) + "/"
if cp == "/" {
cp = ""
}
marker := j.Marker
// If ListObjectV2 is enabled, we should use ListObjectsV2 instead.
if c.EnableListObjectsV2 {
for {
resp, err := c.client.ListObjectsV2(&s3.ListObjectsV2Input{
Bucket: aws.String(c.BucketName),
Prefix: aws.String(cp),
MaxKeys: aws.Int64(MaxKeys),
StartAfter: aws.String(marker),
})
if err != nil {
logrus.Errorf("List objects failed for %v.", err)
return err
}
for _, v := range resp.Contents {
object := &model.SingleObject{
Key: utils.Relative(*v.Key, c.Path),
Size: *v.Size,
}
fn(object)
}
marker = aws.StringValue(resp.StartAfter)
if !aws.BoolValue(resp.IsTruncated) {
marker = ""
}
// Update task content.
j.Marker = marker
err = model.CreateObject(ctx, j)
if err != nil {
logrus.Errorf("Save task failed for %v.", err)
return err
}
if marker == "" {
break
}
}
return
}
for {
resp, err := c.client.ListObjects(&s3.ListObjectsInput{
Bucket: aws.String(c.BucketName),
Prefix: aws.String(cp),
MaxKeys: aws.Int64(MaxKeys),
Marker: aws.String(marker),
})
if err != nil {
logrus.Errorf("List objects failed for %v.", err)
return err
}
for _, v := range resp.Contents {
object := &model.SingleObject{
Key: utils.Relative(*v.Key, c.Path),
Size: *v.Size,
}
fn(object)
}
// Some s3 compatible services may not return next marker, we should also try
// the last key in contents.
marker = aws.StringValue(resp.NextMarker)
if marker == "" && len(resp.Contents) > 0 {
marker = aws.StringValue(resp.Contents[len(resp.Contents)-1].Key)
}
if !aws.BoolValue(resp.IsTruncated) {
marker = ""
}
if marker == "" {
break
}
// Update task content.
j.Marker = marker
err = model.CreateObject(ctx, j)
if err != nil {
logrus.Errorf("Save task failed for %v.", err)
return err
}
}
return
}
// Reach implement source.Fetch
func (c *Client) Reach(ctx context.Context, p string) (url string, err error) {
return "", constants.ErrEndpointFuncNotImplemented
}
// Readable implement source.Readable
func (c *Client) Readable() bool {
return false
}