From 13f139de09744ce9b18f3d299c29571a09f4a31d Mon Sep 17 00:00:00 2001 From: raoyitao Date: Wed, 17 Jun 2020 19:27:37 +0800 Subject: [PATCH] Fix Newly created worker may raise when pool is exiting * A worker communicates with its parent, sending a message and get the response. When a worker is created, e.g. unexpectedly abort and then re-created, it should receive a `ConfigSending` message, but if at this point all task is finished then an `Stop` message arrives, there will be stack information of error logged, which might lead to misunderstanding. * Fix a log message which is not correctly formatted during handling inactive workers. --- testplan/runners/pools/base.py | 6 +++++- testplan/runners/pools/child.py | 11 +++++++++-- testplan/runners/pools/connection.py | 9 ++++++--- 3 files changed, 20 insertions(+), 6 deletions(-) diff --git a/testplan/runners/pools/base.py b/testplan/runners/pools/base.py index 53a991db3..972fb6f5b 100644 --- a/testplan/runners/pools/base.py +++ b/testplan/runners/pools/base.py @@ -664,7 +664,7 @@ def _query_worker_status(self, worker): worker.status.STOPPING, worker.status.STOPPED, ): - return "inactive", "Worker in stop/abort status" + return "inactive", "Worker {} in stop/abort status" if worker.status.tag in (worker.status.NONE, worker.status.STARTING): return "initializing", None @@ -700,9 +700,13 @@ def _query_worker_status(self, worker): def _handle_inactive(self, worker, reason): """ Handle an inactive worker. + :param worker: worker object + :type worker: :py:class:`~testplan.runners.pool.base.Worker` :param reason: why worker is considered inactive + :type reason: ``str`` :return: True if worker restarted, else False + :rtype: ``bool`` """ if worker.status.tag != worker.status.STARTED: return False diff --git a/testplan/runners/pools/child.py b/testplan/runners/pools/child.py index a38b991bd..727ef8ce8 100755 --- a/testplan/runners/pools/child.py +++ b/testplan/runners/pools/child.py @@ -6,7 +6,6 @@ import signal import socket import shutil -import inspect import logging import argparse import platform @@ -144,9 +143,17 @@ def _send_and_expect(self, message, send, expect): def _pre_loop_setup(self, message): response = self._send_and_expect( - message, message.ConfigRequest, message.ConfigSending + message, + message.ConfigRequest, + [message.ConfigSending, message.Stop], ) + # Process pool might be exiting after worker restarts and tries + # to connect, at this time worker can gracefully exit. + if response.cmd == message.Stop: + self.logger.debug("Stop message received, child exits.") + os._exit(0) + # Response.data: [cfg, cfg.parent, cfg.parent.parent, ...] pool_cfg = response.data[0] for idx, cfg in enumerate(response.data): diff --git a/testplan/runners/pools/connection.py b/testplan/runners/pools/connection.py index f6adca158..a2f6c6e8d 100644 --- a/testplan/runners/pools/connection.py +++ b/testplan/runners/pools/connection.py @@ -56,8 +56,8 @@ def send_and_receive(self, message, expect=None): :param message: Message sent. :type message: :py:class:`~testplan.runners.pools.communication.Message` - :param expect: Assert message received command is the expected. - :type expect: ``NoneType`` or + :param expect: Expected command of message received. + :type expect: ``NoneType`` or ``tuple`` or ``list`` or :py:class:`~testplan.runners.pools.communication.Message` :return: Message received. :rtype: ``object`` @@ -82,7 +82,10 @@ def send_and_receive(self, message, expect=None): raise RuntimeError( "Received None when {} was expected.".format(expect) ) - assert received.cmd == expect + if isinstance(expect, (tuple, list)): + assert received.cmd in expect + else: + assert received.cmd == expect return received