Skip to content

Commit

Permalink
Ensure atexit runs on SIGTERM
Browse files Browse the repository at this point in the history
It requires a signal handler to run, so we add a blank one (if there isn't one already)
  • Loading branch information
RealOrangeOne committed Aug 9, 2023
1 parent 138847a commit 5667660
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 2 deletions.
3 changes: 2 additions & 1 deletion sbot/robot.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from .motor_board import MotorBoard
from .power_board import Note, PowerBoard
from .servo_board import ServoBoard
from .utils import obtain_lock, singular
from .utils import ensure_atexit_on_term, obtain_lock, singular

try:
from .mqtt import MQTT_VALID, MQTTClient, get_mqtt_variables
Expand Down Expand Up @@ -58,6 +58,7 @@ def __init__(
self._metadata: Metadata | None = None

setup_logging(debug, trace_logging)
ensure_atexit_on_term()

logger.info(f"SourceBots API v{__version__}")

Expand Down
30 changes: 29 additions & 1 deletion sbot/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@
from __future__ import annotations

import logging
import signal
import socket
from abc import ABC, abstractmethod
from typing import Any, Mapping, NamedTuple, TypeVar
from types import FrameType
from typing import Any, Mapping, NamedTuple, Optional, TypeVar

from serial.tools.list_ports_common import ListPortInfo

Expand Down Expand Up @@ -199,3 +201,29 @@ def get_USB_identity(port: ListPortInfo) -> BoardIdentity:
logger.warning(
f"Failed to pull identifying information from serial device {port.device}")
return BoardIdentity()


def ensure_atexit_on_term() -> None:
"""
Ensure `atexit` triggers on `SIGTERM`.
> The functions registered via [`atexit`] are not called when the program is
killed by a signal not handled by Python
"""

if signal.getsignal(signal.SIGTERM) != signal.SIG_DFL:
# If a signal handler is already present for SIGTERM,
# this is sufficient for `atexit` to trigger, so do nothing.
return

def handle_signal(handled_signum: int, frame: Optional[FrameType]) -> None:
"""
Handle the given signal by outputting some text and terminating the process.
This will trigger `atexit`.
"""
logger.info(signal.strsignal(handled_signum))
exit(1)

# Add the null-ish signal handler
signal.signal(signal.SIGTERM, handle_signal)
4 changes: 4 additions & 0 deletions tests/test_sbot.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import socket

import pytest
import signal

from sbot import Robot
from sbot.exceptions import MetadataNotReadyError
Expand Down Expand Up @@ -121,6 +122,9 @@ def test_robot(monkeypatch, caplog) -> None:

assert r.zone == 0

# Check _something_ is configured to handle SIGTERM
assert signal.getsignal(signal.SIGTERM) != signal.SIG_DFL


@pytest.hookimpl(trylast=True)
@pytest.mark.hardware
Expand Down

0 comments on commit 5667660

Please sign in to comment.