-
Notifications
You must be signed in to change notification settings - Fork 567
/
bucket.go
118 lines (111 loc) · 3.82 KB
/
bucket.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
package obj
import (
"context"
"crypto/tls"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/pachyderm/pachyderm/v2/src/internal/cmdutil"
"github.com/pachyderm/pachyderm/v2/src/internal/promutil"
"net/http"
"net/url"
"strconv"
"time"
"github.com/pachyderm/pachyderm/v2/src/internal/errors"
"github.com/pachyderm/pachyderm/v2/src/internal/log"
"gocloud.dev/blob"
"gocloud.dev/blob/fileblob"
"gocloud.dev/blob/s3blob"
)
// Bucket represents access to a single object storage bucket.
// Azure calls this a container, because it's not like that word conflicts with anything.
type Bucket = blob.Bucket
func amazonHTTPClient() (*http.Client, int, error) {
advancedConfig := &AmazonAdvancedConfiguration{}
if err := cmdutil.Populate(advancedConfig); err != nil {
return nil, -1, errors.Wrap(err, "creating amazon http client")
}
timeout, err := time.ParseDuration(advancedConfig.Timeout)
if err != nil {
return nil, -1, errors.Wrap(err, "creating amazon http client")
}
httpClient := &http.Client{Timeout: timeout}
if advancedConfig.NoVerifySSL {
transport := http.DefaultTransport.(*http.Transport).Clone()
transport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
httpClient.Transport = transport
}
httpClient.Transport = promutil.InstrumentRoundTripper("s3", httpClient.Transport)
return httpClient, advancedConfig.Retries, nil
}
// This seems to be required in order to support disabling ssl verification -- which is needed for EDF testing.
func amazonSession(ctx context.Context, objURL *ObjectStoreURL) (*session.Session, error) {
urlParams, err := url.ParseQuery(objURL.Params)
if err != nil {
return nil, errors.Wrap(err, "creating amazon session")
}
endpoint := urlParams.Get("endpoint")
// if unset, disableSSL will be false.
disableSSL, _ := strconv.ParseBool(urlParams.Get("disableSSL"))
region := urlParams.Get("region")
httpClient, retries, err := amazonHTTPClient()
if err != nil {
return nil, errors.Wrap(err, "creating amazon session")
}
awsConfig := &aws.Config{
Region: aws.String(region),
MaxRetries: aws.Int(retries),
HTTPClient: httpClient,
DisableSSL: aws.Bool(disableSSL),
Logger: log.NewAmazonLogger(ctx),
}
// Set custom endpoint for a custom deployment.
if endpoint != "" {
awsConfig.Endpoint = aws.String(endpoint)
awsConfig.S3ForcePathStyle = aws.Bool(true)
}
// Create new session using awsConfig
sess, err := session.NewSession(awsConfig)
if err != nil {
return nil, errors.Wrap(err, "creating amazon session")
}
return sess, nil
}
// NewAmazonBucket constructs an amazon client by reading credentials
// from a mounted AmazonSecret. You may pass "" for bucket in which case it
// will read the bucket from the secret.
func NewAmazonBucket(ctx context.Context, objURL *ObjectStoreURL) (*Bucket, error) {
// Use or retrieve S3 bucket
sess, err := amazonSession(ctx, objURL)
if err != nil {
return nil, errors.Wrap(err, "amazon bucket")
}
blobBucket, err := s3blob.OpenBucket(ctx, sess, objURL.Bucket, nil)
if err != nil {
return nil, errors.Wrap(err, "amazon bucket")
}
return blobBucket, nil
}
// NewBucket creates a Bucket using the given backend and storage root (for
// local backends).
func NewBucket(ctx context.Context, storageBackend, storageRoot, storageURL string) (*Bucket, error) {
var err error
var bucket *Bucket
objURL, err := ParseURL(storageURL)
if err != nil {
return nil, errors.Wrap(err, "new bucket")
}
switch storageBackend {
case Amazon:
bucket, err = NewAmazonBucket(ctx, objURL)
case Google, Microsoft:
bucket, err = blob.OpenBucket(ctx, objURL.BucketString())
case Local:
bucket, err = fileblob.OpenBucket(storageRoot, nil)
default:
return nil, errors.Errorf("unrecognized storage backend: %s", storageBackend)
}
if err != nil {
return nil, errors.Wrap(err, "new bucket")
}
return bucket, nil
}