Skip to content

Commit

Permalink
Add a bunch of docstrings
Browse files Browse the repository at this point in the history
  • Loading branch information
yuvipanda committed Dec 26, 2018
1 parent 0666875 commit 37442ba
Showing 1 changed file with 47 additions and 6 deletions.
53 changes: 47 additions & 6 deletions simpervisor/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import logging
from simpervisor import atexitasync


class SupervisedProcess:
def __init__(self, name, *args, always_restart=False, **kwargs):
self.always_restart = always_restart
Expand All @@ -22,11 +21,21 @@ def __init__(self, name, *args, always_restart=False, **kwargs):
self._killed = False

# The 'process' is a shared resource, and protected by this lock
# This lock must be aquired whenever the process' state can be
# changed. That includes starting it, communicating with it & waiting
# for it to stop. While we need to be careful around sending signals to
# the process, that doesn't require the lock to be held - since sending
# signals is synchronous.
self._proc_lock = asyncio.Lock()

self.log = logging.getLogger('simpervisor')

def _debug_log(self, action, message, extras=None):
"""
Log debug message with some added meta information.
Makes structured logging easier
"""
base_extras = {
'action': action,
'proccess-name': self.name,
Expand All @@ -48,12 +57,21 @@ def _handle_signal(self, signal):

async def start(self):
"""
Start the process
Start the process if it isn't already running.
If the process is already running, this is a noop. If the process
has already been killed, this raises an exception
"""
# Aquire process lock before we try to start the process.
# We could concurrently be in any other part of the code where
# process is started or killed. So we check for that as soon
# as we aquire the lock and behave accordingly.
with (await self._proc_lock):
if self.running:
# Don't wanna start it again, if we're already running
return
if self._killed:
raise
self._debug_log('try-start', f'Trying to start {self.name}',)
self.proc = await asyncio.create_subprocess_exec(
*self._proc_args, **self._proc_kwargs
Expand All @@ -63,11 +81,20 @@ async def start(self):
self._killed = False
self.running = True

# Spin off a coroutine to watch, reap & restart process if needed
self._restart_process_future = asyncio.ensure_future(self._restart_process_if_needed())
atexitasync.add_handler(self._handle_signal)
# Spin off a coroutine to watch, reap & restart process if needed
# We don't wanna do this multiple times, so this is also inside the lock
self._restart_process_future = asyncio.ensure_future(self._restart_process_if_needed())

# This handler is removed when process stops
atexitasync.add_handler(self._handle_signal)

async def _restart_process_if_needed(self):
"""
Watch for process to exit & restart it if needed.
This is a long running task that keeps running until the process
exits. If we restart the process, `start()` sets this up again.
"""
retcode = await self.proc.wait()
atexitasync.remove_handler(self._handle_signal)
self._debug_log(
Expand All @@ -80,16 +107,30 @@ async def _restart_process_if_needed(self):


async def _signal_and_wait(self, signum):
# We don't want this to race with start
"""
Send a SIGTERM or SIGKILL to the child process & reap it.
- Send the signal to the process
- Make sure we don't restart it when it stops
- Wait for it to stop
- Remove signal handler for it after we are done.
"""

# Aquire lock to modify process sate
with (await self._proc_lock):
# Don't yield control between sending signal & calling wait
# This way, we don't end up in a call to _restart_process_if_needed
# and possibly restarting. We also set _killed, just to be sure.
self.proc.send_signal(signum)
self._killed = True

# We cancel the restart watcher & wait for the process to finish,
# since we return only after the process has been reaped
self._restart_process_future.cancel()
await self.proc.wait()
self.running = False
# Remove signal handler *after* the process is done
atexitasync.remove_handler(self._handle_signal)

async def terminate(self):
"""
Expand Down

0 comments on commit 37442ba

Please sign in to comment.