Skip to content

Commit

Permalink
Transfers: don't resubmit intermediate transfers. Closes rucio#4809
Browse files Browse the repository at this point in the history
  • Loading branch information
rcarpa committed Aug 31, 2021
1 parent 3b196e5 commit 7dd0e67
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 3 deletions.
7 changes: 6 additions & 1 deletion lib/rucio/core/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ def should_retry_request(req, retry_protocol_mismatches):
:param retry_protocol_mismatches: Boolean to retry the transfer in case of protocol mismatch.
:returns: True if the request should be retried; False if it must not be retried any more.
"""
if req['attributes'] and req['attributes'].get('next_hop_request_id'):
# This is an intermediate request in a multi-hop transfer. It must not be re-scheduled on its own.
# If needed, it will be re-scheduled via the creation of a new multi-hop transfer.
return False
if req['state'] == RequestState.SUBMITTING:
return True
if req['state'] == RequestState.NO_SOURCES or req['state'] == RequestState.ONLY_TAPE_SOURCES:
Expand Down Expand Up @@ -583,6 +587,7 @@ def get_request(request_id, session=None):
else:
tmp = dict(tmp)
tmp.pop('_sa_instance_state')
tmp['attributes'] = json.loads(str(tmp['attributes']))
return tmp
except IntegrityError as error:
raise RucioException(error.args)
Expand Down Expand Up @@ -674,7 +679,7 @@ def archive_request(request_id, session=None):
name=req['name'],
dest_rse_id=req['dest_rse_id'],
source_rse_id=req['source_rse_id'],
attributes=req['attributes'],
attributes=json.dumps(req['attributes']) if isinstance(req['attributes'], dict) else req['attributes'],
state=req['state'],
account=req['account'],
external_id=req['external_id'],
Expand Down
12 changes: 10 additions & 2 deletions lib/rucio/core/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@
from rucio.core.config import get as core_config_get
from rucio.core.monitor import record_counter, record_timer
from rucio.core.oidc import get_token_for_account_operation
from rucio.core.replica import add_replicas, tombstone_from_delay
from rucio.core.replica import add_replicas, tombstone_from_delay, update_replica_state
from rucio.core.request import queue_requests, set_requests_state
from rucio.core.rse import get_rse_name, get_rse_vo, list_rses, get_rse_supported_checksums_from_attributes
from rucio.core.rse_expression_parser import parse_expression
Expand Down Expand Up @@ -1598,7 +1598,10 @@ def create_missing_replicas_and_requests(
"""
creation_successful = True
created_requests = []
for hop in transfer_path:
# Iterate the path in reverse order. The last hop is the initial request, so the next hop's
# request id (transfer_path[i + 1].rws.request_id) will always be initialized when handling the current hop.
for i in reversed(range(len(transfer_path))):
hop = transfer_path[i]
rws = hop.rws
if rws.request_id:
continue
Expand All @@ -1621,9 +1624,14 @@ def create_missing_replicas_and_requests(
ignore_availability=False,
dataset_meta=None,
session=session)
# Set replica state to Copying in case replica already existed in another state.
# Can happen when a multihop transfer failed previously, and we are re-scheduling it now.
update_replica_state(rse_id=rws.dest_rse.id, scope=rws.scope, name=rws.name, state=ReplicaState.COPYING)
except Exception as error:
logger(logging.ERROR, 'Problem adding replicas %s:%s on %s : %s', rws.scope, rws.name, rws.dest_rse, str(error))

rws.attributes['next_hop_request_id'] = transfer_path[i + 1].rws.request_id
rws.attributes['initial_request_id'] = transfer_path[-1].rws.request_id
new_req = queue_requests(requests=[{'dest_rse_id': rws.dest_rse.id,
'scope': rws.scope,
'name': rws.name,
Expand Down

0 comments on commit 7dd0e67

Please sign in to comment.