From 568580701c89635daec85c4c55ae41d00b540b4a Mon Sep 17 00:00:00 2001 From: Teodora Sechkova Date: Mon, 22 Nov 2021 16:26:29 +0200 Subject: [PATCH 1/7] tests: move _cleanup_dir to utils.py Make the method _cleanup_dir public and move it to tests/utils.py. Signed-off-by: Teodora Sechkova --- tests/test_updater_consistent_snapshot.py | 14 +++----------- tests/utils.py | 9 +++++++++ 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/tests/test_updater_consistent_snapshot.py b/tests/test_updater_consistent_snapshot.py index 7b23843384..532769db16 100644 --- a/tests/test_updater_consistent_snapshot.py +++ b/tests/test_updater_consistent_snapshot.py @@ -64,14 +64,6 @@ def _init_updater(self, sim: RepositorySimulator) -> Updater: sim, ) - @staticmethod - def _cleanup_dir(path: str) -> None: - """Delete all files inside a directory""" - for filepath in [ - os.path.join(path, filename) for filename in os.listdir(path) - ]: - os.remove(filepath) - def _assert_metadata_files_exist(self, roles: Iterable[str]) -> None: """Assert that local metadata files exist for 'roles'""" local_metadata_files = os.listdir(self.metadata_dir) @@ -126,7 +118,7 @@ def test_top_level_roles_update( # metadata files are always persisted without a version prefix self._assert_metadata_files_exist(TOP_LEVEL_ROLE_NAMES) - self._cleanup_dir(self.metadata_dir) + utils.cleanup_dir(self.metadata_dir) delegated_roles_data: utils.DataSet = { "consistent_snaphot disabled": { @@ -169,7 +161,7 @@ def test_delegated_roles_update( # metadata files are always persisted without a version prefix self._assert_metadata_files_exist(rolenames) - self._cleanup_dir(self.metadata_dir) + utils.cleanup_dir(self.metadata_dir) targets_download_data: utils.DataSet = { "consistent_snaphot disabled": { @@ -229,7 +221,7 @@ def test_download_targets(self, test_case_data: Dict[str, Any]) -> None: self.assertListEqual(sim.fetch_tracker.targets, expected_fetches) sim.fetch_tracker.targets.clear() - self._cleanup_dir(self.targets_dir) + utils.cleanup_dir(self.targets_dir) if __name__ == "__main__": diff --git a/tests/utils.py b/tests/utils.py index 3e257ef70f..e8060d6cf3 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -23,6 +23,7 @@ import argparse import errno import logging +import os import queue import socket import subprocess @@ -154,6 +155,14 @@ def configure_test_logging(argv: List[str]) -> None: tuf.log.set_log_level(loglevel) +def cleanup_dir(path: str) -> None: + """Delete all files inside a directory""" + for filepath in [ + os.path.join(path, filename) for filename in os.listdir(path) + ]: + os.remove(filepath) + + class TestServerProcess: """Helper class used to create a child process with the subprocess.Popen object and use a thread-safe Queue structure for logging. From c77b47ee82ddc2cafbb43b7e681740d3d40028ec Mon Sep 17 00:00:00 2001 From: Teodora Sechkova Date: Mon, 22 Nov 2021 16:28:43 +0200 Subject: [PATCH 2/7] Modify RepositorySimulator.add_delegation Reduce the number of function arguments and use DelegatedRole instead. When adding a list of delegations to the repository, move the Targets creation inside the loop to create a separate Targets object for each delegation. Create a new Metadata obgect only for delegated roles which do not exist yet in the repository. Signed-off-by: Teodora Sechkova --- tests/repository_simulator.py | 12 +++--------- tests/test_updater_consistent_snapshot.py | 6 ++++-- tests/test_updater_with_simulator.py | 14 +++++++------- 3 files changed, 14 insertions(+), 18 deletions(-) diff --git a/tests/repository_simulator.py b/tests/repository_simulator.py index 6377fe0a2b..948ee37dd2 100644 --- a/tests/repository_simulator.py +++ b/tests/repository_simulator.py @@ -345,13 +345,7 @@ def add_target(self, role: str, data: bytes, path: str) -> None: self.target_files[path] = RepositoryTarget(data, target) def add_delegation( - self, - delegator_name: str, - name: str, - targets: Targets, - terminating: bool, - paths: Optional[List[str]], - hash_prefixes: Optional[List[str]], + self, delegator_name: str, role: DelegatedRole, targets: Targets ) -> None: """Add delegated target role to the repository.""" if delegator_name == Targets.type: @@ -360,7 +354,6 @@ def add_delegation( delegator = self.md_delegates[delegator_name].signed # Create delegation - role = DelegatedRole(name, [], 1, terminating, paths, hash_prefixes) if delegator.delegations is None: delegator.delegations = Delegations({}, OrderedDict()) # put delegation last by default @@ -372,7 +365,8 @@ def add_delegation( self.add_signer(role.name, signer) # Add metadata for the role - self.md_delegates[role.name] = Metadata(targets, OrderedDict()) + if role.name not in self.md_delegates: + self.md_delegates[role.name] = Metadata(targets, OrderedDict()) def write(self) -> None: """Dump current repository metadata to self.dump_dir diff --git a/tests/test_updater_consistent_snapshot.py b/tests/test_updater_consistent_snapshot.py index 532769db16..4ca37f365e 100644 --- a/tests/test_updater_consistent_snapshot.py +++ b/tests/test_updater_consistent_snapshot.py @@ -16,6 +16,7 @@ from tuf.api.metadata import ( SPECIFICATION_VERSION, TOP_LEVEL_ROLE_NAMES, + DelegatedRole, TargetFile, Targets, ) @@ -145,9 +146,10 @@ def test_delegated_roles_update( sim = self._init_repo(consistent_snapshot) # Add new delegated targets spec_version = ".".join(SPECIFICATION_VERSION) - targets = Targets(1, spec_version, sim.safe_expiry, {}, None) for role in rolenames: - sim.add_delegation("targets", role, targets, False, ["*"], None) + delegated_role = DelegatedRole(role, [], 1, False, ["*"], None) + targets = Targets(1, spec_version, sim.safe_expiry, {}, None) + sim.add_delegation("targets", delegated_role, targets) sim.update_snapshot() updater = self._init_updater(sim) updater.refresh() diff --git a/tests/test_updater_with_simulator.py b/tests/test_updater_with_simulator.py index ee9e257aec..8a3d6593c0 100644 --- a/tests/test_updater_with_simulator.py +++ b/tests/test_updater_with_simulator.py @@ -16,7 +16,7 @@ from tests import utils from tests.repository_simulator import RepositorySimulator -from tuf.api.metadata import SPECIFICATION_VERSION, Targets +from tuf.api.metadata import SPECIFICATION_VERSION, DelegatedRole, Targets from tuf.exceptions import BadVersionNumberError, UnsignedMetadataError from tuf.ngclient import Updater @@ -143,11 +143,10 @@ def test_fishy_rolenames(self) -> None: # Add new delegated targets, update the snapshot spec_version = ".".join(SPECIFICATION_VERSION) - targets = Targets(1, spec_version, self.sim.safe_expiry, {}, None) - for role in roles_to_filenames: - self.sim.add_delegation( - "targets", role, targets, False, ["*"], None - ) + for rolename in roles_to_filenames: + role = DelegatedRole(rolename, [], 1, False, ["*"], None) + targets = Targets(1, spec_version, self.sim.safe_expiry, {}, None) + self.sim.add_delegation("targets", role, targets) self.sim.update_snapshot() updater = self._run_refresh() @@ -236,7 +235,8 @@ def test_not_loading_targets_twice(self, wrapped_open: MagicMock) -> None: # Add new delegated targets, update the snapshot spec_version = ".".join(SPECIFICATION_VERSION) targets = Targets(1, spec_version, self.sim.safe_expiry, {}, None) - self.sim.add_delegation("targets", "role1", targets, False, ["*"], None) + role = DelegatedRole("role1", [], 1, False, ["*"], None) + self.sim.add_delegation("targets", role, targets) self.sim.update_snapshot() # Run refresh, top-level roles are loaded From 74dddf02c3249395f718317925ab594427ee8c3c Mon Sep 17 00:00:00 2001 From: Teodora Sechkova Date: Wed, 1 Dec 2021 14:16:49 +0200 Subject: [PATCH 3/7] Improve clean up in subtests Use a "try" block to catch exceptions during failing subtests and always execute the subtest clean up code. Signed-off-by: Teodora Sechkova --- tests/test_updater_consistent_snapshot.py | 147 +++++++++++----------- 1 file changed, 75 insertions(+), 72 deletions(-) diff --git a/tests/test_updater_consistent_snapshot.py b/tests/test_updater_consistent_snapshot.py index 4ca37f365e..d6710b247e 100644 --- a/tests/test_updater_consistent_snapshot.py +++ b/tests/test_updater_consistent_snapshot.py @@ -104,22 +104,23 @@ def test_top_level_roles_update( ) -> None: # Test if the client fetches and stores metadata files with the # correct version prefix, depending on 'consistent_snapshot' config - consistent_snapshot: bool = test_case_data["consistent_snapshot"] - expected_calls: List[Any] = test_case_data["calls"] + try: + consistent_snapshot: bool = test_case_data["consistent_snapshot"] + expected_calls: List[Any] = test_case_data["calls"] - sim = self._init_repo(consistent_snapshot) - updater = self._init_updater(sim) + sim = self._init_repo(consistent_snapshot) + updater = self._init_updater(sim) - # cleanup fetch tracker metadata - sim.fetch_tracker.metadata.clear() - updater.refresh() + # cleanup fetch tracker metadata + sim.fetch_tracker.metadata.clear() + updater.refresh() - # metadata files are fetched with the expected version (or None) - self.assertListEqual(sim.fetch_tracker.metadata, expected_calls) - # metadata files are always persisted without a version prefix - self._assert_metadata_files_exist(TOP_LEVEL_ROLE_NAMES) - - utils.cleanup_dir(self.metadata_dir) + # metadata files are fetched with the expected version (or None) + self.assertListEqual(sim.fetch_tracker.metadata, expected_calls) + # metadata files are always persisted without a version prefix + self._assert_metadata_files_exist(TOP_LEVEL_ROLE_NAMES) + finally: + utils.cleanup_dir(self.metadata_dir) delegated_roles_data: utils.DataSet = { "consistent_snaphot disabled": { @@ -138,32 +139,33 @@ def test_delegated_roles_update( ) -> None: # Test if the client fetches and stores delegated metadata files with # the correct version prefix, depending on 'consistent_snapshot' config - consistent_snapshot: bool = test_case_data["consistent_snapshot"] - expected_version: Optional[int] = test_case_data["expected_version"] - rolenames = ["role1", "..", "."] - expected_calls = [(role, expected_version) for role in rolenames] - - sim = self._init_repo(consistent_snapshot) - # Add new delegated targets - spec_version = ".".join(SPECIFICATION_VERSION) - for role in rolenames: - delegated_role = DelegatedRole(role, [], 1, False, ["*"], None) - targets = Targets(1, spec_version, sim.safe_expiry, {}, None) - sim.add_delegation("targets", delegated_role, targets) - sim.update_snapshot() - updater = self._init_updater(sim) - updater.refresh() - - # cleanup fetch tracker metadata - sim.fetch_tracker.metadata.clear() - # trigger updater to fetch the delegated metadata - updater.get_targetinfo("anything") - # metadata files are fetched with the expected version (or None) - self.assertListEqual(sim.fetch_tracker.metadata, expected_calls) - # metadata files are always persisted without a version prefix - self._assert_metadata_files_exist(rolenames) - - utils.cleanup_dir(self.metadata_dir) + try: + consistent_snapshot: bool = test_case_data["consistent_snapshot"] + exp_version: Optional[int] = test_case_data["expected_version"] + rolenames = ["role1", "..", "."] + exp_calls = [(role, exp_version) for role in rolenames] + + sim = self._init_repo(consistent_snapshot) + # Add new delegated targets + spec_version = ".".join(SPECIFICATION_VERSION) + for role in rolenames: + delegated_role = DelegatedRole(role, [], 1, False, ["*"], None) + targets = Targets(1, spec_version, sim.safe_expiry, {}, None) + sim.add_delegation("targets", delegated_role, targets) + sim.update_snapshot() + updater = self._init_updater(sim) + updater.refresh() + + # cleanup fetch tracker metadata + sim.fetch_tracker.metadata.clear() + # trigger updater to fetch the delegated metadata + updater.get_targetinfo("anything") + # metadata files are fetched with the expected version (or None) + self.assertListEqual(sim.fetch_tracker.metadata, exp_calls) + # metadata files are always persisted without a version prefix + self._assert_metadata_files_exist(rolenames) + finally: + utils.cleanup_dir(self.metadata_dir) targets_download_data: utils.DataSet = { "consistent_snaphot disabled": { @@ -191,39 +193,40 @@ def test_download_targets(self, test_case_data: Dict[str, Any]) -> None: # Test if the client fetches and stores target files with # the correct hash prefix, depending on 'consistent_snapshot' # and 'prefix_targets_with_hash' config - consistent_snapshot: bool = test_case_data["consistent_snapshot"] - prefix_targets_with_hash: bool = test_case_data["prefix_targets"] - hash_algo: Optional[str] = test_case_data["hash_algo"] - targetpaths: List[str] = test_case_data["targetpaths"] - - sim = self._init_repo(consistent_snapshot, prefix_targets_with_hash) - # Add targets to repository - for targetpath in targetpaths: - sim.targets.version += 1 - sim.add_target("targets", b"content", targetpath) - sim.update_snapshot() - - updater = self._init_updater(sim) - updater.config.prefix_targets_with_hash = prefix_targets_with_hash - updater.refresh() - - for targetpath in targetpaths: - info = updater.get_targetinfo(targetpath) - assert isinstance(info, TargetFile) - updater.download_target(info) - - # target files are always persisted without hash prefix - self._assert_targets_files_exist([info.path]) - - # files are fetched with the expected hash prefix (or None) - expected_fetches = [ - (targetpath, None if not hash_algo else info.hashes[hash_algo]) - ] - - self.assertListEqual(sim.fetch_tracker.targets, expected_fetches) - sim.fetch_tracker.targets.clear() - - utils.cleanup_dir(self.targets_dir) + try: + consistent_snapshot: bool = test_case_data["consistent_snapshot"] + prefix_targets_with_hash: bool = test_case_data["prefix_targets"] + hash_algo: Optional[str] = test_case_data["hash_algo"] + targetpaths: List[str] = test_case_data["targetpaths"] + + sim = self._init_repo(consistent_snapshot, prefix_targets_with_hash) + # Add targets to repository + for targetpath in targetpaths: + sim.targets.version += 1 + sim.add_target("targets", b"content", targetpath) + sim.update_snapshot() + + updater = self._init_updater(sim) + updater.config.prefix_targets_with_hash = prefix_targets_with_hash + updater.refresh() + + for path in targetpaths: + info = updater.get_targetinfo(path) + assert isinstance(info, TargetFile) + updater.download_target(info) + + # target files are always persisted without hash prefix + self._assert_targets_files_exist([info.path]) + + # files are fetched with the expected hash prefix (or None) + exp_fetches = [ + (path, None if not hash_algo else info.hashes[hash_algo]) + ] + + self.assertListEqual(sim.fetch_tracker.targets, exp_fetches) + sim.fetch_tracker.targets.clear() + finally: + utils.cleanup_dir(self.targets_dir) if __name__ == "__main__": From db0e0d6de483292065d0d2d99b32d96218ceb26e Mon Sep 17 00:00:00 2001 From: Teodora Sechkova Date: Mon, 22 Nov 2021 16:29:23 +0200 Subject: [PATCH 4/7] Add test_updater_deleagation_graphs.py Add tests creating delegations graphs with different complexity and successfully updating the delegated roles metadata. Signed-off-by: Teodora Sechkova --- tests/test_updater_delegation_graphs.py | 243 ++++++++++++++++++++++++ 1 file changed, 243 insertions(+) create mode 100644 tests/test_updater_delegation_graphs.py diff --git a/tests/test_updater_delegation_graphs.py b/tests/test_updater_delegation_graphs.py new file mode 100644 index 0000000000..877591cf8a --- /dev/null +++ b/tests/test_updater_delegation_graphs.py @@ -0,0 +1,243 @@ +#!/usr/bin/env python + +# Copyright 2021, New York University and the TUF contributors +# SPDX-License-Identifier: MIT OR Apache-2.0 + +"""Test updating delegated targets roles with various +delegation hierarchies""" + +import os +import sys +import tempfile +import unittest +from dataclasses import astuple, dataclass, field +from typing import Iterable, List, Optional +from unittest.mock import call, patch + +from tests import utils +from tests.repository_simulator import RepositorySimulator +from tuf.api.metadata import ( + SPECIFICATION_VERSION, + TOP_LEVEL_ROLE_NAMES, + DelegatedRole, + Targets, +) +from tuf.ngclient import Updater + + +@dataclass +class TestDelegation: + delegator: str + rolename: str + keyids: List[str] = field(default_factory=list) + threshold: int = 1 + terminating: bool = False + paths: List[str] = field(default_factory=lambda: ["*"]) + path_hash_prefixes: Optional[List[str]] = None + + +@dataclass +class DelegationsTestCase: + """Describes a delegations graph as a list of delegations + and the expected order of traversal as 'visited_order'.""" + + delegations: List[TestDelegation] + visited_order: List[str] + + +class TestDelegationsGraphs(unittest.TestCase): + """Test creating delegations graphs with different complexity + and successfully updating the delegated roles metadata""" + + def setUp(self) -> None: + self.temp_dir = tempfile.TemporaryDirectory() + self.metadata_dir = os.path.join(self.temp_dir.name, "metadata") + self.targets_dir = os.path.join(self.temp_dir.name, "targets") + os.mkdir(self.metadata_dir) + os.mkdir(self.targets_dir) + + def tearDown(self) -> None: + self.temp_dir.cleanup() + + def _init_updater(self, sim: RepositorySimulator) -> Updater: + """Create a new Updater instance""" + return Updater( + self.metadata_dir, + "https://example.com/metadata/", + self.targets_dir, + "https://example.com/targets/", + sim, + ) + + def _init_repo( + self, delegations: List[TestDelegation] + ) -> RepositorySimulator: + """Create a new RepositorySimulator instance with 'delegations'""" + sim = RepositorySimulator() + spec_version = ".".join(SPECIFICATION_VERSION) + + for d in delegations: + if d.rolename in sim.md_delegates: + targets = sim.md_delegates[d.rolename].signed + else: + targets = Targets(1, spec_version, sim.safe_expiry, {}, None) + + # unpack 'd' but skip "delegator" + role = DelegatedRole(*astuple(d)[1:]) + sim.add_delegation(d.delegator, role, targets) + sim.update_snapshot() + + # Init trusted root for Updater + with open(os.path.join(self.metadata_dir, "root.json"), "bw") as f: + f.write(sim.signed_roots[0]) + + return sim + + def _assert_files_exist(self, roles: Iterable[str]) -> None: + """Assert that local metadata files exist for 'roles'""" + expected_files = sorted([f"{role}.json" for role in roles]) + local_metadata_files = sorted(os.listdir(self.metadata_dir)) + self.assertListEqual(local_metadata_files, expected_files) + + graphs: utils.DataSet = { + "basic delegation": DelegationsTestCase( + delegations=[TestDelegation("targets", "A")], + visited_order=["A"], + ), + "single level delegations": DelegationsTestCase( + delegations=[ + TestDelegation("targets", "A"), + TestDelegation("targets", "B"), + ], + visited_order=["A", "B"], + ), + "two-level delegations": DelegationsTestCase( + delegations=[ + TestDelegation("targets", "A"), + TestDelegation("targets", "B"), + TestDelegation("B", "C"), + ], + visited_order=["A", "B", "C"], + ), + "two-level test DFS order of traversal": DelegationsTestCase( + delegations=[ + TestDelegation("targets", "A"), + TestDelegation("targets", "B"), + TestDelegation("A", "C"), + TestDelegation("A", "D"), + ], + visited_order=["A", "C", "D", "B"], + ), + "three-level delegation test DFS order of traversal": DelegationsTestCase( + delegations=[ + TestDelegation("targets", "A"), + TestDelegation("targets", "B"), + TestDelegation("A", "C"), + TestDelegation("C", "D"), + ], + visited_order=["A", "C", "D", "B"], + ), + "two-level terminating ignores all but role's descendants": DelegationsTestCase( + delegations=[ + TestDelegation("targets", "A"), + TestDelegation("targets", "B"), + TestDelegation("A", "C", terminating=True), + TestDelegation("A", "D"), + ], + visited_order=["A", "C"], + ), + "three-level terminating ignores all but role's descendants": DelegationsTestCase( + delegations=[ + TestDelegation("targets", "A"), + TestDelegation("targets", "B"), + TestDelegation("A", "C", terminating=True), + TestDelegation("C", "D"), + ], + visited_order=["A", "C", "D"], + ), + "two-level ignores all branches not matching 'paths'": DelegationsTestCase( + delegations=[ + TestDelegation("targets", "A", paths=["*.py"]), + TestDelegation("targets", "B"), + TestDelegation("A", "C"), + ], + visited_order=["B"], + ), + "three-level ignores all branches not matching 'paths'": DelegationsTestCase( + delegations=[ + TestDelegation("targets", "A"), + TestDelegation("targets", "B"), + TestDelegation("A", "C", paths=["*.py"]), + TestDelegation("C", "D"), + ], + visited_order=["A", "B"], + ), + "cyclic graph": DelegationsTestCase( + delegations=[ + TestDelegation("targets", "A"), + TestDelegation("targets", "B"), + TestDelegation("B", "C"), + TestDelegation("C", "D"), + TestDelegation("D", "B"), + ], + visited_order=["A", "B", "C", "D"], + ), + "two roles delegating to a third": DelegationsTestCase( + delegations=[ + TestDelegation("targets", "A"), + TestDelegation("targets", "B"), + TestDelegation("B", "C"), + TestDelegation("A", "C"), + ], + # Under all same conditions, 'C' is reached through 'A' first" + visited_order=["A", "C", "B"], + ), + "two roles delegating to a third different 'paths'": DelegationsTestCase( + delegations=[ + TestDelegation("targets", "A"), + TestDelegation("targets", "B"), + TestDelegation("B", "C"), + TestDelegation("A", "C", paths=["*.py"]), + ], + # 'C' is reached through 'B' since 'A' does not delegate a matching pattern" + visited_order=["A", "B", "C"], + ), + } + + @utils.run_sub_tests_with_dataset(graphs) + def test_graph_traversal(self, test_data: DelegationsTestCase) -> None: + """Test that delegated roles are traversed in the order of appearance + in the delegator's metadata, using pre-order depth-first search""" + + try: + exp_files = [*TOP_LEVEL_ROLE_NAMES, *test_data.visited_order] + exp_calls = [call(role, 1) for role in test_data.visited_order] + + sim = self._init_repo(test_data.delegations) + updater = self._init_updater(sim) + # Call explicitly refresh to simplify the expected_calls list + updater.refresh() + # Check that metadata dir contains only top-level roles + self._assert_files_exist(TOP_LEVEL_ROLE_NAMES) + + with patch.object( + sim, "fetch_metadata", wraps=sim.fetch_metadata + ) as wrapped_fetch: + # Looking for a non-existing targetpath forces updater + # to visit all possible delegated roles + targetfile = updater.get_targetinfo("missingpath") + + self.assertIsNone(targetfile) + # Check that the delegated roles were visited in the expected + # order and the corresponding metadata files were persisted + self.assertListEqual(wrapped_fetch.call_args_list, exp_calls) + self._assert_files_exist(exp_files) + finally: + # clean up after each subtest + utils.cleanup_dir(self.metadata_dir) + + +if __name__ == "__main__": + + utils.configure_test_logging(sys.argv) + unittest.main() From 1ba93015c33f4141f5b097e8ac601c4522823cad Mon Sep 17 00:00:00 2001 From: Teodora Sechkova Date: Wed, 1 Dec 2021 17:52:21 +0200 Subject: [PATCH 5/7] Add --dump option to tests Extend updater tests with the option to dump repository metadata locally. Signed-off-by: Teodora Sechkova --- tests/test_updater_consistent_snapshot.py | 80 ++++++++++++++++------- tests/test_updater_delegation_graphs.py | 33 +++++++++- tests/test_updater_top_level_update.py | 19 ++++++ 3 files changed, 104 insertions(+), 28 deletions(-) diff --git a/tests/test_updater_consistent_snapshot.py b/tests/test_updater_consistent_snapshot.py index d6710b247e..99c0a0f7cf 100644 --- a/tests/test_updater_consistent_snapshot.py +++ b/tests/test_updater_consistent_snapshot.py @@ -28,8 +28,12 @@ class TestConsistentSnapshot(unittest.TestCase): 'prefix_targets_with_hash' and verify that the correct URLs are formed for each combination""" + # set dump_dir to trigger repository state dumps + dump_dir: Optional[str] = None + def setUp(self) -> None: # pylint: disable=consider-using-with + self.subtest_count = 0 self.temp_dir = tempfile.TemporaryDirectory() self.metadata_dir = os.path.join(self.temp_dir.name, "metadata") self.targets_dir = os.path.join(self.temp_dir.name, "targets") @@ -39,6 +43,24 @@ def setUp(self) -> None: def tearDown(self) -> None: self.temp_dir.cleanup() + def setup_subtest( + self, consistent_snapshot: bool, prefix_targets: bool = True + ) -> None: + self.sim = self._init_repo(consistent_snapshot, prefix_targets) + + self.subtest_count += 1 + if self.dump_dir is not None: + # create subtest dumpdir + name = f"{self.id().split('.')[-1]}-{self.subtest_count}" + self.sim.dump_dir = os.path.join(self.dump_dir, name) + os.mkdir(self.sim.dump_dir) + + def teardown_subtest(self): + if self.dump_dir is not None: + self.sim.write() + + utils.cleanup_dir(self.metadata_dir) + def _init_repo( self, consistent_snapshot: bool, prefix_targets: bool = True ) -> RepositorySimulator: @@ -55,14 +77,14 @@ def _init_repo( return sim - def _init_updater(self, sim: RepositorySimulator) -> Updater: + def _init_updater(self) -> Updater: """Create a new Updater instance""" return Updater( self.metadata_dir, "https://example.com/metadata/", self.targets_dir, "https://example.com/targets/", - sim, + self.sim, ) def _assert_metadata_files_exist(self, roles: Iterable[str]) -> None: @@ -106,21 +128,21 @@ def test_top_level_roles_update( # correct version prefix, depending on 'consistent_snapshot' config try: consistent_snapshot: bool = test_case_data["consistent_snapshot"] - expected_calls: List[Any] = test_case_data["calls"] + exp_calls: List[Any] = test_case_data["calls"] - sim = self._init_repo(consistent_snapshot) - updater = self._init_updater(sim) + self.setup_subtest(consistent_snapshot) + updater = self._init_updater() # cleanup fetch tracker metadata - sim.fetch_tracker.metadata.clear() + self.sim.fetch_tracker.metadata.clear() updater.refresh() # metadata files are fetched with the expected version (or None) - self.assertListEqual(sim.fetch_tracker.metadata, expected_calls) + self.assertListEqual(self.sim.fetch_tracker.metadata, exp_calls) # metadata files are always persisted without a version prefix self._assert_metadata_files_exist(TOP_LEVEL_ROLE_NAMES) finally: - utils.cleanup_dir(self.metadata_dir) + self.teardown_subtest() delegated_roles_data: utils.DataSet = { "consistent_snaphot disabled": { @@ -145,27 +167,29 @@ def test_delegated_roles_update( rolenames = ["role1", "..", "."] exp_calls = [(role, exp_version) for role in rolenames] - sim = self._init_repo(consistent_snapshot) + self.setup_subtest(consistent_snapshot) # Add new delegated targets spec_version = ".".join(SPECIFICATION_VERSION) for role in rolenames: delegated_role = DelegatedRole(role, [], 1, False, ["*"], None) - targets = Targets(1, spec_version, sim.safe_expiry, {}, None) - sim.add_delegation("targets", delegated_role, targets) - sim.update_snapshot() - updater = self._init_updater(sim) + targets = Targets( + 1, spec_version, self.sim.safe_expiry, {}, None + ) + self.sim.add_delegation("targets", delegated_role, targets) + self.sim.update_snapshot() + updater = self._init_updater() updater.refresh() # cleanup fetch tracker metadata - sim.fetch_tracker.metadata.clear() + self.sim.fetch_tracker.metadata.clear() # trigger updater to fetch the delegated metadata updater.get_targetinfo("anything") # metadata files are fetched with the expected version (or None) - self.assertListEqual(sim.fetch_tracker.metadata, exp_calls) + self.assertListEqual(self.sim.fetch_tracker.metadata, exp_calls) # metadata files are always persisted without a version prefix self._assert_metadata_files_exist(rolenames) finally: - utils.cleanup_dir(self.metadata_dir) + self.teardown_subtest() targets_download_data: utils.DataSet = { "consistent_snaphot disabled": { @@ -199,14 +223,14 @@ def test_download_targets(self, test_case_data: Dict[str, Any]) -> None: hash_algo: Optional[str] = test_case_data["hash_algo"] targetpaths: List[str] = test_case_data["targetpaths"] - sim = self._init_repo(consistent_snapshot, prefix_targets_with_hash) + self.setup_subtest(consistent_snapshot, prefix_targets_with_hash) # Add targets to repository for targetpath in targetpaths: - sim.targets.version += 1 - sim.add_target("targets", b"content", targetpath) - sim.update_snapshot() + self.sim.targets.version += 1 + self.sim.add_target("targets", b"content", targetpath) + self.sim.update_snapshot() - updater = self._init_updater(sim) + updater = self._init_updater() updater.config.prefix_targets_with_hash = prefix_targets_with_hash updater.refresh() @@ -219,17 +243,23 @@ def test_download_targets(self, test_case_data: Dict[str, Any]) -> None: self._assert_targets_files_exist([info.path]) # files are fetched with the expected hash prefix (or None) - exp_fetches = [ + exp_calls = [ (path, None if not hash_algo else info.hashes[hash_algo]) ] - self.assertListEqual(sim.fetch_tracker.targets, exp_fetches) - sim.fetch_tracker.targets.clear() + self.assertListEqual(self.sim.fetch_tracker.targets, exp_calls) + self.sim.fetch_tracker.targets.clear() finally: - utils.cleanup_dir(self.targets_dir) + self.teardown_subtest() if __name__ == "__main__": + if "--dump" in sys.argv: + TestConsistentSnapshot.dump_dir = tempfile.mkdtemp() + print( + f"Repository Simulator dumps in {TestConsistentSnapshot.dump_dir}" + ) + sys.argv.remove("--dump") utils.configure_test_logging(sys.argv) unittest.main() diff --git a/tests/test_updater_delegation_graphs.py b/tests/test_updater_delegation_graphs.py index 877591cf8a..efb35d30ef 100644 --- a/tests/test_updater_delegation_graphs.py +++ b/tests/test_updater_delegation_graphs.py @@ -49,7 +49,11 @@ class TestDelegationsGraphs(unittest.TestCase): """Test creating delegations graphs with different complexity and successfully updating the delegated roles metadata""" + # set dump_dir to trigger repository state dumps + dump_dir: Optional[str] = None + def setUp(self) -> None: + self.subtest_count = 0 self.temp_dir = tempfile.TemporaryDirectory() self.metadata_dir = os.path.join(self.temp_dir.name, "metadata") self.targets_dir = os.path.join(self.temp_dir.name, "targets") @@ -59,6 +63,26 @@ def setUp(self) -> None: def tearDown(self) -> None: self.temp_dir.cleanup() + def setup_subtest( + self, delegations: List[TestDelegation] + ) -> RepositorySimulator: + sim = self._init_repo(delegations) + + self.subtest_count += 1 + if self.dump_dir is not None: + # create subtest dumpdir + name = f"{self.id().split('.')[-1]}-{self.subtest_count}" + sim.dump_dir = os.path.join(self.dump_dir, name) + os.mkdir(sim.dump_dir) + # dump the repo simulator metadata + sim.write() + + return sim + + def teardown_subtest(self) -> None: + # clean up after each subtest + utils.cleanup_dir(self.metadata_dir) + def _init_updater(self, sim: RepositorySimulator) -> Updater: """Create a new Updater instance""" return Updater( @@ -213,7 +237,7 @@ def test_graph_traversal(self, test_data: DelegationsTestCase) -> None: exp_files = [*TOP_LEVEL_ROLE_NAMES, *test_data.visited_order] exp_calls = [call(role, 1) for role in test_data.visited_order] - sim = self._init_repo(test_data.delegations) + sim = self.setup_subtest(test_data.delegations) updater = self._init_updater(sim) # Call explicitly refresh to simplify the expected_calls list updater.refresh() @@ -233,11 +257,14 @@ def test_graph_traversal(self, test_data: DelegationsTestCase) -> None: self.assertListEqual(wrapped_fetch.call_args_list, exp_calls) self._assert_files_exist(exp_files) finally: - # clean up after each subtest - utils.cleanup_dir(self.metadata_dir) + self.teardown_subtest() if __name__ == "__main__": + if "--dump" in sys.argv: + TestDelegationsGraphs.dump_dir = tempfile.mkdtemp() + print(f"Repository Simulator dumps in {TestDelegationsGraphs.dump_dir}") + sys.argv.remove("--dump") utils.configure_test_logging(sys.argv) unittest.main() diff --git a/tests/test_updater_top_level_update.py b/tests/test_updater_top_level_update.py index 0c5a18e480..ffaa9c71f0 100644 --- a/tests/test_updater_top_level_update.py +++ b/tests/test_updater_top_level_update.py @@ -37,6 +37,9 @@ class TestRefresh(unittest.TestCase): """Test update of top-level metadata following 'Detailed client workflow' in the specification.""" + # set dump_dir to trigger repository state dumps + dump_dir: Optional[str] = None + past_datetime = datetime.utcnow().replace(microsecond=0) - timedelta(days=5) def setUp(self) -> None: @@ -53,11 +56,20 @@ def setUp(self) -> None: with open(os.path.join(self.metadata_dir, "root.json"), "bw") as f: f.write(self.sim.signed_roots[0]) + if self.dump_dir is not None: + # create test specific dump directory + name = self.id().split(".")[-1] + self.sim.dump_dir = os.path.join(self.dump_dir, name) + os.mkdir(self.sim.dump_dir) + def tearDown(self) -> None: self.temp_dir.cleanup() def _run_refresh(self) -> Updater: """Create a new Updater instance and refresh""" + if self.dump_dir is not None: + self.sim.write() + updater = Updater( self.metadata_dir, "https://example.com/metadata/", @@ -70,6 +82,9 @@ def _run_refresh(self) -> Updater: def _init_updater(self) -> Updater: """Create a new Updater instance""" + if self.dump_dir is not None: + self.sim.write() + return Updater( self.metadata_dir, "https://example.com/metadata/", @@ -455,6 +470,10 @@ def test_compute_metafile_hashes_length(self) -> None: if __name__ == "__main__": + if "--dump" in sys.argv: + TestRefresh.dump_dir = tempfile.mkdtemp() + print(f"Repository Simulator dumps in {TestRefresh.dump_dir}") + sys.argv.remove("--dump") utils.configure_test_logging(sys.argv) unittest.main() From 26a5f819776cd2e03a694d294fc393dcdd32d765 Mon Sep 17 00:00:00 2001 From: Teodora Sechkova Date: Thu, 9 Dec 2021 15:47:09 +0200 Subject: [PATCH 6/7] Use fetch_tracker in test_graph_traversal Replace the use of "patch" in test_graph_traversal with the newly added fetch_tracker from RepositorySimulator. Signed-off-by: Teodora Sechkova --- tests/test_updater_delegation_graphs.py | 24 ++++++++++-------------- 1 file changed, 10 insertions(+), 14 deletions(-) diff --git a/tests/test_updater_delegation_graphs.py b/tests/test_updater_delegation_graphs.py index efb35d30ef..0fc0735bf1 100644 --- a/tests/test_updater_delegation_graphs.py +++ b/tests/test_updater_delegation_graphs.py @@ -12,7 +12,6 @@ import unittest from dataclasses import astuple, dataclass, field from typing import Iterable, List, Optional -from unittest.mock import call, patch from tests import utils from tests.repository_simulator import RepositorySimulator @@ -235,27 +234,24 @@ def test_graph_traversal(self, test_data: DelegationsTestCase) -> None: try: exp_files = [*TOP_LEVEL_ROLE_NAMES, *test_data.visited_order] - exp_calls = [call(role, 1) for role in test_data.visited_order] + exp_calls = [(role, 1) for role in test_data.visited_order] sim = self.setup_subtest(test_data.delegations) updater = self._init_updater(sim) # Call explicitly refresh to simplify the expected_calls list updater.refresh() + sim.fetch_tracker.metadata.clear() # Check that metadata dir contains only top-level roles self._assert_files_exist(TOP_LEVEL_ROLE_NAMES) - with patch.object( - sim, "fetch_metadata", wraps=sim.fetch_metadata - ) as wrapped_fetch: - # Looking for a non-existing targetpath forces updater - # to visit all possible delegated roles - targetfile = updater.get_targetinfo("missingpath") - - self.assertIsNone(targetfile) - # Check that the delegated roles were visited in the expected - # order and the corresponding metadata files were persisted - self.assertListEqual(wrapped_fetch.call_args_list, exp_calls) - self._assert_files_exist(exp_files) + # Looking for a non-existing targetpath forces updater + # to visit all possible delegated roles + targetfile = updater.get_targetinfo("missingpath") + self.assertIsNone(targetfile) + # Check that the delegated roles were visited in the expected + # order and the corresponding metadata files were persisted + self.assertListEqual(sim.fetch_tracker.metadata, exp_calls) + self._assert_files_exist(exp_files) finally: self.teardown_subtest() From 2562aff00b3464a3711a0f8aba5b4f5fd42606f9 Mon Sep 17 00:00:00 2001 From: Teodora Sechkova Date: Fri, 10 Dec 2021 16:09:47 +0200 Subject: [PATCH 7/7] Fix linter errors in tests Signed-off-by: Teodora Sechkova --- tests/test_updater_consistent_snapshot.py | 3 ++- tests/test_updater_delegation_graphs.py | 1 + 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/test_updater_consistent_snapshot.py b/tests/test_updater_consistent_snapshot.py index 99c0a0f7cf..e4bab8a8c7 100644 --- a/tests/test_updater_consistent_snapshot.py +++ b/tests/test_updater_consistent_snapshot.py @@ -39,6 +39,7 @@ def setUp(self) -> None: self.targets_dir = os.path.join(self.temp_dir.name, "targets") os.mkdir(self.metadata_dir) os.mkdir(self.targets_dir) + self.sim: RepositorySimulator def tearDown(self) -> None: self.temp_dir.cleanup() @@ -55,7 +56,7 @@ def setup_subtest( self.sim.dump_dir = os.path.join(self.dump_dir, name) os.mkdir(self.sim.dump_dir) - def teardown_subtest(self): + def teardown_subtest(self) -> None: if self.dump_dir is not None: self.sim.write() diff --git a/tests/test_updater_delegation_graphs.py b/tests/test_updater_delegation_graphs.py index 0fc0735bf1..f5003e1bb0 100644 --- a/tests/test_updater_delegation_graphs.py +++ b/tests/test_updater_delegation_graphs.py @@ -52,6 +52,7 @@ class TestDelegationsGraphs(unittest.TestCase): dump_dir: Optional[str] = None def setUp(self) -> None: + # pylint: disable=consider-using-with self.subtest_count = 0 self.temp_dir = tempfile.TemporaryDirectory() self.metadata_dir = os.path.join(self.temp_dir.name, "metadata")