Skip to content

Commit

Permalink
Moved the shared memory's commands processing in a different process.
Browse files Browse the repository at this point in the history
This doesn't bring compatibility to Windows, but it's a little step in that
direction.
  • Loading branch information
Pietro Albini committed Sep 24, 2015
1 parent 26829ff commit 10833fa
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 25 deletions.
34 changes: 23 additions & 11 deletions botogram/runner/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,22 @@ def __init__(self, *bots, workers=2):

self._updater_processes = {}
self._worker_processes = []
self._shared_memory_process = None

self.running = False
self._stop = False
self._started_at = None

# Create a new memory manager and apply a new driver to all the bots
# The manager created here will raise an exception if you try to get
# memories in the master process. It will run fine however in its own
# process.
self._shared_memory = shared.SharedMemoryManager()
for bot_id, bot in self._bots.items():
driver = self._shared_memory.get_driver()
bot._shared_memory.switch_driver(driver)
for bot in self._bots.values():
bot._shared_memory.switch_driver(self._shared_memory.get_driver())

shared_memory_queues = self._shared_memory.get_commands_queue()
self._shared_memory_commands = shared_memory_queues[0]
self._shared_memory_responses = shared_memory_queues[1]

self._workers_count = workers

Expand All @@ -53,19 +59,16 @@ def run(self):
self._started_at = time.time()

self._enable_signals()
self._shared_memory.start()
to_workers, to_updaters = self._boot_processes()

try:
# Main server loop
while not self._stop:
self._shared_memory.process_commands()
time.sleep(0.1)
time.sleep(0.2)
except (KeyboardInterrupt, InterruptedError):
pass

self._shutdown_processes(to_workers, to_updaters)
self._shared_memory.stop()

self.running = False
self._started_at = None
Expand All @@ -79,16 +82,21 @@ def _boot_processes(self):
queue = multiprocessing.Queue()
upd_commands = multiprocessing.Queue()

# Boot up the shared memory process
shared_memory = processes.SharedMemoryProcess(self._shared_memory)
shared_memory.start()
self._shared_memory_process = shared_memory

# Boot up all the worker processes
for i in range(self._workers_count):
worker = processes.WorkerProcess(self, queue)
worker = processes.WorkerProcess(self._bots, queue)
worker.start()

self._worker_processes.append(worker)

# Boot up all the updater processes
for id, bot in self._bots.items():
updater = processes.UpdaterProcess(self, id, queue, upd_commands)
for bot in self._bots.values():
updater = processes.UpdaterProcess(bot, queue, upd_commands)
updater.start()

self._updater_processes[id] = updater
Expand All @@ -114,6 +122,10 @@ def _shutdown_processes(self, to_workers, to_updaters):
worker.join()
self._worker_processes = []

# And finally we stop the shared memory's manager
self._shared_memory_commands.put("stop")
self._shared_memory_process.join()

def _enable_signals(self):
"""Setup signals handlers"""
atexit.register(self.stop)
Expand Down
58 changes: 49 additions & 9 deletions botogram/runner/processes.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
import os
import traceback
import queue
import time

import logbook

from . import jobs
from .. import objects
Expand All @@ -19,10 +22,9 @@
class BaseProcess(multiprocessing.Process):
"""Base class for all of the processes"""

def __init__(self, runner, *args):
self.runner = runner
def __init__(self, *args):
self.stop = False
self.logger = runner.logger
self.logger = logbook.Logger("botogram subprocess")

super(BaseProcess, self).__init__()
self.setup(*args)
Expand All @@ -33,6 +35,8 @@ def setup(self, *args):

def run(self):
"""Run the process"""
self.before_start()

self.logger.debug("%s process is ready! (pid: %s)" % (self.name,
os.getpid()))
while not self.stop:
Expand All @@ -43,24 +47,58 @@ def run(self):
except:
traceback.print_exc()

self.after_stop()

self.logger.debug("%s process with pid %s just stopped" % (self.name,
os.getpid()))

def loop(self):
"""One single loop"""
pass

def before_start(self):
"""Before the process starts"""
pass

def after_stop(self):
"""After the process stops"""
pass

def on_stop(self):
"""When the process is stopping"""
self.stop = True


class SharedMemoryProcess(BaseProcess):
"""This process will manage commands for shared memory's drivers"""

name = "Shared"

def setup(self, manager):
self.manager = manager

def loop(self):
result = self.manager.process_commands()

# result == False means the process should stop
if result is False:
self.stop = True

def before_start(self):
self.manager.initialize()
self.manager.start()

def after_stop(self):
self.manager.stop()


class WorkerProcess(BaseProcess):
"""This process will execute all the updates it receives"""

name = "Worker"

def setup(self, queue):
def setup(self, bots, queue):
self.bots = bots
self.queue = queue
self.will_stop = False

Expand All @@ -80,7 +118,7 @@ def loop(self):
return

# Run the wanted job
job.process(self.runner._bots)
job.process(self.bots)

def on_stop(self):
self.will_stop = True
Expand All @@ -91,12 +129,14 @@ class UpdaterProcess(BaseProcess):

name = "Updater"

def setup(self, bot_id, to_workers, commands):
self.bot_id = bot_id
self.bot = self.runner._bots[bot_id]
def setup(self, bot, to_workers, commands):
self.bot = bot
self.bot_id = bot._bot_id
self.to_workers = to_workers
self.commands = commands

self.started_at = time.time()

self.backlog_processed = False
self.last_id = -1

Expand Down Expand Up @@ -131,7 +171,7 @@ def loop(self):
self.last_id = update.update_id

if not self.backlog_processed:
if update.message.date < self.runner._started_at:
if update.message.date < self.started_at:
self.logger.debug("Update #%s skipped because it's coming "
"from the backlog." % update.update_id)
continue
Expand Down
41 changes: 36 additions & 5 deletions botogram/runner/shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,31 @@ class SharedMemoryManager:
def __init__(self):
self._memories = {}
self._queues = []
self._manager = None

def __reduce__(self):
return rebuild_manager, (self._memories, self._queues)

def initialize(self):
"""Initialize the manager"""
# This is on a different function, so we can initialize this in the
# process we want
self._manager = multiprocessing.managers.SyncManager()

def get(self, memory_id):
"""Get the shared memory of a given bot:component"""
if self._manager is None:
raise RuntimeError("Please call initialize() before")

if memory_id not in self._memories:
self._memories[memory_id] = self._manager.dict()
return self._memories[memory_id]

def get_driver(self):
"""Get a new driver for the shared memory"""
return MultiprocessingDriver(*self._get_commands_queue())
return MultiprocessingDriver(*self.get_commands_queue())

def _get_commands_queue(self):
def get_commands_queue(self):
"""Get a new queue for commands"""
commands = multiprocessing.Queue()
responses = multiprocessing.Queue()
Expand All @@ -52,15 +64,26 @@ def process_commands(self):

# new_queue
elif message.startswith("new_queue"):
new_q = self._get_commands_queue()
new_q = self.get_commands_queue()
responses.put(new_q)

# Tell the process to stop
elif message.startswith("stop"):
return False


def start(self):
"""Start the background process"""
if self._manager is None:
raise RuntimeError("Please call initialize() before")

self._manager.start()

def stop(self):
"""Stop the background process"""
if self._manager is None:
raise RuntimeError("Please call initialize() before")

self._manager.shutdown()


Expand Down Expand Up @@ -103,5 +126,13 @@ def export_data(self):
return result


def rebuild_driver(bot_id, commands, responses):
return MultiprocessingDriver(bot_id, commands, responses)
def rebuild_manager(memories, queues):
obj = SharedMemoryManager()
obj._memories = memories
obj._queues = queues

return obj


def rebuild_driver(commands, responses):
return MultiprocessingDriver(commands, responses)

0 comments on commit 10833fa

Please sign in to comment.