Skip to content

Commit

Permalink
Merge pull request #3710 from cserf/patch-3692-Add_dirac_add_files_me…
Browse files Browse the repository at this point in the history
…thod_for_the_Rucio_Dirac_integration

Add dirac_add_files method for the Rucio/Dirac integration + Belleii : Adding permission file + Define extract_scope based on policy : Closes #3679, #3690, #3692
  • Loading branch information
bari12 committed Jun 16, 2020
2 parents c995b60 + ebce3b6 commit ccaadef
Show file tree
Hide file tree
Showing 16 changed files with 1,694 additions and 22 deletions.
6 changes: 3 additions & 3 deletions bin/rucio
Expand Up @@ -19,7 +19,7 @@
# - Martin Barisits, <martin.barisits@cern.ch>, 2012-2019
# - Thomas Beermann, <thomas.beermann@cern.ch>, 2012-2017
# - Yun-Pin Sun, <winter0128@gmail.com>, 2012-2013
# - Cedric Serfon, <cedric.serfon@cern.ch>, 2013-2018
# - Cedric Serfon, <cedric.serfon@cern.ch>, 2013-2020
# - Ralph Vigne, <ralph.vigne@cern.ch>, 2013
# - David Cameron, <d.g.cameron@gmail.com>, 2014
# - Tomas Kouba, <tomas.kouba@cern.ch>, 2014
Expand Down Expand Up @@ -68,7 +68,7 @@ from rucio.common.exception import (DataIdentifierAlreadyExists, AccessDenied, D
RSENotFound, InvalidRSEExpression, InputValidationError, DuplicateContent,
RuleNotFound, CannotAuthenticate, MissingDependency, UnsupportedOperation,
RucioException, DuplicateRule, InvalidType)
from rucio.common.utils import sizefmt, Color, detect_client_location, chunks, parse_did_filter_from_string
from rucio.common.utils import sizefmt, Color, detect_client_location, chunks, parse_did_filter_from_string, extract_scope

SUCCESS = 0
FAILURE = 1
Expand Down Expand Up @@ -135,7 +135,7 @@ def signal_handler(sig, frame):
signal.signal(signal.SIGINT, signal_handler)


def extract_scope(did):
def extract_scope_old(did):
# Try to extract the scope from the DSN
if did.find(':') > -1:
if len(did.split(':')) > 2:
Expand Down
4 changes: 2 additions & 2 deletions bin/rucio-admin
Expand Up @@ -64,7 +64,7 @@ from rucio.common.exception import (AccountNotFound, DataIdentifierAlreadyExists
RSENotFound, RSEOperationNotSupported, InvalidRSEExpression,
DuplicateContent, RuleNotFound, CannotAuthenticate,
Duplicate, ReplicaIsLocked, ConfigNotFound)
from rucio.common.utils import chunks, construct_surl, sizefmt, get_bytes_value_from_string, render_json, parse_response
from rucio.common.utils import chunks, construct_surl, sizefmt, get_bytes_value_from_string, render_json, parse_response, extract_scope
from rucio import version
from rucio.rse import rsemanager as rsemgr

Expand Down Expand Up @@ -244,7 +244,7 @@ def get_client(args):
return client


def extract_scope(did):
def extract_scope_old(did):
# Try to extract the scope from the DSN
if did.find(':') > -1:
scope, name = did.split(':')[0], did.split(':')[1]
Expand Down
5 changes: 3 additions & 2 deletions etc/web/aliases-py27.conf
Expand Up @@ -7,11 +7,12 @@ WSGIScriptAlias /archives /opt/rucio/.venv/lib/python2.7/site-pac
WSGIScriptAlias /config /opt/rucio/.venv/lib/python2.7/site-packages/rucio/web/rest/config.py
WSGIScriptAlias /tmp_dids /opt/rucio/.venv/lib/python2.7/site-packages/rucio/web/rest/temporary_did.py
WSGIScriptAlias /dids /opt/rucio/.venv/lib/python2.7/site-packages/rucio/web/rest/did.py
WSGIScriptAlias /dirac /opt/rucio/.venv/lib/python2.7/site-packages/rucio/web/rest/dirac.py
WSGIScriptAlias /identities /opt/rucio/.venv/lib/python2.7/site-packages/rucio/web/rest/identity.py
WSGIScriptAlias /heartbeats /opt/rucio/.venv/lib/python2.6/site-packages/rucio/web/rest/heartbeat.py
WSGIScriptAlias /heartbeats /opt/rucio/.venv/lib/python2.7/site-packages/rucio/web/rest/heartbeat.py
WSGIScriptAlias /locks /opt/rucio/.venv/lib/python2.7/site-packages/rucio/web/rest/lock.py
WSGIScriptAlias /meta /opt/rucio/.venv/lib/python2.7/site-packages/rucio/web/rest/meta.py
WSGIScriptAlias /nongrid_traces /opt/rucio/.venv/lib/python2.6/site-packages/rucio/web/rest/nongrid_trace.py
WSGIScriptAlias /nongrid_traces /opt/rucio/.venv/lib/python2.7/site-packages/rucio/web/rest/nongrid_trace.py
WSGIScriptAlias /ping /opt/rucio/.venv/lib/python2.7/site-packages/rucio/web/rest/ping.py
WSGIScriptAlias /redirect /opt/rucio/.venv/lib/python2.7/site-packages/rucio/web/rest/redirect.py
WSGIScriptAlias /replicas /opt/rucio/.venv/lib/python2.7/site-packages/rucio/web/rest/replica.py
Expand Down
71 changes: 71 additions & 0 deletions lib/rucio/api/dirac.py
@@ -0,0 +1,71 @@
#!/usr/bin/env python
# Copyright 2020 CERN
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Authors:
# - Cedric Serfon <cedric.serfon@cern.ch>, 2020
#
# PY3K COMPATIBLE

from __future__ import print_function

from rucio.api.permission import has_permission

from rucio.core.rse import get_rse_id
from rucio.core.scope import list_scopes
from rucio.core import dirac
from rucio.common.exception import AccessDenied
from rucio.common.types import InternalAccount
from rucio.common.utils import extract_scope


def add_files(lfns, issuer, ignore_availability):
"""
Bulk add files :
- Create the file and replica.
- If doesn't exist create the dataset containing the file as well as a rule on the dataset on ANY sites.
- Create all the ascendants of the dataset if they do not exist
:param lfns: List of lfn (dictionary {'lfn': <lfn>, 'rse': <rse>, 'bytes': <bytes>, 'adler32': <adler32>, 'guid': <guid>, 'pfn': <pfn>}
:param issuer: The issuer account.
:param ignore_availability: A boolean to ignore blacklisted sites.
"""
scopes = list_scopes()
dids = []
rses = {}
for lfn in lfns:
scope, name = extract_scope(lfn['lfn'], scopes)
dids.append({'scope': scope, 'name': name})
rse = lfn['rse']
if rse not in rses:
rse_id = get_rse_id(rse=rse)
rses[rse] = rse_id
lfn['rse_id'] = rses[rse]

# Check if the issuer can add dids and use skip_availabitlity
for rse in rses:
rse_id = rses[rse]
kwargs = {'rse': rse, 'rse_id': rse_id}
if not has_permission(issuer=issuer, action='add_replicas', kwargs=kwargs):
raise AccessDenied('Account %s can not add file replicas on %s' % (issuer, rse))
if not has_permission(issuer=issuer, action='skip_availability_check', kwargs=kwargs):
ignore_availability = False

# Check if the issuer can add the files
kwargs = {'issuer': issuer, 'dids': dids}
if not has_permission(issuer=issuer, action='add_dids', kwargs=kwargs):
raise AccessDenied('Account %s can not bulk add data identifier' % (issuer))

issuer = InternalAccount(issuer)
dirac.add_files(lfns=lfns, account=issuer, ignore_availability=ignore_availability, session=None)
20 changes: 19 additions & 1 deletion lib/rucio/client/client.py
Expand Up @@ -43,9 +43,27 @@
from rucio.client.configclient import ConfigClient
from rucio.client.touchclient import TouchClient
from rucio.client.credentialclient import CredentialClient
from rucio.client.diracclient import DiracClient


class Client(AccountClient, AccountLimitClient, MetaClient, PingClient, ReplicaClient, RequestClient, RSEClient, ScopeClient, DIDClient, RuleClient, SubscriptionClient, LockClient, ConfigClient, TouchClient, ImportClient, ExportClient, CredentialClient):
class Client(AccountClient,
AccountLimitClient,
MetaClient,
PingClient,
ReplicaClient,
RequestClient,
RSEClient,
ScopeClient,
DIDClient,
RuleClient,
SubscriptionClient,
LockClient,
ConfigClient,
TouchClient,
ImportClient,
ExportClient,
CredentialClient,
DiracClient):

"""Main client class for accessing Rucio resources. Handles the authentication."""

Expand Down
59 changes: 59 additions & 0 deletions lib/rucio/client/diracclient.py
@@ -0,0 +1,59 @@
#!/usr/bin/env python
# Copyright 2020 CERN
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Authors:
# - Cedric Serfon <cedric.serfon@cern.ch>, 2020
#
# PY3K COMPATIBLE

from __future__ import print_function

from json import dumps
from requests.status_codes import codes

from rucio.client.baseclient import BaseClient
from rucio.client.baseclient import choice
from rucio.common.utils import build_url


class DiracClient(BaseClient):

"""DataIdentifier client class for working with data identifiers"""

DIRAC_BASEURL = 'dirac'

def __init__(self, rucio_host=None, auth_host=None, account=None, ca_cert=None,
auth_type=None, creds=None, timeout=600, user_agent='rucio-clients'):
super(DiracClient, self).__init__(rucio_host, auth_host, account, ca_cert,
auth_type, creds, timeout, user_agent)

def add_files(self, lfns, ignore_availability=False):
"""
Bulk add files :
- Create the file and replica.
- If doesn't exist create the dataset containing the file as well as a rule on the dataset on ANY sites.
- Create all the ascendants of the dataset if they do not exist
:param lfns: List of lfn (dictionary {'lfn': <lfn>, 'rse': <rse>, 'bytes': <bytes>, 'adler32': <adler32>, 'guid': <guid>, 'pfn': <pfn>}
:param ignore_availability: A boolean to ignore blacklisted sites.
"""
path = '/'.join([self.DIRAC_BASEURL, 'addfiles'])
url = build_url(choice(self.list_hosts), path=path)
r = self._send_request(url, type='POST', data=dumps({'lfns': lfns, 'ignore_availability': ignore_availability}))
if r.status_code == codes.created:
return True
else:
exc_cls, exc_msg = self._get_exception(headers=r.headers, status_code=r.status_code, data=r.content)
raise exc_cls(exc_msg)
72 changes: 69 additions & 3 deletions lib/rucio/common/utils.py
Expand Up @@ -16,7 +16,7 @@
# - Vincent Garonne <vgaronne@gmail.com>, 2012-2018
# - Thomas Beermann <thomas.beermann@cern.ch>, 2012-2018
# - Mario Lassnig <mario.lassnig@cern.ch>, 2012-2019
# - Cedric Serfon <cedric.serfon@cern.ch>, 2013-2017
# - Cedric Serfon <cedric.serfon@cern.ch>, 2013-2020
# - Ralph Vigne <ralph.vigne@cern.ch>, 2013
# - Joaquin Bogado <jbogado@linti.unlp.edu.ar>, 2015-2018
# - Martin Barisits <martin.barisits@cern.ch>, 2016-2019
Expand Down Expand Up @@ -84,7 +84,7 @@
import urllib.parse as urlparse

from rucio.common.config import config_get
from rucio.common.exception import MissingModuleException, InvalidType, InputValidationError, MetalinkJsonParsingError
from rucio.common.exception import MissingModuleException, InvalidType, InputValidationError, MetalinkJsonParsingError, RucioException
from rucio.common.types import InternalAccount, InternalScope
# delay import until function to avoid circular dependancy (note here for reference)
# from rucio.core.rse import get_rse_name
Expand Down Expand Up @@ -395,7 +395,7 @@ def val_to_space_sep_str(vallist):
return u" ".join(vallist)
else:
return unicode(vallist)
except:
except Exception:
return u''


Expand Down Expand Up @@ -736,6 +736,72 @@ def clean_surls(surls):
return res


_EXTRACT_SCOPE_ALGORITHMS = {}
_DEFAULT_EXTRACT = 'atlas'


def extract_scope_atlas(did, scopes):
# Try to extract the scope from the DSN
if did.find(':') > -1:
if len(did.split(':')) > 2:
raise RucioException('Too many colons. Cannot extract scope and name')
scope, name = did.split(':')[0], did.split(':')[1]
if name.endswith('/'):
name = name[:-1]
return scope, name
else:
scope = did.split('.')[0]
if did.startswith('user') or did.startswith('group'):
scope = ".".join(did.split('.')[0:2])
if did.endswith('/'):
did = did[:-1]
return scope, did


def extract_scope_belleii(did, scopes):
split_did = did.split('/')
if did.find('/belle/MC/') > -1:
if len(split_did) > 5:
if split_did[4] in ['fab', 'merge1', 'skim']:
return 'mc_tmp', did
return 'mc', did
if did.find('/belle/Raw/') > -1:
return 'raw', did
if did.find('/belle/user/') > -1:
if len(split_did) > 5:
if len(split_did[4]) == 1 and 'user.%s' % (split_did[5]) in scopes:
return 'user.%s' % split_did[5], did
if len(split_did) > 4:
if 'user.%s' % (split_did[4]) in scopes:
return 'user.%s' % split_did[4], did
return 'user', did
if did.find('/belle/data/') > -1:
if len(split_did) > 5:
if split_did[4] in ['fab', 'skim']:
return 'data_tmp', did
return 'data', did
if did.find('/belle/ddm/functional_tests/') > -1:
return 'test', did
return 'other', did


def register_extract_scope_algorithm(extract_callable, name=[]):
if name is None:
name = extract_callable.__name__
_EXTRACT_SCOPE_ALGORITHMS[name] = extract_callable


register_extract_scope_algorithm(extract_scope_atlas, 'atlas')
register_extract_scope_algorithm(extract_scope_belleii, 'belleii')


def extract_scope(did, scopes=None):
extract_scope_convention = config_get('common', 'extract_scope', False, None)
if extract_scope_convention is None or extract_scope_convention not in _EXTRACT_SCOPE_ALGORITHMS:
extract_scope_convention = _DEFAULT_EXTRACT
return _EXTRACT_SCOPE_ALGORITHMS[extract_scope_convention](did=did, scopes=scopes)


def pid_exists(pid):
"""
Check whether pid exists in the current process table.
Expand Down

0 comments on commit ccaadef

Please sign in to comment.