Skip to content

Commit

Permalink
Deletion: Fix DarkReaper arguments Fix #4285 (#4286)
Browse files Browse the repository at this point in the history
* Dark reaper patch
* cleanup code
  • Loading branch information
ericvaandering authored and bari12 committed Feb 4, 2021
1 parent 34de52d commit d1b4c0d
Showing 1 changed file with 21 additions and 11 deletions.
32 changes: 21 additions & 11 deletions lib/rucio/daemons/reaper/dark_reaper.py
Expand Up @@ -24,6 +24,7 @@
# - Patrick Austin <patrick.austin@stfc.ac.uk>, 2020
# - Benedikt Ziemons <benedikt.ziemons@cern.ch>, 2020
# - Dimitrios Christidis <dimitrios.christidis@cern.ch>, 2020-2021
# - Eric Vaandering <ewv@fnal.gov>, 2021

'''
Dark Reaper is a daemon to manage quarantined file deletion.
Expand Down Expand Up @@ -89,8 +90,10 @@ def reaper(rses=[], worker_number=0, total_workers=1, chunk_size=100, once=False
while not GRACEFUL_STOP.is_set():
try:
# heartbeat
heartbeat = live(executable=executable, hostname=hostname, pid=pid, thread=thread, hash_executable=hash_executable)
logging.info('Dark Reaper({0[worker_number]}/{0[total_workers]}): Live gives {0[heartbeat]}'.format(locals()))
heartbeat = live(executable=executable, hostname=hostname, pid=pid, thread=thread,
hash_executable=hash_executable)
logging.info('Dark Reaper({0[worker_number]}/{0[total_workers]}): Live gives {0[heartbeat]}'
.format(locals()))
nothing_to_do = True

random.shuffle(rses)
Expand All @@ -117,11 +120,13 @@ def reaper(rses=[], worker_number=0, total_workers=1, chunk_size=100, once=False
'path': replica['path']}],
operation='delete',
scheme=scheme).values())[0])
logging.info('Dark Reaper %s-%s: Deletion ATTEMPT of %s:%s as %s on %s', worker_number, total_workers, scope, replica['name'], pfn, rse)
logging.info('Dark Reaper %s-%s: Deletion ATTEMPT of %s:%s as %s on %s',
worker_number, total_workers, scope, replica['name'], pfn, rse)
start = time.time()
prot.delete(pfn)
duration = time.time() - start
logging.info('Dark Reaper %s-%s: Deletion SUCCESS of %s:%s as %s on %s in %s seconds', worker_number, total_workers, scope, replica['name'], pfn, rse, duration)
logging.info('Dark Reaper %s-%s: Deletion SUCCESS of %s:%s as %s on %s in %s seconds',
worker_number, total_workers, scope, replica['name'], pfn, rse, duration)
payload = {'scope': scope,
'name': replica['name'],
'rse': rse,
Expand All @@ -136,11 +141,13 @@ def reaper(rses=[], worker_number=0, total_workers=1, chunk_size=100, once=False
add_message('deletion-done', payload)
deleted_replicas.append(replica)
except SourceNotFound:
err_msg = 'Dark Reaper %s-%s: Deletion NOTFOUND of %s:%s as %s on %s' % (worker_number, total_workers, scope, replica['name'], pfn, rse)
err_msg = ('Dark Reaper %s-%s: Deletion NOTFOUND of %s:%s as %s on %s'
% (worker_number, total_workers, scope, replica['name'], pfn, rse))
logging.warning(err_msg)
deleted_replicas.append(replica)
except (ServiceUnavailable, RSEAccessDenied, ResourceTemporaryUnavailable) as error:
err_msg = 'Dark Reaper %s-%s: Deletion NOACCESS of %s:%s as %s on %s: %s' % (worker_number, total_workers, scope, replica['name'], pfn, rse, str(error))
err_msg = ('Dark Reaper %s-%s: Deletion NOACCESS of %s:%s as %s on %s: %s'
% (worker_number, total_workers, scope, replica['name'], pfn, rse, str(error)))
logging.warning(err_msg)
payload = {'scope': scope,
'name': replica['name'],
Expand Down Expand Up @@ -219,11 +226,13 @@ def run(total_workers=1, chunk_size=100, once=False, rses=[], scheme=None,
if vos:
invalid = set(vos) - set([v['vo'] for v in list_vos()])
if invalid:
msg = 'VO{} {} cannot be found'.format('s' if len(invalid) > 1 else '', ', '.join([repr(v) for v in invalid]))
msg = 'VO{} {} cannot be found'.format('s' if len(invalid) > 1 else '',
', '.join([repr(v) for v in invalid]))
raise VONotFound(msg)
else:
vos = [v['vo'] for v in list_vos()]
logging.info('Dark Reaper: This instance will work on VO%s: %s' % ('s' if len(vos) > 1 else '', ', '.join([v for v in vos])))
logging.info('Dark Reaper: This instance will work on VO%s: %s'
% ('s' if len(vos) > 1 else '', ', '.join([v for v in vos])))

all_rses = []
for vo in vos:
Expand All @@ -240,11 +249,11 @@ def run(total_workers=1, chunk_size=100, once=False, rses=[], scheme=None,
rses = all_rses

if exclude_rses:
excluded_rses = parse_expression(exclude_rses)
excluded_rses = [rse['id'] for rse in parse_expression(exclude_rses)]
rses = [rse for rse in rses if rse not in excluded_rses]

if include_rses:
included_rses = parse_expression(include_rses)
included_rses = [rse['id'] for rse in parse_expression(include_rses)]
rses = [rse for rse in rses if rse in included_rses]

if not rses:
Expand All @@ -259,7 +268,8 @@ def run(total_workers=1, chunk_size=100, once=False, rses=[], scheme=None,
'once': once,
'chunk_size': chunk_size,
'scheme': scheme}
threads.append(threading.Thread(target=reaper, kwargs=kwargs, name='Worker: %s, Total_Workers: %s' % (worker, total_workers)))
threads.append(threading.Thread(target=reaper, kwargs=kwargs,
name='Worker: %s, Total_Workers: %s' % (worker, total_workers)))
[t.start() for t in threads]
while threads[0].is_alive():
[t.join(timeout=3.14) for t in threads]

0 comments on commit d1b4c0d

Please sign in to comment.