Skip to content

Commit

Permalink
Merge 9f5327c into 6a5b642
Browse files Browse the repository at this point in the history
  • Loading branch information
MVrachev committed Sep 14, 2021
2 parents 6a5b642 + 9f5327c commit a601582
Show file tree
Hide file tree
Showing 9 changed files with 148 additions and 50 deletions.
1 change: 0 additions & 1 deletion tests/slow_retrieval_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
"""

import os
import sys
import time
import http.server

Expand Down
95 changes: 77 additions & 18 deletions tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
Key,
MetaFile,
TargetFile,
Delegations,
DelegatedRole,
)

Expand Down Expand Up @@ -512,23 +511,6 @@ def test_is_target_in_pathpattern(self):
)


def test_delegation_class(self):
# empty keys and roles
delegations_dict = {"keys":{}, "roles":[]}
delegations = Delegations.from_dict(delegations_dict.copy())
self.assertEqual(delegations_dict, delegations.to_dict())

# Test some basic missing or broken input
invalid_delegations_dicts = [
{},
{"keys":None, "roles":None},
{"keys":{"foo":0}, "roles":[]},
{"keys":{}, "roles":["foo"]},
]
for d in invalid_delegations_dicts:
with self.assertRaises((KeyError, AttributeError)):
Delegations.from_dict(d)

def test_metadata_targets(self):
targets_path = os.path.join(
self.repo_dir, 'metadata', 'targets.json')
Expand All @@ -554,6 +536,83 @@ def test_metadata_targets(self):
targets.signed.targets[filename].to_dict(), fileinfo.to_dict()
)

def test_targets_key_api(self):
targets_path = os.path.join(
self.repo_dir, 'metadata', 'targets.json')
targets: Targets = Metadata[Targets].from_file(targets_path).signed

# Add a new delegated role "role2" in targets
delegated_role = DelegatedRole.from_dict({
"keyids": [],
"name": "role2",
"paths": ["fn3", "fn4"],
"terminating": False,
"threshold": 1
})
targets.delegations.roles["role2"] = delegated_role

key_dict = {
"keytype": "ed25519",
"keyval": {
"public": "edcd0a32a07dce33f7c7873aaffbff36d20ea30787574ead335eefd337e4dacd"
},
"scheme": "ed25519"
}
key = Key.from_dict("id2", key_dict)

# Assert that delegated role "role1" does not contain the new key
self.assertNotIn(key.keyid, targets.delegations.roles["role1"].keyids)
targets.add_key("role1", key)

# Assert that the new key is added to the delegated role "role1"
self.assertIn(key.keyid, targets.delegations.roles["role1"].keyids)

# Confirm that the newly added key does not break the obj serialization
targets.to_dict()

# Try adding the same key again and assert its ignored.
past_keyid = targets.delegations.roles["role1"].keyids.copy()
targets.add_key("role1", key)
self.assertEqual(past_keyid, targets.delegations.roles["role1"].keyids)

# Try adding a key to a delegated role that doesn't exists
with self.assertRaises(KeyError):
targets.add_key("abc", key)

# Add the same key to "role2" as well
targets.add_key("role2", key)

# Remove the key from "role1" role ("role2" still uses it)
targets.remove_key("role1", key.keyid)

# Assert that delegated role "role1" doesn't contain the key.
self.assertNotIn(key.keyid, targets.delegations.roles["role1"].keyids)
self.assertIn(key.keyid, targets.delegations.roles["role2"].keyids)

# Remove the key from "role2" as well
targets.remove_key("role2", key.keyid)
self.assertNotIn(key.keyid, targets.delegations.roles["role2"].keyids)

# Try remove key not used by "role1"
with self.assertRaises(ValueError):
targets.remove_key("role1", key.keyid)

# Try removing a key from delegated role that doesn't exists
with self.assertRaises(ValueError):
targets.remove_key("abc", key.keyid)

# Remove delegations as a whole
targets.delegations = None

# Test that calling add_key and remove_key throws an error
# and that delegations is still None after each of the api calls
with self.assertRaises(KeyError):
targets.add_key("role1", key)
self.assertTrue(targets.delegations is None)
with self.assertRaises(ValueError):
targets.remove_key("role1", key.keyid)
self.assertTrue(targets.delegations is None)


def test_length_and_hash_validation(self):

Expand Down
6 changes: 1 addition & 5 deletions tests/test_indefinite_freeze_attack.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,7 @@
import unittest
import sys
from urllib import request

if sys.version_info >= (3, 3):
import unittest.mock as mock
else:
import mock
import unittest.mock as mock

import tuf.formats
import tuf.log
Expand Down
25 changes: 22 additions & 3 deletions tests/test_metadata_serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,11 @@
import unittest
import copy

from typing import Dict, Callable, Optional, Mapping, Any
from datetime import datetime
from typing import Dict, Callable

from tests import utils

from tuf.api.metadata import (
Signed,
Root,
Snapshot,
Timestamp,
Expand Down Expand Up @@ -304,6 +302,27 @@ def test_invalid_delegated_role_serialization(self, test_case_data: str):
DelegatedRole.from_dict(copy.copy(case_dict))


invalid_delegations: DataSet = {
"empty delegations": '{}',
"bad keys": '{"keys": "foo", \
"roles": [{"keyids": ["keyid"], "name": "a", "paths": ["fn1", "fn2"], "terminating": false, "threshold": 3}]}',
"bad roles": '{"keys": {"keyid" : {"keytype": "rsa", "scheme": "rsassa-pss-sha256", "keyval": {"public": "foo"}}}, \
"roles": ["foo"]}',
"duplicate role names": '{"keys": {"keyid" : {"keytype": "rsa", "scheme": "rsassa-pss-sha256", "keyval": {"public": "foo"}}}, \
"roles": [ \
{"keyids": ["keyid"], "name": "a", "paths": ["fn1", "fn2"], "terminating": false, "threshold": 3}, \
{"keyids": ["keyid2"], "name": "a", "paths": ["fn3"], "terminating": false, "threshold": 2} \
] \
}',
}

@run_sub_tests_with_dataset(invalid_delegations)
def test_invalid_delegation_serialization(self, test_case_data: str):
case_dict = json.loads(test_case_data)
with self.assertRaises((ValueError, KeyError, AttributeError)):
Delegations.from_dict(copy.deepcopy(case_dict))


valid_delegations: DataSet = {
"all":
'{"keys": { \
Expand Down
2 changes: 1 addition & 1 deletion tests/test_trusted_metadata_set.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import logging
from typing import Optional, Union, Callable
from typing import Optional, Callable
import os
import sys
import unittest
Expand Down
7 changes: 1 addition & 6 deletions tests/test_tutorial.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,7 @@
import shutil
import tempfile
import sys

if sys.version_info >= (3, 3):
import unittest.mock as mock

else:
import mock
import unittest.mock as mock

from tuf.repository_tool import * # part of TUTORIAL.md

Expand Down
6 changes: 1 addition & 5 deletions tests/test_updater.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,7 @@
import sys
import unittest
import json

if sys.version_info >= (3, 3):
import unittest.mock as mock
else:
import mock
import unittest.mock as mock

import tuf
import tuf.exceptions
Expand Down
54 changes: 44 additions & 10 deletions tuf/api/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,9 +340,9 @@ def verify_delegate(

keys = self.signed.delegations.keys
roles = self.signed.delegations.roles
# Assume role names are unique in delegations.roles: #1426
# Find first role in roles with matching name (or None if no match)
role = next((r for r in roles if r.name == delegated_role), None)
# Role names are unique in delegations.roles
# Find role in roles with matching name (or None if no match).
role = roles.get(delegated_role)
else:
raise TypeError("Call is valid only on delegator metadata")

Expand Down Expand Up @@ -1129,16 +1129,17 @@ class Delegations:
Attributes:
keys: Dictionary of keyids to Keys. Defines the keys used in 'roles'.
roles: List of DelegatedRoles that define which keys are required to
sign the metadata for a specific role. The roles order also
defines the order that role delegations are considered in.
roles: Ordered dictionary of role names to DelegatedRoles instances. It
defines which keys are required to sign the metadata for a specific
role. The roles order also defines the order that role delegations
are considered in.
unrecognized_fields: Dictionary of all unrecognized fields.
"""

def __init__(
self,
keys: Dict[str, Key],
roles: List[DelegatedRole],
roles: "OrderedDict[str, DelegatedRole]",
unrecognized_fields: Optional[Mapping[str, Any]] = None,
) -> None:
self.keys = keys
Expand All @@ -1153,17 +1154,19 @@ def from_dict(cls, delegations_dict: Dict[str, Any]) -> "Delegations":
for keyid, key_dict in keys.items():
keys_res[keyid] = Key.from_dict(keyid, key_dict)
roles = delegations_dict.pop("roles")
roles_res = []
roles_res: "OrderedDict[str, DelegatedRole]" = OrderedDict()
for role_dict in roles:
new_role = DelegatedRole.from_dict(role_dict)
roles_res.append(new_role)
if roles_res.get(new_role.name) is not None:
raise ValueError(f"Duplicate role {new_role.name}")
roles_res[new_role.name] = new_role
# All fields left in the delegations_dict are unrecognized.
return cls(keys_res, roles_res, delegations_dict)

def to_dict(self) -> Dict[str, Any]:
"""Returns the dict representation of self."""
keys = {keyid: key.to_dict() for keyid, key in self.keys.items()}
roles = [role_obj.to_dict() for role_obj in self.roles]
roles = [role_obj.to_dict() for role_obj in self.roles.values()]
return {
"keys": keys,
"roles": roles,
Expand Down Expand Up @@ -1368,3 +1371,34 @@ def to_dict(self) -> Dict[str, Any]:
def update(self, fileinfo: TargetFile) -> None:
"""Assigns passed target file info to meta dict."""
self.targets[fileinfo.path] = fileinfo

def add_key(self, role: str, key: Key) -> None:
"""Adds new signing key for delegated role 'role'.
Raises:
KeyError: If there are no delegations, delegate role 'role' or key
with id `key.keyid`.
"""
if self.delegations is None:
raise KeyError("Delegations doesn't exists!")
self.delegations.roles[role].keyids.add(key.keyid)
self.delegations.keys[key.keyid] = key

def remove_key(self, role: str, keyid: str) -> None:
"""Removes key from delegated role 'role' and updates the delegations
key store.
Raises:
ValueError: If there are no delegated roles or if 'role' is not
delegated by this Target or if key is not used by 'role'.
"""
if self.delegations is None or self.delegations.roles.get(role) is None:
raise ValueError(f"Delegated role {role} doesn't exists!")
if keyid not in self.delegations.roles[role].keyids:
raise ValueError(f"Key with id {keyid} is not used by {role}")
self.delegations.roles[role].keyids.remove(keyid)
for keyinfo in self.delegations.roles.values():
if keyid in keyinfo.keyids:
return

del self.delegations.keys[keyid]
2 changes: 1 addition & 1 deletion tuf/ngclient/updater.py
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,7 @@ def _preorder_depth_first_walk(
child_roles_to_visit = []
# NOTE: This may be a slow operation if there are many
# delegated roles.
for child_role in role_metadata.delegations.roles:
for child_role in role_metadata.delegations.roles.values():
if child_role.is_delegated_path(target_filepath):
logger.debug("Adding child role %s", child_role.name)

Expand Down

0 comments on commit a601582

Please sign in to comment.