Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 3 additions & 9 deletions tests/repository_simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -345,13 +345,7 @@ def add_target(self, role: str, data: bytes, path: str) -> None:
self.target_files[path] = RepositoryTarget(data, target)

def add_delegation(
self,
delegator_name: str,
name: str,
targets: Targets,
terminating: bool,
paths: Optional[List[str]],
hash_prefixes: Optional[List[str]],
self, delegator_name: str, role: DelegatedRole, targets: Targets
) -> None:
"""Add delegated target role to the repository."""
if delegator_name == Targets.type:
Expand All @@ -360,7 +354,6 @@ def add_delegation(
delegator = self.md_delegates[delegator_name].signed

# Create delegation
role = DelegatedRole(name, [], 1, terminating, paths, hash_prefixes)
if delegator.delegations is None:
delegator.delegations = Delegations({}, OrderedDict())
# put delegation last by default
Expand All @@ -372,7 +365,8 @@ def add_delegation(
self.add_signer(role.name, signer)

# Add metadata for the role
self.md_delegates[role.name] = Metadata(targets, OrderedDict())
if role.name not in self.md_delegates:
self.md_delegates[role.name] = Metadata(targets, OrderedDict())

def write(self) -> None:
"""Dump current repository metadata to self.dump_dir
Expand Down
190 changes: 109 additions & 81 deletions tests/test_updater_consistent_snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from tuf.api.metadata import (
SPECIFICATION_VERSION,
TOP_LEVEL_ROLE_NAMES,
DelegatedRole,
TargetFile,
Targets,
)
Expand All @@ -27,17 +28,40 @@ class TestConsistentSnapshot(unittest.TestCase):
'prefix_targets_with_hash' and verify that the correct URLs
are formed for each combination"""

# set dump_dir to trigger repository state dumps
dump_dir: Optional[str] = None

def setUp(self) -> None:
# pylint: disable=consider-using-with
self.subtest_count = 0
self.temp_dir = tempfile.TemporaryDirectory()
self.metadata_dir = os.path.join(self.temp_dir.name, "metadata")
self.targets_dir = os.path.join(self.temp_dir.name, "targets")
os.mkdir(self.metadata_dir)
os.mkdir(self.targets_dir)
self.sim: RepositorySimulator

def tearDown(self) -> None:
self.temp_dir.cleanup()

def setup_subtest(
self, consistent_snapshot: bool, prefix_targets: bool = True
) -> None:
self.sim = self._init_repo(consistent_snapshot, prefix_targets)

self.subtest_count += 1
if self.dump_dir is not None:
# create subtest dumpdir
name = f"{self.id().split('.')[-1]}-{self.subtest_count}"
self.sim.dump_dir = os.path.join(self.dump_dir, name)
os.mkdir(self.sim.dump_dir)

def teardown_subtest(self) -> None:
if self.dump_dir is not None:
self.sim.write()

utils.cleanup_dir(self.metadata_dir)

def _init_repo(
self, consistent_snapshot: bool, prefix_targets: bool = True
) -> RepositorySimulator:
Expand All @@ -54,24 +78,16 @@ def _init_repo(

return sim

def _init_updater(self, sim: RepositorySimulator) -> Updater:
def _init_updater(self) -> Updater:
"""Create a new Updater instance"""
return Updater(
self.metadata_dir,
"https://example.com/metadata/",
self.targets_dir,
"https://example.com/targets/",
sim,
self.sim,
)

@staticmethod
def _cleanup_dir(path: str) -> None:
"""Delete all files inside a directory"""
for filepath in [
os.path.join(path, filename) for filename in os.listdir(path)
]:
os.remove(filepath)

def _assert_metadata_files_exist(self, roles: Iterable[str]) -> None:
"""Assert that local metadata files exist for 'roles'"""
local_metadata_files = os.listdir(self.metadata_dir)
Expand Down Expand Up @@ -111,22 +127,23 @@ def test_top_level_roles_update(
) -> None:
# Test if the client fetches and stores metadata files with the
# correct version prefix, depending on 'consistent_snapshot' config
consistent_snapshot: bool = test_case_data["consistent_snapshot"]
expected_calls: List[Any] = test_case_data["calls"]

sim = self._init_repo(consistent_snapshot)
updater = self._init_updater(sim)
try:
consistent_snapshot: bool = test_case_data["consistent_snapshot"]
exp_calls: List[Any] = test_case_data["calls"]

# cleanup fetch tracker metadata
sim.fetch_tracker.metadata.clear()
updater.refresh()
self.setup_subtest(consistent_snapshot)
updater = self._init_updater()

# metadata files are fetched with the expected version (or None)
self.assertListEqual(sim.fetch_tracker.metadata, expected_calls)
# metadata files are always persisted without a version prefix
self._assert_metadata_files_exist(TOP_LEVEL_ROLE_NAMES)
# cleanup fetch tracker metadata
self.sim.fetch_tracker.metadata.clear()
updater.refresh()

self._cleanup_dir(self.metadata_dir)
# metadata files are fetched with the expected version (or None)
self.assertListEqual(self.sim.fetch_tracker.metadata, exp_calls)
# metadata files are always persisted without a version prefix
self._assert_metadata_files_exist(TOP_LEVEL_ROLE_NAMES)
finally:
self.teardown_subtest()

delegated_roles_data: utils.DataSet = {
"consistent_snaphot disabled": {
Expand All @@ -145,31 +162,35 @@ def test_delegated_roles_update(
) -> None:
# Test if the client fetches and stores delegated metadata files with
# the correct version prefix, depending on 'consistent_snapshot' config
consistent_snapshot: bool = test_case_data["consistent_snapshot"]
expected_version: Optional[int] = test_case_data["expected_version"]
rolenames = ["role1", "..", "."]
expected_calls = [(role, expected_version) for role in rolenames]

sim = self._init_repo(consistent_snapshot)
# Add new delegated targets
spec_version = ".".join(SPECIFICATION_VERSION)
targets = Targets(1, spec_version, sim.safe_expiry, {}, None)
for role in rolenames:
sim.add_delegation("targets", role, targets, False, ["*"], None)
sim.update_snapshot()
updater = self._init_updater(sim)
updater.refresh()

# cleanup fetch tracker metadata
sim.fetch_tracker.metadata.clear()
# trigger updater to fetch the delegated metadata
updater.get_targetinfo("anything")
# metadata files are fetched with the expected version (or None)
self.assertListEqual(sim.fetch_tracker.metadata, expected_calls)
# metadata files are always persisted without a version prefix
self._assert_metadata_files_exist(rolenames)

self._cleanup_dir(self.metadata_dir)
try:
consistent_snapshot: bool = test_case_data["consistent_snapshot"]
exp_version: Optional[int] = test_case_data["expected_version"]
rolenames = ["role1", "..", "."]
exp_calls = [(role, exp_version) for role in rolenames]

self.setup_subtest(consistent_snapshot)
# Add new delegated targets
spec_version = ".".join(SPECIFICATION_VERSION)
for role in rolenames:
delegated_role = DelegatedRole(role, [], 1, False, ["*"], None)
targets = Targets(
1, spec_version, self.sim.safe_expiry, {}, None
)
self.sim.add_delegation("targets", delegated_role, targets)
self.sim.update_snapshot()
updater = self._init_updater()
updater.refresh()

# cleanup fetch tracker metadata
self.sim.fetch_tracker.metadata.clear()
# trigger updater to fetch the delegated metadata
updater.get_targetinfo("anything")
# metadata files are fetched with the expected version (or None)
self.assertListEqual(self.sim.fetch_tracker.metadata, exp_calls)
# metadata files are always persisted without a version prefix
self._assert_metadata_files_exist(rolenames)
finally:
self.teardown_subtest()

targets_download_data: utils.DataSet = {
"consistent_snaphot disabled": {
Expand Down Expand Up @@ -197,42 +218,49 @@ def test_download_targets(self, test_case_data: Dict[str, Any]) -> None:
# Test if the client fetches and stores target files with
# the correct hash prefix, depending on 'consistent_snapshot'
# and 'prefix_targets_with_hash' config
consistent_snapshot: bool = test_case_data["consistent_snapshot"]
prefix_targets_with_hash: bool = test_case_data["prefix_targets"]
hash_algo: Optional[str] = test_case_data["hash_algo"]
targetpaths: List[str] = test_case_data["targetpaths"]

sim = self._init_repo(consistent_snapshot, prefix_targets_with_hash)
# Add targets to repository
for targetpath in targetpaths:
sim.targets.version += 1
sim.add_target("targets", b"content", targetpath)
sim.update_snapshot()

updater = self._init_updater(sim)
updater.config.prefix_targets_with_hash = prefix_targets_with_hash
updater.refresh()

for targetpath in targetpaths:
info = updater.get_targetinfo(targetpath)
assert isinstance(info, TargetFile)
updater.download_target(info)

# target files are always persisted without hash prefix
self._assert_targets_files_exist([info.path])

# files are fetched with the expected hash prefix (or None)
expected_fetches = [
(targetpath, None if not hash_algo else info.hashes[hash_algo])
]

self.assertListEqual(sim.fetch_tracker.targets, expected_fetches)
sim.fetch_tracker.targets.clear()

self._cleanup_dir(self.targets_dir)
try:
consistent_snapshot: bool = test_case_data["consistent_snapshot"]
prefix_targets_with_hash: bool = test_case_data["prefix_targets"]
hash_algo: Optional[str] = test_case_data["hash_algo"]
targetpaths: List[str] = test_case_data["targetpaths"]

self.setup_subtest(consistent_snapshot, prefix_targets_with_hash)
# Add targets to repository
for targetpath in targetpaths:
self.sim.targets.version += 1
self.sim.add_target("targets", b"content", targetpath)
self.sim.update_snapshot()

updater = self._init_updater()
updater.config.prefix_targets_with_hash = prefix_targets_with_hash
updater.refresh()

for path in targetpaths:
info = updater.get_targetinfo(path)
assert isinstance(info, TargetFile)
updater.download_target(info)

# target files are always persisted without hash prefix
self._assert_targets_files_exist([info.path])

# files are fetched with the expected hash prefix (or None)
exp_calls = [
(path, None if not hash_algo else info.hashes[hash_algo])
]

self.assertListEqual(self.sim.fetch_tracker.targets, exp_calls)
self.sim.fetch_tracker.targets.clear()
finally:
self.teardown_subtest()


if __name__ == "__main__":
if "--dump" in sys.argv:
TestConsistentSnapshot.dump_dir = tempfile.mkdtemp()
print(
f"Repository Simulator dumps in {TestConsistentSnapshot.dump_dir}"
)
sys.argv.remove("--dump")

utils.configure_test_logging(sys.argv)
unittest.main()
Loading