From 2bede945e2e5d7342ac856022479e5a5512d210b Mon Sep 17 00:00:00 2001 From: James Perry Date: Tue, 15 Feb 2022 09:25:34 +0000 Subject: [PATCH] Bug: Fix objectstore uploads on multi VO #5235 --- lib/rucio/client/uploadclient.py | 10 +++---- lib/rucio/daemons/automatix/automatix.py | 5 ++-- lib/rucio/rse/rsemanager.py | 14 +++++----- lib/rucio/tests/rsemgr_api_test.py | 33 +++++++++++++----------- 4 files changed, 34 insertions(+), 28 deletions(-) diff --git a/lib/rucio/client/uploadclient.py b/lib/rucio/client/uploadclient.py index bcba4a51acd..0e4c73ee41e 100644 --- a/lib/rucio/client/uploadclient.py +++ b/lib/rucio/client/uploadclient.py @@ -24,7 +24,7 @@ # - Tomas Javurek , 2018-2020 # - Ale Di Girolamo , 2018 # - Hannes Hansen , 2018 -# - James Perry , 2019-2020 +# - James Perry , 2019-2022 # - Boris Bauermeister , 2019 # - David Cameron , 2019 # - Gabriele Fronze' , 2019 @@ -230,7 +230,7 @@ def _pick_random_rse(rse_expression): # if register_after_upload, file should be overwritten if it is not registered # otherwise if file already exists on RSE we're done if register_after_upload: - if rsemgr.exists(rse_settings, pfn if pfn else file_did, domain=domain, scheme=force_scheme, impl=impl, auth_token=self.auth_token, logger=logger): + if rsemgr.exists(rse_settings, pfn if pfn else file_did, domain=domain, scheme=force_scheme, impl=impl, auth_token=self.auth_token, vo=self.client.vo, logger=logger): try: self.client.get_did(file['did_scope'], file['did_name']) logger(logging.INFO, 'File already registered. Skipping upload.') @@ -240,16 +240,16 @@ def _pick_random_rse(rse_expression): logger(logging.INFO, 'File already exists on RSE. Previous left overs will be overwritten.') delete_existing = True elif not is_deterministic and not no_register: - if rsemgr.exists(rse_settings, pfn, domain=domain, scheme=force_scheme, impl=impl, auth_token=self.auth_token, logger=logger): + if rsemgr.exists(rse_settings, pfn, domain=domain, scheme=force_scheme, impl=impl, auth_token=self.auth_token, vo=self.client.vo, logger=logger): logger(logging.INFO, 'File already exists on RSE with given pfn. Skipping upload. Existing replica has to be removed first.') trace['stateReason'] = 'File already exists' continue - elif rsemgr.exists(rse_settings, file_did, domain=domain, scheme=force_scheme, impl=impl, auth_token=self.auth_token, logger=logger): + elif rsemgr.exists(rse_settings, file_did, domain=domain, scheme=force_scheme, impl=impl, auth_token=self.auth_token, vo=self.client.vo, logger=logger): logger(logging.INFO, 'File already exists on RSE with different pfn. Skipping upload.') trace['stateReason'] = 'File already exists' continue else: - if rsemgr.exists(rse_settings, pfn if pfn else file_did, domain=domain, scheme=force_scheme, impl=impl, auth_token=self.auth_token, logger=logger): + if rsemgr.exists(rse_settings, pfn if pfn else file_did, domain=domain, scheme=force_scheme, impl=impl, auth_token=self.auth_token, vo=self.client.vo, logger=logger): logger(logging.INFO, 'File already exists on RSE. Skipping upload') trace['stateReason'] = 'File already exists' continue diff --git a/lib/rucio/daemons/automatix/automatix.py b/lib/rucio/daemons/automatix/automatix.py index a772a40e7a1..d0397cf7df6 100644 --- a/lib/rucio/daemons/automatix/automatix.py +++ b/lib/rucio/daemons/automatix/automatix.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# Copyright 2018-2021 CERN +# Copyright 2018-2022 CERN # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -25,6 +25,7 @@ # - Benedikt Ziemons , 2020 # - David PoblaciĆ³n Criado , 2021 # - Radu Carpa , 2021 +# - James Perry , 2022 from __future__ import division @@ -86,7 +87,7 @@ def upload(files, scope, metadata, rse, account, source_dir, dataset_lifetime, d try: success_upload = True for cnt in range(0, 3): - rows = rsemgr.upload(rse_info, lfns=lfns, source_dir=source_dir, logger=logger) + rows = rsemgr.upload(rse_info, lfns=lfns, source_dir=source_dir, vo=client.vo, logger=logger) # temporary hack global_status, ret = rows['success'], rows[1] logger(logging.INFO, 'Returned global status : %s, Returned : %s', str(global_status), str(ret)) diff --git a/lib/rucio/rse/rsemanager.py b/lib/rucio/rse/rsemanager.py index c62940d3649..9f1bf616d7b 100644 --- a/lib/rucio/rse/rsemanager.py +++ b/lib/rucio/rse/rsemanager.py @@ -28,7 +28,7 @@ # - Nicolo Magini , 2018 # - Tomas Javurek , 2018-2020 # - Hannes Hansen , 2018-2019 -# - James Perry , 2019 +# - James Perry , 2019-2022 # - Andrew Lister , 2019 # - Gabriele Fronze' , 2019 # - Jaroslav Guenther , 2019-2020 @@ -265,7 +265,7 @@ def parse_pfns(rse_settings, pfns, operation='read', domain='wan', auth_token=No return create_protocol(rse_settings, operation, urlparse(pfns[0]).scheme, domain, auth_token=auth_token).parse_pfns(pfns) -def exists(rse_settings, files, domain='wan', scheme=None, impl=None, auth_token=None, logger=logging.log): +def exists(rse_settings, files, domain='wan', scheme=None, impl=None, auth_token=None, vo='def', logger=logging.log): """ Checks if a file is present at the connected storage. Providing a list indicates the bulk mode. @@ -276,6 +276,7 @@ def exists(rse_settings, files, domain='wan', scheme=None, impl=None, auth_token E.g. {'name': '2_rse_remote_get.raw', 'scope': 'user.jdoe'}, {'name': 'user/jdoe/5a/98/3_rse_remote_get.raw'} :param domain: The network domain, either 'wan' (default) or 'lan' :param auth_token: Optionally passing JSON Web Token (OIDC) string for authentication + :param vo: The VO for the RSE :param logger: Optional decorated logger that can be passed from the calling daemons or servers. :returns: True/False for a single file or a dict object with 'scope:name' for LFNs or 'name' for PFNs as keys and True or the exception as value for each file in bulk mode @@ -309,7 +310,7 @@ def exists(rse_settings, files, domain='wan', scheme=None, impl=None, auth_token logger(logging.DEBUG, 'Checking if %s exists', pfn) # deal with URL signing if required if rse_settings['sign_url'] is not None and pfn[:5] == 'https': - pfn = __get_signed_url(rse_settings['rse'], rse_settings['sign_url'], 'read', pfn) # NOQA pylint: disable=undefined-variable + pfn = __get_signed_url(rse_settings['rse'], rse_settings['sign_url'], 'read', pfn, vo) # NOQA pylint: disable=undefined-variable exists = protocol.exists(pfn) ret[f['scope'] + ':' + f['name']] = exists else: @@ -325,7 +326,7 @@ def exists(rse_settings, files, domain='wan', scheme=None, impl=None, auth_token return [gs, ret] -def upload(rse_settings, lfns, domain='wan', source_dir=None, force_pfn=None, force_scheme=None, transfer_timeout=None, delete_existing=False, sign_service=None, auth_token=None, logger=logging.log, impl=None): +def upload(rse_settings, lfns, domain='wan', source_dir=None, force_pfn=None, force_scheme=None, transfer_timeout=None, delete_existing=False, sign_service=None, auth_token=None, vo='def', logger=logging.log, impl=None): """ Uploads a file to the connected storage. Providing a list indicates the bulk mode. @@ -345,6 +346,7 @@ def upload(rse_settings, lfns, domain='wan', source_dir=None, force_pfn=None, fo :param transfer_timeout: set this timeout (in seconds) for the transfers, for protocols that support it :param sign_service: use the given service (e.g. gcs, s3, swift) to sign the URL :param auth_token: Optionally passing JSON Web Token (OIDC) string for authentication + :param vo: The VO for the RSE :param logger: Optional decorated logger that can be passed from the calling daemons or servers. :returns: True/False for a single file or a dict object with 'scope:name' as keys and True or the exception as value for each file in bulk mode @@ -385,8 +387,8 @@ def upload(rse_settings, lfns, domain='wan', source_dir=None, force_pfn=None, fo readpfn = pfn if sign_service is not None: # need a separate signed URL for read operations (exists and stat) - readpfn = __get_signed_url(rse_settings['rse'], sign_service, 'read', pfn) # NOQA pylint: disable=undefined-variable - pfn = __get_signed_url(rse_settings['rse'], sign_service, 'write', pfn) # NOQA pylint: disable=undefined-variable + readpfn = __get_signed_url(rse_settings['rse'], sign_service, 'read', pfn, vo) # NOQA pylint: disable=undefined-variable + pfn = __get_signed_url(rse_settings['rse'], sign_service, 'write', pfn, vo) # NOQA pylint: disable=undefined-variable # First check if renaming operation is supported if protocol.renaming: diff --git a/lib/rucio/tests/rsemgr_api_test.py b/lib/rucio/tests/rsemgr_api_test.py index e8b9bbaf8a4..34b9ade30b2 100644 --- a/lib/rucio/tests/rsemgr_api_test.py +++ b/lib/rucio/tests/rsemgr_api_test.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# Copyright 2012-2021 CERN +# Copyright 2012-2022 CERN # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -25,6 +25,7 @@ # - Mayank Sharma , 2021 # - Radu Carpa , 2021 # - Rakshita Varadarajan , 2021 +# - James Perry , 2022 from __future__ import print_function @@ -72,6 +73,7 @@ def __init__(self, tmpdir, rse_tag, user, static_file, vo='def', impl=None): self.impl = 'rucio.rse.protocols.' + impl + '.Default' else: self.impl = 'rucio.rse.protocols.' + impl + self.vo = vo def setup_scheme(self, scheme): """(RSE/PROTOCOLS): Make mgr to select this scheme first.""" @@ -138,7 +140,8 @@ def test_put_mgr_ok_multi(self): {'name': '2_rse_local_put.raw', 'scope': 'user.%s' % self.user, 'md5': md5(str(self.tmpdir) + '/2_rse_local_put.raw'), 'filesize': os.stat('%s/2_rse_local_put.raw' % self.tmpdir)[ - os.path.stat.ST_SIZE]}], source_dir=self.tmpdir, impl=self.impl) + os.path.stat.ST_SIZE]}], source_dir=self.tmpdir, vo=self.vo, + impl=self.impl) else: result = mgr.upload(self.rse_settings, [{'name': '1_rse_local_put.raw', 'scope': 'user.%s' % self.user, 'adler32': adler32('%s/1_rse_local_put.raw' % self.tmpdir), @@ -147,7 +150,7 @@ def test_put_mgr_ok_multi(self): {'name': '2_rse_local_put.raw', 'scope': 'user.%s' % self.user, 'adler32': adler32('%s/2_rse_local_put.raw' % self.tmpdir), 'filesize': os.stat('%s/2_rse_local_put.raw' % self.tmpdir)[ - os.path.stat.ST_SIZE]}], source_dir=self.tmpdir) + os.path.stat.ST_SIZE]}], source_dir=self.tmpdir, vo=self.vo) status = result[0] details = result[1] @@ -158,17 +161,17 @@ def test_put_mgr_ok_single(self): """(RSE/PROTOCOLS): Put a single file to storage (Success)""" if self.rse_settings['protocols'][0]['hostname'] == 'ssh1': mgr.upload(self.rse_settings, {'name': '3_rse_local_put.raw', 'scope': 'user.%s' % self.user, - 'md5': md5('%s/3_rse_local_put.raw' % self.tmpdir), 'filesize': os.stat('%s/3_rse_local_put.raw' % self.tmpdir)[os.path.stat.ST_SIZE]}, source_dir=self.tmpdir, impl=self.impl) + 'md5': md5('%s/3_rse_local_put.raw' % self.tmpdir), 'filesize': os.stat('%s/3_rse_local_put.raw' % self.tmpdir)[os.path.stat.ST_SIZE]}, source_dir=self.tmpdir, vo=self.vo, impl=self.impl) else: mgr.upload(self.rse_settings, {'name': '3_rse_local_put.raw', 'scope': 'user.%s' % self.user, - 'adler32': adler32('%s/3_rse_local_put.raw' % self.tmpdir), 'filesize': os.stat('%s/3_rse_local_put.raw' % self.tmpdir)[os.path.stat.ST_SIZE]}, source_dir=self.tmpdir) + 'adler32': adler32('%s/3_rse_local_put.raw' % self.tmpdir), 'filesize': os.stat('%s/3_rse_local_put.raw' % self.tmpdir)[os.path.stat.ST_SIZE]}, source_dir=self.tmpdir, vo=self.vo) def test_put_mgr_SourceNotFound_multi(self): """(RSE/PROTOCOLS): Put multiple files to storage (SourceNotFound)""" result = mgr.upload(self.rse_settings, [{'name': 'not_existing_data.raw', 'scope': 'user.%s' % self.user, 'adler32': 'some_random_stuff', 'filesize': 4711}, {'name': '4_rse_local_put.raw', 'scope': 'user.%s' % self.user, - 'adler32': adler32('%s/4_rse_local_put.raw' % self.tmpdir), 'filesize': os.stat('%s/4_rse_local_put.raw' % self.tmpdir)[os.path.stat.ST_SIZE]}], source_dir=self.tmpdir, impl=self.impl) + 'adler32': adler32('%s/4_rse_local_put.raw' % self.tmpdir), 'filesize': os.stat('%s/4_rse_local_put.raw' % self.tmpdir)[os.path.stat.ST_SIZE]}], source_dir=self.tmpdir, vo=self.vo, impl=self.impl) status = result[0] details = result[1] if details['user.%s:4_rse_local_put.raw' % self.user]: @@ -178,12 +181,12 @@ def test_put_mgr_SourceNotFound_multi(self): def test_put_mgr_SourceNotFound_single(self): """(RSE/PROTOCOLS): Put a single file to storage (SourceNotFound)""" - mgr.upload(self.rse_settings, {'name': 'not_existing_data2.raw', 'scope': 'user.%s' % self.user, 'adler32': 'random_stuff', 'filesize': 0}, source_dir=self.tmpdir, impl=self.impl) + mgr.upload(self.rse_settings, {'name': 'not_existing_data2.raw', 'scope': 'user.%s' % self.user, 'adler32': 'random_stuff', 'filesize': 0}, source_dir=self.tmpdir, vo=self.vo, impl=self.impl) def test_put_mgr_FileReplicaAlreadyExists_multi(self): """(RSE/PROTOCOLS): Put multiple files to storage (FileReplicaAlreadyExists)""" result = mgr.upload(self.rse_settings, [{'name': '1_rse_remote_get.raw', 'scope': 'user.%s' % self.user, 'adler32': "bla-bla", 'filesize': 4711}, - {'name': '2_rse_remote_get.raw', 'scope': 'user.%s' % self.user, 'adler32': "bla-bla", 'filesize': 4711}], source_dir=self.tmpdir, impl=self.impl) + {'name': '2_rse_remote_get.raw', 'scope': 'user.%s' % self.user, 'adler32': "bla-bla", 'filesize': 4711}], source_dir=self.tmpdir, vo=self.vo, impl=self.impl) status = result[0] details = result[1] if details['user.%s:1_rse_remote_get.raw' % self.user]: @@ -193,7 +196,7 @@ def test_put_mgr_FileReplicaAlreadyExists_multi(self): def test_put_mgr_FileReplicaAlreadyExists_single(self): """(RSE/PROTOCOLS): Put a single file to storage (FileReplicaAlreadyExists)""" - mgr.upload(self.rse_settings, {'name': '1_rse_remote_get.raw', 'scope': 'user.%s' % self.user, 'adler32': 'bla-bla', 'filesize': 4711}, source_dir=self.tmpdir, impl=self.impl) + mgr.upload(self.rse_settings, {'name': '1_rse_remote_get.raw', 'scope': 'user.%s' % self.user, 'adler32': 'bla-bla', 'filesize': 4711}, source_dir=self.tmpdir, vo=self.vo, impl=self.impl) # MGR-Tests: DELETE def test_delete_mgr_ok_multi(self): @@ -235,18 +238,18 @@ def test_exists_mgr_ok_multi(self): status, details = mgr.exists(self.rse_settings, [{'name': '1_rse_remote_get.raw', 'scope': 'user.%s' % self.user}, {'name': '2_rse_remote_get.raw', 'scope': 'user.%s' % self.user}, {'name': pfn_a}, - {'name': pfn_b}], impl=self.impl) + {'name': pfn_b}], impl=self.impl, vo=self.vo) if not (status and details['user.%s:1_rse_remote_get.raw' % self.user] and details['user.%s:2_rse_remote_get.raw' % self.user] and details[pfn_a] and details[pfn_b]): raise Exception('Return not as expected: %s, %s' % (status, details)) def test_exists_mgr_ok_single_lfn(self): """(RSE/PROTOCOLS): Check a single file on storage using LFN (Success)""" - mgr.exists(self.rse_settings, {'name': '1_rse_remote_get.raw', 'scope': 'user.%s' % self.user}, impl=self.impl) + mgr.exists(self.rse_settings, {'name': '1_rse_remote_get.raw', 'scope': 'user.%s' % self.user}, impl=self.impl, vo=self.vo) def test_exists_mgr_ok_single_pfn(self): """(RSE/PROTOCOLS): Check a single file on storage using PFN (Success)""" pfn = list(mgr.lfns2pfns(self.rse_settings, {'name': '1_rse_remote_get.raw', 'scope': 'user.%s' % self.user}, impl=self.impl).values())[0] - mgr.exists(self.rse_settings, {'name': pfn}, impl=self.impl) + mgr.exists(self.rse_settings, {'name': pfn}, impl=self.impl, vo=self.vo) def test_exists_mgr_false_multi(self): """(RSE/PROTOCOLS): Check multiple files on storage (Fail)""" @@ -255,18 +258,18 @@ def test_exists_mgr_false_multi(self): status, details = mgr.exists(self.rse_settings, [{'name': '1_rse_remote_get.raw', 'scope': 'user.%s' % self.user}, {'name': 'not_existing_data.raw', 'scope': 'user.%s' % self.user}, {'name': pfn_a}, - {'name': pfn_b}], impl=self.impl) + {'name': pfn_b}], impl=self.impl, vo=self.vo) if status or not details['user.%s:1_rse_remote_get.raw' % self.user] or details['user.%s:not_existing_data.raw' % self.user] or not details[pfn_a] or details[pfn_b]: raise Exception('Return not as expected: %s, %s' % (status, details)) def test_exists_mgr_false_single_lfn(self): """(RSE/PROTOCOLS): Check a single file on storage using LFN (Fail)""" - not mgr.exists(self.rse_settings, {'name': 'not_existing_data.raw', 'scope': 'user.%s' % self.user}, impl=self.impl) + not mgr.exists(self.rse_settings, {'name': 'not_existing_data.raw', 'scope': 'user.%s' % self.user}, impl=self.impl, vo=self.vo) def test_exists_mgr_false_single_pfn(self): """(RSE/PROTOCOLS): Check a single file on storage using PFN (Fail)""" pfn = list(mgr.lfns2pfns(self.rse_settings, {'name': '1_rse_not_existing.raw', 'scope': 'user.%s' % self.user}, impl=self.impl).values())[0] - not mgr.exists(self.rse_settings, {'name': pfn}, impl=self.impl) + not mgr.exists(self.rse_settings, {'name': pfn}, impl=self.impl, vo=self.vo) # MGR-Tests: RENAME def test_rename_mgr_ok_multi(self):