Skip to content

Commit

Permalink
Core: Add update_rule boost option rucio#4634
Browse files Browse the repository at this point in the history
The timeout between retries of a rule's transition from `STUCK` to
`REPLICATING` is quite long. This commit introduces the `--boost-rule` option
to allow an almost instant transition between the states.
  • Loading branch information
Joel Dierkes committed Oct 22, 2021
1 parent 9247486 commit 7ee4fe6
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 13 deletions.
10 changes: 8 additions & 2 deletions bin/rucio
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,10 @@
# - Rakshita Varadarajan <rakshitajps@gmail.com>, 2021
# - Christoph Ames <christoph.ames@physik.uni-muenchen.de>, 2021
# - James Perry <j.perry@epcc.ed.ac.uk>, 2021
# - Petr Vokac <petr.vokac@fjfi.cvut.cz>, 2021
# - David Población Criado <david.poblacion.criado@cern.ch>, 2021
# - zoepap05 <90753392+zoepap05@users.noreply.github.com>, 2021
# - KosKyr <90753277+KosKyr@users.noreply.github.com>, 2021
# - Joel Dierkes <joel.dierkes@cern.ch>, 2021

from __future__ import print_function
Expand Down Expand Up @@ -1360,6 +1363,8 @@ def update_rule(args):
options['priority'] = int(args.priority)
if args.child_rule_id:
options['child_rule_id'] = args.child_rule_id
if args.boost_rule:
options['boost_rule'] = args.boost_rule
client.update_replication_rule(rule_id=args.rule_id, options=options)
print('Updated Rule')
return SUCCESS
Expand Down Expand Up @@ -2231,8 +2236,8 @@ You can filter by key/value, e.g.::
selected_parser.add_argument('--archive-did', action='store', dest='archive_did', help="Download from archive is transparent. This option is obsolete.")
selected_parser.add_argument('--no-resolve-archives', action='store_true', default=False, help="If set archives will not be considered for download.")
selected_parser.add_argument('--ignore-checksum', action='store_true', default=False, help="Don't validate checksum for downloaded files.")
selected_parser.add_argument('--transfer-timeout', dest='transfer_timeout', type=float, action='store', default=config_get('download', 'transfer_timeout', False, None), help='Transfer timeout (in seconds). Default: computed dynamically from --transfer-speed-timeout. If set to any value >= 0, --transfer-speed-timeout is ignored.')
selected_parser.add_argument('--transfer-speed-timeout', dest='transfer_speed_timeout', type=float, action='store', default=config_get('download', 'transfer_speed_timeout', False, 500), help='Minimum allowed average transfer speed (in KBps). Default: 500. Used to dynamically compute the timeout if --transfer-timeout not set. Is not supported for --pfn.')
selected_parser.add_argument('--transfer-timeout', dest='transfer_timeout', type=float, action='store', default=config_get('download', 'transfer_timeout', False, None), help='Transfer timeout (in seconds). Default: computed dynamically from --transfer-speed-timeout. If set to any value >= 0, --transfer-speed-timeout is ignored.') # NOQA: E501
selected_parser.add_argument('--transfer-speed-timeout', dest='transfer_speed_timeout', type=float, action='store', default=config_get('download', 'transfer_speed_timeout', False, 500), help='Minimum allowed average transfer speed (in KBps). Default: 500. Used to dynamically compute the timeout if --transfer-timeout not set. Is not supported for --pfn.') # NOQA: E501
selected_parser.add_argument('--aria', action='store_true', default=False, help="Use aria2c utility if possible. (EXPERIMENTAL)")
selected_parser.add_argument('--trace_appid', dest='trace_appid', action='store', default=os.environ.get('RUCIO_TRACE_APPID', None), help=argparse.SUPPRESS)
selected_parser.add_argument('--trace_dataset', dest='trace_dataset', action='store', default=os.environ.get('RUCIO_TRACE_DATASET', None), help=argparse.SUPPRESS)
Expand Down Expand Up @@ -2372,6 +2377,7 @@ You can filter by account::
update_rule_parser.add_argument('--cancel-requests', dest='cancel_requests', action='store_true', help='Cancel requests when setting rules to stuck.')
update_rule_parser.add_argument('--priority', dest='priority', action='store', help='Priority of the requests of the rule.')
update_rule_parser.add_argument('--child-rule-id', dest='child_rule_id', action='store', help='Child rule id of the rule.')
update_rule_parser.add_argument('--boost-rule', dest='boost_rule', action='store_true', help='')

# The move_rule command
move_rule_parser = subparsers.add_parser('move-rule', help='Move a replication rule to another RSE.')
Expand Down
20 changes: 10 additions & 10 deletions lib/rucio/core/permission/atlas.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# Copyright 2016-2020 CERN for the benefit of the ATLAS collaboration.
# -*- coding: utf-8 -*-
# Copyright 2016-2021 CERN
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -13,18 +14,20 @@
# limitations under the License.
#
# Authors:
# - Vincent Garonne <vgaronne@gmail.com>, 2016
# - Vincent Garonne <vincent.garonne@cern.ch>, 2016
# - Martin Barisits <martin.barisits@cern.ch>, 2016-2020
# - Cedric Serfon <cedric.serfon@cern.ch>, 2016-2021
# - Mario Lassnig <mario.lassnig@cern.ch>, 2018-2020
# - Hannes Hansen <hannes.jakob.hansen@cern.ch>, 2018-2019
# - Andrew Lister <andrew.lister@stfc.ac.uk>, 2019
# - Ruturaj Gujar <ruturaj.gujar23@gmail.com>, 2019
# - Eric Vaandering, <ewv@fnal.gov>, 2020
# - Jaroslav Guenther <jaroslav.guenther@cern.ch>, 2019
# - Eli Chadwick <eli.chadwick@stfc.ac.uk>, 2020
# - Patrick Austin <patrick.austin@stfc.ac.uk>, 2020
#
# PY3K COMPATIBLE
# - Eric Vaandering <ewv@fnal.gov>, 2020
# - Dimitrios Christidis <dimitrios.christidis@cern.ch>, 2021
# - David Población Criado <david.poblacion.criado@cern.ch>, 2021
# - Joel Dierkes <joel.dierkes@cern.ch>, 2021

from typing import TYPE_CHECKING

Expand Down Expand Up @@ -525,11 +528,8 @@ def perm_update_rule(issuer, kwargs):
return True

# Only admin accounts can change account, state, priority of a rule
if 'account' in kwargs['options'] or\
'state' in kwargs['options'] or\
'priority' in kwargs['options'] or\
'child_rule_id' in kwargs['options'] or\
'meta' in kwargs['options']:
admin_reserved = {'account', 'state', 'priority', 'child_rule_id', 'meta', 'boost_rule'}
if admin_reserved.intersection(kwargs['options'].keys()):
return False # Only priv accounts are allowed to change that

# Country admins are allowed to change the rest.
Expand Down
10 changes: 9 additions & 1 deletion lib/rucio/core/rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
# - Radu Carpa <radu.carpa@cern.ch>, 2021
# - Rakshita Varadarajan <rakshitajps@gmail.com>, 2021
# - Rahul Chauhan <omrahulchauhan@gmail.com>, 2021
# - David Población Criado <david.poblacion.criado@cern.ch>, 2021
# - Joel Dierkes <joel.dierkes@cern.ch>, 2021

from __future__ import division
Expand Down Expand Up @@ -1286,7 +1287,7 @@ def update_rule(rule_id, options, session=None):
:raises: RuleNotFound if no Rule can be found, InputValidationError if invalid option is used, ScratchDiskLifetimeConflict if wrong ScratchDiskLifetime is used.
"""

valid_options = ['comment', 'locked', 'lifetime', 'account', 'state', 'activity', 'source_replica_expression', 'cancel_requests', 'priority', 'child_rule_id', 'eol_at', 'meta', 'purge_replicas']
valid_options = ['comment', 'locked', 'lifetime', 'account', 'state', 'activity', 'source_replica_expression', 'cancel_requests', 'priority', 'child_rule_id', 'eol_at', 'meta', 'purge_replicas', 'boost_rule']

for key in options:
if key not in valid_options:
Expand Down Expand Up @@ -1403,6 +1404,13 @@ def update_rule(rule_id, options, session=None):

insert_rule_history(rule=rule, recent=True, longterm=False, session=session)

# `boost_rule` should run after `stuck`, so let's not include it in the loop, since the arguments are unordered
if 'boost_rule' in options:
for lock in session.query(models.ReplicaLock).filter_by(rule_id=rule.id, state=LockState.STUCK).all():
lock['updated_at'] -= timedelta(days=1)
rule['updated_at'] -= timedelta(days=1)
insert_rule_history(rule, recent=True, longterm=False, session=session)

except IntegrityError as error:
if match('.*ORA-00001.*', str(error.args[0])) \
or match('.*IntegrityError.*UNIQUE constraint failed.*', str(error.args[0])) \
Expand Down
6 changes: 6 additions & 0 deletions lib/rucio/tests/test_permission.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
# - Patrick Austin <patrick.austin@stfc.ac.uk>, 2020
# - Benedikt Ziemons <benedikt.ziemons@cern.ch>, 2020
# - Simon Fayer <simon.fayer05@imperial.ac.uk>, 2021
# - Joel Dierkes <joel.dierkes@cern.ch>, 2021

import unittest

Expand Down Expand Up @@ -84,3 +85,8 @@ def test_permission_get_auth_token_gss(self):
gsscred = 'rucio-dev@CERN.CH'
assert has_permission(issuer='root', action='get_auth_token_gss', kwargs={'account': 'root', 'gsscred': gsscred}, **self.vo)
assert not has_permission(issuer='root', action='get_auth_token_gss', kwargs={'account': self.usr, 'gsscred': gsscred}, **self.vo)

def test_permission_update_rule_boost(self):
    """ PERMISSION (CORE): Check who is permitted to update a rule with the boost_rule option """
    boost_request = {'options': {'boost_rule': True}}

    # The root account is expected to be allowed to boost a rule ...
    root_allowed = has_permission(issuer='root', action='update_rule', kwargs=boost_request, **self.vo)
    assert root_allowed

    # ... whereas the jdoe account is expected to be refused.
    jdoe_allowed = has_permission(issuer='jdoe', action='update_rule', kwargs=boost_request, **self.vo)
    assert not jdoe_allowed
29 changes: 29 additions & 0 deletions lib/rucio/tests/test_rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
# - Radu Carpa <radu.carpa@cern.ch>, 2021
# - Dimitrios Christidis <dimitrios.christidis@cern.ch>, 2021
# - Simon Fayer <simon.fayer05@imperial.ac.uk>, 2021
# - David Población Criado <david.poblacion.criado@cern.ch>, 2021
# - Joel Dierkes <joel.dierkes@cern.ch>, 2021

import json
Expand Down Expand Up @@ -1215,6 +1216,34 @@ def mktree(scope, account):
assert(len(dsl3) == 0)


def test_rule_boost(vo, mock_scope, rse_factory, file_factory):
    """ REPLICATION RULE (CORE): Update a replication rule to quicken the transition from stuck to replicating """
    account = InternalAccount('jdoe', vo)
    _, source_rse_id = rse_factory.make_mock_rse()
    target_rse, target_rse_id = rse_factory.make_mock_rse()
    # The target RSE is made non-writable before the rule is added; the
    # assertions below expect every lock of the rule to be STUCK.
    update_rse(target_rse_id, {'availability_write': False})
    set_local_account_limit(account, target_rse_id, -1)
    files = create_files(3, mock_scope, source_rse_id)
    dataset_name = 'dataset_' + str(uuid())
    add_did(mock_scope, dataset_name, DIDType.DATASET, account)
    attach_dids(mock_scope, dataset_name, files, account)

    created_rule_ids = add_rule(dids=[{'scope': mock_scope, 'name': dataset_name}], account=account, copies=1, rse_expression=target_rse, grouping='NONE', weight=None, lifetime=None, locked=False, subscription_id=None, ignore_availability=True)
    rule_id = created_rule_ids[0]

    # Record the timestamps of every lock and of the rule before boosting.
    lock_updated_at_before = {}
    for did in files:
        for replica_lock in get_replica_locks(scope=did['scope'], name=did['name']):
            assert replica_lock['state'] == LockState.STUCK
            lock_updated_at_before[replica_lock['name']] = replica_lock['updated_at']
    rule_updated_at_before = get_rule(rule_id)['updated_at']

    update_rule(rule_id, options={'boost_rule': True})

    # After boosting, both the locks and the rule must carry an older updated_at.
    for did in files:
        for replica_lock in get_replica_locks(scope=did['scope'], name=did['name']):
            assert lock_updated_at_before[replica_lock['name']] > replica_lock['updated_at']
    assert rule_updated_at_before > get_rule(rule_id)['updated_at']


@pytest.mark.noparallel(reason='uses pre-defined RSE')
class TestReplicationRuleClient(unittest.TestCase):

Expand Down

0 comments on commit 7ee4fe6

Please sign in to comment.