Skip to content

Commit

Permalink
Transfers: use objects instead of dicts for submission preparation. #857
Browse files Browse the repository at this point in the history


Pass transfer definition objects through to transfertool.submit()
The transfertool API arguments are now constructed in this method.
This makes it possible to have a code path which is mostly free of
transfertool specifics all the way up to the call to submit().

There are still some relics of the previous behavior. For example,
job_params is still set for globus even though it is not needed there.
I intend to address that in further refactorings, to avoid growing this
already-big commit even further.

Get rid of the legacy_transfer_definition from the objects. It was
very FTS-specific. It was also used to store the attributes needed for
both the `params` and the `files` FTS API arguments:
https://fts3-docs.web.cern.ch/fts3-docs/fts-rest/docs/bulk.html
Directly generate each of these two API arguments in the corresponding
places and try to not mix the two. A small hack was needed for
checksum handling, because it is used in both params and files.
I'll try to get rid of this hack in further refactorings.
  • Loading branch information
rcarpa committed Aug 26, 2021
1 parent fb766f3 commit 30f2a77
Show file tree
Hide file tree
Showing 11 changed files with 362 additions and 460 deletions.
324 changes: 116 additions & 208 deletions lib/rucio/core/transfer.py

Large diffs are not rendered by default.

366 changes: 153 additions & 213 deletions lib/rucio/daemons/conveyor/common.py

Large diffs are not rendered by default.

8 changes: 3 additions & 5 deletions lib/rucio/daemons/conveyor/stager.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,15 +151,13 @@ def stager(once=False, rses=None, bulk=100, group_bulk=1, group_policy='rule',
for external_host, transfer_paths in transfers.items():
logger(logging.INFO, 'Starting to group transfers for %s (%s)' % (activity, external_host))
start_time = time.time()
for transfer_path in transfer_paths:
for i, hop in enumerate(transfer_path):
hop.init_legacy_transfer_definition(bring_online=bring_online, default_lifetime=-1, logger=logger)
grouped_jobs = bulk_group_transfers_for_fts(transfer_paths, group_policy, group_bulk, source_strategy, max_time_in_queue)
grouped_jobs = bulk_group_transfers_for_fts(transfer_paths, group_policy, group_bulk, source_strategy, max_time_in_queue,
bring_online=bring_online, default_lifetime=-1)
record_timer('daemons.conveyor.stager.bulk_group_transfer', (time.time() - start_time) * 1000 / (len(transfer_paths) or 1))

logger(logging.INFO, 'Starting to submit transfers for %s (%s)' % (activity, external_host))
for job in grouped_jobs:
submit_transfer(external_host=external_host, job=job, submitter='transfer_submitter', logger=logger)
submit_transfer(external_host=external_host, transfers=job['transfers'], job_params=job['job_params'], submitter='transfer_submitter', logger=logger)

if total_transfers < group_bulk:
logger(logging.INFO, 'Only %s transfers for %s which is less than group bulk %s, sleep %s seconds' % (total_transfers, activity, group_bulk, sleep_time))
Expand Down
12 changes: 3 additions & 9 deletions lib/rucio/daemons/conveyor/submitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,15 +179,9 @@ def submitter(once=False, rses=None, partition_wait_time=10,
for external_host, transfer_paths in transfers.items():
start_time = time.time()
logger(logging.INFO, 'Starting to group transfers for %s (%s)', activity, external_host)
for transfer_path in transfer_paths:
for i, hop in enumerate(transfer_path):
hop.init_legacy_transfer_definition(bring_online=bring_online, default_lifetime=172800, logger=logger)
if len(transfer_path) > 1:
hop['multihop'] = True
hop['initial_request_id'] = transfer_path[-1].rws.request_id
hop['parent_request'] = transfer_path[i - 1].rws.request_id if i > 0 else None
if transfertool in ['fts3', 'mock']:
grouped_jobs = bulk_group_transfers_for_fts(transfer_paths, group_policy, group_bulk, source_strategy, max_time_in_queue, archive_timeout_override=archive_timeout_override)
grouped_jobs = bulk_group_transfers_for_fts(transfer_paths, group_policy, group_bulk, source_strategy, max_time_in_queue,
bring_online=bring_online, default_lifetime=172800, archive_timeout_override=archive_timeout_override)
elif transfertool == 'globus':
grouped_jobs = bulk_group_transfers_for_globus(transfer_paths, transfertype, group_bulk)
else:
Expand All @@ -197,7 +191,7 @@ def submitter(once=False, rses=None, partition_wait_time=10,
logger(logging.INFO, 'Starting to submit transfers for %s (%s)', activity, external_host)
for job in grouped_jobs:
logger(logging.DEBUG, 'submitjob: %s' % job)
submit_transfer(external_host=external_host, job=job, submitter='transfer_submitter',
submit_transfer(external_host=external_host, transfers=job['transfers'], job_params=job['job_params'], submitter='transfer_submitter',
timeout=timeout, logger=logger, transfertool=transfertool)

if total_transfers < group_bulk:
Expand Down
11 changes: 7 additions & 4 deletions lib/rucio/tests/test_conveyor.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
#
# Authors:
# - Radu Carpa <radu.carpa@cern.ch>, 2021
# - Mayank Sharma <imptodefeat@gmail.com>, 2021
# - David Población Criado <david.poblacion.criado@cern.ch>, 2021

import threading
import time
from datetime import datetime
Expand Down Expand Up @@ -523,12 +526,12 @@ def test_overwrite_on_tape(rse_factory, did_factory, root_account, core_config_m
rule_core.add_rule(dids=[did1, did2], account=root_account, copies=1, rse_expression=rse3, grouping='ALL', weight=None, lifetime=None, locked=False, subscription_id=None)

# Wrap dest url generation to add size_pre=2 query parameter
non_mocked_dest_url = transfer_core.DirectTransferDefinition._dest_url
non_mocked_generate_dest_url = transfer_core.DirectTransferDefinition._generate_dest_url

def mocked_dest_url(cls, *args):
return set_query_parameters(non_mocked_dest_url(*args), {'size_pre': 2})
def mocked_generate_dest_url(cls, *args):
return set_query_parameters(non_mocked_generate_dest_url(*args), {'size_pre': 2})

with patch('rucio.core.transfer.DirectTransferDefinition._dest_url', new=mocked_dest_url):
with patch('rucio.core.transfer.DirectTransferDefinition._generate_dest_url', new=mocked_generate_dest_url):
submitter(once=True, rses=[{'id': rse_id} for rse_id in all_rses], group_bulk=10, partition_wait_time=None, transfertype='single', filter_transfertool=None)

request = __wait_for_request_state(dst_rse_id=rse3_id, state=RequestState.FAILED, **did1)
Expand Down
3 changes: 2 additions & 1 deletion lib/rucio/tests/test_conveyor_submitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#
# Authors:
# - Radu Carpa <radu.carpa@cern.ch>, 2021

import pytest

import itertools
Expand Down Expand Up @@ -81,7 +82,7 @@ def _forge_requests_creation_time(session=None):
requests_id_in_submission_order = []
with patch('rucio.transfertool.mock.MockTransfertool.submit') as mock_transfertool_submit:
# Record the order of requests passed to MockTranfertool.submit()
mock_transfertool_submit.side_effect = lambda jobs, _: requests_id_in_submission_order.extend([j['metadata']['request_id'] for j in jobs])
mock_transfertool_submit.side_effect = lambda transfers, _: requests_id_in_submission_order.extend([t.rws.request_id for t in transfers])

submitter(once=True, rses=[{'id': rse_id} for _, rse_id in dst_rses], partition_wait_time=None, transfertool='mock', transfertype='single', filter_transfertool=None)

Expand Down
8 changes: 4 additions & 4 deletions lib/rucio/tests/test_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,8 @@ def test_s3s_fts_src(self):
grouping='NONE', weight=None, lifetime=None, locked=False, subscription_id=None)
[[_host, [transfer_path]]] = next_transfers_to_submit(rses=[self.rsenons3_id]).items()
assert transfer_path[0].rws.rule_id == rule_id[0]
assert transfer_path[0]['sources'][0][1] == expected_src_url
assert transfer_path[0]['dest_urls'][0] == expected_dst_url
assert transfer_path[0].legacy_sources[0][1] == expected_src_url
assert transfer_path[0].dest_url == expected_dst_url

def test_s3s_fts_dst(self):
""" S3: TPC a file from storage to S3 """
Expand All @@ -114,5 +114,5 @@ def test_s3s_fts_dst(self):

[[_host, [transfer_path]]] = next_transfers_to_submit(rses=[self.rses3_id]).items()
assert transfer_path[0].rws.rule_id == rule_id[0]
assert transfer_path[0]['sources'][0][1] == expected_src_url
assert transfer_path[0]['dest_urls'][0] == expected_dst_url
assert transfer_path[0].legacy_sources[0][1] == expected_src_url
assert transfer_path[0].dest_url == expected_dst_url
4 changes: 2 additions & 2 deletions lib/rucio/tests/test_tpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,8 @@ def test_tpc(containerized_rses, root_account, test_scope, did_factory, rse_clie

[[_host, [transfer_path]]] = next_transfers_to_submit(rses=[rse1_id, rse2_id]).items()
assert transfer_path[0].rws.rule_id == rule_id[0]
src_url = transfer_path[0]['sources'][0][1]
dest_url = transfer_path[0]['dest_urls'][0]
src_url = transfer_path[0].legacy_sources[0][1]
dest_url = transfer_path[0].dest_url
check_url(src_url, rse1_hostname, test_file_expected_pfn)
check_url(dest_url, rse2_hostname, test_file_expected_pfn)

Expand Down
16 changes: 8 additions & 8 deletions lib/rucio/tests/test_transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,8 @@ def __fake_source_ranking(source_rse_id, new_ranking, session=None):

# On equal priority and distance, disk should be preferred over tape. Both disk sources will be returned
[[_host, [transfer]]] = next_transfers_to_submit(rses=all_rses).items()
assert len(transfer[0]['sources']) == 2
assert transfer[0]['sources'][0][0] in (disk1_rse_name, disk2_rse_name)
assert len(transfer[0].legacy_sources) == 2
assert transfer[0].legacy_sources[0][0] in (disk1_rse_name, disk2_rse_name)

# Change the rating of the disk RSEs. Tape RSEs must now be preferred.
# Multiple tape sources are not allowed. Only one tape RSE source must be returned.
Expand All @@ -191,19 +191,19 @@ def __fake_source_ranking(source_rse_id, new_ranking, session=None):
[[_host, transfers]] = next_transfers_to_submit(rses=all_rses).items()
assert len(transfers) == 1
transfer = transfers[0]
assert len(transfer[0]['sources']) == 1
assert transfer[0]['sources'][0][0] in (tape1_rse_name, tape2_rse_name)
assert len(transfer[0].legacy_sources) == 1
assert transfer[0].legacy_sources[0][0] in (tape1_rse_name, tape2_rse_name)

# On equal source ranking, but different distance; the smaller distance is preferred
[[_host, [transfer]]] = next_transfers_to_submit(rses=all_rses).items()
assert len(transfer[0]['sources']) == 1
assert transfer[0]['sources'][0][0] == tape2_rse_name
assert len(transfer[0].legacy_sources) == 1
assert transfer[0].legacy_sources[0][0] == tape2_rse_name

# On different source ranking, the bigger ranking is preferred
__fake_source_ranking(tape2_rse_id, -1)
[[_host, [transfer]]] = next_transfers_to_submit(rses=all_rses).items()
assert len(transfer[0]['sources']) == 1
assert transfer[0]['sources'][0][0] == tape1_rse_name
assert len(transfer[0].legacy_sources) == 1
assert transfer[0].legacy_sources[0][0] == tape1_rse_name


@pytest.mark.parametrize("core_config_mock", [{"table_content": [
Expand Down
45 changes: 42 additions & 3 deletions lib/rucio/transfertool/fts3.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@
# - Brandon White <bjwhite@fnal.gov>, 2019
# - Joaquín Bogado <jbogado@linti.unlp.edu.ar>, 2020
# - Jaroslav Guenther <jaroslav.guenther@cern.ch>, 2019
# - Thomas Beermann <thomas.beermann@cern.ch>, 2020
# - Thomas Beermann <thomas.beermann@cern.ch>, 2020-2021
# - Radu Carpa <radu.carpa@cern.ch>, 2021

from __future__ import absolute_import, division
import datetime
Expand Down Expand Up @@ -57,7 +58,7 @@
from rucio.common.config import config_get, config_get_bool
from rucio.common.constants import FTS_STATE
from rucio.common.exception import TransferToolTimeout, TransferToolWrongAnswer, DuplicateFileTransferSubmission
from rucio.common.utils import APIEncoder
from rucio.common.utils import APIEncoder, set_checksum_value
from rucio.core.monitor import record_counter, record_timer
from rucio.transfertool.transfertool import Transfertool

Expand Down Expand Up @@ -110,7 +111,7 @@ def __init__(self, external_host, token=None):
self.cert = None
self.verify = True # True is the default setting of a requests.* method

def submit(self, files, job_params, timeout=None):
def submit(self, transfers, job_params, timeout=None):
"""
Submit transfers to FTS3 via JSON.
Expand All @@ -119,6 +120,44 @@ def submit(self, files, job_params, timeout=None):
:param timeout: Timeout in seconds.
:returns: FTS transfer identifier.
"""
files = []
for transfer in transfers:
if isinstance(transfer, dict):
# Compatibility with scripts from /tools which directly use transfertools and pass a dict to it instead of transfer definitions
# TODO: check whether those scripts are still used and, if not, get rid of this compatibility shim
files.append(transfer)
continue

rws = transfer.rws
t_file = {
'sources': [s[1] for s in transfer.legacy_sources],
'destinations': [transfer.dest_url],
'metadata': {
'request_id': rws.request_id,
'scope': rws.scope,
'name': rws.name,
'activity': rws.activity,
'request_type': rws.request_type,
'src_type': "TAPE" if transfer.src.rse.is_tape_or_staging_required() else 'DISK',
'dst_type': "TAPE" if transfer.dst.rse.is_tape() else 'DISK',
'src_rse': transfer.src.rse.name,
'dst_rse': transfer.dst.rse.name,
'src_rse_id': transfer.src.rse.id,
'dest_rse_id': transfer.dst.rse.id,
'filesize': rws.byte_count,
'md5': rws.md5,
'adler32': rws.adler32
},
'filesize': rws.byte_count,
'checksum': None,
'verify_checksum': job_params['verify_checksum'],
'selection_strategy': transfer['selection_strategy'],
'request_type': rws.request_type,
'activity': rws.activity
}
if t_file['verify_checksum'] != 'none':
set_checksum_value(t_file, transfer['checksums_to_use'])
files.append(t_file)

# FTS3 expects 'davs' as the scheme identifier instead of https
for transfer_file in files:
Expand Down
25 changes: 22 additions & 3 deletions lib/rucio/transfertool/globus.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# Copyright 2019-2020 CERN for the benefit of the ATLAS collaboration.
# -*- coding: utf-8 -*-
# Copyright 2019-2021 CERN
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -13,9 +14,10 @@
# limitations under the License.
#
# Authors:
# - Matt Snyder <msnyder@rcf.rhic.bnl.gov>, 2019-2021
# - Matt Snyder <msnyder@bnl.gov>, 2019-2021
# - Martin Barisits <martin.barisits@cern.ch>, 2019
# - Benedikt Ziemons <benedikt.ziemons@cern.ch>, 2020
# - Radu Carpa <radu.carpa@cern.ch>, 2021

import logging

Expand Down Expand Up @@ -65,7 +67,7 @@ def submit(self, files, job_params, timeout=None):

return task_id

def bulk_submit(self, submitjob, timeout=None):
def bulk_submit(self, transfers, timeout=None):
"""
Submit a bulk transfer to globus API
Expand All @@ -76,6 +78,23 @@ def bulk_submit(self, submitjob, timeout=None):
"""

# TODO: support passing a recursive parameter to Globus
submitjob = [
{
# Some dict elements are not needed by globus transfertool, but are accessed by further common fts/globus code
'sources': [s[1] for s in transfer.legacy_sources],
'destinations': [transfer.dest_url],
'metadata': {
'src_rse': transfer.src.rse.name,
'dst_rse': transfer.dst.rse.name,
'scope': str(transfer.rws.scope),
'name': transfer.rws.name,
'source_globus_endpoint_id': transfer.src.rse.attributes['globus_endpoint_id'],
'dest_globus_endpoint_id': transfer.dst.rse.attributes['globus_endpoint_id'],
'filesize': transfer.rws.byte_count,
},
}
for transfer in transfers
]
task_id = bulk_submit_xfer(submitjob, recursive=False)

return task_id
Expand Down

0 comments on commit 30f2a77

Please sign in to comment.