From 38e1eecb6c1156c9d2fa903208e7bbe8374c6c5c Mon Sep 17 00:00:00 2001 From: Vincent Michel Date: Mon, 16 Jan 2017 16:15:05 +0100 Subject: [PATCH] Update server executors to handle the tango loop properly --- tango/server.py | 26 ++++++++++++++++---------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/tango/server.py b/tango/server.py index 35c3d3f5..38a1035e 100644 --- a/tango/server.py +++ b/tango/server.py @@ -1316,8 +1316,8 @@ def tango_loop(): log.debug("server loop exit") if async_mode: - tango_thread_id = worker.run_in_thread(tango_loop) - worker.run() + task = worker.run_in_thread(tango_loop) + worker.run(task) log.debug("async worker finished") else: tango_loop() @@ -1592,12 +1592,14 @@ def is_gevent_thread(self): return self.__id == id(current_thread()) def run_in_thread(self, func, *args, **kwargs): - thread_id = gevent._threading.start_new_thread(func, args, kwargs) - return thread_id + return gevent.get_hub().threadpool.spawn(func, *args, **kwargs) - def run(self, timeout=None): - return gevent.wait(objects=(self.__stop_event,), - timeout=timeout) + def run(self, until=None, timeout=None): + if until is not None: + objects = until, self.__stop_event + else: + objects = self.__stop_event, + return gevent.wait(objects=objects, timeout=timeout) def execute(self, func, *args, **kwargs): if self.is_gevent_thread(): @@ -1657,11 +1659,15 @@ def run_in_thread(self, func, *args, **kwargs): # That is not actually necessary since coro is actually # a future. But it is an implementation detail and it # might be changed later on. - asyncio.async(coro) + return asyncio.async(coro) - def run(self, timeout=None): + def run(self, until=None, timeout=None): """Run the asyncio event loop.""" - self.loop.run_forever() + if until is None and timeout is None: + return self.loop.run_forever() + if until is None: + until = asyncio.sleep(timeout, loop=self.loop) + return self.loop.run_until_complete(until) def stop(self): """Run the asyncio event loop."""