Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ classifiers = [
dependencies = [
"cryptography",
"packaging",
"pyasn1 ~= 0.6",
"pydantic",
"sigstore~=3.3",
"sigstore~=3.4",
"sigstore-protobuf-specs",
]
requires-python = ">=3.11"
Expand Down
5 changes: 2 additions & 3 deletions src/pypi_attestations/_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from pydantic import ValidationError
from sigstore.oidc import IdentityError, IdentityToken, Issuer
from sigstore.sign import SigningContext
from sigstore.verify import Verifier, policy
from sigstore.verify import policy

from pypi_attestations import Attestation, AttestationError, VerificationError, __version__
from pypi_attestations._impl import Distribution
Expand Down Expand Up @@ -256,7 +256,6 @@ def _inspect(args: argparse.Namespace) -> None:

def _verify(args: argparse.Namespace) -> None:
"""Verify the files passed as argument."""
verifier: Verifier = Verifier.staging() if args.staging else Verifier.production()
pol = policy.Identity(identity=args.identity)

# Validate that both the attestations and files exists
Expand Down Expand Up @@ -291,7 +290,7 @@ def _verify(args: argparse.Namespace) -> None:
_die(f"Invalid Python package distribution: {e}")

try:
attestation.verify(verifier, pol, dist)
attestation.verify(pol, dist, staging=args.staging)
except VerificationError as verification_error:
_die(f"Verification failed for {input}: {verification_error}")

Expand Down
141 changes: 133 additions & 8 deletions src/pypi_attestations/_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
from cryptography import x509
from cryptography.hazmat.primitives import serialization
from packaging.utils import parse_sdist_filename, parse_wheel_filename
from pyasn1.codec.der.decoder import decode as der_decode
from pyasn1.type.char import UTF8String
from pydantic import Base64Encoder, BaseModel, ConfigDict, EncodedBytes, Field, field_validator
from pydantic.alias_generators import to_snake
from pydantic_core import ValidationError
Expand All @@ -23,15 +25,16 @@
from sigstore.dsse import Error as DsseError
from sigstore.models import Bundle, LogEntry
from sigstore.sign import ExpiredCertificate, ExpiredIdentity
from sigstore.verify import Verifier, policy
from sigstore_protobuf_specs.io.intoto import Envelope as _Envelope
from sigstore_protobuf_specs.io.intoto import Signature as _Signature

if TYPE_CHECKING:
from pathlib import Path # pragma: no cover
if TYPE_CHECKING: # pragma: no cover
from pathlib import Path

from sigstore.sign import Signer # pragma: no cover
from sigstore.verify import Verifier # pragma: no cover
from sigstore.verify.policy import VerificationPolicy # pragma: no cover
from cryptography.x509 import Certificate
from sigstore.sign import Signer
from sigstore.verify.policy import VerificationPolicy


class Base64EncoderSansNewline(Base64Encoder):
Expand Down Expand Up @@ -180,14 +183,36 @@ def sign(cls, signer: Signer, dist: Distribution) -> Attestation:

def verify(
self,
verifier: Verifier,
policy: VerificationPolicy,
identity: VerificationPolicy | Publisher,
dist: Distribution,
*,
staging: bool = False,
) -> tuple[str, dict[str, Any] | None]:
"""Verify against an existing Python distribution.

The `identity` can be an object confirming to
`sigstore.policy.VerificationPolicy` or a `Publisher`, which will be
transformed into an appropriate verification policy.

By default, Sigstore's production verifier will be used. The
`staging` parameter can be toggled to enable the staging verifier
instead.

On failure, raises an appropriate subclass of `AttestationError`.
"""
# NOTE: Can't do `isinstance` with `Publisher` since it's
# a `_GenericAlias`; instead we punch through to the inner
# `_Publisher` union.
if isinstance(identity, _Publisher):
policy = identity._as_policy() # noqa: SLF001
else:
policy = identity

if staging:
verifier = Verifier.staging()
else:
verifier = Verifier.production()

bundle = self.to_bundle()
try:
type_, payload = verifier.verify_dsse(bundle, policy)
Expand Down Expand Up @@ -364,6 +389,82 @@ class _PublisherBase(BaseModel):
kind: str
claims: dict[str, Any] | None = None

def _as_policy(self) -> VerificationPolicy:
"""Return an appropriate `sigstore.policy.VerificationPolicy` for this publisher."""
raise NotImplementedError # pragma: no cover


class _GitHubTrustedPublisherPolicy:
"""A custom sigstore-python policy for verifying against a GitHub-based Trusted Publisher."""

def __init__(self, repository: str, workflow: str) -> None:
self._repository = repository
self._workflow = workflow
# This policy must also satisfy some baseline underlying policies:
# the issuer must be GitHub Actions, and the repo must be the one
# we expect.
self._subpolicy = policy.AllOf(
[
policy.OIDCIssuerV2("https://token.actions.githubusercontent.com"),
policy.OIDCSourceRepositoryURI(f"https://github.com/{self._repository}"),
]
)

@classmethod
def _der_decode_utf8string(cls, der: bytes) -> str:
"""Decode a DER-encoded UTF8String."""
return der_decode(der, UTF8String)[0].decode() # type: ignore[no-any-return]

def verify(self, cert: Certificate) -> None:
"""Verify the certificate against the Trusted Publisher identity."""
self._subpolicy.verify(cert)

# This process has a few annoying steps, since a Trusted Publisher
# isn't aware of the commit or ref it runs on, while Sigstore's
# leaf certificate claims (like GitHub Actions' OIDC claims) only
# ever encode the workflow filename (which we need to check) next
# to the ref/sha (which we can't check).
#
# To get around this, we:
# (1) extract the `Build Config URI` extension;
# (2) extract the `Source Repository Digest` and
# `Source Repository Ref` extensions;
# (3) build the *expected* URI with the user-controlled
# Trusted Publisher identity *with* (2)
# (4) compare (1) with (3)

# (1) Extract the build config URI, which looks like this:
# https://github.com/OWNER/REPO/.github/workflows/WORKFLOW@REF
# where OWNER/REPO and WORKFLOW are controlled by the TP identity,
# and REF is controlled by the certificate's own claims.
build_config_uri = cert.extensions.get_extension_for_oid(policy._OIDC_BUILD_CONFIG_URI_OID) # noqa: SLF001
raw_build_config_uri = self._der_decode_utf8string(build_config_uri.value.public_bytes())

# (2) Extract the source repo digest and ref.
source_repo_digest = cert.extensions.get_extension_for_oid(
policy._OIDC_SOURCE_REPOSITORY_DIGEST_OID # noqa: SLF001
)
sha = self._der_decode_utf8string(source_repo_digest.value.public_bytes())

source_repo_ref = cert.extensions.get_extension_for_oid(
policy._OIDC_SOURCE_REPOSITORY_REF_OID # noqa: SLF001
)
ref = self._der_decode_utf8string(source_repo_ref.value.public_bytes())

# (3)-(4): Build the expected URIs and compare them
for suffix in [sha, ref]:
expected = (
f"https://github.com/{self._repository}/.github/workflows/{self._workflow}@{suffix}"
)
if raw_build_config_uri == expected:
return

# If none of the expected URIs matched, the policy fails.
raise sigstore.errors.VerificationError(
f"Certificate's Build Config URI ({build_config_uri}) does not match expected "
f"Trusted Publisher ({self._workflow} @ {self._repository})"
)


class GitHubPublisher(_PublisherBase):
"""A GitHub-based Trusted Publisher."""
Expand All @@ -388,6 +489,9 @@ class GitHubPublisher(_PublisherBase):
action was performed from.
"""

def _as_policy(self) -> VerificationPolicy:
return _GitHubTrustedPublisherPolicy(self.repository, self.workflow)


class GitLabPublisher(_PublisherBase):
"""A GitLab-based Trusted Publisher."""
Expand All @@ -406,8 +510,29 @@ class GitLabPublisher(_PublisherBase):
The optional environment that the publishing action was performed from.
"""

def _as_policy(self) -> VerificationPolicy:
policies: list[VerificationPolicy] = [
policy.OIDCIssuerV2("https://gitlab.com"),
policy.OIDCSourceRepositoryURI(f"https://gitlab.com/{self.repository}"),
]

if not self.claims:
raise VerificationError("refusing to build a policy without claims")

if ref := self.claims.get("ref"):
policies.append(
policy.OIDCBuildConfigURI(
f"https://gitlab.com/{self.repository}//.gitlab-ci.yml@{ref}"
)
)
else:
raise VerificationError("refusing to build a policy without a ref claim")

return policy.AllOf(policies)


Publisher = Annotated[GitHubPublisher | GitLabPublisher, Field(discriminator="kind")]
_Publisher = GitHubPublisher | GitLabPublisher
Publisher = Annotated[_Publisher, Field(discriminator="kind")]


class AttestationBundle(BaseModel):
Expand Down
Loading