diff --git a/src/wheel_inspect/__init__.py b/src/wheel_inspect/__init__.py index 520f2f2..7c390bd 100644 --- a/src/wheel_inspect/__init__.py +++ b/src/wheel_inspect/__init__.py @@ -21,6 +21,7 @@ FileProvider, WheelFile, ) +from .consts import PathType, Tree from .inspecting import inspect, inspect_dist_info_dir, inspect_wheel from .schema import WHEEL_SCHEMA @@ -35,6 +36,8 @@ "DistInfoDir", "DistInfoProvider", "FileProvider", + "PathType", + "Tree", "WHEEL_SCHEMA", "WheelFile", "inspect", diff --git a/src/wheel_inspect/classes.py b/src/wheel_inspect/classes.py index 2dd4e47..57c84cd 100644 --- a/src/wheel_inspect/classes.py +++ b/src/wheel_inspect/classes.py @@ -1,19 +1,34 @@ from __future__ import annotations import abc -import io +from io import TextIOWrapper import os -from pathlib import Path +import pathlib import sys -from typing import IO, Any, Dict, List, Optional, Set, TextIO, TypeVar, Union, overload +from typing import ( + IO, + Any, + Dict, + Iterator, + List, + Mapping, + Optional, + Set, + TextIO, + Tuple, + TypeVar, + Union, + overload, +) from zipfile import ZipFile import attr from entry_points_txt import EntryPointSet from entry_points_txt import load as load_entry_points from wheel_filename import ParsedWheelFilename, parse_wheel_filename from . import errors as exc -from .consts import AnyPath, PathType +from .consts import AnyPath, PathType, Tree from .metadata import parse_metadata -from .record import Record, RecordPath +from .path import Path +from .record import FileData, Record, RecordPath from .util import ( digest_file, filedata_is_optional, @@ -31,8 +46,12 @@ from typing_extensions import Literal +FiletreeID = Union[Tree, str] # str = folder under .data + T = TypeVar("T", bound="DistInfoProvider") +P = TypeVar("P", bound="TreePath") + @attr.define(slots=False) # slots=False so that cached_property works class DistInfoProvider(abc.ABC): @@ -182,6 +201,12 @@ def zip_safe(self) -> Optional[bool]: else: return None + @cached_property + def filetrees(self) -> FiletreeMapping: + return FiletreeMapping( + record=self.record, root_is_purelib=self.wheel_info["root_is_purelib"] + ) + class FileProvider(abc.ABC): @abc.abstractmethod @@ -253,7 +278,7 @@ def get_file_digest(self, path: str, algorithm: str) -> str: @overload def open( self, - path: Union[str, RecordPath], + path: Union[str, RecordPath, TreePath], mode: Literal["r"] = "r", encoding: Optional[str] = None, errors: Optional[str] = None, @@ -264,7 +289,7 @@ def open( @overload def open( self, - path: Union[str, RecordPath], + path: Union[str, RecordPath, TreePath], mode: Literal["rb"], encoding: None = None, errors: None = None, @@ -275,7 +300,7 @@ def open( @abc.abstractmethod def open( self, - path: Union[str, RecordPath], + path: Union[str, RecordPath, TreePath], mode: Literal["r", "rb"] = "r", encoding: Optional[str] = None, errors: Optional[str] = None, @@ -301,7 +326,9 @@ def verify_record(self, record: Record, digest: bool = True) -> None: if not is_signature_file(path): raise exc.UnrecordedPathError(path) - def verify_file(self, path: RecordPath, digest: bool = True) -> None: + def verify_file( + self, path: Union[RecordPath, TreePath], digest: bool = True + ) -> None: rpath = path # For readability spath = str(rpath) filedata = rpath.filedata @@ -367,11 +394,11 @@ def verify_file(self, path: RecordPath, digest: bool = True) -> None: @attr.define class DistInfoDir(DistInfoProvider): - path: Path + path: pathlib.Path @classmethod def from_path(cls, path: AnyPath, strict: bool = True) -> DistInfoDir: - d = 
cls(Path(os.fsdecode(path))) + d = cls(pathlib.Path(os.fsdecode(path))) if strict: d.validate() return d @@ -437,6 +464,14 @@ def dist_info_dirname(self) -> str: def data_dirname(self) -> Optional[str]: return self.record.data_dirname + @cached_property + def filetrees(self) -> BackedFiletreeMapping: + return BackedFiletreeMapping( + record=self.record, + root_is_purelib=self.wheel_info["root_is_purelib"], + backing=self, + ) + def has_dist_info_file(self, path: str) -> bool: return self.has_file(self.dist_info_dirname + "/" + path) @@ -451,8 +486,10 @@ def validate(self) -> None: def verify(self, digest: bool = True) -> None: self.verify_record(self.record, digest=digest) - def verify_file(self, path: Union[str, RecordPath], digest: bool = True) -> None: - if not isinstance(path, RecordPath): + def verify_file( + self, path: Union[str, RecordPath, TreePath], digest: bool = True + ) -> None: + if isinstance(path, str): path = self.record.filetree / path super().verify_file(path, digest=digest) @@ -467,7 +504,7 @@ class WheelFile(BackedDistInfo): @classmethod def from_path(cls, path: AnyPath, strict: bool = True) -> WheelFile: - p = Path(os.fsdecode(path)) + p = pathlib.Path(os.fsdecode(path)) return cls.from_file(p.open("rb"), path=p, strict=strict) @classmethod @@ -603,7 +640,7 @@ def get_file_size(self, path: str) -> int: @overload def open( self, - path: Union[str, RecordPath], + path: Union[str, RecordPath, TreePath], mode: Literal["r"] = "r", encoding: Optional[str] = None, errors: Optional[str] = None, @@ -614,7 +651,7 @@ def open( @overload def open( self, - path: Union[str, RecordPath], + path: Union[str, RecordPath, TreePath], mode: Literal["rb"], encoding: None = None, errors: None = None, @@ -624,7 +661,7 @@ def open( def open( self, - path: Union[str, RecordPath], + path: Union[str, RecordPath, TreePath], mode: Literal["r", "rb"] = "r", encoding: Optional[str] = None, errors: Optional[str] = None, @@ -647,8 +684,248 @@ def open( raise exc.NotFileError(path) fp = self.zipfile.open(zi) if mode == "r": - return io.TextIOWrapper( - fp, encoding=encoding, errors=errors, newline=newline - ) + return TextIOWrapper(fp, encoding=encoding, errors=errors, newline=newline) else: return fp + + +@attr.define +class TreePath(Path): + # .parts and str() contain the full path from the root of the wheel, not + # from the root of the filetree + + tree_id: FiletreeID + + # For ROOT/lib, this is pruned of the .dist-info and .data trees + _record_path: RecordPath + + # How many leading path components to strip from .path to get the parts + # relative to the root of the filetree + _root_depth: int + + def __repr__(self) -> str: + if isinstance(self.tree_id, Tree): + tree = str(self.tree_id) + else: + tree = repr(self.tree_id) + return f"{type(self).__name__}({str(self)!r}, tree_id={tree})" + + @property + def filedata(self) -> Optional[FileData]: + return self._record_path.filedata + + @property + def relative_parts(self) -> Tuple[str, ...]: + # Relative to the root of the tree + return self.parts[self._root_depth :] + + @property + def relative_path(self) -> str: + # Relative to the root of the tree + return "/".join(self.relative_parts) + + def get_subpath(self: P, name: str) -> P: + if self.is_file(): + raise exc.NotDirectoryError(str(self)) + elif name == ".": + return self + elif name == "..": + return self.parent + else: + subnode = self._record_path / name + return attr.evolve(self, parts=subnode.parts, record_path=subnode) + + @property + def parent(self: P) -> P: + if self.is_root(): + 
return self + else: + supernode = self._record_path.parent + return attr.evolve(self, parts=supernode.parts, record_path=supernode) + + def is_root(self) -> bool: + # Detects whether we're at the root of the filetree + return len(self.parts) == self._root_depth + + @property + def path_type(self) -> PathType: + return self._record_path.path_type + + def exists(self) -> bool: + return self._record_path.exists() + + def is_file(self) -> bool: + return self._record_path.is_file() + + def is_dir(self) -> bool: + return self._record_path.is_dir() + + def iterdir(self: P) -> Iterator[P]: + for n in self._record_path.iterdir(): + yield attr.evolve(self, parts=n.parts, record_path=n) + + +# Need to be explicit because we're inheriting a __repr__: +@attr.define(repr=False) +class BackedTreePath(TreePath): + backing: BackedDistInfo + + @overload + def open( + self, + mode: Literal["r"] = "r", + encoding: Optional[str] = None, + errors: Optional[str] = None, + newline: Optional[str] = None, + ) -> TextIO: + ... + + @overload + def open( + self, + mode: Literal["rb"], + encoding: None = None, + errors: None = None, + newline: None = None, + ) -> IO[bytes]: + ... + + def open( + self, + mode: Literal["r", "rb"] = "r", + encoding: Optional[str] = None, + errors: Optional[str] = None, + newline: Optional[str] = None, + ) -> IO: + if mode not in ("r", "rb"): + raise ValueError(f"Unsupported file mode: {mode!r}") + if mode == "r": + return self.backing.open( + self, + mode=mode, + encoding=encoding, + errors=errors, + newline=newline, + ) + else: + return self.backing.open(self, mode=mode) + + def read_bytes(self) -> bytes: + with self.open("rb") as fp: + return fp.read() + + def read_text( + self, encoding: Optional[str] = None, errors: Optional[str] = None + ) -> str: + with self.open("r", encoding=encoding, errors=errors) as fp: + return fp.read() + + def verify(self, digest: bool = True) -> None: + if not self.exists(): + ### TODO: Replace this with a different exception: + raise exc.NoSuchPathError(str(self)) + else: + self.backing.verify_file(self._record_path, digest=digest) + + +@attr.define +class FiletreeMapping(Mapping[FiletreeID, TreePath]): + record: Record + root_is_purelib: bool + _cache: Dict[FiletreeID, TreePath] = attr.field(factory=dict, init=False) + + # When root-is-purelib, Tree.PLATLIB and "platlib" are the same (because + # the latter just accesses the *.data subdir of that name), but "purelib" + # does not exist (unless the wheel actually has a *.data/purelib directory) + def __getitem__(self, key: FiletreeID) -> TreePath: + rkey = self._resolve_key(key) + if rkey not in self._cache: + root = self._get_root(rkey) + if root is None: + raise KeyError(key) + self._cache[rkey] = TreePath( + parts=root.parts, + tree_id=rkey, + record_path=root, + root_depth=len(root.parts), + ) + return self._cache[rkey] + + def __iter__(self) -> Iterator[FiletreeID]: + keys: Set[FiletreeID] = set(map(self._resolve_key, Tree)) + data_dirname = self.record.data_dirname + if data_dirname is not None: + data_dir = self.record.filetree / data_dirname + if self.root_is_purelib and not (data_dir / "platlib").exists(): + keys.discard(Tree.PLATLIB) + elif not self.root_is_purelib and not (data_dir / "purelib").exists(): + keys.discard(Tree.PURELIB) + for p in data_dir.iterdir(): + if p.is_dir(): + keys.add(self._resolve_key(p.name)) + return iter(keys) + + def __len__(self) -> int: + return sum(1 for _ in self) + + def _resolve_key(self, key: FiletreeID) -> FiletreeID: + if key is Tree.ROOT: + if 
self.root_is_purelib: + return Tree.PURELIB + else: + return Tree.PLATLIB + elif not key or (isinstance(key, str) and "/" in key) or key in (".", ".."): + raise KeyError(key) + else: + return key + + def _get_root(self, tree_id: FiletreeID) -> Optional[RecordPath]: + data_dirname = self.record.data_dirname + if tree_id is Tree.ALL: + root = self.record.filetree + elif (tree_id is Tree.PURELIB and self.root_is_purelib) or ( + tree_id is Tree.PLATLIB and not self.root_is_purelib + ): + pruned = [self.record.dist_info_dirname] + if data_dirname is not None: + pruned.append(data_dirname) + root = self.record.filetree._prune(pruned) + elif tree_id is Tree.DIST_INFO: + root = self.record.filetree / self.record.dist_info_dirname + elif tree_id is Tree.DATA: + if data_dirname is None: + return None + root = self.record.filetree / data_dirname + elif data_dirname is None: + return None + else: + root = ( + self.record.filetree + / data_dirname + / (tree_id if isinstance(tree_id, str) else tree_id.value) + ) + if not root.is_dir(): + return None + return root + + +@attr.define +class BackedFiletreeMapping(FiletreeMapping, Mapping[FiletreeID, BackedTreePath]): + backing: BackedDistInfo + + def __getitem__(self, key: FiletreeID) -> TreePath: + rkey = self._resolve_key(key) + if rkey not in self._cache: + root = self._get_root(rkey) + if root is None: + raise KeyError(key) + self._cache[rkey] = BackedTreePath( + parts=root.parts, + tree_id=rkey, + record_path=root, + root_depth=len(root.parts), + backing=self.backing, + ) + tp = self._cache[rkey] + assert isinstance(tp, BackedTreePath) + return tp diff --git a/src/wheel_inspect/consts.py b/src/wheel_inspect/consts.py index 6f391fb..3611e67 100644 --- a/src/wheel_inspect/consts.py +++ b/src/wheel_inspect/consts.py @@ -24,3 +24,12 @@ class PathType(Enum): FILE = "file" DIRECTORY = "directory" OTHER = "other" # for symlinks, devices, sockets, etc. 
in the backing + + +class Tree(Enum): + ALL = "ALL" + ROOT = "ROOT" # alias for purelib or platlib, depending + PURELIB = "purelib" + PLATLIB = "platlib" + DIST_INFO = "dist-info" + DATA = "data" # The whole .data directory diff --git a/src/wheel_inspect/path.py b/src/wheel_inspect/path.py index d77bf53..6b3690c 100644 --- a/src/wheel_inspect/path.py +++ b/src/wheel_inspect/path.py @@ -19,8 +19,11 @@ def __repr__(self) -> str: @property def name(self) -> str: - # Returns "" for the root of a filetree - return (("",) + self.parts)[-1] + if self.is_root(): + return "" + else: + assert self.parts + return self.parts[-1] @abc.abstractmethod def get_subpath(self: P, name: str) -> P: diff --git a/src/wheel_inspect/record.py b/src/wheel_inspect/record.py index f1c4a5a..8d81cdb 100644 --- a/src/wheel_inspect/record.py +++ b/src/wheel_inspect/record.py @@ -101,6 +101,28 @@ def _mkdir(self, name: str) -> RecordPath: else: raise errors.RecordConflictError(str(n)) + def _prune(self, names: List[str]) -> RecordPath: + if not self.is_dir(): + raise TypeError("Cannot prune a non-directory") + assert self._children is not None + pruned = self._children.copy() + for n in names: + pruned.pop(n, None) + newdir = attr.evolve(self, children=pruned) + newdir._adopt() + return newdir + + def _adopt(self) -> None: + if not self.is_dir(): + raise TypeError("Non-directories cannot adopt") + assert self._children is not None + self._children = { + k: attr.evolve(v, parent=self) for k, v in self._children.items() + } + for v in self._children.values(): + if v.is_dir(): + v._adopt() + @property def parent(self) -> RecordPath: return self._parent if self._parent is not None else self diff --git a/test/data/wheels/netkiller_devops-0.2.6-py3-none-any.json b/test/data/wheels/netkiller_devops-0.2.6-py3-none-any.json new file mode 100644 index 0000000..9dc5616 --- /dev/null +++ b/test/data/wheels/netkiller_devops-0.2.6-py3-none-any.json @@ -0,0 +1,291 @@ +{ + "derived": { + "dependencies": [ + "pyttsx3", + "pyyaml", + "redis", + "requests" + ], + "description_in_body": true, + "description_in_headers": false, + "keyword_separator": null, + "keywords": [], + "modules": [ + "netkiller", + "netkiller.docker", + "netkiller.git", + "netkiller.kubernetes", + "netkiller.nagios", + "netkiller.rsync", + "netkiller.wework", + "netkiller.whiptail" + ], + "readme_renders": true + }, + "dist_info": { + "dependency_links": null, + "entry_points": null, + "metadata": { + "author": "Neo Chen", + "author_email": "netkiller@msn.com", + "classifier": [ + "Development Status :: 5 - Production/Stable", + "Environment :: Console", + "Programming Language :: Python :: 3", + "License :: OSI Approved :: MIT License", + "Operating System :: OS Independent" + ], + "description": { + "length": 1187 + }, + "description_content_type": "text/markdown", + "home_page": "https://github.com/oscm/devops", + "license": "BSD", + "license_file": [ + "LICENSE" + ], + "metadata_version": "2.1", + "name": "netkiller-devops", + "platform": [], + "requires_dist": [ + { + "extras": [], + "marker": null, + "name": "pyyaml", + "specifier": "", + "url": null + }, + { + "extras": [], + "marker": null, + "name": "requests", + "specifier": "", + "url": null + }, + { + "extras": [], + "marker": null, + "name": "redis", + "specifier": "", + "url": null + }, + { + "extras": [], + "marker": null, + "name": "pyttsx3", + "specifier": "", + "url": null + } + ], + "summary": "DevOps of useful deployment and automation", + "version": "0.2.6" + }, + "namespace_packages": null, + 
"record": { + "netkiller/__init__.py": { + "algorithm": "sha256", + "digest": "4KCG6HW6TFtOVNi6qv2avEnqWto9dGgvRrT78R1aq0c", + "size": 71 + }, + "netkiller/docker.py": { + "algorithm": "sha256", + "digest": "P85q3d2HZhLtgPmcGWI-AJ-SCW9-OhqHeSq_7M56uWU", + "size": 13172 + }, + "netkiller/git.py": { + "algorithm": "sha256", + "digest": "q4qe2bJ4VVDWf8957CXluW7sZ2vInIP6h9b2j0v8gHo", + "size": 2141 + }, + "netkiller/kubernetes.py": { + "algorithm": "sha256", + "digest": "DcysaMKfFuvgmVvFA0bHcxlAr3vJh3cGgQMiFAAoNJM", + "size": 4948 + }, + "netkiller/nagios.py": { + "algorithm": "sha256", + "digest": "sQDdYARGDL_RqO-5TXbrQgI4S38Ql9oVCM5KAuEIbAs", + "size": 2195 + }, + "netkiller/rsync.py": { + "algorithm": "sha256", + "digest": "a9gq01Esaehi-OmWzhJtmLTKBCC-EWvSNpQHUTpALa4", + "size": 1705 + }, + "netkiller/wework.py": { + "algorithm": "sha256", + "digest": "7sUMxYtO8dEnoYdDJRJwh5q6JCU4VDGPIheFEb5QL8M", + "size": 1786 + }, + "netkiller/whiptail.py": { + "algorithm": "sha256", + "digest": "XtI4KaFYCt7KOOCiFkn7KzAVAEtUmc6E5LTWe3AVrxM", + "size": 2320 + }, + "netkiller_devops-0.2.6.data/data/etc/deployment.cfg": { + "algorithm": "sha256", + "digest": "k32CGZi-S_2uZq-hXeqhJkX9KwPGhapgKwgzbLMPbjc", + "size": 336 + }, + "netkiller_devops-0.2.6.data/data/etc/notification.ini": { + "algorithm": "sha256", + "digest": "pMXnBoDcxvL46X4dDTps4_-wc3PZgzeXyFM-YGmDdek", + "size": 87 + }, + "netkiller_devops-0.2.6.data/data/etc/os.ini": { + "algorithm": "sha256", + "digest": "xypYUmck9pV4R16eYcY3bHlAh2MufJ6vAmUYp9GZtog", + "size": 218 + }, + "netkiller_devops-0.2.6.data/data/etc/schedule.cfg": { + "algorithm": "sha256", + "digest": "CWNUKAv6Wqijj1hIxn9Wn1UfmGOH6VkK2QRcsDbJCZk", + "size": 125 + }, + "netkiller_devops-0.2.6.data/data/etc/task.cfg": { + "algorithm": "sha256", + "digest": "-OzNjkUrBTc5SjUeNJpOp69xTgyIxWGWldyvQadjZfM", + "size": 1117 + }, + "netkiller_devops-0.2.6.data/data/libexec/devops/backup.mysql.gpg.sh": { + "algorithm": "sha256", + "digest": "o7L6mRl41TJoHMF60wqFe7T_b6iEtZrAk8rl9YkFDLw", + "size": 1500 + }, + "netkiller_devops-0.2.6.data/data/libexec/devops/backup.mysql.sh": { + "algorithm": "sha256", + "digest": "yNOF31crnVm98X7Uj55Z6nZCwulZgAIEZ0KIuIS-LDU", + "size": 1412 + }, + "netkiller_devops-0.2.6.data/data/libexec/devops/backup.mysql.struct.sh": { + "algorithm": "sha256", + "digest": "9zpuRbtjlzqxYUrLsuqDhHkuRGXw2vhjwO6lhiTRpSQ", + "size": 2382 + }, + "netkiller_devops-0.2.6.data/data/share/devops.sh": { + "algorithm": "sha256", + "digest": "dXC19mPiuUDTlqpTRsMfB-Z9DqKytMPBMQ2dYSDxsRU", + "size": 34 + }, + "netkiller_devops-0.2.6.data/data/share/devops/voice.md": { + "algorithm": "sha256", + "digest": "zyQ-QttBwpVG5-nH8dy1ShHgc0YaH89325d9yzJerM4", + "size": 1130 + }, + "netkiller_devops-0.2.6.data/data/share/devops/wechat.md": { + "algorithm": "sha256", + "digest": "KEcZD5yAnyQGVgtyUmYIKli7CGFtSGgkiHakADtNKA8", + "size": 769 + }, + "netkiller_devops-0.2.6.data/data/share/example.com.ini": { + "algorithm": "sha256", + "digest": "bnlEqLzGNHhN6y3EHTKO0bOXHVoM5KcT2M5shznfdnE", + "size": 1121 + }, + "netkiller_devops-0.2.6.data/scripts/backup": { + "algorithm": "sha256", + "digest": "e7yAF3uiCebWkzs_LPNIEvHbiYTKjVMjb4A5htZIDf4", + "size": 8161 + }, + "netkiller_devops-0.2.6.data/scripts/chpasswd.sh": { + "algorithm": "sha256", + "digest": "y5Z_Q9wcIBK4HqwWrgCC7PpG3h57N-_Z1VPR-TxBfcU", + "size": 512 + }, + "netkiller_devops-0.2.6.data/scripts/deployment": { + "algorithm": "sha256", + "digest": "Vok_C5PXv1yLTE2Q9xuwD-c7rZ3Rddl6-vWGVp2TqM4", + "size": 18030 + }, + 
"netkiller_devops-0.2.6.data/scripts/gitsync": { + "algorithm": "sha256", + "digest": "XKmCZI9u0e9vohhVc2X9WJqtHBRSnRi-2n7PrYgF1BE", + "size": 1929 + }, + "netkiller_devops-0.2.6.data/scripts/lrsync": { + "algorithm": "sha256", + "digest": "5iiObQMndPhQAjFzZj5kpz6IAyMk3ELbETjwzNTa7zk", + "size": 3080 + }, + "netkiller_devops-0.2.6.data/scripts/matrixpasswd": { + "algorithm": "sha256", + "digest": "ELwngk6mC_B_6t3mHZFZqQKa_8xFCdhxAZ-QO228pUE", + "size": 60 + }, + "netkiller_devops-0.2.6.data/scripts/mysqlshell": { + "algorithm": "sha256", + "digest": "4AME6QiQfPIX5FbFAxF4EZKO_2mCWrJmohXsEcuyKso", + "size": 5103 + }, + "netkiller_devops-0.2.6.data/scripts/osconf": { + "algorithm": "sha256", + "digest": "MCEjw3zLzM0UwA0DRnRhvXEOGxgyFuQdqrCCO1N8WFo", + "size": 9457 + }, + "netkiller_devops-0.2.6.data/scripts/randpasswd": { + "algorithm": "sha256", + "digest": "QjHu62b9f5-O3hCdDiUT587PRp1jcim_SYeVugdV6js", + "size": 48 + }, + "netkiller_devops-0.2.6.data/scripts/voice": { + "algorithm": "sha256", + "digest": "2NMFNmrnQ7K8H8LnX8ZrQ7xqPneNgiLkImmqzHvtmWk", + "size": 3848 + }, + "netkiller_devops-0.2.6.data/scripts/wechat": { + "algorithm": "sha256", + "digest": "4nZKunqD81RuFYe52fIkYa4TTCmoNAV0qyAkq23N9lo", + "size": 1889 + }, + "netkiller_devops-0.2.6.dist-info/LICENSE": { + "algorithm": "sha256", + "digest": "_RRcNTcT56nDkrSN1nRI9nWzMpBeEBrYTn4fnHh6Wkw", + "size": 1081 + }, + "netkiller_devops-0.2.6.dist-info/METADATA": { + "algorithm": "sha256", + "digest": "iOHoc6dUwMYsd88rNm7A36lu-2nnE3WyzWW2Aw8GAwg", + "size": 1825 + }, + "netkiller_devops-0.2.6.dist-info/RECORD": null, + "netkiller_devops-0.2.6.dist-info/WHEEL": { + "algorithm": "sha256", + "digest": "ewwEueio1C2XeHTvT17n8dZUJgOvyCWCt0WVNLClP9o", + "size": 92 + }, + "netkiller_devops-0.2.6.dist-info/top_level.txt": { + "algorithm": "sha256", + "digest": "43lHmF7KzdqgcxA6_yrYmSQOg8YXBGeCRbt7xtHPF-g", + "size": 10 + } + }, + "top_level": [ + "netkiller" + ], + "wheel": { + "generator": "bdist_wheel (0.37.0)", + "root_is_purelib": true, + "tag": [ + "py3-none-any" + ], + "wheel_version": "1.0" + }, + "zip_safe": null + }, + "valid": true, + "wheel_name": { + "abi_tags": [ + "none" + ], + "build": null, + "name": "netkiller_devops-0.2.6-py3-none-any.whl", + "platform_tags": [ + "any" + ], + "project": "netkiller_devops", + "python_tags": [ + "py3" + ], + "version": "0.2.6" + } +} diff --git a/test/data/wheels/netkiller_devops-0.2.6-py3-none-any.whl b/test/data/wheels/netkiller_devops-0.2.6-py3-none-any.whl new file mode 100644 index 0000000..376997b Binary files /dev/null and b/test/data/wheels/netkiller_devops-0.2.6-py3-none-any.whl differ diff --git a/test/test_filetree.py b/test/test_filetree.py new file mode 100644 index 0000000..a324938 --- /dev/null +++ b/test/test_filetree.py @@ -0,0 +1,243 @@ +import pytest +from testing_lib import DATA_DIR +from wheel_inspect import PathType, Tree, WheelFile +from wheel_inspect.classes import BackedTreePath +from wheel_inspect.errors import NoSuchPathError, NotDirectoryError +from wheel_inspect.record import FileData + + +def test_backedfiletree() -> None: + with WheelFile.from_path( + DATA_DIR / "wheels" / "netkiller_devops-0.2.6-py3-none-any.whl" + ) as whl: + assert set(whl.filetrees) == { + Tree.ALL, + Tree.PURELIB, + Tree.DATA, + Tree.DIST_INFO, + "scripts", + "data", + } + assert len(whl.filetrees) == 6 + + purelib = whl.filetrees[Tree.PURELIB] + assert isinstance(purelib, BackedTreePath) + assert purelib.parts == () + assert purelib.name == "" + assert purelib.stem == "" + assert purelib.suffix 
== "" + assert purelib.parent is purelib + assert purelib.parents == () + assert purelib.tree_id is Tree.PURELIB + assert str(purelib) == "" + assert repr(purelib) == "BackedTreePath('', tree_id=Tree.PURELIB)" + assert purelib.exists() + assert purelib.is_dir() + assert purelib.is_root() + assert purelib.root_path is purelib + assert sorted(p.name for p in purelib.iterdir()) == ["netkiller"] + assert purelib.filedata is None + assert purelib.path_type is PathType.DIRECTORY + with pytest.raises(ValueError): + purelib.with_suffix(".txt") + + netkiller = purelib / "netkiller" + assert netkiller.parts == ("netkiller",) + assert netkiller.name == "netkiller" + assert netkiller.stem == "netkiller" + assert netkiller.suffix == "" + assert netkiller.parent == purelib + assert netkiller.parents == (purelib,) + assert (netkiller / ".") is netkiller + assert (netkiller / "..") == purelib + assert netkiller.tree_id is Tree.PURELIB + assert str(netkiller) == "netkiller" + assert repr(netkiller) == "BackedTreePath('netkiller', tree_id=Tree.PURELIB)" + assert netkiller.exists() + assert netkiller.is_dir() + assert not netkiller.is_root() + assert netkiller.root_path == purelib + assert sorted(p.name for p in netkiller.iterdir()) == [ + "__init__.py", + "docker.py", + "git.py", + "kubernetes.py", + "nagios.py", + "rsync.py", + "wework.py", + "whiptail.py", + ] + assert netkiller.filedata is None + assert netkiller.path_type is PathType.DIRECTORY + netkiller.verify() + assert netkiller.with_suffix(".txt") == purelib / "netkiller.txt" + for s in [".txt/", "/.txt", ".t/xt", ".", "foo"]: + with pytest.raises(ValueError) as excinfo: + netkiller.with_suffix(s) + assert str(excinfo.value) == f"Invalid suffix: {s!r}" + + initfile = netkiller / "__init__.py" + assert initfile == purelib / "netkiller" / "__init__.py" + assert initfile == purelib / "netkiller/__init__.py" + assert initfile.parts == ("netkiller", "__init__.py") + assert initfile.name == "__init__.py" + assert initfile.parent == netkiller + assert initfile.parents == (netkiller, purelib) + assert initfile.tree_id is Tree.PURELIB + assert str(initfile) == "netkiller/__init__.py" + assert ( + repr(initfile) + == "BackedTreePath('netkiller/__init__.py', tree_id=Tree.PURELIB)" + ) + assert initfile.exists() + assert initfile.is_file() + assert not initfile.is_root() + assert initfile.root_path == purelib + assert ( + initfile.read_text(encoding="utf-8") + == "__version__ = '1.1.0'\n__author__ = 'Neo Chen'\n__all__ = ['docker','.']\n" + ) + assert initfile.filedata == FileData( + size=71, + algorithm="sha256", + digest="4KCG6HW6TFtOVNi6qv2avEnqWto9dGgvRrT78R1aq0c", + ) + assert initfile.path_type is PathType.FILE + initfile.verify() + with pytest.raises(NotDirectoryError): + list(initfile.iterdir()) + with pytest.raises(NotDirectoryError): + initfile / "foo" + with pytest.raises(NotDirectoryError): + initfile / "." + with pytest.raises(NotDirectoryError): + initfile / ".." 
+ + pure_dist = purelib / "netkiller_devops-0.2.6.dist-info" + assert pure_dist.parts == ("netkiller_devops-0.2.6.dist-info",) + assert pure_dist.name == "netkiller_devops-0.2.6.dist-info" + assert pure_dist.parent == purelib + assert pure_dist.tree_id is Tree.PURELIB + assert str(pure_dist) == "netkiller_devops-0.2.6.dist-info" + assert not pure_dist.exists() + with pytest.raises(NoSuchPathError): + pure_dist.path_type + with pytest.raises(NoSuchPathError): + list(pure_dist.iterdir()) + assert sorted(p.name for p in purelib.iterdir()) == ["netkiller"] + + assert whl.filetrees[Tree.ROOT] is purelib + + alltree = whl.filetrees[Tree.ALL] + assert alltree != purelib + assert alltree.tree_id is Tree.ALL + assert sorted(p.name for p in alltree.iterdir()) == [ + "netkiller", + "netkiller_devops-0.2.6.data", + "netkiller_devops-0.2.6.dist-info", + ] + + dist_info = whl.filetrees[Tree.DIST_INFO] + assert sorted(p.name for p in dist_info.iterdir()) == [ + "LICENSE", + "METADATA", + "RECORD", + "WHEEL", + "top_level.txt", + ] + assert (dist_info / "RECORD").exists() + assert (dist_info / "RECORD").filedata is None + + data_dir = whl.filetrees[Tree.DATA] + assert sorted(p.name for p in data_dir.iterdir()) == ["data", "scripts"] + + datalib = whl.filetrees["data"] + assert isinstance(datalib, BackedTreePath) + assert datalib.parts == ("netkiller_devops-0.2.6.data", "data") + assert datalib.name == "" + assert datalib.parent is datalib + assert (datalib / ".") is datalib + assert (datalib / "..") is datalib + assert datalib.tree_id == "data" + assert str(datalib) == "netkiller_devops-0.2.6.data/data" + assert ( + repr(datalib) + == "BackedTreePath('netkiller_devops-0.2.6.data/data', tree_id='data')" + ) + assert datalib.exists() + assert datalib.relative_parts == () + assert datalib.relative_path == "" + assert datalib.is_root() + assert datalib.root_path is datalib + assert sorted(p.name for p in datalib.iterdir()) == ["etc", "libexec", "share"] + + devops = datalib / "share" / "devops.sh" + assert devops.root_path == datalib + assert ( + devops.read_text(encoding="utf-8") == "export PATH=$PATH:/srv/devops/bin\n" + ) + assert devops.parts == ( + "netkiller_devops-0.2.6.data", + "data", + "share", + "devops.sh", + ) + assert str(devops) == "netkiller_devops-0.2.6.data/data/share/devops.sh" + assert ( + repr(devops) + == "BackedTreePath('netkiller_devops-0.2.6.data/data/share/devops.sh', tree_id='data')" + ) + assert devops.relative_parts == ("share", "devops.sh") + assert devops.relative_path == "share/devops.sh" + assert devops.suffix == ".sh" + assert devops.suffixes == [".sh"] + assert devops.stem == "devops" + assert ( + devops.with_name("example.com.ini") == datalib / "share" / "example.com.ini" + ) + assert devops.with_suffix(".tar") == datalib / "share" / "devops.tar" + assert devops.with_suffix(".tar.gz") == datalib / "share" / "devops.tar.gz" + assert devops.with_suffix("") == datalib / "share" / "devops" + + devops2 = datalib.joinpath("libexec", "devops") + mysql_gpg_sh = devops2 / "backup.mysql.gpg.sh" + assert mysql_gpg_sh.name == "backup.mysql.gpg.sh" + assert mysql_gpg_sh.suffix == ".sh" + assert mysql_gpg_sh.suffixes == [".mysql", ".gpg", ".sh"] + assert mysql_gpg_sh.stem == "backup.mysql.gpg" + assert mysql_gpg_sh.with_name("foo") == devops2 / "foo" + assert mysql_gpg_sh.with_stem("foo") == devops2 / "foo.sh" + assert mysql_gpg_sh.with_suffix(".txt") == devops2 / "backup.mysql.gpg.txt" + assert mysql_gpg_sh.with_suffix("") == devops2 / "backup.mysql.gpg" + assert 
mysql_gpg_sh.match("*.sh") + assert mysql_gpg_sh.match("backup.*.sh") + assert mysql_gpg_sh.match("backup.*") + assert not mysql_gpg_sh.match("*.gpg") + assert mysql_gpg_sh.match("*/*/*.sh") + assert mysql_gpg_sh.match("*.data/data/*/*/*.sh") + assert not mysql_gpg_sh.match("*.data/data/*/*/*/*.sh") + with pytest.raises(ValueError): + mysql_gpg_sh.match("/*.data/data/*/*/*.sh") + with pytest.raises(ValueError): + mysql_gpg_sh.match("") + + with pytest.raises(KeyError): + whl.filetrees["purelib"] + with pytest.raises(KeyError): + whl.filetrees[Tree.PLATLIB] + with pytest.raises(KeyError): + whl.filetrees["platlib"] + with pytest.raises(KeyError): + whl.filetrees["headers"] + with pytest.raises(KeyError): + whl.filetrees["foobar"] + with pytest.raises(KeyError): + whl.filetrees["scripts/"] + with pytest.raises(KeyError): + whl.filetrees["/scripts"] + with pytest.raises(KeyError): + whl.filetrees[""] + with pytest.raises(KeyError): + whl.filetrees["."] + with pytest.raises(KeyError): + whl.filetrees[".."]
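For orientation only (not part of the patch): a minimal sketch of how the filetrees mapping introduced above might be used. WheelFile, Tree, and the TreePath attributes are the names added by this diff; the wheel filename below is hypothetical.

from wheel_inspect import Tree, WheelFile

# "example-1.0-py3-none-any.whl" is a placeholder path for illustration.
with WheelFile.from_path("example-1.0-py3-none-any.whl") as whl:
    # Tree.ROOT resolves to the purelib or platlib tree, per Root-Is-Purelib.
    root = whl.filetrees[Tree.ROOT]
    for path in root.iterdir():
        print(path.relative_path, path.path_type)

    # String keys name subdirectories of the *.data directory (e.g. "scripts");
    # a missing tree raises KeyError, so probe with .get().
    scripts = whl.filetrees.get("scripts")
    if scripts is not None:
        for entry in scripts.iterdir():
            if entry.is_file():
                entry.verify()  # size/digest check against RECORD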