Skip to content

Commit

Permalink
Merge pull request #585 from bbockelm/patch-570-transfers_pluggable_l…
Browse files Browse the repository at this point in the history
…fn2pfn

Transfers: Implement pluggable LFN2PFN algorithms. Fixes #570
  • Loading branch information
bbockelm committed Feb 9, 2018
2 parents 8c5cba5 + e9ec835 commit aba7638
Show file tree
Hide file tree
Showing 11 changed files with 548 additions and 54 deletions.
6 changes: 5 additions & 1 deletion bin/rucio
Expand Up @@ -18,6 +18,7 @@
- Evangelia Liotiri <evangelia.liotiri@cern.ch>, 2015
- Tobias Wegner <tobias.wegner@cern.ch>, 2017
- Nicolo Magini <nicolo.magini@cern.ch>, 2018
- Brian Bockelman <bbockelm@cse.unl.edu>, 2018
"""

import argparse
Expand Down Expand Up @@ -1111,7 +1112,10 @@ def upload(args):
logger.debug(error)
logger.error("Some of the files already exist in the catalog. No one will be added.")
except RucioException:
logger.error("A Rucio exception occurred when processing a file for upload.")
if logger.isEnabledFor(logging.DEBUG):
logger.exception("A Rucio exception occurred when processing a file for upload.")
else:
logger.error("A Rucio exception occurred when processing a file for upload.")
raise
logger.debug("Finished uploading files to RSE.")

Expand Down
6 changes: 4 additions & 2 deletions bin/rucio-admin
Expand Up @@ -572,8 +572,10 @@ def add_protocol_rse(args):
proto.setdefault('extended_attributes', {})
if args.ext_attr_json:
proto['extended_attributes'] = args.ext_attr_json
if proto['scheme'] == 'srm' and (not args.space_token or not args.web_service_path):
print 'Error: space-token and web-service-path must be provided for SRM endpoints.'
# Originally, both web-service-path and space-token were required. However,
# many CMS sites don't use space tokens, so we relax this for now.
if proto['scheme'] == 'srm' and not args.web_service_path:
print 'Error: web-service-path must be provided for SRM endpoints.'
return FAILURE
if args.space_token:
proto['extended_attributes']['space_token'] = args.space_token
Expand Down
1 change: 1 addition & 0 deletions etc/rucio.cfg.template
Expand Up @@ -188,6 +188,7 @@ scope = user.vzavrtan
[policy]
permission=atlas
schema=atlas
lfn2pfn_algorithm_default=hash

[webui]
usercert = /opt/rucio/etc/usercert_with_key.pem
39 changes: 39 additions & 0 deletions lib/rucio/client/rseclient.py
Expand Up @@ -271,6 +271,45 @@ def get_protocols(self, rse, protocol_domain='ALL', operation=None, default=Fals
exc_cls, exc_msg = self._get_exception(headers=r.headers, status_code=r.status_code, data=r.content)
raise exc_cls(exc_msg)

def lfns2pfns(self, rse, lfns, protocol_domain='ALL', operation=None, scheme=None):
"""
Returns PFNs that should be used at a RSE, corresponding to requested LFNs.
The PFNs are generated for the RSE *regardless* of whether a replica exists for the LFN.
:param rse: the RSE name
:param lfns: A list of LFN strings to translate to PFNs.
:param protocol_domain: The scope of the protocol. Supported are 'LAN', 'WAN', and 'ALL' (as default).
:param operation: The name of the requested operation (read, write, or delete).
If None, all operations are queried.
:param scheme: The identifier of the requested protocol (gsiftp, https, davs, etc).
:returns: A dictionary of LFN / PFN pairs.
:raises RSENotFound: if the RSE doesn't exist.
:raises RSEProtocolNotSupported: if no matching protocol entry could be found.
:raises RSEOperationNotSupported: if no matching protocol entry for the requested
operation could be found.
"""
path = '/'.join([self.RSE_BASEURL, rse, 'lfns2pfns'])
params = []
if scheme:
params.append(('scheme', scheme))
if protocol_domain != 'ALL':
params.append(('domain', protocol_domain))
if operation:
params.append(('operation', operation))
for lfn in lfns:
params.append(('lfn', lfn))

url = build_url(choice(self.list_hosts), path=path, params=params, doseq=True)

r = self._send_request(url, type='GET')
if r.status_code == codes.ok:
pfns = loads(r.text)
return pfns
else:
exc_cls, exc_msg = self._get_exception(headers=r.headers, status_code=r.status_code, data=r.content)
raise exc_cls(exc_msg)

def delete_protocols(self, rse, scheme, hostname=None, port=None):
"""
Deletes matching protocols from RSE. Protocols using the same identifier can be
Expand Down
54 changes: 53 additions & 1 deletion lib/rucio/common/config.py
Expand Up @@ -9,6 +9,7 @@
# - Mario Lassnig, <mario.lassnig@cern.ch>, 2012
# - Vincent Garonne, <vincent.garonne@cern.ch>, 2013-2018
# - Cedric Serfon, <cedric.serfon@cern.ch>, 2013
# - Brian Bockelman, <bbockelm@cse.unl.edu>, 2018

"""
Get the configuration file from /opt/rucio/etc/rucio.cfg
Expand All @@ -32,10 +33,25 @@ def config_get(section, option):


def config_has_section(section):
"""Indicates whether the named section is present in the configuration. The DEFAULT section is not acknowledged.)"""
"""
Indicates whether the named section is present in the configuration. The DEFAULT section is not acknowledged.)
:param section: Name of section in the Rucio config to verify.
:returns: True if the section exists in the configuration; False otherwise
"""
return __CONFIG.has_section(section)


def config_add_section(section):
"""
Add a new section to the configuration object. Throws DuplicateSectionError if it already exists.
:param section: Name of section in the Rucio config to add.
:returns: None
"""
return __CONFIG.add_section(section)


def config_get_int(section, option):
"""Return the integer value for a given option in a section"""
return __CONFIG.getint(section, option)
Expand All @@ -61,6 +77,32 @@ def config_get_items(section):
return __CONFIG.items(section)


def config_remove_option(section, option):
"""
Remove the specified option from a given section.
:param section: Name of section in the Rucio config.
:param option: Name of option to remove from Rucio configuration.
:returns: True if the option existed in the configuration, False otherwise.
:raises NoSectionError: If the section does not exist.
"""
return __CONFIG.remove_option(section, option)


def config_set(section, option, value):
"""
Set a configuration option in a given section.
:param section: Name of section in the Rucio config.
:param option: Name of option to set in the Rucio configuration.
:param value: New value for the option.
:raises NoSectionError: If the section does not exist.
"""
return __CONFIG.set(section, option, value)


def get_config_dir():
"""Return the rucio configuration directory"""
configdirs = ['/opt/rucio/etc/', ]
Expand All @@ -76,6 +118,16 @@ def get_config_dir():
return configdir


def get_lfn2pfn_algorithm_default():
"""Returns the default algorithm name for LFN2PFN translation for this server."""
default_lfn2pfn = "hash"
try:
default_lfn2pfn = config_get('policy', 'lfn2pfn_algorithm_default')
except (ConfigParser.NoOptionError, ConfigParser.NoSectionError):
pass
return default_lfn2pfn


def get_schema_dir():
"""Return the rucio json schema directory"""
configdir = get_config_dir()
Expand Down
8 changes: 6 additions & 2 deletions lib/rucio/core/rse.py
Expand Up @@ -13,6 +13,7 @@
# - Cedric Serfon, <cedric.serfon@cern.ch>, 2013-2016
# - Thomas Beermann, <thomas.beermann@cern.ch>, 2014, 2017
# - Wen Guan, <wen.guan@cern.ch>, 2015-2016
# - Brian Bockelman, <bbockelm@cse.unl.edu>, 2018

from re import match
from StringIO import StringIO
Expand All @@ -34,6 +35,7 @@
from rucio.core.rse_counter import add_counter

from rucio.common import exception, utils
from rucio.common.config import get_lfn2pfn_algorithm_default
from rucio.db.sqla import models
from rucio.db.sqla.constants import RSEType
from rucio.db.sqla.session import read_session, transactional_session, stream_session
Expand Down Expand Up @@ -688,7 +690,7 @@ def add_protocol(rse, parameter, session=None):
pass # String is not JSON

if parameter['scheme'] == 'srm':
if ('space_token' not in parameter['extended_attributes']) or ('web_service_path' not in parameter['extended_attributes']):
if 'web_service_path' not in parameter['extended_attributes']:
raise exception.InvalidObject('Missing values! For SRM, extended_attributes and web_service_path must be specified')

try:
Expand Down Expand Up @@ -729,7 +731,9 @@ def get_rse_protocols(rse, schemes=None, session=None):
raise exception.RSENotFound('RSE \'%s\' not found')

lfn2pfn_algorithms = get_rse_attribute('lfn2pfn_algorithm', rse_id=_rse.id, session=session)
lfn2pfn_algorithm = 'default'
# Resolve LFN2PFN default algorithm as soon as possible. This way, we can send back the actual
# algorithm name in response to REST queries.
lfn2pfn_algorithm = get_lfn2pfn_algorithm_default()
if lfn2pfn_algorithms:
lfn2pfn_algorithm = lfn2pfn_algorithms[0]

Expand Down

0 comments on commit aba7638

Please sign in to comment.