Skip to content

Commit

Permalink
Bug: Fix objectstore uploads on multi VO rucio#5235
Browse files (browse the repository at this point in the history)
  • Loading branch information
jamesp-epcc committed Feb 15, 2022
1 parent e7f1f44 commit 35c0b07
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 29 deletions.
12 changes: 6 additions & 6 deletions lib/rucio/client/uploadclient.py
@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
# Copyright 2012-2021 CERN
# Copyright 2012-2022 CERN
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -24,7 +24,7 @@
# - Tomas Javurek <tomas.javurek@cern.ch>, 2018-2020
# - Ale Di Girolamo <alessandro.di.girolamo@cern.ch>, 2018
# - Hannes Hansen <hannes.jakob.hansen@cern.ch>, 2018
# - James Perry <j.perry@epcc.ed.ac.uk>, 2019-2020
# - James Perry <j.perry@epcc.ed.ac.uk>, 2019-2022
# - Boris Bauermeister <boris.bauermeister@fysik.su.se>, 2019
# - David Cameron <david.cameron@cern.ch>, 2019
# - Gabriele Fronze' <gfronze@cern.ch>, 2019
Expand Down Expand Up @@ -226,7 +226,7 @@ def _pick_random_rse(rse_expression):
# if register_after_upload, file should be overwritten if it is not registered
# otherwise if file already exists on RSE we're done
if register_after_upload:
if rsemgr.exists(rse_settings, pfn if pfn else file_did, domain=domain, scheme=force_scheme, impl=impl, auth_token=self.auth_token, logger=logger):
if rsemgr.exists(rse_settings, pfn if pfn else file_did, domain=domain, scheme=force_scheme, impl=impl, auth_token=self.auth_token, vo=self.client.vo, logger=logger):
try:
self.client.get_did(file['did_scope'], file['did_name'])
logger(logging.INFO, 'File already registered. Skipping upload.')
Expand All @@ -236,16 +236,16 @@ def _pick_random_rse(rse_expression):
logger(logging.INFO, 'File already exists on RSE. Previous left overs will be overwritten.')
delete_existing = True
elif not is_deterministic and not no_register:
if rsemgr.exists(rse_settings, pfn, domain=domain, scheme=force_scheme, impl=impl, auth_token=self.auth_token, logger=logger):
if rsemgr.exists(rse_settings, pfn, domain=domain, scheme=force_scheme, impl=impl, auth_token=self.auth_token, vo=self.client.vo, logger=logger):
logger(logging.INFO, 'File already exists on RSE with given pfn. Skipping upload. Existing replica has to be removed first.')
trace['stateReason'] = 'File already exists'
continue
elif rsemgr.exists(rse_settings, file_did, domain=domain, scheme=force_scheme, impl=impl, auth_token=self.auth_token, logger=logger):
elif rsemgr.exists(rse_settings, file_did, domain=domain, scheme=force_scheme, impl=impl, auth_token=self.auth_token, vo=self.client.vo, logger=logger):
logger(logging.INFO, 'File already exists on RSE with different pfn. Skipping upload.')
trace['stateReason'] = 'File already exists'
continue
else:
if rsemgr.exists(rse_settings, pfn if pfn else file_did, domain=domain, scheme=force_scheme, impl=impl, auth_token=self.auth_token, logger=logger):
if rsemgr.exists(rse_settings, pfn if pfn else file_did, domain=domain, scheme=force_scheme, impl=impl, auth_token=self.auth_token, vo=self.client.vo, logger=logger):
logger(logging.INFO, 'File already exists on RSE. Skipping upload')
trace['stateReason'] = 'File already exists'
continue
Expand Down
5 changes: 3 additions & 2 deletions lib/rucio/daemons/automatix/automatix.py
@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
# Copyright 2018-2021 CERN
# Copyright 2018-2022 CERN
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -25,6 +25,7 @@
# - Benedikt Ziemons <benedikt.ziemons@cern.ch>, 2020
# - David Población Criado <david.poblacion.criado@cern.ch>, 2021
# - Radu Carpa <radu.carpa@cern.ch>, 2021
# - James Perry <j.perry@epcc.ed.ac.uk>, 2022

from __future__ import division

Expand Down Expand Up @@ -86,7 +87,7 @@ def upload(files, scope, metadata, rse, account, source_dir, dataset_lifetime, d
try:
success_upload = True
for cnt in range(0, 3):
rows = rsemgr.upload(rse_info, lfns=lfns, source_dir=source_dir, logger=logger)
rows = rsemgr.upload(rse_info, lfns=lfns, source_dir=source_dir, vo=client.vo, logger=logger)
# temporary hack
global_status, ret = rows['success'], rows[1]
logger(logging.INFO, 'Returned global status : %s, Returned : %s', str(global_status), str(ret))
Expand Down
14 changes: 8 additions & 6 deletions lib/rucio/rse/rsemanager.py
Expand Up @@ -28,7 +28,7 @@
# - Nicolo Magini <nicolo.magini@cern.ch>, 2018
# - Tomas Javurek <tomas.javurek@cern.ch>, 2018-2020
# - Hannes Hansen <hannes.jakob.hansen@cern.ch>, 2018-2019
# - James Perry <j.perry@epcc.ed.ac.uk>, 2019
# - James Perry <j.perry@epcc.ed.ac.uk>, 2019-2022
# - Andrew Lister <andrew.lister@stfc.ac.uk>, 2019
# - Gabriele Fronze' <gfronze@cern.ch>, 2019
# - Jaroslav Guenther <jaroslav.guenther@cern.ch>, 2019-2020
Expand Down Expand Up @@ -265,7 +265,7 @@ def parse_pfns(rse_settings, pfns, operation='read', domain='wan', auth_token=No
return create_protocol(rse_settings, operation, urlparse(pfns[0]).scheme, domain, auth_token=auth_token).parse_pfns(pfns)


def exists(rse_settings, files, domain='wan', scheme=None, impl=None, auth_token=None, logger=logging.log):
def exists(rse_settings, files, domain='wan', scheme=None, impl=None, auth_token=None, vo='def', logger=logging.log):
"""
Checks if a file is present at the connected storage.
Providing a list indicates the bulk mode.
Expand All @@ -276,6 +276,7 @@ def exists(rse_settings, files, domain='wan', scheme=None, impl=None, auth_token
E.g. {'name': '2_rse_remote_get.raw', 'scope': 'user.jdoe'}, {'name': 'user/jdoe/5a/98/3_rse_remote_get.raw'}
:param domain: The network domain, either 'wan' (default) or 'lan'
:param auth_token: Optionally passing JSON Web Token (OIDC) string for authentication
:param vo: The VO for the RSE
:param logger: Optional decorated logger that can be passed from the calling daemons or servers.
:returns: True/False for a single file or a dict object with 'scope:name' for LFNs or 'name' for PFNs as keys and True or the exception as value for each file in bulk mode
Expand Down Expand Up @@ -309,7 +310,7 @@ def exists(rse_settings, files, domain='wan', scheme=None, impl=None, auth_token
logger(logging.DEBUG, 'Checking if %s exists', pfn)
# deal with URL signing if required
if rse_settings['sign_url'] is not None and pfn[:5] == 'https':
pfn = __get_signed_url(rse_settings['rse'], rse_settings['sign_url'], 'read', pfn) # NOQA pylint: disable=undefined-variable
pfn = __get_signed_url(rse_settings['rse'], rse_settings['sign_url'], 'read', pfn, vo) # NOQA pylint: disable=undefined-variable
exists = protocol.exists(pfn)
ret[f['scope'] + ':' + f['name']] = exists
else:
Expand All @@ -325,7 +326,7 @@ def exists(rse_settings, files, domain='wan', scheme=None, impl=None, auth_token
return [gs, ret]


def upload(rse_settings, lfns, domain='wan', source_dir=None, force_pfn=None, force_scheme=None, transfer_timeout=None, delete_existing=False, sign_service=None, auth_token=None, logger=logging.log, impl=None):
def upload(rse_settings, lfns, domain='wan', source_dir=None, force_pfn=None, force_scheme=None, transfer_timeout=None, delete_existing=False, sign_service=None, auth_token=None, vo='def', logger=logging.log, impl=None):
"""
Uploads a file to the connected storage.
Providing a list indicates the bulk mode.
Expand All @@ -345,6 +346,7 @@ def upload(rse_settings, lfns, domain='wan', source_dir=None, force_pfn=None, fo
:param transfer_timeout: set this timeout (in seconds) for the transfers, for protocols that support it
:param sign_service: use the given service (e.g. gcs, s3, swift) to sign the URL
:param auth_token: Optionally passing JSON Web Token (OIDC) string for authentication
:param vo: The VO for the RSE
:param logger: Optional decorated logger that can be passed from the calling daemons or servers.
:returns: True/False for a single file or a dict object with 'scope:name' as keys and True or the exception as value for each file in bulk mode
Expand Down Expand Up @@ -385,8 +387,8 @@ def upload(rse_settings, lfns, domain='wan', source_dir=None, force_pfn=None, fo
readpfn = pfn
if sign_service is not None:
# need a separate signed URL for read operations (exists and stat)
readpfn = __get_signed_url(rse_settings['rse'], sign_service, 'read', pfn) # NOQA pylint: disable=undefined-variable
pfn = __get_signed_url(rse_settings['rse'], sign_service, 'write', pfn) # NOQA pylint: disable=undefined-variable
readpfn = __get_signed_url(rse_settings['rse'], sign_service, 'read', pfn, vo) # NOQA pylint: disable=undefined-variable
pfn = __get_signed_url(rse_settings['rse'], sign_service, 'write', pfn, vo) # NOQA pylint: disable=undefined-variable

# First check if renaming operation is supported
if protocol.renaming:
Expand Down
33 changes: 18 additions & 15 deletions lib/rucio/tests/rsemgr_api_test.py
@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
# Copyright 2012-2021 CERN
# Copyright 2012-2022 CERN
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -25,6 +25,7 @@
# - Mayank Sharma <mayank.sharma@cern.ch>, 2021
# - Radu Carpa <radu.carpa@cern.ch>, 2021
# - Rakshita Varadarajan <rakshitajps@gmail.com>, 2021
# - James Perry <j.perry@epcc.ed.ac.uk>, 2022

from __future__ import print_function

Expand Down Expand Up @@ -72,6 +73,7 @@ def __init__(self, tmpdir, rse_tag, user, static_file, vo='def', impl=None):
self.impl = 'rucio.rse.protocols.' + impl + '.Default'
else:
self.impl = 'rucio.rse.protocols.' + impl
self.vo = vo

def setup_scheme(self, scheme):
"""(RSE/PROTOCOLS): Make mgr to select this scheme first."""
Expand Down Expand Up @@ -138,7 +140,8 @@ def test_put_mgr_ok_multi(self):
{'name': '2_rse_local_put.raw', 'scope': 'user.%s' % self.user,
'md5': md5(str(self.tmpdir) + '/2_rse_local_put.raw'),
'filesize': os.stat('%s/2_rse_local_put.raw' % self.tmpdir)[
os.path.stat.ST_SIZE]}], source_dir=self.tmpdir, impl=self.impl)
os.path.stat.ST_SIZE]}], source_dir=self.tmpdir, vo=self.vo,
impl=self.impl)
else:
result = mgr.upload(self.rse_settings, [{'name': '1_rse_local_put.raw', 'scope': 'user.%s' % self.user,
'adler32': adler32('%s/1_rse_local_put.raw' % self.tmpdir),
Expand All @@ -147,7 +150,7 @@ def test_put_mgr_ok_multi(self):
{'name': '2_rse_local_put.raw', 'scope': 'user.%s' % self.user,
'adler32': adler32('%s/2_rse_local_put.raw' % self.tmpdir),
'filesize': os.stat('%s/2_rse_local_put.raw' % self.tmpdir)[
os.path.stat.ST_SIZE]}], source_dir=self.tmpdir)
os.path.stat.ST_SIZE]}], source_dir=self.tmpdir, vo=self.vo)

status = result[0]
details = result[1]
Expand All @@ -158,17 +161,17 @@ def test_put_mgr_ok_single(self):
"""(RSE/PROTOCOLS): Put a single file to storage (Success)"""
if self.rse_settings['protocols'][0]['hostname'] == 'ssh1':
mgr.upload(self.rse_settings, {'name': '3_rse_local_put.raw', 'scope': 'user.%s' % self.user,
'md5': md5('%s/3_rse_local_put.raw' % self.tmpdir), 'filesize': os.stat('%s/3_rse_local_put.raw' % self.tmpdir)[os.path.stat.ST_SIZE]}, source_dir=self.tmpdir, impl=self.impl)
'md5': md5('%s/3_rse_local_put.raw' % self.tmpdir), 'filesize': os.stat('%s/3_rse_local_put.raw' % self.tmpdir)[os.path.stat.ST_SIZE]}, source_dir=self.tmpdir, vo=self.vo, impl=self.impl)
else:
mgr.upload(self.rse_settings, {'name': '3_rse_local_put.raw', 'scope': 'user.%s' % self.user,
'adler32': adler32('%s/3_rse_local_put.raw' % self.tmpdir), 'filesize': os.stat('%s/3_rse_local_put.raw' % self.tmpdir)[os.path.stat.ST_SIZE]}, source_dir=self.tmpdir)
'adler32': adler32('%s/3_rse_local_put.raw' % self.tmpdir), 'filesize': os.stat('%s/3_rse_local_put.raw' % self.tmpdir)[os.path.stat.ST_SIZE]}, source_dir=self.tmpdir, vo=self.vo)

def test_put_mgr_SourceNotFound_multi(self):
"""(RSE/PROTOCOLS): Put multiple files to storage (SourceNotFound)"""
result = mgr.upload(self.rse_settings, [{'name': 'not_existing_data.raw', 'scope': 'user.%s' % self.user,
'adler32': 'some_random_stuff', 'filesize': 4711},
{'name': '4_rse_local_put.raw', 'scope': 'user.%s' % self.user,
'adler32': adler32('%s/4_rse_local_put.raw' % self.tmpdir), 'filesize': os.stat('%s/4_rse_local_put.raw' % self.tmpdir)[os.path.stat.ST_SIZE]}], source_dir=self.tmpdir, impl=self.impl)
'adler32': adler32('%s/4_rse_local_put.raw' % self.tmpdir), 'filesize': os.stat('%s/4_rse_local_put.raw' % self.tmpdir)[os.path.stat.ST_SIZE]}], source_dir=self.tmpdir, vo=self.vo, impl=self.impl)
status = result[0]
details = result[1]
if details['user.%s:4_rse_local_put.raw' % self.user]:
Expand All @@ -178,12 +181,12 @@ def test_put_mgr_SourceNotFound_multi(self):

def test_put_mgr_SourceNotFound_single(self):
"""(RSE/PROTOCOLS): Put a single file to storage (SourceNotFound)"""
mgr.upload(self.rse_settings, {'name': 'not_existing_data2.raw', 'scope': 'user.%s' % self.user, 'adler32': 'random_stuff', 'filesize': 0}, source_dir=self.tmpdir, impl=self.impl)
mgr.upload(self.rse_settings, {'name': 'not_existing_data2.raw', 'scope': 'user.%s' % self.user, 'adler32': 'random_stuff', 'filesize': 0}, source_dir=self.tmpdir, vo=self.vo, impl=self.impl)

def test_put_mgr_FileReplicaAlreadyExists_multi(self):
"""(RSE/PROTOCOLS): Put multiple files to storage (FileReplicaAlreadyExists)"""
result = mgr.upload(self.rse_settings, [{'name': '1_rse_remote_get.raw', 'scope': 'user.%s' % self.user, 'adler32': "bla-bla", 'filesize': 4711},
{'name': '2_rse_remote_get.raw', 'scope': 'user.%s' % self.user, 'adler32': "bla-bla", 'filesize': 4711}], source_dir=self.tmpdir, impl=self.impl)
{'name': '2_rse_remote_get.raw', 'scope': 'user.%s' % self.user, 'adler32': "bla-bla", 'filesize': 4711}], source_dir=self.tmpdir, vo=self.vo, impl=self.impl)
status = result[0]
details = result[1]
if details['user.%s:1_rse_remote_get.raw' % self.user]:
Expand All @@ -193,7 +196,7 @@ def test_put_mgr_FileReplicaAlreadyExists_multi(self):

def test_put_mgr_FileReplicaAlreadyExists_single(self):
"""(RSE/PROTOCOLS): Put a single file to storage (FileReplicaAlreadyExists)"""
mgr.upload(self.rse_settings, {'name': '1_rse_remote_get.raw', 'scope': 'user.%s' % self.user, 'adler32': 'bla-bla', 'filesize': 4711}, source_dir=self.tmpdir, impl=self.impl)
mgr.upload(self.rse_settings, {'name': '1_rse_remote_get.raw', 'scope': 'user.%s' % self.user, 'adler32': 'bla-bla', 'filesize': 4711}, source_dir=self.tmpdir, vo=self.vo, impl=self.impl)

# MGR-Tests: DELETE
def test_delete_mgr_ok_multi(self):
Expand Down Expand Up @@ -235,18 +238,18 @@ def test_exists_mgr_ok_multi(self):
status, details = mgr.exists(self.rse_settings, [{'name': '1_rse_remote_get.raw', 'scope': 'user.%s' % self.user},
{'name': '2_rse_remote_get.raw', 'scope': 'user.%s' % self.user},
{'name': pfn_a},
{'name': pfn_b}], impl=self.impl)
{'name': pfn_b}], impl=self.impl, vo=self.vo)
if not (status and details['user.%s:1_rse_remote_get.raw' % self.user] and details['user.%s:2_rse_remote_get.raw' % self.user] and details[pfn_a] and details[pfn_b]):
raise Exception('Return not as expected: %s, %s' % (status, details))

def test_exists_mgr_ok_single_lfn(self):
"""(RSE/PROTOCOLS): Check a single file on storage using LFN (Success)"""
mgr.exists(self.rse_settings, {'name': '1_rse_remote_get.raw', 'scope': 'user.%s' % self.user}, impl=self.impl)
mgr.exists(self.rse_settings, {'name': '1_rse_remote_get.raw', 'scope': 'user.%s' % self.user}, impl=self.impl, vo=self.vo)

def test_exists_mgr_ok_single_pfn(self):
"""(RSE/PROTOCOLS): Check a single file on storage using PFN (Success)"""
pfn = list(mgr.lfns2pfns(self.rse_settings, {'name': '1_rse_remote_get.raw', 'scope': 'user.%s' % self.user}, impl=self.impl).values())[0]
mgr.exists(self.rse_settings, {'name': pfn}, impl=self.impl)
mgr.exists(self.rse_settings, {'name': pfn}, impl=self.impl, vo=self.vo)

def test_exists_mgr_false_multi(self):
"""(RSE/PROTOCOLS): Check multiple files on storage (Fail)"""
Expand All @@ -255,18 +258,18 @@ def test_exists_mgr_false_multi(self):
status, details = mgr.exists(self.rse_settings, [{'name': '1_rse_remote_get.raw', 'scope': 'user.%s' % self.user},
{'name': 'not_existing_data.raw', 'scope': 'user.%s' % self.user},
{'name': pfn_a},
{'name': pfn_b}], impl=self.impl)
{'name': pfn_b}], impl=self.impl, vo=self.vo)
if status or not details['user.%s:1_rse_remote_get.raw' % self.user] or details['user.%s:not_existing_data.raw' % self.user] or not details[pfn_a] or details[pfn_b]:
raise Exception('Return not as expected: %s, %s' % (status, details))

def test_exists_mgr_false_single_lfn(self):
"""(RSE/PROTOCOLS): Check a single file on storage using LFN (Fail)"""
not mgr.exists(self.rse_settings, {'name': 'not_existing_data.raw', 'scope': 'user.%s' % self.user}, impl=self.impl)
not mgr.exists(self.rse_settings, {'name': 'not_existing_data.raw', 'scope': 'user.%s' % self.user}, impl=self.impl, vo=self.vo)

def test_exists_mgr_false_single_pfn(self):
"""(RSE/PROTOCOLS): Check a single file on storage using PFN (Fail)"""
pfn = list(mgr.lfns2pfns(self.rse_settings, {'name': '1_rse_not_existing.raw', 'scope': 'user.%s' % self.user}, impl=self.impl).values())[0]
not mgr.exists(self.rse_settings, {'name': pfn}, impl=self.impl)
not mgr.exists(self.rse_settings, {'name': pfn}, impl=self.impl, vo=self.vo)

# MGR-Tests: RENAME
def test_rename_mgr_ok_multi(self):
Expand Down

0 comments on commit 35c0b07

Please sign in to comment.