Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tests: Add ngclient root key rotation tests #1635

Merged
merged 3 commits into from
Oct 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 15 additions & 12 deletions tests/repository_simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ def __init__(self):
self.signed_roots: List[bytes] = []

# signers are used on-demand at fetch time to sign metadata
self.signers: Dict[str, List[SSlibSigner]] = {}
# keys are roles, values are dicts of {keyid: signer}
self.signers: Dict[str, Dict[str, SSlibSigner]] = {}

# target downloads are served from this dict
self.target_files: Dict[str, RepositoryTarget] = {}
Expand Down Expand Up @@ -135,10 +136,16 @@ def all_targets(self) -> Iterator[Tuple[str, Targets]]:
for role, md in self.md_delegates.items():
yield role, md.signed

def create_key(self) -> Tuple[Key, SSlibSigner]:
@staticmethod
def create_key() -> Tuple[Key, SSlibSigner]:
sslib_key = generate_ed25519_key()
return Key.from_securesystemslib_key(sslib_key), SSlibSigner(sslib_key)

def add_signer(self, role:str, signer: SSlibSigner):
if role not in self.signers:
self.signers[role] = {}
self.signers[role][signer.key_dict["keyid"]] = signer

def _initialize(self):
"""Setup a minimal valid repository"""

Expand All @@ -159,18 +166,16 @@ def _initialize(self):
for role in TOP_LEVEL_ROLE_NAMES:
key, signer = self.create_key()
root.add_key(role, key)
# store the private key
if role not in self.signers:
self.signers[role] = []
self.signers[role].append(signer)
self.add_signer(role, signer)

self.md_root = Metadata(root, OrderedDict())
self.publish_root()

def publish_root(self):
"""Sign and store a new serialized version of root"""
self.md_root.signatures.clear()
for signer in self.signers["root"]:
self.md_root.sign(signer)
for signer in self.signers["root"].values():
self.md_root.sign(signer, append=True)

self.signed_roots.append(self.md_root.to_bytes(JSONSerializer()))
logger.debug("Published root v%d", self.root.version)
Expand Down Expand Up @@ -242,7 +247,7 @@ def _fetch_metadata(self, role: str, version: Optional[int] = None) -> bytes:
raise FetcherHTTPError(f"Unknown {role} version {version}", 404)

md.signatures.clear()
for signer in self.signers[role]:
for signer in self.signers[role].values():
md.sign(signer, append=True)

logger.debug(
Expand Down Expand Up @@ -320,9 +325,7 @@ def add_delegation(
# By default add one new key for the role
key, signer = self.create_key()
delegator.add_key(role.name, key)
if role.name not in self.signers:
self.signers[role.name] = []
self.signers[role.name].append(signer)
self.add_signer(role.name, signer)

# Add metadata for the role
self.md_delegates[role.name] = Metadata(targets, OrderedDict())
Expand Down
211 changes: 211 additions & 0 deletions tests/test_updater_key_rotations.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
#!/usr/bin/env python

# Copyright 2021, New York University and the TUF contributors
# SPDX-License-Identifier: MIT OR Apache-2.0

"""Test ngclient Updater key rotation handling"""

from dataclasses import dataclass
from typing import List, Optional, Type
import os
import sys
import tempfile
import unittest

from securesystemslib.signer import SSlibSigner

from tuf.api.metadata import Key
from tuf.exceptions import UnsignedMetadataError
from tuf.ngclient import Updater

from tests import utils
from tests.repository_simulator import RepositorySimulator
from tests.utils import run_sub_tests_with_dataset


@dataclass
class RootVersion:
keys: List[int]
threshold: int
signatures: List[int]
result: Optional[Type[Exception]] = None


class TestUpdaterKeyRotations(unittest.TestCase):
"""Test ngclient root rotation handling"""
# set dump_dir to trigger repository state dumps
dump_dir: Optional[str] = None

def setUp(self) -> None:
self.sim = None
self.metadata_dir = None
self.subtest_count = 0
# pylint: disable-next=consider-using-with
self.temp_dir = tempfile.TemporaryDirectory()

# Pre-create a bunch of keys and signers
self.keys: List[Key] = []
self.signers: List[SSlibSigner] = []
for _ in range(10):
key, signer = RepositorySimulator.create_key()
self.keys.append(key)
self.signers.append(signer)

def tearDown(self) -> None:
self.temp_dir.cleanup()

def setup_subtest(self) -> None:
self.subtest_count += 1

# Setup repository for subtest: make sure no roots have been published
self.sim = RepositorySimulator()
self.sim.signed_roots.clear()
self.sim.root.version = 0

if self.dump_dir is not None:
# create subtest dumpdir
name = f"{self.id().split('.')[-1]}-{self.subtest_count}"
self.sim.dump_dir = os.path.join(self.dump_dir, name)
os.mkdir(self.sim.dump_dir)

def _run_refresh(self) -> None:
"""Create new updater, run refresh"""
if self.sim.dump_dir is not None:
self.sim.write()

# bootstrap with initial root
self.metadata_dir = tempfile.mkdtemp(dir=self.temp_dir.name)
with open(os.path.join(self.metadata_dir, "root.json"), "bw") as f:
f.write(self.sim.signed_roots[0])

updater = Updater(
self.metadata_dir,
"https://example.com/metadata/",
fetcher=self.sim,
)
updater.refresh()

root_rotation_cases = {
"1-of-1 key rotation": [
RootVersion([1], 1, [1]),
RootVersion([2], 1, [2, 1]),
RootVersion([2], 1, [2]),
],
"1-of-1 key rotation, unused signatures": [
RootVersion([1], 1, [3, 1, 4]),
RootVersion([2], 1, [3, 2, 1, 4]),
RootVersion([2], 1, [3, 2, 4]),
],
"1-of-1 key rotation fail: not signed with old key": [
RootVersion([1], 1, [1]),
RootVersion([2], 1, [2, 3, 4], UnsignedMetadataError),
],
"1-of-1 key rotation fail: not signed with new key": [
RootVersion([1], 1, [1]),
RootVersion([2], 1, [1, 3, 4], UnsignedMetadataError),
],
"3-of-5, sign with different keycombos": [
RootVersion([0, 1, 2, 3, 4], 3, [0, 2, 4]),
RootVersion([0, 1, 2, 3, 4], 3, [0, 4, 1]),
RootVersion([0, 1, 2, 3, 4], 3, [0, 1, 3]),
RootVersion([0, 1, 2, 3, 4], 3, [0, 1, 3]),
],
"3-of-5, one key rotated": [
RootVersion([0, 1, 2, 3, 4], 3, [0, 2, 4]),
RootVersion([0, 1, 3, 4, 5], 3, [0, 4, 1]),
],
"3-of-5, one key rotate fails: not signed with 3 new keys": [
RootVersion([0, 1, 2, 3, 4], 3, [0, 2, 4]),
RootVersion([0, 1, 3, 4, 5], 3, [0, 2, 4], UnsignedMetadataError),
],
"3-of-5, one key rotate fails: not signed with 3 old keys": [
RootVersion([0, 1, 2, 3, 4], 3, [0, 2, 4]),
RootVersion([0, 1, 3, 4, 5], 3, [0, 4, 5], UnsignedMetadataError),
],
"3-of-5, one key rotated, with intermediate step": [
RootVersion([0, 1, 2, 3, 4], 3, [0, 2, 4]),
RootVersion([0, 1, 3, 4, 5], 3, [0, 2, 4, 5]),
RootVersion([0, 1, 3, 4, 5], 3, [0, 4, 5]),
],
"3-of-5, all keys rotated, with intermediate step": [
RootVersion([0, 1, 2, 3, 4], 3, [0, 2, 4]),
RootVersion([5, 6, 7, 8, 9], 3, [0, 2, 4, 5, 6, 7]),
RootVersion([5, 6, 7, 8, 9], 3, [5, 6, 7]),
],
"1-of-3 threshold increase to 2-of-3": [
RootVersion([1, 2, 3], 1, [1]),
RootVersion([1, 2, 3], 2, [1, 2]),
],
"1-of-3 threshold bump to 2-of-3 fails: new threshold not reached": [
RootVersion([1, 2, 3], 1, [1]),
RootVersion([1, 2, 3], 2, [2], UnsignedMetadataError),
],
"2-of-3 threshold decrease to 1-of-3": [
RootVersion([1, 2, 3], 2, [1, 2]),
RootVersion([1, 2, 3], 1, [1, 2]),
RootVersion([1, 2, 3], 1, [1]),
],
"2-of-3 threshold decr. to 1-of-3 fails: old threshold not reached": [
RootVersion([1, 2, 3], 2, [1, 2]),
RootVersion([1, 2, 3], 1, [1], UnsignedMetadataError),
],
"1-of-2 threshold increase to 2-of-2": [
RootVersion([1], 1, [1]),
RootVersion([1, 2], 2, [1, 2]),
],
}

@run_sub_tests_with_dataset(root_rotation_cases)
def test_root_rotation(self, root_versions: List[RootVersion]) -> None:
"""Test Updater.refresh() with various sequences of root updates

Each RootVersion in the list describes root keys and signatures of a
remote root metadata version. As an example:
RootVersion([1,2,3], 2, [1,2])
defines a root that contains keys 1, 2 and 3 with threshold 2. The
metadata is signed with keys 1 and 2.

Assert that refresh() result is expected and that local root on disk is
the expected one after all roots have been loaded from remote using the
standard client update workflow.
"""
self.setup_subtest()

# Publish all remote root versions defined in root_versions
for rootver in root_versions:
# clear root keys, signers
self.sim.root.roles["root"].keyids.clear()
self.sim.signers["root"].clear()

self.sim.root.roles["root"].threshold = rootver.threshold
for i in rootver.keys:
self.sim.root.add_key("root", self.keys[i])
for i in rootver.signatures:
self.sim.add_signer("root", self.signers[i])
self.sim.root.version += 1
self.sim.publish_root()

# run client workflow, assert success/failure
expected_result = root_versions[-1].result
if expected_result is None:
self._run_refresh()
expected_local_root = self.sim.signed_roots[-1]
else:
# failure expected: local root should be the root before last
with self.assertRaises(expected_result):
self._run_refresh()
expected_local_root = self.sim.signed_roots[-2]

# assert local root on disk is expected
with open(os.path.join(self.metadata_dir, "root.json"), "rb") as f:
self.assertEqual(f.read(), expected_local_root)


if __name__ == "__main__":
if "--dump" in sys.argv:
TestUpdaterKeyRotations.dump_dir = tempfile.mkdtemp()
print(f"Repository dumps in {TestUpdaterKeyRotations.dump_dir}")
sys.argv.remove("--dump")

utils.configure_test_logging(sys.argv)
unittest.main()
11 changes: 6 additions & 5 deletions tests/test_updater_with_simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,10 +156,10 @@ def test_keys_and_signatures(self):
# Update top level metadata
self._run_refresh()

# New targets: signed with a new key that is not in roles keys
old_signer = self.sim.signers["targets"].pop()
# New targets: signed with only a new key that is not in roles keys
old_signers = self.sim.signers.pop("targets")
key, signer = self.sim.create_key()
self.sim.signers["targets"] = [signer]
self.sim.add_signer("targets", signer)
self.sim.targets.version += 1
self.sim.update_snapshot()

Expand All @@ -182,8 +182,9 @@ def test_keys_and_signatures(self):
with self.assertRaises(UnsignedMetadataError):
self._run_refresh()

# New targets: sign with both new and old key
self.sim.signers["targets"] = [signer, old_signer]
# New targets: sign with both new and any original keys
for signer in old_signers.values():
self.sim.add_signer("targets", signer)
self.sim.targets.version += 1
self.sim.update_snapshot()

Expand Down