Skip to content

Commit

Permalink
Change Testdir to only use pathlib internally
Browse files Browse the repository at this point in the history
  • Loading branch information
nicoddemus committed Jun 27, 2020
1 parent 7450b6d commit 456857b
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 81 deletions.
160 changes: 85 additions & 75 deletions src/_pytest/pytester.py
Expand Up @@ -5,6 +5,7 @@
import os
import platform
import re
import shutil
import subprocess
import sys
import traceback
Expand All @@ -17,6 +18,7 @@
from typing import List
from typing import Optional
from typing import Sequence
from typing import TextIO
from typing import Tuple
from typing import Union
from weakref import WeakKeyDictionary
Expand All @@ -42,7 +44,7 @@
from _pytest.pathlib import Path
from _pytest.python import Module
from _pytest.reports import TestReport
from _pytest.tmpdir import TempdirFactory
from _pytest.tmpdir import TempPathFactory

if TYPE_CHECKING:
from typing import Type
Expand Down Expand Up @@ -380,15 +382,15 @@ def LineMatcher_fixture(request: FixtureRequest) -> "Type[LineMatcher]":


@pytest.fixture
def testdir(request: FixtureRequest, tmpdir_factory) -> "Testdir":
def testdir(request: FixtureRequest, tmp_path_factory) -> "Testdir":
"""
A :class: `TestDir` instance, that can be used to run and test pytest itself.
It is particularly useful for testing plugins. It is similar to the `tmpdir` fixture
but provides methods which aid in testing pytest itself.
"""
return Testdir(request, tmpdir_factory)
return Testdir(request, tmp_path_factory)


@pytest.fixture
Expand Down Expand Up @@ -560,7 +562,10 @@ class Testdir:
Attributes:
:ivar tmpdir: The :py:class:`py.path.local` instance of the temporary directory.
:ivar Path tmp_path: temporary directory path used to create files/run tests from, etc.
For backward compatibility, the read-only property ``tmpdir`` returns
a ``py.path.local`` instance.
:ivar plugins: A list of plugins to use with :py:meth:`parseconfig` and
:py:meth:`runpytest`. Initially this is an empty list but plugins can
Expand All @@ -576,7 +581,9 @@ class Testdir:
class TimeoutExpired(Exception):
pass

def __init__(self, request: FixtureRequest, tmpdir_factory: TempdirFactory) -> None:
def __init__(
self, request: FixtureRequest, tmp_path_factory: TempPathFactory
) -> None:
self.request = request
self._mod_collections = (
WeakKeyDictionary()
Expand All @@ -586,8 +593,7 @@ def __init__(self, request: FixtureRequest, tmpdir_factory: TempdirFactory) -> N
else:
name = request.node.name
self._name = name
self.tmpdir = tmpdir_factory.mktemp(name, numbered=True)
self.test_tmproot = tmpdir_factory.mktemp("tmp-" + name, numbered=True)
self.tmp_path = tmp_path_factory.mktemp(name, numbered=True) # type: Path
self.plugins = [] # type: List[Union[str, _PluggyPlugin]]
self._cwd_snapshot = CwdSnapshot()
self._sys_path_snapshot = SysPathsSnapshot()
Expand All @@ -597,23 +603,25 @@ def __init__(self, request: FixtureRequest, tmpdir_factory: TempdirFactory) -> N
self._method = self.request.config.getoption("--runpytest")

mp = self.monkeypatch = MonkeyPatch()
mp.setenv("PYTEST_DEBUG_TEMPROOT", str(self.test_tmproot))
mp.setenv("PYTEST_DEBUG_TEMPROOT", str(tmp_path_factory.mktemp("tmproot")))
# Ensure no unexpected caching via tox.
mp.delenv("TOX_ENV_DIR", raising=False)
# Discard outer pytest options.
mp.delenv("PYTEST_ADDOPTS", raising=False)
# Ensure no user config is used.
tmphome = str(self.tmpdir)
tmphome = str(self.tmp_path)
mp.setenv("HOME", tmphome)
mp.setenv("USERPROFILE", tmphome)
# Do not use colors for inner runs by default.
mp.setenv("PY_COLORS", "0")

def __repr__(self):
return "<Testdir {!r}>".format(self.tmpdir)
@property
def tmpdir(self):
"""Backward compatibility: returns ``tmp_path`` as a py.path.local instance."""
return py.path.local(str(self.tmp_path))

def __str__(self):
return str(self.tmpdir)
def __repr__(self):
return "<Testdir {!r}>".format(self.tmp_path)

def finalize(self):
"""Clean up global state artifacts.
Expand Down Expand Up @@ -649,9 +657,9 @@ def chdir(self):
This is done automatically upon instantiation.
"""
self.tmpdir.chdir()
os.chdir(str(self.tmp_path))

def _makefile(self, ext, lines, files, encoding="utf-8"):
def _makefile(self, ext, lines, files, encoding="utf-8") -> py.path.local:
items = list(files.items())

def to_text(s):
Expand All @@ -664,16 +672,18 @@ def to_text(s):

ret = None
for basename, value in items:
p = self.tmpdir.join(basename).new(ext=ext)
p.dirpath().ensure_dir()
p = self.tmp_path.joinpath(basename).with_suffix(ext)
p.parent.mkdir(parents=True, exist_ok=True)
source_ = Source(value)
source = "\n".join(to_text(line) for line in source_.lines)
p.write(source.strip().encode(encoding), "wb")
p.write_text(source.strip(), encoding=encoding)
if ret is None:
ret = p
if ret is not None:
ret = py.path.local(ret)
return ret

def makefile(self, ext, *args, **kwargs):
def makefile(self, ext, *args, **kwargs) -> py.path.local:
r"""Create new file(s) in the testdir.
:param str ext: The extension the file(s) should use, including the dot, e.g. `.py`.
Expand All @@ -694,27 +704,27 @@ def makefile(self, ext, *args, **kwargs):
"""
return self._makefile(ext, args, kwargs)

def makeconftest(self, source):
def makeconftest(self, source) -> py.path.local:
"""Write a contest.py file with 'source' as contents."""
return self.makepyfile(conftest=source)

def makeini(self, source):
def makeini(self, source) -> py.path.local:
"""Write a tox.ini file with 'source' as contents."""
return self.makefile(".ini", tox=source)

def getinicfg(self, source):
def getinicfg(self, source) -> py.path.local:
"""Return the pytest section from the tox.ini config file."""
p = self.makeini(source)
return IniConfig(p)["pytest"]

def makepyprojecttoml(self, source):
def makepyprojecttoml(self, source) -> py.path.local:
"""Write a pyproject.toml file with 'source' as contents.
.. versionadded:: 6.0
"""
return self.makefile(".toml", pyproject=source)

def makepyfile(self, *args, **kwargs):
def makepyfile(self, *args, **kwargs) -> py.path.local:
r"""Shortcut for .makefile() with a .py extension.
Defaults to the test name with a '.py' extension, e.g test_foobar.py, overwriting
existing files.
Expand All @@ -733,7 +743,7 @@ def test_something(testdir):
"""
return self._makefile(".py", args, kwargs)

def maketxtfile(self, *args, **kwargs):
def maketxtfile(self, *args, **kwargs) -> py.path.local:
r"""Shortcut for .makefile() with a .txt extension.
Defaults to the test name with a '.txt' extension, e.g test_foobar.txt, overwriting
existing files.
Expand All @@ -759,30 +769,33 @@ def syspathinsert(self, path=None):
test.
"""
if path is None:
path = self.tmpdir
path = self.tmp_path

self.monkeypatch.syspath_prepend(str(path))

def mkdir(self, name):
def mkdir(self, name) -> py.path.local:
"""Create a new (sub)directory."""
return self.tmpdir.mkdir(name)
p = self.tmp_path / name
p.mkdir()
return py.path.local(str(p))

def mkpydir(self, name):
def mkpydir(self, name) -> py.path.local:
"""Create a new python package.
This creates a (sub)directory with an empty ``__init__.py`` file so it
gets recognised as a python package.
"""
p = self.mkdir(name)
p.ensure("__init__.py")
return p
p = self.tmp_path / name
p.mkdir()
p.joinpath("__init__.py").touch()
return py.path.local(p)

def copy_example(self, name=None):
def copy_example(self, name=None) -> py.path.local:
"""Copy file from project's directory into the testdir.
:param str name: The name of the file to copy.
:return: path to the copied directory (inside ``self.tmpdir``).
:return: path to the copied directory (inside ``self.tmp_path``).
"""
import warnings
Expand All @@ -792,18 +805,18 @@ def copy_example(self, name=None):
example_dir = self.request.config.getini("pytester_example_dir")
if example_dir is None:
raise ValueError("pytester_example_dir is unset, can't copy examples")
example_dir = self.request.config.rootdir.join(example_dir)
example_dir = Path(str(self.request.config.rootdir)) / example_dir

for extra_element in self.request.node.iter_markers("pytester_example_path"):
assert extra_element.args
example_dir = example_dir.join(*extra_element.args)
example_dir = example_dir.joinpath(*extra_element.args)

if name is None:
func_name = self._name
maybe_dir = example_dir / func_name
maybe_file = example_dir / (func_name + ".py")

if maybe_dir.isdir():
if maybe_dir.is_dir():
example_path = maybe_dir
elif maybe_file.isfile():
example_path = maybe_file
Expand All @@ -814,15 +827,18 @@ def copy_example(self, name=None):
)
)
else:
example_path = example_dir.join(name)

if example_path.isdir() and not example_path.join("__init__.py").isfile():
example_path.copy(self.tmpdir)
return self.tmpdir
elif example_path.isfile():
result = self.tmpdir.join(example_path.basename)
example_path.copy(result)
return result
example_path = example_dir.joinpath(name)

if example_path.is_dir() and not example_path.joinpath("__init__.py").is_file():
# TODO: py.path.local.copy can copy files to existing directories,
# while with shutil.copytree the destination directory cannot exist,
# we might need to roll our own
py.path.local(str(example_path)).copy(self.tmpdir)
return py.path.local(str(self.tmp_path))
elif example_path.is_file():
result = self.tmp_path.joinpath(example_path.name)
shutil.copy(str(example_path), str(result))
return py.path.local(str(result))
else:
raise LookupError(
'example "{}" is not found as a file or directory'.format(example_path)
Expand Down Expand Up @@ -1051,7 +1067,7 @@ def _ensure_basetemp(self, args):
if str(x).startswith("--basetemp"):
break
else:
args.append("--basetemp=%s" % self.tmpdir.dirpath("basetemp"))
args.append("--basetemp=%s" % self.tmp_path.parent.joinpath("basetemp"))
return args

def parseconfig(self, *args) -> Config:
Expand Down Expand Up @@ -1134,12 +1150,8 @@ def getmodulecol(self, source, configargs=(), withinit=False):
same directory to ensure it is a package
"""
if isinstance(source, Path):
path = self.tmpdir.join(str(source))
assert not withinit, "not supported for paths"
else:
kw = {self._name: Source(source).strip()}
path = self.makepyfile(**kw)
kw = {self._name: Source(source).strip()}
path = self.makepyfile(**kw)
if withinit:
self.makepyfile(__init__="#")
self.config = config = self.parseconfigure(path, *configargs)
Expand Down Expand Up @@ -1167,8 +1179,8 @@ def collect_by_name(
def popen(
self,
cmdargs,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
stdout: Union[int, TextIO] = subprocess.PIPE,
stderr: Union[int, TextIO] = subprocess.PIPE,
stdin=CLOSE_STDIN,
**kw
):
Expand Down Expand Up @@ -1208,7 +1220,8 @@ def run(self, *cmdargs, timeout=None, stdin=CLOSE_STDIN) -> RunResult:
Run a process using subprocess.Popen saving the stdout and stderr.
:param args: the sequence of arguments to pass to `subprocess.Popen()`
:param cmdargs: the sequence of arguments to pass to `subprocess.Popen()`, with ``Path``
and ``py.path.local`` objects being converted to ``str`` automatically.
:kwarg timeout: the period in seconds after which to timeout and raise
:py:class:`Testdir.TimeoutExpired`
:kwarg stdin: optional standard input. Bytes are being send, closing
Expand All @@ -1222,15 +1235,17 @@ def run(self, *cmdargs, timeout=None, stdin=CLOSE_STDIN) -> RunResult:
__tracebackhide__ = True

cmdargs = tuple(
str(arg) if isinstance(arg, py.path.local) else arg for arg in cmdargs
str(arg) if isinstance(arg, (py.path.local, Path)) else arg
for arg in cmdargs
)
p1 = self.tmpdir.join("stdout")
p2 = self.tmpdir.join("stderr")
p1 = self.tmp_path.joinpath("stdout")
p2 = self.tmp_path.joinpath("stderr")
print("running:", *cmdargs)
print(" in:", py.path.local())
f1 = open(str(p1), "w", encoding="utf8")
f2 = open(str(p2), "w", encoding="utf8")
try:
print(" in:", Path.cwd())

with open(str(p1), "w", encoding="utf8") as f1, open(
str(p2), "w", encoding="utf8"
) as f2:
now = timing.time()
popen = self.popen(
cmdargs,
Expand Down Expand Up @@ -1261,17 +1276,11 @@ def handle_timeout():
ret = popen.wait(timeout)
except subprocess.TimeoutExpired:
handle_timeout()
finally:
f1.close()
f2.close()
f1 = open(str(p1), encoding="utf8")
f2 = open(str(p2), encoding="utf8")
try:

with open(str(p1), encoding="utf8") as f1, open(str(p2), encoding="utf8") as f2:
out = f1.read().splitlines()
err = f2.read().splitlines()
finally:
f1.close()
f2.close()

self._dump_lines(out, sys.stdout)
self._dump_lines(err, sys.stderr)
try:
Expand Down Expand Up @@ -1318,7 +1327,7 @@ def runpytest_subprocess(self, *args, timeout=None) -> RunResult:
Returns a :py:class:`RunResult`.
"""
__tracebackhide__ = True
p = make_numbered_dir(root=Path(str(self.tmpdir)), prefix="runpytest-")
p = make_numbered_dir(root=self.tmp_path, prefix="runpytest-")
args = ("--basetemp=%s" % p,) + args
plugins = [x for x in self.plugins if isinstance(x, str)]
if plugins:
Expand All @@ -1337,7 +1346,8 @@ def spawn_pytest(
The pexpect child is returned.
"""
basetemp = self.tmpdir.mkdir("temp-pexpect")
basetemp = self.tmp_path / "temp-pexpect"
basetemp.mkdir()
invoke = " ".join(map(str, self._getpytestargs()))
cmd = "{} --basetemp={} {}".format(invoke, basetemp, string)
return self.spawn(cmd, expect_timeout=expect_timeout)
Expand All @@ -1353,7 +1363,7 @@ def spawn(self, cmd: str, expect_timeout: float = 10.0) -> "pexpect.spawn":
pytest.skip("pypy-64 bit not supported")
if not hasattr(pexpect, "spawn"):
pytest.skip("pexpect.spawn not available")
logfile = self.tmpdir.join("spawn.out").open("wb")
logfile = self.tmp_path.joinpath("spawn.out").open("wb")

child = pexpect.spawn(cmd, logfile=logfile)
self.request.addfinalizer(logfile.close)
Expand Down

0 comments on commit 456857b

Please sign in to comment.