Skip to content

Commit

Permalink
Fix newly created worker possibly raising when pool is exiting
Browse files Browse the repository at this point in the history
* A worker communicates with its parent by sending a message and getting
  the response. When a worker is created, e.g. after it unexpectedly aborts
  and is then re-created, it should receive a `ConfigSending` message, but
  if at this point all tasks are finished and a `Stop` message arrives
  instead, an error stack trace will be logged, which might lead to
  misunderstanding.
* Fix a log message that was not correctly formatted when handling
  inactive workers.
  • Loading branch information
raoyitao committed Jun 18, 2020
1 parent d1c66ca commit 8b52aef
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 9 deletions.
8 changes: 6 additions & 2 deletions testplan/runners/pools/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ def _deco_worker(self, worker, message):
"""
Decommission a worker by move all assigned task back to pool
"""
self.logger.critical(message.format(worker))
self.logger.critical(message)
if os.path.exists(worker.outfile):
self.logger.critical("\tlogfile: {}".format(worker.outfile))
while worker.assigned:
Expand Down Expand Up @@ -586,7 +586,7 @@ def _query_worker_status(self, worker):
worker.status.STOPPING,
worker.status.STOPPED,
):
return "inactive", "Worker in stop/abort status"
return "inactive", "Worker {} in stop/abort status".format(worker)

if worker.status.tag in (worker.status.NONE, worker.status.STARTING):
return "initializing", None
Expand Down Expand Up @@ -622,9 +622,13 @@ def _query_worker_status(self, worker):
def _handle_inactive(self, worker, reason):
"""
Handle an inactive worker.
:param worker: worker object
:type worker: :py:class:`~testplan.runners.pool.base.Worker`
:param reason: why worker is considered inactive
:type reason: ``str``
:return: True if worker restarted, else False
:rtype: ``bool``
"""
if worker.status.tag != worker.status.STARTED:
return False
Expand Down
22 changes: 18 additions & 4 deletions testplan/runners/pools/child.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import signal
import socket
import shutil
import inspect
import logging
import argparse
import platform
Expand Down Expand Up @@ -143,9 +142,16 @@ def _send_and_expect(self, message, send, expect):

def _pre_loop_setup(self, message):
response = self._send_and_expect(
message, message.ConfigRequest, message.ConfigSending
message,
message.ConfigRequest,
[message.ConfigSending, message.Stop],
)

# The pool might be exiting after worker restarts and tries to connect
# Return False if stop signal received, otherwise True, or raise
if response.cmd == message.Stop:
return False

# Response.data: [cfg, cfg.parent, cfg.parent.parent, ...]
pool_cfg = response.data[0]
for idx, cfg in enumerate(response.data):
Expand All @@ -167,6 +173,8 @@ def _pre_loop_setup(self, message):
self.runpath = pool_metadata["runpath"]
self._setup_logfiles()

return True

def worker_loop(self):
"""
Child process worker loop. Manages an underlying thread pool, pulls and
Expand All @@ -177,7 +185,9 @@ def worker_loop(self):
message = Message(**self.metadata)

try:
self._pre_loop_setup(message)
if not self._pre_loop_setup(message):
self.logger.debug("Stop signal received, child exits.")
return
except Exception:
self._transport.send_and_receive(
message.make(message.SetupFailed, data=traceback.format_exc()),
Expand Down Expand Up @@ -277,7 +287,9 @@ def __init__(self, *args, **kwargs):
self._setup_metadata = None

def _pre_loop_setup(self, message):
super(RemoteChildLoop, self)._pre_loop_setup(message)
if not super(RemoteChildLoop, self)._pre_loop_setup(message):
return False

self._setup_metadata = self._send_and_expect(
message, message.MetadataPull, message.Metadata
).data
Expand All @@ -302,6 +314,8 @@ def _pre_loop_setup(self, message):
):
raise RuntimeError("Setup script exited with non 0 code.")

return True

def exit_loop(self):
if self._pool.cfg.delete_pushed:
for item in self._setup_metadata.push_dirs:
Expand Down
9 changes: 6 additions & 3 deletions testplan/runners/pools/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ def send_and_receive(self, message, expect=None):
:param message: Message sent.
:type message:
:py:class:`~testplan.runners.pools.communication.Message`
:param expect: Assert message received command is the expected.
:type expect: ``NoneType`` or
:param expect: Expected command of message received.
:type expect: ``NoneType`` or ``tuple`` or ``list`` or
:py:class:`~testplan.runners.pools.communication.Message`
:return: Message received.
:rtype: ``object``
Expand All @@ -82,7 +82,10 @@ def send_and_receive(self, message, expect=None):
raise RuntimeError(
"Received None when {} was expected.".format(expect)
)
assert received.cmd == expect
if isinstance(expect, (tuple, list)):
assert received.cmd in expect
else:
assert received.cmd == expect
return received


Expand Down

0 comments on commit 8b52aef

Please sign in to comment.