From a6dd2a08b9dfdddd5638a75ccfa3e491c666e265 Mon Sep 17 00:00:00 2001 From: Alexis Date: Fri, 21 Jun 2024 16:00:10 +0200 Subject: [PATCH 01/12] CLI tool (init) --- src/pypi_attestation_models/__main__.py | 7 + src/pypi_attestation_models/_cli.py | 312 ++++++++++++++++++ ...0.1.2-py3-none-any.whl.publish.attestation | 51 +++ test/test_cli.py | 106 ++++++ 4 files changed, 476 insertions(+) create mode 100644 src/pypi_attestation_models/__main__.py create mode 100644 src/pypi_attestation_models/_cli.py create mode 100644 test/assets/rfc8785-0.1.2-py3-none-any.whl.publish.attestation create mode 100644 test/test_cli.py diff --git a/src/pypi_attestation_models/__main__.py b/src/pypi_attestation_models/__main__.py new file mode 100644 index 0000000..b12d902 --- /dev/null +++ b/src/pypi_attestation_models/__main__.py @@ -0,0 +1,7 @@ +"""The pypi-attestation-models entrypoint. +""" + +if __name__ == "__main__": + from pypi_attestation_models._cli import main + + main() diff --git a/src/pypi_attestation_models/_cli.py b/src/pypi_attestation_models/_cli.py new file mode 100644 index 0000000..ef2bff8 --- /dev/null +++ b/src/pypi_attestation_models/_cli.py @@ -0,0 +1,312 @@ +from __future__ import annotations + +import argparse +import json +import logging +from pathlib import Path +from typing import NoReturn + +from cryptography import x509 +from pydantic import ValidationError +from sigstore.oidc import IdentityError, IdentityToken, Issuer, detect_credential +from sigstore.sign import SigningContext +from sigstore.verify import Verifier, policy + +from pypi_attestation_models import __version__ +from pypi_attestation_models._impl import Attestation, VerificationError + +logging.basicConfig( + format="%(message)s", datefmt="[%X]", handlers=[logging.StreamHandler()] +) +_logger = logging.getLogger(__name__) +_logger.setLevel(logging.INFO) + + +def _parser() -> argparse.ArgumentParser: + + parent_parser = argparse.ArgumentParser(add_help=False) + + parent_parser.add_argument( + "-v", + "--verbose", + action="count", + default=0, + help="run with additional debug logging; supply multiple times to increase verbosity", + ) + + parser = argparse.ArgumentParser( + prog="pypi-attestation-models", + description="TODO", + parents=[parent_parser], + formatter_class=argparse.ArgumentDefaultsHelpFormatter, + ) + + parser.add_argument( + "-V", + "--version", + action="version", + version=f"pypi-attestation-models {__version__}", + ) + + subcommands = parser.add_subparsers( + required=True, + dest="subcommand", + metavar="COMMAND", + help="The operation to perform", + ) + + sign_command = subcommands.add_parser( + name="sign", help="Sign one or more inputs", parents=[parent_parser] + ) + + sign_command.add_argument( + "--identity-token", + type=str, + help="Identity token to use", + required=False, + ) + + sign_command.add_argument( + "--staging", + action="store_true", + default=False, + help="Use the staging environment", + ) + + sign_command.add_argument( + "files", + metavar="FILE", + type=Path, + nargs="+", + help="The file to sign", + ) + + verify_command = subcommands.add_parser( + name="verify", + help="Verify one or more inputs", + parents=[parent_parser], + ) + + verify_command.add_argument( + "--identity", + type=str, + required=True, + help="TODO", + ) + + verify_command.add_argument( + "--staging", + action="store_true", + default=False, + help="Use the staging environment", + ) + + verify_command.add_argument( + "files", + metavar="FILE", + type=Path, + nargs="+", + help="The file to sign", + ) + + inspect_command = subcommands.add_parser( + name="inspect", + help="Inspect one or more inputs", + parents=[parent_parser], + ) + + inspect_command.add_argument( + "--dump-bytes", + action="store_true", + default=False, + help="Dump the bytes of the signature", + ) + + inspect_command.add_argument( + "files", + metavar="FILE", + type=Path, + nargs="+", + help="The file to sign", + ) + + return parser + + +def _die(args: argparse.Namespace, message: str) -> NoReturn: + """An `argparse` helper that fixes up the type hints on our use of + `ArgumentParser.error`. + """ + args._parser.error(message) + raise ValueError("unreachable") + + +def get_identity_token(args: argparse.Namespace) -> IdentityToken: + """Generates an Identity Token + + This method uses the following order of precedence: + - A token passed as an argument + - An ambient credential + - An OAuth-2 flow + """ + # First, check if a token was supplied + if args.identity_token: + return IdentityToken(args.identity_token) + + # Ambient credential detection + oidc_token = detect_credential() + if oidc_token is not None: + return IdentityToken(oidc_token) + + # Finally, OAuth-2 Flow + if args.staging: + issuer = Issuer.staging() + else: + issuer = Issuer.production() + + return issuer.identity_token() + + +def _sign(args: argparse.Namespace) -> None: + """Sign the files passed as argument""" + try: + identity = get_identity_token(args) + _logger.debug(f"Identity: {identity._raw_token}") + except IdentityError as identity_error: + _die(args, f"Failed to detect identity: {identity_error}") + + if args.staging: + signing_ctx = SigningContext.staging() + else: + signing_ctx = SigningContext.production() + + with signing_ctx.signer(identity, cache=True) as signer: + + for file_path in args.files: + _logger.debug(f"Signing {file_path}") + + if not file_path.is_file(): + _die(args, f"{file_path} is not a file.") + + signature_path = Path(f"{file_path}.publish.attestation") + if signature_path.is_file(): + _die(args, f"Signature already exists for {file_path}") + + attestation = Attestation.sign(signer, file_path) + _logger.debug( + "Attestation saved for %s saved in %s", file_path, signature_path + ) + + signature_path.write_text(attestation.model_dump_json()) + + +def _inspect(args: argparse.Namespace) -> None: + """Inspect attestations. + + Warning: The information displayed from the attestations are not verified. + """ + for file_path in args.files: + if not file_path.is_file(): + _die(args, f"{file_path} is not a file.") + + try: + attestation = Attestation.model_validate_json(file_path.read_text()) + except ValidationError as validation_error: + _die(args, f"Invalid attestation ({file_path}): {validation_error}") + + _logger.info( + "Warning: The information displayed below are not verified, they are only " + "displayed. Use the verify command to verify them." + ) + + _logger.info(f"File: {file_path}") + _logger.info(f"Version: {attestation.version}") + + decoded_statement = json.loads(attestation.envelope.statement.decode()) + + _logger.info("Statement:") + _logger.info(f"\tType: {decoded_statement['_type']}") + _logger.info("\tSubject:") + for subject in decoded_statement["subject"]: + _logger.info( + f"\t\t{subject['name']} (digest: {subject['digest']['sha256']})" + ) + + _logger.info(f"\tPredicate type: {decoded_statement['predicateType']}") + _logger.info(f"\tPredicate: {decoded_statement['predicate']}") + + if args.dump_bytes: + _logger.info(f"Signature: {attestation.envelope.signature}") + + # Verification Material + verification_material = attestation.verification_material + + # Certificate + certificate = x509.load_der_x509_certificate(verification_material.certificate) + _logger.info("Certificate:") + _logger.info(f"\tSubject: {certificate.subject.rfc4514_string()}") + _logger.info(f"\tIssuer: {certificate.issuer.rfc4514_string()}") + _logger.info(f"\tValidity: {certificate.not_valid_after_utc}") + + # Transparency Log + _logger.info( + f"Transparency Log ({len(verification_material.transparency_entries)} entries):" + ) + for idx, entry in enumerate(verification_material.transparency_entries): + _logger.info(f"\tLog Index: {entry['logIndex']}") + + +def _verify(args: argparse.Namespace) -> None: + """Verify the files passed as argument.""" + if args.staging: + verifier = Verifier.staging() + else: + verifier = Verifier.production() + + pol = policy.Identity(identity=args.identity) + + for file_path in args.files: + if not file_path.is_file(): + _die(args, f"{file_path} is not a file.") + + attestation_path = Path(f"{file_path}.publish.attestation") + if not attestation_path.is_file(): + _die(args, f"Missing attestation file for {file_path}") + + try: + attestation = Attestation.model_validate_json(attestation_path.read_text()) + except ValidationError as validation_error: + _die(args, f"Invalid attestation ({file_path}): {validation_error}") + + try: + attestation.verify(verifier, pol, file_path) + except VerificationError as verification_error: + _logger.error( + "Verification failed for %s: %s", file_path, verification_error + ) + continue + + _logger.info(f"OK: {attestation_path}") + + +def main() -> None: + parser = _parser() + args: argparse.Namespace = parser.parse_args() + + if args.verbose >= 1: + _logger.setLevel("DEBUG") + if args.verbose >= 2: + logging.getLogger().setLevel("DEBUG") + + _logger.debug(args) + + args._parser = parser + + if args.subcommand == "sign": + _sign(args) + elif args.subcommand == "verify": + _verify(args) + elif args.subcommand == "inspect": + _inspect(args) + else: + _die(args, f"Unknown subcommand: {args.subcommand}") diff --git a/test/assets/rfc8785-0.1.2-py3-none-any.whl.publish.attestation b/test/assets/rfc8785-0.1.2-py3-none-any.whl.publish.attestation new file mode 100644 index 0000000..fff256e --- /dev/null +++ b/test/assets/rfc8785-0.1.2-py3-none-any.whl.publish.attestation @@ -0,0 +1,51 @@ +{ + "version": 1, + "verification_material": { + "certificate": "MIIC0zCCAlmgAwIBAgIUNa1+nVgkOX1xlssDyRyt0DZ6M5UwCgYIKoZIzj0EAwMwNzEVMBMGA1UE\nChMMc2lnc3RvcmUuZGV2MR4wHAYDVQQDExVzaWdzdG9yZS1pbnRlcm1lZGlhdGUwHhcNMjQwNjA2\nMTgzOTA1WhcNMjQwNjA2MTg0OTA1WjAAMFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEyrm8stLQ\nwPX/MdVS50NZ4gmXEPEh6kYlvhEo079Yk1lMMmMobwFvINC8Lc02kg+03BMscXbM/OKv3Fl1qH9P\nCKOCAXgwggF0MA4GA1UdDwEB/wQEAwIHgDATBgNVHSUEDDAKBggrBgEFBQcDAzAdBgNVHQ4EFgQU\nn98gJQymjI+dFUDEea6CKbQngj4wHwYDVR0jBBgwFoAUcYYwphR8Ym/599b0BRp/X//rb6wwIwYD\nVR0RAQH/BBkwF4EVd2lsbGlhbUB5b3NzYXJpYW4ubmV0MCwGCisGAQQBg78wAQEEHmh0dHBzOi8v\nZ2l0aHViLmNvbS9sb2dpbi9vYXV0aDAuBgorBgEEAYO/MAEIBCAMHmh0dHBzOi8vZ2l0aHViLmNv\nbS9sb2dpbi9vYXV0aDCBiQYKKwYBBAHWeQIEAgR7BHkAdwB1ACswvNxoiMni4dgmKV50H0g5MZYC\n8pwzy15DQP6yrIZ6AAABj+7Y7/YAAAQDAEYwRAIgTWyPyS2CKRm5ZUaTwngfBtrOJozwlIfOOfXH\nyyej0BQCIGCwmYVKhNS7JbUTFeDe90SWNlpwl5YAVDb/2GGFxGNCMAoGCCqGSM49BAMDA2gAMGUC\nMQCxIekmLNdhAS7HVo6CRgqVRht8RiFO6lbyGK4fDuEQOk/MPaBlRhsaUxwejf7jI2kCMCw5AOij\nMvqsXHjZYk7TfRH/079Zy0qEWjD9lurfPiTX9qSQKSiXORvxpk/DQsfTsg==\n", + "transparency_entries": [ + { + "logIndex": "28175749", + "logId": { + "keyId": "0y8wo8MtY5wrdiIFohx7sHeI5oKDpK5vQhGHI6G+pJY=" + }, + "kindVersion": { + "kind": "dsse", + "version": "0.0.1" + }, + "integratedTime": "1717699145", + "inclusionPromise": { + "signedEntryTimestamp": "MEYCIQDx9J86FXVVe/PIoY5jHvlQJ85A6oZ2BiZ6/3ZYe3EeAQIhALl97dZebI/Smm0qQMdVVkbVznthHZCaSClN4djajx3G" + }, + "inclusionProof": { + "logIndex": "28160930", + "rootHash": "zWVcqCxxaF+b1WWfb+xZZlQYK4MdEr81Dd0KzOFu0Ko=", + "treeSize": "28160931", + "hashes": [ + "qDMDpEGtUE3c8CnnlguBb24eYIGo+nv0wGjN2Wdq1V8=", + "r3g45oVhy3zCnIK7lkTsH8Sg1Qdy0kH/CqfaBUE0yok=", + "XAv5fJtrNK1YPZwvB0JIVOOwWiLHk/oWoqzN1xzF9t4=", + "14fYRBMB/6rTWV5Qpei46FU+7rHmaqqLFV/K22kI6sg=", + "KhgfVnUZkrYVk1Je+xSJ3iT5wZMgut38srFhH/iVsWQ=", + "C9LjSdxA96yalX4DOGX/fV0kuhx9LLU1BERodtxE+No=", + "NwfjLTWUBnDymaU+Ca/ykaXOiGNRvIt5/5ZZDzEyTyA=", + "jKHh3ZbaWLoBLn5qZTUpiw9oPlStl/ZSfPmdsHte+AQ=", + "ekhZZrQ/riDDmsvqy3I4gAcbUBcoyoNMChiDAXsTu3Y=", + "oMHAlypWw/lk5Q9JHd9O5UJZ7bdcH6Gzs+zCES7YUKo=", + "Kn3gkyUwY86Ut3fWtexgSLtxteycn2p6k7Kj7qJFEDw=", + "IfPx7HUTjLRrRAy6mhkYP/7aq48i6G+Mk/NQidZPJk8=", + "Edul4W41O3EfxKEEMlX2nW0+GTgCv00nGmcpwhALgVA=", + "rBWB37+HwkTZgDv0rMtGBUoDI0UZqcgDZp48M6CaUlA=" + ], + "checkpoint": { + "envelope": "rekor.sigstage.dev - 8050909264565447525\n28160931\nzWVcqCxxaF+b1WWfb+xZZlQYK4MdEr81Dd0KzOFu0Ko=\n\n— rekor.sigstage.dev 0y8wozBFAiBOHi+eUTSSX6mrNLjQwoKJLum7cpnVpvAb8QwK+DnLngIhAO2170Q0xfbOMwrbF2sM80z1wkYhnlVRidI+/j4/k4JJ\n" + } + }, + "canonicalizedBody": "eyJhcGlWZXJzaW9uIjoiMC4wLjEiLCJraW5kIjoiZHNzZSIsInNwZWMiOnsiZW52ZWxvcGVIYXNoIjp7ImFsZ29yaXRobSI6InNoYTI1NiIsInZhbHVlIjoiZGY1MDk2Njg2NzNkMmY4MjAxOTQ2ZTBmNTliNmFiNzhiZWY0NmYyMTc5NTc5N2EzYjJkMTUyZjc3NmFmYzEyZSJ9LCJwYXlsb2FkSGFzaCI6eyJhbGdvcml0aG0iOiJzaGEyNTYiLCJ2YWx1ZSI6IjcyOTM0Yjc1YzgxODk3ZWE4Yjg4NTk0N2ExOWRjODE4ZWUzNjIwYzUwMzJhZmIzYjc4ODc3ZmJjYmI3MjMwYzEifSwic2lnbmF0dXJlcyI6W3sic2lnbmF0dXJlIjoiTUVVQ0lBdmtSSEZ1K24yenMvNGorVjNjTTIyRFZaSTF6cUs0TmpmbHphdEVRTWZnQWlFQW82VjNaN3RpaE9Ha1lpeXNGMTh4dFpWcWVPdDNyZHdWVmI3Nm1XcDhETWM9IiwidmVyaWZpZXIiOiJMUzB0TFMxQ1JVZEpUaUJEUlZKVVNVWkpRMEZVUlMwdExTMHRDazFKU1VNd2VrTkRRV3h0WjBGM1NVSkJaMGxWVG1FeEsyNVdaMnRQV0RGNGJITnpSSGxTZVhRd1JGbzJUVFZWZDBObldVbExiMXBKZW1vd1JVRjNUWGNLVG5wRlZrMUNUVWRCTVZWRlEyaE5UV015Ykc1ak0xSjJZMjFWZFZwSFZqSk5ValIzU0VGWlJGWlJVVVJGZUZaNllWZGtlbVJIT1hsYVV6RndZbTVTYkFwamJURnNXa2RzYUdSSFZYZElhR05PVFdwUmQwNXFRVEpOVkdkNlQxUkJNVmRvWTA1TmFsRjNUbXBCTWsxVVp6QlBWRUV4VjJwQlFVMUdhM2RGZDFsSUNrdHZXa2w2YWpCRFFWRlpTVXR2V2tsNmFqQkVRVkZqUkZGblFVVjVjbTA0YzNSTVVYZFFXQzlOWkZaVE5UQk9XalJuYlZoRlVFVm9ObXRaYkhab1JXOEtNRGM1V1dzeGJFMU5iVTF2WW5kR2RrbE9RemhNWXpBeWEyY3JNRE5DVFhOaldHSk5MMDlMZGpOR2JERnhTRGxRUTB0UFEwRllaM2RuWjBZd1RVRTBSd3BCTVZWa1JIZEZRaTkzVVVWQmQwbElaMFJCVkVKblRsWklVMVZGUkVSQlMwSm5aM0pDWjBWR1FsRmpSRUY2UVdSQ1owNVdTRkUwUlVablVWVnVPVGhuQ2twUmVXMXFTU3RrUmxWRVJXVmhOa05MWWxGdVoybzBkMGgzV1VSV1VqQnFRa0puZDBadlFWVmpXVmwzY0doU09GbHRMelU1T1dJd1FsSndMMWd2TDNJS1lqWjNkMGwzV1VSV1VqQlNRVkZJTDBKQ2EzZEdORVZXWkRKc2MySkhiR2hpVlVJMVlqTk9lbGxZU25CWlZ6UjFZbTFXTUUxRGQwZERhWE5IUVZGUlFncG5OemgzUVZGRlJVaHRhREJrU0VKNlQyazRkbG95YkRCaFNGWnBURzFPZG1KVE9YTmlNbVJ3WW1rNWRsbFlWakJoUkVGMVFtZHZja0puUlVWQldVOHZDazFCUlVsQ1EwRk5TRzFvTUdSSVFucFBhVGgyV2pKc01HRklWbWxNYlU1MllsTTVjMkl5WkhCaWFUbDJXVmhXTUdGRVEwSnBVVmxMUzNkWlFrSkJTRmNLWlZGSlJVRm5VamRDU0d0QlpIZENNVUZEYzNkMlRuaHZhVTF1YVRSa1oyMUxWalV3U0RCbk5VMWFXVU00Y0hkNmVURTFSRkZRTm5seVNWbzJRVUZCUWdwcUt6ZFpOeTlaUVVGQlVVUkJSVmwzVWtGSloxUlhlVkI1VXpKRFMxSnROVnBWWVZSM2JtZG1RblJ5VDBwdmVuZHNTV1pQVDJaWVNIbDVaV293UWxGRENrbEhRM2R0V1ZaTGFFNVROMHBpVlZSR1pVUmxPVEJUVjA1c2NIZHNOVmxCVmtSaUx6SkhSMFo0UjA1RFRVRnZSME5EY1VkVFRUUTVRa0ZOUkVFeVowRUtUVWRWUTAxUlEzaEpaV3R0VEU1a2FFRlROMGhXYnpaRFVtZHhWbEpvZERoU2FVWlBObXhpZVVkTE5HWkVkVVZSVDJzdlRWQmhRbXhTYUhOaFZYaDNaUXBxWmpkcVNUSnJRMDFEZHpWQlQybHFUWFp4YzFoSWFscFphemRVWmxKSUx6QTNPVnA1TUhGRlYycEVPV3gxY21aUWFWUllPWEZUVVV0VGFWaFBVblo0Q25CckwwUlJjMlpVYzJjOVBRb3RMUzB0TFVWT1JDQkRSVkpVU1VaSlEwRlVSUzB0TFMwdENnPT0ifV19fQ==" + } + ] + }, + "envelope": { + "statement": "eyJfdHlwZSI6Imh0dHBzOi8vaW4tdG90by5pby9TdGF0ZW1lbnQvdjEiLCJzdWJqZWN0IjpbeyJu\nYW1lIjoicmZjODc4NS0wLjEuMi1weTMtbm9uZS1hbnkud2hsIiwiZGlnZXN0Ijp7InNoYTI1NiI6\nImM0ZTkyZTllY2M4MjhiZWYyYWE3ZGJhMWRlOGFjOTgzNTExZjc1MzJhMGRmMTFjNzcwZDM5MDk5\nYTI1Y2YyMDEifX1dLCJwcmVkaWNhdGVUeXBlIjoiaHR0cHM6Ly9kb2NzLnB5cGkub3JnL2F0dGVz\ndGF0aW9ucy9wdWJsaXNoL3YxIiwicHJlZGljYXRlIjpudWxsfQ==\n", + "signature": "MEUCIAvkRHFu+n2zs/4j+V3cM22DVZI1zqK4NjflzatEQMfgAiEAo6V3Z7tihOGkYiysF18xtZVq\neOt3rdwVVb76mWp8DMc=\n" + } +} diff --git a/test/test_cli.py b/test/test_cli.py new file mode 100644 index 0000000..214b4d8 --- /dev/null +++ b/test/test_cli.py @@ -0,0 +1,106 @@ +from __future__ import annotations + +import logging +import os +import sys +import tempfile +from pathlib import Path + +import pypi_attestation_models._cli +import pytest +from pypi_attestation_models._cli import _logger, main + +ONLINE_TESTS = "CI" in os.environ or "TEST_INTERACTIVE" in os.environ +online = pytest.mark.skipif(not ONLINE_TESTS, reason="online tests not enabled") + + +_HERE = Path(__file__).parent +_ASSETS = _HERE / "assets" + +artifact_path = _ASSETS / "rfc8785-0.1.2-py3-none-any.whl" +attestation_path = _ASSETS / "rfc8785-0.1.2-py3-none-any.whl.publish.attestation" + + +def run_main_with_command(cmd: list[str]) -> None: + sys.argv[1:] = cmd + main() + + +def test_main_verbose_level(monkeypatch) -> None: + def default_sign(*args) -> None: + return + + monkeypatch.setattr(pypi_attestation_models._cli, "_sign", default_sign) + + run_main_with_command(["sign", "-v", ""]) + assert _logger.level == logging.DEBUG + + run_main_with_command(["sign", "-v", "-v", ""]) + assert logging.getLogger().level == logging.DEBUG + + +# @online +# def test_sign_command(id_token: IdentityToken) -> None: +# sys.argv[1:] = ["sign", artifact_path.as_posix()] +# main() +# + + +def test_inspect_command(caplog) -> None: + # Happy path + run_main_with_command(["inspect", attestation_path.as_posix()]) + assert attestation_path.as_posix() in caplog.text + assert "CN=sigstore-intermediate,O=sigstore.dev" in caplog.text + + run_main_with_command(["inspect", "--dump-bytes", attestation_path.as_posix()]) + assert "Signature:" in caplog.text + + +def test_verify_command(caplog) -> None: + # Happy path + run_main_with_command( + [ + "verify", + "--staging", + "--identity", + "william@yossarian.net", + artifact_path.as_posix(), + ] + ) + assert f"OK: {attestation_path.as_posix()}" in caplog.text + + caplog.clear() + + # Failure from the Sigstore environment + run_main_with_command( + [ + "verify", + "--identity", + "william@yossarian.net", + artifact_path.as_posix(), + ] + ) + assert ( + "Verification failed: failed to build chain: unable to get local issuer certificate" + in caplog.text + ) + assert "OK:" not in caplog.text + + caplog.clear() + + # Failure because not an attestation + with pytest.raises(SystemExit) as exc_info: + with tempfile.NamedTemporaryFile(suffix=".publish.attestation") as f: + fake_package_name = Path(f.name.removesuffix(".publish.attestation")) + fake_package_name.touch() + + run_main_with_command( + [ + "verify", + "--staging", + "--identity", + "william@yossarian.net", + fake_package_name.as_posix(), + ] + ) + assert "Verification failed" in caplog.text From 2a152d2563613f912ec3c38f02bfabb6b7edf614 Mon Sep 17 00:00:00 2001 From: Alexis Date: Fri, 21 Jun 2024 18:00:32 +0200 Subject: [PATCH 02/12] Update CLI tool --- src/pypi_attestation_models/__main__.py | 3 +- src/pypi_attestation_models/_cli.py | 32 ++---- test/test_cli.py | 146 +++++++++++++++++++++--- 3 files changed, 145 insertions(+), 36 deletions(-) diff --git a/src/pypi_attestation_models/__main__.py b/src/pypi_attestation_models/__main__.py index b12d902..fc89c0e 100644 --- a/src/pypi_attestation_models/__main__.py +++ b/src/pypi_attestation_models/__main__.py @@ -1,5 +1,4 @@ -"""The pypi-attestation-models entrypoint. -""" +"""The pypi-attestation-models entrypoint.""" if __name__ == "__main__": from pypi_attestation_models._cli import main diff --git a/src/pypi_attestation_models/_cli.py b/src/pypi_attestation_models/_cli.py index ef2bff8..4f3a3ff 100644 --- a/src/pypi_attestation_models/_cli.py +++ b/src/pypi_attestation_models/_cli.py @@ -15,15 +15,12 @@ from pypi_attestation_models import __version__ from pypi_attestation_models._impl import Attestation, VerificationError -logging.basicConfig( - format="%(message)s", datefmt="[%X]", handlers=[logging.StreamHandler()] -) +logging.basicConfig(format="%(message)s", datefmt="[%X]", handlers=[logging.StreamHandler()]) _logger = logging.getLogger(__name__) _logger.setLevel(logging.INFO) def _parser() -> argparse.ArgumentParser: - parent_parser = argparse.ArgumentParser(add_help=False) parent_parser.add_argument( @@ -134,15 +131,16 @@ def _parser() -> argparse.ArgumentParser: def _die(args: argparse.Namespace, message: str) -> NoReturn: - """An `argparse` helper that fixes up the type hints on our use of - `ArgumentParser.error`. + """Handle argument parsing errors and terminate the program. + + Fix up the type hints on our use of `ArgumentParser.error`. """ - args._parser.error(message) + args._parser.error(message) # noqa: SLF001. raise ValueError("unreachable") def get_identity_token(args: argparse.Namespace) -> IdentityToken: - """Generates an Identity Token + """Generate an Identity Token. This method uses the following order of precedence: - A token passed as an argument @@ -168,10 +166,9 @@ def get_identity_token(args: argparse.Namespace) -> IdentityToken: def _sign(args: argparse.Namespace) -> None: - """Sign the files passed as argument""" + """Sign the files passed as argument.""" try: identity = get_identity_token(args) - _logger.debug(f"Identity: {identity._raw_token}") except IdentityError as identity_error: _die(args, f"Failed to detect identity: {identity_error}") @@ -181,7 +178,6 @@ def _sign(args: argparse.Namespace) -> None: signing_ctx = SigningContext.production() with signing_ctx.signer(identity, cache=True) as signer: - for file_path in args.files: _logger.debug(f"Signing {file_path}") @@ -193,9 +189,7 @@ def _sign(args: argparse.Namespace) -> None: _die(args, f"Signature already exists for {file_path}") attestation = Attestation.sign(signer, file_path) - _logger.debug( - "Attestation saved for %s saved in %s", file_path, signature_path - ) + _logger.debug("Attestation saved for %s saved in %s", file_path, signature_path) signature_path.write_text(attestation.model_dump_json()) @@ -228,9 +222,7 @@ def _inspect(args: argparse.Namespace) -> None: _logger.info(f"\tType: {decoded_statement['_type']}") _logger.info("\tSubject:") for subject in decoded_statement["subject"]: - _logger.info( - f"\t\t{subject['name']} (digest: {subject['digest']['sha256']})" - ) + _logger.info(f"\t\t{subject['name']} (digest: {subject['digest']['sha256']})") _logger.info(f"\tPredicate type: {decoded_statement['predicateType']}") _logger.info(f"\tPredicate: {decoded_statement['predicate']}") @@ -281,9 +273,7 @@ def _verify(args: argparse.Namespace) -> None: try: attestation.verify(verifier, pol, file_path) except VerificationError as verification_error: - _logger.error( - "Verification failed for %s: %s", file_path, verification_error - ) + _logger.error("Verification failed for %s: %s", file_path, verification_error) continue _logger.info(f"OK: {attestation_path}") @@ -300,7 +290,7 @@ def main() -> None: _logger.debug(args) - args._parser = parser + args._parser = parser # noqa: SLF001. if args.subcommand == "sign": _sign(args) diff --git a/test/test_cli.py b/test/test_cli.py index 214b4d8..17d5410 100644 --- a/test/test_cli.py +++ b/test/test_cli.py @@ -1,14 +1,18 @@ from __future__ import annotations +import argparse import logging import os +import shutil import sys import tempfile from pathlib import Path import pypi_attestation_models._cli import pytest -from pypi_attestation_models._cli import _logger, main +from pypi_attestation_models._cli import _logger, get_identity_token, main +from pypi_attestation_models._impl import Attestation +from sigstore.oidc import IdentityError, IdentityToken ONLINE_TESTS = "CI" in os.environ or "TEST_INTERACTIVE" in os.environ online = pytest.mark.skipif(not ONLINE_TESTS, reason="online tests not enabled") @@ -22,12 +26,18 @@ def run_main_with_command(cmd: list[str]) -> None: + """Helper method to run the main function with a given command.""" sys.argv[1:] = cmd main() -def test_main_verbose_level(monkeypatch) -> None: - def default_sign(*args) -> None: +def _die_test(_: argparse.Namespace, message: str) -> None: + """Placeholder for the _die function.""" + raise SystemExit(message) + + +def test_main_verbose_level(monkeypatch: pytest.MonkeyPatch) -> None: + def default_sign(_: argparse.Namespace) -> None: return monkeypatch.setattr(pypi_attestation_models._cli, "_sign", default_sign) @@ -38,15 +48,82 @@ def default_sign(*args) -> None: run_main_with_command(["sign", "-v", "-v", ""]) assert logging.getLogger().level == logging.DEBUG + with pytest.raises(SystemExit) as exc_info: + run_main_with_command(["not-a-command"]) + + assert exc_info.value.code == 2 + + +def test_get_identity_token(monkeypatch: pytest.MonkeyPatch) -> None: + # Failure path + monkeypatch.setattr(pypi_attestation_models._cli, "_die", _die_test) + + # Invalid token + with pytest.raises(IdentityError, match="Identity token is malformed"): + get_identity_token(argparse.Namespace(identity_token="INVALID")) + + # Happy paths tests missing -# @online -# def test_sign_command(id_token: IdentityToken) -> None: -# sys.argv[1:] = ["sign", artifact_path.as_posix()] -# main() -# +@online +def test_sign_command(tmp_path: Path, id_token: IdentityToken) -> None: + # Happy path + copied_artifact = tmp_path / artifact_path.with_suffix(".copy.whl").name + shutil.copy(artifact_path, copied_artifact) -def test_inspect_command(caplog) -> None: + run_main_with_command( + [ + "sign", + "--staging", + "--identity-token", + id_token._raw_token, + copied_artifact.as_posix(), + ] + ) + copied_artifact_attestation = Path(f"{copied_artifact}.publish.attestation") + assert copied_artifact_attestation.is_file() + + attestation = Attestation.model_validate_json(copied_artifact_attestation.read_text()) + assert attestation.version + + +@online +def test_sign_command_failures( + tmp_path: Path, monkeypatch: pytest.MonkeyPatch, id_token: IdentityToken +) -> None: + monkeypatch.setattr(pypi_attestation_models._cli, "_die", _die_test) + + # Missing file + with pytest.raises(SystemExit, match="not_exist.txt is not a file"): + run_main_with_command( + [ + "sign", + "--staging", + "--identity-token", + id_token._raw_token, + "not_exist.txt", + ] + ) + + # Signature already exists + artifact = tmp_path / artifact_path.with_suffix(".copy2.whl").name + artifact.touch(exist_ok=False) + + artifact_attestation = Path(f"{artifact}.publish.attestation") + artifact_attestation.touch(exist_ok=False) + with pytest.raises(SystemExit, match="Signature already exists"): + run_main_with_command( + [ + "sign", + "--staging", + "--identity-token", + id_token._raw_token, + artifact.as_posix(), + ] + ) + + +def test_inspect_command(caplog: pytest.LogCaptureFixture, monkeypatch: pytest.MonkeyPatch) -> None: # Happy path run_main_with_command(["inspect", attestation_path.as_posix()]) assert attestation_path.as_posix() in caplog.text @@ -55,8 +132,23 @@ def test_inspect_command(caplog) -> None: run_main_with_command(["inspect", "--dump-bytes", attestation_path.as_posix()]) assert "Signature:" in caplog.text + # Failure paths + monkeypatch.setattr(pypi_attestation_models._cli, "_die", _die_test) + + # Failure because not an attestation + with tempfile.NamedTemporaryFile(suffix=".publish.attestation") as f: + fake_package_name = Path(f.name.removesuffix(".publish.attestation")) + fake_package_name.touch() -def test_verify_command(caplog) -> None: + with pytest.raises(SystemExit, match="Invalid attestation"): + run_main_with_command(["inspect", fake_package_name.as_posix()]) + + # Failure because file is missing + with pytest.raises(SystemExit, match="not_a_file.txt is not a file."): + run_main_with_command(["inspect", "not_a_file.txt"]) + + +def test_verify_command(caplog: pytest.LogCaptureFixture, monkeypatch: pytest.MonkeyPatch) -> None: # Happy path run_main_with_command( [ @@ -86,10 +178,14 @@ def test_verify_command(caplog) -> None: ) assert "OK:" not in caplog.text - caplog.clear() + +def test_verify_command_failures(monkeypatch: pytest.MonkeyPatch) -> None: + # Hook the `_die` function to raise directly an exception instead of using the argparse errors + # This helps recover the message raised as an error + monkeypatch.setattr(pypi_attestation_models._cli, "_die", _die_test) # Failure because not an attestation - with pytest.raises(SystemExit) as exc_info: + with pytest.raises(SystemExit, match="Invalid attestation"): with tempfile.NamedTemporaryFile(suffix=".publish.attestation") as f: fake_package_name = Path(f.name.removesuffix(".publish.attestation")) fake_package_name.touch() @@ -103,4 +199,28 @@ def test_verify_command(caplog) -> None: fake_package_name.as_posix(), ] ) - assert "Verification failed" in caplog.text + + # Failure because missing package file + with pytest.raises(SystemExit, match="not_a_file.txt is not a file."): + run_main_with_command( + [ + "verify", + "--staging", + "--identity", + "william@yossarian.net", + "not_a_file.txt", + ] + ) + + # Failure because missing attestation file + with pytest.raises(SystemExit, match="Missing attestation"): + with tempfile.NamedTemporaryFile() as f: + run_main_with_command( + [ + "verify", + "--staging", + "--identity", + "william@yossarian.net", + f.name, + ] + ) From 007f424da7b9ba77964b99b564455533a6679ea3 Mon Sep 17 00:00:00 2001 From: Alexis Date: Fri, 21 Jun 2024 18:02:32 +0200 Subject: [PATCH 03/12] Remove TODOs --- src/pypi_attestation_models/_cli.py | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/src/pypi_attestation_models/_cli.py b/src/pypi_attestation_models/_cli.py index 4f3a3ff..b2b4330 100644 --- a/src/pypi_attestation_models/_cli.py +++ b/src/pypi_attestation_models/_cli.py @@ -15,7 +15,9 @@ from pypi_attestation_models import __version__ from pypi_attestation_models._impl import Attestation, VerificationError -logging.basicConfig(format="%(message)s", datefmt="[%X]", handlers=[logging.StreamHandler()]) +logging.basicConfig( + format="%(message)s", datefmt="[%X]", handlers=[logging.StreamHandler()] +) _logger = logging.getLogger(__name__) _logger.setLevel(logging.INFO) @@ -33,7 +35,7 @@ def _parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser( prog="pypi-attestation-models", - description="TODO", + description="Sign, inspect or verify PEP 740 attestations generated for Python Packages", parents=[parent_parser], formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) @@ -88,7 +90,7 @@ def _parser() -> argparse.ArgumentParser: "--identity", type=str, required=True, - help="TODO", + help="Signer identity", ) verify_command.add_argument( @@ -189,7 +191,9 @@ def _sign(args: argparse.Namespace) -> None: _die(args, f"Signature already exists for {file_path}") attestation = Attestation.sign(signer, file_path) - _logger.debug("Attestation saved for %s saved in %s", file_path, signature_path) + _logger.debug( + "Attestation saved for %s saved in %s", file_path, signature_path + ) signature_path.write_text(attestation.model_dump_json()) @@ -222,7 +226,9 @@ def _inspect(args: argparse.Namespace) -> None: _logger.info(f"\tType: {decoded_statement['_type']}") _logger.info("\tSubject:") for subject in decoded_statement["subject"]: - _logger.info(f"\t\t{subject['name']} (digest: {subject['digest']['sha256']})") + _logger.info( + f"\t\t{subject['name']} (digest: {subject['digest']['sha256']})" + ) _logger.info(f"\tPredicate type: {decoded_statement['predicateType']}") _logger.info(f"\tPredicate: {decoded_statement['predicate']}") @@ -273,7 +279,9 @@ def _verify(args: argparse.Namespace) -> None: try: attestation.verify(verifier, pol, file_path) except VerificationError as verification_error: - _logger.error("Verification failed for %s: %s", file_path, verification_error) + _logger.error( + "Verification failed for %s: %s", file_path, verification_error + ) continue _logger.info(f"OK: {attestation_path}") From 5a66a7cc433884df950fca795b9848177f1cf953 Mon Sep 17 00:00:00 2001 From: Alexis Date: Fri, 21 Jun 2024 18:02:56 +0200 Subject: [PATCH 04/12] Fix linter --- src/pypi_attestation_models/_cli.py | 16 ++++------------ 1 file changed, 4 insertions(+), 12 deletions(-) diff --git a/src/pypi_attestation_models/_cli.py b/src/pypi_attestation_models/_cli.py index b2b4330..df6ffc4 100644 --- a/src/pypi_attestation_models/_cli.py +++ b/src/pypi_attestation_models/_cli.py @@ -15,9 +15,7 @@ from pypi_attestation_models import __version__ from pypi_attestation_models._impl import Attestation, VerificationError -logging.basicConfig( - format="%(message)s", datefmt="[%X]", handlers=[logging.StreamHandler()] -) +logging.basicConfig(format="%(message)s", datefmt="[%X]", handlers=[logging.StreamHandler()]) _logger = logging.getLogger(__name__) _logger.setLevel(logging.INFO) @@ -191,9 +189,7 @@ def _sign(args: argparse.Namespace) -> None: _die(args, f"Signature already exists for {file_path}") attestation = Attestation.sign(signer, file_path) - _logger.debug( - "Attestation saved for %s saved in %s", file_path, signature_path - ) + _logger.debug("Attestation saved for %s saved in %s", file_path, signature_path) signature_path.write_text(attestation.model_dump_json()) @@ -226,9 +222,7 @@ def _inspect(args: argparse.Namespace) -> None: _logger.info(f"\tType: {decoded_statement['_type']}") _logger.info("\tSubject:") for subject in decoded_statement["subject"]: - _logger.info( - f"\t\t{subject['name']} (digest: {subject['digest']['sha256']})" - ) + _logger.info(f"\t\t{subject['name']} (digest: {subject['digest']['sha256']})") _logger.info(f"\tPredicate type: {decoded_statement['predicateType']}") _logger.info(f"\tPredicate: {decoded_statement['predicate']}") @@ -279,9 +273,7 @@ def _verify(args: argparse.Namespace) -> None: try: attestation.verify(verifier, pol, file_path) except VerificationError as verification_error: - _logger.error( - "Verification failed for %s: %s", file_path, verification_error - ) + _logger.error("Verification failed for %s: %s", file_path, verification_error) continue _logger.info(f"OK: {attestation_path}") From 5f93fa4ce9ef0db5090d83eba546313ebfbad3b7 Mon Sep 17 00:00:00 2001 From: Alexis Date: Fri, 21 Jun 2024 18:11:53 +0200 Subject: [PATCH 05/12] Update README --- README.md | 52 +++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 51 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 1c0f746..7e933e3 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,57 @@ A library to convert between Sigstore Bundles and [PEP 740] Attestation objects python -m pip install pypi-attestation-models ``` -## Usage +## Usage as a command line tool + +````bash +python -m pypi_attestation_models --help +usage: pypi-attestation-models [-h] [-v] [-V] COMMAND ... + +Sign, inspect or verify PEP 740 attestations generated for Python Packages + +positional arguments: + COMMAND The operation to perform + sign Sign one or more inputs + verify Verify one or more inputs + inspect Inspect one or more inputs + +options: + -h, --help show this help message and exit + -v, --verbose run with additional debug logging; supply multiple times to + increase verbosity (default: 0) + -V, --version show program's version number and exit +```` + +### Signing a package + +```bash +# Generate a whl file +make package +python -m pypi_attestation_models sign dist/pypi_attestation_models-*.whl +``` + +_Note_: This will open a browser window to authenticate with the Sigstore OIDC +provider. + +### Inspecting a PEP 740 Attestation + +```bash +python -m pypi_attestation_models inspect dist/pypi_attestation_models-*.whl.publish.attestation +``` +_Warning_: Inspecting does not mean verifying. It only prints the structure of +the attestation. + +### Verifying a PEP 740 Attestation + +```bash +python -m pypi_attestation_models verify --staging \ + --identity william@yossarian.net \ + test/assets/rfc8785-0.1.2-py3-none-any.whl +``` +The attestation present in the test has been generated using the staging +environment of Sigstore and signed by William. + +## Usage as a library See the full API documentation [here]. From 9b1604586bcb7b0bc3315ee7f76ec37183152f11 Mon Sep 17 00:00:00 2001 From: Alexis Date: Tue, 25 Jun 2024 11:38:42 -0400 Subject: [PATCH 06/12] Update CLI tool --- README.md | 6 +- src/pypi_attestation_models/_cli.py | 104 +++++++++++------------ test/test_cli.py | 127 ++++++++++++++++++++-------- 3 files changed, 144 insertions(+), 93 deletions(-) diff --git a/README.md b/README.md index 7e933e3..4064f5c 100644 --- a/README.md +++ b/README.md @@ -20,7 +20,7 @@ python -m pip install pypi-attestation-models python -m pypi_attestation_models --help usage: pypi-attestation-models [-h] [-v] [-V] COMMAND ... -Sign, inspect or verify PEP 740 attestations generated for Python Packages +Sign, inspect or verify PEP 740 attestations positional arguments: COMMAND The operation to perform @@ -43,8 +43,8 @@ make package python -m pypi_attestation_models sign dist/pypi_attestation_models-*.whl ``` -_Note_: This will open a browser window to authenticate with the Sigstore OIDC -provider. +_Note_: This will open a browser window to authenticate with the Sigstore +OAuth flow. ### Inspecting a PEP 740 Attestation diff --git a/src/pypi_attestation_models/_cli.py b/src/pypi_attestation_models/_cli.py index df6ffc4..de0321d 100644 --- a/src/pypi_attestation_models/_cli.py +++ b/src/pypi_attestation_models/_cli.py @@ -3,18 +3,23 @@ import argparse import json import logging +import typing from pathlib import Path from typing import NoReturn +import sigstore.oidc from cryptography import x509 from pydantic import ValidationError -from sigstore.oidc import IdentityError, IdentityToken, Issuer, detect_credential +from sigstore.oidc import IdentityError, IdentityToken, Issuer from sigstore.sign import SigningContext from sigstore.verify import Verifier, policy from pypi_attestation_models import __version__ from pypi_attestation_models._impl import Attestation, VerificationError +if typing.TYPE_CHECKING: + from collections.abc import Iterable + logging.basicConfig(format="%(message)s", datefmt="[%X]", handlers=[logging.StreamHandler()]) _logger = logging.getLogger(__name__) _logger.setLevel(logging.INFO) @@ -28,12 +33,12 @@ def _parser() -> argparse.ArgumentParser: "--verbose", action="count", default=0, - help="run with additional debug logging; supply multiple times to increase verbosity", + help="Run with additional debug logging; supply multiple times to increase verbosity", ) parser = argparse.ArgumentParser( prog="pypi-attestation-models", - description="Sign, inspect or verify PEP 740 attestations generated for Python Packages", + description="Sign, inspect or verify PEP 740 attestations", parents=[parent_parser], formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) @@ -56,13 +61,6 @@ def _parser() -> argparse.ArgumentParser: name="sign", help="Sign one or more inputs", parents=[parent_parser] ) - sign_command.add_argument( - "--identity-token", - type=str, - help="Identity token to use", - required=False, - ) - sign_command.add_argument( "--staging", action="store_true", @@ -130,13 +128,23 @@ def _parser() -> argparse.ArgumentParser: return parser -def _die(args: argparse.Namespace, message: str) -> NoReturn: - """Handle argument parsing errors and terminate the program. +def _die(message: str) -> NoReturn: + """Handle errors and terminate the program with an error code.""" + _logger.error(message) + raise SystemExit(1) - Fix up the type hints on our use of `ArgumentParser.error`. + +def _validate_files(files: list[Path] | Iterable[Path], should_exists: bool = True) -> None: + """Validate that the list of files exists or not. + + This function exits the program if the condition is not met. """ - args._parser.error(message) # noqa: SLF001. - raise ValueError("unreachable") + for file_path in files: + if file_path.is_file() != should_exists: + if should_exists: + _die(f"{file_path} is not a file.") + else: + _die(f"{file_path} already exists.") def get_identity_token(args: argparse.Namespace) -> IdentityToken: @@ -147,21 +155,13 @@ def get_identity_token(args: argparse.Namespace) -> IdentityToken: - An ambient credential - An OAuth-2 flow """ - # First, check if a token was supplied - if args.identity_token: - return IdentityToken(args.identity_token) - # Ambient credential detection - oidc_token = detect_credential() + oidc_token = sigstore.oidc.detect_credential() if oidc_token is not None: return IdentityToken(oidc_token) - # Finally, OAuth-2 Flow - if args.staging: - issuer = Issuer.staging() - else: - issuer = Issuer.production() - + # Fallback to interactive OAuth-2 Flow + issuer: Issuer = Issuer.staging() if args.staging else Issuer.production() return issuer.identity_token() @@ -170,25 +170,24 @@ def _sign(args: argparse.Namespace) -> None: try: identity = get_identity_token(args) except IdentityError as identity_error: - _die(args, f"Failed to detect identity: {identity_error}") + _die(f"Failed to detect identity: {identity_error}") + + signing_ctx = SigningContext.staging() if args.staging else SigningContext.production() - if args.staging: - signing_ctx = SigningContext.staging() - else: - signing_ctx = SigningContext.production() + # Validates that every file we want to sign exist but none of their attestations + _validate_files(args.files, should_exists=True) + _validate_files( + (Path(f"{file_path}.publish.attestation") for file_path in args.files), + should_exists=False, + ) with signing_ctx.signer(identity, cache=True) as signer: for file_path in args.files: _logger.debug(f"Signing {file_path}") - if not file_path.is_file(): - _die(args, f"{file_path} is not a file.") - signature_path = Path(f"{file_path}.publish.attestation") - if signature_path.is_file(): - _die(args, f"Signature already exists for {file_path}") - attestation = Attestation.sign(signer, file_path) + _logger.debug("Attestation saved for %s saved in %s", file_path, signature_path) signature_path.write_text(attestation.model_dump_json()) @@ -199,14 +198,12 @@ def _inspect(args: argparse.Namespace) -> None: Warning: The information displayed from the attestations are not verified. """ + _validate_files(args.files, should_exists=True) for file_path in args.files: - if not file_path.is_file(): - _die(args, f"{file_path} is not a file.") - try: attestation = Attestation.model_validate_json(file_path.read_text()) except ValidationError as validation_error: - _die(args, f"Invalid attestation ({file_path}): {validation_error}") + _die(f"Invalid attestation ({file_path}): {validation_error}") _logger.info( "Warning: The information displayed below are not verified, they are only " @@ -228,7 +225,7 @@ def _inspect(args: argparse.Namespace) -> None: _logger.info(f"\tPredicate: {decoded_statement['predicate']}") if args.dump_bytes: - _logger.info(f"Signature: {attestation.envelope.signature}") + _logger.info(f"Signature: {attestation.envelope.signature!r}") # Verification Material verification_material = attestation.verification_material @@ -250,25 +247,22 @@ def _inspect(args: argparse.Namespace) -> None: def _verify(args: argparse.Namespace) -> None: """Verify the files passed as argument.""" - if args.staging: - verifier = Verifier.staging() - else: - verifier = Verifier.production() - + verifier: Verifier = Verifier.staging() if args.staging else Verifier.production() pol = policy.Identity(identity=args.identity) - for file_path in args.files: - if not file_path.is_file(): - _die(args, f"{file_path} is not a file.") + # Validate that both the attestations and files exists + _validate_files(args.files, should_exists=True) + _validate_files( + (Path(f"{file_path}.publish.attestation") for file_path in args.files), + should_exists=True, + ) + for file_path in args.files: attestation_path = Path(f"{file_path}.publish.attestation") - if not attestation_path.is_file(): - _die(args, f"Missing attestation file for {file_path}") - try: attestation = Attestation.model_validate_json(attestation_path.read_text()) except ValidationError as validation_error: - _die(args, f"Invalid attestation ({file_path}): {validation_error}") + _die(f"Invalid attestation ({file_path}): {validation_error}") try: attestation.verify(verifier, pol, file_path) @@ -298,5 +292,3 @@ def main() -> None: _verify(args) elif args.subcommand == "inspect": _inspect(args) - else: - _die(args, f"Unknown subcommand: {args.subcommand}") diff --git a/test/test_cli.py b/test/test_cli.py index 17d5410..497a2e5 100644 --- a/test/test_cli.py +++ b/test/test_cli.py @@ -10,9 +10,15 @@ import pypi_attestation_models._cli import pytest -from pypi_attestation_models._cli import _logger, get_identity_token, main +import sigstore.oidc +from pypi_attestation_models._cli import ( + _logger, + _validate_files, + get_identity_token, + main, +) from pypi_attestation_models._impl import Attestation -from sigstore.oidc import IdentityError, IdentityToken +from sigstore.oidc import IdentityError ONLINE_TESTS = "CI" in os.environ or "TEST_INTERACTIVE" in os.environ online = pytest.mark.skipif(not ONLINE_TESTS, reason="online tests not enabled") @@ -31,11 +37,6 @@ def run_main_with_command(cmd: list[str]) -> None: main() -def _die_test(_: argparse.Namespace, message: str) -> None: - """Placeholder for the _die function.""" - raise SystemExit(message) - - def test_main_verbose_level(monkeypatch: pytest.MonkeyPatch) -> None: def default_sign(_: argparse.Namespace) -> None: return @@ -54,19 +55,25 @@ def default_sign(_: argparse.Namespace) -> None: assert exc_info.value.code == 2 +@online def test_get_identity_token(monkeypatch: pytest.MonkeyPatch) -> None: + # Happy paths + identity_token = get_identity_token(argparse.Namespace(staging=True)) + assert identity_token.in_validity_period() + # Failure path - monkeypatch.setattr(pypi_attestation_models._cli, "_die", _die_test) + def return_invalid_token() -> str: + return "invalid-token" + + monkeypatch.setattr(sigstore.oidc, "detect_credential", return_invalid_token) # Invalid token with pytest.raises(IdentityError, match="Identity token is malformed"): - get_identity_token(argparse.Namespace(identity_token="INVALID")) - - # Happy paths tests missing + get_identity_token(argparse.Namespace(staging=True)) @online -def test_sign_command(tmp_path: Path, id_token: IdentityToken) -> None: +def test_sign_command(tmp_path: Path) -> None: # Happy path copied_artifact = tmp_path / artifact_path.with_suffix(".copy.whl").name shutil.copy(artifact_path, copied_artifact) @@ -75,8 +82,6 @@ def test_sign_command(tmp_path: Path, id_token: IdentityToken) -> None: [ "sign", "--staging", - "--identity-token", - id_token._raw_token, copied_artifact.as_posix(), ] ) @@ -89,39 +94,56 @@ def test_sign_command(tmp_path: Path, id_token: IdentityToken) -> None: @online def test_sign_command_failures( - tmp_path: Path, monkeypatch: pytest.MonkeyPatch, id_token: IdentityToken + tmp_path: Path, monkeypatch: pytest.MonkeyPatch, caplog: pytest.LogCaptureFixture ) -> None: - monkeypatch.setattr(pypi_attestation_models._cli, "_die", _die_test) - # Missing file - with pytest.raises(SystemExit, match="not_exist.txt is not a file"): + with pytest.raises(SystemExit): run_main_with_command( [ "sign", "--staging", - "--identity-token", - id_token._raw_token, "not_exist.txt", ] ) + assert "not_exist.txt is not a file" in caplog.text + caplog.clear() + # Signature already exists artifact = tmp_path / artifact_path.with_suffix(".copy2.whl").name artifact.touch(exist_ok=False) artifact_attestation = Path(f"{artifact}.publish.attestation") artifact_attestation.touch(exist_ok=False) - with pytest.raises(SystemExit, match="Signature already exists"): + with pytest.raises(SystemExit): + run_main_with_command( + [ + "sign", + "--staging", + artifact.as_posix(), + ] + ) + + assert "already exists" in caplog.text + caplog.clear() + + # Invalid token + def return_invalid_token() -> str: + return "invalid-token" + + monkeypatch.setattr(sigstore.oidc, "detect_credential", return_invalid_token) + + with pytest.raises(SystemExit): run_main_with_command( [ "sign", "--staging", - "--identity-token", - id_token._raw_token, artifact.as_posix(), ] ) + assert "Failed to detect identity" in caplog.text + def test_inspect_command(caplog: pytest.LogCaptureFixture, monkeypatch: pytest.MonkeyPatch) -> None: # Happy path @@ -133,20 +155,25 @@ def test_inspect_command(caplog: pytest.LogCaptureFixture, monkeypatch: pytest.M assert "Signature:" in caplog.text # Failure paths - monkeypatch.setattr(pypi_attestation_models._cli, "_die", _die_test) + caplog.clear() # Failure because not an attestation with tempfile.NamedTemporaryFile(suffix=".publish.attestation") as f: fake_package_name = Path(f.name.removesuffix(".publish.attestation")) fake_package_name.touch() - with pytest.raises(SystemExit, match="Invalid attestation"): + with pytest.raises(SystemExit): run_main_with_command(["inspect", fake_package_name.as_posix()]) + assert "Invalid attestation" in caplog.text + # Failure because file is missing - with pytest.raises(SystemExit, match="not_a_file.txt is not a file."): + caplog.clear() + with pytest.raises(SystemExit): run_main_with_command(["inspect", "not_a_file.txt"]) + assert "not_a_file.txt is not a file." in caplog.text + def test_verify_command(caplog: pytest.LogCaptureFixture, monkeypatch: pytest.MonkeyPatch) -> None: # Happy path @@ -179,13 +206,9 @@ def test_verify_command(caplog: pytest.LogCaptureFixture, monkeypatch: pytest.Mo assert "OK:" not in caplog.text -def test_verify_command_failures(monkeypatch: pytest.MonkeyPatch) -> None: - # Hook the `_die` function to raise directly an exception instead of using the argparse errors - # This helps recover the message raised as an error - monkeypatch.setattr(pypi_attestation_models._cli, "_die", _die_test) - +def test_verify_command_failures(caplog: pytest.LogCaptureFixture) -> None: # Failure because not an attestation - with pytest.raises(SystemExit, match="Invalid attestation"): + with pytest.raises(SystemExit): with tempfile.NamedTemporaryFile(suffix=".publish.attestation") as f: fake_package_name = Path(f.name.removesuffix(".publish.attestation")) fake_package_name.touch() @@ -199,9 +222,11 @@ def test_verify_command_failures(monkeypatch: pytest.MonkeyPatch) -> None: fake_package_name.as_posix(), ] ) + assert "Invalid attestation" in caplog.text # Failure because missing package file - with pytest.raises(SystemExit, match="not_a_file.txt is not a file."): + caplog.clear() + with pytest.raises(SystemExit): run_main_with_command( [ "verify", @@ -212,8 +237,11 @@ def test_verify_command_failures(monkeypatch: pytest.MonkeyPatch) -> None: ] ) + assert "not_a_file.txt is not a file." in caplog.text + # Failure because missing attestation file - with pytest.raises(SystemExit, match="Missing attestation"): + caplog.clear() + with pytest.raises(SystemExit): with tempfile.NamedTemporaryFile() as f: run_main_with_command( [ @@ -224,3 +252,34 @@ def test_verify_command_failures(monkeypatch: pytest.MonkeyPatch) -> None: f.name, ] ) + + assert "is not a file." in caplog.text + + +def test_validate_files(tmp_path: Path, caplog: pytest.LogCaptureFixture) -> None: + # Happy path + file_1_exist = tmp_path / "file1" + file_1_exist.touch() + + file_2_exist = tmp_path / "file2" + file_2_exist.touch() + + _validate_files([file_1_exist, file_2_exist], should_exists=True) + assert True # No exception raised + + file_1_missing = tmp_path / "file3" + file_2_missing = tmp_path / "file4" + _validate_files([file_1_missing, file_2_missing], should_exists=False) + assert True + + # Failure paths + with pytest.raises(SystemExit): + _validate_files([file_1_missing, file_2_exist], should_exists=True) + + assert f"{file_1_missing} is not a file." in caplog.text + + caplog.clear() + with pytest.raises(SystemExit): + _validate_files([file_1_missing, file_2_exist], should_exists=False) + + assert f"{file_2_exist} already exists." in caplog.text From e5686a1fa4a8a6cba9b69b956dcfcba648d6e57e Mon Sep 17 00:00:00 2001 From: Alexis Date: Tue, 25 Jun 2024 15:17:01 -0400 Subject: [PATCH 07/12] Integrate will remarks --- pyproject.toml | 12 ++++++++++-- src/pypi_attestation_models/_cli.py | 17 ++++++++--------- test/test_cli.py | 20 +++++++++++++------- 3 files changed, 31 insertions(+), 18 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index c62e1bb..ef154f5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -49,7 +49,10 @@ name = "pypi_attestation_models" [tool.coverage.run] # don't attempt code coverage for the CLI entrypoints -omit = ["src/pypi_attestation_models/_cli.py"] +omit = [ + "src/pypi_attestation_models/_cli.py", + "src/pypi_attestation_models/__main__.py" +] [tool.mypy] mypy_path = "src" @@ -93,6 +96,11 @@ ignore = ["ANN101", "ANN102", "D203", "D213", "COM812", "ISC001"] [tool.interrogate] # don't enforce documentation coverage for packaging, testing, the virtual # environment, or the CLI (which is documented separately). -exclude = ["env", "test", "src/pypi_attestation_models/_cli.py"] +exclude = [ + "env", + "test", + "src/pypi_attestation_models/_cli.py", + "src/pypi_attestation_models/__main__.py" +] ignore-semiprivate = true fail-under = 100 diff --git a/src/pypi_attestation_models/_cli.py b/src/pypi_attestation_models/_cli.py index de0321d..9f87765 100644 --- a/src/pypi_attestation_models/_cli.py +++ b/src/pypi_attestation_models/_cli.py @@ -134,14 +134,14 @@ def _die(message: str) -> NoReturn: raise SystemExit(1) -def _validate_files(files: list[Path] | Iterable[Path], should_exists: bool = True) -> None: +def _validate_files(files: Iterable[Path], should_exist: bool = True) -> None: """Validate that the list of files exists or not. This function exits the program if the condition is not met. """ for file_path in files: - if file_path.is_file() != should_exists: - if should_exists: + if file_path.is_file() != should_exist: + if should_exist: _die(f"{file_path} is not a file.") else: _die(f"{file_path} already exists.") @@ -151,7 +151,6 @@ def get_identity_token(args: argparse.Namespace) -> IdentityToken: """Generate an Identity Token. This method uses the following order of precedence: - - A token passed as an argument - An ambient credential - An OAuth-2 flow """ @@ -175,10 +174,10 @@ def _sign(args: argparse.Namespace) -> None: signing_ctx = SigningContext.staging() if args.staging else SigningContext.production() # Validates that every file we want to sign exist but none of their attestations - _validate_files(args.files, should_exists=True) + _validate_files(args.files, should_exist=True) _validate_files( (Path(f"{file_path}.publish.attestation") for file_path in args.files), - should_exists=False, + should_exist=False, ) with signing_ctx.signer(identity, cache=True) as signer: @@ -198,7 +197,7 @@ def _inspect(args: argparse.Namespace) -> None: Warning: The information displayed from the attestations are not verified. """ - _validate_files(args.files, should_exists=True) + _validate_files(args.files, should_exist=True) for file_path in args.files: try: attestation = Attestation.model_validate_json(file_path.read_text()) @@ -251,10 +250,10 @@ def _verify(args: argparse.Namespace) -> None: pol = policy.Identity(identity=args.identity) # Validate that both the attestations and files exists - _validate_files(args.files, should_exists=True) + _validate_files(args.files, should_exist=True) _validate_files( (Path(f"{file_path}.publish.attestation") for file_path in args.files), - should_exists=True, + should_exist=True, ) for file_path in args.files: diff --git a/test/test_cli.py b/test/test_cli.py index 497a2e5..1c49f2d 100644 --- a/test/test_cli.py +++ b/test/test_cli.py @@ -88,7 +88,9 @@ def test_sign_command(tmp_path: Path) -> None: copied_artifact_attestation = Path(f"{copied_artifact}.publish.attestation") assert copied_artifact_attestation.is_file() - attestation = Attestation.model_validate_json(copied_artifact_attestation.read_text()) + attestation = Attestation.model_validate_json( + copied_artifact_attestation.read_text() + ) assert attestation.version @@ -145,7 +147,9 @@ def return_invalid_token() -> str: assert "Failed to detect identity" in caplog.text -def test_inspect_command(caplog: pytest.LogCaptureFixture, monkeypatch: pytest.MonkeyPatch) -> None: +def test_inspect_command( + caplog: pytest.LogCaptureFixture, monkeypatch: pytest.MonkeyPatch +) -> None: # Happy path run_main_with_command(["inspect", attestation_path.as_posix()]) assert attestation_path.as_posix() in caplog.text @@ -175,7 +179,9 @@ def test_inspect_command(caplog: pytest.LogCaptureFixture, monkeypatch: pytest.M assert "not_a_file.txt is not a file." in caplog.text -def test_verify_command(caplog: pytest.LogCaptureFixture, monkeypatch: pytest.MonkeyPatch) -> None: +def test_verify_command( + caplog: pytest.LogCaptureFixture, monkeypatch: pytest.MonkeyPatch +) -> None: # Happy path run_main_with_command( [ @@ -264,22 +270,22 @@ def test_validate_files(tmp_path: Path, caplog: pytest.LogCaptureFixture) -> Non file_2_exist = tmp_path / "file2" file_2_exist.touch() - _validate_files([file_1_exist, file_2_exist], should_exists=True) + _validate_files([file_1_exist, file_2_exist], should_exist=True) assert True # No exception raised file_1_missing = tmp_path / "file3" file_2_missing = tmp_path / "file4" - _validate_files([file_1_missing, file_2_missing], should_exists=False) + _validate_files([file_1_missing, file_2_missing], should_exist=False) assert True # Failure paths with pytest.raises(SystemExit): - _validate_files([file_1_missing, file_2_exist], should_exists=True) + _validate_files([file_1_missing, file_2_exist], should_exist=True) assert f"{file_1_missing} is not a file." in caplog.text caplog.clear() with pytest.raises(SystemExit): - _validate_files([file_1_missing, file_2_exist], should_exists=False) + _validate_files([file_1_missing, file_2_exist], should_exist=False) assert f"{file_2_exist} already exists." in caplog.text From fe9d00571c2f3c38bffbf4afb38ed8fc01ab1639 Mon Sep 17 00:00:00 2001 From: Alexis Date: Tue, 25 Jun 2024 15:19:03 -0400 Subject: [PATCH 08/12] Fix linting issues --- test/test_cli.py | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/test/test_cli.py b/test/test_cli.py index 1c49f2d..a7c145c 100644 --- a/test/test_cli.py +++ b/test/test_cli.py @@ -88,9 +88,7 @@ def test_sign_command(tmp_path: Path) -> None: copied_artifact_attestation = Path(f"{copied_artifact}.publish.attestation") assert copied_artifact_attestation.is_file() - attestation = Attestation.model_validate_json( - copied_artifact_attestation.read_text() - ) + attestation = Attestation.model_validate_json(copied_artifact_attestation.read_text()) assert attestation.version @@ -147,9 +145,7 @@ def return_invalid_token() -> str: assert "Failed to detect identity" in caplog.text -def test_inspect_command( - caplog: pytest.LogCaptureFixture, monkeypatch: pytest.MonkeyPatch -) -> None: +def test_inspect_command(caplog: pytest.LogCaptureFixture, monkeypatch: pytest.MonkeyPatch) -> None: # Happy path run_main_with_command(["inspect", attestation_path.as_posix()]) assert attestation_path.as_posix() in caplog.text @@ -179,9 +175,7 @@ def test_inspect_command( assert "not_a_file.txt is not a file." in caplog.text -def test_verify_command( - caplog: pytest.LogCaptureFixture, monkeypatch: pytest.MonkeyPatch -) -> None: +def test_verify_command(caplog: pytest.LogCaptureFixture, monkeypatch: pytest.MonkeyPatch) -> None: # Happy path run_main_with_command( [ From 35fcf991f1d466c28104f0f8674f71eb78255eb6 Mon Sep 17 00:00:00 2001 From: William Woodruff Date: Wed, 26 Jun 2024 11:50:45 -0400 Subject: [PATCH 09/12] _cli: use SAN for subjects in `inspect` Signed-off-by: William Woodruff --- src/pypi_attestation_models/_cli.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/pypi_attestation_models/_cli.py b/src/pypi_attestation_models/_cli.py index 9f87765..4d46afa 100644 --- a/src/pypi_attestation_models/_cli.py +++ b/src/pypi_attestation_models/_cli.py @@ -232,7 +232,8 @@ def _inspect(args: argparse.Namespace) -> None: # Certificate certificate = x509.load_der_x509_certificate(verification_material.certificate) _logger.info("Certificate:") - _logger.info(f"\tSubject: {certificate.subject.rfc4514_string()}") + san = certificate.extensions.get_extension_for_class(x509.SubjectAlternativeName) + _logger.info(f"\tSubjects: {[name.value for name in san.value]}") _logger.info(f"\tIssuer: {certificate.issuer.rfc4514_string()}") _logger.info(f"\tValidity: {certificate.not_valid_after_utc}") From ddffb403406de107321bee94a821bdd60b6c8147 Mon Sep 17 00:00:00 2001 From: William Woodruff Date: Wed, 26 Jun 2024 14:46:47 -0400 Subject: [PATCH 10/12] _cli: move typing to TYPE_CHECKING block Signed-off-by: William Woodruff --- src/pypi_attestation_models/_cli.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pypi_attestation_models/_cli.py b/src/pypi_attestation_models/_cli.py index 4d46afa..d505ec5 100644 --- a/src/pypi_attestation_models/_cli.py +++ b/src/pypi_attestation_models/_cli.py @@ -5,7 +5,6 @@ import logging import typing from pathlib import Path -from typing import NoReturn import sigstore.oidc from cryptography import x509 @@ -19,6 +18,7 @@ if typing.TYPE_CHECKING: from collections.abc import Iterable + from typing import NoReturn logging.basicConfig(format="%(message)s", datefmt="[%X]", handlers=[logging.StreamHandler()]) _logger = logging.getLogger(__name__) From dffa0fe0f65d7ba3b9060ae61d45f4d2a1872ead Mon Sep 17 00:00:00 2001 From: William Woodruff Date: Wed, 26 Jun 2024 14:54:29 -0400 Subject: [PATCH 11/12] _cli, _impl: clean up exceptions slightly Needs a follow-up. Signed-off-by: William Woodruff --- src/pypi_attestation_models/_cli.py | 8 +++++--- src/pypi_attestation_models/_impl.py | 12 ++++++++++-- 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/src/pypi_attestation_models/_cli.py b/src/pypi_attestation_models/_cli.py index d505ec5..2216176 100644 --- a/src/pypi_attestation_models/_cli.py +++ b/src/pypi_attestation_models/_cli.py @@ -13,8 +13,7 @@ from sigstore.sign import SigningContext from sigstore.verify import Verifier, policy -from pypi_attestation_models import __version__ -from pypi_attestation_models._impl import Attestation, VerificationError +from pypi_attestation_models import Attestation, AttestationError, VerificationError, __version__ if typing.TYPE_CHECKING: from collections.abc import Iterable @@ -185,7 +184,10 @@ def _sign(args: argparse.Namespace) -> None: _logger.debug(f"Signing {file_path}") signature_path = Path(f"{file_path}.publish.attestation") - attestation = Attestation.sign(signer, file_path) + try: + attestation = Attestation.sign(signer, file_path) + except AttestationError as e: + _die(f"Failed to sign: {e}") _logger.debug("Attestation saved for %s saved in %s", file_path, signature_path) diff --git a/src/pypi_attestation_models/_impl.py b/src/pypi_attestation_models/_impl.py index 3758f01..259e798 100644 --- a/src/pypi_attestation_models/_impl.py +++ b/src/pypi_attestation_models/_impl.py @@ -84,18 +84,26 @@ class Attestation(BaseModel): @classmethod def sign(cls, signer: Signer, dist: Path) -> Attestation: - """Create an envelope, with signature, from a distribution file.""" + """Create an envelope, with signature, from a distribution file. + + On failure, raises `AttestationError` or an appropriate subclass. + """ with dist.open(mode="rb", buffering=0) as io: # Replace this with `hashlib.file_digest()` once # our minimum supported Python is >=3.11 digest = _sha256_streaming(io).hex() + try: + name = _ultranormalize_dist_filename(dist.name) + except ValueError as e: + raise AttestationError(str(e)) + stmt = ( _StatementBuilder() .subjects( [ _Subject( - name=_ultranormalize_dist_filename(dist.name), + name=name, digest=_DigestSet(root={"sha256": digest}), ) ] From de1b9f73dd5172060cba9673a8ddd0d01b9fcd79 Mon Sep 17 00:00:00 2001 From: William Woodruff Date: Wed, 26 Jun 2024 14:58:49 -0400 Subject: [PATCH 12/12] test_impl: coverage Signed-off-by: William Woodruff --- test/test_impl.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/test/test_impl.py b/test/test_impl.py index f638d55..b046a28 100644 --- a/test/test_impl.py +++ b/test/test_impl.py @@ -47,6 +47,16 @@ def test_roundtrip(self, id_token: IdentityToken) -> None: roundtripped_attestation = impl.sigstore_to_pypi(bundle) roundtripped_attestation.verify(verifier, policy.UnsafeNoOp(), artifact_path) + def test_sign_invalid_dist_filename(self, tmp_path: Path) -> None: + bad_dist = tmp_path / "invalid-name.tar.gz" + bad_dist.write_bytes(b"junk") + + with pytest.raises( + impl.AttestationError, + match=r"Invalid sdist filename \(invalid version\): invalid-name\.tar\.gz", + ): + impl.Attestation.sign(pretend.stub(), bad_dist) + def test_verify_github_attested(self) -> None: verifier = Verifier.production() pol = policy.AllOf(