Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix support for win32 in iter_lines #500

Merged
merged 1 commit into from Mar 7, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
59 changes: 57 additions & 2 deletions plumbum/commands/processes.py
@@ -1,7 +1,6 @@
import time
import atexit
import heapq
from subprocess import Popen
from threading import Thread
from plumbum.lib import IS_WIN32, six

Expand All @@ -24,7 +23,7 @@ def _check_process(proc, retcode, timeout, stdout, stderr):
return proc.returncode, stdout, stderr


def _iter_lines(proc, decode, linesize, line_timeout=None):
def _iter_lines_posix(proc, decode, linesize, line_timeout=None):
try:
from selectors import DefaultSelector, EVENT_READ
except ImportError:
Expand Down Expand Up @@ -62,6 +61,62 @@ def selector():
yield 1, decode(line)


def _iter_lines_win32(proc, decode, linesize, line_timeout=None):

class Piper(Thread):

def __init__(self, fd, pipe):
super().__init__(name="PlumbumPiper%sThread" % fd)
self.pipe = pipe
self.fd = fd
self.empty = False
self.daemon = True
super().start()

def read_from_pipe(self):
return self.pipe.readline(linesize)

def run(self):
for line in iter(self.read_from_pipe, b''):
queue.put((self.fd, decode(line)))
# self.pipe.close()

if line_timeout is None:
line_timeout = float("inf")
queue = Queue()
pipers = [Piper(0, proc.stdout), Piper(1, proc.stderr)]
last_line_ts = time.time()
empty = True
while True:
try:
yield queue.get_nowait()
last_line_ts = time.time()
empty = False
except QueueEmpty:
empty = True
if time.time() - last_line_ts > line_timeout:
raise ProcessLineTimedOut("popen line timeout expired", getattr(proc, "argv", None), getattr(proc, "machine", None))
if proc.poll() is not None:
break
if empty:
time.sleep(0.1)

for piper in pipers:
piper.join()

while True:
try:
yield queue.get_nowait()
except QueueEmpty:
break


if IS_WIN32:
_iter_lines = _iter_lines_win32
else:
_iter_lines = _iter_lines_posix


#===================================================================================================
# Exceptions
#===================================================================================================
Expand Down