gc: add metrics for deleted resources (#711)
Add counters for the number of resources deleted by the GC worker, the
repository GC worker, and the namespace GC worker.
kleesc committed Apr 9, 2021
1 parent ecc125f commit f774e4c
Showing 6 changed files with 147 additions and 36 deletions.
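For readers unfamiliar with the metrics plumbing this commit builds on, the sketch below shows the labeled-counter pattern it relies on. It assumes prometheus_client is the client library behind util.metrics.prometheus (the Counter/Gauge calls in the diff are consistent with that); the names in the sketch are illustrative, not Quay code.

    from prometheus_client import Counter

    # A counter with a "table" label yields one time series per table name, so a
    # single metric can report deletions across every table GC touches.
    rows_deleted = Counter(
        "example_gc_table_rows_deleted",
        "number of table rows deleted by GC",
        labelnames=["table"],
    )

    rows_deleted.labels(table="ManifestBlob").inc(5)  # record a batch of deletions
    rows_deleted.labels(table="Tag").inc()            # default increment of 1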
118 changes: 90 additions & 28 deletions data/model/gc.py
@@ -38,6 +38,8 @@
DerivedStorageForImage,
)
from data.database import TagManifestToManifest, TagToRepositoryTag, TagManifestLabelMap
from util.metrics.prometheus import gc_table_rows_deleted


logger = logging.getLogger(__name__)

@@ -93,8 +95,13 @@ def purge_repository(repo, force=False):
    # Note that new-model Tags must be deleted in *two* passes, as they can reference parent tags,
# and MySQL is... particular... about such relationships when deleting.
if repo.kind.name == "application":
ApprTag.delete().where(ApprTag.repository == repo, ~(ApprTag.linked_tag >> None)).execute()
ApprTag.delete().where(ApprTag.repository == repo).execute()
fst_pass = (
ApprTag.delete()
.where(ApprTag.repository == repo, ~(ApprTag.linked_tag >> None))
.execute()
)
snd_pass = ApprTag.delete().where(ApprTag.repository == repo).execute()
gc_table_rows_deleted.labels(table="ApprTag").inc(fst_pass + snd_pass)
else:
# GC to remove the images and storage.
_purge_repository_contents(repo)
@@ -372,11 +379,16 @@ def _purge_oci_tag(tag, context, allow_non_expired=False):
return False

# Delete mapping rows.
TagToRepositoryTag.delete().where(TagToRepositoryTag.tag == tag).execute()
deleted_tag_to_repotag = (
TagToRepositoryTag.delete().where(TagToRepositoryTag.tag == tag).execute()
)

# Delete the tag.
tag.delete_instance()

gc_table_rows_deleted.labels(table="Tag").inc()
gc_table_rows_deleted.labels(table="TagToRepositoryTag").inc(deleted_tag_to_repotag)


def _purge_pre_oci_tag(tag, context, allow_non_expired=False):
assert tag.repository_id == context.repository.id
@@ -410,13 +422,18 @@ def _purge_pre_oci_tag(tag, context, allow_non_expired=False):
return False

# Delete mapping rows.
TagToRepositoryTag.delete().where(
TagToRepositoryTag.repository_tag == reloaded_tag
).execute()
deleted_tag_to_repotag = (
TagToRepositoryTag.delete()
.where(TagToRepositoryTag.repository_tag == reloaded_tag)
.execute()
)

# Delete the tag.
reloaded_tag.delete_instance()

gc_table_rows_deleted.labels(table="RepositoryTag").inc()
gc_table_rows_deleted.labels(table="TagToRepositoryTag").inc(deleted_tag_to_repotag)


def _purge_uploaded_blob(uploaded_blob, context, allow_non_expired=False):
assert allow_non_expired or uploaded_blob.expires_at <= datetime.utcnow()
@@ -426,6 +443,7 @@ def _purge_uploaded_blob(uploaded_blob, context, allow_non_expired=False):

# Delete the uploaded blob.
uploaded_blob.delete_instance()
gc_table_rows_deleted.labels(table="UploadedBlob").inc()


def _check_manifest_used(manifest_id):
@@ -488,37 +506,62 @@ def _garbage_collect_manifest(manifest_id, context):
return False

# Delete any label mappings.
(TagManifestLabelMap.delete().where(TagManifestLabelMap.manifest == manifest_id).execute())
deleted_tag_manifest_label_map = (
TagManifestLabelMap.delete()
.where(TagManifestLabelMap.manifest == manifest_id)
.execute()
)

# Delete any mapping rows for the manifest.
TagManifestToManifest.delete().where(
TagManifestToManifest.manifest == manifest_id
).execute()
deleted_tag_manifest_to_manifest = (
TagManifestToManifest.delete()
.where(TagManifestToManifest.manifest == manifest_id)
.execute()
)

# Delete any label rows.
ManifestLabel.delete().where(
ManifestLabel.manifest == manifest_id, ManifestLabel.repository == context.repository
).execute()
deleted_manifest_label = (
ManifestLabel.delete()
.where(
ManifestLabel.manifest == manifest_id,
ManifestLabel.repository == context.repository,
)
.execute()
)

# Delete any child manifest rows.
ManifestChild.delete().where(
ManifestChild.manifest == manifest_id, ManifestChild.repository == context.repository
).execute()
deleted_manifest_child = (
ManifestChild.delete()
.where(
ManifestChild.manifest == manifest_id,
ManifestChild.repository == context.repository,
)
.execute()
)

# Delete the manifest blobs for the manifest.
ManifestBlob.delete().where(
ManifestBlob.manifest == manifest_id, ManifestBlob.repository == context.repository
).execute()
deleted_manifest_blob = (
ManifestBlob.delete()
.where(
ManifestBlob.manifest == manifest_id, ManifestBlob.repository == context.repository
)
.execute()
)

# Delete the security status for the manifest
ManifestSecurityStatus.delete().where(
ManifestSecurityStatus.manifest == manifest_id,
ManifestSecurityStatus.repository == context.repository,
).execute()
deleted_manifest_security = (
ManifestSecurityStatus.delete()
.where(
ManifestSecurityStatus.manifest == manifest_id,
ManifestSecurityStatus.repository == context.repository,
)
.execute()
)

# Delete the manifest legacy image row.
deleted_manifest_legacy_image = 0
if legacy_image_id:
(
deleted_manifest_legacy_image = (
ManifestLegacyImage.delete()
.where(
ManifestLegacyImage.manifest == manifest_id,
@@ -531,6 +574,18 @@ def _garbage_collect_manifest(manifest_id, context):
manifest.delete_instance()

context.mark_manifest_removed(manifest)

gc_table_rows_deleted.labels(table="TagManifestLabelMap").inc(deleted_tag_manifest_label_map)
gc_table_rows_deleted.labels(table="TagManifestToManifest").inc(
deleted_tag_manifest_to_manifest
)
gc_table_rows_deleted.labels(table="ManifestLabel").inc(deleted_manifest_label)
gc_table_rows_deleted.labels(table="ManifestChild").inc(deleted_manifest_child)
gc_table_rows_deleted.labels(table="ManifestBlob").inc(deleted_manifest_blob)
gc_table_rows_deleted.labels(table="ManifestSecurityStatus").inc(deleted_manifest_security)
if deleted_manifest_legacy_image:
gc_table_rows_deleted.labels(table="ManifestLegacyImage").inc(deleted_manifest_legacy_image)

return True


@@ -570,13 +625,16 @@ def _garbage_collect_legacy_manifest(legacy_manifest_id, context):
.get()
)
context.add_manifest_id(tmt.manifest_id)
tmt.delete_instance()
tmt_deleted = tmt.delete_instance()
if tmt_deleted:
gc_table_rows_deleted.labels(table="TagManifestToManifest").inc()
except TagManifestToManifest.DoesNotExist:
pass

# Delete the tag manifest.
tag_manifest.delete_instance()

tag_manifest_deleted = tag_manifest.delete_instance()
if tag_manifest_deleted:
gc_table_rows_deleted.labels(table="TagManifest").inc()
return True


@@ -650,7 +708,7 @@ def _garbage_collect_legacy_image(legacy_image_id, context):
assert image.repository_id == context.repository.id

# Delete any derived storage for the image.
(
deleted_derived_storage = (
DerivedStorageForImage.delete()
.where(DerivedStorageForImage.source_image == legacy_image_id)
.execute()
@@ -661,6 +719,9 @@ def _garbage_collect_legacy_image(legacy_image_id, context):

context.mark_legacy_image_removed(image)

gc_table_rows_deleted.labels(table="Image").inc()
gc_table_rows_deleted.labels(table="DerivedStorageForImage").inc(deleted_derived_storage)

if config.image_cleanup_callbacks:
for callback in config.image_cleanup_callbacks:
callback([image])
@@ -700,5 +761,6 @@ def _garbage_collect_label(label_id, context):

if result:
context.mark_label_id_removed(label_id)
gc_table_rows_deleted.labels(table="Label").inc(result)

return result
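The refactoring in gc.py leans on one peewee behavior: a delete query's .execute() (and delete_instance() on a model row) returns the number of rows removed, which is why the previously bare DELETE calls are now assigned to variables and fed into inc(). A self-contained sketch of that pattern, with hypothetical model and metric names rather than Quay's:

    from peewee import CharField, Model, SqliteDatabase
    from prometheus_client import Counter

    db = SqliteDatabase(":memory:")

    class Widget(Model):
        name = CharField()

        class Meta:
            database = db

    widget_rows_deleted = Counter(
        "example_widget_rows_deleted", "rows deleted by the sketch", labelnames=["table"]
    )

    db.connect()
    db.create_tables([Widget])
    Widget.create(name="a")
    Widget.create(name="b")

    # peewee returns the affected row count from execute(), so it can be passed
    # straight to the labeled counter.
    deleted = Widget.delete().where(Widget.name << ["a", "b"]).execute()  # -> 2
    widget_rows_deleted.labels(table="Widget").inc(deleted)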
36 changes: 28 additions & 8 deletions data/model/storage.py
@@ -27,6 +27,7 @@
ManifestBlob,
UploadedBlob,
)
from util.metrics.prometheus import gc_table_rows_deleted, gc_storage_blobs_deleted

logger = logging.getLogger(__name__)

@@ -191,25 +192,43 @@ def placements_to_filtered_paths_set(placements_list):
)

# Remove the placements for orphaned storages
deleted_image_storage_placement = 0
if placements_to_remove:
ImageStoragePlacement.delete().where(
ImageStoragePlacement.storage == storage_id_to_check
).execute()
deleted_image_storage_placement = (
ImageStoragePlacement.delete()
.where(ImageStoragePlacement.storage == storage_id_to_check)
.execute()
)

# Remove all orphaned storages
TorrentInfo.delete().where(TorrentInfo.storage == storage_id_to_check).execute()
deleted_torrent_info = (
TorrentInfo.delete().where(TorrentInfo.storage == storage_id_to_check).execute()
)

ImageStorageSignature.delete().where(
ImageStorageSignature.storage == storage_id_to_check
).execute()
deleted_image_storage_signature = (
ImageStorageSignature.delete()
.where(ImageStorageSignature.storage == storage_id_to_check)
.execute()
)

ImageStorage.delete().where(ImageStorage.id == storage_id_to_check).execute()
deleted_image_storage = (
ImageStorage.delete().where(ImageStorage.id == storage_id_to_check).execute()
)

# Determine the paths to remove. We cannot simply remove all paths matching storages, as CAS
# can share the same path. We further filter these paths by checking for any storages still in
# the database with the same content checksum.
paths_to_remove.extend(placements_to_filtered_paths_set(placements_to_remove))

gc_table_rows_deleted.labels(table="TorrentInfo").inc(deleted_torrent_info)
gc_table_rows_deleted.labels(table="ImageStorageSignature").inc(
deleted_image_storage_signature
)
gc_table_rows_deleted.labels(table="ImageStorage").inc(deleted_image_storage)
gc_table_rows_deleted.labels(table="ImageStoragePlacement").inc(
deleted_image_storage_placement
)

# We are going to make the conscious decision to not delete image storage blobs inside
# transactions.
# This may end up producing garbage in s3, trading off for higher availability in the database.
@@ -230,6 +249,7 @@ def placements_to_filtered_paths_set(placements_list):

logger.debug("Removing %s from %s", image_path, location_name)
config.store.remove({location_name}, image_path)
gc_storage_blobs_deleted.inc()

return orphaned_storage_ids

21 changes: 21 additions & 0 deletions util/metrics/prometheus.py
@@ -4,6 +4,7 @@
import sys
import threading
import time
from collections import namedtuple
import urllib.request, urllib.error, urllib.parse

from cachetools.func import lru_cache
@@ -15,6 +16,7 @@
logger = logging.getLogger(__name__)


# DB connections
db_pooled_connections_in_use = Gauge(
"quay_db_pooled_connections_in_use", "number of pooled db connections in use"
)
@@ -36,6 +38,25 @@
labelnames=["method", "route", "status"],
)

# GC: DB table rows
gc_table_rows_deleted = Counter(
"quay_gc_table_rows_deleted", "number of table rows deleted by GC", labelnames=["table"]
)

# GC: Storage blob
gc_storage_blobs_deleted = Counter(
"quay_gc_storage_blobs_deleted", "number of storage blobs deleted"
)

# GC iterations
gc_repos_purged = Counter(
"quay_gc_repos_purged", "number of repositories purged by the RepositoryGCWorker"
)
gc_namespaces_purged = Counter(
"quay_gc_namespaces_purged", "number of namespaces purged by the NamespaceGCWorker"
)
gc_iterations = Counter("quay_gc_iterations", "number of iterations by the GCWorker")


PROMETHEUS_PUSH_INTERVAL_SECONDS = 30
ONE_DAY_IN_SECONDS = 60 * 60 * 24
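To sanity-check the new counters locally, the exposition output can be rendered directly. The snippet below is a sketch, run from within the Quay codebase, that assumes prometheus_client backs these metrics and that they register on the default REGISTRY (the usual behavior for module-level Counter instances); the exact sample names and suffixes depend on the client version.

    from prometheus_client import REGISTRY, generate_latest

    from util.metrics.prometheus import gc_iterations, gc_table_rows_deleted

    gc_table_rows_deleted.labels(table="ManifestBlob").inc(2)
    gc_iterations.inc()

    # Prints exposition-format samples such as:
    #   quay_gc_table_rows_deleted_total{table="ManifestBlob"} 2.0
    #   quay_gc_iterations_total 1.0
    print(generate_latest(REGISTRY).decode("utf-8"))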
2 changes: 2 additions & 0 deletions workers/gc/gcworker.py
@@ -13,6 +13,7 @@
from workers.worker import Worker
from util.locking import GlobalLock, LockNotAcquiredException
from workers.gunicorn_worker import GunicornWorker
from util.metrics.prometheus import gc_iterations

logger = logging.getLogger(__name__)

@@ -59,6 +60,7 @@ def _garbage_collection_repos(self, skip_lock_for_testing=False):
except Repository.DoesNotExist:
return

gc_iterations.inc()
logger.debug(
"Starting GC of repository #%s (%s)", repository.id, repository.name
)
3 changes: 3 additions & 0 deletions workers/namespacegcworker.py
@@ -8,6 +8,7 @@
from workers.queueworker import QueueWorker, WorkerSleepException
from util.log import logfile_path
from util.locking import GlobalLock, LockNotAcquiredException
from util.metrics.prometheus import gc_namespaces_purged
from workers.gunicorn_worker import GunicornWorker

logger = logging.getLogger(__name__)
@@ -39,6 +40,8 @@ def _perform_gc(self, job_details):
if not model.user.delete_namespace_via_marker(marker_id, all_queues):
raise Exception("GC interrupted; will retry")

gc_namespaces_purged.inc()


def create_gunicorn_worker():
"""
3 changes: 3 additions & 0 deletions workers/repositorygcworker.py
@@ -8,6 +8,7 @@
from workers.queueworker import QueueWorker, WorkerSleepException
from util.log import logfile_path
from util.locking import GlobalLock, LockNotAcquiredException
from util.metrics.prometheus import gc_repos_purged
from workers.gunicorn_worker import GunicornWorker

logger = logging.getLogger(__name__)
@@ -47,6 +48,8 @@ def _perform_gc(self, job_details):
if not model.gc.purge_repository(marker.repository):
raise Exception("GC interrupted; will retry")

gc_repos_purged.inc()


def create_gunicorn_worker():
"""
