Skip to content

Commit

Permalink
Core & Internals: rework tombstone handling. rucio#4491, rucio#4436, r…
Browse files Browse the repository at this point in the history
…ucio#4188

Always set a default tombstone on any replica creation. Introduce two
rse attributes which allow to customize the behavior per rse:
- One allows to define the normal replica creation tombstone
- The other used to configure the multihop temporary replica tombstone

As a tombstone will now be added when the intermediate multihop replica
is created, remove the redundant protection which was touching the
tombstone in the finisher.
Add a submitter test which ensures that multihop sources are created;
and that the multihop request are created with a tombstone.
  • Loading branch information
rcarpa committed Apr 27, 2021
1 parent f466614 commit 62c692a
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 20 deletions.
36 changes: 21 additions & 15 deletions lib/rucio/core/replica.py
Expand Up @@ -81,6 +81,7 @@
from dogpile.cache.api import NO_VALUE

REGION = make_region().configure('dogpile.cache.memory', expiration_time=60)
DEFAULT_TOMBSTONE_DELAY = timedelta(weeks=2)


@read_session
Expand Down Expand Up @@ -1358,6 +1359,20 @@ def __bulk_add_file_dids(files, account, dataset_meta=None, session=None):
return new_files + available_files


def tombstone_from_delay(tombstone_delay):
tombstone = None

if isinstance(tombstone_delay, timedelta):
tombstone_delay = tombstone_delay.total_seconds()

if tombstone_delay:
try:
tombstone = datetime.utcnow() + timedelta(seconds=int(tombstone_delay))
except ValueError:
pass
return tombstone


@transactional_session
def __bulk_add_replicas(rse_id, files, account, session=None):
"""
Expand All @@ -1380,6 +1395,9 @@ def __bulk_add_replicas(rse_id, files, account, session=None):
filter(condition)
available_replicas = [dict([(column, getattr(row, column)) for column in row._fields]) for row in query]

default_tombstone_delay = next(iter(get_rse_attribute('tombstone_delay', rse_id=rse_id, session=session)), DEFAULT_TOMBSTONE_DELAY)
default_tombstone = tombstone_from_delay(default_tombstone_delay)

new_replicas = []
for file in files:
found = False
Expand All @@ -1396,7 +1414,7 @@ def __bulk_add_replicas(rse_id, files, account, session=None):
'state': ReplicaState(file.get('state', 'A')),
'md5': file.get('md5'), 'adler32': file.get('adler32'),
'lock_cnt': file.get('lock_cnt', 0),
'tombstone': file.get('tombstone')})
'tombstone': file.get('tombstone') or default_tombstone})
try:
new_replicas and session.bulk_insert_mappings(models.RSEFileAssociation,
new_replicas)
Expand Down Expand Up @@ -1969,25 +1987,20 @@ def list_and_mark_unlocked_replicas(limit, bytes=None, rse_id=None, delay_second


@transactional_session
def update_replicas_states(replicas, nowait=False, add_tombstone=False, session=None):
def update_replicas_states(replicas, nowait=False, session=None):
"""
Update File replica information and state.
:param replicas: The list of replicas.
:param nowait: Nowait parameter for the for_update queries.
:param add_tombstone: To set a tombstone in case there is no lock on the replica.
:param session: The database session in use.
"""

for replica in replicas:
query = session.query(models.RSEFileAssociation).filter_by(rse_id=replica['rse_id'], scope=replica['scope'], name=replica['name'])
lock_cnt = 0
try:
if nowait:
rep = query.with_for_update(nowait=True).one()
else:
rep = query.one()
lock_cnt = rep.lock_cnt
query.with_for_update(nowait=True).one()
except NoResultFound:
# remember scope, name and rse
raise exception.ReplicaNotFound("No row found for scope: %s name: %s rse: %s" % (replica['scope'], replica['name'], get_rse_name(replica['rse_id'], session=session)))
Expand All @@ -2006,19 +2019,12 @@ def update_replicas_states(replicas, nowait=False, add_tombstone=False, session=
values['tombstone'] = OBSOLETE
elif replica['state'] == ReplicaState.AVAILABLE:
rucio.core.lock.successful_transfer(scope=replica['scope'], name=replica['name'], rse_id=replica['rse_id'], nowait=nowait, session=session)
# If No locks we set a tombstone in the future
if add_tombstone and lock_cnt == 0:
set_tombstone(rse_id=replica['rse_id'], scope=replica['scope'], name=replica['name'], tombstone=datetime.utcnow() + timedelta(hours=2), session=session)

elif replica['state'] == ReplicaState.UNAVAILABLE:
rucio.core.lock.failed_transfer(scope=replica['scope'], name=replica['name'], rse_id=replica['rse_id'],
error_message=replica.get('error_message', None),
broken_rule_id=replica.get('broken_rule_id', None),
broken_message=replica.get('broken_message', None),
nowait=nowait, session=session)
# If No locks we set a tombstone in the future
if add_tombstone and lock_cnt == 0:
set_tombstone(rse_id=replica['rse_id'], scope=replica['scope'], name=replica['name'], tombstone=datetime.utcnow() + timedelta(hours=2), session=session)
elif replica['state'] == ReplicaState.TEMPORARY_UNAVAILABLE:
query = query.filter(or_(models.RSEFileAssociation.state == ReplicaState.AVAILABLE, models.RSEFileAssociation.state == ReplicaState.TEMPORARY_UNAVAILABLE))

Expand Down
5 changes: 3 additions & 2 deletions lib/rucio/core/transfer.py
Expand Up @@ -65,12 +65,12 @@
from rucio.core.config import get as core_config_get
from rucio.core.monitor import record_counter, record_timer
from rucio.core.oidc import get_token_for_account_operation
from rucio.core.replica import add_replicas
from rucio.core.replica import add_replicas, tombstone_from_delay
from rucio.core.request import queue_requests, set_requests_state
from rucio.core.rse import get_rse_name, get_rse_vo, list_rses, get_rse_supported_checksums
from rucio.core.rse_expression_parser import parse_expression
from rucio.db.sqla import models, filter_thread_work
from rucio.db.sqla.constants import DIDType, RequestState, RSEType, RequestType, ReplicaState
from rucio.db.sqla.constants import DIDType, RequestState, RSEType, RequestType, ReplicaState, OBSOLETE
from rucio.db.sqla.session import read_session, transactional_session
from rucio.rse import rsemanager as rsemgr
from rucio.transfertool.fts3 import FTS3Transfertool
Expand Down Expand Up @@ -1134,6 +1134,7 @@ def protocol(self, rse_id, scheme, operation):
'bytes': transfers[req_id]['file_metadata']['filesize'],
'adler32': transfers[req_id]['file_metadata']['adler32'],
'md5': transfers[req_id]['file_metadata']['md5'],
'tombstone': tombstone_from_delay(ctx.rse_attrs(dest_rse_id).get('multihop_tombstone_delay')) or OBSOLETE,
'state': 'C'}]
try:
add_replicas(rse_id=hop['dest_rse_id'],
Expand Down
4 changes: 2 additions & 2 deletions lib/rucio/daemons/conveyor/finisher.py
Expand Up @@ -414,7 +414,7 @@ def __update_bulk_replicas(replicas, session=None, logger=logging.log):
:returns commit_or_rollback: Boolean.
"""
try:
replica_core.update_replicas_states(replicas, nowait=True, add_tombstone=True, session=session)
replica_core.update_replicas_states(replicas, nowait=True, session=session)
except ReplicaNotFound as error:
logger(logging.WARNING, 'Failed to bulk update replicas, will do it one by one: %s', str(error))
raise ReplicaNotFound(error)
Expand All @@ -438,7 +438,7 @@ def __update_replica(replica, session=None, logger=logging.log):
"""

try:
replica_core.update_replicas_states([replica], nowait=True, add_tombstone=True, session=session)
replica_core.update_replicas_states([replica], nowait=True, session=session)
if not replica['archived']:
request_core.archive_request(replica['request_id'], session=session)
logger(logging.INFO, "HANDLED REQUEST %s DID %s:%s AT RSE %s STATE %s", replica['request_id'], replica['scope'], replica['name'], replica['rse_id'], str(replica['state']))
Expand Down
31 changes: 30 additions & 1 deletion lib/rucio/tests/test_replica.py
Expand Up @@ -53,7 +53,7 @@
from rucio.core.replica import (add_replica, add_replicas, delete_replicas, get_replicas_state,
get_replica, list_replicas, declare_bad_file_replicas, list_bad_replicas,
update_replica_state, get_RSEcoverage_of_dataset, get_replica_atime,
touch_replica, get_bad_pfns, set_tombstone)
touch_replica, get_bad_pfns, set_tombstone, DEFAULT_TOMBSTONE_DELAY)
from rucio.core.rse import add_protocol, add_rse_attribute, del_rse_attribute
from rucio.daemons.badreplicas.minos import run as minos_run
from rucio.daemons.badreplicas.minos_temporary_expiration import run as minos_temp_run
Expand Down Expand Up @@ -523,6 +523,35 @@ def test_set_tombstone(self, rse_factory, mock_scope, root_account):
with pytest.raises(ReplicaNotFound):
set_tombstone(rse_id, mock_scope, name)

def test_core_default_tombstone_correctly_set(self, rse_factory, did_factory, root_account):
""" REPLICA (CORE): Per-RSE default tombstone is correctly taken into consideration"""

# One RSE has an attribute set, the other uses the default value for tombstone
rse1, rse1_id = rse_factory.make_mock_rse()
rse2, rse2_id = rse_factory.make_mock_rse()
tombstone_delay = 3600
add_rse_attribute(rse_id=rse2_id, key='tombstone_delay', value=tombstone_delay)

# Will use the default tombstone delay
did1 = did_factory.random_did()
add_replica(rse1_id, bytes=4, account=root_account, **did1)
tombstone = get_replica(rse1_id, **did1)['tombstone']
expected_tombstone = datetime.utcnow() + DEFAULT_TOMBSTONE_DELAY
assert expected_tombstone - timedelta(minutes=5) < tombstone < expected_tombstone + timedelta(minutes=5)

# Will use the configured value on the RSE
did2 = did_factory.random_did()
add_replica(rse2_id, bytes=4, account=root_account, **did2)
tombstone = get_replica(rse2_id, **did2)['tombstone']
expected_tombstone = datetime.utcnow() + timedelta(seconds=tombstone_delay)
assert expected_tombstone - timedelta(minutes=5) < tombstone < expected_tombstone + timedelta(minutes=5)

# Adding rule removes the tombstone
RuleClient().add_replication_rule([{'name': did1['name'], 'scope': did1['scope'].external}], 1, rse1, locked=True)
assert get_replica(rse1_id, **did1)['tombstone'] is None
RuleClient().add_replication_rule([{'name': did2['name'], 'scope': did2['scope'].external}], 1, rse2, locked=True)
assert get_replica(rse2_id, **did2)['tombstone'] is None

def test_list_replicas_with_updated_after(self, rse_factory, mock_scope, root_account):
""" REPLICA (CORE): Add and list file replicas with updated_after filter """
_, rse_id = rse_factory.make_mock_rse()
Expand Down

0 comments on commit 62c692a

Please sign in to comment.