Skip to content

Commit

Permalink
Transfers: Plugins for interacting with fts3 'archive_metadata' field r…
Browse files Browse the repository at this point in the history
…ucio#6398

Changes:
	* Introduce fts3_plugin class, a subclass of PolicyPlugins
	* Include collocation wrapper, priority placement algorithm for
	  activity based collocation/placement
	* Typing for fts3_transfertool in part
  • Loading branch information
voetberg committed Mar 21, 2024
1 parent 093092d commit 0de7aa0
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 65 deletions.
8 changes: 3 additions & 5 deletions lib/rucio/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
from functools import partial, wraps
from io import StringIO
from itertools import zip_longest
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Callable, Optional
from urllib.parse import urlparse, urlencode, quote, parse_qsl, urlunparse
from uuid import uuid4 as uuid
from xml.etree import ElementTree
Expand All @@ -65,9 +65,7 @@
EXTRA_MODULES['paramiko'] = False

if TYPE_CHECKING:
from collections.abc import Callable
from typing import TypeVar, Optional

from typing import TypeVar
T = TypeVar('T')


Expand Down Expand Up @@ -2183,7 +2181,7 @@ def _wrapper(*args, **kwargs):


def deep_merge_dict(source: dict, destination: dict) -> dict:
"""Merge two dictionaries together recurively"""
"""Merge two dictionaries together recursively"""
for key, value in source.items():
if isinstance(value, dict):
# get node or create one
Expand Down
50 changes: 32 additions & 18 deletions lib/rucio/transfertool/fts3.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from collections.abc import Callable
from configparser import NoOptionError, NoSectionError
from json import loads
from typing import Any, Optional, TYPE_CHECKING
from typing import TYPE_CHECKING, Any, Optional
from urllib.parse import urlparse

import requests
Expand All @@ -43,11 +43,12 @@
get_rse_supported_checksums_from_attributes)
from rucio.db.sqla.constants import RequestState
from rucio.transfertool.transfertool import Transfertool, TransferToolBuilder, TransferStatusReport
from rucio.transfertool.fts3_plugins import FTS3MetadataPlugin
from rucio.transfertool.fts3_plugins import FTS3TapeMetadataPlugin

if TYPE_CHECKING:
from rucio.core.request import DirectTransfer
from rucio.core.rse import RseData
from sqlalchemy.orm import Session

logging.getLogger("requests").setLevel(logging.CRITICAL)
disable_warnings()
Expand Down Expand Up @@ -309,7 +310,12 @@ def _use_tokens(transfer_hop: "DirectTransfer"):
return True


def build_job_params(transfer_path, bring_online, default_lifetime, archive_timeout_override, max_time_in_queue, logger):
def build_job_params(
transfer_path: list["DirectTransfer"],
bring_online: Optional[int] = None,
default_lifetime: Optional[int] = None,
archive_timeout_override: Optional[int] = None,
max_time_in_queue: Optional[dict] = None, logger: Callable = logging.log) -> dict[str, Any]:
"""
Prepare the job parameters which will be passed to FTS transfertool
"""
Expand Down Expand Up @@ -403,8 +409,16 @@ def build_job_params(transfer_path, bring_online, default_lifetime, archive_time
return job_params


def bulk_group_transfers(transfer_paths, policy='rule', group_bulk=200, source_strategy=None, max_time_in_queue=None,
logger=logging.log, archive_timeout_override=None, bring_online=None, default_lifetime=None):
def bulk_group_transfers(
transfer_paths: list[list["DirectTransfer"]],
policy: str = 'rule',
group_bulk: int = 200,
source_strategy: Optional[str] = None,
max_time_in_queue: Optional[dict] = None,
logger: Callable = logging.log,
archive_timeout_override: Optional[int] = None,
bring_online: Optional[int] = None,
default_lifetime: Optional[int] = None) -> list[dict[str, Any]]:
"""
Group transfers in bulk based on certain criterias
Expand Down Expand Up @@ -495,7 +509,7 @@ class Fts3TransferStatusReport(TransferStatusReport):
'attributes',
]

def __init__(self, external_host, request_id, request=None):
def __init__(self, external_host: str, request_id: str, request: Optional[dict] = None):
super().__init__(request_id, request=request)
self.external_host = external_host

Expand Down Expand Up @@ -524,10 +538,10 @@ def __str__(self):
return f'Transfer {self._transfer_id} of {self._file_metadata["scope"]}:{self._file_metadata["name"]} ' \
f'{self._file_metadata["src_rse"]} --({self._file_metadata["request_id"]})-> {self._file_metadata["dst_rse"]}'

def initialize(self, session, logger=logging.log):
def initialize(self, session: "Session", logger: Callable = logging.log) -> None:
raise NotImplementedError(f"{self.__class__.__name__} is abstract and shouldn't be used directly")

def get_monitor_msg_fields(self, session, logger=logging.log):
def get_monitor_msg_fields(self, session: "Session", logger: Callable = logging.log) -> dict[str, Any]:
self.ensure_initialized(session, logger)
fields = {
'transfer_link': self._transfer_link(),
Expand All @@ -546,7 +560,7 @@ def get_monitor_msg_fields(self, session, logger=logging.log):
def _transfer_link(self):
return '%s/fts3/ftsmon/#/job/%s' % (self._fts_address.replace('8446', '8449'), self._transfer_id)

def _find_attribute_updates(self, request, new_state, reason, overwrite_corrupted_files):
def _find_attribute_updates(self, request: dict, new_state: RequestState, reason: str, overwrite_corrupted_files: Optional[bool] = None) -> Optional[dict[str, Any]]:
attributes = None
if new_state == RequestState.FAILED and 'Destination file exists and overwrite is not enabled' in (reason or ''):
dst_file = self._file_metadata.get('dst_file', {})
Expand All @@ -556,7 +570,7 @@ def _find_attribute_updates(self, request, new_state, reason, overwrite_corrupte
attributes['overwrite'] = True
return attributes

def _find_used_source_rse(self, session, logger):
def _find_used_source_rse(self, session: "Session", logger: Callable) -> tuple[Optional[str], Optional[str]]:
"""
For multi-source transfers, FTS has a choice between multiple sources.
Find which of the possible sources FTS actually used for the transfer.
Expand All @@ -574,7 +588,7 @@ def _find_used_source_rse(self, session, logger):
return meta_rse_name, meta_rse_id

@staticmethod
def _dst_file_set_and_file_corrupted(request, dst_file):
def _dst_file_set_and_file_corrupted(request: dict, dst_file: dict) -> bool:
"""
Returns True if the `dst_file` dict returned by fts was filled and its content allows to
affirm that the file is corrupted.
Expand All @@ -587,7 +601,7 @@ def _dst_file_set_and_file_corrupted(request, dst_file):
return False

@staticmethod
def _dst_file_set_and_file_correct(request, dst_file):
def _dst_file_set_and_file_correct(request: dict, dst_file: dict) -> bool:
"""
Returns True if the `dst_file` dict returned by fts was filled and its content allows to
affirm that the file is correct.
Expand Down Expand Up @@ -644,7 +658,7 @@ def __init__(self, external_host, request_id, fts_message):
self._src_url = fts_message.get('src_url', None)
self._dst_url = fts_message.get('dst_url', None)

def initialize(self, session, logger=logging.log):
def initialize(self, session: "Session", logger: Callable = logging.log) -> None:

fts_message = self.fts_message
request_id = self.request_id
Expand Down Expand Up @@ -716,7 +730,7 @@ def __init__(self, external_host, request_id, job_response, file_response, reque
self._dst_url = file_response.get('dest_surl', None)
self.logger = logging.log

def initialize(self, session, logger=logging.log):
def initialize(self, session: "Session", logger=logging.log) -> None:

self.logger = logger
job_response = self.job_response
Expand Down Expand Up @@ -813,8 +827,8 @@ def __init__(self, external_host, oidc_account=None, oidc_support: bool = False,
self.default_lifetime = default_lifetime
self.archive_timeout_override = archive_timeout_override

plugins = config_get_list("transfers", "plugins", False, "[]")
self.plugins = [FTS3MetadataPlugin(plugin.strip(" ")) for plugin in plugins]
tape_plugins = config_get_list("transfers", "fts3tape_metadata_plugins", False, "[]")
self.tape_metadata_plugins = [FTS3TapeMetadataPlugin(plugin.strip(" ")) for plugin in tape_plugins]

self.token = None
if oidc_support:
Expand Down Expand Up @@ -955,7 +969,7 @@ def _file_from_transfer(self, transfer, job_params):
if isinstance(activity_id, int):
t_file['scitag'] = self.scitags_exp_id << 6 | activity_id

for plugin in self.plugins:
for plugin in self.tape_metadata_plugins:
plugin_hints = plugin.hints(t_file['metadata'])

t_file = deep_merge_dict(source=plugin_hints, destination=t_file)
Expand Down Expand Up @@ -1478,7 +1492,7 @@ def __bulk_query_responses(self, jobs_response, requests_by_eid):
job_response['http_message'] if 'http_message' in job_response else None))
return responses

def __query_details(self, transfer_id):
def __query_details(self, transfer_id: Optional[dict[str, Any]]) -> Optional[str]:
"""
Query the detailed status of a transfer in FTS3 via JSON.
Expand Down
55 changes: 22 additions & 33 deletions lib/rucio/transfertool/fts3_plugins.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,37 +12,35 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
from typing import Callable, Optional
import json
import sys
from configparser import NoSectionError
from typing import Any, Callable, Optional

from rucio.core.plugins import PolicyPackageAlgorithms
from rucio.common.config import config_get_int, config_get_items
from configparser import NoSectionError

from rucio.common.exception import InvalidRequest


class FTS3MetadataPlugin(PolicyPackageAlgorithms):
class FTS3TapeMetadataPlugin(PolicyPackageAlgorithms):
"""
Add a "archive_metadata" field to a file's transfer parameters.
Plugins are registered during initialization and called during a transfer with FTS3
"""

ALGORITHM_NAME = "fts3_plugins"
HINTS_NAME = "fts3_plugins_init"
ALGORITHM_NAME = "fts3_tape_metadata_plugins"
_HINTS_NAME = "fts3_plugins_init"
DEFAULT = "def"

def __init__(self, policy_algorithm: str):
def __init__(self, policy_algorithm: str) -> None:
"""
:param policy_algorithm: policy algorithm indentifier - choose from any of the policy package algorithms registered under the `fts3_plugins` group.
:type policy_algorithm: str
:param policy_algorithm: policy algorithm indentifier - choose from any of the policy package algorithms registered under the `fts3_tape_metadata_plugins` group.
"""
super().__init__()
self.register("activity", func=self._activity_hints, init_func=self._init_activity_hints)
self.register("activity", func=self._activity_hints, init_func=self._init_instance_activity_hints)
self.register(self.DEFAULT, func=lambda x: self._collocation(self._default, x))
self.register("cms_collocation", func=lambda x: self._collocation(self._cms_collocation, x))
self.register("test", func=lambda x: self._collocation(self._test_collocation, x))

self.transfer_limit = config_get_int(
Expand All @@ -58,8 +56,8 @@ def __init__(self, policy_algorithm: str):
policy_algorithm = self.DEFAULT

# If the policy has a supplied and registered init function
if self._supports(self.HINTS_NAME, policy_algorithm):
self._get_one_algorithm(self.HINTS_NAME, name=policy_algorithm)()
if self._supports(self._HINTS_NAME, policy_algorithm):
self._get_one_algorithm(self._HINTS_NAME, name=policy_algorithm)()

self.set_in_hints = self._get_one_algorithm(self.ALGORITHM_NAME, name=policy_algorithm)

Expand All @@ -69,24 +67,24 @@ def register(cls, name: str, func: Callable, init_func: Optional[Callable] = Non
Register a fts3 transfer plugin
:param name: name to register under
:type name: str
:param func: function called by the plugin
:type func: Callable
:param init_func: Initialization requirements for the plugin, defaults to None
:type init_func: Optional[Callable], optional
"""
super()._register(cls.ALGORITHM_NAME, algorithm_dict={name: func})
if init_func is not None:
super()._register(cls.HINTS_NAME, algorithm_dict={name: init_func})
super()._register(cls._HINTS_NAME, algorithm_dict={name: init_func})

def _init_activity_hints(self):
def _init_instance_activity_hints(self) -> None:
"""
Load prorities for activities from the config
"""
try:
self.prority_table = dict(config_get_items("tape_priority"))
except NoSectionError:
self.prority_table = {}

def _activity_hints(self, activity_kwargs: dict[str, str], default_prority: str = '20') -> dict[str, dict]:
""" Activity Hints - assign a prorioty based on activity"""
""" Activity Hints - assign a priority based on activity"""
if "activity" in activity_kwargs:
activity = activity_kwargs["activity"].lower()

Expand All @@ -98,30 +96,23 @@ def _activity_hints(self, activity_kwargs: dict[str, str], default_prority: str

return {"scheduling_hints": {"priority": priority}}

def _collocation(self, collocation_func: Callable, hints: dict) -> dict[str, dict]:
def _collocation(self, collocation_func: Callable, hints: dict[str, Any]) -> dict[str, dict]:
"""
Wraps a 'collacation' style plugin for formatting
:param collocation_func: Function that defines the collocation rules
:type collocation_func: Callable
:param hints: kwargs utilized by the collocation rules
:type hints: dict
:return: Collocation hints produced by the collocation_func, wrapped
:rtype: dict
"""
return {"collocation_hints": collocation_func(*hints)}

def _test_collocation(self, *hint: dict) -> dict:
def _test_collocation(self, *hint: dict) -> dict[str, Any]:
return {"0": "", "1": "", "2": "", "3": ""}

def _default(self, *hints: dict) -> dict:
return {}

def _cms_collocation(self, *hints: dict) -> None:
# Placeholder - should not be used
raise NotImplementedError

def _verify_in_format(self, hint_dict: dict) -> None:
def _verify_in_format(self, hint_dict: dict[str, Any]) -> None:
"""Check the to-be-submitted file transfer params are both json encodable and under the size limit for transfer"""
try:
hints_json = json.dumps(hint_dict)
Expand All @@ -134,19 +125,17 @@ def _verify_in_format(self, hint_dict: dict) -> None:
f"Request too large, decrease to less than {self.transfer_limit}", e
)

def hints(self, hint_kwargs: dict) -> dict:
def hints(self, hint_kwargs: dict) -> dict[str, Any]:
"""
Produce "archive_metadata" hints for how a transfer should be executed by fts3.
:param hint_kwargs: Args passed forward to the plugin algorithm
:type hint_kwargs: dict
:return: Archiving metadata in the format {archive_metadata: {<plugin produced hints>}}
:rtype: dict
"""
hints = self.set_in_hints(hint_kwargs)
self._verify_in_format(hints)
return {"archive_metadata": hints}


# Register the policies
FTS3MetadataPlugin("")
FTS3TapeMetadataPlugin("")
Loading

0 comments on commit 0de7aa0

Please sign in to comment.