diff --git a/bin/rucio-judge-repairer b/bin/rucio-judge-repairer index 8eeef90730..7066eb9f0d 100755 --- a/bin/rucio-judge-repairer +++ b/bin/rucio-judge-repairer @@ -19,9 +19,8 @@ Judge-Repairer is a daemon to repair stuck replication rules. """ import argparse -import signal -from rucio.daemons.judge.repairer import run, stop +from rucio.daemons.judge.repairer import JudgeRepairer def get_parser(): @@ -36,10 +35,10 @@ def get_parser(): if __name__ == "__main__": - signal.signal(signal.SIGTERM, stop) parser = get_parser() args = parser.parse_args() + judge_repairer = JudgeRepairer(once=args.run_once, threads=args.threads, sleep_time=args.sleep_time) try: - run(once=args.run_once, threads=args.threads, sleep_time=args.sleep_time) + judge_repairer.run() except KeyboardInterrupt: - stop() + judge_repairer.stop() diff --git a/lib/rucio/daemons/judge/repairer.py b/lib/rucio/daemons/judge/repairer.py index b797f202b0..64db8f91bb 100644 --- a/lib/rucio/daemons/judge/repairer.py +++ b/lib/rucio/daemons/judge/repairer.py @@ -16,130 +16,79 @@ """ Judge-Repairer is a daemon to repair stuck replication rules. """ -import functools import logging -import threading import time import traceback from copy import deepcopy from datetime import datetime, timedelta from random import randint from re import match -from typing import TYPE_CHECKING +from typing import Any from rucio.db.sqla.constants import ORACLE_CONNECTION_LOST_CONTACT_REGEX, ORACLE_RESOURCE_BUSY_REGEX from sqlalchemy.exc import DatabaseError -import rucio.db.sqla.util -from rucio.common import exception from rucio.common.exception import DatabaseException -from rucio.common.logging import setup_logging from rucio.core.monitor import MetricManager from rucio.core.rule import repair_rule, get_stuck_rules -from rucio.daemons.common import run_daemon - -if TYPE_CHECKING: - from types import FrameType - from typing import Optional +from rucio.daemons.common import Daemon, HeartbeatHandler METRICS = MetricManager(module=__name__) -graceful_stop = threading.Event() -DAEMON_NAME = 'judge-repairer' - - -def rule_repairer(once=False, sleep_time=60): - """ - Main loop to check for STUCK replication rules - """ - paused_rules = {} # {rule_id: datetime} - run_daemon( - once=once, - graceful_stop=graceful_stop, - executable=DAEMON_NAME, - partition_wait_time=1, - sleep_time=sleep_time, - run_once_fnc=functools.partial( - run_once, - paused_rules=paused_rules, - delta=-1 if once else 1800, - ) - ) - - -def run_once(paused_rules, delta, heartbeat_handler, **_kwargs): - worker_number, total_workers, logger = heartbeat_handler.live() - - start = time.time() - - # Refresh paused rules - iter_paused_rules = deepcopy(paused_rules) - for key in iter_paused_rules: - if datetime.utcnow() > paused_rules[key]: - del paused_rules[key] - - # Select a bunch of rules for this worker to repair - rules = get_stuck_rules(total_workers=total_workers, - worker_number=worker_number, - delta=delta, - limit=100, - blocked_rules=[key for key in paused_rules]) - - logger(logging.DEBUG, 'index query time %f fetch size is %d' % (time.time() - start, len(rules))) - - if not rules: - logger(logging.DEBUG, 'did not get any work (paused_rules=%s)' % (str(len(paused_rules)))) - return - - for rule_id in rules: - _, _, logger = heartbeat_handler.live() - rule_id = rule_id[0] - logger(logging.INFO, 'Repairing rule %s' % (rule_id)) - if graceful_stop.is_set(): - break - try: - start = time.time() - repair_rule(rule_id=rule_id) - logger(logging.DEBUG, 'repairing of %s took %f' % (rule_id, time.time() - start)) - except (DatabaseException, DatabaseError) as e: - if match(ORACLE_RESOURCE_BUSY_REGEX, str(e.args[0])): - paused_rules[rule_id] = datetime.utcnow() + timedelta(seconds=randint(600, 2400)) - logger(logging.WARNING, 'Locks detected for %s' % (rule_id)) - METRICS.counter('exceptions.{exception}').labels(exception='LocksDetected').inc() - elif match('.*QueuePool.*', str(e.args[0])): - logger(logging.WARNING, traceback.format_exc()) - METRICS.counter('exceptions.{exception}').labels(exception=e.__class__.__name__).inc() - elif match(ORACLE_CONNECTION_LOST_CONTACT_REGEX, str(e.args[0])): - logger(logging.WARNING, traceback.format_exc()) - METRICS.counter('exceptions.{exception}').labels(exception=e.__class__.__name__).inc() - else: - logger(logging.ERROR, traceback.format_exc()) - METRICS.counter('exceptions.{exception}').labels(exception=e.__class__.__name__).inc() - - -def stop(signum: "Optional[int]" = None, frame: "Optional[FrameType]" = None) -> None: - """ - Graceful exit. - """ - - graceful_stop.set() - - -def run(once=False, threads=1, sleep_time=60): - """ - Starts up the Judge-Repairer threads. - """ - setup_logging(process_name=DAEMON_NAME) - if rucio.db.sqla.util.is_old_db(): - raise exception.DatabaseException('Database was not updated, daemon won\'t start') - if once: - rule_repairer(once) - else: - logging.info('Repairer starting %s threads' % str(threads)) - threads = [threading.Thread(target=rule_repairer, kwargs={'once': once, - 'sleep_time': sleep_time}) for i in range(0, threads)] - [t.start() for t in threads] - # Interruptible joins require a timeout. - while threads[0].is_alive(): - [t.join(timeout=3.14) for t in threads] +class JudgeRepairer(Daemon): + def __init__(self, **_kwargs) -> None: + super().__init__(daemon_name="judge-repairer", **_kwargs) + self.delta = -1 if self.once else 1800 + self.paused_rules = {} # {rule_id: datetime} + + def _run_once(self, heartbeat_handler: "HeartbeatHandler", **_kwargs) -> tuple[bool, Any]: + worker_number, total_workers, logger = heartbeat_handler.live() + must_sleep = False + + start = time.time() + + # Refresh paused rules + iter_paused_rules = deepcopy(self.paused_rules) + for key in iter_paused_rules: + if datetime.utcnow() > self.paused_rules[key]: + del self.paused_rules[key] + + # Select a bunch of rules for this worker to repair + rules = get_stuck_rules(total_workers=total_workers, + worker_number=worker_number, + delta=self.delta, + limit=100, + blocked_rules=[key for key in self.paused_rules]) + + logger(logging.DEBUG, 'index query time %f fetch size is %d' % (time.time() - start, len(rules))) + + if not rules: + logger(logging.DEBUG, 'did not get any work (paused_rules=%s)' % (str(len(self.paused_rules)))) + return must_sleep, None + + for rule_id in rules: + _, _, logger = heartbeat_handler.live() + rule_id = rule_id[0] + logger(logging.INFO, 'Repairing rule %s' % (rule_id)) + if self.graceful_stop.is_set(): + break + try: + start = time.time() + repair_rule(rule_id=rule_id) + logger(logging.DEBUG, 'repairing of %s took %f' % (rule_id, time.time() - start)) + except (DatabaseException, DatabaseError) as e: + if match(ORACLE_RESOURCE_BUSY_REGEX, str(e.args[0])): + self.paused_rules[rule_id] = datetime.utcnow() + timedelta(seconds=randint(600, 2400)) + logger(logging.WARNING, 'Locks detected for %s' % (rule_id)) + METRICS.counter('exceptions.{exception}').labels(exception='LocksDetected').inc() + elif match('.*QueuePool.*', str(e.args[0])): + logger(logging.WARNING, traceback.format_exc()) + METRICS.counter('exceptions.{exception}').labels(exception=e.__class__.__name__).inc() + elif match(ORACLE_CONNECTION_LOST_CONTACT_REGEX, str(e.args[0])): + logger(logging.WARNING, traceback.format_exc()) + METRICS.counter('exceptions.{exception}').labels(exception=e.__class__.__name__).inc() + else: + logger(logging.ERROR, traceback.format_exc()) + METRICS.counter('exceptions.{exception}').labels(exception=e.__class__.__name__).inc() + return must_sleep, None diff --git a/lib/rucio/daemons/reaper/reaper.py b/lib/rucio/daemons/reaper/reaper.py index 301349ab7f..484b9b28f6 100644 --- a/lib/rucio/daemons/reaper/reaper.py +++ b/lib/rucio/daemons/reaper/reaper.py @@ -19,7 +19,6 @@ import logging import random -import threading import time import traceback from configparser import NoOptionError, NoSectionError @@ -69,7 +68,7 @@ if TYPE_CHECKING: from collections.abc import Callable -GRACEFUL_STOP = threading.Event() + METRICS = MetricManager(module=__name__) REGION = make_region_memcached(expiration_time=600) EXCLUDED_RSE_GAUGE = METRICS.gauge( diff --git a/lib/rucio/daemons/undertaker/undertaker.py b/lib/rucio/daemons/undertaker/undertaker.py index 634a701a4c..1d997ae9f6 100644 --- a/lib/rucio/daemons/undertaker/undertaker.py +++ b/lib/rucio/daemons/undertaker/undertaker.py @@ -18,7 +18,6 @@ """ import logging -import threading import traceback from copy import deepcopy from datetime import datetime, timedelta @@ -43,7 +42,6 @@ logging.getLogger("requests").setLevel(logging.CRITICAL) METRICS = MetricManager(module=__name__) -graceful_stop = threading.Event() class Undertaker(Daemon): diff --git a/tests/test_daemons.py b/tests/test_daemons.py index e3178cbe78..370bb923c6 100644 --- a/tests/test_daemons.py +++ b/tests/test_daemons.py @@ -28,7 +28,7 @@ from rucio.daemons.conveyor import finisher, poller, receiver, stager, submitter, throttler, preparer from rucio.daemons.follower import follower from rucio.daemons.hermes import hermes -from rucio.daemons.judge import cleaner, evaluator, injector, repairer +from rucio.daemons.judge import cleaner, evaluator, injector from rucio.daemons.oauthmanager import oauthmanager from rucio.daemons.reaper import dark_reaper from rucio.daemons.replicarecoverer import suspicious_replica_recoverer @@ -59,7 +59,6 @@ cleaner, evaluator, injector, - repairer, oauthmanager, dark_reaper, suspicious_replica_recoverer, diff --git a/tests/test_judge_repairer.py b/tests/test_judge_repairer.py index 446072993e..38ff8a41fa 100644 --- a/tests/test_judge_repairer.py +++ b/tests/test_judge_repairer.py @@ -31,7 +31,7 @@ from rucio.core.rse import add_rse_attribute, add_rse, update_rse from rucio.core.rule import get_rule, add_rule from rucio.daemons.judge.evaluator import re_evaluator -from rucio.daemons.judge.repairer import rule_repairer +from rucio.daemons.judge.repairer import JudgeRepairer from rucio.db.sqla import models from rucio.db.sqla.constants import DIDType, RuleState, ReplicaState from rucio.db.sqla.session import get_session @@ -92,8 +92,7 @@ def setUpClass(cls): def test_to_repair_a_rule_with_NONE_grouping_whose_transfer_failed(self): """ JUDGE REPAIRER: Test to repair a rule with 1 failed transfer (lock)""" - - rule_repairer(once=True) # Clean out the repairer + JudgeRepairer(once=True).run() # Clean out the repairer scope = InternalScope('mock', **self.vo) files = create_files(3, scope, self.rse4_id, bytes_=100) dataset = did_name_generator('dataset') @@ -112,7 +111,7 @@ def test_to_repair_a_rule_with_NONE_grouping_whose_transfer_failed(self): assert (rule_id == get_rule(rule_id)['id'].replace('-', '').lower()) assert (RuleState.STUCK == get_rule(rule_id)['state']) - rule_repairer(once=True) + JudgeRepairer(once=True).run() assert (RuleState.REPLICATING == get_rule(rule_id)['state']) assert (get_replica(scope=files[2]['scope'], name=files[2]['name'], rse_id=failed_rse_id)['state'] == ReplicaState.UNAVAILABLE) assert (get_replica(scope=files[2]['scope'], name=files[2]['name'], rse_id=failed_rse_id)['lock_cnt'] == 0) @@ -120,7 +119,7 @@ def test_to_repair_a_rule_with_NONE_grouping_whose_transfer_failed(self): def test_to_repair_a_rule_with_ALL_grouping_whose_transfer_failed(self): """ JUDGE REPAIRER: Test to repair a rule with 1 failed transfer (lock)""" - rule_repairer(once=True) # Clean out the repairer + JudgeRepairer(once=True).run() # Clean out the repairer scope = InternalScope('mock', **self.vo) files = create_files(4, scope, self.rse4_id, bytes_=100) dataset = did_name_generator('dataset') @@ -136,7 +135,7 @@ def test_to_repair_a_rule_with_ALL_grouping_whose_transfer_failed(self): assert (rule_id == get_rule(rule_id)['id'].replace('-', '').lower()) assert (RuleState.STUCK == get_rule(rule_id)['state']) - rule_repairer(once=True) + JudgeRepairer(once=True).run() assert (RuleState.REPLICATING == get_rule(rule_id)['state']) assert (get_replica_locks(scope=files[2]['scope'], name=files[2]['name'])[0].rse_id == get_replica_locks(scope=files[3]['scope'], name=files[3]['name'])[0].rse_id) assert (get_replica_locks(scope=files[1]['scope'], name=files[1]['name'])[0].rse_id == get_replica_locks(scope=files[3]['scope'], name=files[3]['name'])[0].rse_id) @@ -144,7 +143,7 @@ def test_to_repair_a_rule_with_ALL_grouping_whose_transfer_failed(self): def test_to_repair_a_rule_with_DATASET_grouping_whose_transfer_failed(self): """ JUDGE REPAIRER: Test to repair a rule with 1 failed transfer (lock)""" - rule_repairer(once=True) # Clean out the repairer + JudgeRepairer(once=True).run() # Clean out the repairer scope = InternalScope('mock', **self.vo) files = create_files(4, scope, self.rse4_id, bytes_=100) dataset = did_name_generator('dataset') @@ -160,7 +159,7 @@ def test_to_repair_a_rule_with_DATASET_grouping_whose_transfer_failed(self): assert (rule_id == get_rule(rule_id)['id'].replace('-', '').lower()) assert (RuleState.STUCK == get_rule(rule_id)['state']) - rule_repairer(once=True) + JudgeRepairer(once=True).run() assert (RuleState.REPLICATING == get_rule(rule_id)['state']) assert (get_replica_locks(scope=files[2]['scope'], name=files[2]['name'])[0].rse_id == get_replica_locks(scope=files[3]['scope'], name=files[3]['name'])[0].rse_id) assert (get_replica_locks(scope=files[1]['scope'], name=files[1]['name'])[0].rse_id == get_replica_locks(scope=files[3]['scope'], name=files[3]['name'])[0].rse_id) @@ -194,7 +193,7 @@ def test_repair_a_rule_with_missing_locks(self): rule.state = RuleState.STUCK session.commit() - rule_repairer(once=True) + JudgeRepairer(once=True).run() for file in files: assert (len(get_replica_locks(scope=file['scope'], name=file['name'])) == 2) @@ -231,7 +230,7 @@ def test_repair_a_rule_with_source_replica_expression(self): replica.state = ReplicaState.AVAILABLE session.commit() - rule_repairer(once=True) + JudgeRepairer(once=True).run() assert (RuleState.OK == get_rule(rule_id1)['state']) assert (RuleState.REPLICATING == get_rule(rule_id2)['state']) @@ -239,7 +238,7 @@ def test_repair_a_rule_with_source_replica_expression(self): def test_to_repair_a_rule_with_only_1_rse_whose_transfers_failed(self): """ JUDGE REPAIRER: Test to repair a rule with only 1 rse whose transfers failed (lock)""" - rule_repairer(once=True) # Clean out the repairer + JudgeRepairer(once=True).run() # Clean out the repairer scope = InternalScope('mock', **self.vo) files = create_files(4, scope, self.rse4_id, bytes_=100) dataset = did_name_generator('dataset') @@ -259,7 +258,7 @@ def test_to_repair_a_rule_with_only_1_rse_whose_transfers_failed(self): assert (rule_id == get_rule(rule_id)['id'].replace('-', '').lower()) assert (RuleState.STUCK == get_rule(rule_id)['state']) - rule_repairer(once=True) + JudgeRepairer(once=True).run() # Stil assert STUCK because of delays: assert (RuleState.STUCK == get_rule(rule_id)['state']) @@ -270,7 +269,7 @@ def test_to_repair_a_rule_with_only_1_rse_whose_transfers_failed(self): def test_to_repair_a_rule_with_NONE_grouping_whose_transfer_failed_and_flipping_to_other_rse(self): """ JUDGE REPAIRER: Test to repair a rule with 1 failed transfer and flip to other rse(lock)""" - rule_repairer(once=True) # Clean out the repairer + JudgeRepairer(once=True).run() # Clean out the repairer scope = InternalScope('mock', **self.vo) files = create_files(4, scope, self.rse4_id, bytes_=100) dataset = did_name_generator('dataset') @@ -288,7 +287,7 @@ def test_to_repair_a_rule_with_NONE_grouping_whose_transfer_failed_and_flipping_ assert (rule_id == get_rule(rule_id)['id'].replace('-', '').lower()) assert (RuleState.STUCK == get_rule(rule_id)['state']) - rule_repairer(once=True) + JudgeRepairer(once=True).run() assert (RuleState.REPLICATING == get_rule(rule_id)['state']) assert (get_replica_locks(scope=files[3]['scope'], name=files[3]['name'])[0].rse_id != old_rse_id) @@ -298,7 +297,7 @@ def test_to_repair_a_rule_with_only_1_rse_whose_site_is_blocklisted(self): rse = rse_name_generator() rse_id = add_rse(rse, **self.vo) set_local_account_limit(self.jdoe, rse_id, -1) - rule_repairer(once=True) # Clean out the repairer + JudgeRepairer(once=True).run() # Clean out the repairer region = make_region().configure( 'dogpile.cache.pymemcache', @@ -320,22 +319,31 @@ def change_availability(new_value): if ignore_availability: change_availability(False) - rule_id = add_rule(dids=[{'scope': scope, 'name': dataset}], account=self.jdoe, copies=1, rse_expression=rse, grouping=grouping, weight=None, lifetime=None, locked=False, subscription_id=None, ignore_availability=ignore_availability, activity='DebugJudge')[0] + rule_id = add_rule( + dids=[{'scope': scope, 'name': dataset}], + account=self.jdoe, copies=1, rse_expression=rse, grouping=grouping, + weight=None, lifetime=None, locked=False, subscription_id=None, + ignore_availability=ignore_availability, activity='DebugJudge')[0] assert (RuleState.STUCK == get_rule(rule_id)['state']) - rule_repairer(once=True) + JudgeRepairer(once=True).run() assert (RuleState.REPLICATING == get_rule(rule_id)['state']) change_availability(True) else: - rule_id = add_rule(dids=[{'scope': scope, 'name': dataset}], account=self.jdoe, copies=1, rse_expression=rse, grouping=grouping, weight=None, lifetime=None, locked=False, subscription_id=None, ignore_availability=ignore_availability, activity='DebugJudge')[0] + rule_id = add_rule( + dids=[{'scope': scope, 'name': dataset}], + account=self.jdoe, copies=1, rse_expression=rse, + grouping=grouping, weight=None, lifetime=None, locked=False, + subscription_id=None, + ignore_availability=ignore_availability, activity='DebugJudge')[0] failed_transfer(scope=scope, name=files[0]['name'], rse_id=get_replica_locks(scope=files[0]['scope'], name=files[0]['name'])[0].rse_id) change_availability(False) assert (RuleState.STUCK == get_rule(rule_id)['state']) - rule_repairer(once=True) + JudgeRepairer(once=True).run() assert (RuleState.STUCK == get_rule(rule_id)['state']) change_availability(True) - rule_repairer(once=True) + JudgeRepairer(once=True).run() assert (RuleState.REPLICATING == get_rule(rule_id)['state'])