Merge pull request #5175 from rcarpa/patch-5170-source_replica_expression_multihops

Transfers: rework concurrent multihop handling. Closes #5170. Closes #5028
bari12 committed Feb 1, 2022
2 parents c9ea1c2 + 9a4e42b commit e66822f
Showing 4 changed files with 114 additions and 16 deletions.
8 changes: 4 additions & 4 deletions lib/rucio/core/request.py
@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
# Copyright 2013-2021 CERN
# Copyright 2013-2022 CERN
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -17,7 +17,7 @@
# - Mario Lassnig <mario.lassnig@cern.ch>, 2013-2021
# - Vincent Garonne <vincent.garonne@cern.ch>, 2013-2017
# - Cedric Serfon <cedric.serfon@cern.ch>, 2014-2020
# - Martin Barisits <martin.barisits@cern.ch>, 2014-2021
# - Martin Barisits <martin.barisits@cern.ch>, 2014-2022
# - Wen Guan <wen.guan@cern.ch>, 2014-2016
# - Joaquín Bogado <jbogado@linti.unlp.edu.ar>, 2015-2019
# - Thomas Beermann <thomas.beermann@cern.ch>, 2016-2021
@@ -28,12 +28,11 @@
# - Andrew Lister <andrew.lister@stfc.ac.uk>, 2019
# - Brandon White <bjwhite@fnal.gov>, 2019
# - Benedikt Ziemons <benedikt.ziemons@cern.ch>, 2020-2021
# - Radu Carpa <radu.carpa@cern.ch>, 2021
# - Radu Carpa <radu.carpa@cern.ch>, 2021-2022
# - Matt Snyder <msnyder@bnl.gov>, 2021
# - Sahan Dilshan <32576163+sahandilshan@users.noreply.github.com>, 2021
# - Nick Smith <nick.smith@cern.ch>, 2021
# - David Población Criado <david.poblacion.criado@cern.ch>, 2021
# - Nick Smith <nick.smith@cern.ch>, 2021

import datetime
import json
@@ -641,6 +640,7 @@ def get_request_by_did(scope, name, rse_id, request_type=None, session=None):

tmp['source_rse'] = get_rse_name(rse_id=tmp['source_rse_id'], session=session) if tmp['source_rse_id'] is not None else None
tmp['dest_rse'] = get_rse_name(rse_id=tmp['dest_rse_id'], session=session) if tmp['dest_rse_id'] is not None else None
tmp['attributes'] = json.loads(str(tmp['attributes']))

return tmp
except IntegrityError as error:
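
The single functional change in this file makes `get_request_by_did` decode the `attributes` column before returning, so callers receive a dictionary rather than a raw JSON string. A minimal sketch of the effect (the raw string and its keys are illustrative, not taken from a real request):

```python
import json

# What the database hands back: 'attributes' persisted as a JSON string.
raw_attributes = '{"initial_request_id": "a1b2", "next_hop_request_id": "c3d4"}'

# What get_request_by_did now returns: a decoded dictionary, so callers
# such as the concurrent-submission check added in transfer.py can use
# plain key access.
attributes = json.loads(raw_attributes)
assert attributes['initial_request_id'] == 'a1b2'
```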
73 changes: 64 additions & 9 deletions lib/rucio/core/transfer.py
@@ -15,7 +15,7 @@
#
# Authors:
# - Mario Lassnig <mario.lassnig@cern.ch>, 2013-2021
# - Martin Barisits <martin.barisits@cern.ch>, 2017-2021
# - Martin Barisits <martin.barisits@cern.ch>, 2017-2022
# - Vincent Garonne <vincent.garonne@cern.ch>, 2017
# - Igor Mandrichenko <rucio@fermicloud055.fnal.gov>, 2018
# - Cedric Serfon <cedric.serfon@cern.ch>, 2018-2021
@@ -69,7 +69,7 @@
from rucio.core.config import get as core_config_get
from rucio.core.monitor import record_counter, record_timer
from rucio.core.replica import add_replicas, tombstone_from_delay, update_replica_state
from rucio.core.request import queue_requests, set_request_state
from rucio.core.request import get_request_by_did, queue_requests, set_request_state
from rucio.core.rse import get_rse_name, get_rse_vo, list_rses
from rucio.core.rse_expression_parser import parse_expression
from rucio.db.sqla import models, filter_thread_work
@@ -102,6 +102,9 @@

DEFAULT_MULTIHOP_TOMBSTONE_DELAY = int(datetime.timedelta(hours=2).total_seconds())

# How long to skip handling a request when concurrent submission by multiple submitters is suspected
CONCURRENT_SUBMISSION_TOLERATION_DELAY = datetime.timedelta(minutes=5)


class RseData:
"""
@@ -178,7 +181,7 @@ def __str__(self):

class RequestWithSources:
def __init__(self, id_, request_type, rule_id, scope, name, md5, adler32, byte_count, activity, attributes,
previous_attempt_id, dest_rse_data, account, retry_count, priority, transfertool):
previous_attempt_id, dest_rse_data, account, retry_count, priority, transfertool, requested_at=None):

self.request_id = id_
self.request_type = request_type
@@ -197,6 +200,7 @@ def __init__(self, id_, request_type, rule_id, scope, name, md5, adler32, byte_c
self.retry_count = retry_count or 0
self.priority = priority if priority is not None else 3
self.transfertool = transfertool
self.requested_at = requested_at if requested_at else datetime.datetime.utcnow()

self.sources = []

@@ -1144,6 +1148,35 @@ def __transfer_order_key(transfer_path):
yield from sorted(candidate_paths, key=__transfer_order_key)


def __handle_intermediate_hop_requests(
requests_with_sources: "Iterable[RequestWithSources]",
logger: "Callable" = logging.log,
) -> "Generator[RequestWithSources]":
"""
Intermediate requests of a multihop shouldn't stay in the QUEUED state for too long.
They should be transitioned to the SUBMITTED state by the submitter who created them
almost immediately after creation.
Due to the distributed nature of Rucio, this short time window can be enough for
an intermediate request to be picked up by another submitter, which would start
working on it. This function ensures that this "other" submitter doesn't try
to submit the intermediate request on its own. It is also responsible for cleaning
up intermediate requests which stay in the QUEUED state for too long, which probably
means that the initial submitter, who created them, crashed before submission.
"""
now = datetime.datetime.utcnow()
for rws in requests_with_sources:
if rws.attributes.get('initial_request_id'):
# This is an intermediate hop, don't consider it for submission
if rws.requested_at < now - CONCURRENT_SUBMISSION_TOLERATION_DELAY:
logger(logging.WARNING, '%s: marking stalled intermediate hop as submission_failed', rws.request_id)
set_request_state(request_id=rws.request_id, new_state=RequestState.SUBMISSION_FAILED)
else:
logger(logging.WARNING, '%s: skipping intermediate hop from being submitted on its own', rws.request_id)
else:
yield rws


@transactional_session
def next_transfers_to_submit(total_workers=0, worker_number=0, partition_hash_var=None, limit=None, activity=None, older_than=None, rses=None, schemes=None,
failover_schemes=None, filter_transfertool=None, transfertools_by_name=None, request_type=RequestType.TRANSFER,
@@ -1203,6 +1236,9 @@ def next_transfers_to_submit(total_workers=0, worker_number=0, partition_hash_va
session=session,
)

# Filter (and maybe mark as failed) intermediate hop requests
request_with_sources = list(__handle_intermediate_hop_requests(request_with_sources, logger))

# for each source, compute the (possibly multihop) path between it and the transfer destination
candidate_paths, reqs_no_source, reqs_scheme_mismatch, reqs_only_tape_source = __build_transfer_paths(
request_with_sources,
@@ -1452,6 +1488,7 @@ def __assign_paths_to_transfertool_and_create_hops(
# intermediate hops (if it is a multihop) work correctly
best_path = None
builder_to_use = None
concurrent_submission_detected = False
for transfer_path in candidate_paths:
builder = None
if transfertools_by_name:
@@ -1463,11 +1500,19 @@
if builder:
break
if builder or not transfertools_by_name:
if create_missing_replicas_and_requests(transfer_path, default_tombstone_delay, logger=logger, session=session):
created, concurrent_submission_detected = create_missing_replicas_and_requests(
transfer_path, default_tombstone_delay, logger=logger, session=session
)
if created:
best_path = transfer_path
builder_to_use = builder
if created or concurrent_submission_detected:
break

if concurrent_submission_detected:
logger(logging.INFO, '%s: Request is being handled by another submitter. Skipping for now.' % request_id)
continue

if not best_path:
reqs_no_host.add(request_id)
logger(logging.INFO, '%s: Cannot pick transfertool, or create intermediate requests' % request_id)
@@ -1492,12 +1537,13 @@ def create_missing_replicas_and_requests(
default_tombstone_delay: int,
logger: "Callable",
session: "Optional[Session]" = None
) -> bool:
) -> "Tuple[bool, bool]":
"""
Create replicas and requests in the database for the intermediate hops
"""
initial_request_id = transfer_path[-1].rws.request_id
creation_successful = True
concurrent_submission_detected = False
created_requests = []
# Iterate the path in reverse order. The last hop is the initial request, so
# next_hop.rws.request_id will always be initialized when handling the current hop.
@@ -1537,6 +1583,7 @@

rws.attributes['next_hop_request_id'] = transfer_path[i + 1].rws.request_id
rws.attributes['initial_request_id'] = initial_request_id
rws.attributes['source_replica_expression'] = hop.src.rse.name
new_req = queue_requests(requests=[{'dest_rse_id': rws.dest_rse.id,
'scope': rws.scope,
'name': rws.name,
@@ -1549,13 +1596,18 @@
# If a request already exists, new_req will be an empty list.
if not new_req:
creation_successful = False

existing_request = get_request_by_did(rws.scope, rws.name, rws.dest_rse.id, session=session)
if datetime.datetime.utcnow() - CONCURRENT_SUBMISSION_TOLERATION_DELAY < existing_request['requested_at']:
concurrent_submission_detected = True

break
rws.request_id = new_req[0]['id']
logger(logging.DEBUG, '%s: New request created for the transfer between %s and %s : %s', initial_request_id, transfer_path[0].src, transfer_path[-1].dst, rws.request_id)
set_request_state(rws.request_id, RequestState.QUEUED, session=session, logger=logger)
created_requests.append(rws.request_id)

if not creation_successful:
if not concurrent_submission_detected and not creation_successful:
# Need to fail all the intermediate requests
logger(logging.WARNING, '%s: Multihop : A request already exists for the transfer between %s and %s. Will cancel all the parent requests',
initial_request_id, transfer_path[0].src, transfer_path[-1].dst)
Expand All @@ -1566,7 +1618,7 @@ def create_missing_replicas_and_requests(
except UnsupportedOperation:
logger(logging.ERROR, '%s: Multihop : Cannot cancel all the parent requests : %s', initial_request_id, str(created_requests))

return creation_successful
return creation_successful, concurrent_submission_detected


@read_session
@@ -1621,6 +1673,7 @@ def __list_transfer_requests_and_source_replicas(
models.Request.retry_count,
models.Request.account,
models.Request.created_at,
models.Request.requested_at,
models.Request.priority,
models.Request.transfertool) \
.with_hint(models.Request, "INDEX(REQUESTS REQUESTS_TYP_STA_UPD_IDX)", 'oracle') \
@@ -1680,6 +1733,7 @@
sub_requests.c.retry_count,
sub_requests.c.priority,
sub_requests.c.transfertool,
sub_requests.c.requested_at,
models.RSE.id.label("source_rse_id"),
models.RSE.rse,
models.RSEFileAssociation.path,
@@ -1711,7 +1765,7 @@

requests_by_id = {}
for (request_id, rule_id, scope, name, md5, adler32, byte_count, activity, attributes, previous_attempt_id, dest_rse_id, account, retry_count,
priority, transfertool, source_rse_id, source_rse_name, file_path, source_ranking, source_url, distance_ranking) in query:
priority, transfertool, requested_at, source_rse_id, source_rse_name, file_path, source_ranking, source_url, distance_ranking) in query:

# If we didn't pre-filter using temporary tables on database side, perform the filtering here
if not use_temp_tables and rses and dest_rse_id not in rses:
@@ -1722,7 +1776,8 @@
request = RequestWithSources(id_=request_id, request_type=request_type, rule_id=rule_id, scope=scope, name=name,
md5=md5, adler32=adler32, byte_count=byte_count, activity=activity, attributes=attributes,
previous_attempt_id=previous_attempt_id, dest_rse_data=RseData(id_=dest_rse_id),
account=account, retry_count=retry_count, priority=priority, transfertool=transfertool)
account=account, retry_count=retry_count, priority=priority, transfertool=transfertool,
requested_at=requested_at)
requests_by_id[request_id] = request

if source_rse_id is not None:
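
Taken together, the transfer.py changes defend against concurrent submitters from two sides: filtering (don't pick up another submitter's fresh intermediate hops, fail stale ones) and detection (notice, while creating hops, that another submitter just created them). Below is a minimal, self-contained sketch of the filtering side, mirroring `__handle_intermediate_hop_requests`; the `Request` dataclass and the `mark_submission_failed` helper are simplified stand-ins for Rucio's `RequestWithSources` and `set_request_state`, not its real API:

```python
import datetime
import logging
from dataclasses import dataclass, field
from typing import Dict, Iterable, Iterator

CONCURRENT_SUBMISSION_TOLERATION_DELAY = datetime.timedelta(minutes=5)


@dataclass
class Request:
    # Simplified stand-in for RequestWithSources.
    request_id: str
    requested_at: datetime.datetime
    attributes: Dict[str, str] = field(default_factory=dict)


def mark_submission_failed(request_id: str) -> None:
    # Stand-in for set_request_state(..., RequestState.SUBMISSION_FAILED).
    logging.warning('%s: marking stalled intermediate hop as submission_failed', request_id)


def handle_intermediate_hops(requests: Iterable[Request]) -> Iterator[Request]:
    """Yield only the requests this submitter may safely work on."""
    now = datetime.datetime.utcnow()
    for rws in requests:
        if rws.attributes.get('initial_request_id'):
            # Intermediate hop of a multihop: it belongs to the submitter
            # that created it. If it has been QUEUED longer than the
            # toleration delay, its creator probably crashed; fail it.
            if rws.requested_at < now - CONCURRENT_SUBMISSION_TOLERATION_DELAY:
                mark_submission_failed(rws.request_id)
            # Otherwise the creator is presumably still submitting it: skip.
        else:
            yield rws
```

The detection side lives in `create_missing_replicas_and_requests`: when `queue_requests` returns an empty list because the request already exists, the age of the existing request decides between "another submitter won the race just now" (back off without cancelling anything) and "a stale leftover" (cancel the chain of intermediate requests created so far).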
1 change: 1 addition & 0 deletions lib/rucio/tests/test_conveyor.py
@@ -272,6 +272,7 @@ def test_fts_non_recoverable_failures_handled_on_multihop(vo, did_factory, root_
assert request['state'] == RequestState.FAILED
request = request_core.get_request_by_did(rse_id=jump_rse_id, **did)
assert request['state'] == RequestState.FAILED
assert request['attributes']['source_replica_expression'] == src_rse

# Each hop is a separate transfer, which will be handled by the poller and marked as failed
assert metrics_mock.get_sample_value('rucio_daemons_conveyor_poller_update_request_state_total', labels={'updated': 'True'}) >= 2
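
The new assertion covers another change in this commit: each intermediate hop now records the RSE it must read from in its `source_replica_expression` attribute (`rws.attributes['source_replica_expression'] = hop.src.rse.name` in transfer.py), so a retry of that hop cannot silently pick a different source. Sketched as data, with illustrative values:

```python
# Attributes written onto an intermediate hop request by
# create_missing_replicas_and_requests (all values illustrative):
intermediate_hop_attributes = {
    'initial_request_id': 'req-final',       # the user-facing request of the multihop
    'next_hop_request_id': 'req-next',       # the following hop in the path
    'source_replica_expression': 'SRC_RSE',  # pins this hop's source RSE
}
```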
48 changes: 45 additions & 3 deletions lib/rucio/tests/test_transfer.py
@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
# Copyright 2021 CERN
# Copyright 2021-2022 CERN
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -14,9 +14,11 @@
# limitations under the License.
#
# Authors:
# - Radu Carpa <radu.carpa@cern.ch>, 2021
# - Radu Carpa <radu.carpa@cern.ch>, 2021-2022
# - Martin Barisits <martin.barisits@cern.ch>, 2021

import pytest
from concurrent.futures import ThreadPoolExecutor

from rucio.common.exception import NoDistance
from rucio.core.distance import add_distance
Expand All @@ -26,7 +28,7 @@
from rucio.core import request as request_core
from rucio.core import rse as rse_core
from rucio.db.sqla import models
from rucio.db.sqla.constants import RSEType
from rucio.db.sqla.constants import RSEType, RequestState
from rucio.db.sqla.session import transactional_session
from rucio.common.utils import generate_uuid

@@ -240,6 +242,46 @@ def test_multihop_requests_created(rse_factory, did_factory, root_account, core_
assert request_core.get_request_by_did(rse_id=intermediate_rse_id, **did)


@pytest.mark.parametrize("core_config_mock", [{"table_content": [
('transfers', 'use_multihop', True)
]}], indirect=True)
@pytest.mark.parametrize("caches_mock", [{"caches_to_mock": [
'rucio.core.rse_expression_parser.REGION', # The list of multihop RSEs is retrieved by an expression
'rucio.core.config.REGION',
]}], indirect=True)
def test_multihop_concurrent_submitters(rse_factory, did_factory, root_account, core_config_mock, caches_mock):
"""
Ensure that multiple concurrent submitters on the same multi-hop don't result in an undesired database state
"""
src_rse, src_rse_id = rse_factory.make_posix_rse()
jump_rse, jump_rse_id = rse_factory.make_posix_rse()
dst_rse, dst_rse_id = rse_factory.make_posix_rse()
rse_core.add_rse_attribute(jump_rse_id, 'available_for_multihop', True)

add_distance(src_rse_id, jump_rse_id, ranking=10)
add_distance(jump_rse_id, dst_rse_id, ranking=10)

did = did_factory.upload_test_file(src_rse)
rule_core.add_rule(dids=[did], account=root_account, copies=1, rse_expression=dst_rse, grouping='ALL', weight=None, lifetime=None, locked=False, subscription_id=None)

nb_threads = 9
nb_executions = 18
with ThreadPoolExecutor(max_workers=nb_threads) as executor:
futures = [executor.submit(next_transfers_to_submit, rses=rse_factory.created_rses) for _ in range(nb_executions)]
for f in futures:
try:
f.result()
except Exception:
pass

jmp_request = request_core.get_request_by_did(rse_id=jump_rse_id, **did)
dst_request = request_core.get_request_by_did(rse_id=dst_rse_id, **did)
assert jmp_request['state'] == dst_request['state'] == RequestState.QUEUED
assert jmp_request['attributes']['source_replica_expression'] == src_rse
assert jmp_request['attributes']['initial_request_id'] == dst_request['id']
assert jmp_request['attributes']['next_hop_request_id'] == dst_request['id']


@pytest.mark.parametrize("core_config_mock", [{"table_content": [
('transfers', 'use_multihop', True)
]}], indirect=True)
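
The concurrent-submitters test above races eighteen `next_transfers_to_submit` calls across nine threads and asserts that exactly one coherent request chain survives, with both hops still QUEUED. The decision it exercises can be sketched as follows; this is a simplified view of the check in `create_missing_replicas_and_requests`, where `existing_requested_at` stands for the `requested_at` of the request returned by `get_request_by_did`:

```python
import datetime

CONCURRENT_SUBMISSION_TOLERATION_DELAY = datetime.timedelta(minutes=5)


def classify_existing_request(existing_requested_at: datetime.datetime) -> str:
    """Decide how to treat a pre-existing intermediate hop request.

    Called when queue_requests returned an empty list, i.e. the request
    this submitter tried to create already exists in the database.
    """
    now = datetime.datetime.utcnow()
    if now - CONCURRENT_SUBMISSION_TOLERATION_DELAY < existing_requested_at:
        # Created within the toleration window: a concurrent submitter is
        # working on this path right now. Back off; cancel nothing.
        return 'concurrent_submission_detected'
    # Older than the window: a stale leftover. The chain of intermediate
    # requests created so far must be cancelled.
    return 'stale_request'
```

Under this rule, only one of the racing submitters ends up owning the intermediate hop; the others detect the fresh request and skip it, which is why the test finds a single jump request whose `initial_request_id` and `next_hop_request_id` both point at the destination request.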
