Skip to content

Commit

Permalink
Merge pull request #5404 from rcarpa/patch-5403-native_multihop
Browse files Browse the repository at this point in the history
Transfers: native multi-hop between transfertools. Closes #5403
  • Loading branch information
bari12 committed Apr 4, 2022
2 parents 7e9d562 + 538ae9e commit 2faa5ac
Show file tree
Hide file tree
Showing 10 changed files with 205 additions and 111 deletions.
10 changes: 8 additions & 2 deletions lib/rucio/core/request.py
Expand Up @@ -204,6 +204,7 @@ def requeue_and_archive(request, source_ranking_update=True, retry_protocol_mism
else:
new_req['sources'][i]['ranking'] -= 1
new_req['sources'][i]['is_using'] = False
new_req.pop('state', None)
queue_requests([new_req], session=session, logger=logger)
return new_req
else:
Expand Down Expand Up @@ -273,7 +274,8 @@ def temp_serializer(obj):
return obj.internal
raise TypeError('Could not serialise object %r' % obj)

request['state'] = RequestState.PREPARING if preparer_enabled else RequestState.QUEUED
if 'state' not in request:
request['state'] = RequestState.PREPARING if preparer_enabled else RequestState.QUEUED

new_request = {'request_type': request['request_type'],
'scope': request['scope'],
Expand All @@ -291,6 +293,8 @@ def temp_serializer(obj):
'priority': request['attributes'].get('priority', None),
'requested_at': request.get('requested_at', None),
'retry_count': request['retry_count']}
if 'transfertool' in request:
new_request['transfertool'] = request['transfertool']
if 'previous_attempt_id' in request and 'retry_count' in request:
new_request['previous_attempt_id'] = request['previous_attempt_id']
new_request['id'] = request['request_id']
Expand Down Expand Up @@ -356,6 +360,7 @@ def list_transfer_requests_and_source_replicas(
activity=None,
older_than=None,
rses=None,
multihop_rses=None,
request_type=RequestType.TRANSFER,
request_state=None,
ignore_availability=False,
Expand All @@ -371,6 +376,7 @@ def list_transfer_requests_and_source_replicas(
:param activity: Activity to be selected.
:param older_than: Only select requests older than this DateTime.
:param rses: List of rse_id to select requests.
:param multihop_rses: List of rse_id allowed to be used for multihop
:param request_type: Filter on the given request type.
:param request_state: Filter on the given request state
:param transfertool: The transfer tool as specified in rucio.cfg.
Expand Down Expand Up @@ -484,7 +490,7 @@ def list_transfer_requests_and_source_replicas(
.with_hint(models.Distance, "INDEX(DISTANCES DISTANCES_PK)", 'oracle')

# if transfertool specified, select only the requests where the source rses are set up for the transfer tool
if transfertool:
if transfertool and not multihop_rses:
query = query.subquery()
query = session.query(query) \
.join(models.RSEAttrAssociation, models.RSEAttrAssociation.rse_id == query.c.source_rse_id) \
Expand Down
52 changes: 46 additions & 6 deletions lib/rucio/core/transfer.py
Expand Up @@ -351,12 +351,21 @@ def transfer_path_str(transfer_path: "List[DirectTransferDefinition]") -> str:
if not transfer_path:
return 'empty transfer path'

multi_tt = False
if len({hop.rws.transfertool for hop in transfer_path if hop.rws.transfertool}) > 1:
# The path relies on more than one transfertool
multi_tt = True

if len(transfer_path) == 1:
return str(transfer_path[0])

path_str = str(transfer_path[0].src.rse)
for hop in transfer_path:
path_str += '--{request_id}->{destination}'.format(request_id=hop.rws.request_id or '', destination=hop.dst.rse)
path_str += '--{request_id}{transfertool}->{destination}'.format(
request_id=hop.rws.request_id or '',
transfertool=':{}'.format(hop.rws.transfertool) if multi_tt else '',
destination=hop.dst.rse,
)
return path_str


Expand Down Expand Up @@ -925,7 +934,7 @@ def __transfer_order_key(transfer_path):

@transactional_session
def get_transfer_paths(total_workers=0, worker_number=0, partition_hash_var=None, limit=None, activity=None, older_than=None, rses=None, schemes=None,
failover_schemes=None, filter_transfertool=None, request_type=RequestType.TRANSFER,
failover_schemes=None, active_transfertools=None, filter_transfertool=None, request_type=RequestType.TRANSFER,
ignore_availability=False, logger=logging.log, session=None):
"""
Get next transfers to be submitted; grouped by transfertool which can submit them
Expand All @@ -939,6 +948,7 @@ def get_transfer_paths(total_workers=0, worker_number=0, partition_hash_var=None
:param schemes: Include schemes.
:param failover_schemes: Failover schemes.
:param filter_transfertool: The transfer tool to filter requests on.
:param active_transfertools: The transfer tool names which are supported by the calling context.
:param request_type The type of requests to retrieve (Transfer/Stagein)
:param ignore_availability: Ignore blocklisted RSEs
:param logger: Optional decorated logger that can be passed from the calling daemons or servers.
Expand All @@ -948,9 +958,7 @@ def get_transfer_paths(total_workers=0, worker_number=0, partition_hash_var=None
Workflow:
"""

include_multihop = False
if filter_transfertool in ['fts3', None]:
include_multihop = core_config_get('transfers', 'use_multihop', default=False, expiration_time=600, session=session)
include_multihop = core_config_get('transfers', 'use_multihop', default=False, expiration_time=600, session=session)

multihop_rses = []
if include_multihop:
Expand Down Expand Up @@ -988,6 +996,7 @@ def get_transfer_paths(total_workers=0, worker_number=0, partition_hash_var=None
activity=activity,
older_than=older_than,
rses=rses,
multihop_rses=multihop_rses,
request_type=request_type,
request_state=RequestState.QUEUED,
ignore_availability=ignore_availability,
Expand All @@ -1005,11 +1014,29 @@ def get_transfer_paths(total_workers=0, worker_number=0, partition_hash_var=None
failover_schemes=failover_schemes,
admin_accounts=admin_accounts,
ignore_availability=ignore_availability,
active_transfertools=active_transfertools,
logger=logger,
session=session,
)


def __parse_request_transfertools(
rws: "RequestWithSources",
logger: "Callable" = logging.log,
):
"""
Parse a set of desired transfertool names from the database field request.transfertool
"""
request_transfertools = set()
try:
if rws.transfertool:
request_transfertools = {tt.strip() for tt in rws.transfertool.split(',')}
except Exception:
logger(logging.WARN, "Unable to parse requested transfertools: {}".format(request_transfertools))
request_transfertools = None
return request_transfertools


def __build_transfer_paths(
requests_with_sources: "Iterable[RequestWithSources]",
multihop_rses: "List[str]",
Expand All @@ -1018,6 +1045,7 @@ def __build_transfer_paths(
schemes: "List[str]",
failover_schemes: "List[str]",
admin_accounts: "Set[InternalAccount]",
active_transfertools: "Set[str]" = None,
ignore_availability: bool = False,
logger: "Callable" = logging.log,
session: "Optional[Session]" = None,
Expand Down Expand Up @@ -1047,6 +1075,7 @@ def __build_transfer_paths(
multihop_rses = list(set(multihop_rses).difference(unavailable_write_rse_ids).difference(unavailable_read_rse_ids))

candidate_paths_by_request_id, reqs_no_source, reqs_only_tape_source, reqs_scheme_mismatch = {}, set(), set(), set()
reqs_unsupported_transfertool = set()
for rws in requests_with_sources:

ctx.ensure_fully_loaded(rws.dest_rse)
Expand Down Expand Up @@ -1077,6 +1106,17 @@ def __build_transfer_paths(
logger(logging.WARNING, '%s: dst RSE is restricted for write. Will skip the submission', rws.request_id)
continue

request_transfertool = __parse_request_transfertools(rws, logger)
if request_transfertool is None:
logger(logging.WARNING, '%s: failed to parse transfertool from request', rws.request_id)
continue
if request_transfertool and active_transfertools and not request_transfertool.intersection(active_transfertools):
# The request explicitly asks for a transfertool which this submitter doesn't support
logger(logging.INFO, '%s: unsupported transfertool. Skipping.', rws.request_id)
reqs_unsupported_transfertool.add(rws.request_id)
reqs_no_source.remove(rws.request_id)
continue

# parse source expression
source_replica_expression = rws.attributes.get('source_replica_expression', None)
allowed_source_rses = None
Expand Down Expand Up @@ -1197,7 +1237,7 @@ def __build_transfer_paths(
candidate_paths_by_request_id[rws.request_id] = candidate_paths
reqs_no_source.remove(rws.request_id)

return candidate_paths_by_request_id, reqs_no_source, reqs_scheme_mismatch, reqs_only_tape_source
return candidate_paths_by_request_id, reqs_no_source, reqs_scheme_mismatch, reqs_only_tape_source, reqs_unsupported_transfertool


@read_session
Expand Down

0 comments on commit 2faa5ac

Please sign in to comment.