Skip to content
This repository has been archived by the owner on Jan 14, 2024. It is now read-only.

Commit

Permalink
Fix output in invalid order stderr/stdout
Browse files Browse the repository at this point in the history
  • Loading branch information
blackandred committed May 12, 2020
1 parent 7284781 commit c075d08
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 42 deletions.
30 changes: 26 additions & 4 deletions src/rkd/contract.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from typing import Dict, List, Union
from argparse import ArgumentParser
from .inputoutput import IO
from .exception import UndefinedEnvironmentVariableUsageError
from .exception import UndefinedEnvironmentVariableUsageError, EnvironmentVariableNotUsed
from .taskutil import TaskUtilities


Expand Down Expand Up @@ -106,9 +106,28 @@ def __init__(self, declaration: TaskDeclarationInterface,
self.args = args
self.env = env

def getenv(self, name: str):
def getenv(self, name: str, error_on_not_used: bool = False):
""" Get environment variable value """
return self.declaration.get_task_to_execute().internal_getenv(name, self.env)
return self.declaration.get_task_to_execute().internal_getenv(name, self.env,
error_on_not_used=error_on_not_used)

def getarg(self, name: str) -> Union[str, None]:
try:
return self.args[name]
except KeyError:
raise Exception('"%s" is not a defined argument')

def get_arg_or_env(self, name: str) -> Union[str, None]:
env_name = name[2:].replace('-', '_').upper()

try:
return self.getenv(env_name, error_on_not_used=True)
except EnvironmentVariableNotUsed:
pass

arg_name = name[2:].replace('-', '_')

return self.args[arg_name]


class TaskInterface(TaskUtilities):
Expand Down Expand Up @@ -171,7 +190,7 @@ def get_declared_envs(self) -> Dict[str, str]:
""" Dictionary of allowed envs to override: KEY -> DEFAULT VALUE """
return {}

def internal_getenv(self, env_name: str, envs: Dict[str, str]) -> str:
def internal_getenv(self, env_name: str, envs: Dict[str, str], error_on_not_used: bool = False) -> str:
declared_envs = self.get_declared_envs()

if env_name not in declared_envs:
Expand All @@ -182,6 +201,9 @@ def internal_getenv(self, env_name: str, envs: Dict[str, str]) -> str:

# return default value
if env_name not in envs:
if error_on_not_used:
raise EnvironmentVariableNotUsed(env_name)

return declared_envs[env_name]

return envs[env_name]
Expand Down
4 changes: 4 additions & 0 deletions src/rkd/exception.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ class UndefinedEnvironmentVariableUsageError(TaskException):
pass


class EnvironmentVariableNotUsed(TaskException):
pass


class UserInputException(Exception):
pass

Expand Down
71 changes: 33 additions & 38 deletions src/rkd/taskutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,39 @@
import os
import sys
from typing import Union
from time import sleep
from subprocess import check_call, check_output, Popen, DEVNULL, PIPE as SUBPROCESS_PIPE, CalledProcessError
from queue import Queue
from subprocess import check_call, check_output, Popen, DEVNULL, PIPE as SUBPROCESS_PIPE, CalledProcessError, STDOUT as SUBPROCESS_STDOUT
from threading import Thread
from abc import ABC as AbstractClass
from abc import ABC as AbstractClass, abstractmethod
from .inputoutput import IO


class TaskUtilities(AbstractClass):
"""
Internal helpers for TaskInterface implementations
"""

@abstractmethod
def io(self) -> IO:
pass

def silent_sh(self, cmd: str, verbose: bool = False, strict: bool = True,
env: dict = None) -> bool:
"""
sh() shortcut that catches errors and displays using IO().error_msg()
"""

# kwargs is not used on purpose. For static analysis.

try:
self.sh(cmd=cmd, capture=False, verbose=verbose, strict=strict, env=env)
return True

except CalledProcessError as e:
self.io().error_msg(str(e))
return False

def sh(self, cmd: str, capture: bool = False, verbose: bool = False, strict: bool = True,
env: dict = None) -> Union[str, None]:

""" Executes a shell script in bash. Throws exception on error.
To capture output set capture=True
"""
Expand All @@ -38,38 +56,13 @@ def sh(self, cmd: str, capture: bool = False, verbose: bool = False, strict: boo
os.close(write)

if not capture:
process = Popen('bash', shell=True, stdin=read, stdout=SUBPROCESS_PIPE, stderr=SUBPROCESS_PIPE, bufsize=1,
close_fds='posix' in sys.builtin_module_names)
process = Popen('bash', shell=True, stdin=read, stdout=SUBPROCESS_PIPE, stderr=SUBPROCESS_STDOUT)

out_queue = Queue()
stdout_thread = Thread(target=self._enqueue_output, args=(process.stdout, out_queue))
stdout_thread = Thread(target=self._copy_stream, args=(process.stdout, sys.stdout, process))
stdout_thread.daemon = True
stdout_thread.start()

err_queue = Queue()
stderr_thread = Thread(target=self._enqueue_output, args=(process.stderr, err_queue))
stderr_thread.daemon = True
stderr_thread.start()

stderr = ''

# subprocess is having issues with giving stdout and stderr streams directory as arguments
# that's why the streams are copied there
def flush():
if not out_queue.empty():
out_line = out_queue.get(timeout=.1)
sys.stdout.write(out_line.decode('utf-8'))

if not err_queue.empty():
err_line = err_queue.get(timeout=.1)
stderr = err_line.decode('utf-8')
sys.stderr.write(stderr)

while process.poll() is None:
flush()
sleep(0.01) # important: to not dry the CPU (no sleep = full cpu usage at one core)

flush()
exit_code = process.wait()

if exit_code > 0:
Expand All @@ -80,10 +73,10 @@ def flush():
return check_output('bash', shell=True, stdin=read).decode('utf-8')

@staticmethod
def _enqueue_output(out, queue: Queue):
for line in iter(out.readline, b''):
queue.put(line)
out.close()
def _copy_stream(in_stream, out_stream, process: Popen):
while process.poll() is None:
for line in iter(in_stream.readline, ''):
out_stream.write(line.decode('utf-8'))

def exec(self, cmd: str, capture: bool = False, background: bool = False) -> Union[str, None]:
""" Starts a process in shell. Throws exception on error.
Expand All @@ -103,9 +96,11 @@ def exec(self, cmd: str, capture: bool = False, background: bool = False) -> Uni

return check_output(cmd, shell=True).decode('utf-8')

def rkd(self, args: list) -> str:
def rkd(self, args: list, verbose: bool = False) -> str:
""" Spawns an RKD subprocess
"""

bash_opts = 'set -x; ' if verbose else ''
args_str = ' '.join(args)
return self.exec('rkd --no-ui %s' % args_str, capture=True, background=False)

return self.sh(bash_opts + ' rkd --no-ui %s' % args_str)

0 comments on commit c075d08

Please sign in to comment.