Skip to content

Commit

Permalink
Merge 3a8ac3f into 12bc05e
Browse files Browse the repository at this point in the history
  • Loading branch information
mosquito committed Nov 11, 2021
2 parents 12bc05e + 3a8ac3f commit edc61d6
Show file tree
Hide file tree
Showing 7 changed files with 327 additions and 132 deletions.
115 changes: 28 additions & 87 deletions aiomisc/service/sdwatchdog.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
import asyncio
import logging
import os
import socket
from collections import deque
from typing import Any, Deque, Optional, Tuple, Union
from typing import Any, Optional, Tuple

from aiomisc.entrypoint import entrypoint
from aiomisc.periodic import PeriodicCallback
Expand All @@ -14,101 +12,43 @@
log = logging.getLogger(__name__)


class AsyncUDPSocket:

__slots__ = (
"__address",
"__lock",
"__loop",
"__sock",
"__write_queue",
"__writer_added",
)

def __init__(
self, address: Union[tuple, str, bytes],
loop: asyncio.AbstractEventLoop,
):
self.__loop = loop
self.__sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
self.__sock.setblocking(False)
self.__sock.connect(address)
self.__writer_added = False
self.__write_queue: Deque[Tuple[bytes, asyncio.Future]] = deque()

async def sendall(self, data: str) -> bool:
future = self.__loop.create_future()
self.__write_queue.append((data.encode(), future))

if not self.__writer_added:
self.__loop.add_writer(self.__sock.fileno(), self.__sender)
self.__writer_added = True
return await future

def __sender(self) -> None:
if not self.__write_queue:
self.__loop.remove_writer(self.__sock.fileno())
self.__writer_added = False
return

data, future = self.__write_queue[0]

try:
self.__sock.sendall(data)
except (BlockingIOError, InterruptedError):
return
except BaseException as exc:
self.__abort(exc)
else:
self.__write_queue.popleft()
future.set_result(True)

def __abort(self, exc: BaseException) -> None:
for future in (f for _, f in self.__write_queue if not f.done()):
future.set_exception(exc)

self.close()

def close(self) -> None:
self.__loop.remove_writer(self.__sock.fileno())
self.__sock.close()

for future in (f for _, f in self.__write_queue if not f.done()):
future.set_exception(ConnectionError("Connection closed"))

self.__write_queue.clear()


def _get_watchdog_interval() -> Optional[TimeoutType]:
value = os.getenv("WATCHDOG_USEC")
if value is None:
return None
# Send notifications twice as often
return int(value) / 1000000. / 2
return int(value) / 1000000.


WATCHDOG_INTERVAL: Optional[TimeoutType] = _get_watchdog_interval()


class SDWatchdogService(Service):
socket: AsyncUDPSocket
socket: socket.socket
watchdog_interval: Optional[TimeoutType]
_watchdog_timer: PeriodicCallback

async def _send(self, payload: str) -> None:
try:
await self.loop.sock_sendall(
self.socket,
payload.encode(),
)
except (ConnectionError, OSError) as e:
log.warning("SystemD notify socket communication problem: %r", e)

async def _post_start(
self, services: Tuple[Service, ...], **__: Any
) -> None:
if not hasattr(self, "socket"):
return
await self.socket.sendall(
"STATUS=Started {} services".format(len(services)),
)
await self.socket.sendall("READY=1")

await self._send(f"STATUS=Started {len(services)} services")
await self._send("READY=1")

async def _pre_stop(self, *_: Any, **__: Any) -> None:
if not hasattr(self, "socket"):
return
await self.socket.sendall("STOPPING=1")
await self._send("STOPPING=1")

def __init__(
self, *, watchdog_interval: Optional[TimeoutType] = WATCHDOG_INTERVAL,
Expand All @@ -119,7 +59,8 @@ def __init__(
entrypoint.PRE_STOP.connect(self._pre_stop)
super().__init__(**kwargs)

def _get_socket_addr(self) -> Optional[str]:
@staticmethod
def _get_socket_addr() -> Optional[str]:
addr = os.getenv("NOTIFY_SOCKET")
if addr is None:
return None
Expand All @@ -137,25 +78,25 @@ async def start(self) -> None:
)
return None

self.socket = AsyncUDPSocket(addr, loop=self.loop)
await self.socket.sendall("STATUS=starting")
self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
self.socket.connect(addr)
self.socket.setblocking(False)

await self._send("STATUS=starting")

if self.watchdog_interval is None:
return

if self.watchdog_interval != WATCHDOG_INTERVAL:
await self.socket.sendall(
"WATCHDOG_USEC={}".format(
int(self.watchdog_interval * 1000000),
),
)
watchdog_usec = int(self.watchdog_interval * 1000000)
await self._send(f"WATCHDOG_USEC={watchdog_usec}")

self.start_event.set()
self._watchdog_timer = PeriodicCallback(
self.socket.sendall,
"WATCHDOG=1",
self._send, "WATCHDOG=1",
)
self._watchdog_timer.start(self.watchdog_interval)
# Send notifications twice as often
self._watchdog_timer.start(self.watchdog_interval / 2)

async def stop(self, exception: Exception = None) -> Any:
await self._watchdog_timer.stop()
4 changes: 2 additions & 2 deletions aiomisc/version.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
""" This file is automatically generated by distutils. """

# Follow PEP-0396 rationale
version_info = (15, 2, 7, "g9deb835")
__version__ = "15.2.7"
version_info = (15, 3, 0, "g8b9caa1")
__version__ = "15.3.0"
100 changes: 58 additions & 42 deletions aiomisc_log/formatter/journald.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,29 @@ class JournaldLogHandler(logging.Handler):
VALUE_LEN_STRUCT = struct.Struct("<Q")
SOCKET_PATH = Path("/run/systemd/journal/socket")

RECORD_FIELDS_MAP = MappingProxyType({
"args": "arguments",
"created": None,
"exc_info": None,
"exc_text": None,
"filename": None,
"funcName": None,
"levelname": None,
"levelno": None,
"lineno": None,
"message": None,
"module": None,
"msecs": None,
"msg": "message_raw",
"name": "logger_name",
"pathname": None,
"process": "pid",
"processName": "process_name",
"relativeCreated": None,
"thread": "thread_id",
"threadName": "thread_name",
})

__slots__ = ("__facility", "socket", "__identifier")

@staticmethod
Expand All @@ -70,6 +93,12 @@ def pack(cls, fp: BinaryIO, key: str, value: Any) -> None:
elif isinstance(value, (int, float)):
fp.write(cls._encode_short(key, value))
return
elif isinstance(value, str):
if "\n" in value:
fp.write(cls._encode_long(key, value.encode()))
return
fp.write(cls._encode_short(key, value))
return
elif isinstance(value, bytes):
fp.write(cls._encode_long(key, value))
return
Expand Down Expand Up @@ -100,49 +129,34 @@ def _to_usec(ts: float) -> int:
return int(ts * 1000000)

def emit(self, record: logging.LogRecord) -> None:
message = str(record.getMessage())

tb_message = ""
if record.exc_info:
tb_message = "\n".join(
traceback.format_exception(*record.exc_info),
)

message += "\n"
message += tb_message

ts = self._to_usec(record.created)

hash_fields = (
message,
record.funcName,
record.levelno,
record.process,
record.processName,
record.levelname,
record.pathname,
record.name,
record.thread,
record.lineno,
ts,
tb_message,
)

with BytesIO() as fp:
message_id = uuid.uuid3(
uuid.NAMESPACE_OID, "$".join(str(x) for x in hash_fields),
).hex
message_id = uuid.uuid1().hex

self.pack(fp, "message", self.format(record))
self.pack(fp, "message_id", message_id)
self.pack(fp, "message_raw", record.msg)

if record.exc_info:
exc_type, exc_value, exc_tb = record.exc_info
self.pack(
fp, "exception", {
"type": exc_type,
"value": exc_value,
},
)
tb_message = "\n".join(
traceback.format_exception(*record.exc_info),
)
self.pack(fp, "traceback", tb_message)

self.pack(fp, "priority", self.LEVELS[record.levelno])
self.pack(fp, "syslog_facility", self.__facility)
self.pack(fp, "syslog_identifier", self.__identifier)
self.pack(
fp, "code", "{}.{}:{}".format(
record.module, record.funcName, record.lineno,
),
)

self.pack(
fp, "code", {
"func": record.funcName,
Expand All @@ -151,20 +165,22 @@ def emit(self, record: logging.LogRecord) -> None:
"module": record.module,
},
)
self.pack(fp, "logger_name", record.name)
self.pack(fp, "pid", record.process)
self.pack(fp, "process_name", record.processName)
self.pack(fp, "thread_id", record.thread)
self.pack(fp, "thread_name", record.threadName)

self.pack(fp, "created_usec", self._to_usec(record.created))
self.pack(
fp, "relative_usec", self._to_usec(record.relativeCreated),
)

self.pack(fp, "syslog_identifier", self.__identifier)
self.pack(fp, "created_usec", self._to_usec(record.created))
self.pack(fp, "arguments", record.args)
self.pack(fp, "stack_info", record.stack_info)
self.pack(fp, "traceback", tb_message)
source = dict(record.__dict__)

for field, name in self.RECORD_FIELDS_MAP.items():
value = source.pop(field, None)
if name is None or value is None:
continue
self.pack(fp, name, value)

self.pack(fp, "extra", source)

self.socket.sendall(fp.getvalue())


Expand Down
1 change: 1 addition & 0 deletions requirements.dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ pytest
pytest-cov~=2.5.1
pytest-freezegun~=0.4.2
pytest-rst
pytest-subtests
rich
sphinx-autobuild
sphinx-intl
Expand Down
3 changes: 2 additions & 1 deletion tests/test_circuit_breaker.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,10 @@ async def test_simple(loop):
assert responses[True]

responses.clear()
ctx.failed = False

# PASSING state
for _ in range(10):
for _ in range(20):
PatchedCircuitBreaker.tick()

try:
Expand Down
Loading

0 comments on commit edc61d6

Please sign in to comment.