Skip to content

Commit

Permalink
examples: Use verification results in repo example
Browse files Browse the repository at this point in the history
This is an example of using the verification resutls in a repository.

The only remaining tricky part is in _get_verification_result():
* has to figure out the delegating metadata (something we currently
  cannot provide in repository.Repository for the general case)
* Needs a special case for first root

Signed-off-by: Jussi Kukkonen <jkukkonen@google.com>
  • Loading branch information
jku committed Feb 3, 2024
1 parent 26bdbbe commit b8dbe30
Showing 1 changed file with 43 additions and 8 deletions.
51 changes: 43 additions & 8 deletions examples/repository/_simplerepo.py
Expand Up @@ -8,7 +8,7 @@
import logging
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Dict, List
from typing import Dict, List, Union

from securesystemslib import keys
from securesystemslib.signer import Key, Signer, SSlibKey, SSlibSigner
Expand All @@ -20,10 +20,13 @@
Metadata,
MetaFile,
Root,
RootVerificationResult,
Signed,
Snapshot,
TargetFile,
Targets,
Timestamp,
VerificationResult,
)
from tuf.repository import Repository

Expand Down Expand Up @@ -89,6 +92,27 @@ def targets_infos(self) -> Dict[str, MetaFile]:
def snapshot_info(self) -> MetaFile:
return self._snapshot_info

def _get_verification_result(
self, role: str, md: Metadata
) -> Union[VerificationResult, RootVerificationResult]:
"""Verify roles metadata using the existing repository metadata"""
if role == Root.type:
assert isinstance(md.signed, Root)
root = self.root()
if root.version == 0:
# special case first root
root = md.signed
return md.signed.get_root_verification_result(
root, md.signed_bytes, md.signatures
)
if role in [Timestamp.type, Snapshot.type, Targets.type]:
delegator: Signed = self.root()
else:
delegator = self.targets()
return delegator.get_verification_result(
role, md.signed_bytes, md.signatures
)

def open(self, role: str) -> Metadata:
"""Return current Metadata for role from 'storage' (or create a new one)"""

Expand All @@ -112,6 +136,14 @@ def close(self, role: str, md: Metadata) -> None:
for signer in self.signer_cache[role]:
md.sign(signer, append=True)

# Double check that we only write verified metadata
vr = self._get_verification_result(role, md)
if not vr:
raise ValueError(f"Role {role} failed to verify")
keyids = [keyid[:7] for keyid in vr.signed]
verify_str = f"verified with keys [{', '.join(keyids)}]"
logger.debug("Role %s v%d: %s", role, md.signed.version, verify_str)

# store new metadata version, update version caches
self.role_cache[role].append(md)
if role == "snapshot":
Expand All @@ -130,8 +162,6 @@ def add_target(self, path: str, content: str) -> None:
with self.edit_targets() as targets:
targets.targets[path] = TargetFile.from_data(path, data)

logger.debug("Targets v%d", targets.version)

# update snapshot, timestamp
self.do_snapshot()
self.do_timestamp()
Expand All @@ -157,8 +187,6 @@ def submit_delegation(self, rolename: str, data: bytes) -> bool:
logger.info("Failed to add delegation for %s: %s", rolename, e)
return False

logger.debug("Targets v%d", targets.version)

# update snapshot, timestamp
self.do_snapshot()
self.do_timestamp()
Expand All @@ -177,19 +205,26 @@ def submit_role(self, role: str, data: bytes) -> bool:
if not targetpath.startswith(f"{role}/"):
raise ValueError(f"targets allowed under {role}/ only")

self.targets().verify_delegate(role, md.signed_bytes, md.signatures)

if md.signed.version != self.targets(role).version + 1:
raise ValueError("Invalid version {md.signed.version}")

except (RepositoryError, ValueError) as e:
logger.info("Failed to add new version for %s: %s", role, e)
return False

# Check that we only write verified metadata
vr = self._get_verification_result(role, md)
if not vr:
logger.info("Role %s failed to verify", role)
return False

keyids = [keyid[:7] for keyid in vr.signed]
verify_str = f"verified with keys [{', '.join(keyids)}]"
logger.debug("Role %s v%d: %s", role, md.signed.version, verify_str)

# Checks passed: Add new delegated role version
self.role_cache[role].append(md)
self._targets_infos[f"{role}.json"].version = md.signed.version
logger.debug("%s v%d", role, md.signed.version)

# To keep it simple, target content is generated from targetpath
for targetpath in md.signed.targets:
Expand Down

0 comments on commit b8dbe30

Please sign in to comment.