Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 123 additions & 0 deletions src/pkgcheck/checks/network.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Various checks that require network support."""

import re
import socket
import traceback
import urllib.request
Expand Down Expand Up @@ -337,3 +338,125 @@ def _get_urls(self, pkg):

def schedule(self, pkgs, *args, **kwargs):
super().schedule(pkgs[-1], *args, **kwargs)


class PyPIAttestationAvailable(results.VersionResult, results.Info):
"""PyPI attestation can be used for the package."""

def __init__(self, filename, **kwargs):
super().__init__(**kwargs)
self.filename = filename

@property
def desc(self):
return (
f"PyPI attestation is available for distfile {self.filename}. "
"Consider adding PYPI_VERIFY_REPO."
)


class PyPIAttestationAvailableCheck(NetworkCheck):
"""Check for available PyPI attestations."""

required_addons = (addons.UseAddon,)

_source = sources.LatestVersionRepoSource

known_results = frozenset(
{
PyPIAttestationAvailable,
SSLCertificateError,
}
)

pypi_uri_re = re.compile(
r"^https://files\.pythonhosted\.org/packages/source/./(?P<project>.+?)/"
r"(?P<filename>[^/]+-(?P<version>[^/]+)\.tar\.gz)$"
)

def __init__(self, *args, use_addon, **kwargs):
super().__init__(*args, **kwargs)
self.fetch_filter = use_addon.get_filter("fetchables")

def _provenance_check(self, filename, url, *, pkg):
"""Check provenance URLs."""
result = None
try:
self.session.head(url, allow_redirects=False)
except RequestError as e:
pass
except SSLError as e:
result = SSLCertificateError(attr, url, str(e), pkg=pkg)
else:
result = PyPIAttestationAvailable(filename, pkg=pkg)
return result

def task_done(self, pkg, filename, future):
"""Determine the result of a given URL verification task."""
exc = future.exception()
if exc is not None:
# traceback can't be pickled so serialize it
tb = traceback.format_exc()
# return exceptions that occurred in threads
self.results_q.put(tb)
return

result = future.result()
if result is not None:
if pkg is not None:
# recreate result object with different pkg target and attr
attrs = result._attrs.copy()
attrs["filename"] = filename
result = result._create(**attrs, pkg=pkg)
self.results_q.put([result])

def _schedule_check(self, filename, url, executor, futures, **kwargs):
"""Schedule verification method to run in a separate thread against a given URL.

Note that this tries to avoid hitting the network for the same URL
twice using a mapping from requested URLs to future objects, adding
result-checking callbacks to the futures of existing URLs.
"""
future = futures.get(url)
if future is None:
future = executor.submit(self._provenance_check, filename, url, **kwargs)
future.add_done_callback(partial(self.task_done, None, None))
futures[url] = future
else:
future.add_done_callback(partial(self.task_done, kwargs["pkg"], filename))

def _get_urls(self, pkg):
# ignore conditionals
fetchables, _ = self.fetch_filter(
(fetchable,),
pkg,
pkg.generate_fetchables(
allow_missing_checksums=True, ignore_unknown_mirrors=True, skip_default_mirrors=True
),
)
for f in fetchables.keys():
for url in f.uri:
if m := self.pypi_uri_re.match(url):
provenance_url = (
f"https://pypi.org/integrity/{m.group('project')}/"
f"v{m.group('version')}/{m.group('filename')}/provenance"
)
yield (f.filename, provenance_url)
return []

def schedule(self, pkg, executor, futures):
"""Schedule verification methods to run in separate threads for all flagged URLs."""

# short-circuit for packages not using pypi.eclass
# (they will be reported separately as missing the eclass)
if "pypi" not in pkg.inherited:
return

# skip ebuilds that enable attestations already
with pkg.ebuild.bytes_fileobj() as f:
for line in f.readlines():
if line.startswith(b"PYPI_VERIFY_REPO="):
return

for filename, url in self._get_urls(pkg):
self._schedule_check(filename, url, executor, futures, pkg=pkg)
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"__class__": "PyPIAttestationAvailable", "category": "PyPIAttestationAvailableCheck", "package": "PyPIAttestationAvailable", "version": "0", "filename": "pypiattestationavailable-0.tar.gz"}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DIST pypiattestationavailable-0.tar.gz 153310 BLAKE2B b7484cd9bebe912f9c8877c0f09df059130c2dc5c4da8c926f8df7945bcb7b255ccf810ce8cd16a957fb5bca3d1e71c088cd894968641db5dfae1c4c059df836 SHA512 86ff9e1c4b9353b1fbb475c7bb9d2a97bd9db8421ea5190b5a84832930b34cb5b79f8c3da68a5eb8db334f06851ec129cc6611a371e47b7c5de7a615feec5e05
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
inherit pypi

DESCRIPTION="Ebuild with PyPI attestation available"
HOMEPAGE="https://github.com/pkgcore/pkgcheck"

LICENSE="BSD"
SLOT="0"
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import io

from requests.models import Response

r = Response()
r.status_code = 200
r.reason = "OK"
r.url = "https://pypi.org/integrity/PyPIAttestationAvailable/v0/pypiattestationavailable-0.tar.gz/provenance"
r.raw = io.StringIO()

responses = [r]
1 change: 1 addition & 0 deletions testdata/repos/network/eclass/pypi.eclass
1 change: 1 addition & 0 deletions testdata/repos/network/profiles/categories
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
FetchablesUrlCheck
HomepageUrlCheck
MetadataUrlCheck
PyPIAttestationAvailableCheck
Loading