Permalink
Fetching contributors…
Cannot retrieve contributors at this time
1198 lines (870 sloc) 30.6 KB
# -*- coding: utf8 -*-
import os
import unittest
import tempfile
import sys
import sh
import platform
IS_OSX = platform.system() == "Darwin"
IS_PY3 = sys.version_info[0] == 3
if IS_PY3:
unicode = str
python = sh.Command(sh.which("python%d.%d" % sys.version_info[:2]))
else:
from sh import python
THIS_DIR = os.path.dirname(os.path.abspath(__file__))
skipUnless = getattr(unittest, "skipUnless", None)
if not skipUnless:
def skipUnless(*args, **kwargs):
def wrapper(thing): return thing
return wrapper
requires_posix = skipUnless(os.name == "posix", "Requires POSIX")
def create_tmp_test(code):
py = tempfile.NamedTemporaryFile()
if IS_PY3: code = bytes(code, "UTF-8")
py.write(code)
py.flush()
# we don't explicitly close, because close will remove the file, and we
# don't want that until the test case is done. so we let the gc close it
# when it goes out of scope
return py
@requires_posix
class Basic(unittest.TestCase):
def test_print_command(self):
from sh import ls, which
actual_location = which("ls")
out = str(ls)
self.assertEqual(out, actual_location)
def test_unicode_arg(self):
from sh import echo
test = "漢字"
if not IS_PY3: test = test.decode("utf8")
p = echo(test).strip()
self.assertEqual(test, p)
def test_number_arg(self):
py = create_tmp_test("""
from optparse import OptionParser
parser = OptionParser()
options, args = parser.parse_args()
print(args[0])
""")
out = python(py.name, 3).strip()
self.assertEqual(out, "3")
def test_exit_code(self):
from sh import ls, ErrorReturnCode
self.assertEqual(ls("/").exit_code, 0)
self.assertRaises(ErrorReturnCode, ls, "/aofwje/garogjao4a/eoan3on")
def test_glob_warning(self):
from sh import ls
from glob import glob
import warnings
with warnings.catch_warnings(record=True) as w:
warnings.simplefilter("always")
ls(glob("ofjaoweijfaowe"))
self.assertTrue(len(w) == 1)
self.assertTrue(issubclass(w[-1].category, UserWarning))
self.assertTrue("glob" in str(w[-1].message))
def test_stdin_from_string(self):
from sh import sed
self.assertEqual(sed(_in="test", e="s/test/lol/").strip(), "lol")
def test_ok_code(self):
from sh import ls, ErrorReturnCode_1, ErrorReturnCode_2
exc_to_test = ErrorReturnCode_2
code_to_pass = 2
if IS_OSX:
exc_to_test = ErrorReturnCode_1
code_to_pass = 1
self.assertRaises(exc_to_test, ls, "/aofwje/garogjao4a/eoan3on")
ls("/aofwje/garogjao4a/eoan3on", _ok_code=code_to_pass)
ls("/aofwje/garogjao4a/eoan3on", _ok_code=[code_to_pass])
def test_quote_escaping(self):
py = create_tmp_test("""
from optparse import OptionParser
parser = OptionParser()
options, args = parser.parse_args()
print(args)
""")
out = python(py.name, "one two three").strip()
self.assertEqual(out, "['one two three']")
out = python(py.name, "one \"two three").strip()
self.assertEqual(out, "['one \"two three']")
out = python(py.name, "one", "two three").strip()
self.assertEqual(out, "['one', 'two three']")
out = python(py.name, "one", "two \"haha\" three").strip()
self.assertEqual(out, "['one', 'two \"haha\" three']")
out = python(py.name, "one two's three").strip()
self.assertEqual(out, "[\"one two's three\"]")
out = python(py.name, 'one two\'s three').strip()
self.assertEqual(out, "[\"one two's three\"]")
def test_multiple_pipes(self):
from sh import tr, python
import time
py = create_tmp_test("""
import sys
import os
import time
for l in "andrew":
print(l)
time.sleep(.2)
""")
class Derp(object):
def __init__(self):
self.times = []
self.stdout = []
self.last_received = None
def agg(self, line):
self.stdout.append(line.strip())
now = time.time()
if self.last_received: self.times.append(now - self.last_received)
self.last_received = now
derp = Derp()
p = tr(
tr(
tr(
python(py.name, _piped=True),
"aw", "wa", _piped=True),
"ne", "en", _piped=True),
"dr", "rd", _out=derp.agg)
p.wait()
self.assertEqual("".join(derp.stdout), "werdna")
self.assertTrue(all([t > .15 for t in derp.times]))
def test_manual_stdin_string(self):
from sh import tr
out = tr("[:lower:]", "[:upper:]", _in="andrew").strip()
self.assertEqual(out, "ANDREW")
def test_manual_stdin_iterable(self):
from sh import tr
test = ["testing\n", "herp\n", "derp\n"]
out = tr("[:lower:]", "[:upper:]", _in=test)
match = "".join([t.upper() for t in test])
self.assertEqual(out, match)
def test_manual_stdin_file(self):
from sh import tr
import tempfile
test_string = "testing\nherp\nderp\n"
stdin = tempfile.NamedTemporaryFile()
stdin.write(test_string.encode())
stdin.flush()
stdin.seek(0)
out = tr("[:lower:]", "[:upper:]", _in=stdin)
self.assertEqual(out, test_string.upper())
def test_manual_stdin_queue(self):
from sh import tr
try: from Queue import Queue, Empty
except ImportError: from queue import Queue, Empty
test = ["testing\n", "herp\n", "derp\n"]
q = Queue()
for t in test: q.put(t)
q.put(None) # EOF
out = tr("[:lower:]", "[:upper:]", _in=q)
match = "".join([t.upper() for t in test])
self.assertEqual(out, match)
def test_environment(self):
import os
env = {"HERP": "DERP"}
py = create_tmp_test("""
import os
osx_cruft = ["__CF_USER_TEXT_ENCODING", "__PYVENV_LAUNCHER__"]
for key in osx_cruft:
try: del os.environ[key]
except: pass
print(os.environ["HERP"] + " " + str(len(os.environ)))
""")
out = python(py.name, _env=env).strip()
self.assertEqual(out, "DERP 1")
py = create_tmp_test("""
import os, sys
sys.path.insert(0, os.getcwd())
import sh
osx_cruft = ["__CF_USER_TEXT_ENCODING", "__PYVENV_LAUNCHER__"]
for key in osx_cruft:
try: del os.environ[key]
except: pass
print(sh.HERP + " " + str(len(os.environ)))
""")
out = python(py.name, _env=env, _cwd=THIS_DIR).strip()
self.assertEqual(out, "DERP 1")
def test_which(self):
from sh import which, ls
self.assertEqual(which("fjoawjefojawe"), None)
self.assertEqual(which("ls"), str(ls))
def test_foreground(self):
return
raise NotImplementedError
def test_no_arg(self):
import pwd
from sh import whoami
u1 = whoami().strip()
u2 = pwd.getpwuid(os.geteuid())[0]
self.assertEqual(u1, u2)
def test_incompatible_special_args(self):
from sh import ls
self.assertRaises(TypeError, ls, _iter=True, _piped=True)
def test_exception(self):
from sh import ls, ErrorReturnCode_1, ErrorReturnCode_2
exc_to_test = ErrorReturnCode_2
if IS_OSX: exc_to_test = ErrorReturnCode_1
self.assertRaises(exc_to_test, ls, "/aofwje/garogjao4a/eoan3on")
def test_command_not_found(self):
from sh import CommandNotFound
def do_import(): from sh import aowjgoawjoeijaowjellll
self.assertRaises(CommandNotFound, do_import)
def test_command_wrapper_equivalence(self):
from sh import Command, ls, which
self.assertEqual(Command(which("ls")), ls)
def test_multiple_args_short_option(self):
py = create_tmp_test("""
from optparse import OptionParser
parser = OptionParser()
parser.add_option("-l", dest="long_option")
options, args = parser.parse_args()
print(len(options.long_option.split()))
""")
num_args = int(python(py.name, l="one two three"))
self.assertEqual(num_args, 3)
num_args = int(python(py.name, "-l", "one's two's three's"))
self.assertEqual(num_args, 3)
def test_multiple_args_long_option(self):
py = create_tmp_test("""
from optparse import OptionParser
parser = OptionParser()
parser.add_option("-l", "--long-option", dest="long_option")
options, args = parser.parse_args()
print(len(options.long_option.split()))
""")
num_args = int(python(py.name, long_option="one two three"))
self.assertEqual(num_args, 3)
num_args = int(python(py.name, "--long-option", "one's two's three's"))
self.assertEqual(num_args, 3)
def test_short_bool_option(self):
py = create_tmp_test("""
from optparse import OptionParser
parser = OptionParser()
parser.add_option("-s", action="store_true", default=False, dest="short_option")
options, args = parser.parse_args()
print(options.short_option)
""")
self.assertTrue(python(py.name, s=True).strip() == "True")
self.assertTrue(python(py.name, s=False).strip() == "False")
self.assertTrue(python(py.name).strip() == "False")
def test_long_bool_option(self):
py = create_tmp_test("""
from optparse import OptionParser
parser = OptionParser()
parser.add_option("-l", "--long-option", action="store_true", default=False, dest="long_option")
options, args = parser.parse_args()
print(options.long_option)
""")
self.assertTrue(python(py.name, long_option=True).strip() == "True")
self.assertTrue(python(py.name).strip() == "False")
def test_composition(self):
from sh import ls, wc
c1 = int(wc(ls("-A1"), l=True))
c2 = len(os.listdir("."))
self.assertEqual(c1, c2)
def test_incremental_composition(self):
from sh import ls, wc
c1 = int(wc(ls("-A1", _piped=True), l=True).strip())
c2 = len(os.listdir("."))
if c1 != c2:
with open("/tmp/fail", "a") as h: h.write("FUCK\n")
self.assertEqual(c1, c2)
def test_short_option(self):
from sh import sh
s1 = sh(c="echo test").strip()
s2 = "test"
self.assertEqual(s1, s2)
def test_long_option(self):
py = create_tmp_test("""
from optparse import OptionParser
parser = OptionParser()
parser.add_option("-l", "--long-option", action="store", default="", dest="long_option")
options, args = parser.parse_args()
print(options.long_option.upper())
""")
self.assertTrue(python(py.name, long_option="testing").strip() == "TESTING")
self.assertTrue(python(py.name).strip() == "")
def test_command_wrapper(self):
from sh import Command, which
ls = Command(which("ls"))
wc = Command(which("wc"))
c1 = int(wc(ls("-A1"), l=True))
c2 = len(os.listdir("."))
self.assertEqual(c1, c2)
def test_background(self):
from sh import sleep
import time
start = time.time()
sleep_time = .5
p = sleep(sleep_time, _bg=True)
now = time.time()
self.assertTrue(now - start < sleep_time)
p.wait()
now = time.time()
self.assertTrue(now - start > sleep_time)
def test_background_exception(self):
from sh import ls, ErrorReturnCode_1, ErrorReturnCode_2
p = ls("/ofawjeofj", _bg=True) # should not raise
exc_to_test = ErrorReturnCode_2
if IS_OSX: exc_to_test = ErrorReturnCode_1
self.assertRaises(exc_to_test, p.wait) # should raise
def test_with_context(self):
from sh import whoami
import getpass
py = create_tmp_test("""
import sys
import os
import subprocess
print("with_context")
subprocess.Popen(sys.argv[1:], shell=False).wait()
""")
cmd1 = python.bake(py.name, _with=True)
with cmd1:
out = whoami()
self.assertTrue("with_context" in out)
self.assertTrue(getpass.getuser() in out)
def test_with_context_args(self):
from sh import whoami
import getpass
py = create_tmp_test("""
import sys
import os
import subprocess
from optparse import OptionParser
parser = OptionParser()
parser.add_option("-o", "--opt", action="store_true", default=False, dest="opt")
options, args = parser.parse_args()
if options.opt:
subprocess.Popen(args[0], shell=False).wait()
""")
with python(py.name, opt=True, _with=True):
out = whoami()
self.assertTrue(getpass.getuser() == out.strip())
with python(py.name, _with=True):
out = whoami()
self.assertTrue(out == "")
def test_err_to_out(self):
py = create_tmp_test("""
import sys
import os
sys.stdout.write("stdout")
sys.stdout.flush()
sys.stderr.write("stderr")
sys.stderr.flush()
""")
stdout = python(py.name, _err_to_out=True)
self.assertTrue(stdout == "stdoutstderr")
def test_out_redirection(self):
import tempfile
from sh import ls
file_obj = tempfile.TemporaryFile()
out = ls(_out=file_obj)
self.assertTrue(len(out) != 0)
file_obj.seek(0)
actual_out = file_obj.read()
file_obj.close()
self.assertTrue(len(actual_out) != 0)
def test_err_redirection(self):
import tempfile
file_obj = tempfile.TemporaryFile()
py = create_tmp_test("""
import sys
import os
sys.stdout.write("stdout")
sys.stderr.write("stderr")
""")
stdout = python(py.name, _err=file_obj, u=True).wait()
file_obj.seek(0)
stderr = file_obj.read().decode()
file_obj.close()
self.assertTrue(stdout == "stdout")
self.assertTrue(stderr == "stderr")
def test_err_redirection_actual_file(self):
import tempfile
file_obj = tempfile.NamedTemporaryFile()
py = create_tmp_test("""
import sys
import os
sys.stdout.write("stdout")
sys.stderr.write("stderr")
""")
stdout = python(py.name, _err=file_obj.name, u=True).wait()
file_obj.seek(0)
stderr = file_obj.read().decode()
file_obj.close()
self.assertTrue(stdout == "stdout")
self.assertTrue(stderr == "stderr")
def test_subcommand_and_bake(self):
from sh import ls
import getpass
py = create_tmp_test("""
import sys
import os
import subprocess
print("subcommand")
subprocess.Popen(sys.argv[1:], shell=False).wait()
""")
cmd1 = python.bake(py.name)
out = cmd1.whoami()
self.assertTrue("subcommand" in out)
self.assertTrue(getpass.getuser() in out)
def test_multiple_bakes(self):
from sh import whoami
import getpass
py = create_tmp_test("""
import sys
import subprocess
subprocess.Popen(sys.argv[1:], shell=False).wait()
""")
out = python.bake(py.name).bake("whoami")()
self.assertTrue(getpass.getuser() == out.strip())
def test_bake_args_come_first(self):
from sh import ls
ls = ls.bake(h=True)
ran = ls("-la").ran
ft = ran.index("-h")
self.assertTrue("-la" in ran[ft:])
def test_output_equivalence(self):
from sh import whoami
iam1 = whoami()
iam2 = whoami()
self.assertEqual(iam1, iam2)
def test_stdout_callback(self):
py = create_tmp_test("""
import sys
import os
for i in range(5): print(i)
""")
stdout = []
def agg(line):
stdout.append(line)
p = python(py.name, _out=agg, u=True)
p.wait()
self.assertTrue(len(stdout) == 5)
def test_stdout_callback_no_wait(self):
import time
py = create_tmp_test("""
import sys
import os
import time
for i in range(5):
print(i)
time.sleep(.5)
""")
stdout = []
def agg(line): stdout.append(line)
p = python(py.name, _out=agg, u=True)
# we give a little pause to make sure that the NamedTemporaryFile
# exists when the python process actually starts
time.sleep(.5)
self.assertTrue(len(stdout) != 5)
def test_stdout_callback_line_buffered(self):
py = create_tmp_test("""
import sys
import os
for i in range(5): print("herpderp")
""")
stdout = []
def agg(line): stdout.append(line)
p = python(py.name, _out=agg, _out_bufsize=1, u=True)
p.wait()
self.assertTrue(len(stdout) == 5)
def test_stdout_callback_line_unbuffered(self):
py = create_tmp_test("""
import sys
import os
for i in range(5): print("herpderp")
""")
stdout = []
def agg(char): stdout.append(char)
p = python(py.name, _out=agg, _out_bufsize=0, u=True)
p.wait()
# + 5 newlines
self.assertTrue(len(stdout) == (len("herpderp") * 5 + 5))
def test_stdout_callback_buffered(self):
py = create_tmp_test("""
import sys
import os
for i in range(5): sys.stdout.write("herpderp")
""")
stdout = []
def agg(chunk): stdout.append(chunk)
p = python(py.name, _out=agg, _out_bufsize=4, u=True)
p.wait()
self.assertTrue(len(stdout) == (len("herp")/2 * 5))
def test_stdout_callback_with_input(self):
py = create_tmp_test("""
import sys
import os
IS_PY3 = sys.version_info[0] == 3
if IS_PY3: raw_input = input
for i in range(5): print(str(i))
derp = raw_input("herp? ")
print(derp)
""")
def agg(line, stdin):
if line.strip() == "4": stdin.put("derp\n")
p = python(py.name, _out=agg, u=True)
p.wait()
self.assertTrue("derp" in p)
def test_stdout_callback_exit(self):
py = create_tmp_test("""
import sys
import os
for i in range(5): print(i)
""")
stdout = []
def agg(line):
line = line.strip()
stdout.append(line)
if line == "2": return True
p = python(py.name, _out=agg, u=True)
p.wait()
self.assertTrue("4" in p)
self.assertTrue("4" not in stdout)
def test_stdout_callback_terminate(self):
import signal
py = create_tmp_test("""
import sys
import os
import time
for i in range(5):
print(i)
time.sleep(.5)
""")
stdout = []
def agg(line, stdin, process):
line = line.strip()
stdout.append(line)
if line == "3":
process.terminate()
return True
p = python(py.name, _out=agg, u=True)
p.wait()
self.assertEqual(p.process.exit_code, -signal.SIGTERM)
self.assertTrue("4" not in p)
self.assertTrue("4" not in stdout)
def test_stdout_callback_kill(self):
import signal
py = create_tmp_test("""
import sys
import os
import time
for i in range(5):
print(i)
time.sleep(.5)
""")
stdout = []
def agg(line, stdin, process):
line = line.strip()
stdout.append(line)
if line == "3":
process.kill()
return True
p = python(py.name, _out=agg, u=True)
p.wait()
self.assertEqual(p.process.exit_code, -signal.SIGKILL)
self.assertTrue("4" not in p)
self.assertTrue("4" not in stdout)
def test_general_signal(self):
import signal
from signal import SIGINT
py = create_tmp_test("""
import sys
import os
import time
import signal
def sig_handler(sig, frame):
print(10)
exit(0)
signal.signal(signal.SIGINT, sig_handler)
for i in range(5):
print(i)
sys.stdout.flush()
time.sleep(0.5)
""")
stdout = []
def agg(line, stdin, process):
line = line.strip()
stdout.append(line)
if line == "3":
process.signal(SIGINT)
return True
p = python(py.name, _out=agg)
p.wait()
self.assertEqual(p.process.exit_code, 0)
self.assertEqual(p, "0\n1\n2\n3\n10\n")
def test_iter_generator(self):
py = create_tmp_test("""
import sys
import os
import time
for i in range(42):
print(i)
sys.stdout.flush()
""")
out = []
for line in python(py.name, _iter=True):
out.append(int(line.strip()))
self.assertTrue(len(out) == 42 and sum(out) == 861)
def test_nonblocking_iter(self):
import tempfile
from sh import tail
from errno import EWOULDBLOCK
tmp = tempfile.NamedTemporaryFile()
for line in tail("-f", tmp.name, _iter_noblock=True): break
self.assertEqual(line, EWOULDBLOCK)
def test_for_generator_to_err(self):
py = create_tmp_test("""
import sys
import os
for i in range(42):
sys.stderr.write(str(i)+"\\n")
""")
out = []
for line in python(py.name, _iter="err", u=True): out.append(line)
self.assertTrue(len(out) == 42)
# verify that nothing is going to stdout
out = []
for line in python(py.name, _iter="out", u=True): out.append(line)
self.assertTrue(len(out) == 0)
def test_piped_generator(self):
from sh import tr
from string import ascii_uppercase
import time
py1 = create_tmp_test("""
import sys
import os
import time
for letter in "andrew":
time.sleep(0.5)
print(letter)
""")
py2 = create_tmp_test("""
import sys
import os
import time
while True:
line = sys.stdin.readline()
if not line: break
print(line.strip().upper())
""")
times = []
last_received = None
letters = ""
for line in python(python(py1.name, _piped="out", u=True), py2.name, _iter=True, u=True):
if not letters: start = time.time()
letters += line.strip()
now = time.time()
if last_received: times.append(now - last_received)
last_received = now
self.assertEqual("ANDREW", letters)
self.assertTrue(all([t > .3 for t in times]))
def test_generator_and_callback(self):
py = create_tmp_test("""
import sys
import os
for i in range(42):
sys.stderr.write(str(i * 2)+"\\n")
print(i)
""")
stderr = []
def agg(line): stderr.append(int(line.strip()))
out = []
for line in python(py.name, _iter=True, _err=agg, u=True): out.append(line)
self.assertTrue(len(out) == 42)
self.assertTrue(sum(stderr) == 1722)
def test_bg_to_int(self):
from sh import echo
# bugs with background might cause the following error:
# ValueError: invalid literal for int() with base 10: ''
self.assertEqual(int(echo("123", _bg=True)), 123)
def test_cwd(self):
from sh import pwd
from os.path import realpath
self.assertEqual(str(pwd(_cwd="/tmp")), realpath("/tmp")+"\n")
self.assertEqual(str(pwd(_cwd="/etc")), realpath("/etc")+"\n")
def test_huge_piped_data(self):
from sh import tr
stdin = tempfile.NamedTemporaryFile()
data = "herpderp" * 4000 + "\n"
stdin.write(data.encode())
stdin.flush()
stdin.seek(0)
out = tr(tr("[:lower:]", "[:upper:]", _in=data), "[:upper:]", "[:lower:]")
self.assertTrue(out == data)
def test_tty_input(self):
py = create_tmp_test("""
import sys
import os
if os.isatty(sys.stdin.fileno()):
sys.stdout.write("password?\\n")
sys.stdout.flush()
pw = sys.stdin.readline().strip()
sys.stdout.write("%s\\n" % ("*" * len(pw)))
sys.stdout.flush()
else:
sys.stdout.write("no tty attached!\\n")
sys.stdout.flush()
""")
test_pw = "test123"
expected_stars = "*" * len(test_pw)
d = {}
def password_enterer(line, stdin):
line = line.strip()
if not line: return
if line == "password?":
stdin.put(test_pw+"\n")
elif line.startswith("*"):
d["stars"] = line
return True
pw_stars = python(py.name, _tty_in=True, _out=password_enterer)
pw_stars.wait()
self.assertEqual(d["stars"], expected_stars)
response = python(py.name)
self.assertEqual(response, "no tty attached!\n")
def test_stringio_output(self):
from sh import echo
if IS_PY3:
from io import StringIO
from io import BytesIO as cStringIO
else:
from StringIO import StringIO
from cStringIO import StringIO as cStringIO
out = StringIO()
echo("-n", "testing 123", _out=out)
self.assertEqual(out.getvalue(), "testing 123")
out = cStringIO()
echo("-n", "testing 123", _out=out)
self.assertEqual(out.getvalue().decode(), "testing 123")
def test_stringio_input(self):
from sh import cat
if IS_PY3:
from io import StringIO
from io import BytesIO as cStringIO
else:
from StringIO import StringIO
from cStringIO import StringIO as cStringIO
input = StringIO()
input.write("herpderp")
input.seek(0)
out = cat(_in=input)
self.assertEqual(out, "herpderp")
def test_internal_bufsize(self):
from sh import cat
output = cat(_in="a"*1000, _internal_bufsize=100, _out_bufsize=0)
self.assertEqual(len(output), 100)
output = cat(_in="a"*1000, _internal_bufsize=50, _out_bufsize=2)
self.assertEqual(len(output), 100)
def test_change_stdout_buffering(self):
py = create_tmp_test("""
import sys
import os
# this proves that we won't get the output into our callback until we send
# a newline
sys.stdout.write("switch ")
sys.stdout.flush()
sys.stdout.write("buffering\\n")
sys.stdout.flush()
sys.stdin.read(1)
sys.stdout.write("unbuffered")
sys.stdout.flush()
# this is to keep the output from being flushed by the process ending, which
# would ruin our test. we want to make sure we get the string "unbuffered"
# before the process ends, without writing a newline
sys.stdin.read(1)
""")
d = {"success": False}
def interact(line, stdin, process):
line = line.strip()
if not line: return
if line == "switch buffering":
process.out_bufsize(0)
stdin.put("a")
elif line == "unbuffered":
stdin.put("b")
d["success"] = True
return True
# start with line buffered stdout
pw_stars = python(py.name, _out=interact, _out_bufsize=1, u=True)
pw_stars.wait()
self.assertTrue(d["success"])
def test_encoding(self):
return
raise NotImplementedError("what's the best way to test a different \
'_encoding' special keyword argument?")
def test_timeout(self):
from sh import sleep
from time import time
# check that a normal sleep is more or less how long the whole process
# takes
sleep_for = 3
started = time()
sh.sleep(sleep_for).wait()
elapsed = time() - started
self.assertTrue(abs(elapsed - sleep_for) < 0.1)
# now make sure that killing early makes the process take less time
sleep_for = 3
timeout = 1
started = time()
sh.sleep(sleep_for, _timeout=timeout).wait()
elapsed = time() - started
self.assertTrue(abs(elapsed - timeout) < 0.1)
def test_binary_pipe(self):
binary = b'\xec;\xedr\xdbF\x92\xf9\x8d\xa7\x98\x02/\x15\xd2K\xc3\x94d\xc9'
py1 = create_tmp_test("""
import sys
import os
sys.stdout = os.fdopen(sys.stdout.fileno(), "wb", 0)
sys.stdout.write(%r)
""" % binary)
py2 = create_tmp_test("""
import sys
import os
sys.stdin = os.fdopen(sys.stdin.fileno(), "rb", 0)
sys.stdout = os.fdopen(sys.stdout.fileno(), "wb", 0)
sys.stdout.write(sys.stdin.read())
""")
out = python(python(py1.name), py2.name)
self.assertEqual(out.stdout, binary)
def test_auto_change_buffering(self):
binary = b'\xec;\xedr\xdbF\x92\xf9\x8d\xa7\x98\x02/\x15\xd2K\xc3\x94d\xc9'
py1 = create_tmp_test("""
import sys
import os
import time
sys.stdout = os.fdopen(sys.stdout.fileno(), "wb", 0)
sys.stdout.write(b"testing")
sys.stdout.flush()
# to ensure that sh's select loop picks up the write before we write again
time.sleep(0.5)
sys.stdout.write(b"again\\n")
sys.stdout.flush()
time.sleep(0.5)
sys.stdout.write(%r)
sys.stdout.flush()
""" % binary)
out = python(py1.name, _out_bufsize=1)
self.assertTrue(out.stdout == b'testingagain\n\xec;\xedr\xdbF\x92\xf9\x8d\xa7\x98\x02/\x15\xd2K\xc3\x94d\xc9')
# designed to trigger the "... (%d more, please see e.stdout)" output
# of the ErrorReturnCode class
def test_failure_with_large_output(self):
from sh import ErrorReturnCode_1
py = create_tmp_test("""
print("andrewmoffat" * 1000)
exit(1)
""")
self.assertRaises(ErrorReturnCode_1, python, py.name)
# designed to check if the ErrorReturnCode constructor does not raise
# an UnicodeDecodeError
def test_non_ascii_error(self):
from sh import ls, ErrorReturnCode
test = ""
if not IS_PY3: test = test.decode("utf8")
self.assertRaises(ErrorReturnCode, ls, test)
if __name__ == "__main__":
if len(sys.argv) > 1:
unittest.main()
else:
suite = unittest.TestLoader().loadTestsFromTestCase(Basic)
unittest.TextTestRunner(verbosity=2).run(suite)