diff --git a/CHANGELOG.md b/CHANGELOG.md index c9a419c..d058e55 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Fixed + +- `Attestation.sign` now only returns `AttestationError` when failing to sign a distribution file ([#28](https://github.com/trailofbits/pypi-attestations/pull/28)) + ## [0.0.6] ### Added diff --git a/src/pypi_attestations/_impl.py b/src/pypi_attestations/_impl.py index d69ff8c..735494a 100644 --- a/src/pypi_attestations/_impl.py +++ b/src/pypi_attestations/_impl.py @@ -12,13 +12,20 @@ from annotated_types import MinLen # noqa: TCH002 from cryptography import x509 from cryptography.hazmat.primitives import serialization -from packaging.utils import parse_sdist_filename, parse_wheel_filename +from packaging.utils import ( + InvalidSdistFilename, + InvalidWheelFilename, + parse_sdist_filename, + parse_wheel_filename, +) from pydantic import Base64Bytes, BaseModel from pydantic_core import ValidationError from sigstore._utils import _sha256_streaming from sigstore.dsse import Envelope as DsseEnvelope +from sigstore.dsse import Error as DsseError from sigstore.dsse import _DigestSet, _Statement, _StatementBuilder, _Subject from sigstore.models import Bundle, LogEntry +from sigstore.sign import ExpiredCertificate, ExpiredIdentity from sigstore_protobuf_specs.io.intoto import Envelope as _Envelope from sigstore_protobuf_specs.io.intoto import Signature as _Signature @@ -86,34 +93,47 @@ class Attestation(BaseModel): def sign(cls, signer: Signer, dist: Path) -> Attestation: """Create an envelope, with signature, from a distribution file. - On failure, raises `AttestationError` or an appropriate subclass. + On failure, raises `AttestationError`. """ - with dist.open(mode="rb", buffering=0) as io: - # Replace this with `hashlib.file_digest()` once - # our minimum supported Python is >=3.11 - digest = _sha256_streaming(io).hex() + try: + with dist.open(mode="rb", buffering=0) as io: + # Replace this with `hashlib.file_digest()` once + # our minimum supported Python is >=3.11 + digest = _sha256_streaming(io).hex() + except OSError as e: + raise AttestationError(str(e)) try: name = _ultranormalize_dist_filename(dist.name) - except ValueError as e: + except (ValueError, InvalidWheelFilename, InvalidSdistFilename) as e: raise AttestationError(str(e)) - stmt = ( - _StatementBuilder() - .subjects( - [ - _Subject( - name=name, - digest=_DigestSet(root={"sha256": digest}), - ) - ] + try: + stmt = ( + _StatementBuilder() + .subjects( + [ + _Subject( + name=name, + digest=_DigestSet(root={"sha256": digest}), + ) + ] + ) + .predicate_type("https://docs.pypi.org/attestations/publish/v1") + .build() ) - .predicate_type("https://docs.pypi.org/attestations/publish/v1") - .build() - ) - bundle = signer.sign_dsse(stmt) + except DsseError as e: + raise AttestationError(str(e)) - return Attestation.from_bundle(bundle) + try: + bundle = signer.sign_dsse(stmt) + except (ExpiredCertificate, ExpiredIdentity) as e: + raise AttestationError(str(e)) + + try: + return Attestation.from_bundle(bundle) + except ConversionError as e: + raise AttestationError(str(e)) def verify( self, verifier: Verifier, policy: VerificationPolicy, dist: Path diff --git a/test/test_impl.py b/test/test_impl.py index d1b4a02..5a8c832 100644 --- a/test/test_impl.py +++ b/test/test_impl.py @@ -6,6 +6,7 @@ import pretend import pypi_attestations._impl as impl import pytest +import sigstore from sigstore.dsse import _DigestSet, _StatementBuilder, _Subject from sigstore.models import Bundle from sigstore.oidc import IdentityToken @@ -57,6 +58,67 @@ def test_sign_invalid_dist_filename(self, tmp_path: Path) -> None: ): impl.Attestation.sign(pretend.stub(), bad_dist) + def test_sign_raises_attestation_exception( + self, id_token: IdentityToken, tmp_path: Path + ) -> None: + non_existing_file = tmp_path / "invalid-name.tar.gz" + with pytest.raises(impl.AttestationError, match="No such file"): + impl.Attestation.sign(pretend.stub(), non_existing_file) + + bad_wheel_filename = tmp_path / "invalid-name.whl" + bad_wheel_filename.write_bytes(b"junk") + + with pytest.raises(impl.AttestationError, match="Invalid wheel filename"): + impl.Attestation.sign(pretend.stub(), bad_wheel_filename) + + bad_sdist_filename = tmp_path / "invalid_name.tar.gz" + bad_sdist_filename.write_bytes(b"junk") + + with pytest.raises(impl.AttestationError, match="Invalid sdist filename"): + impl.Attestation.sign(pretend.stub(), bad_sdist_filename) + + def test_wrong_predicate_raises_exception( + self, id_token: IdentityToken, monkeypatch: pytest.MonkeyPatch + ) -> None: + def dummy_predicate(self_: _StatementBuilder, _: str) -> _StatementBuilder: + # wrong type here to have a validation error + self_._predicate_type = False + return self_ + + monkeypatch.setattr(sigstore.dsse._StatementBuilder, "predicate_type", dummy_predicate) + with pytest.raises(impl.AttestationError, match="invalid statement"): + impl.Attestation.sign(pretend.stub(), artifact_path) + + def test_expired_certificate( + self, id_token: IdentityToken, monkeypatch: pytest.MonkeyPatch + ) -> None: + def in_validity_period(_: IdentityToken) -> bool: + return False + + monkeypatch.setattr(IdentityToken, "in_validity_period", in_validity_period) + + sign_ctx = SigningContext.staging() + with sign_ctx.signer(id_token, cache=False) as signer: + with pytest.raises(impl.AttestationError): + impl.Attestation.sign(signer, artifact_path) + + def test_multiple_signatures( + self, id_token: IdentityToken, monkeypatch: pytest.MonkeyPatch + ) -> None: + def get_bundle(*_) -> Bundle: # noqa: ANN002 + # Duplicate the signature to trigger a Conversion error + bundle = Bundle.from_json(gh_signed_bundle_path.read_bytes()) + bundle._inner.dsse_envelope.signatures.append(bundle._inner.dsse_envelope.signatures[0]) + return bundle + + monkeypatch.setattr(sigstore.sign.Signer, "sign_dsse", get_bundle) + + sign_ctx = SigningContext.staging() + + with pytest.raises(impl.AttestationError): + with sign_ctx.signer(id_token) as signer: + impl.Attestation.sign(signer, artifact_path) + def test_verify_github_attested(self) -> None: verifier = Verifier.production() pol = policy.AllOf(