Skip to content

Commit

Permalink
Reaper : reaper fails when run with --scheme on rses without given sc…
Browse files Browse the repository at this point in the history
…heme : Closes #5791
  • Loading branch information
cserf authored and bari12 committed May 15, 2023
1 parent a820a21 commit 5cfb7d9
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 6 deletions.
21 changes: 15 additions & 6 deletions lib/rucio/daemons/reaper/reaper.py
Expand Up @@ -38,7 +38,7 @@
from rucio.common.exception import (DatabaseException, RSENotFound,
ReplicaUnAvailable, ReplicaNotFound, ServiceUnavailable,
RSEAccessDenied, ResourceTemporaryUnavailable, SourceNotFound,
VONotFound)
VONotFound, RSEProtocolNotSupported)
from rucio.common.logging import setup_logging
from rucio.common.types import InternalAccount
from rucio.common.stopwatch import Stopwatch
Expand Down Expand Up @@ -238,14 +238,18 @@ def delete_from_storage(heartbeat_handler, hb_payload, replicas, prot, rse_info,
return deleted_files


def _rse_deletion_hostname(rse: RseData) -> "Optional[str]":
def _rse_deletion_hostname(rse: RseData, scheme: "Optional[str]") -> "Optional[str]":
"""
Retrieves the hostname of the default deletion protocol
"""
rse.ensure_loaded(load_info=True)
for prot in rse.info['protocols']:
if prot['domains']['wan']['delete'] == 1:
return prot['hostname']
if scheme:
if prot['scheme'] == scheme and prot['domains']['wan']['delete'] != 0:
return prot['hostname']
else:
if prot['domains']['wan']['delete'] == 1:
return prot['hostname']
return None


Expand Down Expand Up @@ -559,9 +563,12 @@ def _run_once(rses_to_process, chunk_size, greedy, scheme,
percent = needed_free_space / tot_needed_free_space * 100
logger(logging.DEBUG, 'Working on %s. Percentage of the total space needed %.2f', rse.name, percent)

rse_hostname = _rse_deletion_hostname(rse)
rse_hostname = _rse_deletion_hostname(rse, scheme)
if not rse_hostname:
logger(logging.WARNING, 'No default delete protocol for %s', rse.name)
if scheme:
logger(logging.WARNING, 'Protocol %s not supported on %s', scheme, rse.name)
else:
logger(logging.WARNING, 'No default delete protocol for %s', rse.name)
REGION.set('pause_deletion_%s' % rse.id, True)
continue

Expand Down Expand Up @@ -641,6 +648,8 @@ def _run_once(rses_to_process, chunk_size, greedy, scheme,
delete_replicas(rse_id=rse.id, files=deleted_files)
logger(logging.DEBUG, 'delete_replicas successed on %s : %s replicas in %s seconds', rse.name, len(deleted_files), time.time() - del_start)
METRICS.counter('deletion.done').inc(len(deleted_files))
except RSEProtocolNotSupported:
logger(logging.WARNING, 'Protocol %s not supported on %s', scheme, rse.name)
except Exception:
logger(logging.CRITICAL, 'Exception', exc_info=True)

Expand Down
31 changes: 31 additions & 0 deletions lib/rucio/tests/test_reaper.py
Expand Up @@ -459,3 +459,34 @@ def test_archive_of_deleted_dids(vo, did_factory, root_account, core_config_mock
print(did)
deleted_dids.append(did)
assert len(deleted_dids) == len(dids)


@pytest.mark.parametrize("file_config_mock", [
# Run test twice: with, and without, temp tables
{"overrides": [('core', 'use_temp_tables', 'True')]},
{"overrides": [('core', 'use_temp_tables', 'False')]},
], indirect=True)
@pytest.mark.parametrize("caches_mock", [{"caches_to_mock": [
'rucio.daemons.reaper.reaper.REGION'
]}], indirect=True)
def test_run_on_non_existing_scheme(vo, caches_mock, file_config_mock):
""" REAPER (DAEMON): Mock test the reaper daemon with a speficied scheme."""
[cache_region] = caches_mock
scope = InternalScope('data13_hip', vo=vo)

nb_files = 250
file_size = 200 # 2G
rse_name, rse_id, dids = __add_test_rse_and_replicas(vo=vo, scope=scope, rse_name=rse_name_generator(),
names=['lfn' + generate_uuid() for _ in range(nb_files)], file_size=file_size)

rse_core.set_rse_limits(rse_id=rse_id, name='MinFreeSpace', value=50 * file_size)
assert len(list(replica_core.list_replicas(dids=dids, rse_expression=rse_name))) == nb_files

# Now put it over threshold and delete
# Nothing should be deleted since the protocol doesn't exists for this RSE
# The reaper will set a flag pause_deletion_<rse_id>
cache_region.invalidate()
rse_core.set_rse_usage(rse_id=rse_id, source='storage', used=nb_files * file_size, free=1)
reaper(once=True, rses=[], include_rses=rse_name, exclude_rses=None, chunk_size=1000, scheme='https')
assert len(list(replica_core.list_replicas(dids, rse_expression=rse_name))) == 250
assert cache_region.get('pause_deletion_%s' % rse_id)

0 comments on commit 5cfb7d9

Please sign in to comment.