Skip to content
This repository has been archived by the owner on Jan 14, 2024. It is now read-only.

Commit

Permalink
#48: Add interactive mode
Browse files Browse the repository at this point in the history
  • Loading branch information
blackandred committed Oct 22, 2020
1 parent 6e5b801 commit f9ddb01
Show file tree
Hide file tree
Showing 7 changed files with 78 additions and 44 deletions.
4 changes: 2 additions & 2 deletions src/rkd/api/contract.py
Original file line number Diff line number Diff line change
Expand Up @@ -393,15 +393,15 @@ def format_task_name(self, name: str) -> str:

return name

def py(self, code: str, become: str = None, capture: bool = False, script_path: str = None) -> Union[str, None]:
def py(self, code: str = '', become: str = None, capture: bool = False, script_path: str = None, arguments: str = '') -> Union[str, None]:
"""Executes a Python code in a separate process
NOTICE: Use instead of subprocess. Raw subprocess is less supported and output from raw subprocess
may be not catch properly into the logs
"""

return super().py(
code=code, become=become, capture=capture, script_path=script_path
code=code, become=become, capture=capture, script_path=script_path, arguments=arguments
)

def sh(self, cmd: str, capture: bool = False, verbose: bool = False, strict: bool = True,
Expand Down
8 changes: 1 addition & 7 deletions src/rkd/execution/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,12 +149,6 @@ def _execute_as_forked_process(become: str, task: TaskInterface, temp: TempManag

return False

code_file = temp.assign_temporary_file()
task.io().debug('Assigned source code temporary file at "%s"' % code_file)

with open(code_file, 'w') as f:
f.write(FORKED_EXECUTOR_TEMPLATE)

# set permissions to temporary file
if become:
task.io().debug('Setting temporary file permissions')
Expand All @@ -167,7 +161,7 @@ def _execute_as_forked_process(become: str, task: TaskInterface, temp: TempManag
return False

task.io().debug('Executing python code')
task.py(communication_file, become=become, capture=False, script_path=code_file)
task.py(FORKED_EXECUTOR_TEMPLATE, become=become, capture=False, arguments=communication_file)

# collect, process and pass result
task.io().debug('Parsing subprocess results from a serialized data')
Expand Down
2 changes: 1 addition & 1 deletion src/rkd/execution/serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def _communicate_return(val):
f.write(pickle.dumps(val))
communication_file = sys.stdin.read().strip()
communication_file = sys.argv[1]
try:
#
Expand Down
54 changes: 35 additions & 19 deletions src/rkd/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@
import os
import sys
import subprocess
from io import FileIO
import termios
import tty
import pty
import select
from typing import Optional
from threading import Thread

Expand Down Expand Up @@ -35,38 +38,51 @@ def get_value(self) -> str:
return self.text


def check_call(command: str, stdin=None, script: Optional[str] = ''):
def check_call(command: str, script: Optional[str] = ''):
if os.getenv('RKD_COMPAT_SUBPROCESS') == 'true':
subprocess.check_call(command, stdin=stdin, shell=True)
subprocess.check_call(command, shell=True)
return

os.environ['PYTHONUNBUFFERED'] = "1"

process = subprocess.Popen(command, shell=True, stdin=stdin, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
bufsize=1, close_fds=ON_POSIX, universal_newlines=True)
old_tty = termios.tcgetattr(sys.stdin)
try:
tty.setraw(sys.stdin.fileno())

out_buffer = TextBuffer(buffer_size=1024 * 10)
stdout_thread = Thread(target=push_output, args=(process.stdout, sys.stdout, out_buffer))
stdout_thread.daemon = True
stdout_thread.start()
# open a virtual terminal
primary_fd, replica_fd = pty.openpty()

exit_code = process.wait()
process = subprocess.Popen(command, shell=True, stdin=replica_fd, stdout=replica_fd, stderr=replica_fd,
bufsize=1, close_fds=ON_POSIX, universal_newlines=False, preexec_fn=os.setsid)

out_buffer = TextBuffer(buffer_size=1024 * 10)
stdout_thread = Thread(target=push_output, args=(process, primary_fd, out_buffer))
stdout_thread.daemon = True
stdout_thread.start()

exit_code = process.wait()
finally:
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)

if exit_code > 0:
raise subprocess.CalledProcessError(
exit_code, script if script else command, stderr=out_buffer.get_value(), output=out_buffer.get_value()
)


def push_output(input_stream: FileIO, output_stream, out_buffer: TextBuffer):
for line in iter(input_stream.readline, ''):
output_stream.write(line)
output_stream.flush()
out_buffer.write(line)
def push_output(process: subprocess.Popen, primary_fd, out_buffer: TextBuffer):
while process.poll() is None:
r, w, e = select.select([sys.stdin, primary_fd], [], [])

input_stream.close()
if sys.stdin in r:
d = os.read(sys.stdin.fileno(), 10240)
os.write(primary_fd, d)

elif primary_fd in r:
o = os.read(primary_fd, 10240)

# def _debug(msg):
# with open('/dev/stdout', 'w') as f:
# f.write(str(msg))
# propagate to stdout
if o:
sys.stdout.write(o.decode('utf-8'))
sys.stdout.flush()
out_buffer.write(o.decode('utf-8'))
38 changes: 31 additions & 7 deletions src/rkd/taskutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import sys
from typing import Union
from subprocess import check_output, Popen, DEVNULL, CalledProcessError
from tempfile import NamedTemporaryFile
from abc import ABC as AbstractClass, abstractmethod
from .api.inputoutput import IO
from .process import check_call
Expand Down Expand Up @@ -86,38 +87,61 @@ def sh(self, cmd: str, capture: bool = False, verbose: bool = False, strict: boo
bash_script = "#!/bin/bash -eopipefail \n" + cmd
bash_script = bash_script.replace('%RKD%', self.get_rkd_binary())

if not capture:
with NamedTemporaryFile() as bash_temp_file:
bash_temp_file.write(bash_script.encode('utf-8'))
bash_temp_file.flush()

check_call('bash ' + bash_temp_file.name, script=bash_script)

return

read, write = os.pipe()
os.write(write, bash_script.encode('utf-8'))
os.close(write)

if not capture:
check_call('bash', stdin=read, script=bash_script)
return

return check_output('bash', shell=True, stdin=read).decode('utf-8')

def py(self, code: str, become: str = None, capture: bool = False, script_path: str = None) -> Union[str, None]:
def py(self, code: str = '', become: str = None, capture: bool = False,
script_path: str = None, arguments: str = '') -> Union[str, None]:

"""Executes a Python code in a separate process"""

if (not code and not script_path) or (code and script_path):
raise Exception('You need to provide only one of "code" or "script_path"')

read, write = os.pipe()
os.write(write, code.encode('utf-8'))
os.close(write)

cmd = 'python'
py_temp_file = None

if script_path:
cmd += ' ' + script_path + ' '

if code:
with NamedTemporaryFile(delete=False) as py_temp_file:
py_temp_file.write(code.encode('utf-8'))
py_temp_file.flush()

cmd += ' ' + py_temp_file.name

if become:
cmd = "sudo -E -u %s %s" % (become, cmd)

os.putenv('RKD_BIN', self.get_rkd_binary())

if not capture:
check_call(cmd, stdin=read, script=code)
check_call(cmd + ' ' + arguments, script=code)
os.unlink(py_temp_file.name) if py_temp_file else None
return

return check_output(cmd, shell=True, stdin=read).decode('utf-8')
if capture:
out = check_output(cmd + ' ' + arguments, shell=True, stdin=read).decode('utf-8')
os.unlink(py_temp_file.name) if py_temp_file else None

return out

def exec(self, cmd: str, capture: bool = False, background: bool = False) -> Union[str, None]:
""" Starts a process in shell. Throws exception on error.
Expand Down
2 changes: 1 addition & 1 deletion test/test_declarativeexecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ def test_executing_multiple_steps_one_by_one_the_order_is_preserved(self):
with io.capture_descriptors(target_files=[], stream=str_io, enable_standard_out=False):
executor.execute_steps_one_by_one(ctx, task_declaration.get_task_to_execute())

self.assertEqual("First\nSecond\nThird\n", str_io.getvalue())
self.assertEqual("First\nSecond\r\nThird\n", str_io.getvalue())

def test_one_failed_step_is_preventing_next_steps_from_execution_and_result_is_marked_as_failure(self):
"""Check the correctness of error handling"""
Expand Down
14 changes: 7 additions & 7 deletions test/test_taskutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def test_sh_captures_output_in_correct_order_with_various_timing(self):
echo "TENTH";
''')

self.assertEqual("FIRST\nSECOND\nTHIRD\nFOURTH\nFIFTH\nSIXTH\nSEVENTH\nNINETH\nTENTH\n", out.getvalue())
self.assertEqual("FIRST\r\nSECOND\r\nTHIRD\r\nFOURTH\r\nFIFTH\r\nSIXTH\r\nSEVENTH\r\nNINETH\r\nTENTH\r\n", out.getvalue())

def test_sh_producing_large_outputs(self):
"""Process a few megabytes of output and assert that:
Expand All @@ -92,7 +92,7 @@ def test_sh_producing_large_outputs(self):
''')

iterations = 1024 * 128
text_with_newlines_length = len(text) + 1
text_with_newlines_length = len(text) + 2 # \r + \n
memory_after = psutil.Process(os.getpid()).memory_info().rss / 1024 / 1024

self.assertEqual(iterations * text_with_newlines_length, len(out.getvalue()))
Expand Down Expand Up @@ -123,7 +123,7 @@ def test_sh_captures_output_in_correct_order_with_fixed_timing(self):
echo "THIRD";
''')

self.assertEqual("FIRST\nSECOND\nTHIRD\n", out.getvalue())
self.assertEqual("FIRST\r\nSECOND\r\nTHIRD\r\n", out.getvalue())

def test_sh_rkd_in_rkd_shows_first_lines_on_error(self):
"""Bugfix: sh() was loosing first line(s) of output, when exception was raised
Expand Down Expand Up @@ -254,12 +254,12 @@ def test_py_executes_a_custom_python_script(self):
task._io = IO()

with NamedTemporaryFile() as temp_file:
temp_file.write(b'import sys; print("STDIN: " + str(sys.stdin.read()))')
temp_file.write(b'import sys; print(sys.argv[1])')
temp_file.flush()

out = task.py('Hello!', capture=True, script_path=temp_file.name)
out = task.py('', capture=True, script_path=temp_file.name, arguments='Hello!')

self.assertEqual('STDIN: Hello!\n', out)
self.assertEqual('Hello!\n', out)

def test_py_inherits_environment_variables(self):
os.putenv('PY_INHERITS_ENVIRONMENT_VARIABLES', 'should')
Expand All @@ -283,4 +283,4 @@ def test_py_uses_sudo_when_become_specified(self):
mocked_subprocess: unittest.mock.MagicMock
task.py(code='print("test")', capture=True, become='root')

self.assertEqual('sudo -E -u root python', mocked_subprocess.call_args[0][0])
self.assertIn('sudo -E -u root python', mocked_subprocess.call_args[0][0])

0 comments on commit f9ddb01

Please sign in to comment.