Skip to content

Commit

Permalink
Replicas: also sort json output. Closes #4105
Browse files Browse the repository at this point in the history
Also enforces the "limit" input parameter on JSON, which wasn't
done until now.
  • Loading branch information
rcarpa committed Sep 24, 2021
1 parent b6092bc commit 70e0571
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 37 deletions.
14 changes: 4 additions & 10 deletions lib/rucio/tests/test_replica_sorting.py
Expand Up @@ -122,10 +122,7 @@ def protocols_setup(vo):


@pytest.mark.noparallel(reason='fails when run in parallel, lists replicas and checks for length of returned list')
@pytest.mark.parametrize("content_type", [
Mime.METALINK,
pytest.param(Mime.JSON_STREAM, marks=pytest.mark.xfail(reason='see https://github.com/rucio/rucio/issues/4105')),
])
@pytest.mark.parametrize("content_type", [Mime.METALINK, Mime.JSON_STREAM])
def test_sort_geoip_wan(vo, rest_client, auth_token, protocols_setup, content_type):
"""Replicas: test sorting a few WANs via geoip."""
n = 10
Expand Down Expand Up @@ -187,8 +184,8 @@ def prepare_sort_geoip_lan_before_wan_params():
argvalues = [
(Mime.METALINK, 0),
(Mime.METALINK, 1),
pytest.param(Mime.JSON_STREAM, 0, marks=pytest.mark.xfail(reason='see https://github.com/rucio/rucio/issues/4105')),
pytest.param(Mime.JSON_STREAM, 1, marks=pytest.mark.xfail(reason='see https://github.com/rucio/rucio/issues/4105')),
(Mime.JSON_STREAM, 0),
(Mime.JSON_STREAM, 1),
]
rargvalues = map(lambda p: p.values if hasattr(p, 'values') else p, argvalues)
ids = [f'mime={repr(mime)}, lan-site={repr(base_rse_info[iid]["site"])}' for mime, iid in rargvalues]
Expand Down Expand Up @@ -323,10 +320,7 @@ def fake_sort_replicas(dictreplica, *args, **kwargs):


@pytest.mark.noparallel(reason='fails when run in parallel')
@pytest.mark.parametrize("content_type", [
Mime.METALINK,
pytest.param(Mime.JSON_STREAM, marks=pytest.mark.xfail(reason='see https://github.com/rucio/rucio/issues/4105')),
])
@pytest.mark.parametrize("content_type", [Mime.METALINK, Mime.JSON_STREAM])
def test_sort_geoip_address_not_found_error(vo, rest_client, auth_token, protocols_setup, content_type):
"""Replicas: test sorting via geoip with ignoring geoip errors."""

Expand Down
59 changes: 32 additions & 27 deletions lib/rucio/web/rest/flaskapi/v1/replicas.py
Expand Up @@ -21,6 +21,7 @@
# - Martin Barisits <martin.barisits@cern.ch>, 2021

from datetime import datetime
from itertools import chain
from json import dumps, loads
from urllib.parse import parse_qs, unquote
from xml.sax.saxutils import escape
Expand Down Expand Up @@ -337,7 +338,7 @@ def post(self):
def generate(request_id, issuer, vo):
# we need to call list_replicas before starting to reply
# otherwise the exceptions won't be propagated correctly
first = metalink
first = True

for rfile in list_replicas(dids=dids, schemes=schemes,
unavailable=unavailable,
Expand All @@ -354,23 +355,34 @@ def generate(request_id, issuer, vo):
issuer=issuer,
vo=vo):

# in first round, set the appropriate content type, and stream the header
if first and metalink:
yield '<?xml version="1.0" encoding="UTF-8"?>\n<metalink xmlns="urn:ietf:params:xml:ns:metalink">\n'
first = False
# Sort rfile['pfns'] and limit its size according to "limit" parameter
lanreplicas = {}
wanreplicas = {}
for pfn, replica in rfile['pfns'].items():
replica_tuple = (replica['domain'], replica['priority'], replica['rse'], replica['client_extract'])
if replica_tuple[0] == 'lan':
lanreplicas[pfn] = replica_tuple
else:
wanreplicas[pfn] = replica_tuple
unsorted_replicas = rfile['pfns']
sorted_replicas = rfile['pfns'] = {}
# Lan replicas sorted by priority; followed by wan replicas sorted by selection criteria
for idx, pfn in enumerate(chain(sorted(lanreplicas.keys(), key=lambda pfn: lanreplicas[pfn][1]),
sort_replicas(wanreplicas, client_location, selection=select)),
start=1):
replica = unsorted_replicas[pfn]
sorted_replicas[pfn] = replica
replica['priority'] = idx
if limit and limit == idx:
break

if not metalink:
yield dumps(rfile, cls=APIEncoder) + '\n'
else:
replicas = []
dictreplica = {}
for replica in rfile['pfns'].keys():
replicas.append(replica)
dictreplica[replica] = (rfile['pfns'][replica]['domain'],
rfile['pfns'][replica]['priority'],
rfile['pfns'][replica]['rse'],
rfile['pfns'][replica]['client_extract'])

# in first round, set the appropriate content type, and stream the header
if first:
yield '<?xml version="1.0" encoding="UTF-8"?>\n<metalink xmlns="urn:ietf:params:xml:ns:metalink">\n'
first = False
yield ' <file name="' + rfile['name'] + '">\n'

if 'parents' in rfile and rfile['parents']:
Expand All @@ -389,19 +401,12 @@ def generate(request_id, issuer, vo):
policy_schema = config_get('policy', 'schema', raise_exception=False, default='generic')
yield f' <glfn name="/{policy_schema}/rucio/{rfile["scope"]}:{rfile["name"]}"></glfn>\n'

lanreplicas = [replica for replica, v in dictreplica.items() if v[0] == 'lan']
# sort lan by priority
lanreplicas.sort(key=lambda rep: dictreplica[rep][1])
replicas = lanreplicas + sort_replicas({k: v for k, v in dictreplica.items() if v[0] != 'lan'}, client_location, selection=select)

for idx, replica in enumerate(replicas, start=1):
yield ' <url location="' + str(dictreplica[replica][2]) \
+ '" domain="' + str(dictreplica[replica][0]) \
+ '" priority="' + str(idx) \
+ '" client_extract="' + str(dictreplica[replica][3]).lower() \
+ '">' + escape(replica) + '</url>\n'
if limit and limit == idx:
break
for pfn, replica in rfile['pfns'].items():
yield ' <url location="' + str(replica['rse']) \
+ '" domain="' + str(replica['domain']) \
+ '" priority="' + str(replica['priority']) \
+ '" client_extract="' + str(replica['client_extract']).lower() \
+ '">' + escape(pfn) + '</url>\n'
yield ' </file>\n'

if metalink:
Expand Down

0 comments on commit 70e0571

Please sign in to comment.