diff --git a/docs/api/tuf.api.metadata.supporting.rst b/docs/api/tuf.api.metadata.supporting.rst index 906e70e95c..c4771c90ac 100644 --- a/docs/api/tuf.api.metadata.supporting.rst +++ b/docs/api/tuf.api.metadata.supporting.rst @@ -13,6 +13,7 @@ ones (Root, Timestamp, Snapshot, Targets): tuf.api.metadata.MetaFile tuf.api.metadata.Role tuf.api.metadata.TargetFile + tuf.api.metadata.SuccinctRoles .. autoclass:: tuf.api.metadata.DelegatedRole @@ -25,3 +26,5 @@ ones (Root, Timestamp, Snapshot, Targets): .. autoclass:: tuf.api.metadata.Role .. autoclass:: tuf.api.metadata.TargetFile + +.. autoclass:: tuf.api.metadata.SuccinctRoles \ No newline at end of file diff --git a/docs/repository-library-design.md b/docs/repository-library-design.md index 5a9b0fde48..e5f166122f 100644 --- a/docs/repository-library-design.md +++ b/docs/repository-library-design.md @@ -63,7 +63,7 @@ are found, in the python-tuf library): ```python with repository.edit(“targets”) as targets: # adds a key for role1 (as an example, arbitrary edits are allowed) - targets.add_key(“role1”, key) + targets.add_key(key, “role1”) ``` This code loads current targets metadata for editing, adds the key to a role, diff --git a/examples/repo_example/basic_repo.py b/examples/repo_example/basic_repo.py index 8d61ba1a81..20ebe09ab0 100644 --- a/examples/repo_example/basic_repo.py +++ b/examples/repo_example/basic_repo.py @@ -157,7 +157,7 @@ def _in(days: float) -> datetime: for name in ["targets", "snapshot", "timestamp", "root"]: keys[name] = generate_ed25519_key() roles["root"].signed.add_key( - name, Key.from_securesystemslib_key(keys[name]) + Key.from_securesystemslib_key(keys[name]), name ) # NOTE: We only need the public part to populate root, so it is possible to use @@ -173,7 +173,7 @@ def _in(days: float) -> datetime: # required signature threshold. another_root_key = generate_ed25519_key() roles["root"].signed.add_key( - "root", Key.from_securesystemslib_key(another_root_key) + Key.from_securesystemslib_key(another_root_key), "root" ) roles["root"].signed.roles["root"].threshold = 2 @@ -343,9 +343,9 @@ def _in(days: float) -> datetime: # remains in place, it can be used to count towards the old and new threshold. new_root_key = generate_ed25519_key() -roles["root"].signed.remove_key("root", keys["root"]["keyid"]) +roles["root"].signed.revoke_key(keys["root"]["keyid"], "root") roles["root"].signed.add_key( - "root", Key.from_securesystemslib_key(new_root_key) + Key.from_securesystemslib_key(new_root_key), "root" ) roles["root"].signed.version += 1 diff --git a/examples/repo_example/hashed_bin_delegation.py b/examples/repo_example/hashed_bin_delegation.py index c8bc3b34b2..f287afe106 100644 --- a/examples/repo_example/hashed_bin_delegation.py +++ b/examples/repo_example/hashed_bin_delegation.py @@ -162,6 +162,7 @@ def find_hash_bin(path: str) -> str: # 10-17 10 11 12 13 14 15 16 17 # ... ... # f8-ff f8 f9 fa fb fc fd fe ff +assert roles["bins"].signed.delegations.roles is not None for bin_n_name, bin_n_hash_prefixes in generate_hash_bins(): # Update delegating targets role (bins) with delegation details for each # delegated targets role (bin_n). diff --git a/tests/generated_data/generate_md.py b/tests/generated_data/generate_md.py index 649e2bab74..fef33678d3 100644 --- a/tests/generated_data/generate_md.py +++ b/tests/generated_data/generate_md.py @@ -89,10 +89,10 @@ def generate_all_files( md_snapshot = Metadata(Snapshot(expires=EXPIRY)) md_targets = Metadata(Targets(expires=EXPIRY)) - md_root.signed.add_key("root", keys["ed25519_0"]) - md_root.signed.add_key("timestamp", keys["ed25519_1"]) - md_root.signed.add_key("snapshot", keys["ed25519_2"]) - md_root.signed.add_key("targets", keys["ed25519_3"]) + md_root.signed.add_key(keys["ed25519_0"], "root") + md_root.signed.add_key(keys["ed25519_1"], "timestamp") + md_root.signed.add_key(keys["ed25519_2"], "snapshot") + md_root.signed.add_key(keys["ed25519_3"], "targets") for i, md in enumerate([md_root, md_timestamp, md_snapshot, md_targets]): assert isinstance(md, Metadata) diff --git a/tests/repository_simulator.py b/tests/repository_simulator.py index ae1ad3e6ca..abb7f37141 100644 --- a/tests/repository_simulator.py +++ b/tests/repository_simulator.py @@ -67,6 +67,7 @@ MetaFile, Root, Snapshot, + SuccinctRoles, TargetFile, Targets, Timestamp, @@ -169,7 +170,7 @@ def rotate_keys(self, role: str) -> None: self.signers[role].clear() for _ in range(0, self.root.roles[role].threshold): key, signer = self.create_key() - self.root.add_key(role, key) + self.root.add_key(key, role) self.add_signer(role, signer) def _initialize(self) -> None: @@ -182,7 +183,7 @@ def _initialize(self) -> None: for role in TOP_LEVEL_ROLE_NAMES: key, signer = self.create_key() - self.md_root.signed.add_key(role, key) + self.md_root.signed.add_key(key, role) self.add_signer(role, signer) self.publish_root() @@ -334,12 +335,16 @@ def update_snapshot(self) -> None: self.snapshot.version += 1 self.update_timestamp() + def _get_delegator(self, delegator_name: str) -> Targets: + """Given a delegator name return, its corresponding Targets object.""" + if delegator_name == Targets.type: + return self.targets + + return self.md_delegates[delegator_name].signed + def add_target(self, role: str, data: bytes, path: str) -> None: """Create a target from data and add it to the target_files.""" - if role == Targets.type: - targets = self.targets - else: - targets = self.md_delegates[role].signed + targets = self._get_delegator(role) target = TargetFile.from_data(path, data, ["sha256"]) targets.targets[path] = target @@ -349,26 +354,63 @@ def add_delegation( self, delegator_name: str, role: DelegatedRole, targets: Targets ) -> None: """Add delegated target role to the repository.""" - if delegator_name == Targets.type: - delegator = self.targets - else: - delegator = self.md_delegates[delegator_name].signed + delegator = self._get_delegator(delegator_name) + + if ( + delegator.delegations is not None + and delegator.delegations.succinct_roles is not None + ): + raise ValueError("Can't add a role when succinct_roles is used") # Create delegation if delegator.delegations is None: - delegator.delegations = Delegations({}, {}) + delegator.delegations = Delegations({}, roles={}) + + assert delegator.delegations.roles is not None # put delegation last by default delegator.delegations.roles[role.name] = role # By default add one new key for the role key, signer = self.create_key() - delegator.add_key(role.name, key) + delegator.add_key(key, role.name) self.add_signer(role.name, signer) # Add metadata for the role if role.name not in self.md_delegates: self.md_delegates[role.name] = Metadata(targets, {}) + def add_succinct_roles( + self, delegator_name: str, bit_length: int, name_prefix: str + ) -> None: + """Add succinct roles info to a delegator with name "delegator_name". + + Note that for each delegated role represented by succinct roles an empty + Targets instance is created. + """ + delegator = self._get_delegator(delegator_name) + + if ( + delegator.delegations is not None + and delegator.delegations.roles is not None + ): + raise ValueError( + "Can't add a succinct_roles when delegated roles are used" + ) + + key, signer = self.create_key() + succinct_roles = SuccinctRoles([], 1, bit_length, name_prefix) + delegator.delegations = Delegations({}, None, succinct_roles) + + # Add targets metadata for all bins. + for delegated_name in succinct_roles.get_roles(): + self.md_delegates[delegated_name] = Metadata( + Targets(expires=self.safe_expiry) + ) + + self.add_signer(delegated_name, signer) + + delegator.add_key(key) + def write(self) -> None: """Dump current repository metadata to self.dump_dir diff --git a/tests/test_api.py b/tests/test_api.py index a0eaf1e910..530ca55ad1 100755 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -35,6 +35,7 @@ Metadata, Root, Snapshot, + SuccinctRoles, TargetFile, Targets, Timestamp, @@ -360,7 +361,7 @@ def test_metadata_verify_delegate(self) -> None: # Add a key to snapshot role, make sure the new sig fails to verify ts_keyid = next(iter(root.signed.roles[Timestamp.type].keyids)) - root.signed.add_key(Snapshot.type, root.signed.keys[ts_keyid]) + root.signed.add_key(root.signed.keys[ts_keyid], Snapshot.type) snapshot.signatures[ts_keyid] = Signature(ts_keyid, "ff" * 64) # verify succeeds if threshold is reached even if some signatures @@ -389,7 +390,7 @@ def test_key_class(self) -> None: with self.assertRaises(ValueError): Key.from_securesystemslib_key(sslib_key) - def test_root_add_key_and_remove_key(self) -> None: + def test_root_add_key_and_revoke_key(self) -> None: root_path = os.path.join(self.repo_dir, "metadata", "root.json") root = Metadata[Root].from_file(root_path) @@ -409,8 +410,12 @@ def test_root_add_key_and_remove_key(self) -> None: self.assertNotIn(keyid, root.signed.roles[Root.type].keyids) self.assertNotIn(keyid, root.signed.keys) + # Assert that add_key with old argument order will raise an error + with self.assertRaises(ValueError): + root.signed.add_key(Root.type, key_metadata) # type: ignore + # Add new root key - root.signed.add_key(Root.type, key_metadata) + root.signed.add_key(key_metadata, Root.type) # Assert that key is added self.assertIn(keyid, root.signed.roles[Root.type].keyids) @@ -422,30 +427,30 @@ def test_root_add_key_and_remove_key(self) -> None: # Try adding the same key again and assert its ignored. pre_add_keyid = root.signed.roles[Root.type].keyids.copy() - root.signed.add_key(Root.type, key_metadata) + root.signed.add_key(key_metadata, Root.type) self.assertEqual(pre_add_keyid, root.signed.roles[Root.type].keyids) # Add the same key to targets role as well - root.signed.add_key(Targets.type, key_metadata) + root.signed.add_key(key_metadata, Targets.type) # Add the same key to a nonexistent role. with self.assertRaises(ValueError): - root.signed.add_key("nosuchrole", key_metadata) + root.signed.add_key(key_metadata, "nosuchrole") # Remove the key from root role (targets role still uses it) - root.signed.remove_key(Root.type, keyid) + root.signed.revoke_key(keyid, Root.type) self.assertNotIn(keyid, root.signed.roles[Root.type].keyids) self.assertIn(keyid, root.signed.keys) # Remove the key from targets as well - root.signed.remove_key(Targets.type, keyid) + root.signed.revoke_key(keyid, Targets.type) self.assertNotIn(keyid, root.signed.roles[Targets.type].keyids) self.assertNotIn(keyid, root.signed.keys) with self.assertRaises(ValueError): - root.signed.remove_key(Root.type, "nosuchkey") + root.signed.revoke_key("nosuchkey", Root.type) with self.assertRaises(ValueError): - root.signed.remove_key("nosuchrole", keyid) + root.signed.revoke_key(keyid, "nosuchrole") def test_is_target_in_pathpattern(self) -> None: # pylint: disable=protected-access @@ -494,6 +499,7 @@ def test_targets_key_api(self) -> None: } ) assert isinstance(targets.delegations, Delegations) + assert isinstance(targets.delegations.roles, Dict) targets.delegations.roles["role2"] = delegated_role key_dict = { @@ -505,9 +511,13 @@ def test_targets_key_api(self) -> None: } key = Key.from_dict("id2", key_dict) + # Assert that add_key with old argument order will raise an error + with self.assertRaises(ValueError): + targets.add_key("role1", key) # type: ignore + # Assert that delegated role "role1" does not contain the new key self.assertNotIn(key.keyid, targets.delegations.roles["role1"].keyids) - targets.add_key("role1", key) + targets.add_key(key, "role1") # Assert that the new key is added to the delegated role "role1" self.assertIn(key.keyid, targets.delegations.roles["role1"].keyids) @@ -517,46 +527,89 @@ def test_targets_key_api(self) -> None: # Try adding the same key again and assert its ignored. past_keyid = targets.delegations.roles["role1"].keyids.copy() - targets.add_key("role1", key) + targets.add_key(key, "role1") self.assertEqual(past_keyid, targets.delegations.roles["role1"].keyids) # Try adding a key to a delegated role that doesn't exists with self.assertRaises(ValueError): - targets.add_key("nosuchrole", key) + targets.add_key(key, "nosuchrole") # Add the same key to "role2" as well - targets.add_key("role2", key) + targets.add_key(key, "role2") # Remove the key from "role1" role ("role2" still uses it) - targets.remove_key("role1", key.keyid) + targets.revoke_key(key.keyid, "role1") # Assert that delegated role "role1" doesn't contain the key. self.assertNotIn(key.keyid, targets.delegations.roles["role1"].keyids) self.assertIn(key.keyid, targets.delegations.roles["role2"].keyids) # Remove the key from "role2" as well - targets.remove_key("role2", key.keyid) + targets.revoke_key(key.keyid, "role2") self.assertNotIn(key.keyid, targets.delegations.roles["role2"].keyids) # Try remove key not used by "role1" with self.assertRaises(ValueError): - targets.remove_key("role1", key.keyid) + targets.revoke_key(key.keyid, "role1") # Try removing a key from delegated role that doesn't exists with self.assertRaises(ValueError): - targets.remove_key("nosuchrole", key.keyid) + targets.revoke_key(key.keyid, "nosuchrole") # Remove delegations as a whole targets.delegations = None - # Test that calling add_key and remove_key throws an error + # Test that calling add_key and revoke_key throws an error # and that delegations is still None after each of the api calls with self.assertRaises(ValueError): - targets.add_key("role1", key) + targets.add_key(key, "role1") self.assertTrue(targets.delegations is None) with self.assertRaises(ValueError): - targets.remove_key("role1", key.keyid) + targets.revoke_key(key.keyid, "role1") self.assertTrue(targets.delegations is None) + def test_targets_key_api_with_succinct_roles(self) -> None: + targets_path = os.path.join(self.repo_dir, "metadata", "targets.json") + targets: Targets = Metadata[Targets].from_file(targets_path).signed + key_dict = { + "keytype": "ed25519", + "keyval": { + "public": "edcd0a32a07dce33f7c7873aaffbff36d20ea30787574ead335eefd337e4dacd" + }, + "scheme": "ed25519", + } + key = Key.from_dict("id2", key_dict) + + # Remove delegated roles. + assert targets.delegations is not None + assert targets.delegations.roles is not None + targets.delegations.roles = None + targets.delegations.keys = {} + + # Add succinct_roles information. + targets.delegations.succinct_roles = SuccinctRoles([], 1, 8, "foo") + self.assertEqual(len(targets.delegations.keys), 0) + self.assertEqual(len(targets.delegations.succinct_roles.keyids), 0) + + # Add a key to succinct_roles and verify it's saved. + targets.add_key(key) + self.assertIn(key.keyid, targets.delegations.keys) + self.assertIn(key.keyid, targets.delegations.succinct_roles.keyids) + self.assertEqual(len(targets.delegations.keys), 1) + + # Try adding the same key again and verify that noting is added. + targets.add_key(key) + self.assertEqual(len(targets.delegations.keys), 1) + + # Remove the key and verify it's not stored anymore. + targets.revoke_key(key.keyid) + self.assertNotIn(key.keyid, targets.delegations.keys) + self.assertNotIn(key.keyid, targets.delegations.succinct_roles.keyids) + self.assertEqual(len(targets.delegations.keys), 0) + + # Try removing it again. + with self.assertRaises(ValueError): + targets.revoke_key(key.keyid) + def test_length_and_hash_validation(self) -> None: # Test metadata files' hash and length verification. @@ -695,6 +748,31 @@ def test_is_delegated_role(self) -> None: self.assertFalse(role.is_delegated_path("a/non-matching path")) self.assertTrue(role.is_delegated_path("a/path")) + def test_is_delegated_role_in_succinct_roles(self) -> None: + succinct_roles = SuccinctRoles([], 1, 5, "bin") + false_role_name_examples = ["foo", "bin-", "bin-s", "bin-20", "bin-100"] + for role_name in false_role_name_examples: + msg = f"Error for {role_name}" + self.assertFalse(succinct_roles.is_delegated_role(role_name), msg) + + # delegated role name suffixes are in hex format. + true_name_examples = ["bin-00", "bin-0f", "bin-1f"] + for role_name in true_name_examples: + msg = f"Error for {role_name}" + self.assertTrue(succinct_roles.is_delegated_role(role_name), msg) + + def test_get_roles_in_succinct_roles(self) -> None: + succinct_roles = SuccinctRoles([], 1, 16, "bin") + # bin names are in hex format and 4 hex digits are enough to represent + # all bins between 0 and 2^16 - 1 meaning suffix_len must be 4 + expected_suffix_length = 4 + self.assertEqual(succinct_roles.suffix_len, expected_suffix_length) + for bin_numer, role_name in enumerate(succinct_roles.get_roles()): + # This adds zero-padding if the bin_numer is represented by a hex + # number with a length less than expected_suffix_length. + expected_bin_suffix = f"{bin_numer:0{expected_suffix_length}x}" + self.assertEqual(role_name, f"bin-{expected_bin_suffix}") + # Run unit test. if __name__ == "__main__": diff --git a/tests/test_metadata_eq_.py b/tests/test_metadata_eq_.py index e79815f0f7..a3b3f9fd91 100644 --- a/tests/test_metadata_eq_.py +++ b/tests/test_metadata_eq_.py @@ -23,6 +23,7 @@ Metadata, MetaFile, Role, + SuccinctRoles, TargetFile, ) @@ -55,6 +56,7 @@ def setUpClass(cls) -> None: cls.objects["Role"] = Role(["keyid1", "keyid2"], 3) cls.objects["MetaFile"] = MetaFile(1, 12, {"sha256": "abc"}) cls.objects["DelegatedRole"] = DelegatedRole("a", [], 1, False, ["d"]) + cls.objects["SuccinctRoles"] = SuccinctRoles(["keyid"], 1, 8, "foo") cls.objects["Delegations"] = Delegations( {"keyid": cls.objects["Key"]}, {"a": cls.objects["DelegatedRole"]} ) @@ -79,6 +81,7 @@ def setUpClass(cls) -> None: "paths": [""], "path_hash_prefixes": [""], }, + "SuccinctRoles": {"bit_length": 0, "name_prefix": ""}, "Delegations": {"keys": {}, "roles": {}}, "TargetFile": {"length": 0, "hashes": {}, "path": ""}, "Targets": {"targets": {}, "delegations": []}, @@ -166,6 +169,7 @@ def test_delegations_eq_roles_reversed_order(self) -> None: # Create a second delegations obj with reversed roles order delegations_2 = copy.deepcopy(delegations) # In python3.7 we need to cast to a list and then reverse. + assert isinstance(delegations.roles, dict) delegations_2.roles = dict(reversed(list(delegations.roles.items()))) # Both objects are not the equal because of delegated roles order. diff --git a/tests/test_metadata_serialization.py b/tests/test_metadata_serialization.py index a856ab5454..cec4fd0888 100644 --- a/tests/test_metadata_serialization.py +++ b/tests/test_metadata_serialization.py @@ -24,6 +24,7 @@ Role, Root, Snapshot, + SuccinctRoles, TargetFile, Targets, Timestamp, @@ -175,6 +176,7 @@ def test_invalid_key_serialization(self, test_case_data: str) -> None: "no threshold": '{"keyids": ["keyid"]}', "no keyids": '{"threshold": 3}', "wrong threshold type": '{"keyids": ["keyid"], "threshold": "a"}', + "wrong keyids type": '{"keyids": 1, "threshold": 3}', "threshold below 1": '{"keyids": ["keyid"], "threshold": 0}', "duplicate keyids": '{"keyids": ["keyid", "keyid"], "threshold": 3}', } @@ -408,6 +410,36 @@ def test_invalid_delegated_role_serialization( with self.assertRaises(ValueError): DelegatedRole.from_dict(case_dict) + valid_succinct_roles: utils.DataSet = { + # SuccinctRoles inherits Role and some use cases can be found in the valid_roles. + "standard succinct_roles information": '{"keyids": ["keyid"], "threshold": 1, \ + "bit_length": 8, "name_prefix": "foo"}', + "succinct_roles with unrecognized fields": '{"keyids": ["keyid"], "threshold": 1, \ + "bit_length": 8, "name_prefix": "foo", "foo": "bar"}', + } + + @utils.run_sub_tests_with_dataset(valid_succinct_roles) + def test_succinct_roles_serialization(self, test_case_data: str) -> None: + case_dict = json.loads(test_case_data) + succinct_roles = SuccinctRoles.from_dict(copy.copy(case_dict)) + self.assertDictEqual(case_dict, succinct_roles.to_dict()) + + invalid_succinct_roles: utils.DataSet = { + # SuccinctRoles inherits Role and some use cases can be found in the invalid_roles. + "missing bit_length from succinct_roles": '{"keyids": ["keyid"], "threshold": 1, "name_prefix": "foo"}', + "missing name_prefix from succinct_roles": '{"keyids": ["keyid"], "threshold": 1, "bit_length": 8}', + "succinct_roles with invalid bit_length type": '{"keyids": ["keyid"], "threshold": 1, "bit_length": "a", "name_prefix": "foo"}', + "succinct_roles with invalid name_prefix type": '{"keyids": ["keyid"], "threshold": 1, "bit_length": 8, "name_prefix": 1}', + "succinct_roles with high bit_length value": '{"keyids": ["keyid"], "threshold": 1, "bit_length": 50, "name_prefix": "foo"}', + "succinct_roles with low bit_length value": '{"keyids": ["keyid"], "threshold": 1, "bit_length": 0, "name_prefix": "foo"}', + } + + @utils.run_sub_tests_with_dataset(invalid_succinct_roles) + def test_invalid_succinct_roles_serialization(self, test_data: str) -> None: + case_dict = json.loads(test_data) + with self.assertRaises((ValueError, KeyError, TypeError)): + SuccinctRoles.from_dict(case_dict) + invalid_delegations: utils.DataSet = { "empty delegations": "{}", "missing keys": '{ "roles": [ \ @@ -459,6 +491,13 @@ def test_invalid_delegated_role_serialization( {"keyids": ["keyid1"], "name": "b", "terminating": true, "paths": ["fn1"], "threshold": 3}, \ {"keyids": ["keyid2"], "name": "root", "terminating": true, "paths": ["fn2"], "threshold": 4} ] \ }', + "roles and succinct_roles set": '{"keys": { \ + "keyid1" : {"keytype": "rsa", "scheme": "rsassa-pss-sha256", "keyval": {"public": "foo"}}, \ + "keyid2" : {"keytype": "ed25519", "scheme": "ed25519", "keyval": {"public": "bar"}}}, \ + "roles": [ \ + {"keyids": ["keyid"], "name": "a", "terminating": true, "paths": ["fn1"], "threshold": 3}, \ + {"keyids": ["keyid2"], "name": "b", "terminating": true, "paths": ["fn2"], "threshold": 4} ], \ + "succinct_roles": {"keyids": ["keyid"], "threshold": 1, "bit_length": 8, "name_prefix": "foo"}}', } @utils.run_sub_tests_with_dataset(invalid_delegations) @@ -470,13 +509,17 @@ def test_invalid_delegation_serialization( Delegations.from_dict(case_dict) valid_delegations: utils.DataSet = { - "all": '{"keys": { \ + "with roles": '{"keys": { \ "keyid1" : {"keytype": "rsa", "scheme": "rsassa-pss-sha256", "keyval": {"public": "foo"}}, \ "keyid2" : {"keytype": "ed25519", "scheme": "ed25519", "keyval": {"public": "bar"}}}, \ "roles": [ \ {"keyids": ["keyid"], "name": "a", "terminating": true, "paths": ["fn1"], "threshold": 3}, \ {"keyids": ["keyid2"], "name": "b", "terminating": true, "paths": ["fn2"], "threshold": 4} ] \ }', + "with succinct_roles": '{"keys": { \ + "keyid1" : {"keytype": "rsa", "scheme": "rsassa-pss-sha256", "keyval": {"public": "foo"}}, \ + "keyid2" : {"keytype": "ed25519", "scheme": "ed25519", "keyval": {"public": "bar"}}}, \ + "succinct_roles": {"keyids": ["keyid"], "threshold": 1, "bit_length": 8, "name_prefix": "foo"}}', "unrecognized field": '{"keys": {"keyid" : {"keytype": "rsa", "scheme": "rsassa-pss-sha256", "keyval": {"public": "foo"}}}, \ "roles": [ {"keyids": ["keyid"], "name": "a", "paths": ["fn1", "fn2"], "terminating": true, "threshold": 3} ], \ "foo": "bar"}', diff --git a/tests/test_updater_delegation_graphs.py b/tests/test_updater_delegation_graphs.py index b1ab219687..ca04621da0 100644 --- a/tests/test_updater_delegation_graphs.py +++ b/tests/test_updater_delegation_graphs.py @@ -425,6 +425,91 @@ def test_hash_bins_graph_traversal( finally: self.teardown_subtest() + @dataclass + class SuccinctRolesTestCase: + bit_length: int + target_path: str + expected_target_bin: str + + # By setting the bit_length the total number of bins is 2^bit_length. + # In each test case target_path is a path to a random target we want to + # fetch and expected_target_bin is the bin we are expecting to visit. + succinct_bins_graph: utils.DataSet = { + "bin amount = 2, taget bin index 0": SuccinctRolesTestCase( + bit_length=1, + target_path="boo", + expected_target_bin="bin-0", + ), + "bin amount = 2, taget bin index 1": SuccinctRolesTestCase( + bit_length=1, + target_path="too", + expected_target_bin="bin-1", + ), + "bin amount = 4, taget bin index 0": SuccinctRolesTestCase( + bit_length=2, + target_path="foo", + expected_target_bin="bin-0", + ), + "bin amount = 4, taget bin index 1": SuccinctRolesTestCase( + bit_length=2, + target_path="doo", + expected_target_bin="bin-1", + ), + "bin amount = 4, taget bin index 2": SuccinctRolesTestCase( + bit_length=2, + target_path="too", + expected_target_bin="bin-2", + ), + "bin amount = 4, taget bin index 3": SuccinctRolesTestCase( + bit_length=2, + target_path="bar", + expected_target_bin="bin-3", + ), + "bin amount = 256, taget bin index fc": SuccinctRolesTestCase( + bit_length=8, + target_path="bar", + expected_target_bin="bin-fc", + ), + } + + @utils.run_sub_tests_with_dataset(succinct_bins_graph) + def test_succinct_roles_graph_traversal( + self, test_data: SuccinctRolesTestCase + ) -> None: + # Test traversing the delegation tree when succinct roles is used. For a + # successful traversal all top level metadata files plus the expected + # bin should exist locally and only one bin must be downloaded. + + try: + exp_files = [*TOP_LEVEL_ROLE_NAMES, test_data.expected_target_bin] + exp_calls = [(test_data.expected_target_bin, 1)] + + self.sim = RepositorySimulator() + self.sim.add_succinct_roles("targets", test_data.bit_length, "bin") + self.sim.update_snapshot() + + self.setup_subtest() + + updater = self._init_updater() + # Call explicitly refresh to simplify the expected_calls list. + updater.refresh() + self.sim.fetch_tracker.metadata.clear() + # Check that metadata dir contains only top-level roles + self._assert_files_exist(TOP_LEVEL_ROLE_NAMES) + + # Looking for a non-existing targetpath forces updater + # to visit a corresponding delegated role. + targetfile = updater.get_targetinfo(test_data.target_path) + self.assertIsNone(targetfile) + + # Check that the delegated roles were visited in the expected + # order and the corresponding metadata files were persisted. + self.assertListEqual(self.sim.fetch_tracker.metadata, exp_calls) + self._assert_files_exist(exp_files) + + finally: + self.teardown_subtest() + class TestTargetFileSearch(TestDelegations): r""" diff --git a/tests/test_updater_key_rotations.py b/tests/test_updater_key_rotations.py index c78e5b65aa..83fc7fcf09 100644 --- a/tests/test_updater_key_rotations.py +++ b/tests/test_updater_key_rotations.py @@ -184,7 +184,7 @@ def test_root_rotation(self, root_versions: List[MdVersion]) -> None: self.sim.root.roles[Root.type].threshold = rootver.threshold for i in rootver.keys: - self.sim.root.add_key(Root.type, self.keys[i]) + self.sim.root.add_key(self.keys[i], Root.type) for i in rootver.sigs: self.sim.add_signer(Root.type, self.signers[i]) self.sim.root.version += 1 @@ -254,7 +254,7 @@ def test_non_root_rotations(self, md_version: MdVersion) -> None: self.sim.root.roles[role].threshold = md_version.threshold for i in md_version.keys: - self.sim.root.add_key(role, self.keys[i]) + self.sim.root.add_key(self.keys[i], role) for i in md_version.sigs: self.sim.add_signer(role, self.signers[i]) diff --git a/tuf/api/metadata.py b/tuf/api/metadata.py index 7bdccb9594..072ee1ff61 100644 --- a/tuf/api/metadata.py +++ b/tuf/api/metadata.py @@ -39,6 +39,7 @@ ClassVar, Dict, Generic, + Iterator, List, Mapping, Optional, @@ -412,7 +413,7 @@ def verify_delegate( """ # Find the keys and role in delegator metadata - role = None + role: Optional[Role] = None if isinstance(self.signed, Root): keys = self.signed.keys role = self.signed.roles.get(delegated_role) @@ -421,7 +422,13 @@ def verify_delegate( raise ValueError(f"No delegation found for {delegated_role}") keys = self.signed.delegations.keys - role = self.signed.delegations.roles.get(delegated_role) + if self.signed.delegations.roles is not None: + role = self.signed.delegations.roles.get(delegated_role) + elif self.signed.delegations.succinct_roles is not None: + if self.signed.delegations.succinct_roles.is_delegated_role( + delegated_role + ): + role = self.signed.delegations.succinct_roles else: raise TypeError("Call is valid only on delegator metadata") @@ -938,28 +945,33 @@ def to_dict(self) -> Dict[str, Any]: ) return root_dict - def add_key(self, role: str, key: Key) -> None: + def add_key(self, key: Key, role: str) -> None: """Adds new signing key for delegated role ``role``. Args: - role: Name of the role, for which ``key`` is added. key: Signing key to be added for ``role``. + role: Name of the role, for which ``key`` is added. Raises: - ValueError: If ``role`` doesn't exist. + ValueError: If the argument order is wrong or if ``role`` doesn't + exist. """ + # Verify that our users are not using the old argument order. + if isinstance(role, Key): + raise ValueError("Role must be a string, not a Key instance") + if role not in self.roles: raise ValueError(f"Role {role} doesn't exist") if key.keyid not in self.roles[role].keyids: self.roles[role].keyids.append(key.keyid) self.keys[key.keyid] = key - def remove_key(self, role: str, keyid: str) -> None: - """Removes key from ``role`` and updates the key store. + def revoke_key(self, keyid: str, role: str) -> None: + """Revoke key from ``role`` and updates the key store. Args: - role: Name of the role, for which a signing key is removed. keyid: Identifier of the key to be removed for ``role``. + role: Name of the role, for which a signing key is removed. Raises: ValueError: If ``role`` doesn't exist or if ``role`` doesn't include @@ -1296,7 +1308,7 @@ class DelegatedRole(Role): paths: Path patterns. See note above. path_hash_prefixes: Hash prefixes. See note above. unrecognized_fields: Dictionary of all attributes that are not managed - by TUF Metadata API + by TUF Metadata API. Raises: ValueError: Invalid arguments. @@ -1315,11 +1327,11 @@ def __init__( super().__init__(keyids, threshold, unrecognized_fields) self.name = name self.terminating = terminating - if paths is not None and path_hash_prefixes is not None: - raise ValueError("Either paths or path_hash_prefixes can be set") - - if paths is None and path_hash_prefixes is None: - raise ValueError("One of paths or path_hash_prefixes must be set") + exclusive_vars = [paths, path_hash_prefixes] + if sum(1 for var in exclusive_vars if var is not None) != 1: + raise ValueError( + "Only one of (paths, path_hash_prefixes) must be set" + ) if paths is not None and any(not isinstance(p, str) for p in paths): raise ValueError("Paths must be strings") @@ -1348,7 +1360,7 @@ def from_dict(cls, role_dict: Dict[str, Any]) -> "DelegatedRole": """Creates ``DelegatedRole`` object from its json/dict representation. Raises: - ValueError, KeyError: Invalid arguments. + ValueError, KeyError, TypeError: Invalid arguments. """ name = role_dict.pop("name") keyids = role_dict.pop("keyids") @@ -1435,6 +1447,150 @@ def is_delegated_path(self, target_filepath: str) -> bool: return False +class SuccinctRoles(Role): + """Succinctly defines a hash bin delegation graph. + + A ``SuccinctRoles`` object describes a delegation graph that covers all + targets, distributing them uniformly over the delegated roles (i.e. bins) + in the graph. + + The total number of bins is 2 to the power of the passed ``bit_length``. + + Bin names are the concatenation of the passed ``name_prefix`` and a + zero-padded hex representation of the bin index separated by a hyphen. + + The passed ``keyids`` and ``threshold`` is used for each bin, and each bin + is 'terminating'. + + For details: https://github.com/theupdateframework/taps/blob/master/tap15.md + + Args: + keyids: Signing key identifiers for any bin metadata. + threshold: Number of keys required to sign any bin metadata. + bit_length: Number of bits between 1 and 32. + name_prefix: Prefix of all bin names. + unrecognized_fields: Dictionary of all attributes that are not managed + by TUF Metadata API. + + Raises: + ValueError, TypeError, AttributeError: Invalid arguments. + """ + + def __init__( + self, + keyids: List[str], + threshold: int, + bit_length: int, + name_prefix: str, + unrecognized_fields: Optional[Dict[str, Any]] = None, + ) -> None: + super().__init__(keyids, threshold, unrecognized_fields) + + if bit_length <= 0 or bit_length > 32: + raise ValueError("bit_length must be between 1 and 32") + if not isinstance(name_prefix, str): + raise ValueError("name_prefix must be a string") + + self.bit_length = bit_length + self.name_prefix = name_prefix + + # Calculate the suffix_len value based on the total number of bins in + # hex. If bit_length = 8 then number_of_bins = 256 or 100 in hex + # and suffix_len = 3 meaning the third bin will have a suffix of "003" + self.number_of_bins = 2**bit_length + # suffix_len is calculated based on "number_of_bins - 1" as the name + # of the last bin contains the number "number_of_bins -1" as a suffix. + self.suffix_len = len(f"{self.number_of_bins-1:x}") + + def __eq__(self, other: Any) -> bool: + if not isinstance(other, SuccinctRoles): + return False + + return ( + super().__eq__(other) + and self.bit_length == other.bit_length + and self.name_prefix == other.name_prefix + ) + + @classmethod + def from_dict(cls, role_dict: Dict[str, Any]) -> "SuccinctRoles": + """Creates ``SuccinctRoles`` object from its json/dict representation. + + Raises: + ValueError, KeyError, AttributeError, TypeError: Invalid arguments. + """ + keyids = role_dict.pop("keyids") + threshold = role_dict.pop("threshold") + bit_length = role_dict.pop("bit_length") + name_prefix = role_dict.pop("name_prefix") + # All fields left in the role_dict are unrecognized. + return cls(keyids, threshold, bit_length, name_prefix, role_dict) + + def to_dict(self) -> Dict[str, Any]: + """Returns the dict representation of self.""" + base_role_dict = super().to_dict() + return { + "bit_length": self.bit_length, + "name_prefix": self.name_prefix, + **base_role_dict, + } + + def get_role_for_target(self, target_filepath: str) -> str: + """Calculates the name of the delegated role responsible for + ``target_filepath``. + + The target at path ``target_filepath`` is assigned to a bin by casting + the left-most ``bit_length`` of bits of the file path hash digest to + int, using it as bin index between 0 and ``2**bit_length - 1``. + + Args: + target_filepath: URL path to a target file, relative to a base + targets URL. + """ + hasher = sslib_hash.digest(algorithm="sha256") + hasher.update(target_filepath.encode("utf-8")) + + # We can't ever need more than 4 bytes (32 bits). + hash_bytes = hasher.digest()[:4] + # Right shift hash bytes, so that we only have the leftmost + # bit_length bits that we care about. + shift_value = 32 - self.bit_length + bin_number = int.from_bytes(hash_bytes, byteorder="big") >> shift_value + # Add zero padding if necessary and cast to hex the suffix. + suffix = f"{bin_number:0{self.suffix_len}x}" + return f"{self.name_prefix}-{suffix}" + + def get_roles(self) -> Iterator[str]: + """Yield the names of all different delegated roles one by one.""" + for i in range(0, self.number_of_bins): + suffix = f"{i:0{self.suffix_len}x}" + yield f"{self.name_prefix}-{suffix}" + + def is_delegated_role(self, role_name: str) -> bool: + """Determines whether the given ``role_name`` is in one of + the delegated roles that ``SuccinctRoles`` represents. + + Args: + role_name: The name of the role to check against. + """ + desired_prefix = self.name_prefix + "-" + + if not role_name.startswith(desired_prefix): + return False + + suffix = role_name[len(desired_prefix) :] + if len(suffix) != self.suffix_len: + return False + + try: + # make sure suffix is hex value + num = int(suffix, 16) + except ValueError: + return False + + return 0 <= num < self.number_of_bins + + class Delegations: """A container object storing information about all delegations. @@ -1447,9 +1603,17 @@ class Delegations: defines which keys are required to sign the metadata for a specific role. The roles order also defines the order that role delegations are considered during target searches. + succinct_roles: Contains succinct information about hash bin + delegations. Note that succinct roles is not a TUF specification + feature yet and setting `succinct_roles` to a value makes the + resulting metadata non-compliant. The metadata will not be accepted + as valid by specification compliant clients such as those built with + python-tuf <= 1.1.0. For more information see: https://github.com/theupdateframework/taps/blob/master/tap15.md unrecognized_fields: Dictionary of all attributes that are not managed by TUF Metadata API + Exactly one of ``roles`` and ``succinct_roles`` must be set. + Raises: ValueError: Invalid arguments. """ @@ -1457,18 +1621,24 @@ class Delegations: def __init__( self, keys: Dict[str, Key], - roles: Dict[str, DelegatedRole], + roles: Optional[Dict[str, DelegatedRole]] = None, + succinct_roles: Optional[SuccinctRoles] = None, unrecognized_fields: Optional[Dict[str, Any]] = None, ): self.keys = keys - - for role in roles: - if not role or role in TOP_LEVEL_ROLE_NAMES: - raise ValueError( - "Delegated roles cannot be empty or use top-level role names" - ) + if sum(1 for v in [roles, succinct_roles] if v is not None) != 1: + raise ValueError("One of roles and succinct_roles must be set") + + if roles is not None: + for role in roles: + if not role or role in TOP_LEVEL_ROLE_NAMES: + raise ValueError( + "Delegated roles cannot be empty or use top-level " + "role names" + ) self.roles = roles + self.succinct_roles = succinct_roles if unrecognized_fields is None: unrecognized_fields = {} @@ -1478,14 +1648,22 @@ def __eq__(self, other: Any) -> bool: if not isinstance(other, Delegations): return False - return ( + all_attributes_check = ( self.keys == other.keys - # Order of the delegated roles matters (see issue #1788). - and list(self.roles.items()) == list(other.roles.items()) and self.roles == other.roles + and self.succinct_roles == other.succinct_roles and self.unrecognized_fields == other.unrecognized_fields ) + if self.roles is not None and other.roles is not None: + all_attributes_check = ( + all_attributes_check + # Order of the delegated roles matters (see issue #1788). + and list(self.roles.items()) == list(other.roles.items()) + ) + + return all_attributes_check + @classmethod def from_dict(cls, delegations_dict: Dict[str, Any]) -> "Delegations": """Creates ``Delegations`` object from its json/dict representation. @@ -1497,25 +1675,59 @@ def from_dict(cls, delegations_dict: Dict[str, Any]) -> "Delegations": keys_res = {} for keyid, key_dict in keys.items(): keys_res[keyid] = Key.from_dict(keyid, key_dict) - roles = delegations_dict.pop("roles") - roles_res: Dict[str, DelegatedRole] = {} - for role_dict in roles: - new_role = DelegatedRole.from_dict(role_dict) - if new_role.name in roles_res: - raise ValueError(f"Duplicate role {new_role.name}") - roles_res[new_role.name] = new_role + roles = delegations_dict.pop("roles", None) + roles_res: Optional[Dict[str, DelegatedRole]] = None + + if roles is not None: + roles_res = {} + for role_dict in roles: + new_role = DelegatedRole.from_dict(role_dict) + if new_role.name in roles_res: + raise ValueError(f"Duplicate role {new_role.name}") + roles_res[new_role.name] = new_role + + succinct_roles_dict = delegations_dict.pop("succinct_roles", None) + succinct_roles_info = None + if succinct_roles_dict is not None: + succinct_roles_info = SuccinctRoles.from_dict(succinct_roles_dict) + # All fields left in the delegations_dict are unrecognized. - return cls(keys_res, roles_res, delegations_dict) + return cls(keys_res, roles_res, succinct_roles_info, delegations_dict) def to_dict(self) -> Dict[str, Any]: """Returns the dict representation of self.""" keys = {keyid: key.to_dict() for keyid, key in self.keys.items()} - roles = [role_obj.to_dict() for role_obj in self.roles.values()] - return { + res_dict: Dict[str, Any] = { "keys": keys, - "roles": roles, **self.unrecognized_fields, } + if self.roles is not None: + roles = [role_obj.to_dict() for role_obj in self.roles.values()] + res_dict["roles"] = roles + elif self.succinct_roles is not None: + res_dict["succinct_roles"] = self.succinct_roles.to_dict() + + return res_dict + + def get_roles_for_target( + self, target_filepath: str + ) -> Iterator[Tuple[str, bool]]: + """Given ``target_filepath`` get names and terminating status of all + delegated roles who are responsible for it. + + Args: + target_filepath: URL path to a target file, relative to a base + targets URL. + """ + if self.roles is not None: + for role in self.roles.values(): + if role.is_delegated_path(target_filepath): + yield role.name, role.terminating + + elif self.succinct_roles is not None: + # We consider all succinct_roles as terminating. + # For more information read TAP 15. + yield self.succinct_roles.get_role_for_target(target_filepath), True class TargetFile(BaseFile): @@ -1765,42 +1977,73 @@ def to_dict(self) -> Dict[str, Any]: targets_dict["delegations"] = self.delegations.to_dict() return targets_dict - def add_key(self, role: str, key: Key) -> None: + def add_key(self, key: Key, role: Optional[str] = None) -> None: """Adds new signing key for delegated role ``role``. + If succinct_roles is used then the ``role`` argument is not required. + Args: - role: Name of the role, for which ``key`` is added. key: Signing key to be added for ``role``. + role: Name of the role, for which ``key`` is added. Raises: - ValueError: If there are no delegated roles or if ``role`` is not - delegated by this Target. + ValueError: If the argument order is wrong or if there are no + delegated roles or if ``role`` is not delegated by this Target. """ - if self.delegations is None or role not in self.delegations.roles: + # Verify that our users are not using the old argument order. + if isinstance(role, Key): + raise ValueError("Role must be a string, not a Key instance") + + if self.delegations is None: raise ValueError(f"Delegated role {role} doesn't exist") - if key.keyid not in self.delegations.roles[role].keyids: - self.delegations.roles[role].keyids.append(key.keyid) + + if self.delegations.roles is not None: + if role not in self.delegations.roles: + raise ValueError(f"Delegated role {role} doesn't exist") + if key.keyid not in self.delegations.roles[role].keyids: + self.delegations.roles[role].keyids.append(key.keyid) + + elif self.delegations.succinct_roles is not None: + if key.keyid not in self.delegations.succinct_roles.keyids: + self.delegations.succinct_roles.keyids.append(key.keyid) + self.delegations.keys[key.keyid] = key - def remove_key(self, role: str, keyid: str) -> None: - """Removes key from delegated role ``role`` and updates the delegations + def revoke_key(self, keyid: str, role: Optional[str] = None) -> None: + """Revokes key from delegated role ``role`` and updates the delegations key store. + If succinct_roles is used then the ``role`` argument is not required. + Args: - role: Name of the role, for which a signing key is removed. keyid: Identifier of the key to be removed for ``role``. + role: Name of the role, for which a signing key is removed. Raises: ValueError: If there are no delegated roles or if ``role`` is not - delegated by this ``Target`` or if key is not used by ``role``. + delegated by this ``Target`` or if key is not used by ``role`` + or if key with id ``keyid`` is not used by succinct roles. """ - if self.delegations is None or role not in self.delegations.roles: + if self.delegations is None: raise ValueError(f"Delegated role {role} doesn't exist") - if keyid not in self.delegations.roles[role].keyids: - raise ValueError(f"Key with id {keyid} is not used by {role}") - self.delegations.roles[role].keyids.remove(keyid) - for keyinfo in self.delegations.roles.values(): - if keyid in keyinfo.keyids: - return + + if self.delegations.roles is not None: + if role not in self.delegations.roles: + raise ValueError(f"Delegated role {role} doesn't exist") + if keyid not in self.delegations.roles[role].keyids: + raise ValueError(f"Key with id {keyid} is not used by {role}") + + self.delegations.roles[role].keyids.remove(keyid) + for keyinfo in self.delegations.roles.values(): + if keyid in keyinfo.keyids: + return + + elif self.delegations.succinct_roles is not None: + if keyid not in self.delegations.succinct_roles.keyids: + raise ValueError( + f"Key with id {keyid} is not used by succinct_roles" + ) + + self.delegations.succinct_roles.keyids.remove(keyid) del self.delegations.keys[keyid] diff --git a/tuf/ngclient/updater.py b/tuf/ngclient/updater.py index 914163e7ab..150d195a2b 100644 --- a/tuf/ngclient/updater.py +++ b/tuf/ngclient/updater.py @@ -438,17 +438,17 @@ def _preorder_depth_first_walk( child_roles_to_visit = [] # NOTE: This may be a slow operation if there are many # delegated roles. - for child_role in targets.delegations.roles.values(): - if child_role.is_delegated_path(target_filepath): - logger.debug("Adding child role %s", child_role.name) - - child_roles_to_visit.append( - (child_role.name, role_name) - ) - if child_role.terminating: - logger.debug("Not backtracking to other roles") - delegations_to_visit = [] - break + for ( + child_name, + terminating, + ) in targets.delegations.get_roles_for_target(target_filepath): + + logger.debug("Adding child role %s", child_name) + child_roles_to_visit.append((child_name, role_name)) + if terminating: + logger.debug("Not backtracking to other roles") + delegations_to_visit = [] + break # Push 'child_roles_to_visit' in reverse order of appearance # onto 'delegations_to_visit'. Roles are popped from the end of # the list.