Skip to content

Commit

Permalink
policies: pass RSE ids to the permissions function when declaring bad…
Browse files Browse the repository at this point in the history
… repliacas rucio#6339

added traceback in case of uncaught exception in the web server BadReplicas

add debug output when for an uncaught exception in BadReplicas.post()
  • Loading branch information
ivmfnal committed Oct 15, 2023
1 parent fc3973a commit 524208d
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 33 deletions.
53 changes: 34 additions & 19 deletions lib/rucio/api/replica.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@
from rucio.common.constants import SuspiciousAvailability
from rucio.common.schema import validate_schema
from rucio.common.types import InternalAccount, InternalScope
from rucio.common.utils import api_update_return_dict, invert_dict
from rucio.common.utils import api_update_return_dict
from rucio.core import replica
# from rucio.core.replica import get_pfn_to_rse
from rucio.core.rse import get_rse_id, get_rse_name
from rucio.db.sqla.constants import BadFilesStatus
from rucio.db.sqla.session import read_session, stream_session, transactional_session
Expand Down Expand Up @@ -85,36 +86,50 @@ def declare_bad_file_replicas(replicas, reason, issuer, vo='def', force=False, *
if not replicas:
return {}

kwargs = {}
rse_map = {} # RSE name -> RSE id
if not permission.has_permission(issuer=issuer, vo=vo, action='declare_bad_file_replicas', kwargs=kwargs, session=session):
raise exception.AccessDenied('Account %s can not declare bad replicas' % (issuer))

issuer = InternalAccount(issuer, vo=vo)

# make sure all elements are either strings or dicts, without mixing
type_ = type(replicas[0])
if any(not isinstance(r, type_) for r in replicas):

as_pfns = isinstance(replicas[0], str)
if any(isinstance(r, str) != as_pfns for r in replicas):
raise exception.InvalidType('The replicas must be specified either as a list of PFNs (strings) or list of dicts')

rse_map = {} # name -> id
rse_id_to_name = {} # id -> name

issuer = InternalAccount(issuer, vo=vo)

replicas_lst = replicas
if type_ is dict:
rse_ids_to_check = set()
if as_pfns:
# scheme, rses_for_replicas, unknowns = get_pfn_to_rse(replicas, vo=vo)
# if unknowns:
# raise exception.ReplicaNotFound("Not all replicas found")
# rse_ids_to_check = set(rses_for_replicas.keys())
pass
else:
# repliac are given as dicts {name, scope, rse}
replicas_lst = [] # need to create new list to convert replica["scope"] from strings to InternalScope objects
for r in replicas:
if "name" not in r or "scope" not in r or ("rse" not in r and "rse_id" not in r):
raise exception.InvalidType('The replica dictionary must include scope and either rse (name) or rse_id')
scope = InternalScope(r['scope'], vo=vo)
rse_id = r.get("rse_id") or rse_map.get(r['rse'])
if rse_id is None:
rse = r["rse"]
rse_map[rse] = rse_id = get_rse_id(rse=rse, vo=vo, session=session)
rse_id = r.get("rse_id")
rse = r.get("rse")
rse_id = rse_id or rse_map.get(rse) or get_rse_id(rse=rse, vo=vo, session=session)
rse = rse or rse_id_to_name.get(rse_id) or get_rse_name(rse_id, session=session)
rse_map[rse] = rse_id
rse_id_to_name[rse_id] = rse
replicas_lst.append({
"scope": scope,
"scope": InternalScope(r['scope'], vo=vo),
"rse_id": rse_id,
"name": r["name"]
})

rse_id_to_name = invert_dict(rse_map) # RSE id -> RSE name
rse_ids_to_check.add(rse_id)

for rse_id in rse_ids_to_check:
if not permission.has_permission(issuer=issuer, vo=vo, action='declare_bad_file_replicas',
kwargs={"rse_id": rse_id},
session=session):
raise exception.AccessDenied('Account %s can not declare bad replicas in RSE %s'
% (issuer, rse_id_to_name.get(rse_id, rse_id)))

undeclared = replica.declare_bad_file_replicas(replicas_lst, reason=reason, issuer=issuer, status=BadFilesStatus.BAD,
force=force, session=session)
Expand Down
37 changes: 24 additions & 13 deletions lib/rucio/web/rest/flaskapi/v1/replicas.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import traceback
import sys
import logging

from datetime import datetime
from itertools import chain
from json import dumps, loads
Expand Down Expand Up @@ -847,20 +851,27 @@ def post(self):
406:
description: Not acceptable
"""
parameters = json_parameters()
replicas = param_get(parameters, 'replicas', default=[]) or param_get(parameters, 'pfns', default=[])
reason = param_get(parameters, 'reason', default=None)
force = param_get(parameters, 'force', default=False)

logging.log(logging.DEBUG, "BadReplicas.post()...")
try:
not_declared_files = declare_bad_file_replicas(replicas, reason=reason,
issuer=request.environ.get('issuer'), vo=request.environ.get('vo'),
force=force)
return not_declared_files, 201
except AccessDenied as error:
return generate_http_error_flask(401, error)
except (RSENotFound, ReplicaNotFound) as error:
return generate_http_error_flask(404, error)
parameters = json_parameters()
replicas = param_get(parameters, 'replicas', default=[]) or param_get(parameters, 'pfns', default=[])
reason = param_get(parameters, 'reason', default=None)
force = param_get(parameters, 'force', default=False)

try:
not_declared_files = declare_bad_file_replicas(replicas, reason=reason,
issuer=request.environ.get('issuer'), vo=request.environ.get('vo'),
force=force)
return not_declared_files, 201
except AccessDenied as error:
return generate_http_error_flask(401, error)
except (RSENotFound, ReplicaNotFound) as error:
return generate_http_error_flask(404, error)
except Exception as e:
tb = " ".join(traceback.format_exception(*sys.exc_info()))
message = f"Uncaught server side exception: {e}\n-------------------\n{tb}\n-----------------"
logging.log(logging.DEBUG, message)
raise


class QuarantineReplicas(ErrorHandlingMethodView):
Expand Down
57 changes: 56 additions & 1 deletion tests/test_bad_replica.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,23 @@
from json import dumps, loads

import pytest
import logging

from rucio.common.exception import RucioException, UnsupportedOperation, InvalidType
from rucio.common.utils import generate_uuid, clean_surls
from rucio.core.did import delete_dids
from rucio.core.replica import (add_replicas, get_replicas_state, list_replicas,
declare_bad_file_replicas, list_bad_replicas, get_bad_pfns,
get_bad_replicas_backlog, list_bad_replicas_status, get_pfn_to_rse)
from rucio.core.rse import add_rse
from rucio.api.replica import declare_bad_file_replicas as api_declare_bad_file_replicas
from rucio.client.rseclient import RSEClient
from rucio.daemons.badreplicas.minos import run as minos_run
from rucio.daemons.badreplicas.minos_temporary_expiration import run as minos_temp_run
from rucio.daemons.badreplicas.necromancer import run as necromancer_run
from rucio.daemons.badreplicas.necromancer import REGION
from rucio.db.sqla.constants import DIDType, ReplicaState, BadPFNStatus, BadFilesStatus
from rucio.tests.common import headers, auth
from rucio.tests.common import headers, auth, rse_name_generator


@pytest.fixture
Expand Down Expand Up @@ -105,6 +108,57 @@ def test_add_list_bad_replicas(rse_factory, mock_scope, root_account):
assert r == {rse2_id: output}


@pytest.mark.xfail
@pytest.mark.noparallel(reason='calls list_bad_replicas() which acts on all bad replicas without any filtering')
def test_api_declare_bad_replicas_by_lfn(rse_factory, mock_scope, root_account):
""" REPLICA (API): Add bad replicas and list them"""

nbfiles = 5
# Adding replicas to deterministic RSE

rse_name = rse_name_generator()
properties = {
'ASN': 'ASN',
'availability_read': False,
'availability_write': True,
'availability_delete': False,
'deterministic': True,
'volatile': True,
'city': 'city',
'region_code': 'DE',
'country_name': 'country_name',
'continent': 'EU',
'time_zone': 'time_zone',
'ISP': 'ISP',
'staging_area': True,
'rse_type': 'DISK',
'longitude': 1.0,
'latitude': 2.0,
'vo': 'def'
}
rse_id = add_rse(rse_name, **properties)

files = [{'scope': mock_scope, 'name': 'file_%s' % generate_uuid(), 'bytes': 1, 'adler32': '0cc737eb', 'meta': {'events': 10}} for _ in range(nbfiles)]
add_replicas(rse_id=rse_id, files=files, account=root_account, ignore_availability=True)

replicas = [
{"name": f["name"], "scope": str(f["scope"]), "rse": rse_name}
for f in files
]

undeclared = api_declare_bad_file_replicas(replicas, 'This is a good reason', str(root_account))
assert not undeclared, "Not all replicas declared"
bad_replicas = list_bad_replicas()
nbbadrep = 0
for rep in replicas:
for badrep in bad_replicas:
if badrep['rse_id'] == rse_id:
if badrep['scope'] == rep['scope'] and badrep['name'] == rep['name']:
nbbadrep += 1
break
assert len(replicas) == nbbadrep


@pytest.mark.noparallel(reason='runs necromancer which acts on all bad replicas without any filtering')
@pytest.mark.parametrize("file_config_mock", [{
"overrides": [('necromancer', 'max_bad_replicas_backlog_count', '20')]
Expand Down Expand Up @@ -179,6 +233,7 @@ def test_client_add_list_bad_replicas(rse_factory, replica_client, did_client):
for replica in replica_client.list_replicas(dids=[{'scope': f['scope'], 'name': f['name']} for f in files], schemes=['srm'], all_states=True):
replicas.extend(replica['rses'][rse1])
list_rep.append(replica)
logging.log(logging.DEBUG, "test_client_add_list_bad_replicas: calling replica_client.declare_bad_file_replicas()...")
r = replica_client.declare_bad_file_replicas(replicas, 'This is a good reason')
assert r == {}
bad_replicas = list_bad_replicas()
Expand Down

0 comments on commit 524208d

Please sign in to comment.