From aad3c42aba352c3043bd36724f4802ae46fd69aa Mon Sep 17 00:00:00 2001 From: Radu Carpa Date: Fri, 23 Apr 2021 11:24:33 +0200 Subject: [PATCH] Core & Internals: rework tombstone handling. #4491, #4436, #4188 Always set a default tombstone on any replica creation. Introduce two RSE attributes which allow customizing the behavior per RSE: - 'tombstone_delay' defines the tombstone set on normal replica creation - 'multihop_tombstone_delay' configures the tombstone of temporary multihop replicas As a tombstone will now be added when the intermediate multihop replica is created, remove the redundant protection which was touching the tombstone in the finisher. Add a submitter test which ensures that multihop sources are created, and that the intermediate multihop replicas are created with a tombstone. --- lib/rucio/core/replica.py | 36 +++++++++++++--------- lib/rucio/core/transfer.py | 6 +++- lib/rucio/tests/test_conveyor_submitter.py | 25 +++++++++++++++ lib/rucio/tests/test_replica.py | 31 ++++++++++++++++++- 4 files changed, 81 insertions(+), 17 deletions(-) diff --git a/lib/rucio/core/replica.py b/lib/rucio/core/replica.py index c6e3e0962b..740b26acb7 100644 --- a/lib/rucio/core/replica.py +++ b/lib/rucio/core/replica.py @@ -81,6 +81,7 @@ from dogpile.cache.api import NO_VALUE REGION = make_region().configure('dogpile.cache.memory', expiration_time=60) +DEFAULT_TOMBSTONE_DELAY = timedelta(days=1) @read_session @@ -1358,6 +1359,20 @@ def __bulk_add_file_dids(files, account, dataset_meta=None, session=None): return new_files + available_files +def tombstone_from_delay(tombstone_delay): + tombstone = None + + if isinstance(tombstone_delay, timedelta): + tombstone_delay = tombstone_delay.total_seconds() + + if tombstone_delay: + try: + tombstone = datetime.utcnow() + timedelta(seconds=int(tombstone_delay)) + except ValueError: + pass + return tombstone + + @transactional_session def __bulk_add_replicas(rse_id, files, account, session=None): """ @@ -1380,6 +1395,9 @@ def __bulk_add_replicas(rse_id, files, 
account, session=None): filter(condition) available_replicas = [dict([(column, getattr(row, column)) for column in row._fields]) for row in query] + default_tombstone_delay = next(iter(get_rse_attribute('tombstone_delay', rse_id=rse_id, session=session)), DEFAULT_TOMBSTONE_DELAY) + default_tombstone = tombstone_from_delay(default_tombstone_delay) + new_replicas = [] for file in files: found = False @@ -1396,7 +1414,7 @@ def __bulk_add_replicas(rse_id, files, account, session=None): 'state': ReplicaState(file.get('state', 'A')), 'md5': file.get('md5'), 'adler32': file.get('adler32'), 'lock_cnt': file.get('lock_cnt', 0), - 'tombstone': file.get('tombstone')}) + 'tombstone': file.get('tombstone') or default_tombstone}) try: new_replicas and session.bulk_insert_mappings(models.RSEFileAssociation, new_replicas) @@ -1969,25 +1987,20 @@ def list_and_mark_unlocked_replicas(limit, bytes=None, rse_id=None, delay_second @transactional_session -def update_replicas_states(replicas, nowait=False, add_tombstone=False, session=None): +def update_replicas_states(replicas, nowait=False, session=None): """ Update File replica information and state. :param replicas: The list of replicas. :param nowait: Nowait parameter for the for_update queries. - :param add_tombstone: To set a tombstone in case there is no lock on the replica. :param session: The database session in use. 
""" for replica in replicas: query = session.query(models.RSEFileAssociation).filter_by(rse_id=replica['rse_id'], scope=replica['scope'], name=replica['name']) - lock_cnt = 0 try: if nowait: - rep = query.with_for_update(nowait=True).one() - else: - rep = query.one() - lock_cnt = rep.lock_cnt + query.with_for_update(nowait=True).one() except NoResultFound: # remember scope, name and rse raise exception.ReplicaNotFound("No row found for scope: %s name: %s rse: %s" % (replica['scope'], replica['name'], get_rse_name(replica['rse_id'], session=session))) @@ -2006,19 +2019,12 @@ def update_replicas_states(replicas, nowait=False, add_tombstone=False, session= values['tombstone'] = OBSOLETE elif replica['state'] == ReplicaState.AVAILABLE: rucio.core.lock.successful_transfer(scope=replica['scope'], name=replica['name'], rse_id=replica['rse_id'], nowait=nowait, session=session) - # If No locks we set a tombstone in the future - if add_tombstone and lock_cnt == 0: - set_tombstone(rse_id=replica['rse_id'], scope=replica['scope'], name=replica['name'], tombstone=datetime.utcnow() + timedelta(hours=2), session=session) - elif replica['state'] == ReplicaState.UNAVAILABLE: rucio.core.lock.failed_transfer(scope=replica['scope'], name=replica['name'], rse_id=replica['rse_id'], error_message=replica.get('error_message', None), broken_rule_id=replica.get('broken_rule_id', None), broken_message=replica.get('broken_message', None), nowait=nowait, session=session) - # If No locks we set a tombstone in the future - if add_tombstone and lock_cnt == 0: - set_tombstone(rse_id=replica['rse_id'], scope=replica['scope'], name=replica['name'], tombstone=datetime.utcnow() + timedelta(hours=2), session=session) elif replica['state'] == ReplicaState.TEMPORARY_UNAVAILABLE: query = query.filter(or_(models.RSEFileAssociation.state == ReplicaState.AVAILABLE, models.RSEFileAssociation.state == ReplicaState.TEMPORARY_UNAVAILABLE)) diff --git a/lib/rucio/core/transfer.py b/lib/rucio/core/transfer.py 
index 6cbf14b7b8..8bf11d1671 100644 --- a/lib/rucio/core/transfer.py +++ b/lib/rucio/core/transfer.py @@ -65,7 +65,7 @@ from rucio.core.config import get as core_config_get from rucio.core.monitor import record_counter, record_timer from rucio.core.oidc import get_token_for_account_operation -from rucio.core.replica import add_replicas +from rucio.core.replica import add_replicas, tombstone_from_delay from rucio.core.request import queue_requests, set_requests_state from rucio.core.rse import get_rse_name, get_rse_vo, list_rses, get_rse_supported_checksums from rucio.core.rse_expression_parser import parse_expression @@ -108,6 +108,7 @@ WEBDAV_TRANSFER_MODE = config_get('conveyor', 'webdav_transfer_mode', False, None) +DEFAULT_MULTIHOP_TOMBSTONE_DELAY = datetime.timedelta(hours=2) def submit_bulk_transfers(external_host, files, transfertool='fts3', job_params={}, timeout=None, user_transfer_job=False, logger=logging.log): """ @@ -779,6 +780,8 @@ def protocol(self, rse_id, scheme, operation): except InvalidRSEExpression: multihop_rses = [] + default_multihop_tombstone_delay = core_config_get('transfers', 'multihop_tombstone_delay', default=DEFAULT_MULTIHOP_TOMBSTONE_DELAY, session=session) + for req_id, rule_id, scope, name, md5, adler32, bytes, activity, attributes, previous_attempt_id, dest_rse_id, account, source_rse_id, rse, deterministic, rse_type, path, retry_count, src_url, ranking, link_ranking in req_sources: if ranking is None: @@ -1134,6 +1137,7 @@ def protocol(self, rse_id, scheme, operation): 'bytes': transfers[req_id]['file_metadata']['filesize'], 'adler32': transfers[req_id]['file_metadata']['adler32'], 'md5': transfers[req_id]['file_metadata']['md5'], + 'tombstone': tombstone_from_delay(ctx.rse_attrs(dest_rse_id).get('multihop_tombstone_delay', default_multihop_tombstone_delay)), 'state': 'C'}] try: add_replicas(rse_id=hop['dest_rse_id'], diff --git a/lib/rucio/tests/test_conveyor_submitter.py b/lib/rucio/tests/test_conveyor_submitter.py index 
d16ce3f023..8e8002c215 100644 --- a/lib/rucio/tests/test_conveyor_submitter.py +++ b/lib/rucio/tests/test_conveyor_submitter.py @@ -26,7 +26,9 @@ from rucio.core import distance as distance_core from rucio.core import request as request_core from rucio.core import rse as rse_core +from rucio.core import replica as replica_core from rucio.core import rule as rule_core +from rucio.core import transfer as transfer_core from rucio.daemons.conveyor.submitter import submitter from rucio.db.sqla.models import Request, Source from rucio.db.sqla.constants import RequestState @@ -114,6 +116,16 @@ def test_multihop_sources_created(rse_factory, did_factory, root_account, core_c for rse_id in jump_rses: rse_core.add_rse_attribute(rse_id, 'available_for_multihop', True) + rse_tombstone_delay = 3600 + rse_multihop_tombstone_delay = 12 * 3600 + + # if both attributes are set, the multihop one will take precedence + rse_core.add_rse_attribute(jump_rse1_id, 'tombstone_delay', rse_tombstone_delay) + rse_core.add_rse_attribute(jump_rse1_id, 'multihop_tombstone_delay', rse_multihop_tombstone_delay) + + # if the multihop delay is not set, the default multihop delay takes precedence, not the normal tombstone delay. 
+ rse_core.add_rse_attribute(jump_rse2_id, 'tombstone_delay', rse_tombstone_delay) + distance_core.add_distance(src_rse_id, jump_rse1_id, ranking=10) distance_core.add_distance(jump_rse1_id, jump_rse2_id, ranking=10) distance_core.add_distance(jump_rse2_id, jump_rse3_id, ranking=10) @@ -139,3 +151,16 @@ def __ensure_source_exists(rse_id, scope, name, session=None): # Ensure that sources where created for transfers for rse_id in jump_rses + [src_rse_id]: __ensure_source_exists(rse_id, **did) + + # Ensure the tombstone is correctly set on intermediate replicas + replica = replica_core.get_replica(jump_rse1_id, **did) + expected_tombstone = datetime.utcnow() + timedelta(seconds=rse_multihop_tombstone_delay) + assert expected_tombstone - timedelta(minutes=5) < replica['tombstone'] < expected_tombstone + timedelta(minutes=5) + + replica = replica_core.get_replica(jump_rse2_id, **did) + expected_tombstone = datetime.utcnow() + transfer_core.DEFAULT_MULTIHOP_TOMBSTONE_DELAY + assert expected_tombstone - timedelta(minutes=5) < replica['tombstone'] < expected_tombstone + timedelta(minutes=5) + + replica = replica_core.get_replica(jump_rse3_id, **did) + expected_tombstone = datetime.utcnow() + transfer_core.DEFAULT_MULTIHOP_TOMBSTONE_DELAY + assert expected_tombstone - timedelta(minutes=5) < replica['tombstone'] < expected_tombstone + timedelta(minutes=5) diff --git a/lib/rucio/tests/test_replica.py b/lib/rucio/tests/test_replica.py index f657a9fb1a..3ce4e15912 100644 --- a/lib/rucio/tests/test_replica.py +++ b/lib/rucio/tests/test_replica.py @@ -52,7 +52,7 @@ from rucio.core.replica import (add_replica, add_replicas, delete_replicas, get_replicas_state, get_replica, list_replicas, declare_bad_file_replicas, list_bad_replicas, update_replica_state, get_RSEcoverage_of_dataset, get_replica_atime, - touch_replica, get_bad_pfns, set_tombstone) + touch_replica, get_bad_pfns, set_tombstone, DEFAULT_TOMBSTONE_DELAY) from rucio.core.rse import add_protocol, add_rse_attribute, 
del_rse_attribute from rucio.daemons.badreplicas.minos import run as minos_run from rucio.daemons.badreplicas.minos_temporary_expiration import run as minos_temp_run @@ -522,6 +522,35 @@ def test_set_tombstone(self, rse_factory, mock_scope, root_account): with pytest.raises(ReplicaNotFound): set_tombstone(rse_id, mock_scope, name) + def test_core_default_tombstone_correctly_set(rse_factory, did_factory, root_account): + """ REPLICA (CORE): Per-RSE default tombstone is correctly taken into consideration""" + + # One RSE has an attribute set, the other uses the default value for tombstone + rse1, rse1_id = rse_factory.make_mock_rse() + rse2, rse2_id = rse_factory.make_mock_rse() + tombstone_delay = 3600 + add_rse_attribute(rse_id=rse2_id, key='tombstone_delay', value=tombstone_delay) + + # Will use the default tombstone delay + did1 = did_factory.random_did() + add_replica(rse1_id, bytes=4, account=root_account, **did1) + tombstone = get_replica(rse1_id, **did1)['tombstone'] + expected_tombstone = datetime.utcnow() + DEFAULT_TOMBSTONE_DELAY + assert expected_tombstone - timedelta(minutes=5) < tombstone < expected_tombstone + timedelta(minutes=5) + + # Will use the configured value on the RSE + did2 = did_factory.random_did() + add_replica(rse2_id, bytes=4, account=root_account, **did2) + tombstone = get_replica(rse2_id, **did2)['tombstone'] + expected_tombstone = datetime.utcnow() + timedelta(seconds=tombstone_delay) + assert expected_tombstone - timedelta(minutes=5) < tombstone < expected_tombstone + timedelta(minutes=5) + + # Adding rule removes the tombstone + RuleClient().add_replication_rule([{'name': did1['name'], 'scope': did1['scope'].external}], 1, rse1, locked=True) + assert get_replica(rse1_id, **did1)['tombstone'] is None + RuleClient().add_replication_rule([{'name': did2['name'], 'scope': did2['scope'].external}], 1, rse2, locked=True) + assert get_replica(rse2_id, **did2)['tombstone'] is None + def test_list_replicas_with_updated_after(self, 
rse_factory, mock_scope, root_account): """ REPLICA (CORE): Add and list file replicas with updated_after filter """ _, rse_id = rse_factory.make_mock_rse()