From 29d7ea873029a7dbb5bf1208debd4f76225e2445 Mon Sep 17 00:00:00 2001 From: Martin Barisits Date: Fri, 5 Apr 2019 10:47:39 +0200 Subject: [PATCH] Dataset deletion: Mechanism to decrease contention in Undertaker; Fix #2355 --- lib/rucio/daemons/undertaker/undertaker.py | 32 +++++++++++++++++++--- 1 file changed, 28 insertions(+), 4 deletions(-) diff --git a/lib/rucio/daemons/undertaker/undertaker.py b/lib/rucio/daemons/undertaker/undertaker.py index 6843695130..053a71801b 100644 --- a/lib/rucio/daemons/undertaker/undertaker.py +++ b/lib/rucio/daemons/undertaker/undertaker.py @@ -15,7 +15,7 @@ # Authors: # - Vincent Garonne , 2013-2018 # - Cedric Serfon , 2013-2015 -# - Martin Barisits , 2016-2018 +# - Martin Barisits , 2016-2019 # - Hannes Hansen , 2018-2019 # # PY3K COMPATIBLE @@ -32,9 +32,15 @@ import time import traceback +from copy import deepcopy +from datetime import datetime, timedelta +from re import match +from random import randint + +from sqlalchemy.exc import DatabaseError from rucio.common.config import config_get -from rucio.common.exception import DatabaseException, RuleNotFound +from rucio.common.exception import DatabaseException, UnsupportedOperation, RuleNotFound from rucio.common.utils import chunks from rucio.core.heartbeat import live, die, sanity_check from rucio.core.monitor import record_counter @@ -62,12 +68,24 @@ def undertaker(worker_number=1, total_workers=1, chunk_size=5, once=False): pid = os.getpid() thread = threading.current_thread() sanity_check(executable='rucio-undertaker', hostname=hostname) + + paused_dids = {} # {(scope, name): datetime} + while not GRACEFUL_STOP.is_set(): try: heartbeat = live(executable='rucio-undertaker', hostname=hostname, pid=pid, thread=thread, older_than=6000) logging.info('Undertaker({0[worker_number]}/{0[total_workers]}): Live gives {0[heartbeat]}'.format(locals())) + # Refresh paused dids + iter_paused_dids = deepcopy(paused_dids) + for key in iter_paused_dids: + if datetime.utcnow() > paused_dids[key]: + del paused_dids[key] + dids = list_expired_dids(worker_number=heartbeat['assign_thread'] + 1, total_workers=heartbeat['nr_threads'], limit=10000) + + dids = [did for did in dids if (did['scope'], did['name']) not in paused_dids] + if not dids and not once: logging.info('Undertaker(%s): Nothing to do. sleep 60.', worker_number) time.sleep(60) @@ -81,8 +99,14 @@ def undertaker(worker_number=1, total_workers=1, chunk_size=5, once=False): record_counter(counters='undertaker.delete_dids', delta=len(chunk)) except RuleNotFound as error: logging.error(error) - except DatabaseException as error: - logging.error('Undertaker(%s): Got database error %s.', worker_number, str(error)) + except (DatabaseException, DatabaseError, UnsupportedOperation) as e: + if match('.*ORA-00054.*', str(e.args[0])) or match('.*55P03.*', str(e.args[0])) or match('.*3572.*', str(e.args[0])): + for did in chunk: + paused_dids[(did['scope'], did['name'])] = datetime.utcnow() + timedelta(seconds=randint(600, 2400)) + record_counter('undertaker.delete_dids.exceptions.LocksDetected') + logging.warning('undertaker[%s/%s]: Locks detected for chunk', heartbeat['assign_thread'], heartbeat['nr_threads']) + else: + logging.error('Undertaker(%s): Got database error %s.', worker_number, str(error)) except: logging.critical(traceback.format_exc()) time.sleep(1)