Skip to content

Commit

Permalink
Transfers: compress multihop if it passes through a source. Closes ru…
Browse files Browse the repository at this point in the history
…cio#4872

This may happen if source_replica_expression forces a particular source
RSE, but the transfer to the destination from the forced source ends up
being a multi-hop via another source.
  • Loading branch information
rcarpa committed Oct 5, 2021
1 parent 09d9a50 commit e32fd92
Showing 1 changed file with 25 additions and 16 deletions.
41 changes: 25 additions & 16 deletions lib/rucio/core/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -1231,27 +1231,35 @@ def get_dsn(scope, name, dsn):
return 'other'


def __filter_unwanted_paths(candidate_paths: "Iterable[List[DirectTransferDefinition]]") -> "Generator[List[DirectTransferDefinition]]":

def __filter_multihops_with_intermediate_tape(candidate_paths: "Iterable[List[DirectTransferDefinition]]") -> "Generator[List[DirectTransferDefinition]]":
# Discard multihop transfers which contain a tape source as an intermediate hop
filtered_candidate_paths = []
for path in candidate_paths:
if any(transfer.src.rse.is_tape_or_staging_required() for transfer in path[1:]):
continue
filtered_candidate_paths.append(path)
candidate_paths = filtered_candidate_paths
pass
else:
yield path

# Discard multihop transfers which contain other candidate as part of itself For example:
# if A->B->C and B->C are both candidates, discard A->B->C because it includes B->C. Doing B->C is enough.
source_rses = {path[0].src.rse.id for path in candidate_paths}
filtered_candidate_paths = []

def __compress_multihops(
candidate_paths: "Iterable[List[DirectTransferDefinition]]",
sources: "Iterable[TransferSource]",
) -> "Generator[List[DirectTransferDefinition]]":
# Compress multihop transfers which contain other sources as part of itself.
# For example: multihop A->B->C and B is a source, compress A->B->C into B->C
source_rses = {s.rse.id for s in sources}
seen_source_rses = set()
for path in candidate_paths:
if any(hop.src.rse.id in source_rses for hop in path[1:]):
continue
filtered_candidate_paths.append(path)
candidate_paths = filtered_candidate_paths
if len(path) > 1:
# find the index of the first hop starting from the end which is also a source. Path[0] will always be a source.
last_source_idx = next((idx for idx, hop in reversed(list(enumerate(path))) if hop.src.rse.id in source_rses), (0, None))
if last_source_idx > 0:
path = path[last_source_idx:]

yield from candidate_paths
# Deduplicate paths from same source
src_rse_id = path[0].src.rse.id
if src_rse_id not in seen_source_rses:
seen_source_rses.add(src_rse_id)
yield path


def __sort_paths(candidate_paths: "Iterable[List[DirectTransferDefinition]]") -> "Generator[List[DirectTransferDefinition]]":
Expand Down Expand Up @@ -1522,7 +1530,8 @@ def __build_transfer_paths(
if len(filtered_sources) != len(candidate_paths):
logger(logging.DEBUG, 'Sources after path computation for %s: %s', rws, [str(path[0].src.rse) for path in candidate_paths])

candidate_paths = __filter_unwanted_paths(candidate_paths)
candidate_paths = __filter_multihops_with_intermediate_tape(candidate_paths)
candidate_paths = __compress_multihops(candidate_paths, rws.sources)
candidate_paths = list(__sort_paths(candidate_paths))

if not candidate_paths:
Expand Down

0 comments on commit e32fd92

Please sign in to comment.