Skip to content

Commit

Permalink
Merge 7de073a into 7f08b0e
Browse files Browse the repository at this point in the history
  • Loading branch information
smarr committed Nov 27, 2022
2 parents 7f08b0e + 7de073a commit 4706fc5
Show file tree
Hide file tree
Showing 5 changed files with 137 additions and 65 deletions.
10 changes: 5 additions & 5 deletions .github/workflows/ci.yml
Expand Up @@ -8,7 +8,7 @@ jobs:
strategy:
fail-fast: false
matrix:
python-version: [ '2.7', '3.7', '3.9', 'pypy3' ]
python-version: [ '2.7', '3.7', '3.9', 'pypy3.9' ]
yaml-parser: ['', 'ruamel']
include:
- python-version: 2.7
Expand All @@ -23,10 +23,10 @@ jobs:
name: Python ${{ matrix.python-version }} ${{ matrix.yaml-parser }}
steps:
- name: Checkout ReBench
uses: actions/checkout@v2
uses: actions/checkout@v3

- name: Setup Python
uses: actions/setup-python@v1
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}
architecture: x64
Expand Down Expand Up @@ -79,9 +79,9 @@ jobs:
run: |
apt-get update
apt-get install -y --no-install-recommends time
- name: Checkout ReBench
uses: actions/checkout@v2
uses: actions/checkout@v3

- name: Install PyTest
run: pip install pytest
Expand Down
22 changes: 19 additions & 3 deletions rebench/denoise.py
Expand Up @@ -12,7 +12,8 @@
from cpuinfo import get_cpu_info

from .ui import escape_braces
from .subprocess_with_timeout import output_as_str
from .subprocess_with_timeout import output_as_str # pylint: disable=cyclic-import
from .subprocess_kill import kill_process # pylint: disable=cyclic-import

try:
from . import __version__ as rebench_version
Expand Down Expand Up @@ -144,6 +145,13 @@ def restore_noise(denoise_result, show_warning, ui):
ui.error(denoise_result.warn_msg)


def deliver_kill_signal(pid):
try:
cmd = ['sudo', '-n', 'rebench-denoise', '--json', 'kill', str(pid)]
subprocess.check_output(cmd, stderr=subprocess.STDOUT)
except (subprocess.CalledProcessError, FileNotFoundError):
pass

def _can_set_niceness():
"""
Check whether we can ask the operating system to influence the priority of
Expand Down Expand Up @@ -331,6 +339,10 @@ def _exec(num_cores, use_nice, use_shielding, args):
os.execvpe(cmd, cmdline, env)


def _kill(proc_id):
kill_process(int(proc_id), True, None, False)


def _calculate(core_id):
print("Started calculating: %d" % core_id)
try:
Expand Down Expand Up @@ -381,10 +393,12 @@ def _shell_options():
parser.add_argument('--for-profiling', action='store_true', default=False,
dest='for_profiling', help="Don't restrict CPU usage by profiler")
parser.add_argument('command',
help=("`minimize`|`restore`|`exec -- `|`test`: "
"`minimize` sets system to reduce noise. "
help=("`minimize`|`restore`|`exec -- `|`kill pid`|`test`: "
"`minimize` sets system to reduce noise. " +
"`restore` sets system to the assumed original settings. " +
"`exec -- ` executes the given arguments. " +
"`kill pid` send kill signal to the process with given id " +
"and all child processes. " +
"`test` executes a computation for 20 seconds in parallel. " +
"it is only useful to test rebench-denoise itself."),
default=None)
Expand All @@ -405,6 +419,8 @@ def main_func():
result = _restore_standard_settings(num_cores, args.use_shielding)
elif args.command == 'exec':
_exec(num_cores, args.use_nice, args.use_shielding, remaining_args)
elif args.command == 'kill':
_kill(remaining_args[0])
elif args.command == 'test':
_test(num_cores)
else:
Expand Down
4 changes: 3 additions & 1 deletion rebench/executor.py
Expand Up @@ -476,7 +476,9 @@ def _keep_alive(seconds):
cmdline, env=run_id.env, cwd=run_id.location, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT, shell=True, verbose=self.debug,
timeout=run_id.max_invocation_time,
keep_alive_output=_keep_alive)
keep_alive_output=_keep_alive,
uses_sudo=self.use_denoise
)
except OSError as err:
run_id.fail_immediately()
if err.errno == 2:
Expand Down
69 changes: 69 additions & 0 deletions rebench/subprocess_kill.py
@@ -0,0 +1,69 @@
from os import kill
from signal import SIGKILL
from subprocess import PIPE, Popen

IS_PY3 = None

try:
_ = ProcessLookupError
IS_PY3 = True
except NameError:
IS_PY3 = False

# Indicate timeout with standard exit code
E_TIMEOUT = -9


def _kill_py2(proc_id, uses_sudo):
if uses_sudo:
from .denoise import deliver_kill_signal # pylint: disable=import-outside-toplevel

deliver_kill_signal(proc_id)
return

try:
kill(proc_id, SIGKILL)
except IOError:
# it's a race condition, so let's simply ignore it
pass


def _kill_py3(proc_id, uses_sudo):
if uses_sudo:
from .denoise import deliver_kill_signal # pylint: disable=import-outside-toplevel

deliver_kill_signal(proc_id)
return

try:
kill(proc_id, SIGKILL)
except ProcessLookupError: # pylint: disable=undefined-variable
# it's a race condition, so let's simply ignore it
pass


def kill_process(pid, recursively, thread, uses_sudo):
pids = [pid]
if recursively:
pids.extend(_get_process_children(pid))

for proc_id in pids:
if IS_PY3:
_kill_py3(proc_id, uses_sudo)
else:
_kill_py2(proc_id, uses_sudo)

if thread:
thread.join()
return E_TIMEOUT, thread.stdout_result, thread.stderr_result
return E_TIMEOUT, None, None


def _get_process_children(pid):
# pylint: disable-next=consider-using-with
proc = Popen('pgrep -P %d' % pid, shell=True, stdout=PIPE, stderr=PIPE)
stdout, _stderr = proc.communicate()
result = [int(p) for p in stdout.split()]
for child in result[:]:
result.extend(_get_process_children(child))
return result
97 changes: 41 additions & 56 deletions rebench/subprocess_with_timeout.py
@@ -1,13 +1,14 @@
from __future__ import print_function

from os import kill
from select import select
from signal import SIGKILL
from subprocess import PIPE, STDOUT, Popen
from threading import Thread, Condition
from time import time

import sys
import signal

from .subprocess_kill import kill_process

IS_PY3 = None

Expand All @@ -33,6 +34,19 @@ def output_as_str(string_like):
else:
return string_like

_signals_setup = False


def keyboard_interrupt_on_sigterm(signum, frame):
raise KeyboardInterrupt()


def _setup_signal_handling_if_needed():
global _signals_setup # pylint: disable=global-statement
if not _signals_setup:
_signals_setup = True
signal.signal(signal.SIGTERM, keyboard_interrupt_on_sigterm)


class _SubprocessThread(Thread):

Expand Down Expand Up @@ -123,17 +137,41 @@ def _print_keep_alive(seconds_since_start):

def run(args, env, cwd=None, shell=False, kill_tree=True, timeout=-1,
verbose=False, stdout=PIPE, stderr=PIPE, stdin_input=None,
keep_alive_output=_print_keep_alive):
keep_alive_output=_print_keep_alive, uses_sudo=False):
"""
Run a command with a timeout after which it will be forcibly
killed.
"""
_setup_signal_handling_if_needed()
executable_name = args.split(' ', 1)[0]

thread = _SubprocessThread(executable_name, args, env, shell, cwd, verbose, stdout,
stderr, stdin_input)
thread.start()

was_interrupted = False

try:
_join_with_keep_alive(keep_alive_output, thread, timeout)
except KeyboardInterrupt:
was_interrupted = True

if (timeout != -1 or was_interrupted) and thread.is_alive():
assert thread.get_pid() is not None
result = kill_process(thread.get_pid(), kill_tree, thread, uses_sudo)
if was_interrupted:
raise KeyboardInterrupt()
return result

if not thread.is_alive():
exp = thread.exception
if exp:
raise exp # pylint: disable=raising-bad-type

return thread.returncode, thread.stdout_result, thread.stderr_result


def _join_with_keep_alive(keep_alive_output, thread, timeout):
if timeout == -1:
thread.join()
else:
Expand All @@ -154,56 +192,3 @@ def run(args, env, cwd=None, shell=False, kill_tree=True, timeout=-1,
diff = time() - start
if diff < timeout:
keep_alive_output(diff)

if timeout != -1 and thread.is_alive():
assert thread.get_pid() is not None
return _kill_process(thread.get_pid(), kill_tree, thread)

if not thread.is_alive():
exp = thread.exception
if exp:
raise exp # pylint: disable=raising-bad-type

return thread.returncode, thread.stdout_result, thread.stderr_result


def _kill_py2(proc_id):
try:
kill(proc_id, SIGKILL)
except IOError:
# it's a race condition, so let's simply ignore it
pass


def _kill_py3(proc_id):
try:
kill(proc_id, SIGKILL)
except ProcessLookupError: # pylint: disable=undefined-variable
# it's a race condition, so let's simply ignore it
pass


def _kill_process(pid, recursively, thread):
pids = [pid]
if recursively:
pids.extend(_get_process_children(pid))

for proc_id in pids:
if IS_PY3:
_kill_py3(proc_id)
else:
_kill_py2(proc_id)

thread.join()

return E_TIMEOUT, thread.stdout_result, thread.stderr_result


def _get_process_children(pid):
# pylint: disable-next=consider-using-with
proc = Popen('pgrep -P %d' % pid, shell=True, stdout=PIPE, stderr=PIPE)
stdout, _stderr = proc.communicate()
result = [int(p) for p in stdout.split()]
for child in result[:]:
result.extend(_get_process_children(child))
return result

0 comments on commit 4706fc5

Please sign in to comment.