Skip to content

Commit

Permalink
Windows compatibility fixes
Browse files Browse the repository at this point in the history
- the close_fd's handling was buggy
- on python 3.4+ we always overrode _communicate, but that doesn't work
  on windows. As a bonus output_callback now works on windows
  • Loading branch information
seveas committed Dec 18, 2015
1 parent 7c3c37b commit 03406c8
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 89 deletions.
4 changes: 2 additions & 2 deletions docs/conf.py
Expand Up @@ -3,8 +3,8 @@
master_doc = 'index'
project = u'whelk'
copyright = u'2010-2015, Dennis Kaarsemaker'
version = '2.5'
release = '2.5.1'
version = '2.6'
release = '2.6'
pygments_style = 'sphinx'
html_theme = 'cloud'
html_theme_options = {
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Expand Up @@ -3,7 +3,7 @@
from distutils.core import setup

setup(name = "whelk",
version = "2.5.1",
version = "2.6",
author = "Dennis Kaarsemaker",
author_email = "dennis@kaarsemaker.net",
url = "http://github.com/seveas/whelk",
Expand Down
2 changes: 1 addition & 1 deletion whelk/__init__.py
Expand Up @@ -139,7 +139,7 @@ def __call__(self, *args, **kwargs):
kwargs[stream] = PIPE

# close_fds is not supported under windows when redirecting stdin/out/err
if sys.platform != 'win32' or (kwargs['stdin'], kwargs['stdout'], kwargs['stderr']).count(None) == 3:
if sys.platform != 'win32' or (kwargs.get('stdin'), kwargs.get('stdout', None), kwargs.get('stderr', None)).count(None) == 3:
kwargs['close_fds'] = True

self.input = kwargs.pop('input','')
Expand Down
185 changes: 100 additions & 85 deletions whelk/subprocess_34.py
Expand Up @@ -13,8 +13,9 @@
import sys
import errno


class Popen(subprocess.Popen):
output_callback_supported = sys.platform != 'mswindows'
output_callback_supported = True
def communicate(self, input=None, timeout=None):
if self._communication_started and input:
raise ValueError("Cannot send input after starting communication")
Expand All @@ -34,97 +35,111 @@ def communicate(self, input=None, timeout=None):
sts = self.wait(timeout=self._remaining_time(endtime))
return (stdout, stderr)

def _communicate(self, input, endtime, orig_timeout):
if self.stdin and not self._communication_started:
# Flush stdio buffer. This might block, if the user has
# been writing to .stdin in an uncontrolled fashion.
self.stdin.flush()
if not input:
self.stdin.close()

stdout = None
stderr = None

# Only create this mapping if we haven't already.
if not self._communication_started:
self._fileobj2output = {}
if self.stdout:
self._fileobj2output[self.stdout] = []
if self.stderr:
self._fileobj2output[self.stderr] = []

if self.stdout:
stdout = self._fileobj2output[self.stdout]
if self.stderr:
stderr = self._fileobj2output[self.stderr]

self._save_input(input)

if self._input:
input_view = memoryview(self._input)
if sys.platform == 'win32':
def _readerthread(self, fh, buffer):
bufferx = []
while True:
data = fh.read(100)
if not data:
break
if self.output_callback:
self.output_callback[0](self.shell, self, fh, data, *self.output_callback[1:])
bufferx.append(data)
fh.close()
buffer.append(b''.join(bufferx))

else:
def _communicate(self, input, endtime, orig_timeout):
if self.stdin and not self._communication_started:
# Flush stdio buffer. This might block, if the user has
# been writing to .stdin in an uncontrolled fashion.
self.stdin.flush()
if not input:
self.stdin.close()

stdout = None
stderr = None

# Only create this mapping if we haven't already.
if not self._communication_started:
self._fileobj2output = {}
if self.stdout:
self._fileobj2output[self.stdout] = []
if self.stderr:
self._fileobj2output[self.stderr] = []

with subprocess._PopenSelector() as selector:
if self.stdin and input:
selector.register(self.stdin, selectors.EVENT_WRITE)
if self.stdout:
selector.register(self.stdout, selectors.EVENT_READ)
stdout = self._fileobj2output[self.stdout]
if self.stderr:
selector.register(self.stderr, selectors.EVENT_READ)

while selector.get_map():
timeout = self._remaining_time(endtime)
if timeout is not None and timeout < 0:
raise TimeoutExpired(self.args, orig_timeout)

ready = selector.select(timeout)
self._check_timeout(endtime, orig_timeout)

# XXX Rewrite these to use non-blocking I/O on the file
# objects; they are no longer using C stdio!

for key, events in ready:
if key.fileobj is self.stdin:
chunk = input_view[self._input_offset :
self._input_offset + subprocess._PIPE_BUF]
try:
self._input_offset += os.write(key.fd, chunk)
except OSError as e:
if e.errno == errno.EPIPE:
selector.unregister(key.fileobj)
key.fileobj.close()
stderr = self._fileobj2output[self.stderr]

self._save_input(input)

if self._input:
input_view = memoryview(self._input)

with subprocess._PopenSelector() as selector:
if self.stdin and input:
selector.register(self.stdin, selectors.EVENT_WRITE)
if self.stdout:
selector.register(self.stdout, selectors.EVENT_READ)
if self.stderr:
selector.register(self.stderr, selectors.EVENT_READ)

while selector.get_map():
timeout = self._remaining_time(endtime)
if timeout is not None and timeout < 0:
raise TimeoutExpired(self.args, orig_timeout)

ready = selector.select(timeout)
self._check_timeout(endtime, orig_timeout)

# XXX Rewrite these to use non-blocking I/O on the file
# objects; they are no longer using C stdio!

for key, events in ready:
if key.fileobj is self.stdin:
chunk = input_view[self._input_offset :
self._input_offset + subprocess._PIPE_BUF]
try:
self._input_offset += os.write(key.fd, chunk)
except OSError as e:
if e.errno == errno.EPIPE:
selector.unregister(key.fileobj)
key.fileobj.close()
else:
raise
else:
raise
else:
if self._input_offset >= len(self._input):
if self._input_offset >= len(self._input):
selector.unregister(key.fileobj)
key.fileobj.close()
elif key.fileobj in (self.stdout, self.stderr):
data = os.read(key.fd, 32768)
if self.output_callback:
self.output_callback[0](self.shell, self, key.fd, data, *self.output_callback[1:])
if not data:
selector.unregister(key.fileobj)
key.fileobj.close()
elif key.fileobj in (self.stdout, self.stderr):
data = os.read(key.fd, 32768)
if self.output_callback:
self.output_callback[0](self.shell, self, key.fd, data, *self.output_callback[1:])
if not data:
selector.unregister(key.fileobj)
key.fileobj.close()
if self.output_callback:
self.output_callback[0](self.shell, self, key.fd, None, *self.output_callback[1:])
self._fileobj2output[key.fileobj].append(data)
if self.output_callback:
self.output_callback[0](self.shell, self, key.fd, None, *self.output_callback[1:])
self._fileobj2output[key.fileobj].append(data)

self.wait(timeout=self._remaining_time(endtime))
self.wait(timeout=self._remaining_time(endtime))

# All data exchanged. Translate lists into strings.
if stdout is not None:
stdout = b''.join(stdout)
if stderr is not None:
stderr = b''.join(stderr)

# Translate newlines, if requested.
# This also turns bytes into strings.
if self.universal_newlines:
# All data exchanged. Translate lists into strings.
if stdout is not None:
stdout = self._translate_newlines(stdout,
self.stdout.encoding)
stdout = b''.join(stdout)
if stderr is not None:
stderr = self._translate_newlines(stderr,
self.stderr.encoding)

return (stdout, stderr)
stderr = b''.join(stderr)

# Translate newlines, if requested.
# This also turns bytes into strings.
if self.universal_newlines:
if stdout is not None:
stdout = self._translate_newlines(stdout,
self.stdout.encoding)
if stderr is not None:
stderr = self._translate_newlines(stderr,
self.stderr.encoding)

return (stdout, stderr)

0 comments on commit 03406c8

Please sign in to comment.