Skip to content

Commit

Permalink
queueworker: prevent stop event on WorkerSleepException (PROJQUAY-1857)…
Browse files Browse the repository at this point in the history
… (#737)

Prevents the queueworker from setting the event to stop the poll_queue
job when a WorkerSleepException is raised. On WorkerSleepException,
the worker should instead skip this iteration (go to sleep), e.g. when
the NamespaceGCWorker can't acquire a lock because it is already taken
by some other worker.

Reverts the gcworkers job timeout from 24h to 3h. In case of a
deadlock between processes (for example, redeploying the app will not
clear the existing Redis keys), 24h is too long to wait for the locks to
expire before the workers can resume work.

Add the missing Counter increment for row deletion on the Manifest table.
  • Loading branch information
kleesc committed Apr 12, 2021
1 parent addaeac commit 90f9ef9
Show file tree
Hide file tree
Showing 5 changed files with 10 additions and 5 deletions.
2 changes: 2 additions & 0 deletions data/model/gc.py
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,8 @@ def _garbage_collect_manifest(manifest_id, context):
if deleted_manifest_legacy_image:
gc_table_rows_deleted.labels(table="ManifestLegacyImage").inc(deleted_manifest_legacy_image)

gc_table_rows_deleted.labels(table="Manifest").inc()

return True


Expand Down
4 changes: 2 additions & 2 deletions workers/gc/gcworker.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

logger = logging.getLogger(__name__)

REPOSITORY_GC_TIMEOUT = 24 * 60 * 60 # 24h
REPOSITORY_GC_TIMEOUT = 3 * 60 * 60 # 3h
LOCK_TIMEOUT_PADDING = 60 # 60 seconds


Expand Down Expand Up @@ -60,14 +60,14 @@ def _garbage_collection_repos(self, skip_lock_for_testing=False):
except Repository.DoesNotExist:
return

gc_iterations.inc()
logger.debug(
"Starting GC of repository #%s (%s)", repository.id, repository.name
)
garbage_collect_repo(repository)
logger.debug(
"Finished GC of repository #%s (%s)", repository.id, repository.name
)
gc_iterations.inc()
except LockNotAcquiredException:
logger.debug("Could not acquire repo lock for garbage collection")

Expand Down
2 changes: 1 addition & 1 deletion workers/namespacegcworker.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@


POLL_PERIOD_SECONDS = 60
NAMESPACE_GC_TIMEOUT = 24 * 60 * 60 # 24h
NAMESPACE_GC_TIMEOUT = 3 * 60 * 60 # 3h
LOCK_TIMEOUT_PADDING = 60 # 60 seconds


Expand Down
5 changes: 4 additions & 1 deletion workers/queueworker.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging
import json
import time

from threading import Event, Lock

Expand All @@ -10,6 +11,8 @@

logger = logging.getLogger(__name__)

QUEUE_WORKER_SLEEP_DURATION = 1


class JobException(Exception):
"""
Expand Down Expand Up @@ -142,7 +145,7 @@ def poll_queue(self):
except WorkerSleepException as exc:
logger.debug("Worker has been requested to go to sleep")
self.mark_current_incomplete(restore_retry=True)
self._stop.set()
time.sleep(QUEUE_WORKER_SLEEP_DURATION)

except WorkerUnhealthyException as exc:
logger.error(
Expand Down
2 changes: 1 addition & 1 deletion workers/repositorygcworker.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@


POLL_PERIOD_SECONDS = 60
REPOSITORY_GC_TIMEOUT = 24 * 60 * 60 # 24h
REPOSITORY_GC_TIMEOUT = 3 * 60 * 60 # 3h
LOCK_TIMEOUT_PADDING = 60 # 60 seconds


Expand Down

0 comments on commit 90f9ef9

Please sign in to comment.