blobuploadcleanupworker: Add cleanup for orphaned blobs (PROJQUAY-2313) (#967)

Currently, blobs left over in the uploads directory from cancelled uploads do not get cleaned up, since they are no longer tracked. This change cleans up the uploads storage directory directly.
bcaton85 committed Nov 17, 2021
1 parent bbacf23 commit 22282da
Showing 8 changed files with 94 additions and 3 deletions.
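
Before the per-file diffs, the core idea in isolation: the cleanup lists everything under the uploads/ prefix in object storage and deletes objects older than a threshold, mirroring what clean_partial_uploads in storage/cloud.py below does. A simplified, standalone sketch using boto3 directly rather than Quay's storage abstraction (the bucket name, prefix, and threshold are placeholders, not values from the commit):

from datetime import datetime, timedelta, timezone

import boto3


def sweep_stale_uploads(bucket, prefix="uploads/", older_than=timedelta(days=2)):
    """Delete objects under `prefix` whose LastModified is older than `older_than`."""
    s3 = boto3.client("s3")
    cutoff = datetime.now(timezone.utc) - older_than
    paginator = s3.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get("Contents", []):
            if obj["LastModified"] <= cutoff:
                s3.delete_object(Bucket=bucket, Key=obj["Key"])


# sweep_stale_uploads("my-quay-bucket")  # hypothetical bucket name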
3 changes: 3 additions & 0 deletions config.py
@@ -799,3 +799,6 @@ def create_transaction(db):

# Allow creation of push to public repo
CREATE_REPOSITORY_ON_PUSH_PUBLIC = False

# Automatically clean stale blobs leftover in the uploads storage folder from cancelled uploads
CLEAN_BLOB_UPLOAD_FOLDER = False
3 changes: 3 additions & 0 deletions storage/basestorage.py
@@ -90,6 +90,9 @@ def remove(self, path):
    def get_checksum(self, path):
        raise NotImplementedError

    def clean_partial_uploads(self, deletion_date_threshold):
        raise NotImplementedError

    def stream_write_to_fp(self, in_fp, out_fp, num_bytes=READ_UNTIL_END):
        """
        Copy the specified number of bytes from the input file stream to the output stream.
28 changes: 27 additions & 1 deletion storage/cloud.py
@@ -3,7 +3,7 @@
import copy

from collections import namedtuple
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone
from io import BufferedIOBase, StringIO, BytesIO
from itertools import chain
from uuid import uuid4
@@ -696,6 +696,32 @@ def cancel_chunked_upload(self, uuid, storage_metadata):
        for chunk in self._chunk_list_from_metadata(storage_metadata):
            self.remove(chunk.path)

    def clean_partial_uploads(self, deletion_date_threshold):
        self._initialize_cloud_conn()
        path = self._init_path("uploads")
        paginator = self.get_cloud_conn().get_paginator(self._list_object_version)
        for page in paginator.paginate(Bucket=self._bucket_name, Prefix=path):
            for obj_info in page.get("Contents", []):
                if obj_info["LastModified"] <= datetime.now(timezone.utc) - deletion_date_threshold:
                    obj = self.get_cloud_bucket().Object(obj_info["Key"])
                    try:
                        obj.load()
                        obj.delete()
                        logger.debug(
                            "Expired blob removed from uploads folder: %s", obj_info["Key"]
                        )
                    except botocore.exceptions.ClientError as s3r:
                        if s3r.response["Error"]["Code"] not in _MISSING_KEY_ERROR_CODES:
                            logger.exception(
                                "Got error when attempting to clean blob with key %s in uploads folder: %s",
                                obj_info["Key"],
                                str(s3r),
                            )
                        else:
                            logger.debug(
                                "Blob not found in uploads folder with key %s", obj_info["Key"]
                            )


class S3Storage(_CloudStorage):
    def __init__(
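A side note on the import change above: S3 returns timezone-aware LastModified timestamps, so the age check must compare against an aware datetime, which is why timezone was added to the datetime import. A small illustration with placeholder values:

from datetime import datetime, timedelta, timezone

last_modified = datetime(2021, 11, 15, tzinfo=timezone.utc)  # placeholder for an S3 LastModified value
threshold = timedelta(days=2)

# Both sides are timezone-aware, so the comparison is valid.
is_stale = last_modified <= datetime.now(timezone.utc) - threshold

# Using naive datetime.now() instead would raise:
# TypeError: can't compare offset-naive and offset-aware datetimes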
1 change: 1 addition & 0 deletions storage/distributedstorage.py
@@ -64,6 +64,7 @@ def locations(self):
    validate = _location_aware(BaseStorage.validate, requires_write=True)
    get_checksum = _location_aware(BaseStorage.get_checksum)
    get_supports_resumable_downloads = _location_aware(BaseStorage.get_supports_resumable_downloads)
    clean_partial_uploads = _location_aware(BaseStorage.clean_partial_uploads, requires_write=True)

    initiate_chunked_upload = _location_aware(
        BaseStorageV2.initiate_chunked_upload, requires_write=True
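Registering clean_partial_uploads through _location_aware is what lets callers such as the cleanup worker invoke it as storage.clean_partial_uploads(storage.preferred_locations, threshold) (see the worker change below). As a rough, hypothetical sketch of what such a dispatcher does, not Quay's actual implementation (the _storages mapping and error handling here are assumptions):

def location_aware(unbound_method):
    # Hypothetical dispatcher: forward the call to the engine backing the
    # first requested location that is configured, resolving the subclass
    # override of the method by name.
    def wrapper(self, locations, *args, **kwargs):
        for location in locations:
            engine = self._storages.get(location)
            if engine is not None:
                return getattr(engine, unbound_method.__name__)(*args, **kwargs)
        raise KeyError("No configured storage engine for locations: %r" % (locations,))

    return wrapper


# e.g. clean_partial_uploads = location_aware(BaseStorage.clean_partial_uploads)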
30 changes: 29 additions & 1 deletion storage/test/test_cloud_storage.py
@@ -1,10 +1,10 @@
import os
import time

from io import BytesIO

import pytest

import moto
import botocore.exceptions
import boto3

@@ -15,11 +15,14 @@
from storage.cloud import _CHUNKS_KEY
from storage.cloud import _build_endpoint_url

from datetime import timedelta

_TEST_CONTENT = os.urandom(1024)
_TEST_BUCKET = "somebucket"
_TEST_USER = "someuser"
_TEST_PASSWORD = "somepassword"
_TEST_PATH = "some/cool/path"
_TEST_UPLOADS_PATH = "uploads/ee160658-9444-4950-8ec6-30faab40529c"
_TEST_CONTEXT = StorageContext("nyc", None, None, None)


@@ -315,3 +318,28 @@ def test_rechunked(max_size, parts):
    assert len(rechunked) == len(parts)
    for index, chunk in enumerate(rechunked):
        assert chunk == parts[index]


@pytest.mark.parametrize("path", ["/", _TEST_PATH])
def test_clean_partial_uploads(storage_engine, path):

# Setup root path and add come content to _root_path/uploads
storage_engine._root_path = path
storage_engine.put_content(_TEST_UPLOADS_PATH, _TEST_CONTENT)
assert storage_engine.exists(_TEST_UPLOADS_PATH)
assert storage_engine.get_content(_TEST_UPLOADS_PATH) == _TEST_CONTENT

# Test ensure fresh blobs are not deleted
storage_engine.clean_partial_uploads(timedelta(days=2))
assert storage_engine.exists(_TEST_UPLOADS_PATH)
assert storage_engine.get_content(_TEST_UPLOADS_PATH) == _TEST_CONTENT

# Test deletion of stale blobs
time.sleep(1)
storage_engine.clean_partial_uploads(timedelta(seconds=0))
assert not storage_engine.exists(_TEST_UPLOADS_PATH)

# Test if uploads folder does not exist
storage_engine.remove("uploads")
assert not storage_engine.exists("uploads")
storage_engine.clean_partial_uploads(timedelta(seconds=0))
6 changes: 6 additions & 0 deletions util/config/schema.py
@@ -1203,5 +1203,11 @@
            "description": "Whether to create a repository when pushing to an unexisting public repo",
            "x-example": False,
        },
        # Clean partial uploads during S3 multipart upload
        "CLEAN_BLOB_UPLOAD_FOLDER": {
            "type": "boolean",
            "description": "Automatically clean stale blobs leftover in the uploads storage folder from cancelled uploads",
            "x-example": False,
        },
    },
}
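
The new schema entry is a plain boolean property. A standalone sketch of how such a flag validates, using the jsonschema package (an assumption for illustration; this is not necessarily how Quay wires its config validation):

from jsonschema import ValidationError, validate

# Minimal schema fragment mirroring the property added above.
FLAG_SCHEMA = {
    "type": "object",
    "properties": {
        "CLEAN_BLOB_UPLOAD_FOLDER": {"type": "boolean"},
    },
}

validate({"CLEAN_BLOB_UPLOAD_FOLDER": True}, FLAG_SCHEMA)  # passes

try:
    validate({"CLEAN_BLOB_UPLOAD_FOLDER": "yes"}, FLAG_SCHEMA)  # wrong type
except ValidationError as err:
    print(err.message)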
22 changes: 21 additions & 1 deletion workers/blobuploadcleanupworker/blobuploadcleanupworker.py
@@ -28,13 +28,33 @@ def __init__(self):
    def _try_cleanup_uploads(self):
        """
        Performs garbage collection on the blobupload table.
        Will also perform garbage collection on the uploads folder in the S3 bucket,
        if applicable.
        """
        try:
            with GlobalLock("BLOB_CLEANUP", lock_ttl=LOCK_TTL):
                self._cleanup_uploads()
                if app.config.get("CLEAN_BLOB_UPLOAD_FOLDER", False):
                    self._try_clean_partial_uploads()
        except LockNotAcquiredException:
            logger.debug("Could not acquire global lock for blob upload cleanup worker")
            return

    def _try_clean_partial_uploads(self):
        """
        Uploads cancelled before completion can leave untracked blobs in the uploads
        storage folder.
        This function cleans up those blobs older than DELETION_DATE_THRESHOLD.
        """
        try:
            storage.clean_partial_uploads(storage.preferred_locations, DELETION_DATE_THRESHOLD)
        except NotImplementedError:
            if len(storage.preferred_locations) > 0:
                logger.debug(
                    'Cleaning partial uploads not applicable to storage location "%s"',
                    storage.preferred_locations[0],
                )
            else:
                logger.debug("No preferred locations found")

    def _cleanup_uploads(self):
        """
@@ -22,6 +22,10 @@ def noop(_):
    worker = BlobUploadCleanupWorker()
    worker._cleanup_uploads()

    storage_mock.locations = ["default"]
    worker._try_clean_partial_uploads()

    storage_mock.clean_partial_uploads.assert_called_once()
    storage_mock.cancel_chunked_upload.assert_called_once()

    # Ensure the blob no longer exists.
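
As a follow-up to the worker test above, the NotImplementedError fallback in _try_clean_partial_uploads could be exercised in the same mock-based style. This is a hypothetical extension reusing the test's storage_mock and worker names, not part of the commit:

# Hypothetical extension of the test above (same storage_mock / worker objects).
storage_mock.clean_partial_uploads.side_effect = NotImplementedError
storage_mock.preferred_locations = ["local_us"]  # placeholder location name

# The worker should log the unsupported-location case and not raise.
worker._try_clean_partial_uploads()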
