Skip to content

Commit

Permalink
Core & Internals: rework tombstone handling. rucio#4491, rucio#4436, r…
Browse files Browse the repository at this point in the history
…ucio#4188

Always set a default tombstone on any replica creation. Introduce two
RSE attributes which allow customizing this behavior per RSE:
- One defines the tombstone used on normal replica creation
- The other configures the tombstone used for temporary multihop replicas

As a tombstone will now be added when the intermediate multihop replica
is created, remove the redundant protection which was touching the
tombstone in the finisher.
Add a submitter test which ensures that multihop sources are created
and that the multihop requests are created with a tombstone.
  • Loading branch information
rcarpa committed Apr 26, 2021
1 parent f1b8fc6 commit aad3c42
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 17 deletions.
36 changes: 21 additions & 15 deletions lib/rucio/core/replica.py
Expand Up @@ -81,6 +81,7 @@
from dogpile.cache.api import NO_VALUE

REGION = make_region().configure('dogpile.cache.memory', expiration_time=60)
DEFAULT_TOMBSTONE_DELAY = timedelta(days=1)


@read_session
Expand Down Expand Up @@ -1358,6 +1359,20 @@ def __bulk_add_file_dids(files, account, dataset_meta=None, session=None):
return new_files + available_files


def tombstone_from_delay(tombstone_delay):
    """
    Convert a delay into a tombstone timestamp relative to the current UTC time.

    :param tombstone_delay: The delay, either as a timedelta or as a number
                            (or numeric string) of seconds. Fractional seconds
                            are truncated by int().
    :returns: A naive UTC datetime equal to now + delay, or None when the delay
              is falsy (None, 0, '', zero timedelta) or cannot be converted
              into a valid timestamp.
    """
    if isinstance(tombstone_delay, timedelta):
        tombstone_delay = tombstone_delay.total_seconds()

    if not tombstone_delay:
        return None

    try:
        return datetime.utcnow() + timedelta(seconds=int(tombstone_delay))
    except (ValueError, TypeError, OverflowError):
        # Best effort: a malformed delay (non-numeric string, unsupported type,
        # infinite or out-of-range value) yields no tombstone instead of
        # failing the caller.
        return None


@transactional_session
def __bulk_add_replicas(rse_id, files, account, session=None):
"""
Expand All @@ -1380,6 +1395,9 @@ def __bulk_add_replicas(rse_id, files, account, session=None):
filter(condition)
available_replicas = [dict([(column, getattr(row, column)) for column in row._fields]) for row in query]

default_tombstone_delay = next(iter(get_rse_attribute('tombstone_delay', rse_id=rse_id, session=session)), DEFAULT_TOMBSTONE_DELAY)
default_tombstone = tombstone_from_delay(default_tombstone_delay)

new_replicas = []
for file in files:
found = False
Expand All @@ -1396,7 +1414,7 @@ def __bulk_add_replicas(rse_id, files, account, session=None):
'state': ReplicaState(file.get('state', 'A')),
'md5': file.get('md5'), 'adler32': file.get('adler32'),
'lock_cnt': file.get('lock_cnt', 0),
'tombstone': file.get('tombstone')})
'tombstone': file.get('tombstone') or default_tombstone})
try:
new_replicas and session.bulk_insert_mappings(models.RSEFileAssociation,
new_replicas)
Expand Down Expand Up @@ -1969,25 +1987,20 @@ def list_and_mark_unlocked_replicas(limit, bytes=None, rse_id=None, delay_second


@transactional_session
def update_replicas_states(replicas, nowait=False, add_tombstone=False, session=None):
def update_replicas_states(replicas, nowait=False, session=None):
"""
Update File replica information and state.
:param replicas: The list of replicas.
:param nowait: Nowait parameter for the for_update queries.
:param add_tombstone: To set a tombstone in case there is no lock on the replica.
:param session: The database session in use.
"""

for replica in replicas:
query = session.query(models.RSEFileAssociation).filter_by(rse_id=replica['rse_id'], scope=replica['scope'], name=replica['name'])
lock_cnt = 0
try:
if nowait:
rep = query.with_for_update(nowait=True).one()
else:
rep = query.one()
lock_cnt = rep.lock_cnt
query.with_for_update(nowait=True).one()
except NoResultFound:
# remember scope, name and rse
raise exception.ReplicaNotFound("No row found for scope: %s name: %s rse: %s" % (replica['scope'], replica['name'], get_rse_name(replica['rse_id'], session=session)))
Expand All @@ -2006,19 +2019,12 @@ def update_replicas_states(replicas, nowait=False, add_tombstone=False, session=
values['tombstone'] = OBSOLETE
elif replica['state'] == ReplicaState.AVAILABLE:
rucio.core.lock.successful_transfer(scope=replica['scope'], name=replica['name'], rse_id=replica['rse_id'], nowait=nowait, session=session)
# If No locks we set a tombstone in the future
if add_tombstone and lock_cnt == 0:
set_tombstone(rse_id=replica['rse_id'], scope=replica['scope'], name=replica['name'], tombstone=datetime.utcnow() + timedelta(hours=2), session=session)

elif replica['state'] == ReplicaState.UNAVAILABLE:
rucio.core.lock.failed_transfer(scope=replica['scope'], name=replica['name'], rse_id=replica['rse_id'],
error_message=replica.get('error_message', None),
broken_rule_id=replica.get('broken_rule_id', None),
broken_message=replica.get('broken_message', None),
nowait=nowait, session=session)
# If No locks we set a tombstone in the future
if add_tombstone and lock_cnt == 0:
set_tombstone(rse_id=replica['rse_id'], scope=replica['scope'], name=replica['name'], tombstone=datetime.utcnow() + timedelta(hours=2), session=session)
elif replica['state'] == ReplicaState.TEMPORARY_UNAVAILABLE:
query = query.filter(or_(models.RSEFileAssociation.state == ReplicaState.AVAILABLE, models.RSEFileAssociation.state == ReplicaState.TEMPORARY_UNAVAILABLE))

Expand Down
6 changes: 5 additions & 1 deletion lib/rucio/core/transfer.py
Expand Up @@ -65,7 +65,7 @@
from rucio.core.config import get as core_config_get
from rucio.core.monitor import record_counter, record_timer
from rucio.core.oidc import get_token_for_account_operation
from rucio.core.replica import add_replicas
from rucio.core.replica import add_replicas, tombstone_from_delay
from rucio.core.request import queue_requests, set_requests_state
from rucio.core.rse import get_rse_name, get_rse_vo, list_rses, get_rse_supported_checksums
from rucio.core.rse_expression_parser import parse_expression
Expand Down Expand Up @@ -108,6 +108,7 @@

WEBDAV_TRANSFER_MODE = config_get('conveyor', 'webdav_transfer_mode', False, None)

DEFAULT_MULTIHOP_TOMBSTONE_DELAY = datetime.timedelta(hours=2)

def submit_bulk_transfers(external_host, files, transfertool='fts3', job_params={}, timeout=None, user_transfer_job=False, logger=logging.log):
"""
Expand Down Expand Up @@ -779,6 +780,8 @@ def protocol(self, rse_id, scheme, operation):
except InvalidRSEExpression:
multihop_rses = []

default_multihop_tombstone_delay = core_config_get('transfers', 'multihop_tombstone_delay', default=DEFAULT_MULTIHOP_TOMBSTONE_DELAY, session=session)

for req_id, rule_id, scope, name, md5, adler32, bytes, activity, attributes, previous_attempt_id, dest_rse_id, account, source_rse_id, rse, deterministic, rse_type, path, retry_count, src_url, ranking, link_ranking in req_sources:

if ranking is None:
Expand Down Expand Up @@ -1134,6 +1137,7 @@ def protocol(self, rse_id, scheme, operation):
'bytes': transfers[req_id]['file_metadata']['filesize'],
'adler32': transfers[req_id]['file_metadata']['adler32'],
'md5': transfers[req_id]['file_metadata']['md5'],
'tombstone': tombstone_from_delay(ctx.rse_attrs(dest_rse_id).get('multihop_tombstone_delay', default_multihop_tombstone_delay)),
'state': 'C'}]
try:
add_replicas(rse_id=hop['dest_rse_id'],
Expand Down
25 changes: 25 additions & 0 deletions lib/rucio/tests/test_conveyor_submitter.py
Expand Up @@ -26,7 +26,9 @@
from rucio.core import distance as distance_core
from rucio.core import request as request_core
from rucio.core import rse as rse_core
from rucio.core import replica as replica_core
from rucio.core import rule as rule_core
from rucio.core import transfer as transfer_core
from rucio.daemons.conveyor.submitter import submitter
from rucio.db.sqla.models import Request, Source
from rucio.db.sqla.constants import RequestState
Expand Down Expand Up @@ -114,6 +116,16 @@ def test_multihop_sources_created(rse_factory, did_factory, root_account, core_c
for rse_id in jump_rses:
rse_core.add_rse_attribute(rse_id, 'available_for_multihop', True)

rse_tombstone_delay = 3600
rse_multihop_tombstone_delay = 12 * 3600

# if both attributes are set, the multihop one will take precedence
rse_core.add_rse_attribute(jump_rse1_id, 'tombstone_delay', rse_tombstone_delay)
rse_core.add_rse_attribute(jump_rse1_id, 'multihop_tombstone_delay', rse_multihop_tombstone_delay)

# if the multihop delay is not set, the default multihop delay takes precedence, not the normal tombstone delay.
rse_core.add_rse_attribute(jump_rse2_id, 'tombstone_delay', rse_tombstone_delay)

distance_core.add_distance(src_rse_id, jump_rse1_id, ranking=10)
distance_core.add_distance(jump_rse1_id, jump_rse2_id, ranking=10)
distance_core.add_distance(jump_rse2_id, jump_rse3_id, ranking=10)
Expand All @@ -139,3 +151,16 @@ def __ensure_source_exists(rse_id, scope, name, session=None):
# Ensure that sources were created for transfers
for rse_id in jump_rses + [src_rse_id]:
__ensure_source_exists(rse_id, **did)

# Ensure the tombstone is correctly set on intermediate replicas
replica = replica_core.get_replica(jump_rse1_id, **did)
expected_tombstone = datetime.utcnow() + timedelta(seconds=rse_multihop_tombstone_delay)
assert expected_tombstone - timedelta(minutes=5) < replica['tombstone'] < expected_tombstone + timedelta(minutes=5)

replica = replica_core.get_replica(jump_rse2_id, **did)
expected_tombstone = datetime.utcnow() + transfer_core.DEFAULT_MULTIHOP_TOMBSTONE_DELAY
assert expected_tombstone - timedelta(minutes=5) < replica['tombstone'] < expected_tombstone + timedelta(minutes=5)

replica = replica_core.get_replica(jump_rse3_id, **did)
expected_tombstone = datetime.utcnow() + transfer_core.DEFAULT_MULTIHOP_TOMBSTONE_DELAY
assert expected_tombstone - timedelta(minutes=5) < replica['tombstone'] < expected_tombstone + timedelta(minutes=5)
31 changes: 30 additions & 1 deletion lib/rucio/tests/test_replica.py
Expand Up @@ -52,7 +52,7 @@
from rucio.core.replica import (add_replica, add_replicas, delete_replicas, get_replicas_state,
get_replica, list_replicas, declare_bad_file_replicas, list_bad_replicas,
update_replica_state, get_RSEcoverage_of_dataset, get_replica_atime,
touch_replica, get_bad_pfns, set_tombstone)
touch_replica, get_bad_pfns, set_tombstone, DEFAULT_TOMBSTONE_DELAY)
from rucio.core.rse import add_protocol, add_rse_attribute, del_rse_attribute
from rucio.daemons.badreplicas.minos import run as minos_run
from rucio.daemons.badreplicas.minos_temporary_expiration import run as minos_temp_run
Expand Down Expand Up @@ -522,6 +522,35 @@ def test_set_tombstone(self, rse_factory, mock_scope, root_account):
with pytest.raises(ReplicaNotFound):
set_tombstone(rse_id, mock_scope, name)

def test_core_default_tombstone_correctly_set(rse_factory, did_factory, root_account):
    """ REPLICA (CORE): Per-RSE default tombstone is correctly taken into consideration"""
    tolerance = timedelta(minutes=5)
    custom_delay = 3600

    # The first RSE relies on the default tombstone delay; the second one
    # carries its own 'tombstone_delay' attribute.
    default_rse, default_rse_id = rse_factory.make_mock_rse()
    custom_rse, custom_rse_id = rse_factory.make_mock_rse()
    add_rse_attribute(rse_id=custom_rse_id, key='tombstone_delay', value=custom_delay)

    scenarios = (
        (default_rse, default_rse_id, DEFAULT_TOMBSTONE_DELAY),
        (custom_rse, custom_rse_id, timedelta(seconds=custom_delay)),
    )

    # Create one replica per RSE and check its tombstone lands within the
    # tolerance window around now + the delay expected for that RSE.
    created = []
    for rse_name, rse_id, delay in scenarios:
        did = did_factory.random_did()
        add_replica(rse_id, bytes=4, account=root_account, **did)
        tombstone = get_replica(rse_id, **did)['tombstone']
        expected_tombstone = datetime.utcnow() + delay
        assert expected_tombstone - tolerance < tombstone < expected_tombstone + tolerance
        created.append((rse_name, rse_id, did))

    # Adding rule removes the tombstone
    for rse_name, rse_id, did in created:
        RuleClient().add_replication_rule([{'name': did['name'], 'scope': did['scope'].external}], 1, rse_name, locked=True)
        assert get_replica(rse_id, **did)['tombstone'] is None

def test_list_replicas_with_updated_after(self, rse_factory, mock_scope, root_account):
""" REPLICA (CORE): Add and list file replicas with updated_after filter """
_, rse_id = rse_factory.make_mock_rse()
Expand Down

0 comments on commit aad3c42

Please sign in to comment.