Skip to content

Commit

Permalink
Core & Internals: rework tombstone handling. Closes rucio#4491, rucio…
Browse files Browse the repository at this point in the history
…#4436 and rucio#4188

Allow setting a default tombstone on any replica creation. Introduce two
rse attributes which allow to customize the behavior per rse:
- One allows to define the normal replica creation tombstone
(defaults to 0: "no tombstone")
- The other is used to configure the multihop temporary replica tombstone
(defaults to 2 hours in the future)
Use a negative value to set the epoch tombstone. Or 0 to explicitly
set a null tombstone.

Protect replicas which are used as sources from deletion even if their
tombstone is set to epoch. Add an integration test which verifies this
behavior.
Add a submitter test ensuring that multihop sources are created;
and that the temp multihop replicas are created with a tombstone.

Remove the redundant behavior which touches the tombstone in the
finisher. The 2 remaining protections for intermediate multihop replicas
are now: 1) entries in the source table; 2) the default multihop
tombstone delay of 2 hours into the future.
  • Loading branch information
rcarpa committed May 19, 2021
1 parent 98c87d1 commit 3dcb9ab
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 29 deletions.
41 changes: 26 additions & 15 deletions lib/rucio/core/replica.py
Expand Up @@ -1366,6 +1366,22 @@ def __bulk_add_file_dids(files, account, dataset_meta=None, session=None):
return new_files + available_files


def tombstone_from_delay(tombstone_delay):
    """
    Convert a tombstone delay into an absolute tombstone datetime.

    :param tombstone_delay: a timedelta, or any value convertible to an
        integer number of seconds (typically an RSE attribute string).
    :returns: None ("no tombstone") when the delay is empty, zero, or
        unparsable; the epoch (1970-01-01) for a negative delay, which
        makes the replica immediately eligible for deletion; otherwise
        utcnow() + delay.
    """
    if not tombstone_delay:
        return None

    if not isinstance(tombstone_delay, timedelta):
        try:
            tombstone_delay = timedelta(seconds=int(tombstone_delay))
        except (TypeError, ValueError):
            # RSE attribute values are user-controlled; treat garbage as
            # "no tombstone" instead of failing the replica creation.
            return None

    if not tombstone_delay:
        # "0" explicitly requests a null tombstone (string attributes are
        # truthy, so this must be checked after the conversion above).
        return None

    if tombstone_delay < timedelta(0):
        return datetime(1970, 1, 1)

    return datetime.utcnow() + tombstone_delay


@transactional_session
def __bulk_add_replicas(rse_id, files, account, session=None):
"""
Expand All @@ -1388,6 +1404,9 @@ def __bulk_add_replicas(rse_id, files, account, session=None):
filter(condition)
available_replicas = [dict([(column, getattr(row, column)) for column in row._fields]) for row in query]

default_tombstone_delay = next(iter(get_rse_attribute('tombstone_delay', rse_id=rse_id, session=session)), None)
default_tombstone = tombstone_from_delay(default_tombstone_delay)

new_replicas = []
for file in files:
found = False
Expand All @@ -1404,7 +1423,7 @@ def __bulk_add_replicas(rse_id, files, account, session=None):
'state': ReplicaState(file.get('state', 'A')),
'md5': file.get('md5'), 'adler32': file.get('adler32'),
'lock_cnt': file.get('lock_cnt', 0),
'tombstone': file.get('tombstone')})
'tombstone': file.get('tombstone') or default_tombstone})
try:
new_replicas and session.bulk_insert_mappings(models.RSEFileAssociation,
new_replicas)
Expand Down Expand Up @@ -1956,6 +1975,10 @@ def list_and_mark_unlocked_replicas(limit, bytes=None, rse_id=None, delay_second
filter(case([(models.RSEFileAssociation.tombstone != none_value, models.RSEFileAssociation.rse_id), ]) == rse_id).\
filter(or_(models.RSEFileAssociation.state.in_((ReplicaState.AVAILABLE, ReplicaState.UNAVAILABLE, ReplicaState.BAD)),
and_(models.RSEFileAssociation.state == ReplicaState.BEING_DELETED, models.RSEFileAssociation.updated_at < datetime.utcnow() - timedelta(seconds=delay_seconds)))).\
filter(~exists(select([1]).prefix_with("/*+ INDEX(SOURCES SOURCES_SC_NM_DST_IDX) */", dialect='oracle')
.where(and_(models.RSEFileAssociation.scope == models.Source.scope,
models.RSEFileAssociation.name == models.Source.name,
models.RSEFileAssociation.rse_id == models.Source.rse_id)))).\
with_for_update(skip_locked=True).\
order_by(models.RSEFileAssociation.tombstone)

Expand Down Expand Up @@ -2023,25 +2046,20 @@ def list_and_mark_unlocked_replicas(limit, bytes=None, rse_id=None, delay_second


@transactional_session
def update_replicas_states(replicas, nowait=False, add_tombstone=False, session=None):
def update_replicas_states(replicas, nowait=False, session=None):
"""
Update File replica information and state.
:param replicas: The list of replicas.
:param nowait: Nowait parameter for the for_update queries.
:param add_tombstone: To set a tombstone in case there is no lock on the replica.
:param session: The database session in use.
"""

for replica in replicas:
query = session.query(models.RSEFileAssociation).filter_by(rse_id=replica['rse_id'], scope=replica['scope'], name=replica['name'])
lock_cnt = 0
try:
if nowait:
rep = query.with_for_update(nowait=True).one()
else:
rep = query.one()
lock_cnt = rep.lock_cnt
query.with_for_update(nowait=True).one()
except NoResultFound:
# remember scope, name and rse
raise exception.ReplicaNotFound("No row found for scope: %s name: %s rse: %s" % (replica['scope'], replica['name'], get_rse_name(replica['rse_id'], session=session)))
Expand All @@ -2060,19 +2078,12 @@ def update_replicas_states(replicas, nowait=False, add_tombstone=False, session=
values['tombstone'] = OBSOLETE
elif replica['state'] == ReplicaState.AVAILABLE:
rucio.core.lock.successful_transfer(scope=replica['scope'], name=replica['name'], rse_id=replica['rse_id'], nowait=nowait, session=session)
# If No locks we set a tombstone in the future
if add_tombstone and lock_cnt == 0:
set_tombstone(rse_id=replica['rse_id'], scope=replica['scope'], name=replica['name'], tombstone=datetime.utcnow() + timedelta(hours=2), session=session)

elif replica['state'] == ReplicaState.UNAVAILABLE:
rucio.core.lock.failed_transfer(scope=replica['scope'], name=replica['name'], rse_id=replica['rse_id'],
error_message=replica.get('error_message', None),
broken_rule_id=replica.get('broken_rule_id', None),
broken_message=replica.get('broken_message', None),
nowait=nowait, session=session)
# If No locks we set a tombstone in the future
if add_tombstone and lock_cnt == 0:
set_tombstone(rse_id=replica['rse_id'], scope=replica['scope'], name=replica['name'], tombstone=datetime.utcnow() + timedelta(hours=2), session=session)
elif replica['state'] == ReplicaState.TEMPORARY_UNAVAILABLE:
query = query.filter(or_(models.RSEFileAssociation.state == ReplicaState.AVAILABLE, models.RSEFileAssociation.state == ReplicaState.TEMPORARY_UNAVAILABLE))

Expand Down
5 changes: 4 additions & 1 deletion lib/rucio/core/transfer.py
Expand Up @@ -65,7 +65,7 @@
from rucio.core.config import get as core_config_get
from rucio.core.monitor import record_counter, record_timer
from rucio.core.oidc import get_token_for_account_operation
from rucio.core.replica import add_replicas
from rucio.core.replica import add_replicas, tombstone_from_delay
from rucio.core.request import queue_requests, set_requests_state
from rucio.core.rse import get_rse_name, get_rse_vo, list_rses, get_rse_supported_checksums
from rucio.core.rse_expression_parser import parse_expression
Expand Down Expand Up @@ -118,6 +118,8 @@

WEBDAV_TRANSFER_MODE = config_get('conveyor', 'webdav_transfer_mode', False, None)

DEFAULT_MULTIHOP_TOMBSTONE_DELAY = datetime.timedelta(hours=2)


def submit_bulk_transfers(external_host, files, transfertool='fts3', job_params={}, timeout=None, user_transfer_job=False, logger=logging.log):
"""
Expand Down Expand Up @@ -1145,6 +1147,7 @@ def protocol(self, rse_id, scheme, operation):
'bytes': transfers[req_id]['file_metadata']['filesize'],
'adler32': transfers[req_id]['file_metadata']['adler32'],
'md5': transfers[req_id]['file_metadata']['md5'],
'tombstone': tombstone_from_delay(ctx.rse_attrs(dest_rse_id).get('multihop_tombstone_delay', DEFAULT_MULTIHOP_TOMBSTONE_DELAY)),
'state': 'C'}]
try:
add_replicas(rse_id=hop['dest_rse_id'],
Expand Down
4 changes: 2 additions & 2 deletions lib/rucio/daemons/conveyor/finisher.py
Expand Up @@ -415,7 +415,7 @@ def __update_bulk_replicas(replicas, session=None, logger=logging.log):
:returns commit_or_rollback: Boolean.
"""
try:
replica_core.update_replicas_states(replicas, nowait=True, add_tombstone=True, session=session)
replica_core.update_replicas_states(replicas, nowait=True, session=session)
except ReplicaNotFound as error:
logger(logging.WARNING, 'Failed to bulk update replicas, will do it one by one: %s', str(error))
raise ReplicaNotFound(error)
Expand All @@ -439,7 +439,7 @@ def __update_replica(replica, session=None, logger=logging.log):
"""

try:
replica_core.update_replicas_states([replica], nowait=True, add_tombstone=True, session=session)
replica_core.update_replicas_states([replica], nowait=True, session=session)
if not replica['archived']:
request_core.archive_request(replica['request_id'], session=session)
logger(logging.INFO, "HANDLED REQUEST %s DID %s:%s AT RSE %s STATE %s", replica['request_id'], replica['scope'], replica['name'], replica['rse_id'], str(replica['state']))
Expand Down
21 changes: 12 additions & 9 deletions lib/rucio/tests/test_conveyor.py
Expand Up @@ -16,10 +16,12 @@
# Authors:
# - Radu Carpa <radu.carpa@cern.ch>, 2021
import time
from datetime import datetime

import pytest

import rucio.daemons.reaper.reaper2
from rucio.common.exception import ReplicaNotFound
from rucio.core import replica as replica_core
from rucio.core import request as request_core
from rucio.core import rse as rse_core
Expand Down Expand Up @@ -85,17 +87,19 @@ def test_multihop_intermediate_replica_lifecycle(vo, did_factory, root_account,
replica = __wait_for_replica_transfer(dst_rse_id=src_rse2_id, **did)
assert replica['state'] == ReplicaState.AVAILABLE

rse_core.add_rse_attribute(jump_rse_id, 'multihop_tombstone_delay', -1)
rse_core.set_rse_limits(rse_id=jump_rse_id, name='MinFreeSpace', value=1)
rse_core.set_rse_usage(rse_id=jump_rse_id, source='storage', used=1, free=0)
try:
rule_core.add_rule(dids=[did], account=root_account, copies=1, rse_expression=dst_rse_name, grouping='ALL', weight=None, lifetime=None, locked=False, subscription_id=None)

# Submit transfers to FTS
# Ensure a replica was created on the intermediary host
        # Ensure a replica was created on the intermediary host with an epoch tombstone
        # (multihop_tombstone_delay was set to -1 above)
submitter(once=True, rses=[{'id': rse_id} for rse_id in all_rses], partition_wait_time=None, transfertype='single', filter_transfertool=None)
request = request_core.get_request_by_did(rse_id=jump_rse_id, **did)
assert request['state'] == RequestState.SUBMITTED
replica = replica_core.get_replica(rse_id=jump_rse_id, **did)
assert replica['tombstone'] == datetime(year=1970, month=1, day=1)
assert replica['state'] == ReplicaState.COPYING

# The intermediate replica is protected by its state (Copying)
Expand All @@ -110,12 +114,10 @@ def test_multihop_intermediate_replica_lifecycle(vo, did_factory, root_account,

# The intermediate replica is protected by an entry in the sources table
# Reaper must not remove this replica, even if it has an obsolete tombstone
#
# TODO: Uncomment following lines
# rucio.daemons.reaper.reaper2.REGION.invalidate()
# reaper(once=True, rses=[], include_rses=jump_rse_name, exclude_rses=None)
# replica = replica_core.get_replica(rse_id=jump_rse_id, **did)
# assert replica
rucio.daemons.reaper.reaper2.REGION.invalidate()
reaper(once=True, rses=[], include_rses=jump_rse_name, exclude_rses=None)
replica = replica_core.get_replica(rse_id=jump_rse_id, **did)
assert replica

# FTS fails the second transfer, so run submitter again to copy from jump rse to destination rse
submitter(once=True, rses=[{'id': rse_id} for rse_id in all_rses], partition_wait_time=None, transfertype='single', filter_transfertool=None)
Expand All @@ -127,8 +129,8 @@ def test_multihop_intermediate_replica_lifecycle(vo, did_factory, root_account,
rucio.daemons.reaper.reaper2.REGION.invalidate()
reaper(once=True, rses=[], include_rses='test_container_xrd=True', exclude_rses=None)

# TODO: reaper must delete this replica. It is not a source anymore.
replica_core.get_replica(rse_id=jump_rse_id, **did)
with pytest.raises(ReplicaNotFound):
replica_core.get_replica(rse_id=jump_rse_id, **did)
finally:

@transactional_session
Expand All @@ -137,3 +139,4 @@ def _cleanup_all_usage_and_limits(rse_id, session=None):
session.query(models.RSEUsage).filter_by(rse_id=rse_id, source='storage').delete()

_cleanup_all_usage_and_limits(rse_id=jump_rse_id)
rse_core.del_rse_attribute(jump_rse_id, 'multihop_tombstone_delay')
23 changes: 23 additions & 0 deletions lib/rucio/tests/test_conveyor_submitter.py
Expand Up @@ -25,6 +25,7 @@
from rucio.core import distance as distance_core
from rucio.core import request as request_core
from rucio.core import rse as rse_core
from rucio.core import replica as replica_core
from rucio.core import rule as rule_core
from rucio.daemons.conveyor.submitter import submitter
from rucio.db.sqla.models import Request, Source
Expand Down Expand Up @@ -114,6 +115,16 @@ def test_multihop_sources_created(rse_factory, did_factory, root_account, core_c
for rse_id in jump_rses:
rse_core.add_rse_attribute(rse_id, 'available_for_multihop', True)

rse_tombstone_delay = 3600
rse_multihop_tombstone_delay = 12 * 3600

# if both attributes are set, the multihop one will take precedence
rse_core.add_rse_attribute(jump_rse1_id, 'tombstone_delay', rse_tombstone_delay)
rse_core.add_rse_attribute(jump_rse1_id, 'multihop_tombstone_delay', rse_multihop_tombstone_delay)

    # if the multihop delay is not set, the built-in default multihop delay takes precedence, not the normal tombstone delay
rse_core.add_rse_attribute(jump_rse2_id, 'tombstone_delay', rse_tombstone_delay)

distance_core.add_distance(src_rse_id, jump_rse1_id, ranking=10)
distance_core.add_distance(jump_rse1_id, jump_rse2_id, ranking=10)
distance_core.add_distance(jump_rse2_id, jump_rse3_id, ranking=10)
Expand All @@ -139,3 +150,15 @@ def __ensure_source_exists(rse_id, scope, name, session=None):
# Ensure that sources where created for transfers
for rse_id in jump_rses + [src_rse_id]:
__ensure_source_exists(rse_id, **did)

# Ensure the tombstone is correctly set on intermediate replicas
expected_tombstone = datetime.utcnow() + timedelta(seconds=rse_multihop_tombstone_delay)
replica = replica_core.get_replica(jump_rse1_id, **did)
assert expected_tombstone - timedelta(minutes=5) < replica['tombstone'] < expected_tombstone + timedelta(minutes=5)

expected_tombstone = datetime.utcnow() + timedelta(hours=2)
replica = replica_core.get_replica(jump_rse2_id, **did)
assert expected_tombstone - timedelta(minutes=5) < replica['tombstone'] < expected_tombstone + timedelta(minutes=5)

replica = replica_core.get_replica(jump_rse3_id, **did)
assert expected_tombstone - timedelta(minutes=5) < replica['tombstone'] < expected_tombstone + timedelta(minutes=5)
31 changes: 29 additions & 2 deletions lib/rucio/tests/test_replica.py
Expand Up @@ -55,7 +55,7 @@
from rucio.core.replica import (add_replica, add_replicas, delete_replicas, get_replicas_state,
get_replica, list_replicas, declare_bad_file_replicas, list_bad_replicas,
update_replica_state, get_RSEcoverage_of_dataset, get_replica_atime,
touch_replica, get_bad_pfns, set_tombstone)
touch_replica, get_bad_pfns, set_tombstone, DEFAULT_TOMBSTONE_DELAY)
from rucio.core.rse import add_protocol, add_rse_attribute, del_rse_attribute
from rucio.daemons.badreplicas.minos import run as minos_run
from rucio.daemons.badreplicas.minos_temporary_expiration import run as minos_temp_run
Expand Down Expand Up @@ -518,6 +518,33 @@ def test_set_tombstone(self, rse_factory, mock_scope, root_account):
with pytest.raises(ReplicaNotFound):
set_tombstone(rse_id, mock_scope, name)

def test_core_default_tombstone_correctly_set(self, rse_factory, did_factory, root_account):
""" REPLICA (CORE): Per-RSE default tombstone is correctly taken into consideration"""

# One RSE has an attribute set, the other uses the default value of "None" for tombstone
rse1, rse1_id = rse_factory.make_mock_rse()
rse2, rse2_id = rse_factory.make_mock_rse()
tombstone_delay = 3600
add_rse_attribute(rse_id=rse2_id, key='tombstone_delay', value=tombstone_delay)

# Will use the default tombstone delay
did1 = did_factory.random_did()
add_replica(rse1_id, bytes=4, account=root_account, **did1)
assert get_replica(rse1_id, **did1)['tombstone'] is None

# Will use the configured value on the RSE
did2 = did_factory.random_did()
add_replica(rse2_id, bytes=4, account=root_account, **did2)
tombstone = get_replica(rse2_id, **did2)['tombstone']
expected_tombstone = datetime.utcnow() + timedelta(seconds=tombstone_delay)
assert expected_tombstone - timedelta(minutes=5) < tombstone < expected_tombstone + timedelta(minutes=5)

# Adding rule removes the tombstone
RuleClient().add_replication_rule([{'name': did1['name'], 'scope': did1['scope'].external}], 1, rse1, locked=True)
assert get_replica(rse1_id, **did1)['tombstone'] is None
RuleClient().add_replication_rule([{'name': did2['name'], 'scope': did2['scope'].external}], 1, rse2, locked=True)
assert get_replica(rse2_id, **did2)['tombstone'] is None

def test_list_replicas_with_updated_after(self, rse_factory, mock_scope, root_account):
""" REPLICA (CORE): Add and list file replicas with updated_after filter """
_, rse_id = rse_factory.make_mock_rse()
Expand Down Expand Up @@ -956,7 +983,7 @@ def test_client_set_tombstone(rse_factory, mock_scope, root_account, replica_cli
rse, rse_id = rse_factory.make_mock_rse()
name = generate_uuid()
add_replica(rse_id, mock_scope, name, 4, root_account)
assert get_replica(rse_id, mock_scope, name)['tombstone'] is None
assert get_replica(rse_id, mock_scope, name)['tombstone'] > datetime.utcnow()
replica_client.set_tombstone([{'rse': rse, 'scope': mock_scope.external, 'name': name}])
assert get_replica(rse_id, mock_scope, name)['tombstone'] == OBSOLETE

Expand Down

0 comments on commit 3dcb9ab

Please sign in to comment.