Skip to content

Commit

Permalink
Windows Fixes (#87)
Browse files Browse the repository at this point in the history
* have LaunchService.run() return non-0 when there are exceptions

* update launch_counters.py example for Windows

* fix bug that would cause mismatched asyncio loops in some futures

* signal.SIGKILL doesn’t exist on Windows, so emulate it in our Event

* remove left over debug statement

* fix issue that resulted in spurious asyncio errors in LaunchService test
  • Loading branch information
wjwwood committed Jun 21, 2018
1 parent 0354557 commit 5526541
Show file tree
Hide file tree
Showing 7 changed files with 78 additions and 24 deletions.
21 changes: 13 additions & 8 deletions launch/examples/launch_counters.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
"""

import os
import platform
import sys
from typing import cast
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) # noqa
Expand All @@ -39,10 +40,11 @@ def main(argv=sys.argv[1:]):
"""Main."""
# Any number of actions can optionally be given to the constructor of LaunchDescription.
# Or actions/entities can be added after creating the LaunchDescription.
user_env_var = 'USERNAME' if platform.system() == 'Windows' else 'USER'
ld = LaunchDescription([
launch.actions.LogInfo(msg='Hello World!'),
launch.actions.LogInfo(msg=(
'Is that you, ', launch.substitutions.EnvironmentVariable(name='USER'), '?'
'Is that you, ', launch.substitutions.EnvironmentVariable(name=user_env_var), '?'
)),
])

Expand All @@ -63,8 +65,13 @@ def on_output(event: launch.Event) -> None:
# Prefix just the whoami process with `time`.
ld.add_action(launch.actions.SetLaunchConfiguration('launch-prefix', 'time'))
# Run whoami, but keep handle to action to make a targeted event handler.
if platform.system() == 'Windows':
whoami_cmd = ['echo', '%USERNAME%']
else:
whoami_cmd = [launch.substitutions.FindExecutable(name='whoami')]
whoami_action = launch.actions.ExecuteProcess(
cmd=[launch.substitutions.FindExecutable(name='whoami')],
cmd=whoami_cmd,
shell=(platform.system() == 'Windows')
)
ld.add_action(whoami_action)
# Make event handler that uses the output.
Expand All @@ -79,7 +86,7 @@ def on_output(event: launch.Event) -> None:
ld.add_action(launch.actions.SetLaunchConfiguration('launch-prefix', ''))

# Run the counting program, with default options.
counter_action = launch.actions.ExecuteProcess(cmd=['python3', '-u', './counter.py'])
counter_action = launch.actions.ExecuteProcess(cmd=[sys.executable, '-u', './counter.py'])
ld.add_action(counter_action)

# Setup an event handler for just this process which will exit when `Counter: 4` is seen.
Expand All @@ -98,10 +105,10 @@ def counter_output_handler(event):

# Run the counter a few more times, with various options.
ld.add_action(launch.actions.ExecuteProcess(
cmd=['python3', '-u', './counter.py', '--ignore-sigint']
cmd=[sys.executable, '-u', './counter.py', '--ignore-sigint']
))
ld.add_action(launch.actions.ExecuteProcess(
cmd=['python3', '-u', './counter.py', '--ignore-sigint', '--ignore-sigterm']
cmd=[sys.executable, '-u', './counter.py', '--ignore-sigint', '--ignore-sigterm']
))

# Add our own message for when shutdown is requested.
Expand All @@ -124,9 +131,7 @@ def counter_output_handler(event):
# ls = LaunchService(argv=argv, debug=True) # Use this instead to get more debug messages.
ls = LaunchService(argv=argv)
ls.include_launch_description(ld)
ls.run()

return 0
return ls.run()


if __name__ == '__main__':
Expand Down
5 changes: 4 additions & 1 deletion launch/launch/actions/execute_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,9 @@ def __on_signal_process_event(
_logger.info("sending signal '{}' to process[{}]".format(
typed_event.signal_name, self.process_details['name']
))
if typed_event.signal_name == 'SIGKILL':
self._subprocess_transport.kill() # works on both Windows and POSIX
return None
self._subprocess_transport.send_signal(typed_event.signal)
return None

Expand Down Expand Up @@ -293,7 +296,7 @@ def printer(context, msg, timeout_substitutions):
args=(base_msg.format('{}', '{}', 'SIGTERM', 'SIGKILL'), sigkill_timeout)
),
EmitEvent(event=SignalProcess(
signal_number=signal.SIGKILL,
signal_number='SIGKILL',
process_matcher=matches_action(self)
))
])
Expand Down
18 changes: 13 additions & 5 deletions launch/launch/events/process/signal_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import signal as signal_module # to avoid confusion with .signal property in type annotations
from typing import Callable
from typing import Text
from typing import Union

from .process_targeted_event import ProcessTargetedEvent
from ...utilities import ensure_argument_type
Expand All @@ -33,23 +34,30 @@ class SignalProcess(ProcessTargetedEvent):

def __init__(
self, *,
signal_number: signal_module.Signals,
signal_number: Union[Text, signal_module.Signals],
process_matcher: Callable[['ExecuteProcess'], bool]
) -> None:
"""
Constructor.
Takes an optional process matcher, which can be used to match the
signal to a process.
Since Windows does not support SIGKILL, the string 'SIGKILL' can be
given instead of `signal.SIGKILL`.
Handlers of this event can then check for the 'SIGKILL' string and do
the appropriate thing on Windows instead of sending a signal.
:signal_number: either the string 'SIGKILL' or a signal.Signals
"""
super().__init__(process_matcher=process_matcher)
ensure_argument_type(
signal_number, (signal_module.Signals,), 'signal_number', 'SignalProcess')
signal_number, (str, signal_module.Signals), 'signal_number', 'SignalProcess')
self.__signal = signal_number

@property
def signal(self) -> signal_module.Signals:
"""Getter for signal, it will match something from the signal module."""
def signal(self) -> Union[Text, signal_module.Signals]:
"""Getter for signal, it will be 'SIGKILL' or match something from the signal module."""
return self.__signal

@property
Expand All @@ -59,4 +67,4 @@ def signal_name(self) -> Text:
It will be something like (e.g.) 'SIGINT'.
"""
return self.__signal.name
return self.__signal if isinstance(self.__signal, str) else self.__signal.name
36 changes: 33 additions & 3 deletions launch/launch/launch_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
from .utilities import visit_all_entities_and_collect_futures

_logger = logging.getLogger('launch.LaunchService')
_g_loops_used = set()
_g_loops_used = set() # type: Set


# This atexit handler ensures all the loops are closed at exit.
Expand Down Expand Up @@ -118,6 +118,9 @@ def __init__(
self.__shutting_down = False
self.__shutdown_when_idle = False

# Used to keep track of whether or not there were unexpected exceptions.
self.__return_code = 0

def emit_event(self, event: Event) -> None:
"""
Emit an event synchronously and thread-safely.
Expand All @@ -127,10 +130,11 @@ def emit_event(self, event: Event) -> None:
with self.__loop_from_run_thread_lock:
if self.__loop_from_run_thread is not None:
# loop is in use, asynchronously emit the event
asyncio.run_coroutine_threadsafe(
future = asyncio.run_coroutine_threadsafe(
self.__context.emit_event(event),
self.__loop_from_run_thread
)
future.result()
else:
# loop is not in use, synchronously emit the event, and it will be processed later
self.__context.emit_event_sync(event)
Expand Down Expand Up @@ -244,6 +248,7 @@ def run(self, *, shutdown_when_idle=True) -> None:
'LaunchService.run() called from multiple threads concurrently.')
self.__running = True

self.__return_code = 0 # reset the return_code for this run()
self.__shutdown_when_idle = shutdown_when_idle

# Acquire the lock and initialize the asyncio loop.
Expand All @@ -255,7 +260,23 @@ def run(self, *, shutdown_when_idle=True) -> None:
_g_loops_used.add(self.__loop_from_run_thread)
if self.__debug:
self.__loop_from_run_thread.set_debug(True)

# Setup the exception handler to make sure we return non-0 when there are errors.
def exception_handler(loop, context):
self.__return_code = 1
return loop.default_exception_handler(context)
self.__loop_from_run_thread.set_exception_handler(exception_handler)

# Set the asyncio loop for the context.
self.__context._set_asyncio_loop(self.__loop_from_run_thread)
# Recreate the event queue to ensure the same event loop is being used.
new_queue = asyncio.Queue(loop=self.__loop_from_run_thread)
while True:
try:
new_queue.put_nowait(self.__context._event_queue.get_nowait())
except asyncio.QueueEmpty:
break
self.__context._event_queue = new_queue

# Run the asyncio loop over the main coroutine that processes events.
try:
Expand Down Expand Up @@ -308,13 +329,22 @@ def _on_sigquit(signum, frame):
with self.__running_lock:
self.__running = False

return self.__return_code

def __on_shutdown(self, event: Event, context: LaunchContext) -> Optional[SomeActionsType]:
self.__shutting_down = True
return None

def _shutdown(self, *, reason, due_to_sigint):
# Assumption is that this method is only called when running.
if not self.__shutting_down:
self.emit_event(Shutdown(reason=reason, due_to_sigint=due_to_sigint))
shutdown_event = Shutdown(reason=reason, due_to_sigint=due_to_sigint)
if self.__loop_from_run_thread == asyncio.get_event_loop():
# If in the thread of the loop.
self.__loop_from_run_thread.create_task(self.__context.emit_event(shutdown_event))
else:
# Otherwise in a different thread, so use the thread-safe method.
self.emit_event(shutdown_event)
self.__shutting_down = True
self.__context._set_is_shutdown(True)

Expand Down
18 changes: 13 additions & 5 deletions launch/test/launch/test_launch_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,22 @@ class MockEvent:
t = threading.Thread(target=ls.run, kwargs={'shutdown_when_idle': False})
t.start()

# First event (after including description of event handler).
handled_events.get(block=True, timeout=5.0)

# Emit and then check for a second event.
ls.emit_event(MockEvent())
handled_events.get(timeout=5.0) # First event (after including description of event handler).
handled_events.get(timeout=5.0) # Second event.
ls.shutdown()
handled_events.get(block=True, timeout=5.0)

# Shutdown (generates a third event) and join the thread.
ls.shutdown()
t.join()
# Check that the shutdown event was handled.
handled_events.get(block=False)

assert handled_events.qsize() == 0
ls.emit_event(MockEvent())
assert handled_events.qsize() == 0

ls.run(shutdown_when_idle=True)
handled_events.get(timeout=5.0)
assert ls.run(shutdown_when_idle=True) == 0
handled_events.get(block=False)
2 changes: 1 addition & 1 deletion launch_ros/examples/lifecycle_pub_sub_launch.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ def main(argv=sys.argv[1:]):
ls = launch.LaunchService(argv=argv)
ls.include_launch_description(get_default_launch_description(prefix_output_with_name=False))
ls.include_launch_description(ld)
ls.run()
return ls.run()


if __name__ == '__main__':
Expand Down
2 changes: 1 addition & 1 deletion launch_ros/examples/pub_sub_launch.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def main(argv=sys.argv[1:]):
ls = LaunchService()
ls.include_launch_description(get_default_launch_description(prefix_output_with_name=False))
ls.include_launch_description(ld)
ls.run()
return ls.run()


if __name__ == '__main__':
Expand Down

0 comments on commit 5526541

Please sign in to comment.