-
Notifications
You must be signed in to change notification settings - Fork 348
/
deleteobjects.go
194 lines (178 loc) · 6.62 KB
/
deleteobjects.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
package operations
import (
"context"
"errors"
"fmt"
"net/http"
"github.com/treeverse/lakefs/pkg/auth"
"github.com/treeverse/lakefs/pkg/catalog"
gerrors "github.com/treeverse/lakefs/pkg/gateway/errors"
"github.com/treeverse/lakefs/pkg/gateway/path"
"github.com/treeverse/lakefs/pkg/gateway/serde"
"github.com/treeverse/lakefs/pkg/graveler"
"github.com/treeverse/lakefs/pkg/logging"
"github.com/treeverse/lakefs/pkg/permissions"
)
// maxDeleteObjects is the maximum number of objects that can be deleted in a single call,
// based on https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html
const maxDeleteObjects = 1000

// DeleteObjects is the gateway operation handling S3 multi-object delete requests
// (requests carrying the "delete" query parameter).
type DeleteObjects struct{}
// RequiredPermissions returns an empty permission node and no error: no permission is
// demanded up front for the request as a whole. Handle authorizes DeleteObjectAction for
// each requested object individually instead.
func (controller *DeleteObjects) RequiredPermissions(_ *http.Request, _ string) (permissions.Node, error) {
	var noUpfrontPermissions permissions.Node
	return noUpfrontPermissions, nil
}
// Handle serves an S3 DeleteObjects (multi-object delete) request on o.Repository.
//
// The request is rejected outright when:
//   - it lacks the "delete" query parameter (not supported),
//   - the repository is read-only,
//   - the XML body cannot be decoded (bad request), or
//   - the body lists zero objects or more than maxDeleteObjects (malformed XML).
//
// Otherwise each listed key is resolved into a ref and a path and authorized individually
// for DeleteObjectAction. Keys failing either step become per-key errors in the response
// instead of failing the whole request. The remaining keys are deleted:
//   - with one batch call when they all share the same ref,
//   - otherwise one by one.
//
// The response is always HTTP 200 with a DeleteResult that lists the per-key errors first,
// followed by the delete results.
func (controller *DeleteObjects) Handle(w http.ResponseWriter, req *http.Request, o *RepoOperation) {
	// verify we only handle delete request
	query := req.URL.Query()
	if !query.Has("delete") {
		_ = o.EncodeError(w, req, nil, gerrors.ERRLakeFSNotSupported.ToAPIErr())
		return
	}
	o.Incr("delete_objects", o.Principal, o.Repository.Name, "")
	if o.Repository.ReadOnly {
		_ = o.EncodeError(w, req, nil, gerrors.Codes.ToAPIErr(gerrors.ErrReadOnlyRepository))
		return
	}

	decodedXML := &serde.Delete{}
	if err := DecodeXMLBody(req.Body, decodedXML); err != nil {
		_ = o.EncodeError(w, req, err, gerrors.Codes.ToAPIErr(gerrors.ErrBadRequest))
		return
	}
	objectCount := len(decodedXML.Object)
	if objectCount == 0 || objectCount > maxDeleteObjects {
		// The body decoded successfully; there is no underlying error to attach.
		_ = o.EncodeError(w, req, nil, gerrors.Codes.ToAPIErr(gerrors.ErrMalformedXML))
		return
	}

	log := o.Log(req)

	// Parallel slices describing the objects left after the authorization check:
	// entry i is the S3 key keysToDelete[i], at pathsToDelete[i] on refsToDelete[i].
	keysToDelete := make([]string, 0, objectCount)
	refsToDelete := make([]string, 0, objectCount)
	pathsToDelete := make([]string, 0, objectCount)
	var errs []serde.DeleteError

	for _, obj := range decodedXML.Object {
		resolvedPath, err := path.ResolvePath(obj.Key)
		if err != nil {
			errs = append(errs, serde.DeleteError{
				Code:    "ErrDeletingKey",
				Key:     obj.Key,
				Message: fmt.Sprintf("error deleting object: %s", err),
			})
			continue
		}
		// authorize this object deletion
		authResp, err := o.Auth.Authorize(req.Context(), &auth.AuthorizationRequest{
			Username: o.Principal,
			RequiredPermissions: permissions.Node{
				Permission: permissions.Permission{
					Action:   permissions.DeleteObjectAction,
					Resource: permissions.ObjectArn(o.Repository.Name, resolvedPath.Path),
				},
			},
		})
		if err != nil {
			// The client only sees "Access Denied", so record the real cause of a
			// failed authorization call (as opposed to a plain denial) here.
			log.WithField("key", obj.Key).WithError(err).Error("failed authorizing object deletion")
		}
		if err != nil || !authResp.Allowed {
			errs = append(errs, serde.DeleteError{
				Code:    "AccessDenied",
				Key:     obj.Key,
				Message: "Access Denied",
			})
			continue
		}
		keysToDelete = append(keysToDelete, obj.Key)
		refsToDelete = append(refsToDelete, resolvedPath.Ref)
		pathsToDelete = append(pathsToDelete, resolvedPath.Path)
	}

	if len(pathsToDelete) == 0 {
		// Nothing passed resolution/authorization: respond with the collected errors only.
		o.EncodeResponse(w, req, serde.DeleteResult{Error: errs}, http.StatusOK)
		return
	}

	// A single batch delete is possible only when every remaining object is on the same ref.
	canBatch := true
	for i := 1; i < len(refsToDelete); i++ {
		if refsToDelete[i] != refsToDelete[0] {
			canBatch = false
			break
		}
	}

	var resp serde.DeleteResult
	if canBatch {
		resp = controller.batchDelete(req.Context(), log, o, decodedXML.Quiet, refsToDelete[0], keysToDelete, pathsToDelete)
	} else {
		resp = controller.nonBatchDelete(req.Context(), log, o, decodedXML.Quiet, keysToDelete, refsToDelete, pathsToDelete)
	}

	// Errors found before deletion come first, followed by the delete results.
	if len(errs) > 0 {
		resp.Error = append(errs, resp.Error...)
	}
	o.EncodeResponse(w, req, resp, http.StatusOK)
}
// nonBatchDelete deletes the objects one at a time with Catalog.DeleteEntry. Handle uses it
// when the authorized objects span more than one ref.
//
// The three slices are parallel: entry i is the S3 key keysToDelete[i], located at
// pathsToDelete[i] on refsToDelete[i]. Each outcome is recorded via updateDeleteResult.
func (controller *DeleteObjects) nonBatchDelete(ctx context.Context, log logging.Logger, o *RepoOperation, quiet bool, keysToDelete []string, refsToDelete []string, pathsToDelete []string) serde.DeleteResult {
	result := serde.DeleteResult{}
	for idx := 0; idx < len(keysToDelete); idx++ {
		deleteErr := o.Catalog.DeleteEntry(ctx, o.Repository.Name, refsToDelete[idx], pathsToDelete[idx])
		updateDeleteResult(&result, quiet, log, keysToDelete[idx], deleteErr)
	}
	return result
}
// batchDelete deletes all of pathsToDelete on a single ref with one Catalog.DeleteEntries
// call and records a per-key outcome. keysToDelete[i] is the S3 key the client sent for
// pathsToDelete[i]; the two slices must be the same length.
//
// Per-entry failures are extracted from the batch error with graveler.NewMapDeleteErrors.
// DeleteEntries only ever sees the paths (the S3 key additionally carries the "<ref>/"
// segment that was resolved away), so the map can only be indexed by pathsToDelete[i]. The
// S3 key is used solely for reporting. When no entry-specific error exists, the batch
// error itself (possibly nil) is used.
func (controller *DeleteObjects) batchDelete(ctx context.Context, log logging.Logger, o *RepoOperation, quiet bool, ref string, keysToDelete []string, pathsToDelete []string) serde.DeleteResult {
	var result serde.DeleteResult
	batchErr := o.Catalog.DeleteEntries(ctx, o.Repository.Name, ref, pathsToDelete)
	deleteErrs := graveler.NewMapDeleteErrors(batchErr)
	for i, key := range keysToDelete {
		// err will be set to the path-specific error if possible, falling back to the batch delete error
		err := deleteErrs[pathsToDelete[i]]
		if err == nil {
			err = batchErr
		}
		updateDeleteResult(&result, quiet, log, key, err)
	}
	return result
}
// updateDeleteResult records the outcome of deleting 'key' into result.
//
// When checkForDeleteError reports a failure, a DeleteError entry is appended. Otherwise a
// Deleted entry is appended, unless quiet mode asks to omit successfully deleted keys.
func updateDeleteResult(result *serde.DeleteResult, quiet bool, log logging.Logger, key string, err error) {
	if deleteError := checkForDeleteError(log, key, err); deleteError != nil {
		result.Error = append(result.Error, *deleteError)
		return
	}
	if quiet {
		return
	}
	result.Deleted = append(result.Deleted, serde.Deleted{Key: key})
}
// checkForDeleteError translates the error returned when deleting 'key' into the per-key
// S3 error to report. It returns nil when the key should count as deleted.
//
// Outcomes, checked in this order:
//   - nil: success, logged at debug level.
//   - graveler.ErrNotFound: success; deleting a missing object is not an error.
//   - graveler.ErrWriteToProtectedBranch, then graveler.ErrReadOnlyRepository: reported
//     with the matching gateway API error code and description.
//   - catalog.ErrPathRequiredValue: success (see issue #1706 below).
//   - anything else: logged and reported as "ErrDeletingKey".
func checkForDeleteError(log logging.Logger, key string, err error) *serde.DeleteError {
	if err == nil {
		log.WithField("key", key).Debug("object set for deletion")
		return nil
	}
	if errors.Is(err, graveler.ErrNotFound) {
		log.Debug("tried to delete a non-existent object (OK)")
		return nil
	}

	protectedBranch := errors.Is(err, graveler.ErrWriteToProtectedBranch)
	if protectedBranch || errors.Is(err, graveler.ErrReadOnlyRepository) {
		// The protected-branch code wins when an error matches both.
		code := gerrors.ErrReadOnlyRepository
		if protectedBranch {
			code = gerrors.ErrWriteToProtectedBranch
		}
		apiErr := gerrors.Codes.ToAPIErr(code)
		return &serde.DeleteError{
			Code:    apiErr.Code,
			Key:     key,
			Message: fmt.Sprintf("error deleting object: %s", apiErr.Description),
		}
	}

	if errors.Is(err, catalog.ErrPathRequiredValue) {
		// issue #1706 - https://github.com/treeverse/lakeFS/issues/1706
		// Spark tries to delete the path "main/", which maps to branch "main" with an empty path.
		// Spark expects that to succeed (deleting nothing is a success) rather than fail.
		log.Debug("tried to delete with an empty path")
		return nil
	}

	log.WithField("key", key).WithError(err).Error("failed deleting object")
	return &serde.DeleteError{
		Code:    "ErrDeletingKey",
		Key:     key,
		Message: fmt.Sprintf("error deleting object: %s", err),
	}
}