Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PoC] Add Core Metadata API (as a dataclass) #498

Closed
wants to merge 26 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
991f08f
Add implementation of core metadata as a dataclass
abravalheri Jan 16, 2022
db51418
Make Requirement hashable
abravalheri Jan 16, 2022
b421d1d
Add tests for core metadata
abravalheri Jan 16, 2022
598288a
Remove positonal only arguments
abravalheri Jan 16, 2022
e8b51d2
Remove custom mapping class in metadata
abravalheri Jan 18, 2022
800844c
Remove micro-optmisation (cache) from metadata
abravalheri Jan 18, 2022
3aaa73e
Use flags in examples for metadata tests
abravalheri Jan 18, 2022
50c13b7
Improve grammar for comment in metadata
abravalheri Jan 18, 2022
147592b
Improve condition in metadata
abravalheri Jan 18, 2022
3c89525
Remove unused instructions in metadata
abravalheri Jan 18, 2022
0b7442c
Preserve order fields are defined in metadata
abravalheri Jan 18, 2022
7dce2a3
Make email headers don't leak into the metadata
abravalheri Jan 18, 2022
10439f0
Ensure serialized metadata is not empty
abravalheri Jan 18, 2022
dd57033
Rename _read_pkg_info to _parse_pkg_info
abravalheri Jan 18, 2022
991d65f
Use a plain tuple to represent emails in metadata
abravalheri Jan 18, 2022
dda204b
Revert "Make Requirement hashable"
abravalheri Jan 18, 2022
f1625d3
Change metadata tests to not rely on __hash__/__eq__
abravalheri Jan 18, 2022
95638b3
Use inspect.clean doc to unescape description in core metadata
abravalheri Jan 31, 2022
31f1d22
Remove __post_init__
abravalheri Jan 31, 2022
adb8ec1
Extract url parsing in its own method
abravalheri Feb 2, 2022
dd6db44
Fix dynamic validation
abravalheri Feb 2, 2022
f020115
Separate required fields validation
abravalheri Feb 2, 2022
bac39bc
Remove unused _setattr
abravalheri Feb 2, 2022
1ed4123
Remove 'frozen' behaviour from metadata
abravalheri Feb 2, 2022
1ca3573
Unify {to/from}_pkg_info with {to/from}_dist_info_metadata
abravalheri Feb 2, 2022
73eef47
Use type_extensions in TYPE_CHECKING guard
abravalheri Feb 4, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@ _build/
build/
dist/
htmlcov/
tests/downloads
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ repos:
- id: trailing-whitespace

- repo: https://github.com/pre-commit/mirrors-mypy
rev: v0.812
rev: v0.931
hooks:
- id: mypy
exclude: '^(docs|tasks|tests)|setup\.py'
Expand Down
385 changes: 385 additions & 0 deletions packaging/metadata.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,385 @@
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.

import dataclasses
import math
from email import message_from_bytes
from email.headerregistry import Address, AddressHeader
from email.message import EmailMessage
from email.policy import EmailPolicy, Policy
from functools import reduce
from inspect import cleandoc
from itertools import chain
from typing import (
TYPE_CHECKING,
Any,
ClassVar,
Collection,
Dict,
Iterable,
Iterator,
Mapping,
Set,
Tuple,
Type,
TypeVar,
Union,
cast,
)

from .requirements import InvalidRequirement, Requirement
from .specifiers import SpecifierSet
from .version import Version

T = TypeVar("T", bound="CoreMetadata")
A = TypeVar("A")
B = TypeVar("B")

if TYPE_CHECKING: # pragma: no cover
from typing_extensions import Literal

NormalizedDynamicFields = Literal[
"platform",
"summary",
"description",
"keywords",
"home-page",
"author",
"author-email",
"license",
"supported-platform",
"download-url",
"classifier",
"maintainer",
"maintainer-email",
"requires-dist",
"requires-python",
"requires-external",
"project-url",
"provides-extra",
"provides-dist",
"obsoletes-dist",
"description-content-type",
]
else:
NormalizedDynamicFields = str


def _normalize_field_name_for_dynamic(field: str) -> NormalizedDynamicFields:
"""Normalize a metadata field name that is acceptable in `dynamic`.

The field name will be normalized to lower-case. JSON field names are
also acceptable and will be translated accordingly.

"""
return cast(NormalizedDynamicFields, field.lower().replace("_", "-"))


def _field_name(field: str) -> str:
"""Equivalent field name in the class representing metadata"""
return field.lower().replace("-", "_")


# In the following, comparison is disabled because currently `Requirement`
# objects are unhashable/not-comparable.


@dataclasses.dataclass(eq=False)
class CoreMetadata:
"""
Core metadata for Python packages, represented as an immutable
:obj:`dataclass <dataclasses>`.

Specification: https://packaging.python.org/en/latest/specifications/core-metadata/

Attribute names follow :pep:`PEP 566's JSON guidelines
<566#json-compatible-metadata>`.
"""

# 1.0
name: str
version: Union[Version, None] = None
brettcannon marked this conversation as resolved.
Show resolved Hide resolved
platform: Collection[str] = ()
summary: str = ""
description: str = ""
keywords: Collection[str] = ()
home_page: str = ""
author: str = ""
author_email: Collection[Tuple[Union[str, None], str]] = ()
license: str = ""
# license_file: Collection[str] = () # not standard yet
# 1.1
supported_platform: Collection[str] = ()
download_url: str = ""
classifier: Collection[str] = ()
# 1.2
maintainer: str = ""
maintainer_email: Collection[Tuple[Union[str, None], str]] = ()
requires_dist: Collection[Requirement] = ()
requires_python: SpecifierSet = dataclasses.field(default_factory=SpecifierSet)
requires_external: Collection[str] = ()
project_url: Mapping[str, str] = dataclasses.field(default_factory=dict)
provides_extra: Collection[str] = ()
provides_dist: Collection[Requirement] = ()
obsoletes_dist: Collection[Requirement] = ()
# 2.1
description_content_type: str = ""
# 2.2
dynamic: Collection[NormalizedDynamicFields] = ()

@property
def metadata_version(self) -> str:
"""
The data structure is always compatible with the latest approved
version of the spec, even when parsing files that use previous versions.
"""
return "2.2"

@classmethod
def _fields(cls) -> Collection[str]:
return [f.name for f in dataclasses.fields(cls)]

@classmethod
def _process_attrs(
cls, attrs: Iterable[Tuple[str, Any]]
) -> Iterable[Tuple[str, Any]]:
"""Transform input data to the matching attribute types."""

_as_set = (cls._MULTIPLE_USE | {"keywords"}) - {"project_url"}
_available_fields = cls._fields()

for field, value in attrs:
if field == "version":
yield ("version", Version(value))
elif field == "keywords":
yield (field, set(value.split(",")))
elif field == "requires_python":
yield (field, cls._parse_requires_python(value))
elif field == "project_url":
yield (field, cls._parse_url(value))
elif field == "dynamic":
values = (_normalize_field_name_for_dynamic(f) for f in value)
yield (field, set(values))
elif field.endswith("email"):
yield (field, set(cls._parse_emails(value.strip())))
elif field.endswith("dist"):
yield (field, {cls._parse_req(v) for v in value})
elif field in _as_set:
yield (field, set(value))
elif field in _available_fields:
yield (field, value)

@classmethod
def _parse_pkg_info(cls, pkg_info: bytes) -> Iterable[Tuple[str, Any]]:
"""Parse PKG-INFO data."""

msg = message_from_bytes(pkg_info, EmailMessage, policy=cls._PARSING_POLICY)
info = cast(EmailMessage, msg)
has_description = False

for key in info.keys():
field = _field_name(key)
if field in cls._UPDATES:
field = cls._UPDATES[field]

value = str(info.get(key)) # email.header.Header.__str__ handles encoding

if field in {"keywords", "summary"} or field.endswith("email"):
yield (field, cls._ensure_single_line(value))
elif field == "description":
has_description = True
yield (field, cls._unescape_description(value))
elif field in cls._MULTIPLE_USE:
yield (field, (str(v) for v in info.get_all(key)))
else:
yield (field, value)

if not has_description:
yield ("description", str(info.get_payload(decode=True), "utf-8"))

@classmethod
def from_pkg_info(
cls: Type[T], pkg_info: bytes, *, allow_unfilled_dynamic: bool = True
) -> T:
"""Parse PKG-INFO data."""

attrs = cls._process_attrs(cls._parse_pkg_info(pkg_info))
obj = cls(**dict(attrs))
obj._validate(allow_unfilled_dynamic)

return obj

def to_pkg_info(self, *, allow_unfilled_dynamic: bool = True) -> bytes:
"""Generate PKG-INFO data."""

self._validate(allow_unfilled_dynamic)

info = EmailMessage(self._PARSING_POLICY)
info.add_header("Metadata-Version", self.metadata_version)
# Use `sorted` in collections to improve reproducibility
for field in self._fields():
value = getattr(self, field)
if not value:
continue
key = self._canonical_field(field)
if field in "keywords":
info.add_header(key, ",".join(sorted(value)))
elif field.endswith("email"):
_emails = (self._serialize_email(v) for v in value if any(v))
emails = ", ".join(sorted(v for v in _emails if v))
if emails:
info.add_header(key, emails)
elif field == "project_url":
for kind in sorted(value):
info.add_header(key, f"{kind}, {value[kind]}")
elif field == "description":
info.set_payload(bytes(value, "utf-8"))
elif field in self._MULTIPLE_USE:
for single_value in sorted(str(v) for v in value):
info.add_header(key, single_value)
else:
info.add_header(key, str(value))

return info.as_bytes()

# --- Auxiliary Methods and Properties ---
# Not part of the API, but can be overwritten by subclasses
# (useful when providing a prof-of-concept for new PEPs)

_MANDATORY: ClassVar[Set[str]] = {"name", "version"}
_NOT_DYNAMIC: ClassVar[Set[str]] = {"metadata_version", "dynamic"} | _MANDATORY
_MULTIPLE_USE: ClassVar[Set[str]] = {
"dynamic",
"platform",
"supported_platform",
"classifier",
"requires_dist",
"requires_external",
"project_url",
"provides_extra",
"provides_dist",
"obsoletes_dist",
}
_UPDATES: ClassVar[Dict[str, str]] = {
"requires": "requires_dist", # PEP 314 => PEP 345
"provides": "provides_dist", # PEP 314 => PEP 345
"obsoletes": "obsoletes_dist", # PEP 314 => PEP 345
}
_PARSING_POLICY: ClassVar[Policy] = EmailPolicy(max_line_length=math.inf, utf8=True)

@classmethod
def _canonical_field(cls, field: str) -> str:
words = _normalize_field_name_for_dynamic(field).split("-")
ucfirst = "-".join(w[0].upper() + w[1:] for w in words)
replacements = {"Url": "URL", "Email": "email", "Page": "page"}.items()
return reduce(lambda acc, x: acc.replace(x[0], x[1]), replacements, ucfirst)

@classmethod
def _ensure_single_line(cls, value: str) -> str:
"""Existing distributions might include metadata with fields such as 'keywords'
or 'summary' showing up as multiline strings.
"""
return " ".join(value.splitlines())

@classmethod
def _parse_requires_python(cls, value: str) -> SpecifierSet:
if value and value[0].isnumeric():
value = f"=={value}"
return SpecifierSet(value)

@classmethod
def _parse_req(cls, value: str) -> Requirement:
try:
return Requirement(value)
except InvalidRequirement:
# Some old examples in PEPs use "()" around versions without an operator
# e.g.: `Provides: xmltools (1.3)`
name, _, rest = value.strip().partition("(")
value = f"{name}(=={rest}"
return Requirement(value)

@classmethod
def _parse_url(cls, value: Iterable[str]) -> Dict[str, str]:
urls = {}
for url in value:
key, _, value = url.partition(",")
urls[key.strip()] = value.strip()
return urls

@classmethod
def _parse_emails(cls, value: str) -> Iterator[Tuple[Union[str, None], str]]:
if value == "UNKNOWN":
return
address_list = AddressHeader.value_parser(value)
for mailbox in address_list.all_mailboxes:
yield (mailbox.display_name, mailbox.addr_spec)

@classmethod
def _serialize_email(cls, value: Tuple[Union[str, None], str]) -> str:
return str(Address(value[0] or "", addr_spec=value[1]))

@classmethod
def _unescape_description(cls, content: str) -> str:
"""Reverse RFC-822 escaping by removing leading whitespaces from content."""
lines = cleandoc(content).splitlines()
if not lines:
return ""

continuation = (line.lstrip("|") for line in lines[1:])
return "\n".join(chain(lines[:1], continuation))

def _validate(self, allow_unfilled_dynamic: bool) -> bool:
self._validate_required_fields()
self._validate_dynamic()
if not allow_unfilled_dynamic:
self._validate_unfilled_dynamic()

return True

def _validate_dynamic(self) -> bool:
for item in self.dynamic:
field = _field_name(item)
if not hasattr(self, field):
raise InvalidCoreMetadataField(item)
if field in self._NOT_DYNAMIC:
raise InvalidDynamicField(item)

return True

def _validate_unfilled_dynamic(self) -> bool:
unresolved = [k for k in self.dynamic if not getattr(self, _field_name(k))]
if unresolved:
raise UnfilledDynamicFields(unresolved)

return True

def _validate_required_fields(self) -> bool:
missing_fields = [k for k in self._MANDATORY if not getattr(self, k)]
if missing_fields:
raise MissingRequiredFields(missing_fields)

return True


class InvalidCoreMetadataField(ValueError):
def __init__(self, field: str):
super().__init__(f"{field!r} is not a valid core metadata field")


class InvalidDynamicField(ValueError):
def __init__(self, field: str):
super().__init__(f"{field!r} cannot be dynamic")


class UnfilledDynamicFields(ValueError):
def __init__(self, fields: Iterable[str]):
given = ", ".join(repr(f) for f in fields)
msg = f"Unfilled dynamic fields not allowed in this context (given: {given})"
super().__init__(msg)


class MissingRequiredFields(ValueError):
def __init__(self, fields: Iterable[str]):
missing = ", ".join(fields)
super().__init__(f"Required fields are missing: {missing}")
Loading