Skip to content

Commit

Permalink
Clients: correctly propagate selected domain in uploadclient. rucio#4639
Browse files Browse the repository at this point in the history


The upload client has some logic to select the 'lan' domain if the
client and rse 'site' are identical. This results into correctly
selecting the `scheme` which can be used for a lan transfer.
However, the domain is not enforced when the protocol object is
latter created. This will result in using the default "wan" domain
for the upload if a protocol with the same scheme is available on wan.
  • Loading branch information
rcarpa committed May 26, 2021
1 parent c057604 commit cff6334
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 7 deletions.
15 changes: 8 additions & 7 deletions lib/rucio/client/uploadclient.py
Expand Up @@ -272,6 +272,7 @@ def _pick_random_rse(rse_expression):
rse_attributes=rse_attributes,
lfn=lfn,
source_dir=file['dirname'],
domain=domain,
force_scheme=cur_scheme,
force_pfn=pfn,
transfer_timeout=file.get('transfer_timeout'),
Expand Down Expand Up @@ -538,7 +539,7 @@ def _convert_file_for_api(self, file):
replica['pfn'] = pfn
return replica

def _upload_item(self, rse_settings, rse_attributes, lfn, source_dir=None, force_pfn=None, force_scheme=None, transfer_timeout=None, delete_existing=False, sign_service=None):
def _upload_item(self, rse_settings, rse_attributes, lfn, source_dir=None, domain='wan', force_pfn=None, force_scheme=None, transfer_timeout=None, delete_existing=False, sign_service=None):
"""
Uploads a file to the connected storage.
Expand All @@ -559,8 +560,8 @@ def _upload_item(self, rse_settings, rse_attributes, lfn, source_dir=None, force
logger = self.logger

# Construct protocol for write and read operation.
protocol_write = self._create_protocol(rse_settings, 'write', force_scheme=force_scheme)
protocol_read = self._create_protocol(rse_settings, 'read')
protocol_write = self._create_protocol(rse_settings, 'write', force_scheme=force_scheme, domain=domain)
protocol_read = self._create_protocol(rse_settings, 'read', domain=domain)

base_name = lfn.get('filename', lfn['name'])
name = lfn.get('name', base_name)
Expand Down Expand Up @@ -603,7 +604,7 @@ def _upload_item(self, rse_settings, rse_attributes, lfn, source_dir=None, force
logger(logging.DEBUG, 'Removing remains of previous upload attemtps.')
try:
# Construct protocol for delete operation.
protocol_delete = self._create_protocol(rse_settings, 'delete')
protocol_delete = self._create_protocol(rse_settings, 'delete', domain=domain)
protocol_delete.delete('%s.rucio.upload' % list(protocol_delete.lfns2pfns(make_valid_did(lfn)).values())[0])
protocol_delete.close()
except Exception as e:
Expand All @@ -614,7 +615,7 @@ def _upload_item(self, rse_settings, rse_attributes, lfn, source_dir=None, force
logger(logging.DEBUG, 'Removing not-registered remains of previous upload attemtps.')
try:
# Construct protocol for delete operation.
protocol_delete = self._create_protocol(rse_settings, 'delete')
protocol_delete = self._create_protocol(rse_settings, 'delete', domain=domain)
protocol_delete.delete('%s' % list(protocol_delete.lfns2pfns(make_valid_did(lfn)).values())[0])
protocol_delete.close()
except Exception as error:
Expand Down Expand Up @@ -693,7 +694,7 @@ def _retry_protocol_stat(self, protocol, pfn):
time.sleep(2**attempt)
return protocol.stat(pfn)

def _create_protocol(self, rse_settings, operation, force_scheme=None):
def _create_protocol(self, rse_settings, operation, force_scheme=None, domain='wan'):
"""
Protol construction.
:param: rse_settings rse_settings
Expand All @@ -702,7 +703,7 @@ def _create_protocol(self, rse_settings, operation, force_scheme=None):
:param auth_token: Optionally passing JSON Web Token (OIDC) string for authentication
"""
try:
protocol = rsemgr.create_protocol(rse_settings, operation, scheme=force_scheme, auth_token=self.auth_token, logger=self.logger)
protocol = rsemgr.create_protocol(rse_settings, operation, scheme=force_scheme, domain=domain, auth_token=self.auth_token, logger=self.logger)
protocol.connect()
except Exception as error:
self.logger(logging.WARNING, 'Failed to create protocol for operation: %s' % operation)
Expand Down
75 changes: 75 additions & 0 deletions lib/rucio/tests/test_upload.py
@@ -0,0 +1,75 @@
# -*- coding: utf-8 -*-
# Copyright 2021 CERN
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Authors:
# - Radu Carpa <radu.carpa@cern.ch>, 2021

import json
import os
from rucio.common.utils import generate_uuid
from rucio.tests.common import file_generator
from rucio.core.rse import add_protocol, add_rse_attribute

def test_multiple_protocols_same_scheme(rse_factory, did_factory, mock_scope, tmp_path):
""" Upload (CLIENT): Ensure domain correctly selected when multiple protocols exist with the same scheme """

rse, rse_id = rse_factory.make_rse()

# Ensure client site and rse site are identical. So that "lan" is preferred.
add_rse_attribute(rse_id, 'site', 'ROAMING')

add_protocol(rse_id, {'scheme': 'file',
'hostname': 'file-wan.aperture.com',
'port': 0,
'prefix': '/prefix1/',
'impl': 'rucio.rse.protocols.posix.Default',
'domains': {
'lan': {'read': 0, 'write': 0, 'delete': 0},
'wan': {'read': 1, 'write': 1, 'delete': 1}}})
add_protocol(rse_id, {'scheme': 'file',
'hostname': 'file-lan.aperture.com',
'port': 0,
'prefix': '/prefix2/',
'impl': 'rucio.rse.protocols.posix.Default',
'domains': {
'lan': {'read': 1, 'write': 1, 'delete': 1},
'wan': {'read': 0, 'write': 0, 'delete': 0}}})
add_protocol(rse_id, {'scheme': 'root',
'hostname': 'root.aperture.com',
'port': 1403,
'prefix': '/prefix3/',
'impl': 'rucio.rse.protocols.xrootd.Default',
'domains': {
'lan': {'read': 2, 'write': 2, 'delete': 2},
'wan': {'read': 2, 'write': 2, 'delete': 2}}})


# Upload a file
path = file_generator()
name = os.path.basename(path)
item = {
'path': path,
'rse': rse,
'did_scope': str(mock_scope),
'did_name': name,
'guid': generate_uuid(),
}
summary_path = tmp_path / 'summary'
did_factory.upload_client.upload([item], summary_file_path=summary_path)

# Verify that the lan protocol was used for the upload
with open(summary_path) as json_file:
data = json.load(json_file)
assert 'file-lan.aperture.com' in data['{}:{}'.format(mock_scope, name)]['pfn']

0 comments on commit cff6334

Please sign in to comment.