Skip to content

Commit

Permalink
Add params and type hints to stop() in daemons
Browse files Browse the repository at this point in the history
The stop() functions used by daemons to gracefully stop had missing
parameters `signum` and `frame`. These parameters are passed
when the daemon receives SIGTERM signal. Since we have no use
for the parameter as of now, we have set them to `None` by default.
Also type-hints are added to the parameters and to the return type.
Fixes rucio#6075

Signed-off-by: Soumik Dutta <shalearkane@gmail.com>
  • Loading branch information
shalearkane committed Mar 22, 2023
1 parent bc71856 commit c2baa16
Show file tree
Hide file tree
Showing 34 changed files with 907 additions and 38 deletions.
7 changes: 6 additions & 1 deletion lib/rucio/daemons/abacus/account.py
Expand Up @@ -20,6 +20,7 @@
import logging
import threading
import time
from typing import TYPE_CHECKING

import rucio.db.sqla.util
from rucio.common import exception
Expand All @@ -28,6 +29,10 @@
from rucio.core.account_counter import get_updated_account_counters, update_account_counter, fill_account_counter_history_table
from rucio.daemons.common import run_daemon

if TYPE_CHECKING:
from types import FrameType
from typing import Optional

graceful_stop = threading.Event()


Expand Down Expand Up @@ -68,7 +73,7 @@ def run_once(heartbeat_handler, **_kwargs):
logger(logging.DEBUG, 'update of account-rse counter "%s-%s" took %f' % (account_rse_id[0], account_rse_id[1], time.time() - start_time))


def stop(signum=None, frame=None):
def stop(signum: "Optional[int]" = None, frame: "Optional[FrameType]" = None) -> None:
"""
Graceful exit.
"""
Expand Down
7 changes: 6 additions & 1 deletion lib/rucio/daemons/abacus/collection_replica.py
Expand Up @@ -20,13 +20,18 @@
import logging
import threading
import time
from typing import TYPE_CHECKING

import rucio.db.sqla.util
from rucio.common import exception
from rucio.common.logging import setup_logging
from rucio.core.replica import get_cleaned_updated_collection_replicas, update_collection_replica
from rucio.daemons.common import run_daemon

if TYPE_CHECKING:
from types import FrameType
from typing import Optional

graceful_stop = threading.Event()


Expand Down Expand Up @@ -77,7 +82,7 @@ def run_once(heartbeat_handler, limit, **_kwargs):
return must_sleep


def stop(signum=None, frame=None):
def stop(signum: "Optional[int]" = None, frame: "Optional[FrameType]" = None) -> None:
"""
Graceful exit.
"""
Expand Down
7 changes: 6 additions & 1 deletion lib/rucio/daemons/abacus/rse.py
Expand Up @@ -20,6 +20,7 @@
import logging
import threading
import time
from typing import TYPE_CHECKING

import rucio.db.sqla.util
from rucio.common import exception
Expand All @@ -28,6 +29,10 @@
from rucio.core.rse_counter import get_updated_rse_counters, update_rse_counter, fill_rse_counter_history_table
from rucio.daemons.common import run_daemon

if TYPE_CHECKING:
from types import FrameType
from typing import Optional

graceful_stop = threading.Event()


Expand Down Expand Up @@ -69,7 +74,7 @@ def run_once(heartbeat_handler, **_kwargs):
logger(logging.DEBUG, 'update of rse "%s" took %f' % (rse_id, time.time() - start_time))


def stop(signum=None, frame=None):
def stop(signum: "Optional[int]" = None, frame: "Optional[FrameType]" = None) -> None:
"""
Graceful exit.
"""
Expand Down
4 changes: 3 additions & 1 deletion lib/rucio/daemons/atropos/atropos.py
Expand Up @@ -37,6 +37,8 @@

if TYPE_CHECKING:
from rucio.daemons.common import HeartbeatHandler
from types import FrameType
from typing import Optional

GRACEFUL_STOP = threading.Event()

Expand Down Expand Up @@ -233,7 +235,7 @@ def run(
thread_list = [t.join(timeout=3.14) for t in thread_list if t and t.is_alive()]


def stop(signum=None, frame=None):
def stop(signum: "Optional[int]" = None, frame: "Optional[FrameType]" = None) -> None:
"""
Graceful exit.
"""
Expand Down
5 changes: 4 additions & 1 deletion lib/rucio/daemons/automatix/automatix.py
Expand Up @@ -40,6 +40,9 @@


if TYPE_CHECKING:
from types import FrameType
from typing import Optional

from rucio.daemons.common import HeartbeatHandler

METRICS = MetricManager(module=__name__)
Expand Down Expand Up @@ -272,7 +275,7 @@ def run(
[thread.join(timeout=3.14) for thread in threads]


def stop(signum=None, frame=None):
def stop(signum: "Optional[int]" = None, frame: "Optional[FrameType]" = None) -> None:
"""
Graceful exit.
"""
Expand Down
5 changes: 4 additions & 1 deletion lib/rucio/daemons/badreplicas/minos.py
Expand Up @@ -40,6 +40,9 @@


if TYPE_CHECKING:
from types import FrameType
from typing import Optional

from rucio.daemons.common import HeartbeatHandler
from rucio.common.types import InternalAccount

Expand Down Expand Up @@ -302,7 +305,7 @@ def run(threads: int = 1, bulk: int = 100, once: bool = False, sleep_time: int =
thread_list = [thread.join(timeout=3.14) for thread in thread_list if thread and thread.is_alive()]


def stop(signum=None, frame=None):
def stop(signum: "Optional[int]" = None, frame: "Optional[FrameType]" = None) -> None:
"""
Graceful exit.
"""
Expand Down
5 changes: 4 additions & 1 deletion lib/rucio/daemons/badreplicas/minos_temporary_expiration.py
Expand Up @@ -37,6 +37,9 @@


if TYPE_CHECKING:
from types import FrameType
from typing import Optional

from rucio.daemons.common import HeartbeatHandler

graceful_stop = threading.Event()
Expand Down Expand Up @@ -164,7 +167,7 @@ def run(threads: int = 1, bulk: int = 100, once: bool = False, sleep_time: int =
thread_list = [thread.join(timeout=3.14) for thread in thread_list if thread and thread.is_alive()]


def stop(signum=None, frame=None):
def stop(signum: "Optional[int]" = None, frame: "Optional[FrameType]" = None) -> None:
"""
Graceful exit.
"""
Expand Down
5 changes: 4 additions & 1 deletion lib/rucio/daemons/badreplicas/necromancer.py
Expand Up @@ -38,6 +38,9 @@
from rucio.daemons.common import run_daemon

if TYPE_CHECKING:
from types import FrameType
from typing import Optional

from rucio.daemons.common import HeartbeatHandler

graceful_stop = threading.Event()
Expand Down Expand Up @@ -191,7 +194,7 @@ def run(threads: int = 1, bulk: int = 100, once: bool = False, sleep_time: int =
thread_list = [thread.join(timeout=3.14) for thread in thread_list if thread and thread.is_alive()]


def stop(signum=None, frame=None):
def stop(signum: "Optional[int]" = None, frame: "Optional[FrameType]" = None) -> None:
"""
Graceful exit.
"""
Expand Down
5 changes: 4 additions & 1 deletion lib/rucio/daemons/bb8/bb8.py
Expand Up @@ -34,6 +34,9 @@


if TYPE_CHECKING:
from types import FrameType
from typing import Optional

from rucio.daemons.common import HeartbeatHandler

graceful_stop = threading.Event()
Expand Down Expand Up @@ -311,7 +314,7 @@ def run_once(
return must_sleep


def stop(signum=None, frame=None):
def stop(signum: "Optional[int]" = None, frame: "Optional[FrameType]" = None) -> None:
"""
Graceful exit.
"""
Expand Down
7 changes: 6 additions & 1 deletion lib/rucio/daemons/c3po/c3po.py
Expand Up @@ -24,6 +24,7 @@
from threading import Event, Thread
from time import sleep
from uuid import uuid4
from typing import TYPE_CHECKING

from requests import post
from requests.auth import HTTPBasicAuth
Expand All @@ -41,6 +42,10 @@

from queue import Queue

if TYPE_CHECKING:
from types import FrameType
from typing import Optional

GRACEFUL_STOP = Event()


Expand Down Expand Up @@ -249,7 +254,7 @@ def place_replica(once=False,
logging.critical(e)


def stop(signum=None, frame=None):
def stop(signum: "Optional[int]" = None, frame: "Optional[FrameType]" = None) -> None:
"""
Graceful exit.
"""
Expand Down
7 changes: 6 additions & 1 deletion lib/rucio/daemons/cache/consumer.py
Expand Up @@ -22,6 +22,7 @@
import threading
import time
from traceback import format_exc
from typing import TYPE_CHECKING

import rucio.db.sqla.util
from rucio.common import exception
Expand All @@ -33,6 +34,10 @@
from rucio.core.rse import get_rse_id
from rucio.core.volatile_replica import add_volatile_replicas, delete_volatile_replicas

if TYPE_CHECKING:
from types import FrameType
from typing import Optional

logging.getLogger("stomp").setLevel(logging.CRITICAL)

METRICS = MetricManager(module=__name__)
Expand Down Expand Up @@ -167,7 +172,7 @@ def consumer(id_, num_thread=1):
logger(logging.INFO, 'graceful stop done')


def stop():
def stop(signum: "Optional[int]" = None, frame: "Optional[FrameType]" = None) -> None:
"""
Graceful exit.
"""
Expand Down
7 changes: 6 additions & 1 deletion lib/rucio/daemons/conveyor/finisher.py
Expand Up @@ -23,6 +23,7 @@
import os
import re
import threading
from typing import TYPE_CHECKING

from dogpile.cache.api import NoValue
from sqlalchemy.exc import DatabaseError
Expand All @@ -45,6 +46,10 @@

from urllib.parse import urlparse

if TYPE_CHECKING:
from types import FrameType
from typing import Optional

graceful_stop = threading.Event()

region = make_region_memcached(expiration_time=900)
Expand Down Expand Up @@ -139,7 +144,7 @@ def finisher(once=False, sleep_time=60, activities=None, bulk=100, db_bulk=1000,
)


def stop(signum=None, frame=None):
def stop(signum: "Optional[int]" = None, frame: "Optional[FrameType]" = None) -> None:
"""
Graceful exit.
"""
Expand Down
7 changes: 6 additions & 1 deletion lib/rucio/daemons/conveyor/poller.py
Expand Up @@ -26,6 +26,7 @@
import threading
import time
from itertools import groupby
from typing import TYPE_CHECKING

from requests.exceptions import RequestException
from configparser import NoOptionError
Expand All @@ -45,6 +46,10 @@
from rucio.transfertool.fts3 import FTS3Transfertool
from rucio.transfertool.globus import GlobusTransferTool

if TYPE_CHECKING:
from types import FrameType
from typing import Optional

graceful_stop = threading.Event()
METRICS = MetricManager(module=__name__)

Expand Down Expand Up @@ -162,7 +167,7 @@ def poller(once=False, activities=None, sleep_time=60,
)


def stop(signum=None, frame=None):
def stop(signum: "Optional[int]" = None, frame: "Optional[FrameType]" = None) -> None:
"""
Graceful exit.
"""
Expand Down
2 changes: 1 addition & 1 deletion lib/rucio/daemons/conveyor/preparer.py
Expand Up @@ -39,7 +39,7 @@
graceful_stop = threading.Event()


def stop():
def stop(signum: "Optional[int]" = None, frame: "Optional[FrameType]" = None) -> None:
"""
Graceful exit.
"""
Expand Down
7 changes: 6 additions & 1 deletion lib/rucio/daemons/conveyor/receiver.py
Expand Up @@ -23,6 +23,7 @@
import threading
import time
import traceback
from typing import TYPE_CHECKING

import stomp

Expand All @@ -37,6 +38,10 @@
from rucio.db.sqla.session import transactional_session
from rucio.transfertool.fts3 import FTS3CompletionMessageTransferStatusReport

if TYPE_CHECKING:
from types import FrameType
from typing import Optional

logging.getLogger("stomp").setLevel(logging.CRITICAL)

METRICS = MetricManager(module=__name__)
Expand Down Expand Up @@ -175,7 +180,7 @@ def receiver(id_, total_threads=1, all_vos=False):
pass


def stop(signum=None, frame=None):
def stop(signum: "Optional[int]" = None, frame: "Optional[FrameType]" = None) -> None:
"""
Graceful exit.
"""
Expand Down
7 changes: 6 additions & 1 deletion lib/rucio/daemons/conveyor/stager.py
Expand Up @@ -20,6 +20,7 @@
import functools
import logging
import threading
from typing import TYPE_CHECKING

from configparser import NoOptionError

Expand All @@ -34,6 +35,10 @@
from rucio.db.sqla.constants import RequestType
from rucio.transfertool.fts3 import FTS3Transfertool

if TYPE_CHECKING:
from types import FrameType
from typing import Optional

METRICS = MetricManager(module=__name__)
graceful_stop = threading.Event()

Expand Down Expand Up @@ -156,7 +161,7 @@ def stager(once=False, rses=None, bulk=100, group_bulk=1, group_policy='rule',
)


def stop(signum=None, frame=None):
def stop(signum: "Optional[int]" = None, frame: "Optional[FrameType]" = None) -> None:
"""
Graceful exit.
"""
Expand Down

0 comments on commit c2baa16

Please sign in to comment.