diff --git a/lib/rucio/daemons/tracer/kronos.py b/lib/rucio/daemons/tracer/kronos.py index 758ff6b6eb..0d21201614 100644 --- a/lib/rucio/daemons/tracer/kronos.py +++ b/lib/rucio/daemons/tracer/kronos.py @@ -19,23 +19,22 @@ import logging import re -import socket from configparser import NoOptionError, NoSectionError from datetime import datetime +import functools from json import loads as jloads, dumps as jdumps -from os import getpid -from threading import Event, Thread, current_thread -from time import sleep, time +from threading import Event, Thread +from time import time +from typing import Dict import rucio.db.sqla.util +from rucio.daemons.common import HeartbeatHandler, run_daemon from rucio.common.config import config_get, config_get_bool, config_get_int from rucio.common.exception import RSENotFound, DatabaseException -from rucio.common.logging import setup_logging, formatted_logger +from rucio.common.logging import setup_logging from rucio.common.stomp_utils import get_stomp_brokers from rucio.common.types import InternalAccount, InternalScope -from rucio.common.utils import daemon_sleep from rucio.core.did import touch_dids, list_parent_dids -from rucio.core.heartbeat import live, die, sanity_check from rucio.core.lock import touch_dataset_locks from rucio.core.monitor import record_counter, record_timer from rucio.core.replica import touch_replica, touch_collection_replicas, declare_bad_file_replicas @@ -54,6 +53,8 @@ class AMQConsumer(object): + """ActiveMQ message consumer""" + def __init__(self, broker, conn, queue, chunksize, subscription_id, excluded_usrdns, dataset_queue, bad_files_patterns, logger=logging.log): self.__broker = broker self.__conn = conn @@ -295,21 +296,42 @@ def __update_atime(self): record_counter('daemons.tracer.kronos.updated_replicas') -def kronos_file(thread=0, dataset_queue=None, sleep_time=60): +def kronos_file(once: bool = False, dataset_queue: Queue = None, sleep_time: int = 60): """ Main loop to consume tracer reports. """ + return_values = {} + run_daemon( + once=once, + graceful_stop=graceful_stop, + executable='kronos-file', + logger_prefix='kronos-file', + partition_wait_time=1, + sleep_time=sleep_time, + run_once_fnc=functools.partial( + run_once_kronos_file, + return_values=return_values, + dataset_queue=dataset_queue, + sleep_time=sleep_time, + ) + ) + for conn in return_values['conns']: + try: + conn.disconnect() + except Exception: + pass - logging.info('kronos_file[%i/?] starting', thread) - executable = 'kronos-file' - hostname = socket.gethostname() - pid = getpid() - hb_thread = current_thread() +def run_once_kronos_file(heartbeat_handler: HeartbeatHandler, return_values: Dict, dataset_queue: Queue, sleep_time: int, **kwargs): + """ + Run the amq consumer once. + """ + _, _, logger = heartbeat_handler.live() chunksize = config_get_int('tracer-kronos', 'chunksize') prefetch_size = config_get_int('tracer-kronos', 'prefetch_size') subscription_id = config_get('tracer-kronos', 'subscription_id') + # Load bad file patterns from config try: bad_files_patterns = [] pattern = config_get(section='kronos', option='bad_files_patterns', session=None) @@ -320,15 +342,12 @@ def kronos_file(thread=0, dataset_queue=None, sleep_time=60): except (NoOptionError, NoSectionError, RuntimeError): bad_files_patterns = [] except Exception as error: - logging.log(logging.ERROR, 'kronos_file[%i/?] Failed to get bad_file_patterns %s', thread, str(error)) + logger.error(f'Failed to get bad_file_patterns {str(error)}') bad_files_patterns = [] - - use_ssl = True try: use_ssl = config_get_bool('tracer-kronos', 'use_ssl') except Exception: pass - if not use_ssl: username = config_get('tracer-kronos', 'username') password = config_get('tracer-kronos', 'password') @@ -341,85 +360,63 @@ def kronos_file(thread=0, dataset_queue=None, sleep_time=60): reconnect_attempts = config_get_int('tracer-kronos', 'reconnect_attempts') ssl_key_file = config_get('tracer-kronos', 'ssl_key_file', raise_exception=False) ssl_cert_file = config_get('tracer-kronos', 'ssl_cert_file', raise_exception=False) - - sanity_check(executable=executable, hostname=hostname) - while not graceful_stop.is_set(): - start_time = time() - heart_beat = live(executable, hostname, pid, hb_thread) - prepend_str = 'kronos-file[%i/%i] ' % (heart_beat['assign_thread'], heart_beat['nr_threads']) - logger = formatted_logger(logging.log, prepend_str + '%s') - conns = get_stomp_brokers(brokers=brokers_alias, - port=port, - use_ssl=use_ssl, - vhost=vhost, - reconnect_attempts=reconnect_attempts, - ssl_key_file=ssl_key_file, - ssl_cert_file=ssl_cert_file, - timeout=sleep_time, - logger=logger) - for conn in conns: - if not conn.is_connected(): - logger(logging.INFO, 'connecting to %s' % str(conn.transport._Transport__host_and_ports[0])) - record_counter('daemons.tracer.kronos.reconnect.{host}', labels={'host': conn.transport._Transport__host_and_ports[0][0]}) - conn.set_listener('rucio-tracer-kronos', AMQConsumer(broker=conn.transport._Transport__host_and_ports[0], - conn=conn, - queue=config_get('tracer-kronos', 'queue'), - chunksize=chunksize, - subscription_id=subscription_id, - excluded_usrdns=excluded_usrdns, - dataset_queue=dataset_queue, - bad_files_patterns=bad_files_patterns, - logger=logger)) - if not use_ssl: - conn.connect(username, password) - else: - conn.connect() - conn.subscribe(destination=config_get('tracer-kronos', 'queue'), ack='client-individual', id=subscription_id, headers={'activemq.prefetchSize': prefetch_size}) - - tottime = time() - start_time - if tottime < sleep_time: - logger(logging.INFO, 'Will sleep for %s seconds' % (sleep_time - tottime)) - sleep(sleep_time - tottime) - - logger(logging.INFO, 'graceful stop requested') + conns = return_values['conns'] = get_stomp_brokers( + brokers=brokers_alias, + port=port, + use_ssl=use_ssl, + vhost=vhost, + reconnect_attempts=reconnect_attempts, + ssl_key_file=ssl_key_file, + ssl_cert_file=ssl_cert_file, + timeout=sleep_time, + logger=logger + ) for conn in conns: - try: - conn.disconnect() - except Exception: - pass - - die(executable=executable, hostname=hostname, pid=pid, thread=thread) - logger(logging.INFO, 'graceful stop done') - + if not conn.is_connected(): + logger(logging.INFO, 'connecting to %s' % str(conn.transport._Transport__host_and_ports[0])) + record_counter('daemons.tracer.kronos.reconnect.{host}', labels={'host': conn.transport._Transport__host_and_ports[0][0]}) + conn.set_listener('rucio-tracer-kronos', AMQConsumer(broker=conn.transport._Transport__host_and_ports[0], + conn=conn, + queue=config_get('tracer-kronos', 'queue'), + chunksize=chunksize, + subscription_id=subscription_id, + excluded_usrdns=excluded_usrdns, + dataset_queue=dataset_queue, + bad_files_patterns=bad_files_patterns, + logger=logger)) + if not use_ssl: + conn.connect(username, password) + else: + conn.connect() + conn.subscribe(destination=config_get('tracer-kronos', 'queue'), ack='client-individual', id=subscription_id, headers={'activemq.prefetchSize': prefetch_size}) + + +def kronos_dataset(once: bool = False, dataset_queue: Queue = None, sleep_time: int = 60): + run_daemon( + once=once, + graceful_stop=graceful_stop, + executable='kronos-dataset', + logger_prefix='kronos-dataset', + partition_wait_time=1, + sleep_time=sleep_time, + run_once_fnc=functools.partial( + run_once_kronos_dataset, + dataset_queue=dataset_queue, + ) + ) -def kronos_dataset(thread=0, dataset_queue=None, sleep_time=60): - logging.info('kronos-dataset[%d/?] starting', thread) + # once again for the backlog + __update_datasets(dataset_queue) - executable = 'kronos-dataset' - hostname = socket.gethostname() - pid = getpid() - hb_thread = current_thread() +def run_once_kronos_dataset(heartbeat_handler: HeartbeatHandler, dataset_queue: Queue, **kwargs): + _, _, logger = heartbeat_handler.live() dataset_wait = config_get_int('tracer-kronos', 'dataset_wait') start = datetime.now() - sanity_check(executable=executable, hostname=hostname) - while not graceful_stop.is_set(): - start_time = time() - heart_beat = live(executable, hostname, pid, hb_thread) - prepend_str = 'kronos-dataset[%i/%i] ' % (heart_beat['assign_thread'], heart_beat['nr_threads']) - logger = formatted_logger(logging.log, prepend_str + '%s') - if (datetime.now() - start).seconds > dataset_wait: - __update_datasets(dataset_queue, logger=logger) - start = datetime.now() - - daemon_sleep(start_time=start_time, sleep_time=sleep_time, graceful_stop=graceful_stop) - - die(executable=executable, hostname=hostname, pid=pid, thread=thread) - - # once again for the backlog - logger(logging.INFO, 'cleaning dataset backlog before shutdown...') - __update_datasets(dataset_queue) + if (datetime.now() - start).seconds > dataset_wait: + __update_datasets(dataset_queue=dataset_queue, logger=logger) + start = datetime.now() def __update_datasets(dataset_queue, logger=logging.log): @@ -493,7 +490,7 @@ def stop(signum=None, frame=None): graceful_stop.set() -def run(threads=1, sleep_time_datasets=60, sleep_time_files=60): +def run(once=False, threads=1, sleep_time_datasets=60, sleep_time_files=60): """ Starts up the consumer threads """ @@ -506,11 +503,11 @@ def run(threads=1, sleep_time_datasets=60, sleep_time_files=60): logging.info('starting tracer consumer threads') thread_list = [] - for thread in range(0, threads): - thread_list.append(Thread(target=kronos_file, kwargs={'thread': thread, + for _ in range(0, threads): + thread_list.append(Thread(target=kronos_file, kwargs={'once': once, 'sleep_time': sleep_time_files, 'dataset_queue': dataset_queue})) - thread_list.append(Thread(target=kronos_dataset, kwargs={'thread': thread, + thread_list.append(Thread(target=kronos_dataset, kwargs={'once': once, 'sleep_time': sleep_time_datasets, 'dataset_queue': dataset_queue}))