Skip to content

Commit

Permalink
init package
Browse files Browse the repository at this point in the history
  • Loading branch information
minrk committed Mar 8, 2016
1 parent 9646cf2 commit 88e28b7
Show file tree
Hide file tree
Showing 3 changed files with 294 additions and 0 deletions.
245 changes: 245 additions & 0 deletions duper.py
@@ -0,0 +1,245 @@
"""Capture C-level FD output on pipes
Use `duper.capture` or `duper.redirect_to_sys` as context managers.
"""
from __future__ import print_function

__version__ = '0.0.1'

__all__ = [
'capture',
'redirect_to_sys',
'Duper',
]

from contextlib import contextmanager
import ctypes
from fcntl import fcntl, F_GETFL, F_SETFL
import io
import os
import select
import sys
import threading

libc = ctypes.CDLL(None)

try:
c_stdout_p = ctypes.c_void_p.in_dll(libc, 'stdout')
c_stderr_p = ctypes.c_void_p.in_dll(libc, 'stderr')
except ValueError:
# libc.stdout is has a funny name on OS X
c_stdout_p = ctypes.c_void_p.in_dll(libc, '__stdoutp')
c_stderr_p = ctypes.c_void_p.in_dll(libc, '__stderrp')

STDOUT = 2
PIPE = 3

_default_encoding = getattr(sys.stdin, 'encoding', None) or 'utf8'
if _default_encoding.lower() == 'ascii':
# don't respect ascii
_default_encoding = 'utf8'

class Duper(object):
"""Class for Capturing Process-level FD output via dup2
Typically used via `duper.capture`
"""
flush_interval = 0.2

def __init__(self, stdout=None, stderr=None, encoding=_default_encoding):
"""
Parameters
----------
stdout: stream or None
The stream for forwarding stdout.
stderr = stream or None
The stream for forwarding stderr.
encoding: str or None
The encoding to use, if streams should be interpreted as text.
"""
self._stdout = stdout
if stderr == STDOUT:
self._stderr = self._stdout
else:
self._stderr = stderr
self.encoding = encoding
self._save_fds = {}
self._real_fds = {}
self._out_pipes = {}
self._handlers = {}
self._handlers['stderr'] = self._handle_stderr
self._handlers['stdout'] = self._handle_stdout

def _setup_pipe(self, name):
real_fd = getattr(sys, '__%s__' % name).fileno()
save_fd = os.dup(real_fd)
self._save_fds[name] = save_fd

pipe_out, pipe_in = os.pipe()
os.dup2(pipe_in, real_fd)
os.close(pipe_in)
self._real_fds[name] = real_fd

# make pipe_out non-blocking
flags = fcntl(pipe_out, F_GETFL)
fcntl(pipe_out, F_SETFL, flags|os.O_NONBLOCK)
return pipe_out

def _decode(self, data):
"""Decode data, if any
Called before pasing to stdout/stderr streams
"""
if self.encoding:
data = data.decode(self.encoding, 'replace')
return data

def _handle_stdout(self, data):
if self._stdout:
self._stdout.write(self._decode(data))

def _handle_stderr(self, data):
if self._stderr:
self._stderr.write(self._decode(data))

def _setup_handle(self):
"""Setup handle for output, if any"""
self.handle = (self._stdout, self._stderr)

def _finish_handle(self):
"""Finish handle, if anything should be done when it's all wrapped up."""
pass

def __enter__(self):
# setup handle
self._setup_handle()

# create pipe for stdout
pipes = []
names = {}
if self._stdout:
pipe = self._setup_pipe('stdout')
pipes.append(pipe)
names[pipe] = 'stdout'
if self._stderr:
pipe = self._setup_pipe('stderr')
pipes.append(pipe)
names[pipe] = 'stderr'

def forwarder():
"""Forward bytes on a pipe to stream messages"""
while True:
# flush libc's buffers before calling select
libc.fflush(c_stdout_p)
libc.fflush(c_stderr_p)
r, w, x = select.select(pipes, [], [], self.flush_interval)
if not r:
# nothing to read, next iteration will flush and check again
continue
for pipe in r:
name = names[pipe]
data = os.read(pipe, 1024)
if not data:
# pipe closed, stop polling
pipes.remove(pipe)
else:
handler = getattr(self, '_handle_%s' % name)
handler(data)
if not pipes:
# pipes closed, we are done
break
self.thread = threading.Thread(target=forwarder)
self.thread.daemon = True
self.thread.start()

return self.handle

def __exit__(self, exc_type, exc_value, traceback):
# flush the underlying C buffers
libc.fflush(c_stdout_p)
libc.fflush(c_stderr_p)
# close FDs, signaling output is complete
for real_fd in self._real_fds.values():
os.close(real_fd)
self.thread.join()

# close finished pipes
for pipe_out in self._out_pipes.values():
os.close(pipe_out)

# restore original state
for name, real_fd in self._real_fds.items():
save_fd = self._save_fds[name]
os.dup2(save_fd, real_fd)
os.close(save_fd)
# finalize handle
self._finish_handle()


@contextmanager
def capture(stdout=PIPE, stderr=PIPE, encoding=_default_encoding):
"""Capture C-level stdout/stderr in a context manager.
The return value for the context manager is (stdout, stderr).
Examples
--------
>>> with capture() as (stdout, stderr):
... printf("C-level stdout")
... output = stdout.read()
"""
stdout_pipe = stderr_pipe = False
# setup stdout
if stdout == PIPE:
stdout_r, stdout_w = os.pipe()
stdout_w = os.fdopen(stdout_w, 'wb')
stdout_r = os.fdopen(stdout_r, 'rb')
if encoding:
stdout_r = io.TextIOWrapper(stdout_r, encoding=encoding)
stdout_pipe = True
else:
stdout_r = stdout_w = stdout
# setup stderr
if stderr == STDOUT:
stderr_r = None
stderr_w = stdout_w
elif stderr == PIPE:
stderr_r, stderr_w = os.pipe()
stderr_w = os.fdopen(stderr_w, 'wb')
stderr_r = os.fdopen(stderr_r, 'rb')
if encoding:
stderr_r = io.TextIOWrapper(stderr_r, encoding=encoding)
stderr_pipe = True
else:
stderr_r = stderr_w = stderr
if stdout_pipe or stderr_pipe:
capture_encoding = None
else:
capture_encoding = encoding
duper = Duper(stdout=stdout_w, stderr=stderr_w, encoding=capture_encoding)
try:
with duper:
yield stdout_r, stderr_r
finally:
# close pipes
if stdout_pipe:
stdout_w.close()
if stderr_pipe:
stderr_w.close()


def redirect_to_sys(encoding=_default_encoding):
"""Redirect C-level stdout/stderr to sys.stdout/stderr
This is useful of sys.sdout/stderr are already being forwarded somewhere.
DO NOT USE THIS if sys.stdout and sys.stderr are not already being forwarded.
"""
return capture(sys.stdout, sys.stderr, encoding=encoding)

def redirect_everything_to_sys(encoding=_default_encoding):
"""Redirect all C output to sys.stdout/err
This does *not*
"""
2 changes: 2 additions & 0 deletions setup.cfg
@@ -0,0 +1,2 @@
[bdist_wheel]
universal=1
47 changes: 47 additions & 0 deletions setup.py
@@ -0,0 +1,47 @@
#!/usr/bin/env python
import sys

from distutils.core import setup

long_description = """
Context managers for capturing C-level output::
from duper import capture
with capture() as (stdout, stderr):
call_c_function()
out = stdout.read()
err = stderr.read()
"""

version_ns = {}
with open('duper.py') as f:
for line in f:
if line.startswith('__version__'):
exec(line, version_ns)
setup_args = dict(
name='duper',
version=version_ns['__version__'],
author="Min RK",
author_email="benjaminrk@gmail.com",
description="Capture C-level output in context managers",
long_description=long_description,
url="https://github.com/minrk/duper",
py_modules=['duper.py'],
license="MIT",
cmdclass={},
classifiers=[
"Development Status :: 3 - Alpha",
"Intended Audience :: Developers",
"License :: OSI Approved :: MIT License",
"Programming Language :: Python :: 2.7",
"Programming Language :: Python :: 3",
],
)

if 'bdist_wheel' in sys.argv:
from wheel.bdist_wheel import bdist_wheel
setup_args['cmdclass']['bdist_wheel'] = bdist_wheel

if __name__ == '__main__':
setup(**setup_args)

0 comments on commit 88e28b7

Please sign in to comment.