Skip to content

Commit

Permalink
Transfers: re-introduce last_processed_by logic in poller. Closes #6376
Browse files Browse the repository at this point in the history
The protection provided by older_than, mentioned in
33f9578,
was over-estimated.
  • Loading branch information
rcarpa authored and bari12 committed Nov 27, 2023
1 parent 6bdad2b commit ad5c23c
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 1 deletion.
17 changes: 16 additions & 1 deletion lib/rucio/core/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -688,7 +688,7 @@ def get_and_mark_next(
for share in activity_shares:

query = select(
models.Request
models.Request.id
).where(
models.Request.state.in_(state),
models.Request.request_type.in_(request_type)
Expand Down Expand Up @@ -744,6 +744,21 @@ def get_and_mark_next(
else:
query = query.limit(limit)

if session.bind.dialect.name == 'oracle':
query = select(
models.Request
).where(
models.Request.id.in_(query)
).with_for_update(
skip_locked=True
)
else:
query = query.with_only_columns(
models.Request
).with_for_update(
skip_locked=True,
of=models.Request.last_processed_by
)
query_result = session.execute(query).scalars()
if query_result:
if mode_all:
Expand Down
3 changes: 3 additions & 0 deletions lib/rucio/daemons/conveyor/poller.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ def _fetch_requests(
filter_transfertool,
cached_topology,
activity,
set_last_processed_by: bool,
heartbeat_handler
):
worker_number, total_workers, logger = heartbeat_handler.live()
Expand All @@ -77,6 +78,7 @@ def _fetch_requests(
rse_collection=topology,
request_type=[RequestType.TRANSFER, RequestType.STAGEIN, RequestType.STAGEOUT],
state=[RequestState.SUBMITTED],
processed_by=heartbeat_handler.short_executable if set_last_processed_by else None,
limit=db_bulk,
older_than=datetime.datetime.utcnow() - datetime.timedelta(seconds=older_than) if older_than else None,
total_workers=total_workers,
Expand Down Expand Up @@ -197,6 +199,7 @@ def _db_producer(*, activity: str, heartbeat_handler: "HeartbeatHandler"):
filter_transfertool=filter_transfertool,
cached_topology=cached_topology,
activity=activity,
set_last_processed_by=not once,
heartbeat_handler=heartbeat_handler,
)

Expand Down

0 comments on commit ad5c23c

Please sign in to comment.