Transfers: try to detect concurrent multihop submissions. Closes #5028
If creation of the intermediate hops fails, don't immediately consider
the request failed. The problem is probably temporary, caused by another
submitter working on the same partition. Just skip the request for some
time.
rcarpa committed Jan 28, 2022
1 parent fb10334 commit 9a4e42b
Showing 2 changed files with 66 additions and 9 deletions.
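
The core of the change, described in the commit message above, is a tolerance window: when creating a request for an intermediate hop fails because a request already exists, the submitter now checks how recently that existing request was created before declaring a failure. Below is a minimal, self-contained sketch of that decision. The helper name classify_failed_request_creation and the five-minute delay are illustrative assumptions, not Rucio API; only the comparison against requested_at mirrors the new code in create_missing_replicas_and_requests.

import datetime

# Assumed toleration window; the real CONCURRENT_SUBMISSION_TOLERATION_DELAY
# constant lives in lib/rucio/core/transfer.py and is not shown in this diff.
CONCURRENT_SUBMISSION_TOLERATION_DELAY = datetime.timedelta(minutes=5)


def classify_failed_request_creation(existing_requested_at):
    """Hop-request creation returned nothing because a request already exists.

    Decide whether this looks like a concurrent submission (skip the request
    for now) or a stale leftover request (fail and cancel the parent requests).
    """
    creation_successful = False
    concurrent_submission_detected = (
        datetime.datetime.utcnow() - CONCURRENT_SUBMISSION_TOLERATION_DELAY
        < existing_requested_at
    )
    return creation_successful, concurrent_submission_detected


# A request created 30 seconds ago is treated as another submitter's in-flight
# work; one created an hour ago is treated as a genuine failure.
print(classify_failed_request_creation(datetime.datetime.utcnow() - datetime.timedelta(seconds=30)))  # (False, True)
print(classify_failed_request_creation(datetime.datetime.utcnow() - datetime.timedelta(hours=1)))     # (False, False)

In __assign_paths_to_transfertool_and_create_hops, the second flag makes the submitter skip the request (continue) instead of adding it to reqs_no_host, so the other submitter's in-flight work is left undisturbed.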
27 changes: 21 additions & 6 deletions lib/rucio/core/transfer.py
@@ -15,7 +15,7 @@
#
# Authors:
# - Mario Lassnig <mario.lassnig@cern.ch>, 2013-2021
# - Martin Barisits <martin.barisits@cern.ch>, 2017-2021
# - Martin Barisits <martin.barisits@cern.ch>, 2017-2022
# - Vincent Garonne <vincent.garonne@cern.ch>, 2017
# - Igor Mandrichenko <rucio@fermicloud055.fnal.gov>, 2018
# - Cedric Serfon <cedric.serfon@cern.ch>, 2018-2021
@@ -69,7 +69,7 @@
from rucio.core.config import get as core_config_get
from rucio.core.monitor import record_counter, record_timer
from rucio.core.replica import add_replicas, tombstone_from_delay, update_replica_state
from rucio.core.request import queue_requests, set_request_state
from rucio.core.request import get_request_by_did, queue_requests, set_request_state
from rucio.core.rse import get_rse_name, get_rse_vo, list_rses
from rucio.core.rse_expression_parser import parse_expression
from rucio.db.sqla import models, filter_thread_work
@@ -1488,6 +1488,7 @@ def __assign_paths_to_transfertool_and_create_hops(
# intermediate hops (if it is a multihop) work correctly
best_path = None
builder_to_use = None
concurrent_submission_detected = False
for transfer_path in candidate_paths:
builder = None
if transfertools_by_name:
@@ -1499,11 +1500,19 @@
if builder:
break
if builder or not transfertools_by_name:
if create_missing_replicas_and_requests(transfer_path, default_tombstone_delay, logger=logger, session=session):
created, concurrent_submission_detected = create_missing_replicas_and_requests(
transfer_path, default_tombstone_delay, logger=logger, session=session
)
if created:
best_path = transfer_path
builder_to_use = builder
if created or concurrent_submission_detected:
break

if concurrent_submission_detected:
logger(logging.INFO, '%s: Request is being handled by another submitter. Skipping for now.' % request_id)
continue

if not best_path:
reqs_no_host.add(request_id)
logger(logging.INFO, '%s: Cannot pick transfertool, or create intermediate requests' % request_id)
@@ -1528,12 +1537,13 @@ def create_missing_replicas_and_requests(
default_tombstone_delay: int,
logger: "Callable",
session: "Optional[Session]" = None
) -> bool:
) -> "Tuple[bool, bool]":
"""
Create replicas and requests in the database for the intermediate hops
"""
initial_request_id = transfer_path[-1].rws.request_id
creation_successful = True
concurrent_submission_detected = False
created_requests = []
# Iterate the path in reverse order. The last hop is the initial request, so
# next_hop.rws.request_id will always be initialized when handling the current hop.
@@ -1586,13 +1596,18 @@ def create_missing_replicas_and_requests(
# If a request already exists, new_req will be an empty list.
if not new_req:
creation_successful = False

existing_request = get_request_by_did(rws.scope, rws.name, rws.dest_rse.id, session=session)
if datetime.datetime.utcnow() - CONCURRENT_SUBMISSION_TOLERATION_DELAY < existing_request['requested_at']:
concurrent_submission_detected = True

break
rws.request_id = new_req[0]['id']
logger(logging.DEBUG, '%s: New request created for the transfer between %s and %s : %s', initial_request_id, transfer_path[0].src, transfer_path[-1].dst, rws.request_id)
set_request_state(rws.request_id, RequestState.QUEUED, session=session, logger=logger)
created_requests.append(rws.request_id)

if not creation_successful:
if not concurrent_submission_detected and not creation_successful:
# Need to fail all the intermediate requests
logger(logging.WARNING, '%s: Multihop : A request already exists for the transfer between %s and %s. Will cancel all the parent requests',
initial_request_id, transfer_path[0].src, transfer_path[-1].dst)
@@ -1603,7 +1618,7 @@
except UnsupportedOperation:
logger(logging.ERROR, '%s: Multihop : Cannot cancel all the parent requests : %s', initial_request_id, str(created_requests))

return creation_successful
return creation_successful, concurrent_submission_detected


@read_session
48 changes: 45 additions & 3 deletions lib/rucio/tests/test_transfer.py
@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
# Copyright 2021 CERN
# Copyright 2021-2022 CERN
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -14,9 +14,11 @@
# limitations under the License.
#
# Authors:
# - Radu Carpa <radu.carpa@cern.ch>, 2021
# - Radu Carpa <radu.carpa@cern.ch>, 2021-2022
# - Martin Barisits <martin.barisits@cern.ch>, 2021

import pytest
from concurrent.futures import ThreadPoolExecutor

from rucio.common.exception import NoDistance
from rucio.core.distance import add_distance
@@ -26,7 +28,7 @@
from rucio.core import request as request_core
from rucio.core import rse as rse_core
from rucio.db.sqla import models
from rucio.db.sqla.constants import RSEType
from rucio.db.sqla.constants import RSEType, RequestState
from rucio.db.sqla.session import transactional_session
from rucio.common.utils import generate_uuid

@@ -240,6 +242,46 @@ def test_multihop_requests_created(rse_factory, did_factory, root_account, core_
assert request_core.get_request_by_did(rse_id=intermediate_rse_id, **did)


@pytest.mark.parametrize("core_config_mock", [{"table_content": [
('transfers', 'use_multihop', True)
]}], indirect=True)
@pytest.mark.parametrize("caches_mock", [{"caches_to_mock": [
'rucio.core.rse_expression_parser.REGION', # The list of multihop RSEs is retrieved by an expression
'rucio.core.config.REGION',
]}], indirect=True)
def test_multihop_concurrent_submitters(rse_factory, did_factory, root_account, core_config_mock, caches_mock):
"""
Ensure that multiple concurrent submitters on the same multi-hop don't result in an undesired database state
"""
src_rse, src_rse_id = rse_factory.make_posix_rse()
jump_rse, jump_rse_id = rse_factory.make_posix_rse()
dst_rse, dst_rse_id = rse_factory.make_posix_rse()
rse_core.add_rse_attribute(jump_rse_id, 'available_for_multihop', True)

add_distance(src_rse_id, jump_rse_id, ranking=10)
add_distance(jump_rse_id, dst_rse_id, ranking=10)

did = did_factory.upload_test_file(src_rse)
rule_core.add_rule(dids=[did], account=root_account, copies=1, rse_expression=dst_rse, grouping='ALL', weight=None, lifetime=None, locked=False, subscription_id=None)

nb_threads = 9
nb_executions = 18
with ThreadPoolExecutor(max_workers=nb_threads) as executor:
futures = [executor.submit(next_transfers_to_submit, rses=rse_factory.created_rses) for _ in range(nb_executions)]
for f in futures:
try:
f.result()
except Exception:
pass

jmp_request = request_core.get_request_by_did(rse_id=jump_rse_id, **did)
dst_request = request_core.get_request_by_did(rse_id=dst_rse_id, **did)
assert jmp_request['state'] == dst_request['state'] == RequestState.QUEUED
assert jmp_request['attributes']['source_replica_expression'] == src_rse
assert jmp_request['attributes']['initial_request_id'] == dst_request['id']
assert jmp_request['attributes']['next_hop_request_id'] == dst_request['id']


@pytest.mark.parametrize("core_config_mock", [{"table_content": [
('transfers', 'use_multihop', True)
]}], indirect=True)