Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

S3 DeleteAllVersions: use pagination #9228

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 25 additions & 19 deletions util/pkg/vfs/s3fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,31 +119,30 @@ func (p *S3Path) RemoveAllVersions() error {
Prefix: aws.String(p.key),
}

response, err := client.ListObjectVersions(request)
if err != nil {
var versions []*s3.ObjectVersion
var deleteMarkers []*s3.DeleteMarkerEntry
if err := client.ListObjectVersionsPages(request, func(page *s3.ListObjectVersionsOutput, lastPage bool) bool {
versions = append(versions, page.Versions...)
deleteMarkers = append(deleteMarkers, page.DeleteMarkers...)
return true
}); err != nil {
return fmt.Errorf("error listing all versions of file %s: %v", p, err)
}

if len(response.Versions) == 0 && len(response.DeleteMarkers) == 0 {
if len(versions) == 0 && len(deleteMarkers) == 0 {
return os.ErrNotExist
}

// Sometimes S3 will return paginated results if there are too many versions and markers for an object.
// This happens at about 1000 entries, so it is unlikely with current use cases.
if aws.BoolValue(response.IsTruncated) {
klog.Warningf("too many versions for %s", p)
}

objects := []*s3.ObjectIdentifier{}
for _, version := range response.Versions {
var objects []*s3.ObjectIdentifier
for _, version := range versions {
klog.V(8).Infof("removing file %s version %q", p, aws.StringValue(version.VersionId))
file := s3.ObjectIdentifier{
Key: version.Key,
VersionId: version.VersionId,
}
objects = append(objects, &file)
}
for _, version := range response.DeleteMarkers {
for _, version := range deleteMarkers {
klog.V(8).Infof("removing marker %s version %q", p, aws.StringValue(version.VersionId))
marker := s3.ObjectIdentifier{
Key: version.Key,
Expand All @@ -152,19 +151,26 @@ func (p *S3Path) RemoveAllVersions() error {
objects = append(objects, &marker)
}

if len(objects) > 0 {
klog.V(8).Infof("removing %d file/marker versions\n", len(objects))

for len(objects) > 0 {
request := &s3.DeleteObjectsInput{
Bucket: aws.String(p.bucket),
Delete: &s3.Delete{
Objects: objects,
},
Delete: &s3.Delete{},
}

// DeleteObjects can only process 1000 objects per call
if len(objects) > 1000 {
request.Delete.Objects = objects[:1000]
objects = objects[1000:]
} else {
request.Delete.Objects = objects
objects = nil
}

klog.V(8).Infof("removing %d file/marker versions\n", len(request.Delete.Objects))

_, err = client.DeleteObjects(request)
if err != nil {
return fmt.Errorf("error removing %d file/marker versions: %v", len(objects), err)
return fmt.Errorf("error removing %d file/marker versions: %v", len(request.Delete.Objects), err)
}
}

Expand Down