Skip to content

Commit

Permalink
Core : provide the possibility to specify PFNs or DIDs in declare_bad…
Browse files Browse the repository at this point in the history
…_file_replicas : Close #5033
  • Loading branch information
cserf authored and bari12 committed Feb 23, 2022
1 parent a5cdda3 commit c7a6e0c
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 97 deletions.
16 changes: 14 additions & 2 deletions lib/rucio/api/replica.py
Expand Up @@ -15,7 +15,7 @@
#
# Authors:
# - Vincent Garonne <vincent.garonne@cern.ch>, 2013-2016
# - Cedric Serfon <cedric.serfon@cern.ch>, 2014-2019
# - Cedric Serfon <cedric.serfon@cern.ch>, 2014-2021
# - Thomas Beermann <thomas.beermann@cern.ch>, 2014
# - Mario Lassnig <mario.lassnig@cern.ch>, 2017-2019
# - Hannes Hansen <hannes.jakob.hansen@cern.ch>, 2018-2019
Expand Down Expand Up @@ -80,17 +80,29 @@ def declare_bad_file_replicas(pfns, reason, issuer, vo='def'):
"""
Declare a list of bad replicas.
:param pfns: The list of PFNs.
:param pfns: Either a list of PFNs (string) or a list of replicas {'scope': <scope>, 'name': <name>, 'rse_id': <rse_id>}.
:param reason: The reason of the loss.
:param issuer: The issuer account.
:param vo: The VO to act on.
"""
kwargs = {}
rse_map = {}
if not permission.has_permission(issuer=issuer, vo=vo, action='declare_bad_file_replicas', kwargs=kwargs):
raise exception.AccessDenied('Account %s can not declare bad replicas' % (issuer))

issuer = InternalAccount(issuer, vo=vo)

type_ = type(pfns[0])
for pfn in pfns:
if not isinstance(pfn, type_):
raise exception.InvalidType('The PFNs must be either a list of string or list of dict')
if type_ == dict:
rse = pfn['rse']
if rse not in rse_map:
rse_id = get_rse_id(rse=rse, vo=vo)
rse_map[rse] = rse_id
pfn['rse_id'] = rse_map[rse]
pfn['scope'] = InternalScope(pfn['scope'], vo=vo)
replicas = replica.declare_bad_file_replicas(pfns=pfns, reason=reason, issuer=issuer, status=BadFilesStatus.BAD)

for k in list(replicas):
Expand Down
4 changes: 2 additions & 2 deletions lib/rucio/client/replicaclient.py
Expand Up @@ -16,7 +16,7 @@
# Authors:
# - Vincent Garonne <vincent.garonne@cern.ch>, 2013-2018
# - Mario Lassnig <mario.lassnig@cern.ch>, 2013-2021
# - Cedric Serfon <cedric.serfon@cern.ch>, 2014-2018
# - Cedric Serfon <cedric.serfon@cern.ch>, 2014-2021
# - Ralph Vigne <ralph.vigne@cern.ch>, 2015
# - Brian Bockelman <bbockelm@cse.unl.edu>, 2018
# - Martin Barisits <martin.barisits@cern.ch>, 2018-2021
Expand Down Expand Up @@ -49,7 +49,7 @@ def declare_bad_file_replicas(self, pfns, reason):
"""
Declare a list of bad replicas.
:param pfns: The list of PFNs.
:param pfns: Either a list of PFNs (string) or a list of replicas {'scope': <scope>, 'name': <name>, 'rse_id': <rse_id>}.
:param reason: The reason of the loss.
"""
data = {'reason': reason, 'pfns': pfns}
Expand Down
171 changes: 81 additions & 90 deletions lib/rucio/core/replica.py
Expand Up @@ -42,8 +42,6 @@
# - Joel Dierkes <joel.dierkes@cern.ch>, 2021
# - Christoph Ames <cames@cern.ch>, 2021

from __future__ import print_function

import heapq
import logging
import random
Expand Down Expand Up @@ -311,7 +309,7 @@ def __declare_bad_file_replicas(pfns, rse_id, reason, issuer, status=BadFilesSta
"""
Declare a list of bad replicas.
:param pfns: The list of PFNs.
:param pfns: Either a list of PFNs (string) or a list of replicas {'scope': <scope>, 'name': <name>, 'rse_id': <rse_id>}.
:param rse_id: The RSE id.
:param reason: The reason of the loss.
:param issuer: The issuer account.
Expand All @@ -320,94 +318,73 @@ def __declare_bad_file_replicas(pfns, rse_id, reason, issuer, status=BadFilesSta
:param session: The database session in use.
"""
unknown_replicas = []
declared_replicas = []
rse_info = rsemgr.get_rse_info(rse_id=rse_id, session=session)
replicas = []
proto = rsemgr.create_protocol(rse_info, 'read', scheme=scheme)
if rse_info['deterministic']:
parsed_pfn = proto.parse_pfns(pfns=pfns)
for pfn in parsed_pfn:
# WARNING : this part is ATLAS specific and must be changed
path = parsed_pfn[pfn]['path']
if path.startswith('/user') or path.startswith('/group'):
scope = '%s.%s' % (path.split('/')[1], path.split('/')[2])
name = parsed_pfn[pfn]['name']
elif path.startswith('/'):
scope = path.split('/')[1]
name = parsed_pfn[pfn]['name']
else:
scope = path.split('/')[0]
name = parsed_pfn[pfn]['name']

scope = InternalScope(scope, vo=issuer.vo)

__exists, scope, name, already_declared, size = __exists_replicas(rse_id, scope, name, path=None, session=session)
if __exists and ((status == BadFilesStatus.BAD and not already_declared) or status == BadFilesStatus.SUSPICIOUS):
replicas.append({'scope': scope, 'name': name, 'rse_id': rse_id, 'state': ReplicaState.BAD})
new_bad_replica = models.BadReplicas(scope=scope, name=name, rse_id=rse_id, reason=reason, state=status, account=issuer, bytes=size)
new_bad_replica.save(session=session, flush=False)
session.query(models.Source).filter_by(scope=scope, name=name, rse_id=rse_id).delete(synchronize_session=False)
declared_replicas.append(pfn)
else:
if already_declared:
unknown_replicas.append('%s %s' % (pfn, 'Already declared'))
declared_replicas = []
type_ = type(pfns[0])

if type_ is str:
# If pfns is a list of PFNs, the scope and names need to be extracted from the path
rse_info = rsemgr.get_rse_info(rse_id=rse_id, session=session)
proto = rsemgr.create_protocol(rse_info, 'read', scheme=scheme)
if rse_info['deterministic']:
# TBD : In case of deterministic RSE, call the extract_scope_from_path method
parsed_pfn = proto.parse_pfns(pfns=pfns)
for pfn in parsed_pfn:
# WARNING : this part is ATLAS specific and must be changed
path = parsed_pfn[pfn]['path']
if path.startswith('/user') or path.startswith('/group'):
scope = '%s.%s' % (path.split('/')[1], path.split('/')[2])
name = parsed_pfn[pfn]['name']
elif path.startswith('/'):
scope = path.split('/')[1]
name = parsed_pfn[pfn]['name']
else:
no_hidden_char = True
for char in str(pfn):
if not isprint(char):
unknown_replicas.append('%s %s' % (pfn, 'PFN contains hidden chars'))
no_hidden_char = False
break
if no_hidden_char:
unknown_replicas.append('%s %s' % (pfn, 'Unknown replica'))
if status == BadFilesStatus.BAD:
# For BAD file, we modify the replica state, not for suspicious
try:
# there shouldn't be any exceptions since all replicas exist
update_replicas_states(replicas, session=session)
except exception.UnsupportedOperation:
raise exception.ReplicaNotFound("One or several replicas don't exist.")
scope = path.split('/')[0]
name = parsed_pfn[pfn]['name']

scope = InternalScope(scope, vo=issuer.vo)
replicas.append({'scope': scope, 'name': name, 'rse_id': rse_id, 'state': status})

else:
# For non-deterministic RSEs use the path + rse_id to extract the scope
parsed_pfn = proto.parse_pfns(pfns=pfns)
for pfn in parsed_pfn:
path = '%s%s' % (parsed_pfn[pfn]['path'], parsed_pfn[pfn]['name'])
replicas.append({'scope': None, 'name': None, 'rse_id': rse_id, 'path': path, 'state': status})

else:
path_clause = []
parsed_pfn = proto.parse_pfns(pfns=pfns)
for pfn in parsed_pfn:
path = '%s%s' % (parsed_pfn[pfn]['path'], parsed_pfn[pfn]['name'])
__exists, scope, name, already_declared, size = __exists_replicas(rse_id, scope=None, name=None, path=path, session=session)
if __exists and ((status == BadFilesStatus.BAD and not already_declared) or status == BadFilesStatus.SUSPICIOUS):
replicas.append({'scope': scope, 'name': name, 'rse_id': rse_id, 'state': ReplicaState.BAD})
new_bad_replica = models.BadReplicas(scope=scope, name=name, rse_id=rse_id, reason=reason, state=status, account=issuer, bytes=size)
new_bad_replica.save(session=session, flush=False)
session.query(models.Source).filter_by(scope=scope, name=name, rse_id=rse_id).delete(synchronize_session=False)
declared_replicas.append(pfn)
path_clause.append(models.RSEFileAssociation.path == path)
if path.startswith('/'):
path_clause.append(models.RSEFileAssociation.path == path[1:])
else:
path_clause.append(models.RSEFileAssociation.path == '/%s' % path)
else:
if already_declared:
unknown_replicas.append('%s %s' % (pfn, 'Already declared'))
else:
no_hidden_char = True
for char in str(pfn):
if not isprint(char):
unknown_replicas.append('%s %s' % (pfn, 'PFN contains hidden chars'))
no_hidden_char = False
break
if no_hidden_char:
unknown_replicas.append('%s %s' % (pfn, 'Unknown replica'))

if status == BadFilesStatus.BAD and declared_replicas != []:
# For BAD file, we modify the replica state, not for suspicious
query = session.query(models.RSEFileAssociation) \
.with_hint(models.RSEFileAssociation, "+ index(replicas REPLICAS_PATH_IDX", 'oracle') \
.filter(models.RSEFileAssociation.rse_id == rse_id) \
.filter(or_(*path_clause))
rowcount = query.update({'state': ReplicaState.BAD})
if rowcount != len(declared_replicas):
# there shouldn't be any exceptions since all replicas exist
print(rowcount, len(declared_replicas), declared_replicas)
raise exception.ReplicaNotFound("One or several replicas don't exist.")
# If pfns is a list of replicas, just use scope, name and rse_id
for pfn in pfns:
replicas.append({'scope': pfn['scope'], 'name': pfn['name'], 'rse_id': rse_id, 'state': status})

for replica in replicas:
scope, name, rse_id, path = replica['scope'], replica['name'], replica['rse_id'], replica.get('path', None)
__exists, scope, name, already_declared, size = __exists_replicas(rse_id=rse_id, scope=scope, name=name, path=path, session=session)
if __exists and ((status == BadFilesStatus.BAD and not already_declared) or status == BadFilesStatus.SUSPICIOUS):
declared_replicas.append({'scope': scope, 'name': name, 'rse_id': rse_id, 'state': ReplicaState.BAD})
new_bad_replica = models.BadReplicas(scope=scope, name=name, rse_id=rse_id, reason=reason, state=status, account=issuer, bytes=size)
new_bad_replica.save(session=session, flush=False)
else:
if already_declared:
unknown_replicas.append('%s %s' % (pfn, 'Already declared'))
elif path:
no_hidden_char = True
for char in str(path):
if not isprint(char):
unknown_replicas.append('%s %s' % (path, 'PFN contains hidden chars'))
no_hidden_char = False
break
if no_hidden_char:
unknown_replicas.append('%s %s' % (pfn, 'Unknown replica'))

if status == BadFilesStatus.BAD:
# For BAD file, we modify the replica state, not for suspicious
try:
# there shouldn't be any exceptions since all replicas exist
update_replicas_states(declared_replicas, session=session)
except exception.UnsupportedOperation:
raise exception.ReplicaNotFound("One or several replicas don't exist.")

try:
session.flush()
except IntegrityError as error:
Expand Down Expand Up @@ -472,13 +449,27 @@ def declare_bad_file_replicas(pfns, reason, issuer, status=BadFilesStatus.BAD, s
"""
Declare a list of bad replicas.
:param pfns: The list of PFNs.
:param pfns: Either a list of PFNs (string) or a list of replicas {'scope': <scope>, 'name': <name>, 'rse_id': <rse_id>}.
:param reason: The reason of the loss.
:param issuer: The issuer account.
:param status: The status of the file (SUSPICIOUS or BAD).
:param session: The database session in use.
"""
scheme, files_to_declare, unknown_replicas = get_pfn_to_rse(pfns, vo=issuer.vo, session=session)
type_ = type(pfns[0])
unknown_replicas = {}
files_to_declare = {}
for pfn in pfns:
if not isinstance(pfn, type_):
raise exception.InvalidType('The PFNs must be either a list of string or list of dict')
if type_ == str:
scheme, files_to_declare, unknown_replicas = get_pfn_to_rse(pfns, vo=issuer.vo, session=session)
else:
scheme = None
for pfn in pfns:
rse_id = pfn['rse_id']
if rse_id not in files_to_declare:
files_to_declare[rse_id] = []
files_to_declare[rse_id].append(pfn)
for rse_id in files_to_declare:
notdeclared = __declare_bad_file_replicas(files_to_declare[rse_id], rse_id, reason, issuer, status=status, scheme=scheme, session=session)
if notdeclared:
Expand Down
31 changes: 28 additions & 3 deletions lib/rucio/tests/test_bad_replica.py
Expand Up @@ -23,7 +23,7 @@

import pytest

from rucio.common.exception import RucioException, UnsupportedOperation
from rucio.common.exception import RucioException, UnsupportedOperation, InvalidType
from rucio.common.utils import generate_uuid, clean_surls
from rucio.core.replica import add_replicas, get_replicas_state, list_replicas, declare_bad_file_replicas, list_bad_replicas, get_bad_pfns, get_bad_replicas_backlog
from rucio.daemons.badreplicas.minos import run as minos_run
Expand Down Expand Up @@ -194,9 +194,7 @@ def test_client_add_list_bad_replicas(rse_factory, replica_client, did_client):
for replica in replica_client.list_replicas(dids=[{'scope': f['scope'], 'name': f['name']} for f in files], schemes=['srm'], all_states=True):
replicas.extend(replica['rses'][rse2])
list_rep.append(replica)
print(replicas, list_rep)
r = replica_client.declare_bad_file_replicas(replicas, 'This is a good reason')
print(r)
assert r == {}
bad_replicas = list_bad_replicas()
nbbadrep = 0
Expand All @@ -213,6 +211,33 @@ def test_client_add_list_bad_replicas(rse_factory, replica_client, did_client):
output = ['%s Unknown replica' % rep for rep in files]
assert r == {rse2: output}

# Now test adding bad_replicas with a list of replicas instead of PFNs
# Adding replicas to deterministic RSE
rse3, rse3_id = rse_factory.make_srm_rse(deterministic=True)
files = [{'scope': tmp_scope, 'name': 'file_%s' % generate_uuid(), 'bytes': 1, 'adler32': '0cc737eb', 'meta': {'events': 10}} for _ in range(nbfiles)]
replica_client.add_replicas(rse=rse3, files=files)
list_rep = [{'scope': file_['scope'], 'name': file_['name'], 'rse': rse3} for file_ in files]

# Listing replicas on deterministic RSE
replicas = []
for replica in replica_client.list_replicas(dids=[{'scope': f['scope'], 'name': f['name']} for f in files], schemes=['srm'], all_states=True):
replicas.extend(replica['rses'][rse3])
r = replica_client.declare_bad_file_replicas(list_rep, 'This is a good reason')
assert r == {}
bad_replicas = list_bad_replicas()
nbbadrep = 0
for rep in list_rep:
for badrep in bad_replicas:
if badrep['rse_id'] == rse3_id:
if badrep['scope'].external == rep['scope'] and badrep['name'] == rep['name']:
nbbadrep += 1
assert len(replicas) == nbbadrep

# InvalidType is raised if list_rep contains a mixture of replicas and PFNs
list_rep.extend(['srm://%s.cern.ch/test/%s/%s' % (rse2_id, tmp_scope, generate_uuid()), ])
with pytest.raises(InvalidType):
r = replica_client.declare_bad_file_replicas(list_rep, 'This is a good reason')


def test_client_add_suspicious_replicas(rse_factory, replica_client):
""" REPLICA (CLIENT): Add suspicious replicas"""
Expand Down

0 comments on commit c7a6e0c

Please sign in to comment.