Skip to content
This repository has been archived by the owner on Jan 14, 2024. It is now read-only.

Commit

Permalink
TaskUtilities: Fix output capturing
Browse files Browse the repository at this point in the history
  • Loading branch information
blackandred committed May 13, 2020
1 parent c00f8c7 commit cece64b
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 25 deletions.
54 changes: 54 additions & 0 deletions src/rkd/process.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
#!/usr/bin/env python3

import os
import sys
import subprocess
from threading import Thread
from io import StringIO


def check_call(command: str, stdin=None):
os.environ['PYTHONUNBUFFERED'] = "1"

stdout_pipe_r, stdout_pipe_w = os.pipe()
stderr_pipe_r, stderr_pipe_w = os.pipe()

# keep the last 1024 characters of stderr
err_buffer = StringIO()
out_buffer = StringIO()

process = subprocess.Popen('bash', shell=True, stdin=stdin, stdout=stdout_pipe_w, stderr=stderr_pipe_w,
bufsize=1)

stdout_thread = Thread(target=_copy_stream, args=(stdout_pipe_r, sys.stdout, process, out_buffer))
stdout_thread.daemon = True
stdout_thread.start()

stderr_thread = Thread(target=_copy_stream, args=(stderr_pipe_r, sys.stderr, process, err_buffer))
stderr_thread.daemon = True
stderr_thread.start()

exit_code = process.wait()

if exit_code > 0:
raise subprocess.CalledProcessError(
exit_code, command, stderr=err_buffer.getvalue(), output=out_buffer.getvalue()
)


def _copy_stream(in_stream_fd: int, out_stream, process: subprocess.Popen, copy: StringIO = None):
buffer_wrote_size = 0

while process.poll() is None:
read = os.read(in_stream_fd, 1024).decode('utf-8')
out_stream.write(read)

if copy:
if buffer_wrote_size >= 1024:
copy.truncate()

buffer_wrote_size += len(read)
copy.write(read)

read = os.read(in_stream_fd, 1024).decode('utf-8')
out_stream.write(read)
28 changes: 4 additions & 24 deletions src/rkd/taskutil.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@

import os
import sys
from typing import Union
from subprocess import check_call, check_output, Popen, DEVNULL, PIPE as SUBPROCESS_PIPE, CalledProcessError, STDOUT as SUBPROCESS_STDOUT
from threading import Thread
from subprocess import check_output, Popen, DEVNULL, CalledProcessError
from abc import ABC as AbstractClass, abstractmethod
from .inputoutput import IO
from .process import check_call


class TaskUtilities(AbstractClass):
Expand Down Expand Up @@ -56,30 +55,11 @@ def sh(self, cmd: str, capture: bool = False, verbose: bool = False, strict: boo
os.close(write)

if not capture:
# process = Popen('bash', shell=True, stdin=read, stdout=SUBPROCESS_PIPE, stderr=SUBPROCESS_STDOUT, bufsize=1)
#
# stdout_thread = Thread(target=self._copy_stream, args=(process.stdout, sys.stdout, process))
# stdout_thread.daemon = True
# stdout_thread.start()
#
# exit_code = process.wait()
#
# if exit_code > 0:
# raise CalledProcessError(exit_code, cmd)

check_call('bash', shell=True, stdin=read)
check_call('bash', stdin=read)
return

return check_output('bash', shell=True, stdin=read).decode('utf-8')

# @staticmethod
# def _copy_stream(in_stream, out_stream, process: Popen):
# while process.poll() is None:
# for line in iter(in_stream.readline, ''):
# out_stream.write(line.decode('utf-8'))
#
# out_stream.write(in_stream.read().decode('utf-8'))

def exec(self, cmd: str, capture: bool = False, background: bool = False) -> Union[str, None]:
""" Starts a process in shell. Throws exception on error.
To capture output set capture=True
Expand All @@ -93,7 +73,7 @@ def exec(self, cmd: str, capture: bool = False, background: bool = False) -> Uni
return

if not capture:
check_call(cmd, shell=True)
check_call(cmd)
return

return check_output(cmd, shell=True).decode('utf-8')
Expand Down
37 changes: 37 additions & 0 deletions test/test_contract.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@

import unittest
import os
import subprocess
from io import StringIO
from rkd.standardlib import InitTask
from rkd.inputoutput import IO

CURRENT_SCRIPT_PATH = os.path.dirname(os.path.realpath(__file__))

Expand All @@ -26,3 +29,37 @@ def test():
task.exec('ls', background=True, capture=True)

self.assertRaises(Exception, test)

def test_sh_captures_output_in_correct_order(self):
self.maxDiff = None # unittest setting
task = InitTask()

io = IO()
out = StringIO()

with io.capture_descriptors(stream=out, enable_standard_out=False):
task.sh(''' set +e;
sleep 0.5;
echo "FIRST";
sleep 0.5;
echo "SECOND" >&2;
echo "THIRD";
echo "FOURTH" >&2;
echo "FIFTH" >&2;
''')

self.assertEqual("FIRST\nSECOND\nTHIRD\nFOURTH\nFIFTH\n", out.getvalue())

def test_sh_provides_stdout_and_stderr_in_exception(self):
task = InitTask()

try:
task.sh('''
echo "Freedom, Equality, Mutual Aid!"
echo "The state and capitalism is failing" >&2
exit 161
''')
except subprocess.CalledProcessError as e:
self.assertEqual("Freedom, Equality, Mutual Aid!\n", e.output)
self.assertEqual("The state and capitalism is failing\n", e.stderr)
2 changes: 1 addition & 1 deletion test/test_yaml_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ def test_create_bash_callable_successful_case(self):
self.assertIn("Python", test_result.read())
self.assertTrue(result, msg='python --version should result with a True')

def test_create_bash_callable_failure_case(self):
def test_create_bash_callable_failure_case_on_invalid_exit_code(self):
""" Bash callable test: Check if failures are correctly catched """

result = self._create_callable_tester('exit 161', language='bash')
Expand Down

0 comments on commit cece64b

Please sign in to comment.