diff --git a/tests/repository_simulator.py b/tests/repository_simulator.py index abb7f37141..422ac66bf4 100644 --- a/tests/repository_simulator.py +++ b/tests/repository_simulator.py @@ -66,6 +66,7 @@ Metadata, MetaFile, Root, + Rotate, Snapshot, SuccinctRoles, TargetFile, @@ -180,6 +181,7 @@ def _initialize(self) -> None: self.md_snapshot = Metadata(Snapshot(expires=self.safe_expiry)) self.md_timestamp = Metadata(Timestamp(expires=self.safe_expiry)) self.md_root = Metadata(Root(expires=self.safe_expiry)) + self.md_rotate: Dict[str, List[Metadata]] = {} for role in TOP_LEVEL_ROLE_NAMES: key, signer = self.create_key() @@ -197,6 +199,33 @@ def publish_root(self) -> None: self.signed_roots.append(self.md_root.to_bytes(JSONSerializer())) logger.debug("Published root v%d", self.root.version) + def add_rotate_file( + self, + rolename: str, + new_keys: Dict[str, Key], + new_threshold: int, + signers: Dict[str, SSlibSigner], + ) -> None: + """Add rotate file""" + if rolename in self.md_rotate: + rotate_version = len(self.md_rotate[rolename]) + inner_rotate = Rotate( + rotate_version, rolename, new_keys, new_threshold + ) + rotate_file = Metadata(inner_rotate) + self.md_rotate[rolename].append(rotate_file) + else: + rotate_version = 0 + inner_rotate = Rotate( + rotate_version, rolename, new_keys, new_threshold + ) + rotate_file = Metadata(inner_rotate) + self.md_rotate[rolename] = [rotate_file] + + rotate_rolename = f"rotate/{rolename}.rotate.{rotate_version}" + + self.signers[rotate_rolename] = signers + def _fetch(self, url: str) -> Iterator[bytes]: """Fetches data from the given url and returns an Iterator (or yields bytes). @@ -208,7 +237,9 @@ def _fetch(self, url: str) -> Iterator[bytes]: version_str, _, role = ver_and_name.partition(".") # root is always version-prefixed while timestamp is always NOT if role == Root.type or ( - self.root.consistent_snapshot and ver_and_name != Timestamp.type + self.root.consistent_snapshot + and ver_and_name != Timestamp.type + and not version_str.startswith("rotate") ): version: Optional[int] = int(version_str) else: @@ -277,6 +308,14 @@ def fetch_metadata(self, role: str, version: Optional[int] = None) -> bytes: md = self.md_snapshot elif role == Targets.type: md = self.md_targets + elif role.startswith("rotate/"): + rotate_parts = role.split(".") + rotate_role = rotate_parts[0][len("rotate/") :] + rotate_version = rotate_parts[2] + try: + md = self.md_rotate[rotate_role][int(rotate_version)] + except (KeyError, IndexError) as e: + raise DownloadHTTPError(f"Unknown role {role}", 404) from e else: md = self.md_delegates.get(role) diff --git a/tests/test_api.py b/tests/test_api.py index 1fd0a44689..e30a5c17d4 100755 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -34,6 +34,7 @@ Key, Metadata, Root, + Rotate, Snapshot, SuccinctRoles, TargetFile, @@ -780,6 +781,22 @@ def test_get_roles_in_succinct_roles(self) -> None: expected_bin_suffix = f"{bin_numer:0{expected_suffix_length}x}" self.assertEqual(role_name, f"bin-{expected_bin_suffix}") + def test_rotate_generate(self) -> None: + # create a valid rotate file + rotate = Rotate(0, "timestamp", {}, 1) + + # version automatically set to 0 + rotate = Rotate(None, "timestamp", {}, 1) + self.assertEqual(rotate.version, 0) + + # version must not be negative + with self.assertRaises(ValueError): + rotate = Rotate(-1, "timestamp", {}, 1) + + # rotate file needs a role + with self.assertRaises(ValueError): + rotate = Rotate(0, None, {}, 1) + # Run unit test. if __name__ == "__main__": diff --git a/tests/test_metadata_eq_.py b/tests/test_metadata_eq_.py index a3b3f9fd91..b54ba5fe7b 100644 --- a/tests/test_metadata_eq_.py +++ b/tests/test_metadata_eq_.py @@ -23,6 +23,7 @@ Metadata, MetaFile, Role, + Rotate, SuccinctRoles, TargetFile, ) @@ -63,6 +64,9 @@ def setUpClass(cls) -> None: cls.objects["TargetFile"] = TargetFile( 1, {"sha256": "abc"}, "file1.txt" ) + cls.objects["Rotate"] = Rotate( + 0, "timestamp", {"keyid": cls.objects["Key"]}, 1 + ) # Keys are class names. # Values are dictionaries containing attribute names and their new values. @@ -85,6 +89,7 @@ def setUpClass(cls) -> None: "Delegations": {"keys": {}, "roles": {}}, "TargetFile": {"length": 0, "hashes": {}, "path": ""}, "Targets": {"targets": {}, "delegations": []}, + "Rotate": {"version": 1, "role": "", "keys": {}, "threshold": 0}, } @utils.run_sub_tests_with_dataset(classes_attributes_modifications) diff --git a/tests/test_rotate_files.py b/tests/test_rotate_files.py new file mode 100644 index 0000000000..e9411e72e9 --- /dev/null +++ b/tests/test_rotate_files.py @@ -0,0 +1,108 @@ +#!/usr/bin/env python + +# Copyright 2023, New York University and the TUF contributors +# SPDX-License-Identifier: MIT OR Apache-2.0 + +""" Test ngclient handling of rotate files""" + +import os +import sys +import tempfile +import unittest +from typing import ClassVar, List, Optional + +from securesystemslib.signer import SSlibSigner + +from tests import utils +from tests.repository_simulator import RepositorySimulator +from tuf.api import exceptions +from tuf.api.metadata import Key +from tuf.ngclient import Updater + + +class TestRotateFiles(unittest.TestCase): + """Test ngclient handling of rotate files""" + + # set dump_dir to trigger repository state dumps + dump_dir: Optional[str] = None + temp_dir: ClassVar[tempfile.TemporaryDirectory] + keys: ClassVar[List[Key]] + signers: ClassVar[List[SSlibSigner]] + + @classmethod + def setUpClass(cls) -> None: + # pylint: disable-next=consider-using-with + cls.temp_dir = tempfile.TemporaryDirectory() + + # pre-create keys and signers + cls.keys = [] + cls.signers = [] + for _ in range(10): + key, signer = RepositorySimulator.create_key() + cls.keys.append(key) + cls.signers.append(signer) + + @classmethod + def tearDownClass(cls) -> None: + cls.temp_dir.cleanup() + + def setUp(self) -> None: + self.metadata_dir = os.path.join(self.temp_dir.name, "metadata") + self.targets_dir = os.path.join(self.temp_dir.name, "targets") + os.mkdir(self.metadata_dir) + os.mkdir(self.targets_dir) + + self.sim = RepositorySimulator() + with open(os.path.join(self.metadata_dir, "root.json"), "bw") as f: + f.write(self.sim.signed_roots[0]) + + if self.dump_dir is not None: + # create subtest dumpdir + # pylint: disable=no-member + name = f"{self.id().split('.')[-1]}-{self.case_name}" + self.sim.dump_dir = os.path.join(self.dump_dir, name) + os.mkdir(self.sim.dump_dir) + + def _init_updater(self) -> Updater: + """Creates a new updater instance.""" + if self.sim.dump_dir is not None: + self.sim.write() + + updater = Updater( + self.metadata_dir, + "https://example.com/metadata/", + self.targets_dir, + "https://example.com/targets/", + self.sim, + ) + return updater + + def test_read_rotate_file(self) -> None: + root = self.sim.root + new_keyids = root.roles["snapshot"].keyids + new_keys = {k: v for (k, v) in root.keys.items() if k in new_keyids} + self.sim.add_rotate_file( + "timestamp", new_keys, 1, self.sim.signers["timestamp"] + ) + self.sim.update_snapshot() + + updater = self._init_updater() + with self.assertRaises(exceptions.UnsignedMetadataError): + updater.refresh() + + old_keyids = root.roles["timestamp"].keyids + old_keys = {k: v for (k, v) in root.keys.items() if k in old_keyids} + self.sim.add_rotate_file( + "timestamp", old_keys, 1, self.sim.signers["snapshot"] + ) + updater.refresh() + + +if __name__ == "__main__": + if "--dump" in sys.argv: + TestRotateFiles.dump_dir = tempfile.mkdtemp() + print(f"Repository dumps in {TestRotateFiles.dump_dir}") + sys.argv.remove("--dump") + + utils.configure_test_logging(sys.argv) + unittest.main() diff --git a/tests/test_trusted_metadata_set.py b/tests/test_trusted_metadata_set.py index d6ef50f35d..42aff6def5 100644 --- a/tests/test_trusted_metadata_set.py +++ b/tests/test_trusted_metadata_set.py @@ -18,12 +18,16 @@ Metadata, MetaFile, Root, + Rotate, Snapshot, Targets, Timestamp, ) from tuf.api.serialization.json import JSONSerializer -from tuf.ngclient._internal.trusted_metadata_set import TrustedMetadataSet +from tuf.ngclient._internal.trusted_metadata_set import ( + TrustedMetadataSet, + verify_helper, +) logger = logging.getLogger(__name__) @@ -112,19 +116,19 @@ def _update_all_besides_targets( """ timestamp_bytes = timestamp_bytes or self.metadata[Timestamp.type] - self.trusted_set.update_timestamp(timestamp_bytes) + self.trusted_set.update_timestamp(timestamp_bytes, []) snapshot_bytes = snapshot_bytes or self.metadata[Snapshot.type] - self.trusted_set.update_snapshot(snapshot_bytes) + self.trusted_set.update_snapshot(snapshot_bytes, []) def test_update(self) -> None: - self.trusted_set.update_timestamp(self.metadata[Timestamp.type]) - self.trusted_set.update_snapshot(self.metadata[Snapshot.type]) - self.trusted_set.update_targets(self.metadata[Targets.type]) + self.trusted_set.update_timestamp(self.metadata[Timestamp.type], []) + self.trusted_set.update_snapshot(self.metadata[Snapshot.type], []) + self.trusted_set.update_targets(self.metadata[Targets.type], []) self.trusted_set.update_delegated_targets( - self.metadata["role1"], "role1", Targets.type + self.metadata["role1"], "role1", Targets.type, [] ) self.trusted_set.update_delegated_targets( - self.metadata["role2"], "role2", "role1" + self.metadata["role2"], "role2", "role1", [] ) # the 4 top level metadata objects + 2 additional delegated targets self.assertTrue(len(self.trusted_set), 6) @@ -136,17 +140,115 @@ def test_update(self) -> None: self.assertTrue(count, 6) + def test_update_tap8(self) -> None: + root = Metadata.from_bytes(self.metadata[Root.type]) + + new_keyids = root.signed.roles["snapshot"].keyids + new_keys = { + k: v for (k, v) in root.signed.keys.items() if k in new_keyids + } + inner_rotate = Rotate(0, "timestamp", new_keys, 1) + rotate_file = Metadata(inner_rotate) + rotate_file.sign(self.keystore["timestamp"]) + encoded_rotate_file = rotate_file.to_bytes() + with self.assertRaises(exceptions.UnsignedMetadataError): + self.trusted_set.update_timestamp( + self.metadata[Timestamp.type], [encoded_rotate_file] + ) + + old_keyids = root.signed.roles["timestamp"].keyids + old_keys = { + k: v for (k, v) in root.signed.keys.items() if k in old_keyids + } + inner_rotate2 = Rotate(1, "timestamp", old_keys, 1) + rotate_file2 = Metadata(inner_rotate2) + rotate_file2.sign(self.keystore["snapshot"]) + encoded_rotate_file2 = rotate_file2.to_bytes() + self.trusted_set.update_timestamp( + self.metadata[Timestamp.type], + [encoded_rotate_file, encoded_rotate_file2], + ) + + def test_verify_helper(self) -> None: + snapshot = Metadata.from_bytes(self.metadata[Snapshot.type]) + root = Metadata.from_bytes(self.metadata[Root.type]) + + # role doesn't match + new_keyids = root.signed.roles["timestamp"].keyids + new_keys = { + k: v for (k, v) in root.signed.keys.items() if k in new_keyids + } + inner_rotate = Rotate(0, "timestamp", new_keys, 1) + rotate_file = Metadata(inner_rotate) + rotate_file.sign(self.keystore["snapshot"]) + encoded_rotate_file = rotate_file.to_bytes() + with self.assertRaises(exceptions.RepositoryError): + verify_helper(root, [encoded_rotate_file], Snapshot.type, snapshot) + + # version doesn't match + inner_rotate = Rotate(1, "snapshot", new_keys, 1) + rotate_file = Metadata(inner_rotate) + rotate_file.sign(self.keystore["snapshot"]) + encoded_rotate_file = rotate_file.to_bytes() + with self.assertRaises(exceptions.DownloadError): + verify_helper(root, [encoded_rotate_file], Snapshot.type, snapshot) + + # invalid rotate file (signed with the wrong keys) + # first a correct rotate file to change the keys + inner_rotate = Rotate(0, "snapshot", new_keys, 1) + rotate_file = Metadata(inner_rotate) + rotate_file.sign(self.keystore["snapshot"]) + encoded_rotate_file = rotate_file.to_bytes() + with self.assertRaises(exceptions.UnsignedMetadataError): + verify_helper(root, [encoded_rotate_file], Snapshot.type, snapshot) + + # now an invalid rotate file + old_keyids = root.signed.roles["snapshot"].keyids + old_keys = { + k: v for (k, v) in root.signed.keys.items() if k in old_keyids + } + inner_rotate2 = Rotate(1, "snapshot", old_keys, 1) + rotate_file2 = Metadata(inner_rotate2) + rotate_file2.sign(self.keystore["snapshot"]) + encoded_rotate_file2 = rotate_file2.to_bytes() + + # this will fail as the second rotation was invalid, snapshot is being checked with timestamp keys + with self.assertRaises(exceptions.UnsignedMetadataError): + verify_helper( + root, + [encoded_rotate_file, encoded_rotate_file2], + Snapshot.type, + snapshot, + ) + + def test_rotate_to_null(self) -> None: + root = Metadata.from_bytes(self.metadata[Root.type]) + timestamp = Metadata.from_bytes(self.metadata[Timestamp.type]) + + # Rotate to null + inner_rotate = Rotate(0, "timestamp", {}, 1) + rotate_file = Metadata(inner_rotate) + rotate_file.sign(self.keystore["timestamp"]) + encoded_rotate_file = rotate_file.to_bytes() + + with self.assertRaises(exceptions.UnsignedMetadataError): + verify_helper( + root, [encoded_rotate_file], Timestamp.type, timestamp + ) + def test_update_metadata_output(self) -> None: timestamp = self.trusted_set.update_timestamp( - self.metadata["timestamp"] + self.metadata["timestamp"], [] ) - snapshot = self.trusted_set.update_snapshot(self.metadata["snapshot"]) - targets = self.trusted_set.update_targets(self.metadata["targets"]) + snapshot = self.trusted_set.update_snapshot( + self.metadata["snapshot"], [] + ) + targets = self.trusted_set.update_targets(self.metadata["targets"], []) delegeted_targets_1 = self.trusted_set.update_delegated_targets( - self.metadata["role1"], "role1", "targets" + self.metadata["role1"], "role1", "targets", [] ) delegeted_targets_2 = self.trusted_set.update_delegated_targets( - self.metadata["role2"], "role2", "role1" + self.metadata["role2"], "role2", "role1", [] ) self.assertIsInstance(timestamp.signed, Timestamp) self.assertIsInstance(snapshot.signed, Snapshot) @@ -157,9 +259,9 @@ def test_update_metadata_output(self) -> None: def test_out_of_order_ops(self) -> None: # Update snapshot before timestamp with self.assertRaises(RuntimeError): - self.trusted_set.update_snapshot(self.metadata[Snapshot.type]) + self.trusted_set.update_snapshot(self.metadata[Snapshot.type], []) - self.trusted_set.update_timestamp(self.metadata[Timestamp.type]) + self.trusted_set.update_timestamp(self.metadata[Timestamp.type], []) # Update root after timestamp with self.assertRaises(RuntimeError): @@ -167,28 +269,28 @@ def test_out_of_order_ops(self) -> None: # Update targets before snapshot with self.assertRaises(RuntimeError): - self.trusted_set.update_targets(self.metadata[Targets.type]) + self.trusted_set.update_targets(self.metadata[Targets.type], []) - self.trusted_set.update_snapshot(self.metadata[Snapshot.type]) + self.trusted_set.update_snapshot(self.metadata[Snapshot.type], []) # update timestamp after snapshot with self.assertRaises(RuntimeError): - self.trusted_set.update_timestamp(self.metadata[Timestamp.type]) + self.trusted_set.update_timestamp(self.metadata[Timestamp.type], []) # Update delegated targets before targets with self.assertRaises(RuntimeError): self.trusted_set.update_delegated_targets( - self.metadata["role1"], "role1", Targets.type + self.metadata["role1"], "role1", Targets.type, [] ) - self.trusted_set.update_targets(self.metadata[Targets.type]) + self.trusted_set.update_targets(self.metadata[Targets.type], []) # Update snapshot after sucessful targets update with self.assertRaises(RuntimeError): - self.trusted_set.update_snapshot(self.metadata[Snapshot.type]) + self.trusted_set.update_snapshot(self.metadata[Snapshot.type], []) self.trusted_set.update_delegated_targets( - self.metadata["role1"], "role1", Targets.type + self.metadata["role1"], "role1", Targets.type, [] ) def test_root_with_invalid_json(self) -> None: @@ -218,18 +320,18 @@ def test_top_level_md_with_invalid_json(self) -> None: md = Metadata.from_bytes(metadata) # metadata is not json with self.assertRaises(exceptions.RepositoryError): - update_func(b"") + update_func(b"", []) # metadata is invalid md.signed.version += 1 with self.assertRaises(exceptions.UnsignedMetadataError): - update_func(md.to_bytes()) + update_func(md.to_bytes(), []) # metadata is of wrong type with self.assertRaises(exceptions.RepositoryError): - update_func(self.metadata[Root.type]) + update_func(self.metadata[Root.type], []) - update_func(metadata) + update_func(metadata, []) def test_update_root_new_root(self) -> None: # test that root can be updated with a new valid version @@ -262,7 +364,7 @@ def root_expired_modifier(root: Root) -> None: tmp_trusted_set = TrustedMetadataSet(root) # update timestamp to trigger final root expiry check with self.assertRaises(exceptions.ExpiredMetadataError): - tmp_trusted_set.update_timestamp(self.metadata[Timestamp.type]) + tmp_trusted_set.update_timestamp(self.metadata[Timestamp.type], []) def test_update_timestamp_new_timestamp_ver_below_trusted_ver(self) -> None: # new_timestamp.version < trusted_timestamp.version @@ -270,19 +372,21 @@ def version_modifier(timestamp: Timestamp) -> None: timestamp.version = 3 timestamp = self.modify_metadata(Timestamp.type, version_modifier) - self.trusted_set.update_timestamp(timestamp) + self.trusted_set.update_timestamp(timestamp, []) with self.assertRaises(exceptions.BadVersionNumberError): - self.trusted_set.update_timestamp(self.metadata[Timestamp.type]) + self.trusted_set.update_timestamp(self.metadata[Timestamp.type], []) def test_update_timestamp_with_same_timestamp(self) -> None: # Test that timestamp is NOT updated if: # new_timestamp.version == trusted_timestamp.version - self.trusted_set.update_timestamp(self.metadata[Timestamp.type]) + self.trusted_set.update_timestamp(self.metadata[Timestamp.type], []) initial_timestamp = self.trusted_set.timestamp # Update timestamp with the same version. with self.assertRaises(exceptions.EqualVersionNumberError): - self.trusted_set.update_timestamp((self.metadata[Timestamp.type])) + self.trusted_set.update_timestamp( + (self.metadata[Timestamp.type]), [] + ) # Every object has a unique id() if they are equal, this means timestamp # was not updated. @@ -296,11 +400,11 @@ def bump_snapshot_version(timestamp: Timestamp) -> None: # set current known snapshot.json version to 2 timestamp = self.modify_metadata(Timestamp.type, bump_snapshot_version) - self.trusted_set.update_timestamp(timestamp) + self.trusted_set.update_timestamp(timestamp, []) # newtimestamp.meta.version < trusted_timestamp.meta.version with self.assertRaises(exceptions.BadVersionNumberError): - self.trusted_set.update_timestamp(self.metadata[Timestamp.type]) + self.trusted_set.update_timestamp(self.metadata[Timestamp.type], []) def test_update_timestamp_expired(self) -> None: # new_timestamp has expired @@ -312,11 +416,11 @@ def timestamp_expired_modifier(timestamp: Timestamp) -> None: Timestamp.type, timestamp_expired_modifier ) with self.assertRaises(exceptions.ExpiredMetadataError): - self.trusted_set.update_timestamp(timestamp) + self.trusted_set.update_timestamp(timestamp, []) # snapshot update does start but fails because timestamp is expired with self.assertRaises(exceptions.ExpiredMetadataError): - self.trusted_set.update_snapshot(self.metadata[Snapshot.type]) + self.trusted_set.update_snapshot(self.metadata[Snapshot.type], []) def test_update_snapshot_length_or_hash_mismatch(self) -> None: def modify_snapshot_length(timestamp: Timestamp) -> None: @@ -324,17 +428,17 @@ def modify_snapshot_length(timestamp: Timestamp) -> None: # set known snapshot.json length to 1 timestamp = self.modify_metadata(Timestamp.type, modify_snapshot_length) - self.trusted_set.update_timestamp(timestamp) + self.trusted_set.update_timestamp(timestamp, []) with self.assertRaises(exceptions.RepositoryError): - self.trusted_set.update_snapshot(self.metadata[Snapshot.type]) + self.trusted_set.update_snapshot(self.metadata[Snapshot.type], []) def test_update_snapshot_fail_threshold_verification(self) -> None: - self.trusted_set.update_timestamp(self.metadata[Timestamp.type]) + self.trusted_set.update_timestamp(self.metadata[Timestamp.type], []) snapshot = Metadata.from_bytes(self.metadata[Snapshot.type]) snapshot.signatures.clear() with self.assertRaises(exceptions.UnsignedMetadataError): - self.trusted_set.update_snapshot(snapshot.to_bytes()) + self.trusted_set.update_snapshot(snapshot.to_bytes(), []) def test_update_snapshot_version_diverge_timestamp_snapshot_version( self, @@ -345,15 +449,15 @@ def timestamp_version_modifier(timestamp: Timestamp) -> None: timestamp = self.modify_metadata( Timestamp.type, timestamp_version_modifier ) - self.trusted_set.update_timestamp(timestamp) + self.trusted_set.update_timestamp(timestamp, []) # if intermediate snapshot version is incorrect, load it but also raise with self.assertRaises(exceptions.BadVersionNumberError): - self.trusted_set.update_snapshot(self.metadata[Snapshot.type]) + self.trusted_set.update_snapshot(self.metadata[Snapshot.type], []) # targets update starts but fails if snapshot version does not match with self.assertRaises(exceptions.BadVersionNumberError): - self.trusted_set.update_targets(self.metadata[Targets.type]) + self.trusted_set.update_targets(self.metadata[Targets.type], []) def test_update_snapshot_file_removed_from_meta(self) -> None: self._update_all_besides_targets(self.metadata[Timestamp.type]) @@ -364,22 +468,22 @@ def remove_file_from_meta(snapshot: Snapshot) -> None: # Test removing a meta_file in new_snapshot compared to the old snapshot snapshot = self.modify_metadata(Snapshot.type, remove_file_from_meta) with self.assertRaises(exceptions.RepositoryError): - self.trusted_set.update_snapshot(snapshot) + self.trusted_set.update_snapshot(snapshot, []) def test_update_snapshot_meta_version_decreases(self) -> None: - self.trusted_set.update_timestamp(self.metadata[Timestamp.type]) + self.trusted_set.update_timestamp(self.metadata[Timestamp.type], []) def version_meta_modifier(snapshot: Snapshot) -> None: snapshot.meta["targets.json"].version += 1 snapshot = self.modify_metadata(Snapshot.type, version_meta_modifier) - self.trusted_set.update_snapshot(snapshot) + self.trusted_set.update_snapshot(snapshot, []) with self.assertRaises(exceptions.BadVersionNumberError): - self.trusted_set.update_snapshot(self.metadata[Snapshot.type]) + self.trusted_set.update_snapshot(self.metadata[Snapshot.type], []) def test_update_snapshot_expired_new_snapshot(self) -> None: - self.trusted_set.update_timestamp(self.metadata[Timestamp.type]) + self.trusted_set.update_timestamp(self.metadata[Timestamp.type], []) def snapshot_expired_modifier(snapshot: Snapshot) -> None: snapshot.expires = datetime(1970, 1, 1) @@ -389,11 +493,11 @@ def snapshot_expired_modifier(snapshot: Snapshot) -> None: Snapshot.type, snapshot_expired_modifier ) with self.assertRaises(exceptions.ExpiredMetadataError): - self.trusted_set.update_snapshot(snapshot) + self.trusted_set.update_snapshot(snapshot, []) # targets update does start but fails because snapshot is expired with self.assertRaises(exceptions.ExpiredMetadataError): - self.trusted_set.update_targets(self.metadata[Targets.type]) + self.trusted_set.update_targets(self.metadata[Targets.type], []) def test_update_snapshot_successful_rollback_checks(self) -> None: def meta_version_bump(timestamp: Timestamp) -> None: @@ -405,19 +509,19 @@ def version_bump(snapshot: Snapshot) -> None: snapshot.version += 1 # load a "local" timestamp, then update to newer one: - self.trusted_set.update_timestamp(self.metadata[Timestamp.type]) + self.trusted_set.update_timestamp(self.metadata[Timestamp.type], []) new_timestamp = self.modify_metadata(Timestamp.type, meta_version_bump) - self.trusted_set.update_timestamp(new_timestamp) + self.trusted_set.update_timestamp(new_timestamp, []) # load a "local" snapshot with mismatching version (loading happens but # BadVersionNumberError is raised), then update to newer one: with self.assertRaises(exceptions.BadVersionNumberError): - self.trusted_set.update_snapshot(self.metadata[Snapshot.type]) + self.trusted_set.update_snapshot(self.metadata[Snapshot.type], []) new_snapshot = self.modify_metadata(Snapshot.type, version_bump) - self.trusted_set.update_snapshot(new_snapshot) + self.trusted_set.update_snapshot(new_snapshot, []) # update targets to trigger final snapshot meta version check - self.trusted_set.update_targets(self.metadata[Targets.type]) + self.trusted_set.update_targets(self.metadata[Targets.type], []) def test_update_targets_no_meta_in_snapshot(self) -> None: def no_meta_modifier(snapshot: Snapshot) -> None: @@ -429,7 +533,7 @@ def no_meta_modifier(snapshot: Snapshot) -> None: ) # remove meta information with information about targets from snapshot with self.assertRaises(exceptions.RepositoryError): - self.trusted_set.update_targets(self.metadata[Targets.type]) + self.trusted_set.update_targets(self.metadata[Targets.type], []) def test_update_targets_hash_diverge_from_snapshot_meta_hash(self) -> None: def meta_length_modifier(snapshot: Snapshot) -> None: @@ -442,7 +546,7 @@ def meta_length_modifier(snapshot: Snapshot) -> None: ) # observed_hash != stored hash in snapshot meta for targets with self.assertRaises(exceptions.RepositoryError): - self.trusted_set.update_targets(self.metadata[Targets.type]) + self.trusted_set.update_targets(self.metadata[Targets.type], []) def test_update_targets_version_diverge_snapshot_meta_version(self) -> None: def meta_modifier(snapshot: Snapshot) -> None: @@ -455,7 +559,7 @@ def meta_modifier(snapshot: Snapshot) -> None: ) # new_delegate.signed.version != meta.version stored in snapshot with self.assertRaises(exceptions.BadVersionNumberError): - self.trusted_set.update_targets(self.metadata[Targets.type]) + self.trusted_set.update_targets(self.metadata[Targets.type], []) def test_update_targets_expired_new_target(self) -> None: self._update_all_besides_targets() @@ -465,7 +569,7 @@ def target_expired_modifier(target: Targets) -> None: targets = self.modify_metadata(Targets.type, target_expired_modifier) with self.assertRaises(exceptions.ExpiredMetadataError): - self.trusted_set.update_targets(targets) + self.trusted_set.update_targets(targets, []) # TODO test updating over initial metadata (new keys, newer timestamp, etc) diff --git a/tests/test_updater_consistent_snapshot.py b/tests/test_updater_consistent_snapshot.py index e4bab8a8c7..b17c1075b0 100644 --- a/tests/test_updater_consistent_snapshot.py +++ b/tests/test_updater_consistent_snapshot.py @@ -106,8 +106,11 @@ def _assert_targets_files_exist(self, filenames: Iterable[str]) -> None: "calls": [ ("root", 3), ("timestamp", None), + ("rotate/timestamp.rotate.0", None), ("snapshot", None), + ("rotate/snapshot.rotate.0", None), ("targets", None), + ("rotate/targets.rotate.0", None), ], }, "consistent_snaphot enabled": { @@ -115,8 +118,11 @@ def _assert_targets_files_exist(self, filenames: Iterable[str]) -> None: "calls": [ ("root", 3), ("timestamp", None), + ("rotate/timestamp.rotate.0", None), ("snapshot", 1), + ("rotate/snapshot.rotate.0", None), ("targets", 1), + ("rotate/targets.rotate.0", None), ], }, } @@ -166,7 +172,10 @@ def test_delegated_roles_update( consistent_snapshot: bool = test_case_data["consistent_snapshot"] exp_version: Optional[int] = test_case_data["expected_version"] rolenames = ["role1", "..", "."] - exp_calls = [(role, exp_version) for role in rolenames] + exp_calls = [] + for role in rolenames: + exp_calls.append((role, exp_version)) + exp_calls.append((f"rotate/{role}.rotate.0", None)) self.setup_subtest(consistent_snapshot) # Add new delegated targets diff --git a/tests/test_updater_delegation_graphs.py b/tests/test_updater_delegation_graphs.py index ca04621da0..2f17e621c5 100644 --- a/tests/test_updater_delegation_graphs.py +++ b/tests/test_updater_delegation_graphs.py @@ -11,7 +11,7 @@ import tempfile import unittest from dataclasses import astuple, dataclass, field -from typing import Iterable, List, Optional +from typing import Iterable, List, Optional, Tuple from tests import utils from tests.repository_simulator import RepositorySimulator @@ -264,7 +264,10 @@ def test_graph_traversal(self, test_data: DelegationsTestCase) -> None: try: exp_files = [*TOP_LEVEL_ROLE_NAMES, *test_data.visited_order] - exp_calls = [(role, 1) for role in test_data.visited_order] + exp_calls: List[Tuple[str, Optional[int]]] = [] + for role in test_data.visited_order: + exp_calls.append((role, 1)) + exp_calls.append((f"rotate/{role}.rotate.0", None)) self._init_repo(test_data) self.setup_subtest() @@ -312,7 +315,10 @@ def test_invalid_metadata(self, test_data: DelegationsTestCase) -> None: self.setup_subtest() # The invalid role metadata must not be persisted exp_files = [*TOP_LEVEL_ROLE_NAMES, *test_data.visited_order[:-1]] - exp_calls = [(role, 1) for role in test_data.visited_order] + exp_calls: List[Tuple[str, Optional[int]]] = [] + for role in test_data.visited_order: + exp_calls.append((role, 1)) + exp_calls.append((f"rotate/{role}.rotate.0", None)) updater = self._init_updater() # Call explicitly refresh to simplify the expected_calls list @@ -359,7 +365,10 @@ def test_safely_encoded_rolenames(self) -> None: self.assertTrue(fname in local_metadata) # assert that requested URLs are quoted without extension - exp_calls = [(quoted[:-5], 1) for quoted in roles_to_filenames.values()] + exp_calls: List[Tuple[str, Optional[int]]] = [] + for quoted in roles_to_filenames.values(): + exp_calls.append((quoted[:-5], 1)) + exp_calls.append((f"rotate/{quoted[:-5]}.rotate.0", None)) self.assertListEqual(self.sim.fetch_tracker.metadata, exp_calls) hash_bins_graph: utils.DataSet = { @@ -398,7 +407,10 @@ def test_hash_bins_graph_traversal( try: exp_files = [*TOP_LEVEL_ROLE_NAMES, *test_data.visited_order] - exp_calls = [(role, 1) for role in test_data.visited_order] + exp_calls: List[Tuple[str, Optional[int]]] = [] + for role in test_data.visited_order: + exp_calls.append((role, 1)) + exp_calls.append((f"rotate/{role}.rotate.0", None)) self._init_repo(test_data) self.setup_subtest() @@ -482,7 +494,10 @@ def test_succinct_roles_graph_traversal( try: exp_files = [*TOP_LEVEL_ROLE_NAMES, test_data.expected_target_bin] - exp_calls = [(test_data.expected_target_bin, 1)] + exp_calls = [ + (test_data.expected_target_bin, 1), + (f"rotate/{test_data.expected_target_bin}.rotate.0", None), + ] self.sim = RepositorySimulator() self.sim.add_succinct_roles("targets", test_data.bit_length, "bin") @@ -565,7 +580,10 @@ def test_targetfile_search(self, test_data: TargetTestCase) -> None: try: self.setup_subtest() exp_files = [*TOP_LEVEL_ROLE_NAMES, *test_data.visited_order] - exp_calls = [(role, 1) for role in test_data.visited_order] + exp_calls: List[Tuple[str, Optional[int]]] = [] + for role in test_data.visited_order: + exp_calls.append((role, 1)) + exp_calls.append((f"rotate/{role}.rotate.0", None)) exp_target = self.sim.target_files[test_data.targetpath].target_file updater = self._init_updater() diff --git a/tests/test_updater_top_level_update.py b/tests/test_updater_top_level_update.py index be6ce09d27..b3dd900c5a 100644 --- a/tests/test_updater_top_level_update.py +++ b/tests/test_updater_top_level_update.py @@ -737,7 +737,15 @@ def test_load_metadata_from_cache(self, wrapped_open: MagicMock) -> None: ] ) - expected_calls = [("root", 2), ("timestamp", None)] + expected_calls = [ + ("root", 2), + ("rotate/timestamp.rotate.0", None), + ("timestamp", None), + ("rotate/timestamp.rotate.0", None), + ("rotate/snapshot.rotate.0", None), + ("rotate/targets.rotate.0", None), + ("rotate/role1.rotate.0", None), + ] self.assertListEqual(self.sim.fetch_tracker.metadata, expected_calls) @patch.object(datetime, "datetime", wraps=datetime.datetime) diff --git a/tuf/api/metadata.py b/tuf/api/metadata.py index 9172211fb8..fd6f002e77 100644 --- a/tuf/api/metadata.py +++ b/tuf/api/metadata.py @@ -69,6 +69,9 @@ _SNAPSHOT = "snapshot" _TARGETS = "targets" _TIMESTAMP = "timestamp" +_ROTATE = "rotate" + +ROTATE_NULL: Dict[str, Any] = {} # pylint: disable=too-many-lines @@ -80,7 +83,7 @@ TOP_LEVEL_ROLE_NAMES = {_ROOT, _TIMESTAMP, _SNAPSHOT, _TARGETS} # T is a Generic type constraint for Metadata.signed -T = TypeVar("T", "Root", "Timestamp", "Snapshot", "Targets") +T = TypeVar("T", "Root", "Timestamp", "Snapshot", "Targets", "Rotate") class Metadata(Generic[T]): @@ -180,6 +183,8 @@ def from_dict(cls, metadata: Dict[str, Any]) -> "Metadata[T]": inner_cls = Timestamp elif _type == _ROOT: inner_cls = Root + elif _type == _ROTATE: + inner_cls = Rotate else: raise ValueError(f'unrecognized metadata type "{_type}"') @@ -417,6 +422,9 @@ def verify_delegate( if isinstance(self.signed, Root): keys = self.signed.keys role = self.signed.roles.get(delegated_role) + elif isinstance(self.signed, Rotate): + keys = self.signed.keys + role = Role(list(keys.keys()), self.signed.threshold) elif isinstance(self.signed, Targets): if self.signed.delegations is None: raise ValueError(f"No delegation found for {delegated_role}") @@ -993,6 +1001,107 @@ def revoke_key(self, keyid: str, role: str) -> None: del self.keys[keyid] +# pylint: disable=super-init-not-called +class Rotate(Signed): + """A class for the rotate file defined in TAP 8 + + Parameters listed below are also instance attributes. + + Args: + previous: the name of the previous rotate file. Default is empty strong + role: the role this file is assosiated with. + keys: Dictionary of keyids to Keys for the new keys associated with the role. Default is null key + threshold: threshold of required keys. Default is 1 + + Raises: + ValueError: Invalid arguments. + """ + + type = _ROTATE + + def __init__( + self, + version: Optional[int] = None, + role: Optional[str] = None, + keys: Optional[Dict[str, Key]] = None, + threshold: Optional[int] = None, + unrecognized_fields: Optional[Dict[str, Any]] = None, + ): + if unrecognized_fields is None: + unrecognized_fields = {} + + self.unrecognized_fields = unrecognized_fields + + if version is None: + version = 0 + elif version < 0: + raise ValueError(f"version must be >= 0, got {version}") + self.version = version + + if role is None: + raise ValueError("rotate file must have an associated role") + self.role = role + + self.keys = keys if keys is not None else ROTATE_NULL + + self.threshold = threshold if threshold is not None else 1 + + @property + def _type(self) -> str: + return self.type + + def __eq__(self, other: Any) -> bool: + if not isinstance(other, Rotate): + return False + + return ( + self.version == other.version + and self.role == other.role + and self.keys == other.keys + and self.threshold == other.threshold + and self.unrecognized_fields == other.unrecognized_fields + ) + + def is_null(self) -> bool: + return self.keys == ROTATE_NULL + + @classmethod + def from_dict(cls, signed_dict: Dict[str, Any]) -> "Rotate": + """Create ``Rotate`` object from its json/dict representation. + + Raises: + ValueError, KeyError, TypeError: Invalid arguments. + """ + _type = signed_dict.pop("_type") + if _type != cls.type: + raise ValueError(f"Expected type {cls.type}, got {_type}") + + version = signed_dict.pop("version", None) + role = signed_dict.pop("role", None) + keys = signed_dict.pop("keys", None) + + for keyid, key_dict in keys.items(): + keys[keyid] = Key.from_dict(keyid, key_dict) + + threshold = signed_dict.pop("threshold", None) + + # All fields left in the signed_dict are unrecognized. + return cls(version, role, keys, threshold, signed_dict) + + def to_dict(self) -> Dict[str, Any]: + """Return the dict representation of self.""" + return { + "_type": self._type, + "version": self.version, + "role": self.role, + "keys": { + keyid: key.to_dict() for (keyid, key) in self.keys.items() + }, + "threshold": self.threshold, + **self.unrecognized_fields, + } + + class BaseFile: """A base class of ``MetaFile`` and ``TargetFile``. @@ -1288,6 +1397,25 @@ def to_dict(self) -> Dict[str, Any]: snapshot_dict["meta"] = meta_dict return snapshot_dict + def verify_rotate_files(self, role: str, rotate_files: List[bytes]) -> None: + """Verify that rotate files are included in snapshot""" + in_snapshot = [] + for key in self.meta: + if key.startswith(role + ".rotate."): + in_snapshot.append(key) + + if len(in_snapshot) > len(rotate_files): + raise exceptions.DownloadError("missing rotate file") + if len(in_snapshot) < len(rotate_files): + raise exceptions.DownloadError("extra rotate file found") + + # check that we have the right set of rotate files + for s in range(len(in_snapshot)): + if f".rotate.{s}" not in in_snapshot: + raise exceptions.DownloadError( + "rotate files in snapshot have non-continuous versions" + ) + class DelegatedRole(Role): """A container with information about a delegated role. diff --git a/tuf/ngclient/_internal/trusted_metadata_set.py b/tuf/ngclient/_internal/trusted_metadata_set.py index dae06b13be..55a61108e9 100644 --- a/tuf/ngclient/_internal/trusted_metadata_set.py +++ b/tuf/ngclient/_internal/trusted_metadata_set.py @@ -62,14 +62,53 @@ import datetime import logging from collections import abc -from typing import Dict, Iterator, Optional +from typing import Dict, Iterator, List, Optional from tuf.api import exceptions -from tuf.api.metadata import Metadata, Root, Snapshot, Targets, Timestamp +from tuf.api.metadata import ( + Metadata, + Root, + Rotate, + Snapshot, + Targets, + Timestamp, +) logger = logging.getLogger(__name__) +def verify_helper( + delegator: Metadata, + rotate_files: Optional[List[bytes]], + delegated_role: str, + delegated_metadata: Metadata, +) -> None: + """Helper function to call verify_delegate on all rotate files.""" + if rotate_files is None or len(rotate_files) == 0: + delegator.verify_delegate(delegated_role, delegated_metadata) + else: + parent = delegator + rotate_version = 0 + for r in rotate_files: + rotate = Metadata[Rotate].from_bytes(r) + if rotate.signed.version != rotate_version: + raise exceptions.DownloadError( + "Rotate file version does not match filename" + ) + rotate_version = rotate_version + 1 + if rotate.signed.is_null(): + raise exceptions.UnsignedMetadataError("Rotation to null") + try: + parent.verify_delegate(delegated_role, rotate) + except exceptions.UnsignedMetadataError: + # invalid rotate file, skip all remaining rotate files + break + if rotate.signed.role != delegated_role: + raise exceptions.RepositoryError("invalid rotate file") + parent = rotate + parent.verify_delegate(delegated_role, delegated_metadata) + + class TrustedMetadataSet(abc.Mapping): """Internal class to keep track of trusted metadata in ``Updater`` @@ -177,7 +216,9 @@ def update_root(self, data: bytes) -> Metadata[Root]: return new_root - def update_timestamp(self, data: bytes) -> Metadata[Timestamp]: + def update_timestamp( + self, data: bytes, rotate_files: Optional[List[bytes]] + ) -> Metadata[Timestamp]: """Verify and load ``data`` as new timestamp metadata. Note that an intermediate timestamp is allowed to be expired: @@ -215,7 +256,7 @@ def update_timestamp(self, data: bytes) -> Metadata[Timestamp]: f"Expected 'timestamp', got '{new_timestamp.signed.type}'" ) - self.root.verify_delegate(Timestamp.type, new_timestamp) + verify_helper(self.root, rotate_files, Timestamp.type, new_timestamp) # If an existing trusted timestamp is updated, # check for a rollback attack @@ -258,7 +299,10 @@ def _check_final_timestamp(self) -> None: raise exceptions.ExpiredMetadataError("timestamp.json is expired") def update_snapshot( - self, data: bytes, trusted: Optional[bool] = False + self, + data: bytes, + rotate_files: Optional[List[bytes]], + trusted: Optional[bool] = False, ) -> Metadata[Snapshot]: """Verify and load ``data`` as new snapshot metadata. @@ -311,7 +355,7 @@ def update_snapshot( f"Expected 'snapshot', got '{new_snapshot.signed.type}'" ) - self.root.verify_delegate(Snapshot.type, new_snapshot) + verify_helper(self.root, rotate_files, Snapshot.type, new_snapshot) # version not checked against meta version to allow old snapshot to be # used in rollback protection: it is checked when targets is updated @@ -359,7 +403,9 @@ def _check_final_snapshot(self) -> None: f"got {self.snapshot.signed.version}" ) - def update_targets(self, data: bytes) -> Metadata[Targets]: + def update_targets( + self, data: bytes, rotate_files: Optional[List[bytes]] + ) -> Metadata[Targets]: """Verify and load ``data`` as new top-level targets metadata. Args: @@ -372,10 +418,16 @@ def update_targets(self, data: bytes) -> Metadata[Targets]: Returns: Deserialized and verified targets ``Metadata`` object """ - return self.update_delegated_targets(data, Targets.type, Root.type) + return self.update_delegated_targets( + data, Targets.type, Root.type, rotate_files + ) def update_delegated_targets( - self, data: bytes, role_name: str, delegator_name: str + self, + data: bytes, + role_name: str, + delegator_name: str, + rotate_files: Optional[List[bytes]], ) -> Metadata[Targets]: """Verify and load ``data`` as new metadata for target ``role_name``. @@ -414,6 +466,10 @@ def update_delegated_targets( meta.verify_length_and_hashes(data) + if rotate_files is None: + rotate_files = [] + self.snapshot.signed.verify_rotate_files(role_name, rotate_files) + new_delegate = Metadata[Targets].from_bytes(data) if new_delegate.signed.type != Targets.type: @@ -421,7 +477,7 @@ def update_delegated_targets( f"Expected 'targets', got '{new_delegate.signed.type}'" ) - delegator.verify_delegate(role_name, new_delegate) + verify_helper(delegator, rotate_files, role_name, new_delegate) version = new_delegate.signed.version if version != meta.version: diff --git a/tuf/ngclient/config.py b/tuf/ngclient/config.py index e6213d0bed..a12c6d63be 100644 --- a/tuf/ngclient/config.py +++ b/tuf/ngclient/config.py @@ -8,6 +8,7 @@ @dataclass +# pylint: disable=too-many-instance-attributes class UpdaterConfig: """Used to store ``Updater`` configuration. @@ -33,3 +34,4 @@ class UpdaterConfig: snapshot_max_length: int = 2000000 # bytes targets_max_length: int = 5000000 # bytes prefix_targets_with_hash: bool = True + rotate_max_length: int = 2000000 # bytes diff --git a/tuf/ngclient/updater.py b/tuf/ngclient/updater.py index 467a446fb4..a4d74588c1 100644 --- a/tuf/ngclient/updater.py +++ b/tuf/ngclient/updater.py @@ -41,7 +41,7 @@ import os import shutil import tempfile -from typing import Optional, Set +from typing import List, Optional, Set from urllib import parse from tuf.api import exceptions @@ -82,6 +82,7 @@ class Updater: RepositoryError: Local root.json is invalid """ + # pylint: disable=too-many-instance-attributes def __init__( self, metadata_dir: str, @@ -98,6 +99,7 @@ def __init__( self._target_base_url = None else: self._target_base_url = _ensure_trailing_slash(target_base_url) + self._rotate_dir = "rotate/" # Read trusted local root metadata data = self._load_local_metadata(Root.type) @@ -263,6 +265,21 @@ def download_target( logger.debug("Downloaded target %s", targetinfo.path) return filepath + def _download_rotate_files(self, rolename: str, length: int) -> List[bytes]: + """Download all rotate files for a role""" + encoded_name = parse.quote(rolename, "") + rotate_files = [] + for version in range(100): + url = f"{self._metadata_base_url}{self._rotate_dir}{encoded_name}.rotate.{version}.json" + try: + r = self._fetcher.download_bytes(url, length) + except exceptions.DownloadHTTPError: + # file not found + break + rotate_files.append(r) + + return rotate_files + def _download_metadata( self, rolename: str, length: int, version: Optional[int] = None ) -> bytes: @@ -333,7 +350,10 @@ def _load_timestamp(self) -> None: """Load local and remote timestamp metadata""" try: data = self._load_local_metadata(Timestamp.type) - self._trusted_set.update_timestamp(data) + rotate_files = self._download_rotate_files( + Timestamp.type, self.config.rotate_max_length + ) + self._trusted_set.update_timestamp(data, rotate_files) except (OSError, exceptions.RepositoryError) as e: # Local timestamp does not exist or is invalid logger.debug("Local timestamp not valid as final: %s", e) @@ -342,8 +362,11 @@ def _load_timestamp(self) -> None: data = self._download_metadata( Timestamp.type, self.config.timestamp_max_length ) + rotate_files = self._download_rotate_files( + Timestamp.type, self.config.rotate_max_length + ) try: - self._trusted_set.update_timestamp(data) + self._trusted_set.update_timestamp(data, rotate_files) except exceptions.EqualVersionNumberError: # If the new timestamp version is the same as current, discard the # new timestamp. This is normal and it shouldn't raise any error. @@ -355,7 +378,10 @@ def _load_snapshot(self) -> None: """Load local (and if needed remote) snapshot metadata""" try: data = self._load_local_metadata(Snapshot.type) - self._trusted_set.update_snapshot(data, trusted=True) + rotate_files = self._download_rotate_files( + Snapshot.type, self.config.rotate_max_length + ) + self._trusted_set.update_snapshot(data, rotate_files, trusted=True) logger.debug("Local snapshot is valid: not downloading new one") except (OSError, exceptions.RepositoryError) as e: # Local snapshot does not exist or is invalid: update from remote @@ -369,7 +395,10 @@ def _load_snapshot(self) -> None: version = snapshot_meta.version data = self._download_metadata(Snapshot.type, length, version) - self._trusted_set.update_snapshot(data) + rotate_files = self._download_rotate_files( + Snapshot.type, self.config.rotate_max_length + ) + self._trusted_set.update_snapshot(data, rotate_files) self._persist_metadata(Snapshot.type, data) def _load_targets(self, role: str, parent_role: str) -> Metadata[Targets]: @@ -381,8 +410,11 @@ def _load_targets(self, role: str, parent_role: str) -> Metadata[Targets]: try: data = self._load_local_metadata(role) + rotate_files = self._download_rotate_files( + role, self.config.rotate_max_length + ) delegated_targets = self._trusted_set.update_delegated_targets( - data, role, parent_role + data, role, parent_role, rotate_files ) logger.debug("Local %s is valid: not downloading new one", role) return delegated_targets @@ -405,8 +437,11 @@ def _load_targets(self, role: str, parent_role: str) -> Metadata[Targets]: version = metainfo.version data = self._download_metadata(role, length, version) + rotate_files = self._download_rotate_files( + role, self.config.rotate_max_length + ) delegated_targets = self._trusted_set.update_delegated_targets( - data, role, parent_role + data, role, parent_role, rotate_files ) self._persist_metadata(role, data)