Skip to content
Browse files


  • Loading branch information...
kennethreitz committed Jan 20, 2017
0 parents commit be4b70e646e6232834e9f9917fdc1adde2156f47
Showing with 546 additions and 0 deletions.
  1. 0 README.rst
  2. +227 −0
  3. +32 −0
  4. +21 −0 pipfile/
  5. +11 −0 pipfile/
  6. +54 −0 pipfile/
  7. +196 −0 pipfile/
  8. +5 −0 requirements.txt
No changes.
@@ -0,0 +1,227 @@
import os
import subprocess
import shlex

from pexpect.popen_spawn import PopenSpawn
import daemon

class Command(object):
def __init__(self, cmd):
super(Command, self).__init__()
self.cmd = cmd
self.subprocess = None
self.blocking = None
self.was_run = False
self.__out = None

def __repr__(self):
return '<Commmand {!r}>'.format(self.cmd)

def _popen_args(self):
return self.cmd

def _default_popen_kwargs(self):
return {
'env': os.environ.copy(),
'stdin': subprocess.PIPE,
'stdout': subprocess.PIPE,
'stderr': subprocess.PIPE,
'shell': True,
'universal_newlines': True,
'bufsize': 0,

def _default_pexpect_kwargs(self):
return {
'env': os.environ.copy(),

def _uses_subprocess(self):
return isinstance(self.subprocess, subprocess.Popen)

def _uses_pexpect(self):
return isinstance(self.subprocess, PopenSpawn)

def std_out(self):
return self.subprocess.stdout

def _pexpect_out(self):
result = ''

if self.subprocess.before:
result += self.subprocess.before

if isinstance(self.subprocess.after, str):
result += self.subprocess.after

result +=
return result

def out(self):
"""Std/out output (cached), as well as stderr for non-blocking runs."""
if self.__out:
return self.__out

if self._uses_subprocess:
self.__out =
self.__out = self._pexpect_out

return self.__out

def std_err(self):
return self.subprocess.stderr

def err(self):
if self._uses_subprocess:
return self._pexpect_out

def pid(self):
"""The process' PID."""
# Support for pexpect's functionality.
if hasattr(self.subprocess, 'proc'):
# Standard subprocess method.

def return_code(self):
return self.subprocess.returncode

def std_in(self):
return self.subprocess.stdin

def run(self, block=True):
"""Runs the given command, with or without pexpect functionality enabled."""
self.blocking = block

# Use subprocess.
if self.blocking:
s = subprocess.Popen(self._popen_args, **self._default_popen_kwargs)

# Otherwise, use pexpect.
s = PopenSpawn(self._popen_args, **self._default_pexpect_kwargs)
self.subprocess = s
self.was_run = True

def expect(self, pattern, timeout=-1):
"""Waits on the given pattern to appear in std_out"""

if self.blocking:
raise RuntimeError('expect can only be used on non-blocking commands.')

self.subprocess.expect(pattern=pattern, timeout=timeout)

def send(self, s, end=os.linesep, signal=False):
"""Sends the given string or signal to std_in."""

if self.blocking:
raise RuntimeError('send can only be used on non-blocking commands.')

if not signal:
if self._uses_subprocess:
return self.subprocess.communicate(s + end)
return self.subprocess.send(s + end)

def terminate(self):

def kill(self):

def block(self):
"""Blocks until process is complete."""

def daemonize(self):
"""Daemonizes a non-blocking process."""
if self.blocking:
raise RuntimeError('daemonize can only be used on non-blocking commands.')

with daemon.DaemonContext():

def pipe(self, command):
"""Runs the current command and passes its output to the next
given process.
if not self.was_run:

data = self.out

c = Command(command)
if data:
return c

def _expand_args(command):
"""Parses command strings and returns a Popen-ready list."""

# Prepare arguments.
if isinstance(command, (str, unicode)):
splitter = shlex.shlex(command.encode('utf-8'))
splitter.whitespace = '|'
splitter.whitespace_split = True
command = []

while True:
token = splitter.get_token()
if token:

command = list(map(shlex.split, command))

return command

def chain(command):
commands = _expand_args(command)
data = None

for command in commands:

c = run(command, block=False)

if data:

data = c.out

return c

def run(command, block=True):
c = Command(command)

if block:

return c
@@ -0,0 +1,32 @@
import delegator
import click
import crayons

def ensure_latest_pip():

# Ensure that pip is installed.
c ='pip install pip')

# Check if version is out of date.
if 'however' in c.err:
# If version is out of date, update.
print crayons.yellow('Pip is out of date... updating to latest.')
c ='pip install pip --upgrade', block=False)

def ensure_virtualenv():
c ='pip install virtualenv')
print c.out

def main():
# Ensure that pip is installed and up-to-date.

# Ensure that virtualenv is installed.

if __name__ == '__main__':
@@ -0,0 +1,21 @@
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.
from __future__ import absolute_import, division, print_function

__all__ = [
"__title__", "__summary__", "__uri__", "__version__", "__author__",
"__email__", "__license__", "__copyright__",

__title__ = "pipfile"
__summary__ = ""
__uri__ = ""

__version__ = "16.0.dev0"

__author__ = "Kenneth Reitz and individual contributors"
__email__ = ""

__license__ = "BSD or Apache License, Version 2.0"
__copyright__ = "Copyright 2017 %s" % __author__
@@ -0,0 +1,11 @@
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.
from __future__ import absolute_import, division, print_function

from .__about__ import (
__author__, __copyright__, __email__, __license__, __summary__, __title__,
__uri__, __version__

from .api import load, Pipfile
@@ -0,0 +1,54 @@
import _ctypes
import json
import re
from collections import OrderedDict

def di(obj_id):
# from
""" Reverse of id() function. """
return _ctypes.PyObj_FromPtr(obj_id)

class NoIndent(object):
def __init__(self, value):
self.value = value
def __repr__(self):
if isinstance(self.value, OrderedDict):
return json.dumps(self.value)
if not isinstance(self.value, list):
return repr(self.value)
else: # the sort the representation of any dicts in the list
reps = ('{{{}}}'.format(', '.join(('{!r}:{}'.format(
k, v) for k, v in sorted(v.items()))))
if isinstance(v, dict) else repr(v) for v in self.value)

return '[' + ', '.join(reps) + ']'

class NoIndentEncoder(json.JSONEncoder):
FORMAT_SPEC = "@@{}@@"
regex = re.compile(FORMAT_SPEC.format(r"(\d+)"))

def default(self, obj):
if not isinstance(obj, NoIndent):
return super(NoIndentEncoder, self).default(obj)
return self.FORMAT_SPEC.format(id(obj))

def encode(self, obj):
format_spec = self.FORMAT_SPEC # local var to expedite access
result = super(NoIndentEncoder, self).encode(obj)
for match in self.regex.finditer(result):
id = int(
result = result.replace('"{}"'.format(format_spec.format(id)),
return result

def dumps(obj):
"""Returns specific data in a specific format."""
obj['_meta']['requires'] = [NoIndent(i) for i in obj['_meta']['requires']]
obj['_meta']['sources'] = [NoIndent(i) for i in obj['_meta']['sources']]
obj['default'] = [NoIndent(i) for i in obj['default']]
obj['develop'] = [NoIndent(i) for i in obj['develop']]

return json.dumps(obj, sort_keys=True, cls=NoIndentEncoder, indent=4, separators=(',', ': '))

0 comments on commit be4b70e

Please sign in to comment.
You can’t perform that action at this time.