Skip to content

Commit

Permalink
Transfers: native multi-hop between transfertools. Closes rucio#5403
Browse files Browse the repository at this point in the history
Adapt the transfertool argument of the submitter to also accept
a comma-separated list of transfertool names. Most of the needed
functionality to accept such a list was already implemented.
I perform a small refactoring to put the "external_name" of each
transfertool class as a class attribute. This allows for more
generic code in some parts.

Also adapt the logic which prepares the path submission to a
transfertool. Until now, the submitter was forced to submit the
full path to a single transfertool. This commit changes the
behavior. Now a full path can be split into sub-paths, each
of them handled by a different transfertool. In one submitter
iteration, only the first sub-path will be submitted, the rest
will be left in a "queued" state, but protected from being
picked by another submitter thanks to the new "transfer_hops"
table. Once the first sub-path is completed, submission
will resume naturally.

To ensure intermediate hops left in the queued state are picked
by the correct submitter, we now set the "transfertool" field
on intermediate requests. Obviously, the value of this field
can be different from that of the initial request.

Because of the experimental nature of the globus transfertool,
I add a hard-coded protection which ensures that submission is
only done to globus if it's the first element in the submitter's
"transfertool" argument.
  • Loading branch information
rcarpa committed Mar 30, 2022
1 parent 8ea0202 commit 14e6bde
Show file tree
Hide file tree
Showing 9 changed files with 167 additions and 81 deletions.
10 changes: 8 additions & 2 deletions lib/rucio/core/request.py
Expand Up @@ -204,6 +204,7 @@ def requeue_and_archive(request, source_ranking_update=True, retry_protocol_mism
else:
new_req['sources'][i]['ranking'] -= 1
new_req['sources'][i]['is_using'] = False
new_req.pop('state', None)
queue_requests([new_req], session=session, logger=logger)
return new_req
else:
Expand Down Expand Up @@ -273,7 +274,8 @@ def temp_serializer(obj):
return obj.internal
raise TypeError('Could not serialise object %r' % obj)

request['state'] = RequestState.PREPARING if preparer_enabled else RequestState.QUEUED
if 'state' not in request:
request['state'] = RequestState.PREPARING if preparer_enabled else RequestState.QUEUED

new_request = {'request_type': request['request_type'],
'scope': request['scope'],
Expand All @@ -291,6 +293,8 @@ def temp_serializer(obj):
'priority': request['attributes'].get('priority', None),
'requested_at': request.get('requested_at', None),
'retry_count': request['retry_count']}
if 'transfertool' in request:
new_request['transfertool'] = request['transfertool']
if 'previous_attempt_id' in request and 'retry_count' in request:
new_request['previous_attempt_id'] = request['previous_attempt_id']
new_request['id'] = request['request_id']
Expand Down Expand Up @@ -356,6 +360,7 @@ def list_transfer_requests_and_source_replicas(
activity=None,
older_than=None,
rses=None,
multihop_rses=None,
request_type=RequestType.TRANSFER,
request_state=None,
ignore_availability=False,
Expand All @@ -371,6 +376,7 @@ def list_transfer_requests_and_source_replicas(
:param activity: Activity to be selected.
:param older_than: Only select requests older than this DateTime.
:param rses: List of rse_id to select requests.
:param multihop_rses: List of rse_id allowed to be used for multihop
:param request_type: Filter on the given request type.
:param request_state: Filter on the given request state
:param transfertool: The transfer tool as specified in rucio.cfg.
Expand Down Expand Up @@ -484,7 +490,7 @@ def list_transfer_requests_and_source_replicas(
.with_hint(models.Distance, "+ index(distances DISTANCES_PK)", 'oracle')

# if transfertool specified, select only the requests where the source rses are set up for the transfer tool
if transfertool:
if transfertool and not multihop_rses:
query = query.subquery()
query = session.query(query) \
.join(models.RSEAttrAssociation, models.RSEAttrAssociation.rse_id == query.c.source_rse_id) \
Expand Down
16 changes: 12 additions & 4 deletions lib/rucio/core/transfer.py
Expand Up @@ -351,12 +351,21 @@ def transfer_path_str(transfer_path: "List[DirectTransferDefinition]") -> str:
if not transfer_path:
return 'empty transfer path'

multi_tt = False
if len({hop.rws.transfertool for hop in transfer_path if hop.rws.transfertool}) > 1:
# The path relies on more than one transfertool
multi_tt = True

if len(transfer_path) == 1:
return str(transfer_path[0])

path_str = str(transfer_path[0].src.rse)
for hop in transfer_path:
path_str += '--{request_id}->{destination}'.format(request_id=hop.rws.request_id or '', destination=hop.dst.rse)
path_str += '--{request_id}{transfertool}->{destination}'.format(
request_id=hop.rws.request_id or '',
transfertool=':{}'.format(hop.rws.transfertool) if multi_tt else '',
destination=hop.dst.rse,
)
return path_str


Expand Down Expand Up @@ -942,9 +951,7 @@ def get_transfer_paths(total_workers=0, worker_number=0, partition_hash_var=None
Workflow:
"""

include_multihop = False
if filter_transfertool in ['fts3', None]:
include_multihop = core_config_get('transfers', 'use_multihop', default=False, expiration_time=600, session=session)
include_multihop = core_config_get('transfers', 'use_multihop', default=False, expiration_time=600, session=session)

multihop_rses = []
if include_multihop:
Expand Down Expand Up @@ -982,6 +989,7 @@ def get_transfer_paths(total_workers=0, worker_number=0, partition_hash_var=None
activity=activity,
older_than=older_than,
rses=rses,
multihop_rses=multihop_rses,
request_type=request_type,
request_state=RequestState.QUEUED,
ignore_availability=ignore_availability,
Expand Down
131 changes: 90 additions & 41 deletions lib/rucio/daemons/conveyor/common.py
Expand Up @@ -184,7 +184,7 @@ def run_conveyor_daemon(once, graceful_stop, executable, logger_prefix, partitio

@transactional_session
def next_transfers_to_submit(total_workers=0, worker_number=0, partition_hash_var=None, limit=None, activity=None, older_than=None, rses=None, schemes=None,
failover_schemes=None, filter_transfertool=None, transfertools_by_name=None, request_type=RequestType.TRANSFER,
failover_schemes=None, filter_transfertool=None, transfertool_classes=None, request_type=RequestType.TRANSFER,
ignore_availability=False, logger=logging.log, session=None):
"""
Get next transfers to be submitted; grouped by transfertool which can submit them
Expand All @@ -197,7 +197,7 @@ def next_transfers_to_submit(total_workers=0, worker_number=0, partition_hash_va
:param rses: Include RSES.
:param schemes: Include schemes.
:param failover_schemes: Failover schemes.
:param transfertools_by_name: Dict: {transfertool_name_str: transfertool class}
:param transfertool_classes: List of transfertool classes which can be used by this submitter
:param filter_transfertool: The transfer tool to filter requests on.
:param request_type The type of requests to retrieve (Transfer/Stagein)
:param ignore_availability: Ignore blocklisted RSEs
Expand Down Expand Up @@ -226,7 +226,7 @@ def next_transfers_to_submit(total_workers=0, worker_number=0, partition_hash_va
# if the chosen best path is a multihop, create intermediate replicas and the intermediate transfer requests
paths_by_transfertool_builder, reqs_no_host, reqs_unsupported_transfertool = __assign_paths_to_transfertool_and_create_hops(
candidate_paths,
transfertools_by_name=transfertools_by_name,
transfertool_classes=transfertool_classes,
logger=logger,
session=session,
)
Expand Down Expand Up @@ -264,9 +264,49 @@ def __parse_request_transfertools(
return request_transfertools


def __assign_to_transfertool(
        transfer_path: "List[DirectTransferDefinition]",
        transfertool_classes: "List[Type[Transfertool]]",
        logger: "Callable",
) -> "List[Tuple[List[DirectTransferDefinition], Optional[TransferToolBuilder]]]":
    """
    Walk a (multihop) path from left to right and cut it into consecutive chunks,
    each chunk being assigned to one transfertool.

    The assignment is greedy: at every step the transfertool classes are tried in the
    given order and the first one which accepts a non-empty prefix of the remaining
    hops wins. There is no backtracking to look for a better split.
    For example, for a path A->B->C->D->E, A->B->C may go to transfertool1 and
    C->D->E to transfertool2, even if transfertool2 alone could have submitted the
    whole path without splitting it.

    :param transfer_path: Ordered list of hops composing the path.
    :param transfertool_classes: Transfertool classes to try, in order. If None, the whole path
                                 is returned as a single chunk with no builder attached.
    :param logger: Logger callable forwarded to submission_builder_for_path.
    :returns: List of (sub_path, transfertool_builder) tuples covering the full path, in path order.
              An empty list if the path cannot be entirely covered by the given transfertools.
    """
    if transfertool_classes is None:
        return [(transfer_path, None)]

    assignments = []
    pending_hops = transfer_path
    while pending_hops:
        for candidate_cls in transfertool_classes:
            # Presumably the returned hops are a prefix of pending_hops: only their count is used below.
            taken_hops, builder = candidate_cls.submission_builder_for_path(pending_hops, logger=logger)
            if taken_hops:
                break
        else:
            # Nobody can submit the next hop: the whole path is not submittable
            return []

        assignments.append((taken_hops, builder))
        pending_hops = pending_hops[len(taken_hops):]

    return assignments


def __assign_paths_to_transfertool_and_create_hops(
candidate_paths_by_request_id: "Dict[str: List[DirectTransferDefinition]]",
transfertools_by_name: "Optional[Dict[str, Type[Transfertool]]]" = None,
transfertool_classes: "Optional[List[Type[Transfertool]]]" = None,
logger: "Callable" = logging.log,
session: "Optional[Session]" = None,
) -> "Tuple[Dict[TransferToolBuilder, List[DirectTransferDefinition]], Set[str], Set[str]]":
Expand All @@ -282,40 +322,45 @@ def __assign_paths_to_transfertool_and_create_hops(
# Get the rws object from any candidate path. It is the same for all candidate paths. For multihop, the initial request is the last hop
rws = candidate_paths[0][-1].rws

request_transfertools = __parse_request_transfertools(rws, logger)
if request_transfertools is None:
initial_request_transfertool = __parse_request_transfertools(rws, logger)
if initial_request_transfertool is None:
# Parsing failed
reqs_no_host.add(request_id)
continue
if request_transfertools and transfertools_by_name and not request_transfertools.intersection(transfertools_by_name):
if initial_request_transfertool and transfertool_classes and \
not initial_request_transfertool.intersection(t.external_name for t in transfertool_classes):
# The request explicitly asks for a transfertool which this submitter doesn't support
reqs_unsupported_transfertool.add(request_id)
continue

# Selects the first path which can be submitted by a supported transfertool and for which the creation of
# intermediate hops (if it is a multihop) work correctly
# Selects the first path which can be submitted using a chain of supported transfertools
# and for which the creation of intermediate hops (if it is a multihop) works correctly
best_path = None
builder_to_use = None
hops_to_submit = []
must_skip_submission = False
for transfer_path in candidate_paths:
builder = None
if transfertools_by_name:
transfertools_to_try = set(transfertools_by_name)
if request_transfertools:
transfertools_to_try = transfertools_to_try.intersection(request_transfertools)
for transfertool in transfertools_to_try:
builder = transfertools_by_name[transfertool].submission_builder_for_path(transfer_path, logger=logger)
if builder:
break
if builder or not transfertools_by_name:
created, must_skip_submission = __create_missing_replicas_and_requests(
transfer_path, default_tombstone_delay, logger=logger, session=session
)
if created:
best_path = transfer_path
builder_to_use = builder
if created or must_skip_submission:
break

tt_assignments = [(transfer_path, __assign_to_transfertool(transfer_path, transfertool_classes, logger=logger))
for transfer_path in candidate_paths]
# Prioritize the paths which need less transfertool transitions.
# Ideally, the entire path should be submitted to a single transfertool
for transfer_path, tt_assignment in sorted(tt_assignments, key=lambda t: len(t[1])):
# Set the 'transfertool' field on the intermediate hops which should be created in the database
for sub_path, tt_builder in tt_assignment:
if tt_builder:
for hop in sub_path:
if hop is not transfer_path[-1]:
hop.rws.transfertool = tt_builder.transfertool_class.external_name
created, must_skip_submission = __create_missing_replicas_and_requests(
transfer_path, default_tombstone_delay, logger=logger, session=session
)
if created:
best_path = transfer_path
# Only the first sub-path will be submitted to the corresponding transfertool,
# the rest of the hops will wait for first hops to be transferred
hops_to_submit, builder_to_use = tt_assignment[0]
if created or must_skip_submission:
break

if not best_path:
reqs_no_host.add(request_id)
Expand All @@ -337,7 +382,10 @@ def __assign_paths_to_transfertool_and_create_hops(
logger(logging.INFO, '%s: Part of the transfer is already being handled. Skip for now.' % request_id)
continue

paths_by_transfertool_builder.setdefault(builder_to_use, []).append(best_path)
if len(hops_to_submit) < len(best_path):
logger(logging.INFO, '%s: Only first %d hops will be submitted', request_id, len(hops_to_submit))

paths_by_transfertool_builder.setdefault(builder_to_use, []).append(hops_to_submit)
return paths_by_transfertool_builder, reqs_no_host, reqs_unsupported_transfertool


Expand Down Expand Up @@ -397,21 +445,22 @@ def __create_missing_replicas_and_requests(
rws.attributes['next_hop_request_id'] = transfer_path[i + 1].rws.request_id
rws.attributes['initial_request_id'] = initial_request_id
rws.attributes['source_replica_expression'] = hop.src.rse.name
new_req = queue_requests(requests=[{'dest_rse_id': rws.dest_rse.id,
'scope': rws.scope,
'name': rws.name,
'rule_id': '00000000000000000000000000000000', # Dummy Rule ID used for multihop. TODO: Replace with actual rule_id once we can flag intermediate requests
'attributes': rws.attributes,
'request_type': rws.request_type,
'retry_count': rws.retry_count,
'account': rws.account,
'requested_at': datetime.datetime.now()}], session=session)
req_to_queue = {'dest_rse_id': rws.dest_rse.id,
'state': RequestState.QUEUED,
'scope': rws.scope,
'name': rws.name,
'rule_id': '00000000000000000000000000000000', # Dummy Rule ID used for multihop. TODO: Replace with actual rule_id once we can flag intermediate requests
'attributes': rws.attributes,
'request_type': rws.request_type,
'retry_count': rws.retry_count,
'account': rws.account,
'requested_at': datetime.datetime.now()}
if rws.transfertool:
req_to_queue['transfertool'] = rws.transfertool
new_req = queue_requests(requests=[req_to_queue], session=session)
# If a request already exists, new_req will be an empty list.
if new_req:
db_req = new_req[0]
# queue_request may put it in PREPARING state; we don't want that.
# TODO: maybe better to not relly on queue_request here?
set_request_state(db_req['id'], RequestState.QUEUED, session=session, logger=logger)
logger(logging.DEBUG, '%s: New request created for the transfer between %s and %s : %s', initial_request_id, transfer_path[0].src, transfer_path[-1].dst, db_req['id'])
else:
db_req = request_core.get_request_by_did(rws.scope, rws.name, rws.dest_rse.id, session=session)
Expand Down
2 changes: 1 addition & 1 deletion lib/rucio/daemons/conveyor/stager.py
Expand Up @@ -62,7 +62,7 @@ def run_once(bulk, group_bulk, rse_ids, scheme, failover_scheme, transfertool_kw
activity=activity,
rses=rse_ids,
schemes=scheme,
transfertools_by_name={'fts3': FTS3Transfertool},
transfertool_classes=[FTS3Transfertool],
older_than=None,
request_type=RequestType.STAGEIN,
logger=logger,
Expand Down
27 changes: 21 additions & 6 deletions lib/rucio/daemons/conveyor/submitter.py
Expand Up @@ -52,6 +52,7 @@
from rucio.common.logging import setup_logging
from rucio.common.schema import get_schema_value
from rucio.core.monitor import MultiCounter, record_timer
from rucio.core.transfer import transfer_path_str
from rucio.daemons.conveyor.common import submit_transfer, get_conveyor_rses, run_conveyor_daemon, next_transfers_to_submit
from rucio.db.sqla.constants import RequestType
from rucio.transfertool.fts3 import FTS3Transfertool
Expand All @@ -68,13 +69,13 @@
documentation='Number of transfers retrieved')

TRANSFERTOOL_CLASSES_BY_NAME = {
'fts3': FTS3Transfertool,
'globus': GlobusTransferTool,
'mock': MockTransfertool,
FTS3Transfertool.external_name: FTS3Transfertool,
GlobusTransferTool.external_name: GlobusTransferTool,
MockTransfertool.external_name: MockTransfertool,
}


def run_once(bulk, group_bulk, filter_transfertool, transfertool, ignore_availability, rse_ids,
def run_once(bulk, group_bulk, filter_transfertool, transfertools, ignore_availability, rse_ids,
scheme, failover_scheme, partition_hash_var, timeout, transfertool_kwargs,
heartbeat_handler, activity):
worker_number, total_workers, logger = heartbeat_handler.live()
Expand All @@ -90,7 +91,7 @@ def run_once(bulk, group_bulk, filter_transfertool, transfertool, ignore_availab
rses=rse_ids,
schemes=scheme,
filter_transfertool=filter_transfertool,
transfertools_by_name={transfertool: TRANSFERTOOL_CLASSES_BY_NAME[transfertool]},
transfertool_classes=[TRANSFERTOOL_CLASSES_BY_NAME[transfertool] for transfertool in transfertools],
older_than=None,
request_type=RequestType.TRANSFER,
ignore_availability=ignore_availability,
Expand All @@ -104,6 +105,19 @@ def run_once(bulk, group_bulk, filter_transfertool, transfertool, ignore_availab
logger(logging.INFO, 'Got %s transfers for %s in %s seconds', total_transfers, activity, time.time() - start_time)

for builder, transfer_paths in transfers.items():
# Globus Transfertool is not yet production-ready, but we need to partially activate it
# in all submitters if we want to enable native multi-hopping between transfertools.
# This "if" can be triggered in an FTS submitter if it tries to multi-hop from
# a globus-only RSE via a dual-stack RSE towards an FTS-only RSE.
#
# Just ignore this transfer and keep it in a queued state, so that it's picked up
# later by that special submitter instance dedicated to globus transfers.
#
# TODO: remove this "if"
if transfertools[0] != GlobusTransferTool.external_name and builder.transfertool_class == GlobusTransferTool:
logger(logging.INFO, 'Skipping submission of following transfers: %s', [transfer_path_str(p) for p in transfer_paths])
continue

transfertool_obj = builder.make_transfertool(logger=logger, **transfertool_kwargs.get(builder.transfertool_class, {}))
start_time = time.time()
logger(logging.DEBUG, 'Starting to group transfers for %s (%s)', activity, transfertool_obj)
Expand Down Expand Up @@ -181,6 +195,7 @@ def submitter(once=False, rses=None, partition_wait_time=10,
else:
rse_ids = None

transfertools = transfertool.split(',')
transfertool_kwargs = {
FTS3Transfertool: {
'group_policy': group_policy,
Expand Down Expand Up @@ -209,7 +224,7 @@ def submitter(once=False, rses=None, partition_wait_time=10,
bulk=bulk,
group_bulk=group_bulk,
filter_transfertool=filter_transfertool,
transfertool=transfertool,
transfertools=transfertools,
ignore_availability=ignore_availability,
scheme=scheme,
failover_scheme=failover_scheme,
Expand Down

0 comments on commit 14e6bde

Please sign in to comment.