Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Core : provide the possibility to specify PFNs or DIDs in declare_bad_file_replicas : Close #5033 #5068

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions lib/rucio/api/replica.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
#
# Authors:
# - Vincent Garonne <vincent.garonne@cern.ch>, 2013-2016
# - Cedric Serfon <cedric.serfon@cern.ch>, 2014-2019
# - Cedric Serfon <cedric.serfon@cern.ch>, 2014-2021
# - Thomas Beermann <thomas.beermann@cern.ch>, 2014
# - Mario Lassnig <mario.lassnig@cern.ch>, 2017-2019
# - Hannes Hansen <hannes.jakob.hansen@cern.ch>, 2018-2019
Expand Down Expand Up @@ -78,17 +78,29 @@ def declare_bad_file_replicas(pfns, reason, issuer, vo='def'):
"""
Declare a list of bad replicas.

:param pfns: The list of PFNs.
:param pfns: Either a list of PFNs (string) or a list of replicas {'scope': <scope>, 'name': <name>, 'rse_id': <rse_id>}.
:param reason: The reason of the loss.
:param issuer: The issuer account.
:param vo: The VO to act on.
"""
kwargs = {}
rse_map = {}
if not permission.has_permission(issuer=issuer, vo=vo, action='declare_bad_file_replicas', kwargs=kwargs):
raise exception.AccessDenied('Account %s can not declare bad replicas' % (issuer))

issuer = InternalAccount(issuer, vo=vo)

type_ = type(pfns[0])
for pfn in pfns:
if not isinstance(pfn, type_):
raise exception.InvalidType('The PFNs must be either a list of string or list of dict')
if type_ == dict:
rse = pfn['rse']
if rse not in rse_map:
rse_id = get_rse_id(rse=rse, vo=vo)
rse_map[rse] = rse_id
pfn['rse_id'] = rse_map[rse]
pfn['scope'] = InternalScope(pfn['scope'], vo=vo)
replicas = replica.declare_bad_file_replicas(pfns=pfns, reason=reason, issuer=issuer, status=BadFilesStatus.BAD)

for k in list(replicas):
Expand Down
4 changes: 2 additions & 2 deletions lib/rucio/client/replicaclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
# Authors:
# - Vincent Garonne <vincent.garonne@cern.ch>, 2013-2018
# - Mario Lassnig <mario.lassnig@cern.ch>, 2013-2021
# - Cedric Serfon <cedric.serfon@cern.ch>, 2014-2018
# - Cedric Serfon <cedric.serfon@cern.ch>, 2014-2021
# - Ralph Vigne <ralph.vigne@cern.ch>, 2015
# - Brian Bockelman <bbockelm@cse.unl.edu>, 2018
# - Martin Barisits <martin.barisits@cern.ch>, 2018-2021
Expand Down Expand Up @@ -49,7 +49,7 @@ def declare_bad_file_replicas(self, pfns, reason):
"""
Declare a list of bad replicas.

:param pfns: The list of PFNs.
bari12 marked this conversation as resolved.
Show resolved Hide resolved
:param pfns: Either a list of PFNs (string) or a list of replicas {'scope': <scope>, 'name': <name>, 'rse_id': <rse_id>}.
:param reason: The reason of the loss.
"""
data = {'reason': reason, 'pfns': pfns}
Expand Down
171 changes: 81 additions & 90 deletions lib/rucio/core/replica.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@
# - David Población Criado <david.poblacion.criado@cern.ch>, 2021
# - Joel Dierkes <joel.dierkes@cern.ch>, 2021

from __future__ import print_function

import heapq
import logging
import random
Expand Down Expand Up @@ -309,7 +307,7 @@ def __declare_bad_file_replicas(pfns, rse_id, reason, issuer, status=BadFilesSta
"""
Declare a list of bad replicas.

:param pfns: The list of PFNs.
:param pfns: Either a list of PFNs (string) or a list of replicas {'scope': <scope>, 'name': <name>, 'rse_id': <rse_id>}.
:param rse_id: The RSE id.
:param reason: The reason of the loss.
:param issuer: The issuer account.
Expand All @@ -318,94 +316,73 @@ def __declare_bad_file_replicas(pfns, rse_id, reason, issuer, status=BadFilesSta
:param session: The database session in use.
"""
unknown_replicas = []
declared_replicas = []
rse_info = rsemgr.get_rse_info(rse_id=rse_id, session=session)
replicas = []
proto = rsemgr.create_protocol(rse_info, 'read', scheme=scheme)
if rse_info['deterministic']:
parsed_pfn = proto.parse_pfns(pfns=pfns)
for pfn in parsed_pfn:
# WARNING : this part is ATLAS specific and must be changed
path = parsed_pfn[pfn]['path']
if path.startswith('/user') or path.startswith('/group'):
scope = '%s.%s' % (path.split('/')[1], path.split('/')[2])
name = parsed_pfn[pfn]['name']
elif path.startswith('/'):
scope = path.split('/')[1]
name = parsed_pfn[pfn]['name']
else:
scope = path.split('/')[0]
name = parsed_pfn[pfn]['name']

scope = InternalScope(scope, vo=issuer.vo)

__exists, scope, name, already_declared, size = __exists_replicas(rse_id, scope, name, path=None, session=session)
if __exists and ((status == BadFilesStatus.BAD and not already_declared) or status == BadFilesStatus.SUSPICIOUS):
replicas.append({'scope': scope, 'name': name, 'rse_id': rse_id, 'state': ReplicaState.BAD})
new_bad_replica = models.BadReplicas(scope=scope, name=name, rse_id=rse_id, reason=reason, state=status, account=issuer, bytes=size)
new_bad_replica.save(session=session, flush=False)
session.query(models.Source).filter_by(scope=scope, name=name, rse_id=rse_id).delete(synchronize_session=False)
declared_replicas.append(pfn)
else:
if already_declared:
unknown_replicas.append('%s %s' % (pfn, 'Already declared'))
declared_replicas = []
type_ = type(pfns[0])

if type_ is str:
# If pfns is a list of PFNs, the scope and names need to be extracted from the path
rse_info = rsemgr.get_rse_info(rse_id=rse_id, session=session)
proto = rsemgr.create_protocol(rse_info, 'read', scheme=scheme)
if rse_info['deterministic']:
# TBD : In case of deterministic RSE, call the extract_scope_from_path method
parsed_pfn = proto.parse_pfns(pfns=pfns)
for pfn in parsed_pfn:
# WARNING : this part is ATLAS specific and must be changed
path = parsed_pfn[pfn]['path']
if path.startswith('/user') or path.startswith('/group'):
scope = '%s.%s' % (path.split('/')[1], path.split('/')[2])
name = parsed_pfn[pfn]['name']
elif path.startswith('/'):
scope = path.split('/')[1]
name = parsed_pfn[pfn]['name']
else:
no_hidden_char = True
for char in str(pfn):
if not isprint(char):
unknown_replicas.append('%s %s' % (pfn, 'PFN contains hidden chars'))
no_hidden_char = False
break
if no_hidden_char:
unknown_replicas.append('%s %s' % (pfn, 'Unknown replica'))
if status == BadFilesStatus.BAD:
# For BAD file, we modify the replica state, not for suspicious
try:
# there shouldn't be any exceptions since all replicas exist
update_replicas_states(replicas, session=session)
except exception.UnsupportedOperation:
raise exception.ReplicaNotFound("One or several replicas don't exist.")
scope = path.split('/')[0]
name = parsed_pfn[pfn]['name']

scope = InternalScope(scope, vo=issuer.vo)
replicas.append({'scope': scope, 'name': name, 'rse_id': rse_id, 'state': status})

else:
# For non-deterministic RSEs use the path + rse_id to extract the scope
parsed_pfn = proto.parse_pfns(pfns=pfns)
for pfn in parsed_pfn:
path = '%s%s' % (parsed_pfn[pfn]['path'], parsed_pfn[pfn]['name'])
replicas.append({'scope': None, 'name': None, 'rse_id': rse_id, 'path': path, 'state': status})

else:
path_clause = []
parsed_pfn = proto.parse_pfns(pfns=pfns)
for pfn in parsed_pfn:
path = '%s%s' % (parsed_pfn[pfn]['path'], parsed_pfn[pfn]['name'])
__exists, scope, name, already_declared, size = __exists_replicas(rse_id, scope=None, name=None, path=path, session=session)
if __exists and ((status == BadFilesStatus.BAD and not already_declared) or status == BadFilesStatus.SUSPICIOUS):
replicas.append({'scope': scope, 'name': name, 'rse_id': rse_id, 'state': ReplicaState.BAD})
new_bad_replica = models.BadReplicas(scope=scope, name=name, rse_id=rse_id, reason=reason, state=status, account=issuer, bytes=size)
new_bad_replica.save(session=session, flush=False)
session.query(models.Source).filter_by(scope=scope, name=name, rse_id=rse_id).delete(synchronize_session=False)
declared_replicas.append(pfn)
path_clause.append(models.RSEFileAssociation.path == path)
if path.startswith('/'):
path_clause.append(models.RSEFileAssociation.path == path[1:])
else:
path_clause.append(models.RSEFileAssociation.path == '/%s' % path)
else:
if already_declared:
unknown_replicas.append('%s %s' % (pfn, 'Already declared'))
else:
no_hidden_char = True
for char in str(pfn):
if not isprint(char):
unknown_replicas.append('%s %s' % (pfn, 'PFN contains hidden chars'))
no_hidden_char = False
break
if no_hidden_char:
unknown_replicas.append('%s %s' % (pfn, 'Unknown replica'))

if status == BadFilesStatus.BAD and declared_replicas != []:
# For BAD file, we modify the replica state, not for suspicious
query = session.query(models.RSEFileAssociation) \
.with_hint(models.RSEFileAssociation, "+ index(replicas REPLICAS_PATH_IDX", 'oracle') \
.filter(models.RSEFileAssociation.rse_id == rse_id) \
.filter(or_(*path_clause))
rowcount = query.update({'state': ReplicaState.BAD})
if rowcount != len(declared_replicas):
# there shouldn't be any exceptions since all replicas exist
print(rowcount, len(declared_replicas), declared_replicas)
raise exception.ReplicaNotFound("One or several replicas don't exist.")
# If pfns is a list of replicas, just use scope, name and rse_id
for pfn in pfns:
replicas.append({'scope': pfn['scope'], 'name': pfn['name'], 'rse_id': rse_id, 'state': status})

for replica in replicas:
scope, name, rse_id, path = replica['scope'], replica['name'], replica['rse_id'], replica.get('path', None)
__exists, scope, name, already_declared, size = __exists_replicas(rse_id=rse_id, scope=scope, name=name, path=path, session=session)
if __exists and ((status == BadFilesStatus.BAD and not already_declared) or status == BadFilesStatus.SUSPICIOUS):
declared_replicas.append({'scope': scope, 'name': name, 'rse_id': rse_id, 'state': ReplicaState.BAD})
new_bad_replica = models.BadReplicas(scope=scope, name=name, rse_id=rse_id, reason=reason, state=status, account=issuer, bytes=size)
new_bad_replica.save(session=session, flush=False)
else:
if already_declared:
unknown_replicas.append('%s %s' % (pfn, 'Already declared'))
elif path:
no_hidden_char = True
for char in str(path):
if not isprint(char):
unknown_replicas.append('%s %s' % (path, 'PFN contains hidden chars'))
no_hidden_char = False
break
if no_hidden_char:
unknown_replicas.append('%s %s' % (pfn, 'Unknown replica'))

if status == BadFilesStatus.BAD:
# For BAD file, we modify the replica state, not for suspicious
try:
# there shouldn't be any exceptions since all replicas exist
update_replicas_states(declared_replicas, session=session)
except exception.UnsupportedOperation:
raise exception.ReplicaNotFound("One or several replicas don't exist.")

try:
session.flush()
except IntegrityError as error:
Expand Down Expand Up @@ -470,13 +447,27 @@ def declare_bad_file_replicas(pfns, reason, issuer, status=BadFilesStatus.BAD, s
"""
Declare a list of bad replicas.

:param pfns: The list of PFNs.
:param pfns: Either a list of PFNs (string) or a list of replicas {'scope': <scope>, 'name': <name>, 'rse_id': <rse_id>}.
:param reason: The reason of the loss.
:param issuer: The issuer account.
:param status: The status of the file (SUSPICIOUS or BAD).
:param session: The database session in use.
"""
scheme, files_to_declare, unknown_replicas = get_pfn_to_rse(pfns, vo=issuer.vo, session=session)
type_ = type(pfns[0])
unknown_replicas = {}
files_to_declare = {}
for pfn in pfns:
if not isinstance(pfn, type_):
raise exception.InvalidType('The PFNs must be either a list of string or list of dict')
if type_ == str:
scheme, files_to_declare, unknown_replicas = get_pfn_to_rse(pfns, vo=issuer.vo, session=session)
else:
scheme = None
for pfn in pfns:
rse_id = pfn['rse_id']
if rse_id not in files_to_declare:
files_to_declare[rse_id] = []
files_to_declare[rse_id].append(pfn)
for rse_id in files_to_declare:
notdeclared = __declare_bad_file_replicas(files_to_declare[rse_id], rse_id, reason, issuer, status=status, scheme=scheme, session=session)
if notdeclared:
Expand Down
31 changes: 28 additions & 3 deletions lib/rucio/tests/test_bad_replica.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

import pytest

from rucio.common.exception import RucioException, UnsupportedOperation
from rucio.common.exception import RucioException, UnsupportedOperation, InvalidType
from rucio.common.utils import generate_uuid, clean_surls
from rucio.core.replica import add_replicas, get_replicas_state, list_replicas, declare_bad_file_replicas, list_bad_replicas, get_bad_pfns, get_bad_replicas_backlog
from rucio.daemons.badreplicas.minos import run as minos_run
Expand Down Expand Up @@ -194,9 +194,7 @@ def test_client_add_list_bad_replicas(rse_factory, replica_client, did_client):
for replica in replica_client.list_replicas(dids=[{'scope': f['scope'], 'name': f['name']} for f in files], schemes=['srm'], all_states=True):
replicas.extend(replica['rses'][rse2])
list_rep.append(replica)
print(replicas, list_rep)
r = replica_client.declare_bad_file_replicas(replicas, 'This is a good reason')
print(r)
assert r == {}
bad_replicas = list_bad_replicas()
nbbadrep = 0
Expand All @@ -213,6 +211,33 @@ def test_client_add_list_bad_replicas(rse_factory, replica_client, did_client):
output = ['%s Unknown replica' % rep for rep in files]
assert r == {rse2: output}

# Now test adding bad_replicas with a list of replicas instead of PFNs
# Adding replicas to deterministic RSE
rse3, rse3_id = rse_factory.make_srm_rse(deterministic=True)
files = [{'scope': tmp_scope, 'name': 'file_%s' % generate_uuid(), 'bytes': 1, 'adler32': '0cc737eb', 'meta': {'events': 10}} for _ in range(nbfiles)]
replica_client.add_replicas(rse=rse3, files=files)
list_rep = [{'scope': file_['scope'], 'name': file_['name'], 'rse': rse3} for file_ in files]

# Listing replicas on deterministic RSE
replicas = []
for replica in replica_client.list_replicas(dids=[{'scope': f['scope'], 'name': f['name']} for f in files], schemes=['srm'], all_states=True):
replicas.extend(replica['rses'][rse3])
r = replica_client.declare_bad_file_replicas(list_rep, 'This is a good reason')
assert r == {}
bad_replicas = list_bad_replicas()
nbbadrep = 0
for rep in list_rep:
for badrep in bad_replicas:
if badrep['rse_id'] == rse3_id:
if badrep['scope'].external == rep['scope'] and badrep['name'] == rep['name']:
nbbadrep += 1
assert len(replicas) == nbbadrep

# InvalidType is raised if list_rep contains a mixture of replicas and PFNs
list_rep.extend(['srm://%s.cern.ch/test/%s/%s' % (rse2_id, tmp_scope, generate_uuid()), ])
with pytest.raises(InvalidType):
r = replica_client.declare_bad_file_replicas(list_rep, 'This is a good reason')


def test_client_add_suspicious_replicas(rse_factory, replica_client):
""" REPLICA (CLIENT): Add suspicious replicas"""
Expand Down