Skip to content
This repository has been archived by the owner on Apr 22, 2024. It is now read-only.

Commit

Permalink
Merge c68a6ef into d123f2d
Browse files Browse the repository at this point in the history
  • Loading branch information
rmotitsuki committed May 28, 2021
2 parents d123f2d + c68a6ef commit 3c58da8
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 20 deletions.
17 changes: 16 additions & 1 deletion kytos/core/controller.py
Expand Up @@ -85,6 +85,8 @@ def __init__(self, options=None, loop=None):

self._loop = loop or asyncio.get_event_loop()
self._pool = ThreadPoolExecutor(max_workers=1)
# asyncio tasks
self._tasks = []

#: dict: keep the main threads of the controller (buffers and handler)
self._threads = {}
Expand Down Expand Up @@ -293,11 +295,16 @@ async def _run_api_server_thread(executor):
len(completed), len(pending))

task = self._loop.create_task(self.raw_event_handler())
self._tasks.append(task)
task = self._loop.create_task(self.msg_in_event_handler())
self._tasks.append(task)
task = self._loop.create_task(self.msg_out_event_handler())
self._tasks.append(task)
task = self._loop.create_task(self.app_event_handler())
self._tasks.append(task)
task = self._loop.create_task(_run_api_server_thread(self._pool))
task.add_done_callback(_stop_loop)
self._tasks.append(task)

self.log.info("ThreadPool started: %s", self._pool)

Expand Down Expand Up @@ -405,7 +412,11 @@ def stop_controller(self, graceful=True):
self.log.debug("%s threads before threadpool shutdown: %s",
len(threads), threads)

self._pool.shutdown(wait=graceful)
try:
# Python >= 3.9
self._pool.shutdown(wait=graceful, cancel_futures=True)
except TypeError:
self._pool.shutdown(wait=graceful)

# self.server.socket.shutdown()
# self.server.socket.close()
Expand All @@ -423,6 +434,10 @@ def stop_controller(self, graceful=True):
self.unload_napps()
self.buffers = KytosBuffers()

# Cancel all async tasks (event handlers and servers)
for task in self._tasks:
task.cancel()

# ASYNC TODO: close connections
# self.server.server_close()

Expand Down
41 changes: 25 additions & 16 deletions kytos/core/kytosd.py
Expand Up @@ -35,7 +35,7 @@ def _create_pid_dir():
os.chmod(pid_dir, 0o1777) # permissions like /tmp


def start_shell(controller=None):
def create_shell(controller=None):
"""Load Kytos interactive shell."""
kytos_ascii = r"""
_ _
Expand Down Expand Up @@ -84,8 +84,7 @@ def start_shell(controller=None):
banner1=banner1,
exit_msg=exit_msg)
ipshell.prompts = KytosPrompt(ipshell)

ipshell()
return ipshell


# def disable_threadpool_exit():
Expand All @@ -111,48 +110,58 @@ def main():

def async_main(config):
"""Start main Kytos Daemon with asyncio loop."""
def stop_controller(controller):
def stop_controller(controller, shell_task=None):
"""Stop the controller before quitting."""
loop = asyncio.get_event_loop()

# If stop() hangs, old ctrl+c behaviour will be restored
loop.remove_signal_handler(signal.SIGINT)
loop.remove_signal_handler(signal.SIGTERM)
if loop:
# If stop() hangs, old ctrl+c behaviour will be restored
loop.remove_signal_handler(signal.SIGINT)
loop.remove_signal_handler(signal.SIGTERM)

# disable_threadpool_exit()

controller.log.info("Stopping Kytos controller...")
controller.stop()

if shell_task:
shell_task.cancel()

async def start_shell_async():
"""Run the shell inside a thread and stop controller when done."""
_start_shell = functools.partial(start_shell, controller)
data = await loop.run_in_executor(executor, _start_shell)
executor.shutdown()
stop_controller(controller)
interactive_shell = create_shell(controller)

try:
data = await loop.run_in_executor(executor, interactive_shell)
finally:
# Exit all embedded code in shell
interactive_shell.magic("%exit_raise")
stop_controller(controller)
return data

loop = asyncio.get_event_loop()

controller = Controller(config)

kill_handler = functools.partial(stop_controller, controller)
loop.add_signal_handler(signal.SIGINT, kill_handler)
loop.add_signal_handler(signal.SIGTERM, kill_handler)

if controller.options.debug:
loop.set_debug(True)

loop.call_soon(controller.start)

shell_task = None
if controller.options.foreground:
executor = ThreadPoolExecutor(max_workers=1)
loop.create_task(start_shell_async())
shell_task = loop.create_task(start_shell_async())

kill_handler = functools.partial(stop_controller, controller, shell_task)
loop.add_signal_handler(signal.SIGINT, kill_handler)
loop.add_signal_handler(signal.SIGTERM, kill_handler)

try:
loop.run_forever()
except SystemExit as exc:
controller.log.error(exc)
controller.log.info("Shutting down Kytos...")
finally:

loop.close()
7 changes: 4 additions & 3 deletions tests/unit/test_core/test_kytosd.py
Expand Up @@ -2,7 +2,7 @@
from unittest import TestCase
from unittest.mock import MagicMock, patch

from kytos.core.kytosd import _create_pid_dir, async_main, main, start_shell
from kytos.core.kytosd import _create_pid_dir, async_main, main, create_shell


class TestKytosd(TestCase):
Expand All @@ -26,14 +26,15 @@ def test_create_pid_dir__system(*args):
(mock_mkdirs, mock_chmod) = args
_create_pid_dir()

mock_mkdirs.assert_called_with('/var/run/kytos', exist_ok=True)
mock_mkdirs.assert_called_with('/var/run/kytos', exist_ok=Tre)
mock_chmod.assert_called_with('/var/run/kytos', 0o1777)

@staticmethod
@patch('kytos.core.kytosd.InteractiveShellEmbed')
def test_start_shell(mock_interactive_shell):
"""Test stop_api_server method."""
start_shell(MagicMock())
ipshell = create_shell(MagicMock())
ipshell()

mock_interactive_shell.assert_called()

Expand Down

0 comments on commit 3c58da8

Please sign in to comment.