diff --git a/tests/repository_simulator.py b/tests/repository_simulator.py index 6377fe0a2b..948ee37dd2 100644 --- a/tests/repository_simulator.py +++ b/tests/repository_simulator.py @@ -345,13 +345,7 @@ def add_target(self, role: str, data: bytes, path: str) -> None: self.target_files[path] = RepositoryTarget(data, target) def add_delegation( - self, - delegator_name: str, - name: str, - targets: Targets, - terminating: bool, - paths: Optional[List[str]], - hash_prefixes: Optional[List[str]], + self, delegator_name: str, role: DelegatedRole, targets: Targets ) -> None: """Add delegated target role to the repository.""" if delegator_name == Targets.type: @@ -360,7 +354,6 @@ def add_delegation( delegator = self.md_delegates[delegator_name].signed # Create delegation - role = DelegatedRole(name, [], 1, terminating, paths, hash_prefixes) if delegator.delegations is None: delegator.delegations = Delegations({}, OrderedDict()) # put delegation last by default @@ -372,7 +365,8 @@ def add_delegation( self.add_signer(role.name, signer) # Add metadata for the role - self.md_delegates[role.name] = Metadata(targets, OrderedDict()) + if role.name not in self.md_delegates: + self.md_delegates[role.name] = Metadata(targets, OrderedDict()) def write(self) -> None: """Dump current repository metadata to self.dump_dir diff --git a/tests/test_updater_consistent_snapshot.py b/tests/test_updater_consistent_snapshot.py index 7b23843384..e4bab8a8c7 100644 --- a/tests/test_updater_consistent_snapshot.py +++ b/tests/test_updater_consistent_snapshot.py @@ -16,6 +16,7 @@ from tuf.api.metadata import ( SPECIFICATION_VERSION, TOP_LEVEL_ROLE_NAMES, + DelegatedRole, TargetFile, Targets, ) @@ -27,17 +28,40 @@ class TestConsistentSnapshot(unittest.TestCase): 'prefix_targets_with_hash' and verify that the correct URLs are formed for each combination""" + # set dump_dir to trigger repository state dumps + dump_dir: Optional[str] = None + def setUp(self) -> None: # pylint: disable=consider-using-with + self.subtest_count = 0 self.temp_dir = tempfile.TemporaryDirectory() self.metadata_dir = os.path.join(self.temp_dir.name, "metadata") self.targets_dir = os.path.join(self.temp_dir.name, "targets") os.mkdir(self.metadata_dir) os.mkdir(self.targets_dir) + self.sim: RepositorySimulator def tearDown(self) -> None: self.temp_dir.cleanup() + def setup_subtest( + self, consistent_snapshot: bool, prefix_targets: bool = True + ) -> None: + self.sim = self._init_repo(consistent_snapshot, prefix_targets) + + self.subtest_count += 1 + if self.dump_dir is not None: + # create subtest dumpdir + name = f"{self.id().split('.')[-1]}-{self.subtest_count}" + self.sim.dump_dir = os.path.join(self.dump_dir, name) + os.mkdir(self.sim.dump_dir) + + def teardown_subtest(self) -> None: + if self.dump_dir is not None: + self.sim.write() + + utils.cleanup_dir(self.metadata_dir) + def _init_repo( self, consistent_snapshot: bool, prefix_targets: bool = True ) -> RepositorySimulator: @@ -54,24 +78,16 @@ def _init_repo( return sim - def _init_updater(self, sim: RepositorySimulator) -> Updater: + def _init_updater(self) -> Updater: """Create a new Updater instance""" return Updater( self.metadata_dir, "https://example.com/metadata/", self.targets_dir, "https://example.com/targets/", - sim, + self.sim, ) - @staticmethod - def _cleanup_dir(path: str) -> None: - """Delete all files inside a directory""" - for filepath in [ - os.path.join(path, filename) for filename in os.listdir(path) - ]: - os.remove(filepath) - def _assert_metadata_files_exist(self, roles: Iterable[str]) -> None: """Assert that local metadata files exist for 'roles'""" local_metadata_files = os.listdir(self.metadata_dir) @@ -111,22 +127,23 @@ def test_top_level_roles_update( ) -> None: # Test if the client fetches and stores metadata files with the # correct version prefix, depending on 'consistent_snapshot' config - consistent_snapshot: bool = test_case_data["consistent_snapshot"] - expected_calls: List[Any] = test_case_data["calls"] - - sim = self._init_repo(consistent_snapshot) - updater = self._init_updater(sim) + try: + consistent_snapshot: bool = test_case_data["consistent_snapshot"] + exp_calls: List[Any] = test_case_data["calls"] - # cleanup fetch tracker metadata - sim.fetch_tracker.metadata.clear() - updater.refresh() + self.setup_subtest(consistent_snapshot) + updater = self._init_updater() - # metadata files are fetched with the expected version (or None) - self.assertListEqual(sim.fetch_tracker.metadata, expected_calls) - # metadata files are always persisted without a version prefix - self._assert_metadata_files_exist(TOP_LEVEL_ROLE_NAMES) + # cleanup fetch tracker metadata + self.sim.fetch_tracker.metadata.clear() + updater.refresh() - self._cleanup_dir(self.metadata_dir) + # metadata files are fetched with the expected version (or None) + self.assertListEqual(self.sim.fetch_tracker.metadata, exp_calls) + # metadata files are always persisted without a version prefix + self._assert_metadata_files_exist(TOP_LEVEL_ROLE_NAMES) + finally: + self.teardown_subtest() delegated_roles_data: utils.DataSet = { "consistent_snaphot disabled": { @@ -145,31 +162,35 @@ def test_delegated_roles_update( ) -> None: # Test if the client fetches and stores delegated metadata files with # the correct version prefix, depending on 'consistent_snapshot' config - consistent_snapshot: bool = test_case_data["consistent_snapshot"] - expected_version: Optional[int] = test_case_data["expected_version"] - rolenames = ["role1", "..", "."] - expected_calls = [(role, expected_version) for role in rolenames] - - sim = self._init_repo(consistent_snapshot) - # Add new delegated targets - spec_version = ".".join(SPECIFICATION_VERSION) - targets = Targets(1, spec_version, sim.safe_expiry, {}, None) - for role in rolenames: - sim.add_delegation("targets", role, targets, False, ["*"], None) - sim.update_snapshot() - updater = self._init_updater(sim) - updater.refresh() - - # cleanup fetch tracker metadata - sim.fetch_tracker.metadata.clear() - # trigger updater to fetch the delegated metadata - updater.get_targetinfo("anything") - # metadata files are fetched with the expected version (or None) - self.assertListEqual(sim.fetch_tracker.metadata, expected_calls) - # metadata files are always persisted without a version prefix - self._assert_metadata_files_exist(rolenames) - - self._cleanup_dir(self.metadata_dir) + try: + consistent_snapshot: bool = test_case_data["consistent_snapshot"] + exp_version: Optional[int] = test_case_data["expected_version"] + rolenames = ["role1", "..", "."] + exp_calls = [(role, exp_version) for role in rolenames] + + self.setup_subtest(consistent_snapshot) + # Add new delegated targets + spec_version = ".".join(SPECIFICATION_VERSION) + for role in rolenames: + delegated_role = DelegatedRole(role, [], 1, False, ["*"], None) + targets = Targets( + 1, spec_version, self.sim.safe_expiry, {}, None + ) + self.sim.add_delegation("targets", delegated_role, targets) + self.sim.update_snapshot() + updater = self._init_updater() + updater.refresh() + + # cleanup fetch tracker metadata + self.sim.fetch_tracker.metadata.clear() + # trigger updater to fetch the delegated metadata + updater.get_targetinfo("anything") + # metadata files are fetched with the expected version (or None) + self.assertListEqual(self.sim.fetch_tracker.metadata, exp_calls) + # metadata files are always persisted without a version prefix + self._assert_metadata_files_exist(rolenames) + finally: + self.teardown_subtest() targets_download_data: utils.DataSet = { "consistent_snaphot disabled": { @@ -197,42 +218,49 @@ def test_download_targets(self, test_case_data: Dict[str, Any]) -> None: # Test if the client fetches and stores target files with # the correct hash prefix, depending on 'consistent_snapshot' # and 'prefix_targets_with_hash' config - consistent_snapshot: bool = test_case_data["consistent_snapshot"] - prefix_targets_with_hash: bool = test_case_data["prefix_targets"] - hash_algo: Optional[str] = test_case_data["hash_algo"] - targetpaths: List[str] = test_case_data["targetpaths"] - - sim = self._init_repo(consistent_snapshot, prefix_targets_with_hash) - # Add targets to repository - for targetpath in targetpaths: - sim.targets.version += 1 - sim.add_target("targets", b"content", targetpath) - sim.update_snapshot() - - updater = self._init_updater(sim) - updater.config.prefix_targets_with_hash = prefix_targets_with_hash - updater.refresh() - - for targetpath in targetpaths: - info = updater.get_targetinfo(targetpath) - assert isinstance(info, TargetFile) - updater.download_target(info) - - # target files are always persisted without hash prefix - self._assert_targets_files_exist([info.path]) - - # files are fetched with the expected hash prefix (or None) - expected_fetches = [ - (targetpath, None if not hash_algo else info.hashes[hash_algo]) - ] - - self.assertListEqual(sim.fetch_tracker.targets, expected_fetches) - sim.fetch_tracker.targets.clear() - - self._cleanup_dir(self.targets_dir) + try: + consistent_snapshot: bool = test_case_data["consistent_snapshot"] + prefix_targets_with_hash: bool = test_case_data["prefix_targets"] + hash_algo: Optional[str] = test_case_data["hash_algo"] + targetpaths: List[str] = test_case_data["targetpaths"] + + self.setup_subtest(consistent_snapshot, prefix_targets_with_hash) + # Add targets to repository + for targetpath in targetpaths: + self.sim.targets.version += 1 + self.sim.add_target("targets", b"content", targetpath) + self.sim.update_snapshot() + + updater = self._init_updater() + updater.config.prefix_targets_with_hash = prefix_targets_with_hash + updater.refresh() + + for path in targetpaths: + info = updater.get_targetinfo(path) + assert isinstance(info, TargetFile) + updater.download_target(info) + + # target files are always persisted without hash prefix + self._assert_targets_files_exist([info.path]) + + # files are fetched with the expected hash prefix (or None) + exp_calls = [ + (path, None if not hash_algo else info.hashes[hash_algo]) + ] + + self.assertListEqual(self.sim.fetch_tracker.targets, exp_calls) + self.sim.fetch_tracker.targets.clear() + finally: + self.teardown_subtest() if __name__ == "__main__": + if "--dump" in sys.argv: + TestConsistentSnapshot.dump_dir = tempfile.mkdtemp() + print( + f"Repository Simulator dumps in {TestConsistentSnapshot.dump_dir}" + ) + sys.argv.remove("--dump") utils.configure_test_logging(sys.argv) unittest.main() diff --git a/tests/test_updater_delegation_graphs.py b/tests/test_updater_delegation_graphs.py new file mode 100644 index 0000000000..f5003e1bb0 --- /dev/null +++ b/tests/test_updater_delegation_graphs.py @@ -0,0 +1,267 @@ +#!/usr/bin/env python + +# Copyright 2021, New York University and the TUF contributors +# SPDX-License-Identifier: MIT OR Apache-2.0 + +"""Test updating delegated targets roles with various +delegation hierarchies""" + +import os +import sys +import tempfile +import unittest +from dataclasses import astuple, dataclass, field +from typing import Iterable, List, Optional + +from tests import utils +from tests.repository_simulator import RepositorySimulator +from tuf.api.metadata import ( + SPECIFICATION_VERSION, + TOP_LEVEL_ROLE_NAMES, + DelegatedRole, + Targets, +) +from tuf.ngclient import Updater + + +@dataclass +class TestDelegation: + delegator: str + rolename: str + keyids: List[str] = field(default_factory=list) + threshold: int = 1 + terminating: bool = False + paths: List[str] = field(default_factory=lambda: ["*"]) + path_hash_prefixes: Optional[List[str]] = None + + +@dataclass +class DelegationsTestCase: + """Describes a delegations graph as a list of delegations + and the expected order of traversal as 'visited_order'.""" + + delegations: List[TestDelegation] + visited_order: List[str] + + +class TestDelegationsGraphs(unittest.TestCase): + """Test creating delegations graphs with different complexity + and successfully updating the delegated roles metadata""" + + # set dump_dir to trigger repository state dumps + dump_dir: Optional[str] = None + + def setUp(self) -> None: + # pylint: disable=consider-using-with + self.subtest_count = 0 + self.temp_dir = tempfile.TemporaryDirectory() + self.metadata_dir = os.path.join(self.temp_dir.name, "metadata") + self.targets_dir = os.path.join(self.temp_dir.name, "targets") + os.mkdir(self.metadata_dir) + os.mkdir(self.targets_dir) + + def tearDown(self) -> None: + self.temp_dir.cleanup() + + def setup_subtest( + self, delegations: List[TestDelegation] + ) -> RepositorySimulator: + sim = self._init_repo(delegations) + + self.subtest_count += 1 + if self.dump_dir is not None: + # create subtest dumpdir + name = f"{self.id().split('.')[-1]}-{self.subtest_count}" + sim.dump_dir = os.path.join(self.dump_dir, name) + os.mkdir(sim.dump_dir) + # dump the repo simulator metadata + sim.write() + + return sim + + def teardown_subtest(self) -> None: + # clean up after each subtest + utils.cleanup_dir(self.metadata_dir) + + def _init_updater(self, sim: RepositorySimulator) -> Updater: + """Create a new Updater instance""" + return Updater( + self.metadata_dir, + "https://example.com/metadata/", + self.targets_dir, + "https://example.com/targets/", + sim, + ) + + def _init_repo( + self, delegations: List[TestDelegation] + ) -> RepositorySimulator: + """Create a new RepositorySimulator instance with 'delegations'""" + sim = RepositorySimulator() + spec_version = ".".join(SPECIFICATION_VERSION) + + for d in delegations: + if d.rolename in sim.md_delegates: + targets = sim.md_delegates[d.rolename].signed + else: + targets = Targets(1, spec_version, sim.safe_expiry, {}, None) + + # unpack 'd' but skip "delegator" + role = DelegatedRole(*astuple(d)[1:]) + sim.add_delegation(d.delegator, role, targets) + sim.update_snapshot() + + # Init trusted root for Updater + with open(os.path.join(self.metadata_dir, "root.json"), "bw") as f: + f.write(sim.signed_roots[0]) + + return sim + + def _assert_files_exist(self, roles: Iterable[str]) -> None: + """Assert that local metadata files exist for 'roles'""" + expected_files = sorted([f"{role}.json" for role in roles]) + local_metadata_files = sorted(os.listdir(self.metadata_dir)) + self.assertListEqual(local_metadata_files, expected_files) + + graphs: utils.DataSet = { + "basic delegation": DelegationsTestCase( + delegations=[TestDelegation("targets", "A")], + visited_order=["A"], + ), + "single level delegations": DelegationsTestCase( + delegations=[ + TestDelegation("targets", "A"), + TestDelegation("targets", "B"), + ], + visited_order=["A", "B"], + ), + "two-level delegations": DelegationsTestCase( + delegations=[ + TestDelegation("targets", "A"), + TestDelegation("targets", "B"), + TestDelegation("B", "C"), + ], + visited_order=["A", "B", "C"], + ), + "two-level test DFS order of traversal": DelegationsTestCase( + delegations=[ + TestDelegation("targets", "A"), + TestDelegation("targets", "B"), + TestDelegation("A", "C"), + TestDelegation("A", "D"), + ], + visited_order=["A", "C", "D", "B"], + ), + "three-level delegation test DFS order of traversal": DelegationsTestCase( + delegations=[ + TestDelegation("targets", "A"), + TestDelegation("targets", "B"), + TestDelegation("A", "C"), + TestDelegation("C", "D"), + ], + visited_order=["A", "C", "D", "B"], + ), + "two-level terminating ignores all but role's descendants": DelegationsTestCase( + delegations=[ + TestDelegation("targets", "A"), + TestDelegation("targets", "B"), + TestDelegation("A", "C", terminating=True), + TestDelegation("A", "D"), + ], + visited_order=["A", "C"], + ), + "three-level terminating ignores all but role's descendants": DelegationsTestCase( + delegations=[ + TestDelegation("targets", "A"), + TestDelegation("targets", "B"), + TestDelegation("A", "C", terminating=True), + TestDelegation("C", "D"), + ], + visited_order=["A", "C", "D"], + ), + "two-level ignores all branches not matching 'paths'": DelegationsTestCase( + delegations=[ + TestDelegation("targets", "A", paths=["*.py"]), + TestDelegation("targets", "B"), + TestDelegation("A", "C"), + ], + visited_order=["B"], + ), + "three-level ignores all branches not matching 'paths'": DelegationsTestCase( + delegations=[ + TestDelegation("targets", "A"), + TestDelegation("targets", "B"), + TestDelegation("A", "C", paths=["*.py"]), + TestDelegation("C", "D"), + ], + visited_order=["A", "B"], + ), + "cyclic graph": DelegationsTestCase( + delegations=[ + TestDelegation("targets", "A"), + TestDelegation("targets", "B"), + TestDelegation("B", "C"), + TestDelegation("C", "D"), + TestDelegation("D", "B"), + ], + visited_order=["A", "B", "C", "D"], + ), + "two roles delegating to a third": DelegationsTestCase( + delegations=[ + TestDelegation("targets", "A"), + TestDelegation("targets", "B"), + TestDelegation("B", "C"), + TestDelegation("A", "C"), + ], + # Under all same conditions, 'C' is reached through 'A' first" + visited_order=["A", "C", "B"], + ), + "two roles delegating to a third different 'paths'": DelegationsTestCase( + delegations=[ + TestDelegation("targets", "A"), + TestDelegation("targets", "B"), + TestDelegation("B", "C"), + TestDelegation("A", "C", paths=["*.py"]), + ], + # 'C' is reached through 'B' since 'A' does not delegate a matching pattern" + visited_order=["A", "B", "C"], + ), + } + + @utils.run_sub_tests_with_dataset(graphs) + def test_graph_traversal(self, test_data: DelegationsTestCase) -> None: + """Test that delegated roles are traversed in the order of appearance + in the delegator's metadata, using pre-order depth-first search""" + + try: + exp_files = [*TOP_LEVEL_ROLE_NAMES, *test_data.visited_order] + exp_calls = [(role, 1) for role in test_data.visited_order] + + sim = self.setup_subtest(test_data.delegations) + updater = self._init_updater(sim) + # Call explicitly refresh to simplify the expected_calls list + updater.refresh() + sim.fetch_tracker.metadata.clear() + # Check that metadata dir contains only top-level roles + self._assert_files_exist(TOP_LEVEL_ROLE_NAMES) + + # Looking for a non-existing targetpath forces updater + # to visit all possible delegated roles + targetfile = updater.get_targetinfo("missingpath") + self.assertIsNone(targetfile) + # Check that the delegated roles were visited in the expected + # order and the corresponding metadata files were persisted + self.assertListEqual(sim.fetch_tracker.metadata, exp_calls) + self._assert_files_exist(exp_files) + finally: + self.teardown_subtest() + + +if __name__ == "__main__": + if "--dump" in sys.argv: + TestDelegationsGraphs.dump_dir = tempfile.mkdtemp() + print(f"Repository Simulator dumps in {TestDelegationsGraphs.dump_dir}") + sys.argv.remove("--dump") + + utils.configure_test_logging(sys.argv) + unittest.main() diff --git a/tests/test_updater_top_level_update.py b/tests/test_updater_top_level_update.py index 0c5a18e480..ffaa9c71f0 100644 --- a/tests/test_updater_top_level_update.py +++ b/tests/test_updater_top_level_update.py @@ -37,6 +37,9 @@ class TestRefresh(unittest.TestCase): """Test update of top-level metadata following 'Detailed client workflow' in the specification.""" + # set dump_dir to trigger repository state dumps + dump_dir: Optional[str] = None + past_datetime = datetime.utcnow().replace(microsecond=0) - timedelta(days=5) def setUp(self) -> None: @@ -53,11 +56,20 @@ def setUp(self) -> None: with open(os.path.join(self.metadata_dir, "root.json"), "bw") as f: f.write(self.sim.signed_roots[0]) + if self.dump_dir is not None: + # create test specific dump directory + name = self.id().split(".")[-1] + self.sim.dump_dir = os.path.join(self.dump_dir, name) + os.mkdir(self.sim.dump_dir) + def tearDown(self) -> None: self.temp_dir.cleanup() def _run_refresh(self) -> Updater: """Create a new Updater instance and refresh""" + if self.dump_dir is not None: + self.sim.write() + updater = Updater( self.metadata_dir, "https://example.com/metadata/", @@ -70,6 +82,9 @@ def _run_refresh(self) -> Updater: def _init_updater(self) -> Updater: """Create a new Updater instance""" + if self.dump_dir is not None: + self.sim.write() + return Updater( self.metadata_dir, "https://example.com/metadata/", @@ -455,6 +470,10 @@ def test_compute_metafile_hashes_length(self) -> None: if __name__ == "__main__": + if "--dump" in sys.argv: + TestRefresh.dump_dir = tempfile.mkdtemp() + print(f"Repository Simulator dumps in {TestRefresh.dump_dir}") + sys.argv.remove("--dump") utils.configure_test_logging(sys.argv) unittest.main() diff --git a/tests/test_updater_with_simulator.py b/tests/test_updater_with_simulator.py index ee9e257aec..8a3d6593c0 100644 --- a/tests/test_updater_with_simulator.py +++ b/tests/test_updater_with_simulator.py @@ -16,7 +16,7 @@ from tests import utils from tests.repository_simulator import RepositorySimulator -from tuf.api.metadata import SPECIFICATION_VERSION, Targets +from tuf.api.metadata import SPECIFICATION_VERSION, DelegatedRole, Targets from tuf.exceptions import BadVersionNumberError, UnsignedMetadataError from tuf.ngclient import Updater @@ -143,11 +143,10 @@ def test_fishy_rolenames(self) -> None: # Add new delegated targets, update the snapshot spec_version = ".".join(SPECIFICATION_VERSION) - targets = Targets(1, spec_version, self.sim.safe_expiry, {}, None) - for role in roles_to_filenames: - self.sim.add_delegation( - "targets", role, targets, False, ["*"], None - ) + for rolename in roles_to_filenames: + role = DelegatedRole(rolename, [], 1, False, ["*"], None) + targets = Targets(1, spec_version, self.sim.safe_expiry, {}, None) + self.sim.add_delegation("targets", role, targets) self.sim.update_snapshot() updater = self._run_refresh() @@ -236,7 +235,8 @@ def test_not_loading_targets_twice(self, wrapped_open: MagicMock) -> None: # Add new delegated targets, update the snapshot spec_version = ".".join(SPECIFICATION_VERSION) targets = Targets(1, spec_version, self.sim.safe_expiry, {}, None) - self.sim.add_delegation("targets", "role1", targets, False, ["*"], None) + role = DelegatedRole("role1", [], 1, False, ["*"], None) + self.sim.add_delegation("targets", role, targets) self.sim.update_snapshot() # Run refresh, top-level roles are loaded diff --git a/tests/utils.py b/tests/utils.py index 3e257ef70f..e8060d6cf3 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -23,6 +23,7 @@ import argparse import errno import logging +import os import queue import socket import subprocess @@ -154,6 +155,14 @@ def configure_test_logging(argv: List[str]) -> None: tuf.log.set_log_level(loglevel) +def cleanup_dir(path: str) -> None: + """Delete all files inside a directory""" + for filepath in [ + os.path.join(path, filename) for filename in os.listdir(path) + ]: + os.remove(filepath) + + class TestServerProcess: """Helper class used to create a child process with the subprocess.Popen object and use a thread-safe Queue structure for logging.