From 695317260866ef32a60bdec0ecf2c32dbe0f8754 Mon Sep 17 00:00:00 2001 From: Shane Loretz Date: Fri, 15 Sep 2017 08:57:20 -0700 Subject: [PATCH 1/6] Custom exector uses a worker thread --- .../custom_executor.py | 34 ++++++++++++++++--- 1 file changed, 30 insertions(+), 4 deletions(-) diff --git a/rclpy/executors/examples_rclpy_executors/custom_executor.py b/rclpy/executors/examples_rclpy_executors/custom_executor.py index 43c38044..cc1c406e 100644 --- a/rclpy/executors/examples_rclpy_executors/custom_executor.py +++ b/rclpy/executors/examples_rclpy_executors/custom_executor.py @@ -44,16 +44,41 @@ class PriorityExecutor(Executor): def __init__(self): super().__init__() + self.is_shutdown = False self.high_priority_nodes = set() - self.low_priority_thread = None + self.lp_handler = None + self.lp_cv = threading.Condition() + self.lp_thread = threading.Thread(target=self.low_priority_runner) + self.lp_thread.start() def add_high_priority_node(self, node): self.high_priority_nodes.add(node) # add_node inherited self.add_node(node) + def shutdown(self, timeout_sec=None): + """Wait for all calbacks to finish executing and stop spinning.""" + if super().shutdown(timeout_sec=timeout_sec): + with self.lp_cv: + self.is_shutdown = True + self.lp_cv.notify_all() + self.lp_thread.join() + return True + return False + + def low_priority_runner(self): + """Wait for a low priority callback and run it.""" + while not self.is_shutdown: + with self.lp_cv: + self.lp_cv.wait_for(lambda: self.is_shutdown or self.lp_handler is not None) + if self.lp_handler is not None: + try: + self.lp_handler() + finally: + self.lp_handler = None + def can_run_low_priority(self): - return self.low_priority_thread is None or not self.low_priority_thread.is_alive() + return self.lp_handler is None def spin_once(self, timeout_sec=None): """ @@ -85,8 +110,9 @@ def spin_once(self, timeout_sec=None): t = threading.Thread(target=handler) t.start() else: - self.low_priority_thread = threading.Thread(target=handler) - self.low_priority_thread.start() + with self.lp_cv: + self.lp_handler = handler + self.lp_cv.notify_all() def main(args=None): From 7befe47fc94ec17596bf37d5f3b591323b9fc33a Mon Sep 17 00:00:00 2001 From: Shane Loretz Date: Fri, 15 Sep 2017 09:13:57 -0700 Subject: [PATCH 2/6] Shortened example with ThreadPoolExecutor --- .../custom_executor.py | 57 +++---------------- 1 file changed, 7 insertions(+), 50 deletions(-) diff --git a/rclpy/executors/examples_rclpy_executors/custom_executor.py b/rclpy/executors/examples_rclpy_executors/custom_executor.py index cc1c406e..b29ff51e 100644 --- a/rclpy/executors/examples_rclpy_executors/custom_executor.py +++ b/rclpy/executors/examples_rclpy_executors/custom_executor.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -import threading +from concurrent.futures import ThreadPoolExecutor from examples_rclpy_executors.listener import Listener from examples_rclpy_executors.talker import Talker @@ -44,42 +44,15 @@ class PriorityExecutor(Executor): def __init__(self): super().__init__() - self.is_shutdown = False self.high_priority_nodes = set() - self.lp_handler = None - self.lp_cv = threading.Condition() - self.lp_thread = threading.Thread(target=self.low_priority_runner) - self.lp_thread.start() + self.hp_executor = ThreadPoolExecutor(max_workers=10) + self.lp_executor = ThreadPoolExecutor(max_workers=1) def add_high_priority_node(self, node): self.high_priority_nodes.add(node) # add_node inherited self.add_node(node) - def shutdown(self, timeout_sec=None): - """Wait for all calbacks to finish executing and stop spinning.""" - if super().shutdown(timeout_sec=timeout_sec): - with self.lp_cv: - self.is_shutdown = True - self.lp_cv.notify_all() - self.lp_thread.join() - return True - return False - - def low_priority_runner(self): - """Wait for a low priority callback and run it.""" - while not self.is_shutdown: - with self.lp_cv: - self.lp_cv.wait_for(lambda: self.is_shutdown or self.lp_handler is not None) - if self.lp_handler is not None: - try: - self.lp_handler() - finally: - self.lp_handler = None - - def can_run_low_priority(self): - return self.lp_handler is None - def spin_once(self, timeout_sec=None): """ Execute a single callback, then return. @@ -91,28 +64,16 @@ def spin_once(self, timeout_sec=None): :param timeout_sec: Seconds to wait. Block forever if None. Don't wait if <= 0 :type timeout_sec: float or None """ - # Wait only on high priority nodes if the low priority thread is taken. - # this avoids spinning rapidly when low priority callbacks are available but - # can't be acted on - nodes = self.high_priority_nodes - if self.can_run_low_priority(): - # get_nodes returns all nodes added to executor - nodes = self.get_nodes() - # wait_for_ready_callbacks yields callbacks that are ready to be executed try: - handler, group, node = next(self.wait_for_ready_callbacks( - timeout_sec=timeout_sec, nodes=nodes)) + handler, group, node = next(self.wait_for_ready_callbacks(timeout_sec=timeout_sec)) except StopIteration: pass else: if node in self.high_priority_nodes: - t = threading.Thread(target=handler) - t.start() + self.hp_executor.submit(handler) else: - with self.lp_cv: - self.lp_handler = handler - self.lp_cv.notify_all() + self.lp_executor.submit(handler) def main(args=None): @@ -123,11 +84,7 @@ def main(args=None): executor.add_node(Listener()) executor.add_node(Talker()) try: - # TODO(sloretz) use executor.spin() once guard conditions become available to users - while rclpy.ok(): - # A timeout is used to make the executor periodically reevaluate if low priority - # nodes can be executed - executor.spin_once(timeout_sec=0.5) + executor.spin() finally: executor.shutdown() finally: From 440015046c8e775b56b80006dc410f24d064be24 Mon Sep 17 00:00:00 2001 From: Shane Loretz Date: Fri, 15 Sep 2017 09:27:42 -0700 Subject: [PATCH 3/6] Simplified arguments in tests --- rclpy/executors/test/test_copyright.py | 6 ++---- rclpy/executors/test/test_flake8.py | 6 ++---- rclpy/executors/test/test_pep257.py | 6 ++---- 3 files changed, 6 insertions(+), 12 deletions(-) diff --git a/rclpy/executors/test/test_copyright.py b/rclpy/executors/test/test_copyright.py index 2746b44f..4203a610 100644 --- a/rclpy/executors/test/test_copyright.py +++ b/rclpy/executors/test/test_copyright.py @@ -12,12 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -import os - from ament_copyright.main import main def test_copyright(): - root_path = os.path.realpath(os.path.join(os.path.dirname(__file__), '..')) - rc = main(argv=[root_path]) + # Test is called from package root + rc = main(argv=['.']) assert rc == 0, 'Found errors' diff --git a/rclpy/executors/test/test_flake8.py b/rclpy/executors/test/test_flake8.py index cd2e6ec2..5f5b1932 100644 --- a/rclpy/executors/test/test_flake8.py +++ b/rclpy/executors/test/test_flake8.py @@ -12,12 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -import os - from ament_flake8.main import main def test_flake8(): - root_path = os.path.realpath(os.path.join(os.path.dirname(__file__), '..')) - rc = main(argv=[root_path]) + # Test is called from package root + rc = main(argv=['.']) assert rc == 0, 'Found errors' diff --git a/rclpy/executors/test/test_pep257.py b/rclpy/executors/test/test_pep257.py index 443a67a2..0a7cfa36 100644 --- a/rclpy/executors/test/test_pep257.py +++ b/rclpy/executors/test/test_pep257.py @@ -12,12 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -import os - from ament_pep257.main import main def test_pep257(): - root_path = os.path.realpath(os.path.join(os.path.dirname(__file__), '..')) - rc = main(argv=[root_path]) + # Test is called from package root + rc = main(argv=['.']) assert rc == 0, 'Found code style errors / warnings' From 018513a66e1f025c26ea76112b91f0122090498b Mon Sep 17 00:00:00 2001 From: Shane Loretz Date: Fri, 15 Sep 2017 09:29:54 -0700 Subject: [PATCH 4/6] Num workers = cpu_count() --- rclpy/executors/examples_rclpy_executors/custom_executor.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/rclpy/executors/examples_rclpy_executors/custom_executor.py b/rclpy/executors/examples_rclpy_executors/custom_executor.py index b29ff51e..47cc54eb 100644 --- a/rclpy/executors/examples_rclpy_executors/custom_executor.py +++ b/rclpy/executors/examples_rclpy_executors/custom_executor.py @@ -13,6 +13,7 @@ # limitations under the License. from concurrent.futures import ThreadPoolExecutor +import multiprocessing from examples_rclpy_executors.listener import Listener from examples_rclpy_executors.talker import Talker @@ -45,7 +46,7 @@ class PriorityExecutor(Executor): def __init__(self): super().__init__() self.high_priority_nodes = set() - self.hp_executor = ThreadPoolExecutor(max_workers=10) + self.hp_executor = ThreadPoolExecutor(max_workers=multiprocessing.cpu_count()) self.lp_executor = ThreadPoolExecutor(max_workers=1) def add_high_priority_node(self, node): From 5faf21687bb49ee6889b8bf48de6d38532456ee5 Mon Sep 17 00:00:00 2001 From: Shane Loretz Date: Fri, 15 Sep 2017 09:53:49 -0700 Subject: [PATCH 5/6] max_workers = os.cpu_count() or 4 --- rclpy/executors/examples_rclpy_executors/custom_executor.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rclpy/executors/examples_rclpy_executors/custom_executor.py b/rclpy/executors/examples_rclpy_executors/custom_executor.py index 47cc54eb..134f1eb7 100644 --- a/rclpy/executors/examples_rclpy_executors/custom_executor.py +++ b/rclpy/executors/examples_rclpy_executors/custom_executor.py @@ -13,7 +13,7 @@ # limitations under the License. from concurrent.futures import ThreadPoolExecutor -import multiprocessing +import os from examples_rclpy_executors.listener import Listener from examples_rclpy_executors.talker import Talker @@ -46,7 +46,7 @@ class PriorityExecutor(Executor): def __init__(self): super().__init__() self.high_priority_nodes = set() - self.hp_executor = ThreadPoolExecutor(max_workers=multiprocessing.cpu_count()) + self.hp_executor = ThreadPoolExecutor(max_workers=(os.cpu_count() or 4)) self.lp_executor = ThreadPoolExecutor(max_workers=1) def add_high_priority_node(self, node): From 53bd00f6a2a9a6ddb4961ae31d0a138d1bc52354 Mon Sep 17 00:00:00 2001 From: Shane Loretz Date: Fri, 15 Sep 2017 10:05:19 -0700 Subject: [PATCH 6/6] Remove () --- rclpy/executors/examples_rclpy_executors/custom_executor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rclpy/executors/examples_rclpy_executors/custom_executor.py b/rclpy/executors/examples_rclpy_executors/custom_executor.py index 134f1eb7..87c4e806 100644 --- a/rclpy/executors/examples_rclpy_executors/custom_executor.py +++ b/rclpy/executors/examples_rclpy_executors/custom_executor.py @@ -46,7 +46,7 @@ class PriorityExecutor(Executor): def __init__(self): super().__init__() self.high_priority_nodes = set() - self.hp_executor = ThreadPoolExecutor(max_workers=(os.cpu_count() or 4)) + self.hp_executor = ThreadPoolExecutor(max_workers=os.cpu_count() or 4) self.lp_executor = ThreadPoolExecutor(max_workers=1) def add_high_priority_node(self, node):