Skip to content

Commit

Permalink
Added support for shared memory in the botogram runner.
Browse files Browse the repository at this point in the history
  • Loading branch information
Pietro Albini committed Sep 17, 2015
1 parent 7586a28 commit c338c21
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 5 deletions.
1 change: 1 addition & 0 deletions botogram/bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ def freeze(self):
self.after_help, self.process_backlog,
self.lang, self.itself, self._commands_re,
self._components+[self._main_component],
self._main_component._component_id,
self._bot_id, self._shared_memory)

@property
Expand Down
12 changes: 9 additions & 3 deletions botogram/frozenbot.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class FrozenBot:

def __init__(self, api, about, owner, hide_commands, before_help,
after_help, process_backlog, lang, itself, commands_re,
components, bot_id, shared_memory):
components, main_component_id, bot_id, shared_memory):
# This attribute should be added with the default setattr, because is
# needed by the custom setattr
object.__setattr__(self, "_frozen", False)
Expand All @@ -37,6 +37,7 @@ def __init__(self, api, about, owner, hide_commands, before_help,
self.lang = lang
self._commands_re = commands_re
self._components = components
self._main_component_id = main_component_id
self._bot_id = bot_id
self._shared_memory = shared_memory

Expand Down Expand Up @@ -73,7 +74,7 @@ def __reduce__(self):
self.api, self.about, self.owner, self.hide_commands,
self.before_help, self.after_help, self.process_backlog,
self.lang, self.itself, self._commands_re, self._components,
self._bot_id, self._shared_memory
self._main_component_id, self._bot_id, self._shared_memory
)
return restore, args

Expand Down Expand Up @@ -196,7 +197,12 @@ def _call_arguments(self, func, funcid, args):

# If the developer wants the component's shared memory
elif arg == "shared":
shared = self._shared_memory.of(func.botogram.component)
if func.botogram.component is not None:
compid = func.botogram.component._component_id
else:
compid = self._main_component_id

shared = self._shared_memory.of(compid)
args = (shared,) + args

return args
Expand Down
14 changes: 12 additions & 2 deletions botogram/runner/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@
"""

import multiprocessing
import multiprocessing.managers
import time
import atexit
import signal
import logbook

from . import processes
from . import shared


class BotogramRunner:
Expand All @@ -29,6 +31,12 @@ def __init__(self, *bots, workers=2):
self._stop = False
self._started_at = None

# Create a new memory manager and apply a new driver to all the bots
self._shared_memory = shared.SharedMemoryManager()
for bot_id, bot in self._bots.items():
driver = self._shared_memory.get_driver(bot_id)
bot._shared_memory.switch_driver(driver)

self._workers_count = workers

self.logger = logbook.Logger("botogram runner")
Expand All @@ -45,17 +53,19 @@ def run(self):
self._started_at = time.time()

self._enable_signals()
self._shared_memory.start()
to_workers, to_updaters = self._boot_processes()

try:
# Main server loop
# This actually does nothing, sorry
while not self._stop:
time.sleep(0.2)
self._shared_memory.process_commands()
time.sleep(0.1)
except (KeyboardInterrupt, InterruptedError):
pass

self._shutdown_processes(to_workers, to_updaters)
self._shared_memory.stop()

self.running = False
self._started_at = None
Expand Down
108 changes: 108 additions & 0 deletions botogram/runner/shared.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
"""
botogram.runner.shared
Shared memory implementation for the botogram runner
Copyright (c) 2015 Pietro Albini <pietro@pietroalbini.io>
Released under the MIT license
"""

import queue
import multiprocessing
import multiprocessing.managers


class SharedMemoryManager:
"""Manage shared memory in the botogram runner"""

def __init__(self):
self._memories = {}
self._queues = []
self._manager = multiprocessing.managers.SyncManager()

def get(self, memory_id):
"""Get the shared memory of a given bot:component"""
if memory_id not in self._memories:
self._memories[memory_id] = self._manager.dict()
return self._memories[memory_id]

def get_driver(self, bot_id):
"""Get a new driver for the shared memory"""
return MultiprocessingDriver(bot_id, *self._get_commands_queue())

def _get_commands_queue(self):
"""Get a new queue for commands"""
commands = multiprocessing.Queue()
responses = multiprocessing.Queue()
self._queues.append((commands, responses))
return commands, responses

def process_commands(self):
"""Process commands from the drivers"""
for commands, responses in self._queues:
# Get a new command from the queue
try:
message = commands.get(False)
except queue.Empty:
continue

# memory <memoryid>
if message.startswith("memory"):
memory = self.get(message.split(" ", 1)[1])
responses.put(memory)

# new_queue
elif message.startswith("new_queue"):
new_q = self._get_commands_queue()
responses.put(new_q)

def start(self):
"""Start the background process"""
self._manager.start()

def stop(self):
"""Stop the background process"""
self._manager.shutdown()


class MultiprocessingDriver:
"""This is a multiprocessing-ready driver for the shared memory"""

def __init__(self, bot_id, commands, responses):
self._bot_id = bot_id
self._commands = commands
self._responses = responses
self._memories = {}

def __reduce__(self):
new_commands, new_responses = self._command("new_queue")
return rebuild_driver, (self._bot_id, new_commands, new_responses)

def _command(self, *args):
"""Send a command to the manager"""
self._commands.put(" ".join(args))
return self._responses.get()

def get(self, component):
# Create the shared memory if it doens't exist
if component not in self._memories:
memory = self._command("memory", self._bot_id+":"+component)
self._memories[component] = memory

return self._memories[component]

def import_data(self, data):
# This will merge the provided component with the shared memory
for component, memory in data.items():
memory = self.get(component)
memory.update(data)

def export_data(self):
result = {}
for component, data in self._memories.items():
result[component] = dict(data)

return result


def rebuild_driver(bot_id, commands, responses):
return MultiprocessingDriver(bot_id, commands, responses)

0 comments on commit c338c21

Please sign in to comment.