Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions Lib/importlib/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,7 @@ class FastPath:

def __init__(self, root):
self.root = root
self.base = os.path.basename(root).lower()

def joinpath(self, child):
return pathlib.Path(self.root, child)
Expand All @@ -413,12 +414,11 @@ def zip_children(self):
)

def is_egg(self, search):
root_n_low = os.path.split(self.root)[1].lower()

base = self.base
return (
root_n_low == search.normalized + '.egg'
or root_n_low.startswith(search.prefix)
and root_n_low.endswith('.egg'))
base == search.versionless_egg_name
or base.startswith(search.prefix)
and base.endswith('.egg'))

def search(self, name):
for child in self.children():
Expand All @@ -439,6 +439,7 @@ class Prepared:
prefix = ''
suffixes = '.dist-info', '.egg-info'
exact_matches = [''][:0]
versionless_egg_name = ''

def __init__(self, name):
self.name = name
Expand All @@ -448,6 +449,7 @@ def __init__(self, name):
self.prefix = self.normalized + '-'
self.exact_matches = [
self.normalized + suffix for suffix in self.suffixes]
self.versionless_egg_name = self.normalized + '.egg'


class MetadataPathFinder(DistributionFinder):
Expand Down
23 changes: 21 additions & 2 deletions Lib/test/test_importlib/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,28 @@ def tempdir_as_cwd():
yield tmp


class SiteDir:
@contextlib.contextmanager
def install_finder(finder):
sys.meta_path.append(finder)
try:
yield
finally:
sys.meta_path.remove(finder)


class Fixtures:
def setUp(self):
self.fixtures = ExitStack()
self.addCleanup(self.fixtures.close)


class SiteDir(Fixtures):
def setUp(self):
super(SiteDir, self).setUp()
self.site_dir = self.fixtures.enter_context(tempdir())


class OnSysPath:
class OnSysPath(Fixtures):
@staticmethod
@contextlib.contextmanager
def add_sys_path(dir):
Expand Down Expand Up @@ -198,3 +212,8 @@ def build_files(file_defs, prefix=pathlib.Path()):
def DALS(str):
"Dedent and left-strip"
return textwrap.dedent(str).lstrip()


class NullFinder:
def find_module(self, name):
pass
10 changes: 10 additions & 0 deletions Lib/test/test_importlib/stubs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import unittest


class fake_filesystem_unittest:
"""
Stubbed version of the pyfakefs module
"""
class TestCase(unittest.TestCase):
def setUpPyfakefs(self):
self.skipTest("pyfakefs not available")
32 changes: 32 additions & 0 deletions Lib/test/test_importlib/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@
import unittest
import importlib.metadata

try:
import pyfakefs.fake_filesystem_unittest as ffs
except ImportError:
from .stubs import fake_filesystem_unittest as ffs

from . import fixtures
from importlib.metadata import (
Distribution, EntryPoint,
Expand Down Expand Up @@ -185,6 +190,33 @@ def test_egg(self):
version('foo')


class MissingSysPath(fixtures.OnSysPath, unittest.TestCase):
site_dir = '/does-not-exist'

def test_discovery(self):
"""
Discovering distributions should succeed even if
there is an invalid path on sys.path.
"""
importlib.metadata.distributions()


class InaccessibleSysPath(fixtures.OnSysPath, ffs.TestCase):
site_dir = '/access-denied'

def setUp(self):
super(InaccessibleSysPath, self).setUp()
self.setUpPyfakefs()
self.fs.create_dir(self.site_dir, perm_bits=000)

def test_discovery(self):
"""
Discovering distributions should succeed even if
there is an invalid path on sys.path.
"""
list(importlib.metadata.distributions())


class TestEntryPoints(unittest.TestCase):
def __init__(self, *args):
super(TestEntryPoints, self).__init__(*args)
Expand Down
142 changes: 103 additions & 39 deletions Lib/test/test_zipfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -2724,16 +2724,71 @@ def test_extract_command(self):
self.assertEqual(f.read(), zf.read(zi))


class TestExecutablePrependedZip(unittest.TestCase):
"""Test our ability to open zip files with an executable prepended."""

def setUp(self):
self.exe_zip = findfile('exe_with_zip', subdir='ziptestdata')
self.exe_zip64 = findfile('exe_with_z64', subdir='ziptestdata')

def _test_zip_works(self, name):
# bpo28494 sanity check: ensure is_zipfile works on these.
self.assertTrue(zipfile.is_zipfile(name),
f'is_zipfile failed on {name}')
# Ensure we can operate on these via ZipFile.
with zipfile.ZipFile(name) as zipfp:
for n in zipfp.namelist():
data = zipfp.read(n)
self.assertIn(b'FAVORITE_NUMBER', data)

def test_read_zip_with_exe_prepended(self):
self._test_zip_works(self.exe_zip)

def test_read_zip64_with_exe_prepended(self):
self._test_zip_works(self.exe_zip64)

@unittest.skipUnless(sys.executable, 'sys.executable required.')
@unittest.skipUnless(os.access('/bin/bash', os.X_OK),
'Test relies on #!/bin/bash working.')
def test_execute_zip2(self):
output = subprocess.check_output([self.exe_zip, sys.executable])
self.assertIn(b'number in executable: 5', output)

@unittest.skipUnless(sys.executable, 'sys.executable required.')
@unittest.skipUnless(os.access('/bin/bash', os.X_OK),
'Test relies on #!/bin/bash working.')
def test_execute_zip64(self):
output = subprocess.check_output([self.exe_zip64, sys.executable])
self.assertIn(b'number in executable: 5', output)


# Poor man's technique to consume a (smallish) iterable.
consume = tuple


# from jaraco.itertools 5.0
class jaraco:
class itertools:
class Counter:
def __init__(self, i):
self.count = 0
self._orig_iter = iter(i)

def __iter__(self):
return self

def __next__(self):
result = next(self._orig_iter)
self.count += 1
return result


def add_dirs(zf):
"""
Given a writable zip file zf, inject directory entries for
any directories implied by the presence of children.
"""
for name in zipfile.Path._implied_dirs(zf.namelist()):
for name in zipfile.CompleteDirs._implied_dirs(zf.namelist()):
zf.writestr(name, b"")
return zf

Expand Down Expand Up @@ -2774,44 +2829,6 @@ def build_alpharep_fixture():
return zf


class TestExecutablePrependedZip(unittest.TestCase):
"""Test our ability to open zip files with an executable prepended."""

def setUp(self):
self.exe_zip = findfile('exe_with_zip', subdir='ziptestdata')
self.exe_zip64 = findfile('exe_with_z64', subdir='ziptestdata')

def _test_zip_works(self, name):
# bpo-28494 sanity check: ensure is_zipfile works on these.
self.assertTrue(zipfile.is_zipfile(name),
f'is_zipfile failed on {name}')
# Ensure we can operate on these via ZipFile.
with zipfile.ZipFile(name) as zipfp:
for n in zipfp.namelist():
data = zipfp.read(n)
self.assertIn(b'FAVORITE_NUMBER', data)

def test_read_zip_with_exe_prepended(self):
self._test_zip_works(self.exe_zip)

def test_read_zip64_with_exe_prepended(self):
self._test_zip_works(self.exe_zip64)

@unittest.skipUnless(sys.executable, 'sys.executable required.')
@unittest.skipUnless(os.access('/bin/bash', os.X_OK),
'Test relies on #!/bin/bash working.')
def test_execute_zip2(self):
output = subprocess.check_output([self.exe_zip, sys.executable])
self.assertIn(b'number in executable: 5', output)

@unittest.skipUnless(sys.executable, 'sys.executable required.')
@unittest.skipUnless(os.access('/bin/bash', os.X_OK),
'Test relies on #!/bin/bash working.')
def test_execute_zip64(self):
output = subprocess.check_output([self.exe_zip64, sys.executable])
self.assertIn(b'number in executable: 5', output)


class TestPath(unittest.TestCase):
def setUp(self):
self.fixtures = contextlib.ExitStack()
Expand Down Expand Up @@ -2849,6 +2866,14 @@ def test_iterdir_and_types(self):
i, = h.iterdir()
assert i.is_file()

def test_subdir_is_dir(self):
for alpharep in self.zipfile_alpharep():
root = zipfile.Path(alpharep)
assert (root / 'b').is_dir()
assert (root / 'b/').is_dir()
assert (root / 'g').is_dir()
assert (root / 'g/').is_dir()

def test_open(self):
for alpharep in self.zipfile_alpharep():
root = zipfile.Path(alpharep)
Expand Down Expand Up @@ -2910,6 +2935,45 @@ def test_missing_dir_parent(self):
root = zipfile.Path(alpharep)
assert (root / 'missing dir/').parent.at == ''

def test_mutability(self):
"""
If the underlying zipfile is changed, the Path object should
reflect that change.
"""
for alpharep in self.zipfile_alpharep():
root = zipfile.Path(alpharep)
a, b, g = root.iterdir()
alpharep.writestr('foo.txt', 'foo')
alpharep.writestr('bar/baz.txt', 'baz')
assert any(
child.name == 'foo.txt'
for child in root.iterdir())
assert (root / 'foo.txt').read_text() == 'foo'
baz, = (root / 'bar').iterdir()
assert baz.read_text() == 'baz'

HUGE_ZIPFILE_NUM_ENTRIES = 2 ** 13

def huge_zipfile(self):
"""Create a read-only zipfile with a huge number of entries entries."""
strm = io.BytesIO()
zf = zipfile.ZipFile(strm, "w")
for entry in map(str, range(self.HUGE_ZIPFILE_NUM_ENTRIES)):
zf.writestr(entry, entry)
zf.mode = 'r'
return zf

def test_joinpath_constant_time(self):
"""
Ensure joinpath on items in zipfile is linear time.
"""
root = zipfile.Path(self.huge_zipfile())
entries = jaraco.itertools.Counter(root.iterdir())
for entry in entries:
entry.joinpath('suffix')
# Check the file iterated all items
assert entries.count == self.HUGE_ZIPFILE_NUM_ENTRIES


if __name__ == "__main__":
unittest.main()
Loading