Skip to content

Commit

Permalink
Merge 2dc138f into 12bc05e
Browse files Browse the repository at this point in the history
  • Loading branch information
mosquito committed Oct 30, 2021
2 parents 12bc05e + 2dc138f commit 1be677f
Show file tree
Hide file tree
Showing 11 changed files with 193 additions and 63 deletions.
7 changes: 6 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,13 @@ in this document.
15.2.x
------

* Added `aiomisc.service.RespawningProcessService` for running python function in
separate process and restart it when exiting.
* Rewrite `aiomisc.WorkerPool` using `multiprocessing.Process`
instead of `subprocess.Popen`.
* `aiomisc.ServiceMeta` is now inherited from `abc.ABCMeta`. It means
* fixed (cron): is now set current datetime as start point #120
* `aiomisc.ServiceMeta` is now inherited from `abc.ABCMeta`. It means
* `aiomisc.ServiceMeta` is now inherited from `abc.ABCMeta`. It means
the decorations like `@abc.abstractmethod` will work with service classes.
* Added `aiomisc.service.ProcessService` for running python function in
separate process.
Expand Down
3 changes: 3 additions & 0 deletions aiomisc/service/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from .base import Service, ServiceMeta, SimpleServer
from .process import ProcessService, RespawningProcessService
from .profiler import Profiler
from .tcp import TCPServer
from .tls import TLSServer
Expand All @@ -8,7 +9,9 @@

__all__ = (
"MemoryTracer",
"ProcessService",
"Profiler",
"RespawningProcessService",
"Service",
"ServiceMeta",
"SimpleServer",
Expand Down
54 changes: 48 additions & 6 deletions aiomisc/service/process.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import logging
import os
import signal
from abc import abstractclassmethod
from abc import ABC, abstractclassmethod
from multiprocessing import Event, Process, synchronize
from typing import Any, Callable, Dict
from typing import Any, Callable, Dict, Optional

from aiomisc_log import LOG_FORMAT, LOG_LEVEL, LogFormat, basic_config

from .base import Service

from aiomisc.service.base import Service
from aiomisc.periodic import PeriodicCallback

log = logging.getLogger(__name__)

Expand All @@ -21,7 +21,7 @@ def _process_inner(
stop_event: synchronize.Event,
**kwargs: Any
) -> None:
basic_config(level=log_level, format=log_format)
basic_config(level=log_level, log_format=log_format)
start_event.set()
try:
function(**kwargs)
Expand All @@ -30,6 +30,7 @@ def _process_inner(


class ProcessService(Service):
name: Optional[str] = None
process: Process
process_start_event: synchronize.Event
process_stop_event: synchronize.Event
Expand Down Expand Up @@ -65,11 +66,22 @@ async def start(self) -> Any:
self.process_stop_event,
),
kwargs=self.get_process_kwargs(),
name=self.name,
)

process.start()

await self.loop.run_in_executor(None, self.process_start_event.wait)
self.process = process

def __repr__(self) -> str:
pid: Optional[int] = None
if hasattr(self, "process"):
pid = self.process.pid

return "<{} object at {}: name={!r}, pid={}>".format(
self.__class__.__name__, hex(id(self)), self.name, pid,
)

async def stop(self, exception: Exception = None) -> Any:
if not self.process.is_alive() or not self.process.pid:
Expand All @@ -80,4 +92,34 @@ async def stop(self, exception: Exception = None) -> Any:
await self.loop.run_in_executor(None, self.process_stop_event.wait)


__all__ = ("ProcessService",)
class RespawningProcessService(ProcessService, ABC):
process_poll_timeout: int = 5

_supervisor: PeriodicCallback

async def __supervise(self) -> None:
if not hasattr(self, "process"):
return

if await self.loop.run_in_executor(None, self.process.is_alive):
return

log.info(
"Process in service %r exited with code %r, respawning.",
self, self.process.exitcode,
)
await super().start()

async def start(self) -> None:
await super().start()
self._supervisor = PeriodicCallback(self.__supervise)
self._supervisor.start(
self.process_poll_timeout,
)

async def stop(self, exception: Exception = None) -> Any:
await self._supervisor.stop()
await super().stop(exception)


__all__ = ("ProcessService", "RespawningProcessService")
4 changes: 2 additions & 2 deletions aiomisc/version.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
""" This file is automatically generated by distutils. """

# Follow PEP-0396 rationale
version_info = (15, 2, 7, "g9deb835")
__version__ = "15.2.7"
version_info = (15, 2, 15, "gfe109a0")
__version__ = "15.2.15"
59 changes: 28 additions & 31 deletions aiomisc/worker_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@
import warnings
from inspect import Traceback
from itertools import chain
from multiprocessing import AuthenticationError, ProcessError
from multiprocessing import AuthenticationError, Process, ProcessError
from os import chmod, urandom
from subprocess import PIPE, Popen
from tempfile import mktemp
from types import MappingProxyType
from typing import (
Expand All @@ -22,7 +21,7 @@
from aiomisc_log import LOG_FORMAT, LOG_LEVEL
from aiomisc_worker import (
COOKIE_SIZE, HASHER, INET_AF, SIGNAL, AddressType, Header, PacketTypes, T,
log,
log, worker_process,
)


Expand Down Expand Up @@ -73,40 +72,37 @@ def _create_socket(self) -> None:
self.address = self.socket.getsockname()[:2]

@staticmethod
def _kill_process(process: Popen) -> None:
if process.returncode is not None:
def _kill_process(process: Process) -> None:
if not process.is_alive():
return None
log.debug("Terminating worker pool process PID: %s", process.pid)
process.kill()

@threaded
def __create_process(self, identity: str) -> Popen:
def __create_process(self, identity: str) -> Process:
if self.__closing:
raise RuntimeError("Pool closed")

env = dict(os.environ)
env["AIOMISC_NO_PLUGINS"] = ""
process = Popen(
[sys.executable, "-m", "aiomisc_worker"],
stdin=PIPE, env=env,
)
self.__spawning[identity] = process
log.debug("Spawning new worker pool process PID: %s", process.pid)

assert process.stdin

log_level = (
log.getEffectiveLevel() if LOG_LEVEL is None else LOG_LEVEL.get()
)
log_format = "color" if LOG_FORMAT is None else LOG_FORMAT.get().value

process.stdin.write(
pickle.dumps((
self.address, self.__cookie, identity,
log_level, log_format,
)),
process = Process(
target=worker_process.process,
args=(
self.address,
self.__cookie,
identity,
log_level,
log_format,
),
)
process.stdin.close()

process.start()

self.__spawning[identity] = process
log.debug("Spawning new worker pool process PID: %s", process.pid)

return process

Expand All @@ -121,21 +117,21 @@ def __init__(
self.__cookie = urandom(COOKIE_SIZE)
self.__loop: Optional[asyncio.AbstractEventLoop] = None
self.__futures: Set[asyncio.Future] = set()
self.__spawning: Dict[str, Popen] = dict()
self.__spawning: Dict[str, Process] = dict()
self.__task_store: Set[asyncio.Task] = set()
self.__closing = False
self.__starting: Dict[str, asyncio.Future] = dict()
self._statistic = WorkerPoolStatistic()
self.processes: Set[Popen] = set()
self.processes: Set[Process] = set()
self.workers = workers
self.tasks = asyncio.Queue(maxsize=max_overflow)
self.process_poll_time = process_poll_time
self.initializer = initializer
self.initializer_args = initializer_args
self.initializer_kwargs = initializer_kwargs

async def __wait_process(self, process: Popen) -> None:
while process.poll() is None:
async def __wait_process(self, process: Process) -> None:
while process.is_alive():
await asyncio.sleep(self.process_poll_time)

@property
Expand Down Expand Up @@ -278,7 +274,7 @@ async def handler(start_event: asyncio.Event) -> None:
result_future.set_exception(
ProcessError(
"Process {!r} exited with code {!r}".format(
process, process.returncode,
process, process.exitcode,
),
),
)
Expand Down Expand Up @@ -330,7 +326,7 @@ async def start(self) -> None:

await asyncio.gather(*tasks)

def __on_exit(self, process: Popen) -> None:
def __on_exit(self, process: Process) -> None:
async def respawn() -> None:
if self.__closing:
return None
Expand Down Expand Up @@ -392,12 +388,13 @@ async def create_task(
func, args, kwargs, result_future, process_future,
))

process: Popen = await process_future
process: Process = await process_future

try:
return await result_future
except asyncio.CancelledError:
os.kill(process.pid, SIGNAL)
if process.pid is not None:
os.kill(process.pid, SIGNAL)
raise

async def __aenter__(self) -> "WorkerPool":
Expand Down
4 changes: 3 additions & 1 deletion aiomisc_log/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
try:
import contextvars
LOG_LEVEL = contextvars.ContextVar("LOG_LEVEL", default=logging.INFO)
LOG_FORMAT = contextvars.ContextVar("LOG_FORMAT", default=LogFormat.color)
LOG_FORMAT = contextvars.ContextVar(
"LOG_FORMAT", default=LogFormat.default()
)
except ImportError:
pass

Expand Down
18 changes: 4 additions & 14 deletions aiomisc_worker/__main__.py → aiomisc_worker/worker_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import pickle
import signal
import socket
import sys
from os import urandom
from types import FrameType
from typing import Any, Optional, Tuple, Union
Expand All @@ -18,14 +17,10 @@ def on_signal(signum: int, frame: FrameType) -> None:
raise asyncio.CancelledError


def main() -> Optional[int]:
address: Union[str, Tuple[str, int]]
cookie: bytes
identity: str

(
address, cookie, identity, log_level, log_format,
) = pickle.load(sys.stdin.buffer)
def process(
address: Union[str, Tuple[str, int]],
cookie: bytes, identity: str, log_level: str, log_format: str,
) -> Optional[int]:

basic_config(level=log_level, log_format=log_format)

Expand Down Expand Up @@ -116,8 +111,3 @@ def step() -> bool:
return 0
except KeyboardInterrupt:
return 1


if __name__ == "__main__":
rc = main()
exit(rc or 0)
2 changes: 1 addition & 1 deletion docs/source/locale/ru/LC_MESSAGES/index.po
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ msgid ""
msgstr ""
"Project-Id-Version: 14\n"
"Report-Msgid-Bugs-To: me@mosquito.su\n"
"POT-Creation-Date: 2021-10-28 23:52+0300\n"
"POT-Creation-Date: 2021-10-30 00:28+0300\n"
"PO-Revision-Date: YEAR-MO-DA HO:MI+ZONE\n"
"Last-Translator: Dmitry Orlov <me@mosquito.su>\n"
"Language-Team: LANGUAGE <LL@li.org>\n"
Expand Down
16 changes: 15 additions & 1 deletion docs/source/locale/ru/LC_MESSAGES/services.po
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ msgid ""
msgstr ""
"Project-Id-Version: 14\n"
"Report-Msgid-Bugs-To: me@mosquito.su\n"
"POT-Creation-Date: 2021-10-28 23:52+0300\n"
"POT-Creation-Date: 2021-10-30 00:28+0300\n"
"PO-Revision-Date: YEAR-MO-DA HO:MI+ZONE\n"
"Last-Translator: Dmitry Orlov <me@mosquito.su>\n"
"Language-Team: LANGUAGE <LL@li.org>\n"
Expand Down Expand Up @@ -351,3 +351,17 @@ msgstr ""
"Базовый класс для запуска функции отдельным системным процессом и "
"завершения при остановке родительского процесса."

msgid "RespawningProcessService"
msgstr "Класс RespawningProcessService"

msgid ""
"A base class for launching a function by a separate system process, and "
"by termination when the parent process is stopped, It's pretty like "
"`ProcessService` but have one difference when the process unexpectedly "
"exited this will be respawned."
msgstr ""
"Базовый класс для запуска функции отдельным системным процессом и "
"завершения при остановке родительского процесса. Это очень похоже на "
"`ProcessService` с одним отличием - если дочерний процесс неожиданно "
"завершится, то он будет перезапущен."

Loading

0 comments on commit 1be677f

Please sign in to comment.