Skip to content

Commit

Permalink
Transfers: correctly record updated requests on failed multihops
Browse files Browse the repository at this point in the history
When handling a multi-hop transfer, all the hops following the one
which failed are automatically failed by the poller/receiver.
Until recently, those hops where handled again separately. However,
a recent change started ignoring requests which where already handled.
As a result, the metrics don't record correctly the number of
updated requests.
  • Loading branch information
rcarpa authored and bari12 committed Jan 15, 2024
1 parent 531a854 commit e44d24d
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 10 deletions.
4 changes: 3 additions & 1 deletion lib/rucio/core/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -1096,9 +1096,10 @@ def is_intermediate_hop(request):


@transactional_session
def handle_failed_intermediate_hop(request, *, session: "Session"):
def handle_failed_intermediate_hop(request, *, session: "Session") -> int:
"""
Perform housekeeping behind a failed intermediate hop
Returns the number of updated requests
"""
# mark all hops following this one (in any multihop path) as Failed
new_state = RequestState.FAILED
Expand All @@ -1123,6 +1124,7 @@ def handle_failed_intermediate_hop(request, *, session: "Session"):
err_msg=get_transfer_error(new_state, reason=reason),
)
session.execute(stmt)
return len(dependent_requests)


@METRICS.count_it
Expand Down
12 changes: 7 additions & 5 deletions lib/rucio/core/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -540,10 +540,11 @@ def update_transfer_state(
:param tt_status_report: The transfertool status update, retrieved via request.query_request().
:param session: The database session to use.
:param logger: Optional decorated logger that can be passed from the calling daemons or servers.
:returns commit_or_rollback: Boolean.
:returns: The number of updated requests
"""

request_id = tt_status_report.request_id
nb_updated = 0
try:
fields_to_update = tt_status_report.get_db_fields_to_update(session=session, logger=logger)
if not fields_to_update:
Expand All @@ -556,11 +557,12 @@ def update_transfer_state(
updated = transition_request_state(request_id, request=request, session=session, **fields_to_update)

if not updated:
return False
return nb_updated
nb_updated += 1

if tt_status_report.state == RequestState.FAILED:
if request_core.is_intermediate_hop(request):
request_core.handle_failed_intermediate_hop(request, session=session)
nb_updated += request_core.handle_failed_intermediate_hop(request, session=session)

if tt_status_report.state:
stats_manager.observe(
Expand All @@ -580,10 +582,10 @@ def update_transfer_state(
additional_fields=tt_status_report.get_monitor_msg_fields(session=session, logger=logger),
session=session
)
return True
return nb_updated
except UnsupportedOperation as error:
logger(logging.WARNING, "Request %s doesn't exist - Error: %s" % (request_id, str(error).replace('\n', '')))
return False
return 0
except Exception:
logger(logging.CRITICAL, "Exception", exc_info=True)

Expand Down
7 changes: 4 additions & 3 deletions lib/rucio/daemons/conveyor/poller.py
Original file line number Diff line number Diff line change
Expand Up @@ -379,10 +379,11 @@ def _poll_transfers(
stats_manager=transfer_stats_manager,
logger=logger,
)
# if True, really update request content; if False, only touch request
cnt += ret
if ret:
cnt += 1
METRICS.counter('update_request_state.{updated}').labels(updated=ret).inc()
METRICS.counter('update_request_state.{updated}').labels(updated=True).inc(delta=ret)
else:
METRICS.counter('update_request_state.{updated}').labels(updated=False).inc()

# should touch transfers.
# Otherwise if one bulk transfer includes many requests and one is not terminated, the transfer will be poll again.
Expand Down
5 changes: 4 additions & 1 deletion lib/rucio/daemons/conveyor/receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,10 @@ def _perform_request_update(self, msg, *, session=None, logger=logging.log):
session=session,
logger=logger,
)
METRICS.counter('update_request_state.{updated}').labels(updated=ret).inc()
if ret:
METRICS.counter('update_request_state.{updated}').labels(updated=True).inc(delta=ret)
else:
METRICS.counter('update_request_state.{updated}').labels(updated=False).inc()
except Exception:
logging.critical(traceback.format_exc())

Expand Down

0 comments on commit e44d24d

Please sign in to comment.