Skip to content

Commit

Permalink
Merge pull request #2551 from jku/improve-verification-result
Browse files Browse the repository at this point in the history
Improve verification results
  • Loading branch information
jku committed Feb 5, 2024
2 parents 3ab89c5 + 14edf3d commit be55b87
Show file tree
Hide file tree
Showing 3 changed files with 304 additions and 87 deletions.
49 changes: 41 additions & 8 deletions examples/repository/_simplerepo.py
Expand Up @@ -8,7 +8,7 @@
import logging
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Dict, List
from typing import Dict, List, Union

from securesystemslib import keys
from securesystemslib.signer import Key, Signer, SSlibKey, SSlibSigner
Expand All @@ -20,10 +20,13 @@
Metadata,
MetaFile,
Root,
RootVerificationResult,
Signed,
Snapshot,
TargetFile,
Targets,
Timestamp,
VerificationResult,
)
from tuf.repository import Repository

Expand Down Expand Up @@ -89,6 +92,25 @@ def targets_infos(self) -> Dict[str, MetaFile]:
def snapshot_info(self) -> MetaFile:
return self._snapshot_info

def _get_verification_result(
self, role: str, md: Metadata
) -> Union[VerificationResult, RootVerificationResult]:
"""Verify roles metadata using the existing repository metadata"""
if role == Root.type:
assert isinstance(md.signed, Root)
root = self.root()
previous = root if root.version > 0 else None
return md.signed.get_root_verification_result(
previous, md.signed_bytes, md.signatures
)
if role in [Timestamp.type, Snapshot.type, Targets.type]:
delegator: Signed = self.root()
else:
delegator = self.targets()
return delegator.get_verification_result(
role, md.signed_bytes, md.signatures
)

def open(self, role: str) -> Metadata:
"""Return current Metadata for role from 'storage' (or create a new one)"""

Expand All @@ -112,6 +134,14 @@ def close(self, role: str, md: Metadata) -> None:
for signer in self.signer_cache[role]:
md.sign(signer, append=True)

# Double check that we only write verified metadata
vr = self._get_verification_result(role, md)
if not vr:
raise ValueError(f"Role {role} failed to verify")
keyids = [keyid[:7] for keyid in vr.signed]
verify_str = f"verified with keys [{', '.join(keyids)}]"
logger.debug("Role %s v%d: %s", role, md.signed.version, verify_str)

# store new metadata version, update version caches
self.role_cache[role].append(md)
if role == "snapshot":
Expand All @@ -130,8 +160,6 @@ def add_target(self, path: str, content: str) -> None:
with self.edit_targets() as targets:
targets.targets[path] = TargetFile.from_data(path, data)

logger.debug("Targets v%d", targets.version)

# update snapshot, timestamp
self.do_snapshot()
self.do_timestamp()
Expand All @@ -157,8 +185,6 @@ def submit_delegation(self, rolename: str, data: bytes) -> bool:
logger.info("Failed to add delegation for %s: %s", rolename, e)
return False

logger.debug("Targets v%d", targets.version)

# update snapshot, timestamp
self.do_snapshot()
self.do_timestamp()
Expand All @@ -177,19 +203,26 @@ def submit_role(self, role: str, data: bytes) -> bool:
if not targetpath.startswith(f"{role}/"):
raise ValueError(f"targets allowed under {role}/ only")

self.targets().verify_delegate(role, md.signed_bytes, md.signatures)

if md.signed.version != self.targets(role).version + 1:
raise ValueError("Invalid version {md.signed.version}")

except (RepositoryError, ValueError) as e:
logger.info("Failed to add new version for %s: %s", role, e)
return False

# Check that we only write verified metadata
vr = self._get_verification_result(role, md)
if not vr:
logger.info("Role %s failed to verify", role)
return False

keyids = [keyid[:7] for keyid in vr.signed]
verify_str = f"verified with keys [{', '.join(keyids)}]"
logger.debug("Role %s v%d: %s", role, md.signed.version, verify_str)

# Checks passed: Add new delegated role version
self.role_cache[role].append(md)
self._targets_infos[f"{role}.json"].version = md.signed.version
logger.debug("%s v%d", role, md.signed.version)

# To keep it simple, target content is generated from targetpath
for targetpath in md.signed.targets:
Expand Down
213 changes: 162 additions & 51 deletions tests/test_api.py
Expand Up @@ -13,7 +13,7 @@
import sys
import tempfile
import unittest
from copy import copy
from copy import copy, deepcopy
from datetime import datetime, timedelta
from typing import Any, ClassVar, Dict, Optional

Expand Down Expand Up @@ -41,6 +41,7 @@
Metadata,
MetaFile,
Root,
RootVerificationResult,
Signature,
Snapshot,
SuccinctRoles,
Expand All @@ -55,7 +56,7 @@
logger = logging.getLogger(__name__)


# pylint: disable=too-many-public-methods
# pylint: disable=too-many-public-methods,too-many-statements
class TestMetadata(unittest.TestCase):
"""Tests for public API of all classes in 'tuf/api/metadata.py'."""

Expand Down Expand Up @@ -471,95 +472,205 @@ def test_signed_verify_delegate(self) -> None:
Snapshot.type, snapshot_md.signed_bytes, snapshot_md.signatures
)

def test_verification_result(self) -> None:
vr = VerificationResult(3, {"a": None}, {"b": None})
self.assertEqual(vr.missing, 2)
self.assertFalse(vr.verified)
self.assertFalse(vr)

# Add a signature
vr.signed["c"] = None
self.assertEqual(vr.missing, 1)
self.assertFalse(vr.verified)
self.assertFalse(vr)

# Add last missing signature
vr.signed["d"] = None
self.assertEqual(vr.missing, 0)
self.assertTrue(vr.verified)
self.assertTrue(vr)

# Add one more signature
vr.signed["e"] = None
self.assertEqual(vr.missing, 0)
self.assertTrue(vr.verified)
self.assertTrue(vr)

def test_root_verification_result(self) -> None:
vr1 = VerificationResult(3, {"a": None}, {"b": None})
vr2 = VerificationResult(1, {"c": None}, {"b": None})

vr = RootVerificationResult(vr1, vr2)
self.assertEqual(vr.signed, {"a": None, "c": None})
self.assertEqual(vr.unsigned, {"b": None})
self.assertFalse(vr.verified)
self.assertFalse(vr)

vr1.signed["c"] = None
vr1.signed["f"] = None
self.assertEqual(vr.signed, {"a": None, "c": None, "f": None})
self.assertEqual(vr.unsigned, {"b": None})
self.assertTrue(vr.verified)
self.assertTrue(vr)

def test_signed_get_verification_result(self) -> None:
# Setup: Load test metadata and keys
root_path = os.path.join(self.repo_dir, "metadata", "root.json")
root = Metadata[Root].from_file(root_path)
initial_root_keyids = root.signed.roles[Root.type].keyids
self.assertEqual(len(initial_root_keyids), 1)
key1_id = initial_root_keyids[0]
key2 = self.keystore[Timestamp.type]
key2_id = key2["keyid"]

key1_id = root.signed.roles[Root.type].keyids[0]
key1 = root.signed.get_key(key1_id)

key2_id = root.signed.roles[Timestamp.type].keyids[0]
key2 = root.signed.get_key(key2_id)
priv_key2 = self.keystore[Timestamp.type]

key3_id = "123456789abcdefg"
key4 = self.keystore[Snapshot.type]
key4_id = key4["keyid"]
priv_key4 = self.keystore[Snapshot.type]
key4_id = priv_key4["keyid"]

# Test: 1 authorized key, 1 valid signature
result = root.signed.get_verification_result(
Root.type, root.signed_bytes, root.signatures
)
self.assertTrue(result.verified)
self.assertEqual(result.signed, {key1_id})
self.assertEqual(result.unsigned, set())
self.assertTrue(result)
self.assertEqual(result.signed, {key1_id: key1})
self.assertEqual(result.unsigned, {})

# Test: 2 authorized keys, 1 invalid signature
# Adding a key, i.e. metadata change, invalidates existing signature
root.signed.add_key(
SSlibKey.from_securesystemslib_key(key2),
Root.type,
)
root.signed.add_key(key2, Root.type)
result = root.signed.get_verification_result(
Root.type, root.signed_bytes, root.signatures
)
self.assertFalse(result.verified)
self.assertEqual(result.signed, set())
self.assertEqual(result.unsigned, {key1_id, key2_id})
self.assertFalse(result)
self.assertEqual(result.signed, {})
self.assertEqual(result.unsigned, {key1_id: key1, key2_id: key2})

# Test: 3 authorized keys, 1 invalid signature, 1 key missing key data
# Adding a keyid w/o key, fails verification the same as no signature
# or an invalid signature for that key
# Adding a keyid w/o key, fails verification but this key is not listed
# in unsigned
root.signed.roles[Root.type].keyids.append(key3_id)
result = root.signed.get_verification_result(
Root.type, root.signed_bytes, root.signatures
)
self.assertFalse(result.verified)
self.assertEqual(result.signed, set())
self.assertEqual(result.unsigned, {key1_id, key2_id, key3_id})
self.assertFalse(result)
self.assertEqual(result.signed, {})
self.assertEqual(result.unsigned, {key1_id: key1, key2_id: key2})

# Test: 3 authorized keys, 1 valid signature, 1 invalid signature, 1
# key missing key data
root.sign(SSlibSigner(key2), append=True)
root.sign(SSlibSigner(priv_key2), append=True)
result = root.signed.get_verification_result(
Root.type, root.signed_bytes, root.signatures
)
self.assertTrue(result.verified)
self.assertEqual(result.signed, {key2_id})
self.assertEqual(result.unsigned, {key1_id, key3_id})
self.assertTrue(result)
self.assertEqual(result.signed, {key2_id: key2})
self.assertEqual(result.unsigned, {key1_id: key1})

# Test: 3 authorized keys, 1 valid signature, 1 invalid signature, 1
# key missing key data, 1 ignored unrelated signature
root.sign(SSlibSigner(key4), append=True)
root.sign(SSlibSigner(priv_key4), append=True)
self.assertEqual(
set(root.signatures.keys()), {key1_id, key2_id, key4_id}
)
self.assertTrue(result.verified)
self.assertEqual(result.signed, {key2_id})
self.assertEqual(result.unsigned, {key1_id, key3_id})
self.assertTrue(result)
self.assertEqual(result.signed, {key2_id: key2})
self.assertEqual(result.unsigned, {key1_id: key1})

# See test_signed_verify_delegate for more related tests ...

def test_signed_verification_result_union(self) -> None:
# Test all possible "unions" (AND) of "verified" field
data = [
(True, True, True),
(True, False, False),
(False, True, False),
(False, False, False),
]
def test_root_get_root_verification_result(self) -> None:
# Setup: Load test metadata and keys
root_path = os.path.join(self.repo_dir, "metadata", "root.json")
root = Metadata[Root].from_file(root_path)

key1_id = root.signed.roles[Root.type].keyids[0]
key1 = root.signed.get_key(key1_id)

key2_id = root.signed.roles[Timestamp.type].keyids[0]
key2 = root.signed.get_key(key2_id)
priv_key2 = self.keystore[Timestamp.type]

for a_part, b_part, ab_part in data:
self.assertEqual(
VerificationResult(a_part, set(), set()).union(
VerificationResult(b_part, set(), set())
),
VerificationResult(ab_part, set(), set()),
priv_key4 = self.keystore[Snapshot.type]

# Test: Verify with no previous root version
result = root.signed.get_root_verification_result(
None, root.signed_bytes, root.signatures
)
self.assertTrue(result)
self.assertEqual(result.signed, {key1_id: key1})
self.assertEqual(result.unsigned, {})

# Test: Verify with other root that is not version N-1
prev_root: Metadata[Root] = deepcopy(root)
with self.assertRaises(ValueError):
result = root.signed.get_root_verification_result(
prev_root.signed, root.signed_bytes, root.signatures
)

# Test exemplary union (|) of "signed" and "unsigned" fields
a = VerificationResult(True, {"1"}, {"2"})
b = VerificationResult(True, {"3"}, {"4"})
ab = VerificationResult(True, {"1", "3"}, {"2", "4"})
self.assertEqual(a.union(b), ab)
# Test: Verify with previous root
prev_root.signed.version -= 1
result = root.signed.get_root_verification_result(
prev_root.signed, root.signed_bytes, root.signatures
)
self.assertTrue(result)
self.assertEqual(result.signed, {key1_id: key1})
self.assertEqual(result.unsigned, {})

# Test: Add a signer to previous root (threshold still 1)
prev_root.signed.add_key(key2, Root.type)
result = root.signed.get_root_verification_result(
prev_root.signed, root.signed_bytes, root.signatures
)
self.assertTrue(result)
self.assertEqual(result.signed, {key1_id: key1})
self.assertEqual(result.unsigned, {key2_id: key2})

# Test: Increase threshold in previous root
prev_root.signed.roles[Root.type].threshold += 1
result = root.signed.get_root_verification_result(
prev_root.signed, root.signed_bytes, root.signatures
)
self.assertFalse(result)
self.assertEqual(result.signed, {key1_id: key1})
self.assertEqual(result.unsigned, {key2_id: key2})

# Test: Sign root with both keys
root.sign(SSlibSigner(priv_key2), append=True)
result = root.signed.get_root_verification_result(
prev_root.signed, root.signed_bytes, root.signatures
)
self.assertTrue(result)
self.assertEqual(result.signed, {key1_id: key1, key2_id: key2})
self.assertEqual(result.unsigned, {})

# Test: Sign root with an unrelated key
root.sign(SSlibSigner(priv_key4), append=True)
result = root.signed.get_root_verification_result(
prev_root.signed, root.signed_bytes, root.signatures
)
self.assertTrue(result)
self.assertEqual(result.signed, {key1_id: key1, key2_id: key2})
self.assertEqual(result.unsigned, {})

# Test: Remove key1 from previous root
prev_root.signed.revoke_key(key1_id, Root.type)
result = root.signed.get_root_verification_result(
prev_root.signed, root.signed_bytes, root.signatures
)
self.assertFalse(result)
self.assertEqual(result.signed, {key1_id: key1, key2_id: key2})
self.assertEqual(result.unsigned, {})

# Test: Lower threshold in previous root
prev_root.signed.roles[Root.type].threshold -= 1
result = root.signed.get_root_verification_result(
prev_root.signed, root.signed_bytes, root.signatures
)
self.assertTrue(result)
self.assertEqual(result.signed, {key1_id: key1, key2_id: key2})
self.assertEqual(result.unsigned, {})

def test_key_class(self) -> None:
# Test if from_securesystemslib_key removes the private key from keyval
Expand Down

0 comments on commit be55b87

Please sign in to comment.