-
Notifications
You must be signed in to change notification settings - Fork 351
/
handler.go
311 lines (286 loc) · 10.5 KB
/
handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
package gateway
import (
"errors"
"net/http"
gohttputil "net/http/httputil"
"net/url"
"regexp"
"strings"
"github.com/treeverse/lakefs/pkg/auth"
"github.com/treeverse/lakefs/pkg/auth/model"
"github.com/treeverse/lakefs/pkg/block"
"github.com/treeverse/lakefs/pkg/catalog"
gatewayerrors "github.com/treeverse/lakefs/pkg/gateway/errors"
"github.com/treeverse/lakefs/pkg/gateway/multipart"
"github.com/treeverse/lakefs/pkg/gateway/operations"
"github.com/treeverse/lakefs/pkg/gateway/sig"
"github.com/treeverse/lakefs/pkg/httputil"
"github.com/treeverse/lakefs/pkg/logging"
"github.com/treeverse/lakefs/pkg/permissions"
"github.com/treeverse/lakefs/pkg/stats"
"github.com/treeverse/lakefs/pkg/upload"
)
// contextKey is the type of the keys under which the gateway stores
// per-request values in a request's context. A dedicated (non-built-in) key
// type keeps these keys from colliding with context keys defined by other
// packages.
type contextKey string

// Context keys read by the handlers in this file. The value types noted below
// are the ones the handlers type-assert to; the values themselves are stored
// elsewhere (presumably by the middleware chain assembled in NewHandler).
const (
	// ContextKeyUser holds the authenticated user as a *model.User.
	ContextKeyUser contextKey = "user"
	// ContextKeyRepositoryID is not read in this file.
	// NOTE(review): presumably holds the repository ID - confirm against the code that sets it.
	ContextKeyRepositoryID contextKey = "repository_id"
	// ContextKeyRepository holds the resolved repository as a *catalog.Repository.
	ContextKeyRepository contextKey = "repository"
	// ContextKeyAuthContext holds the request's signature context as a sig.SigContext.
	ContextKeyAuthContext contextKey = "auth_context"
	// ContextKeyOperation holds the current operation as an *operations.Operation.
	ContextKeyOperation contextKey = "operation"
	// ContextKeyRef holds the reference of a path operation as a string.
	ContextKeyRef contextKey = "ref"
	// ContextKeyPath holds the object path of a path operation as a string.
	ContextKeyPath contextKey = "path"
	// ContextKeyMatchedHost holds a bool that is copied into the operation's
	// MatchedHost field and into the request's logging fields.
	ContextKeyMatchedHost contextKey = "matched_host"
)
// commaSeparator matches a comma followed by optional whitespace. It is used
// to split the comma-separated entries of an Accept header value.
var commaSeparator = regexp.MustCompile(`,\s*`)

// Content types the gateway can select as a response default. They are
// variables rather than constants because selectContentType returns their
// addresses.
var (
	contentTypeApplicationXML = "application/xml"
	contentTypeTextXML = "text/xml"
)
// handler is the innermost http.Handler of the gateway: it dispatches a
// request to the handler registered for the operation found in the request's
// context (see ServeHTTP).
type handler struct {
	// sc is the shared server context the operation handlers were built with.
	sc *ServerContext
	// ServerErrorHandler is left nil by NewHandler and is not used in this file.
	ServerErrorHandler http.Handler
	// operationHandlers maps each known operation ID to the handler serving it.
	operationHandlers map[operations.OperationID]http.Handler
}
// ServerContext bundles the dependencies that NewHandler hands to every
// operation handler it constructs. Only authService is read directly in this
// file (by the authorization step); the remaining fields are stored here for
// use elsewhere in the package.
type ServerContext struct {
	// region is logged by NewHandler as "s3_region".
	region string
	// bareDomains are the domains NewHandler passes to EnrichWithParts and
	// searches for in the Host of requests sent to the fallback proxy.
	bareDomains []string
	// catalog is also passed by NewHandler to EnrichWithRepositoryOrFallback.
	catalog catalog.Interface
	// multipartTracker is not read in this file.
	multipartTracker multipart.Tracker
	// blockStore is not read in this file.
	blockStore block.Adapter
	// authService authorizes the permissions required by each operation.
	authService auth.GatewayService
	// stats is not read in this file.
	stats stats.Collector
	// pathProvider is not read in this file.
	pathProvider upload.PathProvider
}
// NewHandler assembles the S3 gateway's http.Handler.
//
// The returned handler is a chain of middleware wrapped around a dispatcher
// that routes each request to the handler registered for its operation ID.
//
// When fallbackURL is non-nil a reverse proxy to it is created and handed to
// EnrichWithRepositoryOrFallback; when it is nil that middleware receives a
// nil fallback handler. auditLogLevel and traceRequestHeaders are forwarded
// unchanged to the logging middleware. The remaining arguments are stored in
// the ServerContext shared by all operation handlers.
func NewHandler(region string, catalog catalog.Interface, multipartTracker multipart.Tracker, blockStore block.Adapter, authService auth.GatewayService, bareDomains []string, stats stats.Collector, pathProvider upload.PathProvider, fallbackURL *url.URL, auditLogLevel string, traceRequestHeaders bool) http.Handler {
	// Optional fallback: a reverse proxy to fallbackURL. Stays a nil interface
	// value when no fallback URL is configured.
	var fallbackHandler http.Handler
	if fallbackURL != nil {
		proxy := gohttputil.NewSingleHostReverseProxy(fallbackURL)
		fallbackHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			// Swap the first bare domain found in the Host for the fallback
			// host; only the first domain that actually changes the Host is applied.
			for _, domain := range bareDomains {
				if rewritten := strings.Replace(r.Host, domain, fallbackURL.Host, 1); rewritten != r.Host {
					r.Host = rewritten
					break
				}
			}
			proxy.ServeHTTP(w, r)
		})
	}

	sc := &ServerContext{
		region:           region,
		bareDomains:      bareDomains,
		catalog:          catalog,
		multipartTracker: multipartTracker,
		blockStore:       blockStore,
		authService:      authService,
		stats:            stats,
		pathProvider:     pathProvider,
	}

	// Dispatch table: one handler per supported operation.
	routes := map[operations.OperationID]http.Handler{
		operations.OperationIDDeleteObject:         PathOperationHandler(sc, &operations.DeleteObject{}),
		operations.OperationIDDeleteObjects:        RepoOperationHandler(sc, &operations.DeleteObjects{}),
		operations.OperationIDGetObject:            PathOperationHandler(sc, &operations.GetObject{}),
		operations.OperationIDPutBucket:            RepoOperationHandler(sc, &operations.PutBucket{}),
		operations.OperationIDHeadBucket:           RepoOperationHandler(sc, &operations.HeadBucket{}),
		operations.OperationIDHeadObject:           PathOperationHandler(sc, &operations.HeadObject{}),
		operations.OperationIDListBuckets:          OperationHandler(sc, &operations.ListBuckets{}),
		operations.OperationIDListObjects:          RepoOperationHandler(sc, &operations.ListObjects{}),
		operations.OperationIDPostObject:           PathOperationHandler(sc, &operations.PostObject{}),
		operations.OperationIDPutObject:            PathOperationHandler(sc, &operations.PutObject{}),
		operations.OperationIDUnsupportedOperation: unsupportedOperationHandler(),
	}

	var h http.Handler = &handler{
		sc:                sc,
		operationHandlers: routes,
	}

	// The logging middleware wraps the dispatcher directly.
	withLogging := httputil.LoggingMiddleware(
		"X-Amz-Request-Id",
		logging.Fields{"service_name": "s3_gateway"},
		auditLogLevel,
		traceRequestHeaders)
	h = withLogging(h)

	// Remaining middleware, listed outermost first.
	h = EnrichWithOperation(sc,
		DurationHandler(
			AuthenticationHandler(authService, EnrichWithParts(bareDomains,
				EnrichWithRepositoryOrFallback(catalog, authService, fallbackHandler,
					OperationLookupHandler(
						h))))))

	logging.Default().WithFields(logging.Fields{
		"s3_bare_domain": bareDomains,
		"s3_region":      region,
	}).Info("initialized S3 Gateway handler")
	return h
}
// ServeHTTP sets the default response content type and then hands the request
// to the handler registered for the operation stored in the request context
// under ContextKeyOperation. It replies 404 Not Found when no handler is
// registered for that operation ID, and panics if the context carries no
// *operations.Operation.
func (h *handler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	setDefaultContentType(w, req)
	op := req.Context().Value(ContextKeyOperation).(*operations.Operation)
	if target := h.operationHandlers[op.OperationID]; target != nil {
		target.ServeHTTP(w, req)
		return
	}
	// TODO(johnnyaug): consider other status code or add text with unknown gateway operation
	w.WriteHeader(http.StatusNotFound)
}
// getAPIErrOrDefault converts err into a gateway API error.
//
// If err is, or wraps, a gatewayerrors.APIErrorCode, the API error for that
// code is returned. Otherwise, including when err is nil, the API error for
// defaultAPIErr is returned.
func getAPIErrOrDefault(err error, defaultAPIErr gatewayerrors.APIErrorCode) gatewayerrors.APIError {
	// errors.As walks the wrap chain, so a code wrapped with additional
	// context (e.g. via fmt.Errorf and %w) is still recognized.
	var apiError gatewayerrors.APIErrorCode
	if errors.As(err, &apiError) {
		return apiError.ToAPIErr()
	}
	return defaultAPIErr.ToAPIErr()
}
// OperationHandler adapts an operation that needs neither a repository nor a
// path into an http.Handler.
//
// For each request it asks handler for the permissions it requires, replying
// with an access-denied error if that fails, then authorizes the request's
// user against them. handler.Handle is invoked only when authorization
// succeeds; authorize has already written the error response otherwise.
func OperationHandler(sc *ServerContext, handler operations.AuthenticatedOperationHandler) http.Handler {
	serve := func(w http.ResponseWriter, req *http.Request) {
		op := req.Context().Value(ContextKeyOperation).(*operations.Operation)

		required, err := handler.RequiredPermissions(req)
		if err != nil {
			_ = op.EncodeError(w, req, gatewayerrors.ErrAccessDenied.ToAPIErr())
			return
		}

		if authorized := authorize(w, req, sc.authService, required); authorized != nil {
			handler.Handle(w, req, authorized)
		}
	}
	return http.HandlerFunc(serve)
}
// RepoOperationHandler adapts a repository-level operation into an
// http.Handler.
//
// The repository, matched-host flag and operation are taken from the request
// context. The handler's required permissions for the repository are
// authorized for the request's user; on success the repository name and
// matched-host flag are added to the request's logging fields and
// handler.Handle is called with the resulting RepoOperation. A failure to
// compute the permissions is answered with an access-denied error.
func RepoOperationHandler(sc *ServerContext, handler operations.RepoOperationHandler) http.Handler {
	serve := func(w http.ResponseWriter, req *http.Request) {
		ctx := req.Context()
		repository := ctx.Value(ContextKeyRepository).(*catalog.Repository)
		hostMatched := ctx.Value(ContextKeyMatchedHost).(bool)
		op := ctx.Value(ContextKeyOperation).(*operations.Operation)

		required, err := handler.RequiredPermissions(req, repository.Name)
		if err != nil {
			_ = op.EncodeError(w, req, gatewayerrors.ErrAccessDenied.ToAPIErr())
			return
		}

		authorized := authorize(w, req, sc.authService, required)
		if authorized == nil {
			// authorize has already written the error response.
			return
		}

		logFields := logging.Fields{
			logging.RepositoryFieldKey:  repository.Name,
			logging.MatchedHostFieldKey: hostMatched,
		}
		req = req.WithContext(logging.AddFields(ctx, logFields))

		handler.Handle(w, req, &operations.RepoOperation{
			AuthorizedOperation: authorized,
			Repository:          repository,
			MatchedHost:         hostMatched,
		})
	}
	return http.HandlerFunc(serve)
}
// PathOperationHandler adapts an object-level (repository + reference + path)
// operation into an http.Handler.
//
// The repository, reference, path, matched-host flag and operation are taken
// from the request context. If the handler cannot compute its required
// permissions, the reply is an invalid-copy-source error when the failure is
// gatewayerrors.ErrInvalidCopySource and an access-denied error otherwise.
// Once the request's user is authorized, the repository, reference, path and
// matched-host flag are added to the request's logging fields and
// handler.Handle is called with the resulting PathOperation.
func PathOperationHandler(sc *ServerContext, handler operations.PathOperationHandler) http.Handler {
	serve := func(w http.ResponseWriter, req *http.Request) {
		ctx := req.Context()
		repository := ctx.Value(ContextKeyRepository).(*catalog.Repository)
		ref := ctx.Value(ContextKeyRef).(string)
		objectPath := ctx.Value(ContextKeyPath).(string)
		hostMatched := ctx.Value(ContextKeyMatchedHost).(bool)
		op := ctx.Value(ContextKeyOperation).(*operations.Operation)

		required, err := handler.RequiredPermissions(req, repository.Name, ref, objectPath)
		if err != nil {
			if errors.Is(err, gatewayerrors.ErrInvalidCopySource) {
				_ = op.EncodeError(w, req, gatewayerrors.ErrInvalidCopySource.ToAPIErr())
				return
			}
			_ = op.EncodeError(w, req, gatewayerrors.ErrAccessDenied.ToAPIErr())
			return
		}

		authorized := authorize(w, req, sc.authService, required)
		if authorized == nil {
			// authorize has already written the error response.
			return
		}

		// Assemble the operation from the inside out.
		repoOp := &operations.RepoOperation{
			AuthorizedOperation: authorized,
			Repository:          repository,
			MatchedHost:         hostMatched,
		}
		refOp := &operations.RefOperation{
			RepoOperation: repoOp,
			Reference:     ref,
		}
		pathOp := &operations.PathOperation{
			RefOperation: refOp,
			Path:         objectPath,
		}

		logFields := logging.Fields{
			logging.RepositoryFieldKey:  repository.Name,
			logging.RefHostFieldKey:     ref,
			logging.PathFieldKey:        objectPath,
			logging.MatchedHostFieldKey: hostMatched,
		}
		req = req.WithContext(logging.AddFields(ctx, logFields))

		// run callback
		handler.Handle(w, req, pathOp)
	}
	return http.HandlerFunc(serve)
}
// authorize checks that the request's user holds the permissions in perms.
//
// The operation, user and signature context are read from the request
// context. When perms is empty (no child nodes and no action) the request is
// accepted without consulting authService. Otherwise authService.Authorize is
// called: a call error is logged and answered with an internal error, and a
// denied or errored authorization response is logged together with the access
// key ID and answered with an access-denied error.
//
// It returns the authorized operation on success, or nil after the error
// response has been written to w.
func authorize(w http.ResponseWriter, req *http.Request, authService auth.GatewayService, perms permissions.Node) *operations.AuthorizedOperation {
	ctx := req.Context()
	op := ctx.Value(ContextKeyOperation).(*operations.Operation)
	principal := ctx.Value(ContextKeyUser).(*model.User).Username
	sigCtx := ctx.Value(ContextKeyAuthContext).(sig.SigContext)

	granted := &operations.AuthorizedOperation{
		Operation: op,
		Principal: principal,
	}

	// has not provided required permissions
	if len(perms.Nodes) == 0 && len(perms.Permission.Action) == 0 {
		return granted
	}

	resp, err := authService.Authorize(ctx, &auth.AuthorizationRequest{
		Username:            principal,
		RequiredPermissions: perms,
	})
	switch {
	case err != nil:
		op.Log(req).WithError(err).Error("failed to authorize")
		_ = op.EncodeError(w, req, gatewayerrors.ErrInternalError.ToAPIErr())
		return nil
	case resp.Error != nil || !resp.Allowed:
		op.Log(req).WithError(resp.Error).WithField("key", sigCtx.GetAccessKeyID()).Warn("no permission")
		_ = op.EncodeError(w, req, gatewayerrors.ErrAccessDenied.ToAPIErr())
		return nil
	}
	return granted
}
// selectContentType returns the first supported XML content type listed in the
// given Accept header values, or nil if none of them is listed.
//
// Each value is split on commas (with optional trailing whitespace) and every
// entry is compared for exact equality with "text/xml" and "application/xml",
// so entries carrying parameters (for example ";q=0.9") do not match. The
// result points at the package-level content-type variable.
func selectContentType(acceptable []string) *string {
	for _, headerValue := range acceptable {
		for _, mediaType := range commaSeparator.Split(headerValue, -1) {
			if mediaType == contentTypeTextXML {
				return &contentTypeTextXML
			}
			if mediaType == contentTypeApplicationXML {
				return &contentTypeApplicationXML
			}
		}
	}
	return nil
}
func setDefaultContentType(w http.ResponseWriter, req *http.Request) {
acceptable, ok := req.Header["Accept"]
if ok {
defaultContentType := selectContentType(acceptable)
if defaultContentType != nil {
w.Header().Set("Content-Type", *defaultContentType)
}
// If no requested content type matched, still OK at least for proxied content
// (GET or HEAD), so set up to auto-detect.
} else {
w.Header().Set("Content-Type", contentTypeApplicationXML)
// For proxied content (GET or HEAD) the type will be reset according to
// whatever headers arrive, including setting up to auto-detect content-type if
// none is specified by the adapter.
}
}
// unsupportedOperationHandler returns a handler that answers every request
// with the gateway's "not supported" API error. The error is encoded through a
// fresh, zero-valued operations.Operation rather than the one stored in the
// request context.
func unsupportedOperationHandler() http.Handler {
	reject := func(w http.ResponseWriter, req *http.Request) {
		op := &operations.Operation{}
		_ = op.EncodeError(w, req, gatewayerrors.ERRLakeFSNotSupported.ToAPIErr())
	}
	return http.HandlerFunc(reject)
}