/
manager.go
107 lines (94 loc) · 2.88 KB
/
manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
package s3client
import (
"emperror.dev/errors"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/oxyno-zeta/s3-proxy/pkg/s3-proxy/config"
"github.com/oxyno-zeta/s3-proxy/pkg/s3-proxy/metrics"
"github.com/thoas/go-funk"
)
// oneMega is 1024*1024 (2^20). newClient multiplies the configured upload
// part size by it before handing the value to the S3 uploader, presumably to
// turn a size expressed in MiB into bytes (confirm against the config docs).
const oneMega = 1 << 20
// manager keeps one S3 client per configured target and rebuilds them from
// the configuration when Load is called.
type manager struct {
// targetClient maps a target key (as found in the configuration) to the
// client created for that target.
targetClient map[string]Client
// cfgManager supplies the configuration read by Load.
cfgManager config.Manager
// metricCl is the metrics client handed to every client created by Load.
metricCl metrics.Client
}
// GetClientForTarget returns the client stored under the given target name.
// When no client is registered for name, the zero value of Client is returned.
func (m *manager) GetClientForTarget(name string) Client {
	cl := m.targetClient[name]

	return cl
}
// Load (re)builds the S3 clients from the current configuration.
//
// A client is created for every target found in the configuration and stored
// under the target key; clients whose key is no longer present in the
// configuration are then removed.
//
// All clients are created before the manager state is touched: when newClient
// fails, its error is returned as is and the previously loaded clients are
// left exactly as they were.
func (m *manager) Load() error {
	// Get configuration
	cfg := m.cfgManager.GetConfig()

	// Build every client up front so that a failure on one target cannot
	// leave the manager holding a mix of old and new clients.
	tgtKeys := make([]string, 0, len(cfg.Targets))
	staged := make(map[string]Client, len(cfg.Targets))

	for key, tgt := range cfg.Targets {
		cl, err := newClient(tgt, m.metricCl)
		if err != nil {
			return err
		}

		tgtKeys = append(tgtKeys, key)
		staged[key] = cl
	}

	// Writing to a nil map panics; initialize it so that a manager built
	// without an explicit map still works.
	if m.targetClient == nil {
		m.targetClient = make(map[string]Client, len(staged))
	}

	// Commit the freshly created clients.
	for key, cl := range staged {
		m.targetClient[key] = cl
	}

	// Drop the clients whose target is no longer configured.
	if actualKeys, ok := funk.Keys(m.targetClient).([]string); ok {
		for _, key := range funk.SubtractString(actualKeys, tgtKeys) {
			delete(m.targetClient, key)
		}
	}

	return nil
}
// newClient creates the S3 client for a single target.
//
// The AWS session is configured from tgt.Bucket:
//   - the region is always set;
//   - static credentials are used only when both an access key and a secret
//     key are configured;
//   - a custom S3 endpoint, when set, also forces path-style addressing;
//   - SSL is disabled only when explicitly requested.
//
// The returned client carries the S3 service client, an uploader tuned with
// the target upload settings, the target itself and the metrics client.
// An error is returned (with stack) when the AWS session cannot be created.
func newClient(tgt *config.TargetConfig, metricsCtx metrics.Client) (Client, error) {
	bucket := tgt.Bucket

	awsCfg := aws.NewConfig().WithRegion(bucket.Region)

	// Static credentials are optional: both keys must be present.
	if creds := bucket.Credentials; creds != nil && creds.AccessKey != nil && creds.SecretKey != nil {
		awsCfg.WithCredentials(
			credentials.NewStaticCredentials(creds.AccessKey.Value, creds.SecretKey.Value, ""),
		)
	}

	// A custom endpoint goes together with path-style addressing.
	if bucket.S3Endpoint != "" {
		awsCfg.WithEndpoint(bucket.S3Endpoint).WithS3ForcePathStyle(true)
	}

	// Only override the SSL setting when it has to be turned off.
	if bucket.DisableSSL {
		awsCfg.WithDisableSSL(true)
	}

	sess, err := session.NewSession(awsCfg)
	if err != nil {
		return nil, errors.WithStack(err)
	}

	// Uploader option applying the target upload settings; the part size is
	// multiplied by oneMega before being given to the uploader.
	tuneUploader := func(u *s3manager.Uploader) {
		u.Concurrency = tgt.Bucket.S3UploadConcurrency
		u.MaxUploadParts = tgt.Bucket.S3MaxUploadParts
		u.LeavePartsOnError = tgt.Bucket.S3UploadLeavePartsOnError
		u.PartSize = tgt.Bucket.S3UploadPartSize * oneMega
	}

	cl := &s3client{
		svcClient:         s3.New(sess),
		target:            tgt,
		metricsCtx:        metricsCtx,
		s3managerUploader: s3manager.NewUploader(sess, tuneUploader),
	}

	return cl, nil
}