Skip to content
This repository has been archived by the owner on Jan 14, 2024. It is now read-only.

Commit

Permalink
Fix: Not flushing output immediately
Browse files Browse the repository at this point in the history
  • Loading branch information
blackandred committed May 12, 2020
1 parent 2963ad9 commit 331c851
Showing 1 changed file with 35 additions and 4 deletions.
39 changes: 35 additions & 4 deletions src/rkd/contract.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@

import os
import sys
from time import sleep
from abc import abstractmethod, ABC as AbstractClass
from typing import Dict, List, Union
from argparse import ArgumentParser
from subprocess import check_call, check_output, Popen, DEVNULL, PIPE as SUBPROCESS_PIPE, CalledProcessError
from queue import Queue
from threading import Thread
from .inputoutput import IO
from .exception import UndefinedEnvironmentVariableUsageError

Expand Down Expand Up @@ -211,16 +214,38 @@ def sh(self, cmd: str, capture: bool = False, verbose: bool = False, strict: boo
os.close(write)

if not capture:
process = Popen('bash', shell=True, stdin=read, stdout=SUBPROCESS_PIPE, stderr=SUBPROCESS_PIPE)
process = Popen('bash', shell=True, stdin=read, stdout=SUBPROCESS_PIPE, stderr=SUBPROCESS_PIPE, bufsize=1,
close_fds='posix' in sys.builtin_module_names)

out_queue = Queue()
stdout_thread = Thread(target=self._enqueue_output, args=(process.stdout, out_queue))
stdout_thread.daemon = True
stdout_thread.start()

err_queue = Queue()
stderr_thread = Thread(target=self._enqueue_output, args=(process.stderr, err_queue))
stderr_thread.daemon = True
stderr_thread.start()

stderr = ''

# subprocess is having issues with giving stdout and stderr streams directory as arguments
# that's why the streams are copied there
def flush():
if not out_queue.empty():
out_line = out_queue.get(timeout=.1)
sys.stdout.write(out_line.decode('utf-8'))

if not err_queue.empty():
err_line = err_queue.get(timeout=.1)
stderr = err_line.decode('utf-8')
sys.stderr.write(stderr)

while process.poll() is None:
sys.stdout.write(process.stdout.read().decode('utf-8'))
stderr = process.stderr.read().decode('utf-8')
sys.stderr.write(stderr)
flush()
sleep(0.01) # important: to not dry the CPU (no sleep = full cpu usage at one core)

flush()
exit_code = process.wait()

if exit_code > 0:
Expand All @@ -230,6 +255,12 @@ def sh(self, cmd: str, capture: bool = False, verbose: bool = False, strict: boo

return check_output('bash', shell=True, stdin=read).decode('utf-8')

@staticmethod
def _enqueue_output(out, queue: Queue):
for line in iter(out.readline, b''):
queue.put(line)
out.close()

def exec(self, cmd: str, capture: bool = False, background: bool = False) -> Union[str, None]:
""" Starts a process in shell. Throws exception on error.
To capture output set capture=True
Expand Down

0 comments on commit 331c851

Please sign in to comment.