-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathhandler_interface.go
227 lines (211 loc) · 6.89 KB
/
handler_interface.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
package s3
import (
"cloud.google.com/go/storage"
"cloudsidecar/pkg/logging"
"context"
"encoding/base64"
"github.com/aws/aws-sdk-go-v2/service/s3/s3iface"
"github.com/gorilla/mux"
"github.com/spf13/viper"
"google.golang.org/api/option"
"net/http"
"strings"
"sync"
)
// Handler bundles the S3 client, the GCP client factories and the
// configuration shared by the request handlers in this package.
//
// Field order is kept as-is so that any unkeyed literals elsewhere keep
// compiling. The struct embeds mutexes, so use it through a pointer once it
// is in service.
type Handler struct {
	// S3Client is the AWS S3 API implementation returned by GetS3Client.
	S3Client s3iface.S3API
	// GCPClient builds the default GCP client; it is used when no
	// credentials key is supplied.
	GCPClient func() (GCPClient, error)
	// Context is dereferenced when a storage client is created from
	// credentials.
	Context *context.Context
	// Config is consulted for "gcp_destination_config.key_from_url" and
	// "gcp_destination_config.gcs_config.bucket_rename".
	Config *viper.Viper
	// GCPClientToBucket resolves a bucket handle from a client.
	GCPClientToBucket func(bucket string, client GCPClient) GCPBucket
	// GCPBucketToObject resolves an object handle from a bucket.
	GCPBucketToObject func(name string, bucket GCPBucket) GCPObject
	// GCPObjectToWriter opens a writer on an object.
	GCPObjectToWriter func(object GCPObject, ctx context.Context) GCPObjectWriter
	// GCPClientPerKey maps a credentials key to a client. It is only read
	// in this file; NOTE(review): no visible code writes to it.
	GCPClientPerKey map[string]GCPClient
	// gcpClientMapLock is held while SetGCPClientFromCreds creates a client.
	gcpClientMapLock sync.Mutex
	// GCPClientPool holds idle clients per credentials key ("" is the
	// default key).
	GCPClientPool map[string][]GCPClient
	// gcpClientPoolLock guards GCPClientPool.
	gcpClientPoolLock sync.Mutex
}
// NewHandler returns a Handler bound to config, with empty client maps and
// the default adapters that delegate straight to the GCP handle methods
// (Bucket, Object and NewWriter). S3Client, GCPClient and Context are left
// unset.
func NewHandler(config *viper.Viper) Handler {
	bucketOf := func(bucket string, client GCPClient) GCPBucket {
		return client.Bucket(bucket)
	}
	objectOf := func(name string, bucket GCPBucket) GCPObject {
		return bucket.Object(name)
	}
	writerOf := func(object GCPObject, ctx context.Context) GCPObjectWriter {
		return object.NewWriter(ctx)
	}

	// Returned as a literal so the embedded mutexes are never copied from a
	// named local.
	return Handler{
		Config:            config,
		GCPClientToBucket: bucketOf,
		GCPBucketToObject: objectOf,
		GCPObjectToWriter: writerOf,
		GCPClientPerKey:   map[string]GCPClient{},
		GCPClientPool:     map[string][]GCPClient{},
	}
}
// GCPClient is the part of a GCS client this package relies on. The value
// returned by storage.NewClient is used as a GCPClient in this file.
type GCPClient interface {
	// Bucket returns a handle for the named bucket.
	Bucket(name string) *storage.BucketHandle
	// Close releases the client.
	Close() error
}
// GCPObject is the object-handle surface used by this package. The handle
// returned by GCPBucket.Object is used as a GCPObject by the default adapter
// in NewHandler.
type GCPObject interface {
	// Handle derivation.
	ACL() *storage.ACLHandle
	Generation(gen int64) *storage.ObjectHandle
	If(conds storage.Conditions) *storage.ObjectHandle
	Key(encryptionKey []byte) *storage.ObjectHandle

	// Metadata and lifecycle.
	Attrs(ctx context.Context) (*storage.ObjectAttrs, error)
	Update(ctx context.Context, uattrs storage.ObjectAttrsToUpdate) (*storage.ObjectAttrs, error)
	BucketName() string
	ObjectName() string
	Delete(ctx context.Context) error

	// Content access.
	ReadCompressed(compressed bool) *storage.ObjectHandle
	NewWriter(ctx context.Context) *storage.Writer
	NewReader(ctx context.Context) (*storage.Reader, error)
	NewRangeReader(ctx context.Context, offset, length int64) (*storage.Reader, error)

	// Server-side copy and compose.
	CopierFrom(src *storage.ObjectHandle) *storage.Copier
	ComposerFrom(srcs ...*storage.ObjectHandle) *storage.Composer
}
// GCPObjectWriter is the writer surface used by this package. The writer
// returned by GCPObject.NewWriter is used as a GCPObjectWriter by the default
// adapter in NewHandler.
type GCPObjectWriter interface {
	// Write appends p to the object being written.
	Write(p []byte) (int, error)
	// Close finishes the write.
	Close() error
	// CloseWithError aborts the write with err.
	CloseWithError(err error) error
	// Attrs returns the attributes of the written object.
	Attrs() *storage.ObjectAttrs
}
// GCPBucket is the bucket-handle surface used by this package. The handle
// returned by GCPClient.Bucket is used as a GCPBucket by the default adapter
// in NewHandler.
type GCPBucket interface {
	// Bucket lifecycle.
	Create(ctx context.Context, projectID string, attrs *storage.BucketAttrs) error
	Delete(ctx context.Context) error

	// Access control.
	ACL() *storage.ACLHandle
	DefaultObjectACL() *storage.ACLHandle

	// Object lookup.
	Object(name string) *storage.ObjectHandle

	// Metadata.
	Attrs(ctx context.Context) (*storage.BucketAttrs, error)
	Update(ctx context.Context, uattrs storage.BucketAttrsToUpdate) (*storage.BucketAttrs, error)

	// Handle derivation.
	If(conds storage.BucketConditions) *storage.BucketHandle
	UserProject(projectID string) *storage.BucketHandle

	// Retention and listing.
	LockRetentionPolicy(ctx context.Context) error
	Objects(ctx context.Context, q *storage.Query) *storage.ObjectIterator
}
// HandlerInterface describes the accessor and client-setup surface of a
// handler.
//
// NOTE(review): the *Handler methods GetGCPClient, SetGCPClientFromCreds and
// GCPRequestSetup in this file return (GCPClient, error), which does not match
// the single-result signatures declared here, and SetGCPClient is not defined
// in this file. Confirm whether this interface is still meant to be satisfied
// by *Handler.
type HandlerInterface interface {
	// Accessors.
	GetS3Client() s3iface.S3API
	GetGCPClient(key string) GCPClient
	GetContext() *context.Context
	GetConfig() *viper.Viper

	// Mutators.
	SetS3Client(s3Client s3iface.S3API)
	SetGCPClient(key string, gcpClient GCPClient)
	SetContext(context *context.Context)
	SetConfig(config *viper.Viper)

	// GCP client setup.
	SetGCPClientFromCreds(creds *string) GCPClient
	GCPRequestSetup(request *http.Request) GCPClient
}
// GetConnection returns a GCP client for the given credentials key, where ""
// selects the default client.
//
// If the pool for key holds an idle client, the first one is removed from the
// pool and returned. Otherwise a new client is built: from the base64
// credentials in key when key is non-empty, or from handler.GCPClient when it
// is empty.
//
// When building the client fails, the error is returned with a nil client and
// the pool is left untouched. Previously the failed client was stored in the
// pool and handed to the next caller with a nil error.
//
// NOTE(review): as before, a freshly built client is both returned and left
// in the pool, so it may be handed out again before it is returned; confirm
// that sharing is intended.
func (handler *Handler) GetConnection(key string) (GCPClient, error) {
	handler.gcpClientPoolLock.Lock()
	defer handler.gcpClientPoolLock.Unlock()

	// Fast path: reuse an idle client.
	if idle := handler.GCPClientPool[key]; len(idle) > 0 {
		client := idle[0]
		handler.GCPClientPool[key] = idle[1:]
		return client, nil
	}

	var (
		client GCPClient
		err    error
	)
	if key != "" {
		client, err = handler.SetGCPClientFromCreds(&key)
	} else {
		client, err = handler.GCPClient()
	}
	if err != nil {
		// Never pool a client whose construction failed.
		return nil, err
	}

	handler.GCPClientPool[key] = []GCPClient{client}
	return client, nil
}
// ReturnConnection puts client back into the pool it belongs to. The pool key
// is the "creds" route variable when "gcp_destination_config.key_from_url" is
// true in the config and the variable is non-empty; otherwise it is the
// default key "".
func (handler *Handler) ReturnConnection(client GCPClient, request *http.Request) {
	poolKey := ""
	if cfg := handler.Config; cfg != nil {
		useURLKey := cfg.Get("gcp_destination_config.key_from_url")
		urlCreds := mux.Vars(request)["creds"]
		// A nil (unset) setting compares unequal to true, so no separate
		// nil check is needed.
		if useURLKey == true && urlCreds != "" {
			poolKey = urlCreds
		}
	}
	handler.ReturnConnectionByKey(client, poolKey)
}
// ReturnConnectionByKey appends client to the idle pool for key while holding
// the pool lock.
func (handler *Handler) ReturnConnectionByKey(client GCPClient, key string) {
	handler.gcpClientPoolLock.Lock()
	defer handler.gcpClientPoolLock.Unlock()

	idle := handler.GCPClientPool[key]
	handler.GCPClientPool[key] = append(idle, client)
}
// GCPRequestSetup logs that GCP is being used and obtains a client for the
// request. The client is keyed by the "creds" route variable when
// "gcp_destination_config.key_from_url" is true in the config and the
// variable is non-empty; otherwise the default key "" is used.
func (handler *Handler) GCPRequestSetup(request *http.Request) (GCPClient, error) {
	logging.LogUsingGCP()

	poolKey := ""
	if cfg := handler.Config; cfg != nil {
		useURLKey := cfg.Get("gcp_destination_config.key_from_url")
		urlCreds := mux.Vars(request)["creds"]
		// A nil (unset) setting compares unequal to true, so no separate
		// nil check is needed.
		if useURLKey == true && urlCreds != "" {
			poolKey = urlCreds
		}
	}
	return handler.GetConnection(poolKey)
}
// SetGCPClientFromCreds returns a GCP client for the base64-encoded service
// account JSON in *creds.
//
// A client already present in GCPClientPerKey for *creds is returned as-is.
// Otherwise the credentials are decoded and a new storage client is created
// with the handler's context.
//
// Changes from the previous version:
//   - a base64 decode failure is returned instead of being discarded;
//   - on failure the returned GCPClient is a true nil interface rather than a
//     nil *storage.Client wrapped in a non-nil interface, so callers'
//     `client != nil` checks behave as expected;
//   - the map lookup happens under gcpClientMapLock, which replaces the
//     unlocked first read.
//
// creds must be non-nil and handler.Context must be set.
//
// NOTE(review): despite the name, the new client is not stored in
// GCPClientPerKey (the original did not store it either); confirm whether
// caching was intended.
func (handler *Handler) SetGCPClientFromCreds(creds *string) (GCPClient, error) {
	handler.gcpClientMapLock.Lock()
	defer handler.gcpClientMapLock.Unlock()

	if cached, ok := handler.GCPClientPerKey[*creds]; ok {
		return cached, nil
	}

	credsJSON, err := base64.StdEncoding.DecodeString(*creds)
	if err != nil {
		return nil, err
	}

	// Keep the concrete pointer in its own variable so a nil pointer is never
	// converted into a non-nil interface value.
	client, err := storage.NewClient(*handler.GetContext(), option.WithCredentialsJSON(credsJSON))
	if err != nil {
		return nil, err
	}
	return client, nil
}
// GetS3Client returns the S3 API client currently set on the handler.
func (handler *Handler) GetS3Client() s3iface.S3API {
	return handler.S3Client
}
// GetGCPClient returns the client registered in GCPClientPerKey for key. When
// key is empty or has no registered client, it falls back to the default
// client built by handler.GCPClient.
func (handler *Handler) GetGCPClient(key string) (GCPClient, error) {
	if key == "" {
		return handler.GCPClient()
	}
	if cached, found := handler.GCPClientPerKey[key]; found {
		return cached, nil
	}
	return handler.GCPClient()
}
// Shutdown closes every idle client in the pool and empties the pool.
//
// The pool lock is held for the whole sweep, matching every other access to
// GCPClientPool. Nil entries are skipped instead of panicking on Close.
// Closed clients are dropped from the pool so a later GetConnection cannot
// hand one out.
func (handler *Handler) Shutdown() {
	handler.gcpClientPoolLock.Lock()
	defer handler.gcpClientPoolLock.Unlock()

	for key, pool := range handler.GCPClientPool {
		for _, conn := range pool {
			if conn == nil {
				continue
			}
			// Best-effort: a failed Close during shutdown is not actionable
			// here, so the error is deliberately discarded.
			_ = conn.Close()
		}
		// Deleting the current key while ranging over a map is well defined.
		delete(handler.GCPClientPool, key)
	}
	logging.Log.Debug("Shutdown s3")
}
// GetContext returns the context pointer stored on the handler; it is nil
// until SetContext has been called or the field is assigned.
func (handler *Handler) GetContext() *context.Context {
	return handler.Context
}
// GetConfig returns the viper configuration stored on the handler.
func (handler *Handler) GetConfig() *viper.Viper {
	return handler.Config
}
// SetS3Client replaces the S3 API client used by the handler.
func (handler *Handler) SetS3Client(s3Client s3iface.S3API) {
	handler.S3Client = s3Client
}
// SetContext stores the context pointer on the handler. The parameter name
// shadows the context package inside this method.
func (handler *Handler) SetContext(context *context.Context) {
	handler.Context = context
}
// SetConfig replaces the viper configuration used by the handler.
func (handler *Handler) SetConfig(config *viper.Viper) {
	handler.Config = config
}
// BucketRename maps an incoming bucket name to its destination name.
//
// Without a config the name is returned unchanged. With a config, every "."
// in the name is replaced by "__dot__", and the result is looked up in the
// "gcp_destination_config.gcs_config.bucket_rename" map. A match returns the
// mapped value; otherwise the dot-escaped name is returned.
func (handler *Handler) BucketRename(bucket string) string {
	if handler.Config == nil {
		return bucket
	}

	renames := handler.Config.GetStringMapString("gcp_destination_config.gcs_config.bucket_rename")
	escaped := strings.Replace(bucket, ".", "__dot__", -1)
	// Indexing a nil map is safe and simply reports "not found".
	if target, found := renames[escaped]; found {
		return target
	}
	return escaped
}
// XmlHeader is the XML 1.0 declaration with UTF-8 encoding. It is presumably
// prefixed to XML response bodies; its use is not visible in this file.
const XmlHeader string = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>"