Skip to content

Commit

Permalink
Add VerificationMaterials.to_bundle()
Browse files Browse the repository at this point in the history
  • Loading branch information
sethmlarson committed Jul 27, 2023
1 parent 6b40127 commit 0ea3e49
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 3 deletions.
78 changes: 75 additions & 3 deletions sigstore/verify/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,25 @@
load_pem_x509_certificate,
)
from pydantic import BaseModel
from sigstore_protobuf_specs.dev.sigstore.bundle.v1 import Bundle
from sigstore_protobuf_specs.dev.sigstore.bundle.v1 import (
Bundle,
VerificationMaterial,
)
from sigstore_protobuf_specs.dev.sigstore.common.v1 import (
HashAlgorithm,
HashOutput,
LogId,
MessageSignature,
PublicKeyIdentifier,
X509Certificate,
X509CertificateChain,
)
from sigstore_protobuf_specs.dev.sigstore.rekor.v1 import (
Checkpoint,
InclusionPromise,
InclusionProof,
KindVersion,
TransparencyLogEntry,
)

from sigstore._internal.rekor import RekorClient
Expand Down Expand Up @@ -171,8 +186,6 @@ class VerificationMaterials:
certificate: Certificate
"""
The certificate that attests to and contains the public signing key.
# TODO: Support a certificate chain here, with optional intermediates.
"""

signature: bytes
Expand Down Expand Up @@ -438,3 +451,62 @@ def rekor_entry(self, client: RekorClient) -> LogEntry:
raise InvalidRekorEntry

return entry

def to_bundle(self) -> Bundle:
"""Converts VerificationMaterials into a Bundle. Requires that
the VerificationMaterials have a Rekor entry loaded. This is
the reverse operation of VerificationMaterials.from_bundle()
"""
if not self.has_rekor_entry:
raise ValueError("Must have Rekor entry before converting to a Bundle")
rekor_entry = self._rekor_entry

bundle = Bundle(
media_type="application/vnd.dev.sigstore.bundle+json;version=0.2",
verification_material=VerificationMaterial(
public_key=PublicKeyIdentifier(),
x509_certificate_chain=X509CertificateChain(
certificates=[
X509Certificate(
raw_bytes=self.certificate.public_bytes(Encoding.DER)
)
]
),
tlog_entries=[
TransparencyLogEntry(
log_index=rekor_entry.log_index,
log_id=LogId(key_id=bytes.fromhex(rekor_entry.log_id)),
kind_version=KindVersion(kind="hashedrekord", version="0.0.1"),
integrated_time=rekor_entry.integrated_time,
inclusion_promise=InclusionPromise(
signed_entry_timestamp=base64.b64decode(
rekor_entry.inclusion_promise
)
),
inclusion_proof=InclusionProof(
log_index=rekor_entry.inclusion_proof.log_index,
root_hash=bytes.fromhex(
rekor_entry.inclusion_proof.root_hash
),
tree_size=rekor_entry.inclusion_proof.tree_size,
hashes=[
bytes.fromhex(hash_hex)
for hash_hex in rekor_entry.inclusion_proof.hashes
],
checkpoint=Checkpoint(
envelope=rekor_entry.inclusion_proof.checkpoint
),
),
canonicalized_body=base64.b64decode(rekor_entry.body),
)
],
),
message_signature=MessageSignature(
message_digest=HashOutput(
algorithm=HashAlgorithm.SHA2_256,
digest=self.input_digest,
),
signature=self.signature,
),
)
return bundle
11 changes: 11 additions & 0 deletions test/unit/verify/test_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
InvalidMaterials,
InvalidRekorEntry,
RekorEntryMissing,
VerificationMaterials,
)


Expand Down Expand Up @@ -86,3 +87,13 @@ def test_verification_materials_offline_no_checkpoint(self, signing_bundle):
InvalidMaterials, match="expected checkpoint in inclusion proof"
):
signing_bundle("bundle_no_checkpoint.txt", offline=True)

def test_verification_materials_to_bundle_round_trip(self, asset, signing_bundle):
bundle = signing_bundle("bundle.txt").to_bundle()

with asset("bundle.txt").open(mode="rb", buffering=0) as io:
round_tripped_bundle = VerificationMaterials.from_bundle(
input_=io, bundle=bundle, offline=False
).to_bundle()

assert bundle == round_tripped_bundle

0 comments on commit 0ea3e49

Please sign in to comment.