Skip to content

Commit

Permalink
Merge pull request #4881 from rcarpa/patch-4872-compress_multihop_via…
Browse files Browse the repository at this point in the history
…_source

 Transfers: compress multihop if it passes through a source. Closes #4872
  • Loading branch information
bari12 committed Oct 18, 2021
2 parents c3e6552 + 58d9f29 commit de42772
Showing 1 changed file with 28 additions and 16 deletions.
44 changes: 28 additions & 16 deletions lib/rucio/core/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -1201,27 +1201,35 @@ def get_dsn(scope, name, dsn):
return 'other'


def __filter_unwanted_paths(candidate_paths: "Iterable[List[DirectTransferDefinition]]") -> "Generator[List[DirectTransferDefinition]]":

def __filter_multihops_with_intermediate_tape(candidate_paths: "Iterable[List[DirectTransferDefinition]]") -> "Generator[List[DirectTransferDefinition]]":
# Discard multihop transfers which contain a tape source as an intermediate hop
filtered_candidate_paths = []
for path in candidate_paths:
if any(transfer.src.rse.is_tape_or_staging_required() for transfer in path[1:]):
continue
filtered_candidate_paths.append(path)
candidate_paths = filtered_candidate_paths
pass
else:
yield path

# Discard multihop transfers which contain other candidate as part of itself For example:
# if A->B->C and B->C are both candidates, discard A->B->C because it includes B->C. Doing B->C is enough.
source_rses = {path[0].src.rse.id for path in candidate_paths}
filtered_candidate_paths = []

def __compress_multihops(
candidate_paths: "Iterable[List[DirectTransferDefinition]]",
sources: "Iterable[TransferSource]",
) -> "Generator[List[DirectTransferDefinition]]":
# Compress multihop transfers which contain other sources as part of itself.
# For example: multihop A->B->C and B is a source, compress A->B->C into B->C
source_rses = {s.rse.id for s in sources}
seen_source_rses = set()
for path in candidate_paths:
if any(hop.src.rse.id in source_rses for hop in path[1:]):
continue
filtered_candidate_paths.append(path)
candidate_paths = filtered_candidate_paths
if len(path) > 1:
# find the index of the first hop starting from the end which is also a source. Path[0] will always be a source.
last_source_idx = next((idx for idx, hop in reversed(list(enumerate(path))) if hop.src.rse.id in source_rses), (0, None))
if last_source_idx > 0:
path = path[last_source_idx:]

yield from candidate_paths
# Deduplicate paths from same source
src_rse_id = path[0].src.rse.id
if src_rse_id not in seen_source_rses:
seen_source_rses.add(src_rse_id)
yield path


def __sort_paths(candidate_paths: "Iterable[List[DirectTransferDefinition]]") -> "Generator[List[DirectTransferDefinition]]":
Expand Down Expand Up @@ -1401,6 +1409,9 @@ def __build_transfer_paths(
unavailable_read_rse_ids = __get_unavailable_rse_ids(operation='read', session=session)
unavailable_write_rse_ids = __get_unavailable_rse_ids(operation='write', session=session)

# Disallow multihop via blocklisted RSEs
multihop_rses = list(set(multihop_rses).difference(unavailable_write_rse_ids).difference(unavailable_read_rse_ids))

candidate_paths_by_request_id, reqs_no_source, reqs_only_tape_source, reqs_scheme_mismatch = {}, set(), set(), set()
for rws in requests_with_sources:

Expand Down Expand Up @@ -1492,7 +1503,8 @@ def __build_transfer_paths(
if len(filtered_sources) != len(candidate_paths):
logger(logging.DEBUG, 'Sources after path computation for %s: %s', rws, [str(path[0].src.rse) for path in candidate_paths])

candidate_paths = __filter_unwanted_paths(candidate_paths)
candidate_paths = __filter_multihops_with_intermediate_tape(candidate_paths)
candidate_paths = __compress_multihops(candidate_paths, rws.sources)
candidate_paths = list(__sort_paths(candidate_paths))

if not candidate_paths:
Expand Down

0 comments on commit de42772

Please sign in to comment.