diff --git a/lib/rucio/core/replica.py b/lib/rucio/core/replica.py index b9b78a3cbd4..3f237931245 100644 --- a/lib/rucio/core/replica.py +++ b/lib/rucio/core/replica.py @@ -1366,6 +1366,22 @@ def __bulk_add_file_dids(files, account, dataset_meta=None, session=None): return new_files + available_files +def tombstone_from_delay(tombstone_delay): + if not tombstone_delay: + return None + + if not isinstance(tombstone_delay, timedelta): + try: + tombstone_delay = timedelta(seconds=int(tombstone_delay)) + except ValueError: + return None + + if tombstone_delay < timedelta(0): + return datetime(1970, 1, 1) + + return datetime.utcnow() + tombstone_delay + + @transactional_session def __bulk_add_replicas(rse_id, files, account, session=None): """ @@ -1388,6 +1404,9 @@ def __bulk_add_replicas(rse_id, files, account, session=None): filter(condition) available_replicas = [dict([(column, getattr(row, column)) for column in row._fields]) for row in query] + default_tombstone_delay = next(iter(get_rse_attribute('tombstone_delay', rse_id=rse_id, session=session)), None) + default_tombstone = tombstone_from_delay(default_tombstone_delay) + new_replicas = [] for file in files: found = False @@ -1404,7 +1423,7 @@ def __bulk_add_replicas(rse_id, files, account, session=None): 'state': ReplicaState(file.get('state', 'A')), 'md5': file.get('md5'), 'adler32': file.get('adler32'), 'lock_cnt': file.get('lock_cnt', 0), - 'tombstone': file.get('tombstone')}) + 'tombstone': file.get('tombstone') or default_tombstone}) try: new_replicas and session.bulk_insert_mappings(models.RSEFileAssociation, new_replicas) @@ -1956,6 +1975,10 @@ def list_and_mark_unlocked_replicas(limit, bytes=None, rse_id=None, delay_second filter(case([(models.RSEFileAssociation.tombstone != none_value, models.RSEFileAssociation.rse_id), ]) == rse_id).\ filter(or_(models.RSEFileAssociation.state.in_((ReplicaState.AVAILABLE, ReplicaState.UNAVAILABLE, ReplicaState.BAD)), and_(models.RSEFileAssociation.state == ReplicaState.BEING_DELETED, models.RSEFileAssociation.updated_at < datetime.utcnow() - timedelta(seconds=delay_seconds)))).\ + filter(~exists(select([1]).prefix_with("/*+ INDEX(SOURCES SOURCES_SC_NM_DST_IDX) */", dialect='oracle') + .where(and_(models.RSEFileAssociation.scope == models.Source.scope, + models.RSEFileAssociation.name == models.Source.name, + models.RSEFileAssociation.rse_id == models.Source.rse_id)))).\ with_for_update(skip_locked=True).\ order_by(models.RSEFileAssociation.tombstone) @@ -2023,25 +2046,20 @@ def list_and_mark_unlocked_replicas(limit, bytes=None, rse_id=None, delay_second @transactional_session -def update_replicas_states(replicas, nowait=False, add_tombstone=False, session=None): +def update_replicas_states(replicas, nowait=False, session=None): """ Update File replica information and state. :param replicas: The list of replicas. :param nowait: Nowait parameter for the for_update queries. - :param add_tombstone: To set a tombstone in case there is no lock on the replica. :param session: The database session in use. """ for replica in replicas: query = session.query(models.RSEFileAssociation).filter_by(rse_id=replica['rse_id'], scope=replica['scope'], name=replica['name']) - lock_cnt = 0 try: if nowait: - rep = query.with_for_update(nowait=True).one() - else: - rep = query.one() - lock_cnt = rep.lock_cnt + query.with_for_update(nowait=True).one() except NoResultFound: # remember scope, name and rse raise exception.ReplicaNotFound("No row found for scope: %s name: %s rse: %s" % (replica['scope'], replica['name'], get_rse_name(replica['rse_id'], session=session))) @@ -2060,19 +2078,12 @@ def update_replicas_states(replicas, nowait=False, add_tombstone=False, session= values['tombstone'] = OBSOLETE elif replica['state'] == ReplicaState.AVAILABLE: rucio.core.lock.successful_transfer(scope=replica['scope'], name=replica['name'], rse_id=replica['rse_id'], nowait=nowait, session=session) - # If No locks we set a tombstone in the future - if add_tombstone and lock_cnt == 0: - set_tombstone(rse_id=replica['rse_id'], scope=replica['scope'], name=replica['name'], tombstone=datetime.utcnow() + timedelta(hours=2), session=session) - elif replica['state'] == ReplicaState.UNAVAILABLE: rucio.core.lock.failed_transfer(scope=replica['scope'], name=replica['name'], rse_id=replica['rse_id'], error_message=replica.get('error_message', None), broken_rule_id=replica.get('broken_rule_id', None), broken_message=replica.get('broken_message', None), nowait=nowait, session=session) - # If No locks we set a tombstone in the future - if add_tombstone and lock_cnt == 0: - set_tombstone(rse_id=replica['rse_id'], scope=replica['scope'], name=replica['name'], tombstone=datetime.utcnow() + timedelta(hours=2), session=session) elif replica['state'] == ReplicaState.TEMPORARY_UNAVAILABLE: query = query.filter(or_(models.RSEFileAssociation.state == ReplicaState.AVAILABLE, models.RSEFileAssociation.state == ReplicaState.TEMPORARY_UNAVAILABLE)) diff --git a/lib/rucio/core/transfer.py b/lib/rucio/core/transfer.py index dc31faa57bd..b9f2e2ed7b2 100644 --- a/lib/rucio/core/transfer.py +++ b/lib/rucio/core/transfer.py @@ -65,7 +65,7 @@ from rucio.core.config import get as core_config_get from rucio.core.monitor import record_counter, record_timer from rucio.core.oidc import get_token_for_account_operation -from rucio.core.replica import add_replicas +from rucio.core.replica import add_replicas, tombstone_from_delay from rucio.core.request import queue_requests, set_requests_state from rucio.core.rse import get_rse_name, get_rse_vo, list_rses, get_rse_supported_checksums from rucio.core.rse_expression_parser import parse_expression @@ -118,6 +118,8 @@ WEBDAV_TRANSFER_MODE = config_get('conveyor', 'webdav_transfer_mode', False, None) +DEFAULT_MULTIHOP_TOMBSTONE_DELAY = datetime.timedelta(hours=2) + def submit_bulk_transfers(external_host, files, transfertool='fts3', job_params={}, timeout=None, user_transfer_job=False, logger=logging.log): """ @@ -1145,6 +1147,7 @@ def protocol(self, rse_id, scheme, operation): 'bytes': transfers[req_id]['file_metadata']['filesize'], 'adler32': transfers[req_id]['file_metadata']['adler32'], 'md5': transfers[req_id]['file_metadata']['md5'], + 'tombstone': tombstone_from_delay(ctx.rse_attrs(dest_rse_id).get('multihop_tombstone_delay', DEFAULT_MULTIHOP_TOMBSTONE_DELAY)), 'state': 'C'}] try: add_replicas(rse_id=hop['dest_rse_id'], diff --git a/lib/rucio/daemons/conveyor/finisher.py b/lib/rucio/daemons/conveyor/finisher.py index 963717385d6..0f4143cd49f 100644 --- a/lib/rucio/daemons/conveyor/finisher.py +++ b/lib/rucio/daemons/conveyor/finisher.py @@ -415,7 +415,7 @@ def __update_bulk_replicas(replicas, session=None, logger=logging.log): :returns commit_or_rollback: Boolean. """ try: - replica_core.update_replicas_states(replicas, nowait=True, add_tombstone=True, session=session) + replica_core.update_replicas_states(replicas, nowait=True, session=session) except ReplicaNotFound as error: logger(logging.WARNING, 'Failed to bulk update replicas, will do it one by one: %s', str(error)) raise ReplicaNotFound(error) @@ -439,7 +439,7 @@ def __update_replica(replica, session=None, logger=logging.log): """ try: - replica_core.update_replicas_states([replica], nowait=True, add_tombstone=True, session=session) + replica_core.update_replicas_states([replica], nowait=True, session=session) if not replica['archived']: request_core.archive_request(replica['request_id'], session=session) logger(logging.INFO, "HANDLED REQUEST %s DID %s:%s AT RSE %s STATE %s", replica['request_id'], replica['scope'], replica['name'], replica['rse_id'], str(replica['state'])) diff --git a/lib/rucio/tests/test_conveyor.py b/lib/rucio/tests/test_conveyor.py index 4bdd0c1f219..8446748b3aa 100644 --- a/lib/rucio/tests/test_conveyor.py +++ b/lib/rucio/tests/test_conveyor.py @@ -16,10 +16,12 @@ # Authors: # - Radu Carpa , 2021 import time +from datetime import datetime import pytest import rucio.daemons.reaper.reaper2 +from rucio.common.exception import ReplicaNotFound from rucio.core import replica as replica_core from rucio.core import request as request_core from rucio.core import rse as rse_core @@ -56,6 +58,7 @@ def __wait_for_replica_transfer(dst_rse_id, scope, name, max_wait_seconds=10): ('transfers', 'use_multihop', True) ]}], indirect=True) @pytest.mark.parametrize("caches_mock", [{"caches_to_mock": [ + 'rucio.common.rse_attributes', # The rse attributes are cached and may override the needed multihop_tombstone_delay 'rucio.core.rse_expression_parser', # The list of multihop RSEs is retrieved by rse expression 'rucio.core.config', 'rucio.daemons.reaper.reaper2', @@ -78,6 +81,7 @@ def test_multihop_intermediate_replica_lifecycle(vo, did_factory, root_account, all_rses = [src_rse1_id, src_rse2_id, jump_rse_id, dst_rse_id] did = did_factory.upload_test_file(src_rse1_name) + rse_core.add_rse_attribute(jump_rse_id, 'multihop_tombstone_delay', -1) # Copy replica to a second source. To avoid the special case of having a unique last replica, which could be handled in a special (more careful) way rule_core.add_rule(dids=[did], account=root_account, copies=1, rse_expression=src_rse2_name, grouping='ALL', weight=None, lifetime=None, locked=False, subscription_id=None) @@ -91,11 +95,12 @@ def test_multihop_intermediate_replica_lifecycle(vo, did_factory, root_account, rule_core.add_rule(dids=[did], account=root_account, copies=1, rse_expression=dst_rse_name, grouping='ALL', weight=None, lifetime=None, locked=False, subscription_id=None) # Submit transfers to FTS - # Ensure a replica was created on the intermediary host + # Ensure a replica was created on the intermediary host with epoch tombstone submitter(once=True, rses=[{'id': rse_id} for rse_id in all_rses], partition_wait_time=None, transfertype='single', filter_transfertool=None) request = request_core.get_request_by_did(rse_id=jump_rse_id, **did) assert request['state'] == RequestState.SUBMITTED replica = replica_core.get_replica(rse_id=jump_rse_id, **did) + assert replica['tombstone'] == datetime(year=1970, month=1, day=1) assert replica['state'] == ReplicaState.COPYING # The intermediate replica is protected by its state (Copying) @@ -110,12 +115,10 @@ def test_multihop_intermediate_replica_lifecycle(vo, did_factory, root_account, # The intermediate replica is protected by an entry in the sources table # Reaper must not remove this replica, even if it has an obsolete tombstone - # - # TODO: Uncomment following lines - # rucio.daemons.reaper.reaper2.REGION.invalidate() - # reaper(once=True, rses=[], include_rses=jump_rse_name, exclude_rses=None) - # replica = replica_core.get_replica(rse_id=jump_rse_id, **did) - # assert replica + rucio.daemons.reaper.reaper2.REGION.invalidate() + reaper(once=True, rses=[], include_rses=jump_rse_name, exclude_rses=None) + replica = replica_core.get_replica(rse_id=jump_rse_id, **did) + assert replica # FTS fails the second transfer, so run submitter again to copy from jump rse to destination rse submitter(once=True, rses=[{'id': rse_id} for rse_id in all_rses], partition_wait_time=None, transfertype='single', filter_transfertool=None) @@ -127,8 +130,8 @@ def test_multihop_intermediate_replica_lifecycle(vo, did_factory, root_account, rucio.daemons.reaper.reaper2.REGION.invalidate() reaper(once=True, rses=[], include_rses='test_container_xrd=True', exclude_rses=None) - # TODO: reaper must delete this replica. It is not a source anymore. - replica_core.get_replica(rse_id=jump_rse_id, **did) + with pytest.raises(ReplicaNotFound): + replica_core.get_replica(rse_id=jump_rse_id, **did) finally: @transactional_session @@ -137,3 +140,4 @@ def _cleanup_all_usage_and_limits(rse_id, session=None): session.query(models.RSEUsage).filter_by(rse_id=rse_id, source='storage').delete() _cleanup_all_usage_and_limits(rse_id=jump_rse_id) + rse_core.del_rse_attribute(jump_rse_id, 'multihop_tombstone_delay') diff --git a/lib/rucio/tests/test_conveyor_submitter.py b/lib/rucio/tests/test_conveyor_submitter.py index 9473ead6dff..ed9a7034420 100644 --- a/lib/rucio/tests/test_conveyor_submitter.py +++ b/lib/rucio/tests/test_conveyor_submitter.py @@ -25,6 +25,7 @@ from rucio.core import distance as distance_core from rucio.core import request as request_core from rucio.core import rse as rse_core +from rucio.core import replica as replica_core from rucio.core import rule as rule_core from rucio.daemons.conveyor.submitter import submitter from rucio.db.sqla.models import Request, Source @@ -114,6 +115,16 @@ def test_multihop_sources_created(rse_factory, did_factory, root_account, core_c for rse_id in jump_rses: rse_core.add_rse_attribute(rse_id, 'available_for_multihop', True) + rse_tombstone_delay = 3600 + rse_multihop_tombstone_delay = 12 * 3600 + + # if both attributes are set, the multihop one will take precedence + rse_core.add_rse_attribute(jump_rse1_id, 'tombstone_delay', rse_tombstone_delay) + rse_core.add_rse_attribute(jump_rse1_id, 'multihop_tombstone_delay', rse_multihop_tombstone_delay) + + # if multihop delay not set, it's the default multihop takes precedence. Not normal tombstone delay. + rse_core.add_rse_attribute(jump_rse2_id, 'tombstone_delay', rse_tombstone_delay) + distance_core.add_distance(src_rse_id, jump_rse1_id, ranking=10) distance_core.add_distance(jump_rse1_id, jump_rse2_id, ranking=10) distance_core.add_distance(jump_rse2_id, jump_rse3_id, ranking=10) @@ -139,3 +150,15 @@ def __ensure_source_exists(rse_id, scope, name, session=None): # Ensure that sources where created for transfers for rse_id in jump_rses + [src_rse_id]: __ensure_source_exists(rse_id, **did) + + # Ensure the tombstone is correctly set on intermediate replicas + expected_tombstone = datetime.utcnow() + timedelta(seconds=rse_multihop_tombstone_delay) + replica = replica_core.get_replica(jump_rse1_id, **did) + assert expected_tombstone - timedelta(minutes=5) < replica['tombstone'] < expected_tombstone + timedelta(minutes=5) + + expected_tombstone = datetime.utcnow() + timedelta(hours=2) + replica = replica_core.get_replica(jump_rse2_id, **did) + assert expected_tombstone - timedelta(minutes=5) < replica['tombstone'] < expected_tombstone + timedelta(minutes=5) + + replica = replica_core.get_replica(jump_rse3_id, **did) + assert expected_tombstone - timedelta(minutes=5) < replica['tombstone'] < expected_tombstone + timedelta(minutes=5) diff --git a/lib/rucio/tests/test_replica.py b/lib/rucio/tests/test_replica.py index b4a766a261f..f7fe0ef6685 100644 --- a/lib/rucio/tests/test_replica.py +++ b/lib/rucio/tests/test_replica.py @@ -518,6 +518,33 @@ def test_set_tombstone(self, rse_factory, mock_scope, root_account): with pytest.raises(ReplicaNotFound): set_tombstone(rse_id, mock_scope, name) + def test_core_default_tombstone_correctly_set(self, rse_factory, did_factory, root_account): + """ REPLICA (CORE): Per-RSE default tombstone is correctly taken into consideration""" + + # One RSE has an attribute set, the other uses the default value of "None" for tombstone + rse1, rse1_id = rse_factory.make_mock_rse() + rse2, rse2_id = rse_factory.make_mock_rse() + tombstone_delay = 3600 + add_rse_attribute(rse_id=rse2_id, key='tombstone_delay', value=tombstone_delay) + + # Will use the default tombstone delay + did1 = did_factory.random_did() + add_replica(rse1_id, bytes=4, account=root_account, **did1) + assert get_replica(rse1_id, **did1)['tombstone'] is None + + # Will use the configured value on the RSE + did2 = did_factory.random_did() + add_replica(rse2_id, bytes=4, account=root_account, **did2) + tombstone = get_replica(rse2_id, **did2)['tombstone'] + expected_tombstone = datetime.utcnow() + timedelta(seconds=tombstone_delay) + assert expected_tombstone - timedelta(minutes=5) < tombstone < expected_tombstone + timedelta(minutes=5) + + # Adding rule removes the tombstone + RuleClient().add_replication_rule([{'name': did1['name'], 'scope': did1['scope'].external}], 1, rse1, locked=True) + assert get_replica(rse1_id, **did1)['tombstone'] is None + RuleClient().add_replication_rule([{'name': did2['name'], 'scope': did2['scope'].external}], 1, rse2, locked=True) + assert get_replica(rse2_id, **did2)['tombstone'] is None + def test_list_replicas_with_updated_after(self, rse_factory, mock_scope, root_account): """ REPLICA (CORE): Add and list file replicas with updated_after filter """ _, rse_id = rse_factory.make_mock_rse()