Merge pull request #4878 from rcarpa/patch-4809-dont_resubmit_intermediate

Transfers: don't resubmit intermediate transfers. Closes #4809
bari12 committed Oct 18, 2021
2 parents f78cbc4 + 3e27e96 commit c3e6552
Showing 3 changed files with 31 additions and 4 deletions.
7 changes: 6 additions & 1 deletion lib/rucio/core/request.py
@@ -89,6 +89,10 @@ def should_retry_request(req, retry_protocol_mismatches):
:param retry_protocol_mismatches: Boolean to retry the transfer in case of protocol mismatch.
:returns: True if should retry it; False if no more retry.
"""
if req['attributes'] and req['attributes'].get('next_hop_request_id'):
# This is an intermediate request in a multi-hop transfer. It must not be re-scheduled on its own.
# If needed, it will be re-scheduled via the creation of a new multi-hop transfer.
return False
if req['state'] == RequestState.SUBMITTING:
return True
if req['state'] == RequestState.NO_SOURCES or req['state'] == RequestState.ONLY_TAPE_SOURCES:
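For context, a minimal sketch (plain Python, names and dict shapes assumed for illustration, not Rucio's API) of how this guard behaves: an intermediate hop is recognised purely by the presence of the next_hop_request_id attribute, so it is excluded from retries regardless of its state.

def is_intermediate_hop(req):
    # An intermediate hop of a multi-hop transfer carries a pointer to the next hop's request.
    attributes = req.get('attributes') or {}
    return bool(attributes.get('next_hop_request_id'))

# Hypothetical request dicts, for illustration only.
intermediate_req = {'state': 'NO_SOURCES',
                    'attributes': {'next_hop_request_id': 'id-of-next-hop',
                                   'initial_request_id': 'id-of-final-request'}}
final_req = {'state': 'NO_SOURCES', 'attributes': {}}

assert is_intermediate_hop(intermediate_req)      # never retried on its own
assert not is_intermediate_hop(final_req)         # normal retry rules still apply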
@@ -584,6 +588,7 @@ def get_request(request_id, session=None):
else:
tmp = dict(tmp)
tmp.pop('_sa_instance_state')
tmp['attributes'] = json.loads(str(tmp['attributes']))
return tmp
except IntegrityError as error:
raise RucioException(error.args)
@@ -675,7 +680,7 @@ def archive_request(request_id, session=None):
name=req['name'],
dest_rse_id=req['dest_rse_id'],
source_rse_id=req['source_rse_id'],
attributes=req['attributes'],
attributes=json.dumps(req['attributes']) if isinstance(req['attributes'], dict) else req['attributes'],
state=req['state'],
account=req['account'],
external_id=req['external_id'],
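Taken together, the request.py changes assume that request attributes are persisted as a JSON string: get_request() now parses them into a dict on read, and archive_request() serializes them back before writing the history row. A minimal sketch of that round trip (function names here are illustrative, not Rucio's API):

import json

def load_attributes(stored_value):
    # The requests table stores attributes as a JSON string (or NULL).
    return json.loads(str(stored_value)) if stored_value else {}

def dump_attributes(attributes):
    # Accept either a dict built in memory or an already-serialized string.
    return json.dumps(attributes) if isinstance(attributes, dict) else attributes

stored = '{"next_hop_request_id": "id-of-next-hop"}'
attrs = load_attributes(stored)       # -> dict, as get_request() now returns
archived = dump_attributes(attrs)     # -> JSON string, as archive_request() now writes
assert json.loads(archived) == attrs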
12 changes: 10 additions & 2 deletions lib/rucio/core/transfer.py
@@ -69,7 +69,7 @@
from rucio.core.config import get as core_config_get
from rucio.core.monitor import record_counter, record_timer
from rucio.core.oidc import get_token_for_account_operation
from rucio.core.replica import add_replicas, tombstone_from_delay
from rucio.core.replica import add_replicas, tombstone_from_delay, update_replica_state
from rucio.core.request import queue_requests, set_requests_state
from rucio.core.rse import get_rse_name, get_rse_vo, list_rses, get_rse_supported_checksums_from_attributes
from rucio.core.rse_expression_parser import parse_expression
@@ -1568,7 +1568,10 @@ def create_missing_replicas_and_requests(
"""
creation_successful = True
created_requests = []
for hop in transfer_path:
# Iterate the path in reverse order. The last hop is the initial request, so
# next_hop.rws.request_id will always be initialized when handling the current hop.
for i in reversed(range(len(transfer_path))):
hop = transfer_path[i]
rws = hop.rws
if rws.request_id:
continue
@@ -1591,9 +1594,14 @@
ignore_availability=False,
dataset_meta=None,
session=session)
# Set replica state to Copying in case replica already existed in another state.
# Can happen when a multihop transfer failed previously, and we are re-scheduling it now.
update_replica_state(rse_id=rws.dest_rse.id, scope=rws.scope, name=rws.name, state=ReplicaState.COPYING, session=session)
except Exception as error:
logger(logging.ERROR, 'Problem adding replicas %s:%s on %s : %s', rws.scope, rws.name, rws.dest_rse, str(error))

rws.attributes['next_hop_request_id'] = transfer_path[i + 1].rws.request_id
rws.attributes['initial_request_id'] = transfer_path[-1].rws.request_id
new_req = queue_requests(requests=[{'dest_rse_id': rws.dest_rse.id,
'scope': rws.scope,
'name': rws.name,
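The transfer.py hunk above walks the multi-hop path backwards so that each newly queued hop can record the request id of the hop that follows it. A minimal standalone sketch of that linking pattern (the Hop class and link_path() are stand-ins, not Rucio types):

import uuid

class Hop:
    def __init__(self, request_id=None):
        self.request_id = request_id   # already set for the initial (last) hop
        self.attributes = {}

def link_path(transfer_path):
    # Iterate in reverse: transfer_path[i + 1].request_id is always known
    # by the time hop i is queued.
    for i in reversed(range(len(transfer_path))):
        hop = transfer_path[i]
        if hop.request_id:
            continue                   # the initial request already exists
        hop.attributes['next_hop_request_id'] = transfer_path[i + 1].request_id
        hop.attributes['initial_request_id'] = transfer_path[-1].request_id
        hop.request_id = str(uuid.uuid4())   # stand-in for queue_requests()

path = [Hop(), Hop(), Hop(request_id='id-of-initial-request')]
link_path(path)
assert path[1].attributes['next_hop_request_id'] == 'id-of-initial-request'
assert path[0].attributes['next_hop_request_id'] == path[1].request_id
assert path[0].attributes['initial_request_id'] == 'id-of-initial-request'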
16 changes: 15 additions & 1 deletion lib/rucio/tests/test_conveyor.py
@@ -27,7 +27,7 @@
import pytest

import rucio.daemons.reaper.reaper
from rucio.common.exception import ReplicaNotFound
from rucio.common.exception import ReplicaNotFound, RequestNotFound
from rucio.core import distance as distance_core
from rucio.core import replica as replica_core
from rucio.core import request as request_core
@@ -229,6 +229,13 @@ def test_fts_non_recoverable_failures_handled_on_multihop(vo, did_factory, root_
# Each hop is a separate transfer, which will be handled by the poller and marked as failed
assert metrics_mock.get_sample_value('rucio_daemons_conveyor_poller_update_request_state_total', labels={'updated': 'True'}) >= 2

finisher(once=True, partition_wait_time=None)
# The intermediate request must not be re-scheduled by finisher
with pytest.raises(RequestNotFound):
request_core.get_request_by_did(rse_id=jump_rse_id, **did)
request = request_core.get_request_by_did(rse_id=dst_rse_id, **did)
assert request['state'] == RequestState.QUEUED


@skip_rse_tests_with_accounts
@pytest.mark.dirty(reason="leaves files in XRD containers")
@@ -443,6 +450,13 @@ def test_multihop_receiver_on_failure(vo, did_factory, replica_client, root_acco
# First hop will be handled by receiver; second hop by poller
assert metrics_mock.get_sample_value('rucio_daemons_conveyor_receiver_update_request_state_total', labels={'updated': 'True'}) >= 1
assert metrics_mock.get_sample_value('rucio_daemons_conveyor_poller_update_request_state_total', labels={'updated': 'True'}) >= 1

finisher(once=True, partition_wait_time=None)
# The intermediate request must not be re-scheduled by finisher
with pytest.raises(RequestNotFound):
request_core.get_request_by_did(rse_id=jump_rse_id, **did)
request = request_core.get_request_by_did(rse_id=dst_rse_id, **did)
assert request['state'] == RequestState.QUEUED
finally:
receiver_graceful_stop.set()
receiver_thread.join(timeout=5)
