Skip to content

Commit

Permalink
Merge pull request #4691 from rcarpa/patch-4473-cosmetic_changes_list…
Browse files Browse the repository at this point in the history
…_replicas

Core & Internals: simplify list_replica response creation. Closes #4473
  • Loading branch information
bari12 committed Sep 6, 2021
2 parents f429a08 + 7a00e4f commit b6092bc
Showing 1 changed file with 50 additions and 91 deletions.
141 changes: 50 additions & 91 deletions lib/rucio/core/replica.py
Original file line number Diff line number Diff line change
Expand Up @@ -956,6 +956,31 @@ def get_multi_cache_prefix(cache_site, filename, logger=logging.log):
return ''


def _sort_replica_file_pfns(file):
"""
Helper function to be executed after analyzing the last replica of a file (same scope:name)
"""
if not file.get('pfns'):
return

# set the total order for the priority
# --> exploit that L(AN) comes before W(AN) before Z(IP) alphabetically
# and use 1-indexing to be compatible with metalink
tmp = sorted([(file['pfns'][p]['domain'], file['pfns'][p]['priority'], p) for p in file['pfns']])
for i in range(0, len(tmp)):
file['pfns'][tmp[i][2]]['priority'] = i + 1

# also sort the pfns inside the rse structure
file['rses'] = {}
rse_pfns = sorted((file['pfns'][pfn]['rse_id'], file['pfns'][pfn]['priority'], pfn) for pfn in file['pfns'])
for t_rse, t_priority, t_pfn in rse_pfns:
if t_rse in file['rses']:
file['rses'][t_rse].append(t_pfn)
else:
file['rses'][t_rse] = [t_pfn]


def _list_replicas(dataset_clause, file_clause, state_clause, show_pfns,
schemes, files_wo_replica, rse_clause, client_location, domain,
sign_urls, signature_lifetime, constituent_clause, resolve_parents,
Expand Down Expand Up @@ -1154,104 +1179,38 @@ def _list_replicas(dataset_clause, file_clause, state_clause, show_pfns,
except KeyError:
file['space_token'] = None

if 'scope' in file and 'name' in file:
if file['scope'] == scope and file['name'] == name:
# extract properly the pfn from the tuple
file['rses'][rse_id] += list(set([tmp_pfn[0] for tmp_pfn in pfns]))
file['states'][rse_id] = str(state.name if state else state)

if resolve_parents:
file['parents'] = ['%s:%s' % (parent['scope'].internal, parent['name'])
for parent in rucio.core.did.list_all_parent_dids(scope, name, session=session)]

for tmp_pfn in pfns:
file['pfns'][tmp_pfn[0]] = {'rse_id': rse_id,
'rse': rse,
'type': str(rse_type.name),
'volatile': volatile,
'domain': tmp_pfn[1],
'priority': tmp_pfn[2],
'client_extract': tmp_pfn[3]}
else:
if resolve_parents:
file['parents'] = ['%s:%s' % (parent['scope'].internal, parent['name'])
for parent in rucio.core.did.list_all_parent_dids(file['scope'], file['name'], session=session)]

# quick exit, but don't forget to set the total order for the priority
# --> exploit that L(AN) comes before W(AN) before Z(IP) alphabetically
# and use 1-indexing to be compatible with metalink
tmp = sorted([(file['pfns'][p]['domain'], file['pfns'][p]['priority'], p) for p in file['pfns']])

for i in range(0, len(tmp)):
file['pfns'][tmp[i][2]]['priority'] = i + 1
file['rses'] = {}

rse_pfns = []
for t_rse, t_priority, t_pfn in [(file['pfns'][t_pfn]['rse_id'], file['pfns'][t_pfn]['priority'], t_pfn) for t_pfn in file['pfns']]:
rse_pfns.append((t_rse, t_priority, t_pfn))
rse_pfns = sorted(rse_pfns)

for t_rse, t_priority, t_pfn in rse_pfns:
if t_rse in file['rses']:
file['rses'][t_rse].append(t_pfn)
else:
file['rses'][t_rse] = [t_pfn]

yield file
file = {}
# Finished the replicas of the previous scope/name. Yield the file and start fresh with current scope/name
if 'scope' in file and 'name' in file and not (file['scope'] == scope and file['name'] == name):
_sort_replica_file_pfns(file)
yield file
file = {}

if not ('scope' in file and 'name' in file):
file['scope'], file['name'] = scope, name
file['bytes'], file['md5'], file['adler32'] = bytes_, md5, adler32
file['pfns'], file['rses'] = {}, defaultdict(list)
file['states'] = {rse_id: str(state.name if state else state)}

if resolve_parents:
file['parents'] = ['%s:%s' % (parent['scope'].internal, parent['name'])
for parent in rucio.core.did.list_all_parent_dids(scope, name, session=session)]

if rse_id:
# extract properly the pfn from the tuple
file['rses'][rse_id] = list(set([tmp_pfn[0] for tmp_pfn in pfns]))
for tmp_pfn in pfns:
file['pfns'][tmp_pfn[0]] = {'rse_id': rse_id,
'rse': rse,
'type': str(rse_type.name),
'volatile': volatile,
'domain': tmp_pfn[1],
'priority': tmp_pfn[2],
'client_extract': tmp_pfn[3]}

# set the total order for the priority
# --> exploit that L(AN) comes before W(AN) before Z(IP) alphabetically
# and use 1-indexing to be compatible with metalink
if 'pfns' in file:
tmp = sorted([(file['pfns'][p]['domain'], file['pfns'][p]['priority'], p) for p in file['pfns']])
for i in range(0, len(tmp)):
file['pfns'][tmp[i][2]]['priority'] = i + 1

if 'scope' in file and 'name' in file:
file['rses'] = {}

# don't forget to resolve parents for the last replica
if resolve_parents:
file['pfns'], file['rses'], file['states'] = {}, defaultdict(list), {}

if rse_id:
file['rses'][rse_id] += list(set([tmp_pfn[0] for tmp_pfn in pfns]))
file['states'][rse_id] = str(state.name if state else state)

for tmp_pfn in pfns:
file['pfns'][tmp_pfn[0]] = {'rse_id': rse_id,
'rse': rse,
'type': str(rse_type.name),
'volatile': volatile,
'domain': tmp_pfn[1],
'priority': tmp_pfn[2],
'client_extract': tmp_pfn[3]}

if resolve_parents and not file.get('parents'):
file['parents'] = ['%s:%s' % (parent['scope'].internal, parent['name'])
for parent in rucio.core.did.list_all_parent_dids(file['scope'], file['name'], session=session)]

# also sort the pfns inside the rse structure
rse_pfns = []
for t_rse, t_priority, t_pfn in [(file['pfns'][t_pfn]['rse_id'], file['pfns'][t_pfn]['priority'], t_pfn) for t_pfn in file['pfns']]:
rse_pfns.append((t_rse, t_priority, t_pfn))
rse_pfns = sorted(rse_pfns)

for t_rse, t_priority, t_pfn in rse_pfns:
if t_rse in file['rses']:
file['rses'][t_rse].append(t_pfn)
else:
file['rses'][t_rse] = [t_pfn]
for parent in rucio.core.did.list_all_parent_dids(scope, name, session=session)]

# Handle the last file
if 'scope' in file and 'name' in file:
_sort_replica_file_pfns(file)
yield file
file = {}

for scope, name, bytes_, md5, adler32 in _list_files_wo_replicas(files_wo_replica, session):
yield {
Expand Down

0 comments on commit b6092bc

Please sign in to comment.