-
Notifications
You must be signed in to change notification settings - Fork 351
/
factory.go
208 lines (183 loc) · 5.36 KB
/
factory.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
package store
import (
"context"
"errors"
"fmt"
"net/url"
"time"
"cloud.google.com/go/storage"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/treeverse/lakefs/pkg/block/azure"
"github.com/treeverse/lakefs/pkg/block/factory"
"github.com/treeverse/lakefs/pkg/block/params"
)
var (
	// ErrNotSupported is returned (wrapped) by WalkerFactory.GetWalker when the
	// storage URI scheme has no walker implementation.
	ErrNotSupported = errors.New("no storage adapter found")

	// ErrForbidden signals a forbidden operation.
	// NOTE(review): not referenced in this file; presumably returned by walker
	// implementations elsewhere in the package.
	ErrForbidden = errors.New("forbidden")

	// ErrBadPath signals that a path-traversal attempt was rejected.
	// NOTE(review): not referenced in this file; presumably returned by walker
	// implementations elsewhere in the package.
	ErrBadPath = errors.New("bad path traversal blocked")
)
// ObjectStoreEntry describes a single object reported while walking an object
// store; walkers pass one to the walk callback per entry.
type ObjectStoreEntry struct {
	// FullKey represents the fully qualified path in the object store namespace for the given entry.
	FullKey string `json:"full_key,omitempty"`
	// RelativeKey represents a path relative to prefix (or directory). If none is specified, it is identical to FullKey.
	RelativeKey string `json:"relative_key,omitempty"`
	// Address is a full URI for the entry, including the storage namespace (e.g. s3://bucket/path/to/key).
	Address string `json:"address,omitempty"`
	// ETag represents a hash of the entry's content. It is generally a hex-encoded MD5,
	// but this depends on the underlying object store.
	ETag string `json:"etag,omitempty"`
	// Mtime is the last-modified datetime of the entry.
	// NOTE(review): encoding/json never treats a struct value (such as time.Time)
	// as empty, so omitempty has no effect here and a zero Mtime is still encoded.
	Mtime time.Time `json:"mtime,omitempty"`
	// Size is the entry's size in bytes. It is always encoded, even when zero.
	Size int64 `json:"size"`
}
// WalkOptions are the per-call options passed to Walker.Walk. They bound where
// the walk starts and let a previous listing be resumed.
type WalkOptions struct {
	// After, when set, restricts the walk: all walked items must be greater than After.
	After string
	// ContinuationToken is passed to the client for efficient listing.
	// The value is opaque to the caller.
	ContinuationToken string
}
// Mark is the value reported by Walker.Marker. It appears to describe how far
// a walk progressed.
// NOTE(review): the field meanings below are inferred from their names;
// confirm them against the Walker implementations.
type Mark struct {
	// ContinuationToken is presumably the opaque token to pass back via
	// WalkOptions.ContinuationToken to resume listing.
	ContinuationToken string
	// LastKey is presumably the key of the last entry walked.
	LastKey string
	// HasMore presumably reports whether entries remain beyond LastKey.
	HasMore bool
}
// Walker enumerates object-store entries under a storage URI and reports each
// one as an ObjectStoreEntry.
type Walker interface {
	// Walk presumably invokes walkFn for each entry under storageURI, honoring op.
	// NOTE(review): whether an error from walkFn stops the walk and is returned
	// is up to each implementation; confirm.
	Walk(ctx context.Context, storageURI *url.URL, op WalkOptions, walkFn func(e ObjectStoreEntry) error) error
	// Marker reports the walker's current Mark.
	Marker() Mark
}
// String implements fmt.Stringer. It renders the entry's address, relative key,
// ETag, size and modification time on a single line. FullKey is not included.
func (e ObjectStoreEntry) String() string {
	const layout = "ObjectStoreEntry: {Address:%s, RelativeKey:%s, ETag:%s, Size:%d, Mtime:%s}"
	return fmt.Sprintf(layout,
		e.Address,
		e.RelativeKey,
		e.ETag,
		e.Size,
		e.Mtime,
	)
}
// WalkerOptions configures WalkerFactory.GetWalker.
type WalkerOptions struct {
	// S3EndpointURL is passed to getS3Client. It is only consulted when the
	// factory was created without adapter configuration.
	// NOTE(review): presumably a custom S3 endpoint, with "" meaning the default; confirm.
	S3EndpointURL string
	// StorageURI is the URI to walk. Its scheme selects the walker implementation.
	StorageURI string
}
// WalkerWrapper binds a Walker to a single storage URI so callers can walk it
// without supplying the URI on every call.
type WalkerWrapper struct {
	walker Walker
	uri    *url.URL
}

// NewWrapper returns a WalkerWrapper that delegates to walker for uri.
func NewWrapper(walker Walker, uri *url.URL) *WalkerWrapper {
	wrapped := WalkerWrapper{walker: walker, uri: uri}
	return &wrapped
}

// Walk walks the bound URI with the wrapped Walker, forwarding ctx, opts and
// walkFn unchanged and returning the Walker's result.
func (ww *WalkerWrapper) Walk(ctx context.Context, opts WalkOptions, walkFn func(e ObjectStoreEntry) error) error {
	target := ww.uri
	return ww.walker.Walk(ctx, target, opts, walkFn)
}

// Marker reports the wrapped Walker's current Mark.
func (ww *WalkerWrapper) Marker() Mark {
	inner := ww.walker
	return inner.Marker()
}
// WalkerFactory builds Walkers for storage URIs. Its adapter configuration may
// be nil. In that case the S3, GCS and Azure builders construct their clients
// without server-side settings, while the local walker requires a configuration.
type WalkerFactory struct {
	params params.AdapterConfig
}

// NewFactory returns a WalkerFactory that uses cfg as its block adapter
// configuration. cfg may be nil.
func NewFactory(cfg params.AdapterConfig) *WalkerFactory {
	f := new(WalkerFactory)
	f.params = cfg
	return f
}
// buildS3Walker creates an S3Walker. Without adapter configuration it obtains
// a session from getS3Client using opts.S3EndpointURL. Otherwise it builds the
// session from the configured S3 block-store parameters, and opts is ignored.
func (f *WalkerFactory) buildS3Walker(opts WalkerOptions) (*S3Walker, error) {
	var sess *session.Session
	var err error
	if f.params == nil {
		sess, err = getS3Client(opts.S3EndpointURL)
	} else {
		s3params, paramsErr := f.params.BlockstoreS3Params()
		if paramsErr != nil {
			return nil, paramsErr
		}
		sess, err = factory.BuildS3Client(s3params.AwsConfig, s3params.SkipVerifyCertificateTestOnly)
	}
	if err != nil {
		return nil, err
	}
	return NewS3Walker(sess), nil
}
// buildGCSWalker creates a GCSWalker. Without adapter configuration it uses
// storage.NewClient's default client. Otherwise it builds the client from the
// configured GS block-store parameters.
func (f *WalkerFactory) buildGCSWalker(ctx context.Context) (*GCSWalker, error) {
	var client *storage.Client
	var err error
	if f.params == nil {
		client, err = storage.NewClient(ctx)
	} else {
		gsParams, paramsErr := f.params.BlockstoreGSParams()
		if paramsErr != nil {
			return nil, paramsErr
		}
		client, err = factory.BuildGSClient(ctx, gsParams)
	}
	if err != nil {
		return nil, err
	}
	return NewGCSWalker(client), nil
}
// buildAzureWalker creates an AzureBlobWalker for the storage account named in
// importURL. Server-configured Azure parameters are used when present. The
// configured access key is kept only if it belongs to that same storage
// account; otherwise the client falls back to default credentials.
func (f *WalkerFactory) buildAzureWalker(importURL *url.URL) (*AzureBlobWalker, error) {
	account, err := azure.ExtractStorageAccount(importURL)
	if err != nil {
		return nil, err
	}

	var cfg params.Azure
	if f.params != nil {
		if cfg, err = f.params.BlockstoreAzureParams(); err != nil {
			return nil, err
		}
	}

	// A configured access key only applies to its own account. For any other
	// account, clear it so default credentials are used instead.
	if cfg.StorageAccount != account {
		cfg.StorageAccount = account
		cfg.StorageAccessKey = ""
	}

	client, err := azure.BuildAzureServiceClient(cfg)
	if err != nil {
		return nil, err
	}
	return NewAzureBlobWalker(client)
}
// GetWalker parses opts.StorageURI and returns a WalkerWrapper bound to it.
// The walker implementation is chosen by URI scheme:
//   - "s3": S3 walker
//   - "gs": Google Cloud Storage walker
//   - "http", "https": Azure Blob walker (the storage account is taken from the URL)
//   - "local": local block-store walker
//
// Builder failures are wrapped with the walker kind. Any other scheme yields an
// error wrapping ErrNotSupported.
func (f *WalkerFactory) GetWalker(ctx context.Context, opts WalkerOptions) (*WalkerWrapper, error) {
	uri, err := url.Parse(opts.StorageURI)
	if err != nil {
		// url.Parse returns a nil *url.URL on failure, so report the raw input;
		// formatting uri here would only print "<nil>".
		return nil, fmt.Errorf("could not parse storage URI %q: %w", opts.StorageURI, err)
	}
	var walker Walker
	switch uri.Scheme {
	case "s3":
		walker, err = f.buildS3Walker(opts)
		if err != nil {
			return nil, fmt.Errorf("creating s3 walker: %w", err)
		}
	case "gs":
		walker, err = f.buildGCSWalker(ctx)
		if err != nil {
			return nil, fmt.Errorf("creating gs walker: %w", err)
		}
	case "http", "https":
		walker, err = f.buildAzureWalker(uri)
		if err != nil {
			return nil, fmt.Errorf("creating Azure walker: %w", err)
		}
	case "local":
		walker, err = f.buildLocalWalker()
		if err != nil {
			return nil, fmt.Errorf("creating local walker: %w", err)
		}
	default:
		return nil, fmt.Errorf("%w: for scheme: %s", ErrNotSupported, uri.Scheme)
	}
	return NewWrapper(walker, uri), nil
}
// buildLocalWalker creates a LocalWalker from the configured local block-store
// parameters. Unlike the S3, GCS and Azure builders, it has no fallback when
// the factory was created without adapter configuration. Instead of calling a
// method on the nil configuration (which would panic), it returns an error
// wrapping ErrNotSupported.
func (f *WalkerFactory) buildLocalWalker() (*LocalWalker, error) {
	if f.params == nil {
		return nil, fmt.Errorf("%w: local walker requires block adapter configuration", ErrNotSupported)
	}
	localParams, err := f.params.BlockstoreLocalParams()
	if err != nil {
		return nil, err
	}
	return NewLocalWalker(localParams), nil
}