Skip to content

Commit

Permalink
Transfers: use new transfer core in stager. #857, #4447
Browse files Browse the repository at this point in the history
Adapt the list_transfers_request query from regular transfers
and use it in stager. There is a difference with the stagein query
which is being removed: we don't perform a left join on rse attributes
to retrieve the staging_buffer attribute. This shouldn't be a big
downside. It was a left outer join and it didn't reduce the
number of returned rows. However, it means that more rse.get_attributes
calls will be performed by the stager latter in the code.

Substitute third_party_copy constants with operation_src/dst variables
and pass them through because stager uses 'write'.

Add a StageinTransferDefinition class which enforces the requirements
for stageing transfers.

Add an integration test which relies on real fts, but with the 'mock'
gfal plugin. It reports successful transfers without executing
anything.

Also remove the mock argument in stager, which wasn't used anywhere.
  • Loading branch information
rcarpa committed Aug 19, 2021
1 parent 900b555 commit 74186dd
Show file tree
Hide file tree
Showing 6 changed files with 276 additions and 362 deletions.
20 changes: 10 additions & 10 deletions bin/rucio-conveyor-stager
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#!/usr/bin/env python3
# Copyright 2015-2018 CERN for the benefit of the ATLAS collaboration.
# -*- coding: utf-8 -*-
# Copyright 2015-2021 CERN
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -14,13 +15,15 @@
# limitations under the License.
#
# Authors:
# - Wen Guan, <wguan.icedew@gmail.com>, 2015
# - Vincent Garonne, <vgaronne@gmail.com>, 2015-2018
# - Cedric Serfon, <cedric.serfon@cern.ch>, 2018-2019
# - Andrew Lister, <andrew.lister@stfc.ac.uk>, 2019
# - Wen Guan <wen.guan@cern.ch>, 2015
# - Vincent Garonne <vincent.garonne@cern.ch>, 2015-2018
# - Cedric Serfon <cedric.serfon@cern.ch>, 2018-2019
# - Hannes Hansen <hannes.jakob.hansen@cern.ch>, 2018
# - Joaquín Bogado <jbogado@linti.unlp.edu.ar>, 2019
# - Andrew Lister <andrew.lister@stfc.ac.uk>, 2019
# - Patrick Austin <patrick.austin@stfc.ac.uk>, 2020
#
# PY3K COMPATIBLE
# - Benedikt Ziemons <benedikt.ziemons@cern.ch>, 2020
# - Radu Carpa <radu.carpa@cern.ch>, 2021

"""
Conveyor is a daemon to manage file transfers.
Expand All @@ -47,8 +50,6 @@ def get_parser():
help='Group control: number of requests per group')
parser.add_argument("--group-policy", action="store", default='rule', type=str,
help='Group control: policy used to group. enum{rule, dest, src_dest, rule_src_dest}')
parser.add_argument("--mock", action="store_true", default=False,
help='Fake source replicas for requests')
parser.add_argument('--source-strategy', action="store", default=None, type=str,
help='Source strategy. Overload the strategy defined in config DB.')
parser.add_argument('--exclude-rses', action="store", default=None, type=str,
Expand Down Expand Up @@ -78,7 +79,6 @@ if __name__ == "__main__":
total_threads=args.total_threads,
bulk=args.bulk,
group_bulk=args.group_bulk,
mock=args.mock,
include_rses=args.include_rses,
exclude_rses=args.exclude_rses,
rses=args.rses,
Expand Down
93 changes: 3 additions & 90 deletions lib/rucio/core/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
# - Radu Carpa <radu.carpa@cern.ch>, 2021
# - Matt Snyder <msnyder@bnl.gov>, 2021
# - Sahan Dilshan <32576163+sahandilshan@users.noreply.github.com>, 2021
# - Nick Smith <nick.smith@cern.ch>, 2021
# - David Población Criado <david.poblacion.criado@cern.ch>, 2021

import datetime
Expand All @@ -45,7 +46,7 @@
from six import string_types
from sqlalchemy import and_, or_, func, update
from sqlalchemy.exc import IntegrityError
from sqlalchemy.sql.expression import asc, false, true
from sqlalchemy.sql.expression import asc, true

from rucio.common.config import config_get_bool
from rucio.common.constants import FTS_STATE
Expand All @@ -57,7 +58,7 @@
from rucio.core.monitor import record_counter, record_timer
from rucio.core.rse import get_rse_name, get_rse_vo, get_rse_transfer_limits, get_rse_attribute
from rucio.db.sqla import models, filter_thread_work
from rucio.db.sqla.constants import RequestState, RequestType, ReplicaState, LockState, RequestErrMsg
from rucio.db.sqla.constants import RequestState, RequestType, LockState, RequestErrMsg
from rucio.db.sqla.session import read_session, transactional_session, stream_session
from rucio.transfertool.fts3 import FTS3Transfertool

Expand Down Expand Up @@ -764,94 +765,6 @@ def cancel_request_external_id(transfer_id, transfer_host):
raise RucioException('Could not cancel FTS3 transfer %s on %s: %s' % (transfer_id, transfer_host, traceback.format_exc()))


@read_session
def list_stagein_requests_and_source_replicas(total_workers=0, worker_number=0, limit=None, activity=None, older_than=None, rses=None, session=None):
"""
List stagein requests with source replicas
:param total_workers: Number of total workers.
:param worker_number: Id of the executing worker.
:param limit: Integer of requests to retrieve.
:param activity: Activity to be selected.
:param older_than: Only select requests older than this DateTime.
:param rses: List of rse_id to select requests.
:param session: Database session to use.
:returns: List.
"""
sub_requests = session.query(models.Request.id,
models.Request.rule_id,
models.Request.scope,
models.Request.name,
models.Request.md5,
models.Request.adler32,
models.Request.bytes,
models.Request.activity,
models.Request.attributes,
models.Request.previous_attempt_id,
models.Request.dest_rse_id,
models.Request.retry_count)\
.with_hint(models.Request, "INDEX(REQUESTS REQUESTS_TYP_STA_UPD_IDX)", 'oracle')\
.filter(models.Request.state == RequestState.QUEUED)\
.filter(models.Request.request_type == RequestType.STAGEIN)

if isinstance(older_than, datetime.datetime):
sub_requests = sub_requests.filter(models.Request.requested_at < older_than)

if activity:
sub_requests = sub_requests.filter(models.Request.activity == activity)

sub_requests = filter_thread_work(session=session, query=sub_requests, total_threads=total_workers, thread_id=worker_number)

if limit:
sub_requests = sub_requests.limit(limit)

sub_requests = sub_requests.subquery()

query = session.query(sub_requests.c.id,
sub_requests.c.rule_id,
sub_requests.c.scope,
sub_requests.c.name,
sub_requests.c.md5,
sub_requests.c.adler32,
sub_requests.c.bytes,
sub_requests.c.activity,
sub_requests.c.attributes,
sub_requests.c.dest_rse_id,
models.RSEFileAssociation.rse_id,
models.RSE.rse,
models.RSE.deterministic,
models.RSE.rse_type,
models.RSEFileAssociation.path,
models.RSEAttrAssociation.value,
sub_requests.c.retry_count,
sub_requests.c.previous_attempt_id,
models.Source.url,
models.Source.ranking)\
.outerjoin(models.RSEFileAssociation, and_(sub_requests.c.scope == models.RSEFileAssociation.scope,
sub_requests.c.name == models.RSEFileAssociation.name,
models.RSEFileAssociation.state == ReplicaState.AVAILABLE,
sub_requests.c.dest_rse_id != models.RSEFileAssociation.rse_id))\
.with_hint(models.RSEFileAssociation, "+ index(replicas REPLICAS_PK)", 'oracle')\
.outerjoin(models.RSE, and_(models.RSE.id == models.RSEFileAssociation.rse_id,
models.RSE.staging_area == false(),
models.RSE.deleted == false()))\
.outerjoin(models.RSEAttrAssociation, and_(models.RSEAttrAssociation.rse_id == models.RSE.id,
models.RSEAttrAssociation.key == 'staging_buffer'))\
.outerjoin(models.Source, and_(sub_requests.c.id == models.Source.request_id,
models.RSE.id == models.Source.rse_id))\
.with_hint(models.Source, "+ index(sources SOURCES_PK)", 'oracle')

if rses:
result = []
for item in query.all():
dest_rse_id = item[9]
if dest_rse_id in rses:
result.append(item)
return result
else:
return query.all()


@read_session
def get_sources(request_id, rse_id=None, session=None):
"""
Expand Down
216 changes: 0 additions & 216 deletions lib/rucio/core/staging.py

This file was deleted.

0 comments on commit 74186dd

Please sign in to comment.