diff --git a/pyproject.toml b/pyproject.toml index 650510b..ff7310e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -12,8 +12,9 @@ classifiers = [ dependencies = [ "cryptography", "packaging", + "pyasn1 ~= 0.6", "pydantic", - "sigstore~=3.3", + "sigstore~=3.4", "sigstore-protobuf-specs", ] requires-python = ">=3.11" diff --git a/src/pypi_attestations/_cli.py b/src/pypi_attestations/_cli.py index cf0172f..56b04ed 100644 --- a/src/pypi_attestations/_cli.py +++ b/src/pypi_attestations/_cli.py @@ -11,7 +11,7 @@ from pydantic import ValidationError from sigstore.oidc import IdentityError, IdentityToken, Issuer from sigstore.sign import SigningContext -from sigstore.verify import Verifier, policy +from sigstore.verify import policy from pypi_attestations import Attestation, AttestationError, VerificationError, __version__ from pypi_attestations._impl import Distribution @@ -256,7 +256,6 @@ def _inspect(args: argparse.Namespace) -> None: def _verify(args: argparse.Namespace) -> None: """Verify the files passed as argument.""" - verifier: Verifier = Verifier.staging() if args.staging else Verifier.production() pol = policy.Identity(identity=args.identity) # Validate that both the attestations and files exists @@ -291,7 +290,7 @@ def _verify(args: argparse.Namespace) -> None: _die(f"Invalid Python package distribution: {e}") try: - attestation.verify(verifier, pol, dist) + attestation.verify(pol, dist, staging=args.staging) except VerificationError as verification_error: _die(f"Verification failed for {input}: {verification_error}") diff --git a/src/pypi_attestations/_impl.py b/src/pypi_attestations/_impl.py index e050061..962c3db 100644 --- a/src/pypi_attestations/_impl.py +++ b/src/pypi_attestations/_impl.py @@ -14,6 +14,8 @@ from cryptography import x509 from cryptography.hazmat.primitives import serialization from packaging.utils import parse_sdist_filename, parse_wheel_filename +from pyasn1.codec.der.decoder import decode as der_decode +from pyasn1.type.char import UTF8String from pydantic import Base64Encoder, BaseModel, ConfigDict, EncodedBytes, Field, field_validator from pydantic.alias_generators import to_snake from pydantic_core import ValidationError @@ -23,15 +25,16 @@ from sigstore.dsse import Error as DsseError from sigstore.models import Bundle, LogEntry from sigstore.sign import ExpiredCertificate, ExpiredIdentity +from sigstore.verify import Verifier, policy from sigstore_protobuf_specs.io.intoto import Envelope as _Envelope from sigstore_protobuf_specs.io.intoto import Signature as _Signature -if TYPE_CHECKING: - from pathlib import Path # pragma: no cover +if TYPE_CHECKING: # pragma: no cover + from pathlib import Path - from sigstore.sign import Signer # pragma: no cover - from sigstore.verify import Verifier # pragma: no cover - from sigstore.verify.policy import VerificationPolicy # pragma: no cover + from cryptography.x509 import Certificate + from sigstore.sign import Signer + from sigstore.verify.policy import VerificationPolicy class Base64EncoderSansNewline(Base64Encoder): @@ -180,14 +183,36 @@ def sign(cls, signer: Signer, dist: Distribution) -> Attestation: def verify( self, - verifier: Verifier, - policy: VerificationPolicy, + identity: VerificationPolicy | Publisher, dist: Distribution, + *, + staging: bool = False, ) -> tuple[str, dict[str, Any] | None]: """Verify against an existing Python distribution. + The `identity` can be an object confirming to + `sigstore.policy.VerificationPolicy` or a `Publisher`, which will be + transformed into an appropriate verification policy. + + By default, Sigstore's production verifier will be used. The + `staging` parameter can be toggled to enable the staging verifier + instead. + On failure, raises an appropriate subclass of `AttestationError`. """ + # NOTE: Can't do `isinstance` with `Publisher` since it's + # a `_GenericAlias`; instead we punch through to the inner + # `_Publisher` union. + if isinstance(identity, _Publisher): + policy = identity._as_policy() # noqa: SLF001 + else: + policy = identity + + if staging: + verifier = Verifier.staging() + else: + verifier = Verifier.production() + bundle = self.to_bundle() try: type_, payload = verifier.verify_dsse(bundle, policy) @@ -364,6 +389,82 @@ class _PublisherBase(BaseModel): kind: str claims: dict[str, Any] | None = None + def _as_policy(self) -> VerificationPolicy: + """Return an appropriate `sigstore.policy.VerificationPolicy` for this publisher.""" + raise NotImplementedError # pragma: no cover + + +class _GitHubTrustedPublisherPolicy: + """A custom sigstore-python policy for verifying against a GitHub-based Trusted Publisher.""" + + def __init__(self, repository: str, workflow: str) -> None: + self._repository = repository + self._workflow = workflow + # This policy must also satisfy some baseline underlying policies: + # the issuer must be GitHub Actions, and the repo must be the one + # we expect. + self._subpolicy = policy.AllOf( + [ + policy.OIDCIssuerV2("https://token.actions.githubusercontent.com"), + policy.OIDCSourceRepositoryURI(f"https://github.com/{self._repository}"), + ] + ) + + @classmethod + def _der_decode_utf8string(cls, der: bytes) -> str: + """Decode a DER-encoded UTF8String.""" + return der_decode(der, UTF8String)[0].decode() # type: ignore[no-any-return] + + def verify(self, cert: Certificate) -> None: + """Verify the certificate against the Trusted Publisher identity.""" + self._subpolicy.verify(cert) + + # This process has a few annoying steps, since a Trusted Publisher + # isn't aware of the commit or ref it runs on, while Sigstore's + # leaf certificate claims (like GitHub Actions' OIDC claims) only + # ever encode the workflow filename (which we need to check) next + # to the ref/sha (which we can't check). + # + # To get around this, we: + # (1) extract the `Build Config URI` extension; + # (2) extract the `Source Repository Digest` and + # `Source Repository Ref` extensions; + # (3) build the *expected* URI with the user-controlled + # Trusted Publisher identity *with* (2) + # (4) compare (1) with (3) + + # (1) Extract the build config URI, which looks like this: + # https://github.com/OWNER/REPO/.github/workflows/WORKFLOW@REF + # where OWNER/REPO and WORKFLOW are controlled by the TP identity, + # and REF is controlled by the certificate's own claims. + build_config_uri = cert.extensions.get_extension_for_oid(policy._OIDC_BUILD_CONFIG_URI_OID) # noqa: SLF001 + raw_build_config_uri = self._der_decode_utf8string(build_config_uri.value.public_bytes()) + + # (2) Extract the source repo digest and ref. + source_repo_digest = cert.extensions.get_extension_for_oid( + policy._OIDC_SOURCE_REPOSITORY_DIGEST_OID # noqa: SLF001 + ) + sha = self._der_decode_utf8string(source_repo_digest.value.public_bytes()) + + source_repo_ref = cert.extensions.get_extension_for_oid( + policy._OIDC_SOURCE_REPOSITORY_REF_OID # noqa: SLF001 + ) + ref = self._der_decode_utf8string(source_repo_ref.value.public_bytes()) + + # (3)-(4): Build the expected URIs and compare them + for suffix in [sha, ref]: + expected = ( + f"https://github.com/{self._repository}/.github/workflows/{self._workflow}@{suffix}" + ) + if raw_build_config_uri == expected: + return + + # If none of the expected URIs matched, the policy fails. + raise sigstore.errors.VerificationError( + f"Certificate's Build Config URI ({build_config_uri}) does not match expected " + f"Trusted Publisher ({self._workflow} @ {self._repository})" + ) + class GitHubPublisher(_PublisherBase): """A GitHub-based Trusted Publisher.""" @@ -388,6 +489,9 @@ class GitHubPublisher(_PublisherBase): action was performed from. """ + def _as_policy(self) -> VerificationPolicy: + return _GitHubTrustedPublisherPolicy(self.repository, self.workflow) + class GitLabPublisher(_PublisherBase): """A GitLab-based Trusted Publisher.""" @@ -406,8 +510,29 @@ class GitLabPublisher(_PublisherBase): The optional environment that the publishing action was performed from. """ + def _as_policy(self) -> VerificationPolicy: + policies: list[VerificationPolicy] = [ + policy.OIDCIssuerV2("https://gitlab.com"), + policy.OIDCSourceRepositoryURI(f"https://gitlab.com/{self.repository}"), + ] + + if not self.claims: + raise VerificationError("refusing to build a policy without claims") + + if ref := self.claims.get("ref"): + policies.append( + policy.OIDCBuildConfigURI( + f"https://gitlab.com/{self.repository}//.gitlab-ci.yml@{ref}" + ) + ) + else: + raise VerificationError("refusing to build a policy without a ref claim") + + return policy.AllOf(policies) + -Publisher = Annotated[GitHubPublisher | GitLabPublisher, Field(discriminator="kind")] +_Publisher = GitHubPublisher | GitLabPublisher +Publisher = Annotated[_Publisher, Field(discriminator="kind")] class AttestationBundle(BaseModel): diff --git a/test/test_impl.py b/test/test_impl.py index 79fc8ee..5fefdd5 100644 --- a/test/test_impl.py +++ b/test/test_impl.py @@ -59,20 +59,19 @@ class TestAttestation: @online def test_roundtrip(self, id_token: IdentityToken) -> None: sign_ctx = SigningContext.staging() - verifier = Verifier.staging() with sign_ctx.signer(id_token) as signer: attestation = impl.Attestation.sign(signer, dist) - attestation.verify(verifier, policy.UnsafeNoOp(), dist) + attestation.verify(policy.UnsafeNoOp(), dist, staging=True) # converting to a bundle and verifying as a bundle also works bundle = attestation.to_bundle() - verifier.verify_dsse(bundle, policy.UnsafeNoOp()) + Verifier.staging().verify_dsse(bundle, policy.UnsafeNoOp()) # converting back also works roundtripped_attestation = impl.Attestation.from_bundle(bundle) - roundtripped_attestation.verify(verifier, policy.UnsafeNoOp(), dist) + roundtripped_attestation.verify(policy.UnsafeNoOp(), dist, staging=True) def test_wrong_predicate_raises_exception(self, monkeypatch: pytest.MonkeyPatch) -> None: def dummy_predicate(self_: StatementBuilder, _: str) -> StatementBuilder: @@ -117,7 +116,6 @@ def get_bundle(*_: Any) -> Bundle: impl.Attestation.sign(signer, dist) def test_verify_github_attested(self) -> None: - verifier = Verifier.production() pol = policy.AllOf( [ policy.OIDCSourceRepositoryURI( @@ -130,29 +128,54 @@ def test_verify_github_attested(self) -> None: bundle = Bundle.from_json(gh_signed_dist_bundle_path.read_bytes()) attestation = impl.Attestation.from_bundle(bundle) - predicate_type, predicate = attestation.verify(verifier, pol, gh_signed_dist) + predicate_type, predicate = attestation.verify(pol, gh_signed_dist) assert predicate_type == "https://docs.pypi.org/attestations/publish/v1" assert predicate == {} + @pytest.mark.parametrize("claims", (None, {}, {"ref": "refs/tags/v0.0.4a2"})) + def test_verify_from_github_publisher(self, claims: dict | None) -> None: + publisher = impl.GitHubPublisher( + repository="trailofbits/pypi-attestation-models", + workflow="release.yml", + claims=claims, + ) + + bundle = Bundle.from_json(gh_signed_dist_bundle_path.read_bytes()) + attestation = impl.Attestation.from_bundle(bundle) + + predicate_type, predicate = attestation.verify(publisher, gh_signed_dist) + assert predicate_type == "https://docs.pypi.org/attestations/publish/v1" + assert predicate == {} + + def test_verify_from_github_publisher_wrong(self) -> None: + publisher = impl.GitHubPublisher( + repository="trailofbits/pypi-attestation-models", + workflow="wrong.yml", + ) + + bundle = Bundle.from_json(gh_signed_dist_bundle_path.read_bytes()) + attestation = impl.Attestation.from_bundle(bundle) + + with pytest.raises(impl.VerificationError, match=r"Build Config URI .+ does not match"): + attestation.verify(publisher, gh_signed_dist) + def test_verify(self) -> None: - verifier = Verifier.staging() # Our checked-in asset has this identity. pol = policy.Identity( identity="william@yossarian.net", issuer="https://github.com/login/oauth" ) attestation = impl.Attestation.model_validate_json(dist_attestation_path.read_text()) - predicate_type, predicate = attestation.verify(verifier, pol, dist) + predicate_type, predicate = attestation.verify(pol, dist, staging=True) assert predicate_type == "https://docs.pypi.org/attestations/publish/v1" assert predicate is None # convert the attestation to a bundle and verify it that way too bundle = attestation.to_bundle() - verifier.verify_dsse(bundle, policy.UnsafeNoOp()) + Verifier.staging().verify_dsse(bundle, policy.UnsafeNoOp()) def test_verify_digest_mismatch(self, tmp_path: Path) -> None: - verifier = Verifier.staging() # Our checked-in asset has this identity. pol = policy.Identity( identity="william@yossarian.net", issuer="https://github.com/login/oauth" @@ -169,10 +192,9 @@ def test_verify_digest_mismatch(self, tmp_path: Path) -> None: with pytest.raises( impl.VerificationError, match="subject does not match distribution digest" ): - attestation.verify(verifier, pol, modified_dist) + attestation.verify(pol, modified_dist, staging=True) def test_verify_filename_mismatch(self, tmp_path: Path) -> None: - verifier = Verifier.staging() # Our checked-in asset has this identity. pol = policy.Identity( identity="william@yossarian.net", issuer="https://github.com/login/oauth" @@ -189,43 +211,48 @@ def test_verify_filename_mismatch(self, tmp_path: Path) -> None: with pytest.raises( impl.VerificationError, match="subject does not match distribution name" ): - attestation.verify(verifier, pol, different_name_dist) + attestation.verify(pol, different_name_dist, staging=True) def test_verify_policy_mismatch(self) -> None: - verifier = Verifier.staging() # Wrong identity. pol = policy.Identity(identity="fake@example.com", issuer="https://github.com/login/oauth") attestation = impl.Attestation.model_validate_json(dist_attestation_path.read_text()) with pytest.raises(impl.VerificationError, match=r"Certificate's SANs do not match"): - attestation.verify(verifier, pol, dist) + attestation.verify(pol, dist, staging=True) - def test_verify_wrong_envelope(self) -> None: - verifier = pretend.stub( - verify_dsse=pretend.call_recorder(lambda bundle, policy: ("fake-type", None)) + def test_verify_wrong_envelope(self, monkeypatch: pytest.MonkeyPatch) -> None: + staging = pretend.call_recorder( + lambda: pretend.stub( + verify_dsse=pretend.call_recorder(lambda bundle, policy: ("fake-type", None)) + ) ) + monkeypatch.setattr(impl.Verifier, "staging", staging) pol = pretend.stub() attestation = impl.Attestation.model_validate_json(dist_attestation_path.read_text()) with pytest.raises(impl.VerificationError, match="expected JSON envelope, got fake-type"): - attestation.verify(verifier, pol, dist) + attestation.verify(pol, dist, staging=True) - def test_verify_bad_payload(self) -> None: - verifier = pretend.stub( - verify_dsse=pretend.call_recorder( - lambda bundle, policy: ("application/vnd.in-toto+json", b"invalid json") + def test_verify_bad_payload(self, monkeypatch: pytest.MonkeyPatch) -> None: + staging = pretend.call_recorder( + lambda: pretend.stub( + verify_dsse=pretend.call_recorder( + lambda bundle, policy: ("application/vnd.in-toto+json", b"invalid json") + ) ) ) + monkeypatch.setattr(impl.Verifier, "staging", staging) pol = pretend.stub() attestation = impl.Attestation.model_validate_json(dist_attestation_path.read_text()) with pytest.raises(impl.VerificationError, match="invalid statement"): - attestation.verify(verifier, pol, dist) + attestation.verify(pol, dist, staging=True) - def test_verify_too_many_subjects(self) -> None: + def test_verify_too_many_subjects(self, monkeypatch: pytest.MonkeyPatch) -> None: statement = ( StatementBuilder() # noqa: SLF001 .subjects( @@ -239,22 +266,25 @@ def test_verify_too_many_subjects(self) -> None: ._inner.model_dump_json() ) - verifier = pretend.stub( - verify_dsse=pretend.call_recorder( - lambda bundle, policy: ( - "application/vnd.in-toto+json", - statement.encode(), + staging = pretend.call_recorder( + lambda: pretend.stub( + verify_dsse=pretend.call_recorder( + lambda bundle, policy: ( + "application/vnd.in-toto+json", + statement.encode(), + ) ) ) ) + monkeypatch.setattr(impl.Verifier, "staging", staging) pol = pretend.stub() attestation = impl.Attestation.model_validate_json(dist_attestation_path.read_text()) with pytest.raises(impl.VerificationError, match="too many subjects in statement"): - attestation.verify(verifier, pol, dist) + attestation.verify(pol, dist, staging=True) - def test_verify_subject_missing_name(self) -> None: + def test_verify_subject_missing_name(self, monkeypatch: pytest.MonkeyPatch) -> None: statement = ( StatementBuilder() # noqa: SLF001 .subjects( @@ -267,22 +297,25 @@ def test_verify_subject_missing_name(self) -> None: ._inner.model_dump_json() ) - verifier = pretend.stub( - verify_dsse=pretend.call_recorder( - lambda bundle, policy: ( - "application/vnd.in-toto+json", - statement.encode(), + staging = pretend.call_recorder( + lambda: pretend.stub( + verify_dsse=pretend.call_recorder( + lambda bundle, policy: ( + "application/vnd.in-toto+json", + statement.encode(), + ) ) ) ) + monkeypatch.setattr(impl.Verifier, "staging", staging) pol = pretend.stub() attestation = impl.Attestation.model_validate_json(dist_attestation_path.read_text()) with pytest.raises(impl.VerificationError, match="invalid subject: missing name"): - attestation.verify(verifier, pol, dist) + attestation.verify(pol, dist, staging=True) - def test_verify_subject_invalid_name(self) -> None: + def test_verify_subject_invalid_name(self, monkeypatch: pytest.MonkeyPatch) -> None: statement = ( StatementBuilder() # noqa: SLF001 .subjects( @@ -298,22 +331,25 @@ def test_verify_subject_invalid_name(self) -> None: ._inner.model_dump_json() ) - verifier = pretend.stub( - verify_dsse=pretend.call_recorder( - lambda bundle, policy: ( - "application/vnd.in-toto+json", - statement.encode(), + staging = pretend.call_recorder( + lambda: pretend.stub( + verify_dsse=pretend.call_recorder( + lambda bundle, policy: ( + "application/vnd.in-toto+json", + statement.encode(), + ) ) ) ) + monkeypatch.setattr(impl.Verifier, "staging", staging) pol = pretend.stub() attestation = impl.Attestation.model_validate_json(dist_attestation_path.read_text()) with pytest.raises(impl.VerificationError, match="invalid subject: Invalid wheel filename"): - attestation.verify(verifier, pol, dist) + attestation.verify(pol, dist, staging=True) - def test_verify_unknown_attestation_type(self) -> None: + def test_verify_unknown_attestation_type(self, monkeypatch: pytest.MonkeyPatch) -> None: statement = ( StatementBuilder() # noqa: SLF001 .subjects( @@ -335,20 +371,23 @@ def test_verify_unknown_attestation_type(self) -> None: ._inner.model_dump_json() ) - verifier = pretend.stub( - verify_dsse=pretend.call_recorder( - lambda bundle, policy: ( - "application/vnd.in-toto+json", - statement.encode(), + staging = pretend.call_recorder( + lambda: pretend.stub( + verify_dsse=pretend.call_recorder( + lambda bundle, policy: ( + "application/vnd.in-toto+json", + statement.encode(), + ) ) ) ) + monkeypatch.setattr(impl.Verifier, "staging", staging) pol = pretend.stub() attestation = impl.Attestation.model_validate_json(dist_attestation_path.read_text()) with pytest.raises(impl.VerificationError, match="unknown attestation type: foo"): - attestation.verify(verifier, pol, dist) + attestation.verify(pol, dist, staging=True) def test_from_bundle_missing_signatures() -> None: @@ -509,6 +548,21 @@ def test_claims(self) -> None: } +class TestGitLabPublisher: + def test_as_policy(self) -> None: + publisher = impl.GitLabPublisher(repository="fake/fake", claims={"ref": "refs/heads/main"}) + pol: policy.AllOf = publisher._as_policy() # type: ignore[assignment] + + assert len(pol._children) == 3 + + @pytest.mark.parametrize("claims", [None, {}, {"something": "unrelated"}, {"ref": None}]) + def test_as_policy_invalid(self, claims: dict | None) -> None: + publisher = impl.GitLabPublisher(repository="fake/fake", claims=claims) + + with pytest.raises(impl.VerificationError, match="refusing to build a policy"): + publisher._as_policy() + + class TestProvenance: def test_version(self) -> None: attestation = impl.Attestation.model_validate_json(dist_attestation_path.read_bytes())