Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PROJQUAY-381 - failed mirror tag cleanup #516

Merged
merged 1 commit into from Aug 19, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
26 changes: 10 additions & 16 deletions util/repomirror/skopeomirror.py
Expand Up @@ -7,6 +7,7 @@

logger = logging.getLogger(__name__)

SKOPEO_TIMEOUT_SECONDS = 300

# success: True or False whether call was successful
# tags: list of tags or empty list
Expand Down Expand Up @@ -120,21 +121,14 @@ def run_skopeo(self, args, proxy):
close_fds=True,
)

# Poll process for new output until finished
stdout = ""
stderr = ""
while True:
stdout_nextline = job.stdout.readline().decode("utf-8")
stdout = stdout + stdout_nextline
stderr_nextline = job.stderr.readline().decode("utf-8")
stderr = stderr + stderr_nextline
if stdout_nextline == "" and stderr_nextline == "" and job.poll() is not None:
break
if stderr_nextline != "":
logger.debug("Skopeo [STDERR]: %s" % stderr_nextline)
if stdout_nextline != "":
logger.debug("Skopeo [STDOUT]: %s" % stdout_nextline)

job.communicate()
try:
(stdout, stderr) = job.communicate(timeout=SKOPEO_TIMEOUT_SECONDS)
except subprocess.TimeoutExpired:
job.kill()
(stdout, stderr) = job.communicate()
stdout = stdout.decode("utf-8")
stderr = stderr.decode("utf-8")
logger.debug("Skopeo [STDERR]: %s" % stderr)
logger.debug("Skopeo [STDOUT]: %s" % stdout)

return SkopeoResults(job.returncode == 0, [], stdout, stderr)
50 changes: 32 additions & 18 deletions workers/repomirrorworker/__init__.py
Expand Up @@ -61,7 +61,7 @@ def process_mirrors(skopeo, token=None):
return None

iterator, next_token = model.repositories_to_mirror(start_token=token)
if iterator is None:
if not iterator:
logger.debug("Found no additional repositories to mirror")
return next_token

Expand Down Expand Up @@ -94,7 +94,7 @@ def perform_mirror(skopeo, mirror):
verbose_logs = False

mirror = claim_mirror(mirror)
if mirror == None:
if not mirror:
raise PreemptedException

emit_log(
Expand Down Expand Up @@ -152,8 +152,6 @@ def perform_mirror(skopeo, mirror):
now_ms = database.get_epoch_timestamp_ms()
overall_status = RepoMirrorStatus.SUCCESS
try:
delete_obsolete_tags(mirror, tags)

try:
username = (
mirror.external_registry_username.decrypt()
Expand All @@ -176,6 +174,11 @@ def perform_mirror(skopeo, mirror):
)

for tag in tags:
reclaimed_mirror = claim_mirror(mirror)
if not reclaimed_mirror:
raise PreemptedException
mirror = reclaimed_mirror

src_image = "docker://%s:%s" % (mirror.external_reference, tag)
dest_image = "docker://%s/%s/%s:%s" % (
dest_server,
Expand Down Expand Up @@ -223,15 +226,26 @@ def perform_mirror(skopeo, mirror):
)
logger.info("Source '%s' successful sync." % src_image)

mirror = claim_mirror(mirror)
if mirror is None:
emit_log(
mirror,
"repo_mirror_sync_failed",
"lost",
"'%s' with tag pattern '%s'"
% (mirror.external_reference, ",".join(mirror.root_rule.rule_value)),
)
reclaimed_mirror = claim_mirror(mirror)
if not reclaimed_mirror:
raise PreemptedException
mirror = reclaimed_mirror
delete_obsolete_tags(mirror, tags)

except PreemptedException as e:
overall_status = RepoMirrorStatus.FAIL
emit_log(
mirror,
"repo_mirror_sync_failed",
"lost",
"'%s' job lost" % (mirror.external_reference),
tags="",
stdout="Not applicable",
stderr="Not applicable",
)
release_mirror(mirror, overall_status)
return

except Exception as e:
overall_status = RepoMirrorStatus.FAIL
emit_log(
Expand Down Expand Up @@ -332,11 +346,10 @@ def _skopeo_inspect_failure(result):

def rollback(mirror, since_ms):
"""

:param mirror: Mirror to perform rollback on
:param start_time: Time mirror was started; all changes after will be undone
:return:
"""
:param mirror: Mirror to perform rollback on
:param start_time: Time mirror was started; all changes after will be undone
:return:
"""

repository_ref = registry_model.lookup_repository(
mirror.repository.namespace_user.username, mirror.repository.name
Expand Down Expand Up @@ -378,6 +391,7 @@ def delete_obsolete_tags(mirror, tags):
obsolete_tags = list([tag for tag in existing_tags if tag.name not in tags])

for tag in obsolete_tags:
logger.debug("Repo mirroring delete obsolete tag '%s'" % tag.name)
delete_tag(mirror.repository, tag.name)

return obsolete_tags
Expand Down