Skip to content

Commit

Permalink
Transfers: rework update_replica in finisher. Closes #5497
Browse files Browse the repository at this point in the history
Reduce the scope of try/except blocks and avoid code duplication.

Rework the recovery from add_replica: if RSEProtocolNotSupported is
raised, it's probably that the RSE configuration changed since the
submission. On deterministic RSEs we can safely recover from the issue
by creating a replica with pfn=None: our goal is to add a temporary
replica to trigger deletion by reaper and reduce probability of dark
data, so using a specific protocol for that is not a priority

Add some comments.
  • Loading branch information
rcarpa committed Jul 11, 2022
1 parent 1fca0f8 commit fc567a0
Showing 1 changed file with 41 additions and 25 deletions.
66 changes: 41 additions & 25 deletions lib/rucio/daemons/conveyor/finisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

import rucio.db.sqla.util
from rucio.common.cache import make_region_memcached
from rucio.common.exception import DatabaseException, ConfigNotFound, UnsupportedOperation, ReplicaNotFound, RequestNotFound
from rucio.common.exception import DatabaseException, ConfigNotFound, UnsupportedOperation, ReplicaNotFound, RequestNotFound, RSEProtocolNotSupported
from rucio.common.logging import setup_logging
from rucio.common.types import InternalAccount
from rucio.common.utils import chunks
Expand Down Expand Up @@ -402,32 +402,48 @@ def __update_replica(replica, session=None, logger=logging.log):
:param session: The database session to use.
:returns commit_or_rollback: Boolean.
"""

try:
replica_core.update_replicas_states([replica], nowait=True, session=session)
replica_found = True
try:
replica_core.update_replicas_states([replica], nowait=True, session=session)
except ReplicaNotFound:
replica_found = False

if not replica_found and replica['state'] == ReplicaState.AVAILABLE and replica['request_type'] != RequestType.STAGEIN:
# FTS tells us that the Replica was successfully transferred, but there is no such replica in Rucio,
# this can happen if the replica was deleted from rucio in the meantime. As fts tells us that the
# replica is available, there is a high probability that we just generated dark data.
# This opportunistic workflow tries to cleanup this dark data by adding a replica with an expired
# tombstone and letting reaper take care of its deletion.
logger(logging.INFO, "Replica cannot be found. Adding a replica %s:%s AT RSE %s with tombstone=utcnow", replica['scope'], replica['name'], replica['rse_id'])
add_replica_kwargs = {
'rse_id': replica['rse_id'],
'scope': replica['scope'],
'name': replica['name'],
'bytes_': replica['bytes'],
'account': InternalAccount('root', vo=replica['scope'].vo), # it will deleted immediately, do we need to get the accurate account from rule?
'adler32': replica['adler32'],
'tombstone': datetime.datetime.utcnow(),
}
try:
try:
replica_core.add_replica(**add_replica_kwargs, pfn=replica['pfn'] if 'pfn' in replica else None, session=session)
except RSEProtocolNotSupported as error:
# The pfn cannot be matched to any of the protocols configured on the RSE.
# Most probably the RSE protocol configuration changed since the submission.
# Try again without explicit pfn. On non-deterministic RSEs it will fail
# with UnsupportedOperation exception
logger(logging.ERROR, 'Protocol not supported for DID %s:%s at RSE %s - potential dark data - %s', replica['scope'], replica['name'], replica['rse_id'], str(error))
replica_core.add_replica(**add_replica_kwargs, pfn=None, session=session)
except Exception as error:
logger(logging.ERROR, 'Cannot register replica for DID %s:%s at RSE %s - potential dark data - %s', replica['scope'], replica['name'], replica['rse_id'], str(error))
raise

if not replica['archived']:
request_core.archive_request(replica['request_id'], session=session)
logger(logging.INFO, "HANDLED REQUEST %s DID %s:%s AT RSE %s STATE %s", replica['request_id'], replica['scope'], replica['name'], replica['rse_id'], str(replica['state']))
except (UnsupportedOperation, ReplicaNotFound) as error:

except Exception as error:
logger(logging.WARNING, "ERROR WHEN HANDLING REQUEST %s DID %s:%s AT RSE %s STATE %s: %s", replica['request_id'], replica['scope'], replica['name'], replica['rse_id'], str(replica['state']), str(error))
# replica cannot be found. register it and schedule it for deletion
try:
if replica['state'] == ReplicaState.AVAILABLE and replica['request_type'] != RequestType.STAGEIN:
logger(logging.INFO, "Replica cannot be found. Adding a replica %s:%s AT RSE %s with tombstone=utcnow", replica['scope'], replica['name'], replica['rse_id'])
replica_core.add_replica(replica['rse_id'],
replica['scope'],
replica['name'],
replica['bytes'],
pfn=replica['pfn'] if 'pfn' in replica else None,
account=InternalAccount('root', vo=replica['scope'].vo), # it will deleted immediately, do we need to get the accurate account from rule?
adler32=replica['adler32'],
tombstone=datetime.datetime.utcnow(),
session=session)
if not replica['archived']:
request_core.archive_request(replica['request_id'], session=session)
logger(logging.INFO, "HANDLED REQUEST %s DID %s:%s AT RSE %s STATE %s", replica['request_id'], replica['scope'], replica['name'], replica['rse_id'], str(replica['state']))
except Exception as error:
logger(logging.ERROR, 'Cannot register replica for DID %s:%s at RSE %s - potential dark data - %s', replica['scope'], replica['name'], replica['rse_id'], str(error))
raise
raise

return True
logger(logging.INFO, "HANDLED REQUEST %s DID %s:%s AT RSE %s STATE %s", replica['request_id'], replica['scope'], replica['name'], replica['rse_id'], str(replica['state']))

0 comments on commit fc567a0

Please sign in to comment.