Skip to content

Commit

Permalink
Merge 26982f7 into 41f7e80
Browse files Browse the repository at this point in the history
  • Loading branch information
MVrachev committed Apr 21, 2021
2 parents 41f7e80 + 26982f7 commit 3af506c
Show file tree
Hide file tree
Showing 2 changed files with 194 additions and 38 deletions.
44 changes: 41 additions & 3 deletions tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,21 +306,21 @@ def test_metadata_root(self):
root_key2['keytype'], root_key2['scheme'], root_key2['keyval'])

# Assert that root does not contain the new key
self.assertNotIn(keyid, root.signed.roles['root']['keyids'])
self.assertNotIn(keyid, root.signed.roles['root'].keyids)
self.assertNotIn(keyid, root.signed.keys)

# Add new root key
root.signed.add_key('root', keyid, key_metadata)

# Assert that key is added
self.assertIn(keyid, root.signed.roles['root']['keyids'])
self.assertIn(keyid, root.signed.roles['root'].keyids)
self.assertIn(keyid, root.signed.keys)

# Remove the key
root.signed.remove_key('root', keyid)

# Assert that root does not contain the new key anymore
self.assertNotIn(keyid, root.signed.roles['root']['keyids'])
self.assertNotIn(keyid, root.signed.roles['root'].keyids)
self.assertNotIn(keyid, root.signed.keys)


Expand Down Expand Up @@ -349,6 +349,44 @@ def test_metadata_targets(self):
# Verify that data is updated
self.assertEqual(targets.signed.targets[filename], fileinfo)

def setup_dict_with_unrecognized_field(self, file_path, field, value):
json_dict = {}
with open(file_path) as f:
json_dict = json.loads(f.read())
# We are changing the json dict without changing the signature.
# This could be a problem if we want to do verification on this dict.
json_dict["signed"][field] = value
return json_dict

def test_support_for_unrecognized_fields(self):
for metadata in ["root", "timestamp", "snapshot", "targets"]:
path = os.path.join(self.repo_dir, "metadata", metadata + ".json")
dict1 = self.setup_dict_with_unrecognized_field(path, "f", "b")
# Test that the metadata classes store unrecognized fields when
# initializing and passes them when casting the instance to a dict.

# Add unrecognized fields to all metadata sub (helper) classes.
if metadata == "root":
for keyid in dict1["signed"]["keys"].keys():
dict1["signed"]["keys"][keyid]["d"] = "c"
for role_str in dict1["signed"]["roles"].keys():
dict1["signed"]["roles"][role_str]["e"] = "g"

temp_copy = copy.deepcopy(dict1)
metadata_obj = Metadata.from_dict(temp_copy)

self.assertEqual(dict1["signed"], metadata_obj.signed.to_dict())

# Test that two instances of the same class could have different
# unrecognized fields.
dict2 = self.setup_dict_with_unrecognized_field(path, "f2", "b2")
temp_copy2 = copy.deepcopy(dict2)
metadata_obj2 = Metadata.from_dict(temp_copy2)
self.assertNotEqual(
metadata_obj.signed.to_dict(), metadata_obj2.signed.to_dict()
)


# Run unit test.
if __name__ == '__main__':
utils.configure_test_logging(sys.argv)
Expand Down
188 changes: 153 additions & 35 deletions tuf/api/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,14 +320,20 @@ class Signed:
spec_version: The TUF specification version number (semver) the
metadata format adheres to.
expires: The metadata expiration datetime object.
unrecognized_fields: Dictionary of all unrecognized fields.
"""

# NOTE: Signed is a stupid name, because this might not be signed yet, but
# we keep it to match spec terminology (I often refer to this as "payload",
# or "inner metadata")
def __init__(
self, _type: str, version: int, spec_version: str, expires: datetime
self,
_type: str,
version: int,
spec_version: str,
expires: datetime,
unrecognized_fields: Optional[Mapping[str, Any]] = None,
) -> None:

self._type = _type
Expand All @@ -338,6 +344,9 @@ def __init__(
if version < 0:
raise ValueError(f"version must be >= 0, got {version}")
self.version = version
if unrecognized_fields is None:
unrecognized_fields = {}
self.unrecognized_fields = unrecognized_fields

@staticmethod
def _common_fields_from_dict(signed_dict: Mapping[str, Any]) -> list:
Expand Down Expand Up @@ -369,6 +378,7 @@ def _common_fields_to_dict(self) -> Dict[str, Any]:
"version": self.version,
"spec_version": self.spec_version,
"expires": self.expires.isoformat() + "Z",
**self.unrecognized_fields,
}

def is_expired(self, reference_time: datetime = None) -> bool:
Expand Down Expand Up @@ -397,6 +407,84 @@ def bump_version(self) -> None:
self.version += 1


class Key:
"""A container class representing the public portion of a Key.
Attributes:
keytype: A string denoting a public key signature system,
such as "rsa", "ed25519", and "ecdsa-sha2-nistp256".
scheme: A string denoting a corresponding signature scheme. For example:
"rsassa-pss-sha256", "ed25519", and "ecdsa-sha2-nistp256".
keyval: A dictionary containing the public portion of the key.
unrecognized_fields: Dictionary of all unrecognized fields.
"""

def __init__(
self,
keytype: str,
scheme: str,
keyval: Mapping[str, str],
unrecognized_fields: Optional[Mapping[str, Any]] = None,
) -> None:
self.keytype = keytype
self.scheme = scheme
self.keyval = keyval
if unrecognized_fields is None:
unrecognized_fields = {}
self.unrecognized_fields = unrecognized_fields

def to_dict(self) -> Dict:
"""Returns the dictionary representation of self."""
res_dict = {
"keytype": self.keytype,
"scheme": self.scheme,
"keyval": self.keyval,
**self.unrecognized_fields,
}

return res_dict

def get_public_val(self):
"""Helper function to access the public value of the key."""
return self.keyval["public"]


class Role:
"""A container class containing the set of keyids and threshold associated
with a particular role.
Attributes:
keyids: A set of strings each of which represents a given key.
threshold: An integer representing the required number of keys for that
particular role.
unrecognized_fields: Dictionary of all unrecognized fields.
"""

def __init__(
self,
keyids: set,
threshold: int,
unrecognized_fields: Optional[Mapping[str, Any]] = None,
) -> None:
self.keyids = keyids
self.threshold = threshold
if unrecognized_fields is None:
unrecognized_fields = {}
self.unrecognized_fields = unrecognized_fields

def to_dict(self) -> Dict:
"""Returns the dictionary representation of self."""
res_dict = {
"keyids": self.keyids,
"threshold": self.threshold,
**self.unrecognized_fields,
}

return res_dict


class Root(Signed):
"""A container for the signed part of root metadata.
Expand All @@ -406,27 +494,13 @@ class Root(Signed):
keys: A dictionary that contains a public key store used to verify
top level roles metadata signatures::
{
'<KEYID>': {
'keytype': '<KEY TYPE>',
'scheme': '<KEY SCHEME>',
'keyid_hash_algorithms': [
'<HASH ALGO 1>',
'<HASH ALGO 2>'
...
],
'keyval': {
'public': '<PUBLIC KEY HEX REPRESENTATION>'
}
},
'<KEYID>': <Key instance>,
...
},
roles: A dictionary that contains a list of signing keyids and
a signature threshold for each top level role::
{
'<ROLE>': {
'keyids': ['<SIGNING KEY KEYID>', ...],
'threshold': <SIGNATURE THRESHOLD>,
},
'<ROLE>': <Role istance>,
...
}
Expand All @@ -443,11 +517,13 @@ def __init__(
spec_version: str,
expires: datetime,
consistent_snapshot: bool,
keys: Mapping[str, Any],
roles: Mapping[str, Any],
keys: Mapping[str, Key],
roles: Mapping[str, Role],
unrecognized_fields: Optional[Mapping[str, Any]] = None,
) -> None:
super().__init__(_type, version, spec_version, expires)
# TODO: Add classes for keys and roles
super().__init__(
_type, version, spec_version, expires, unrecognized_fields
)
self.consistent_snapshot = consistent_snapshot
self.keys = keys
self.roles = roles
Expand All @@ -459,16 +535,43 @@ def from_dict(cls, root_dict: Mapping[str, Any]) -> "Root":
consistent_snapshot = root_dict.pop("consistent_snapshot")
keys = root_dict.pop("keys")
roles = root_dict.pop("roles")
return cls(*common_args, consistent_snapshot, keys, roles)

for keyid, key in keys.items():
keytype = key.pop("keytype")
scheme = key.pop("scheme")
keyval = key.pop("keyval")
# All fields left in the key dict are unrecognized.
unrecognized_key_fields = key
keys[keyid] = Key(keytype, scheme, keyval, unrecognized_key_fields)

for role_str, role_dict in roles.items():
keyids = role_dict.pop("keyids")
threshold = role_dict.pop("threshold")
# All fields left in the role_dict are unrecognized.
unrecognized_role_fields = role_dict
roles[role_str] = Role(keyids, threshold, unrecognized_role_fields)

# All fields left in the root_dict are unrecognized.
unrecognized_fields = root_dict
return cls(
*common_args, consistent_snapshot, keys, roles, unrecognized_fields
)

def to_dict(self) -> Dict[str, Any]:
"""Returns the dict representation of self. """
root_dict = self._common_fields_to_dict()
keys = {}
for keyid, key in self.keys.items():
keys[keyid] = key.to_dict()
roles = {}
for role_str, role in self.roles.items():
roles[role_str] = role.to_dict()

root_dict.update(
{
"consistent_snapshot": self.consistent_snapshot,
"keys": self.keys,
"roles": self.roles,
"keys": keys,
"roles": roles,
}
)
return root_dict
Expand All @@ -478,17 +581,17 @@ def add_key(
self, role: str, keyid: str, key_metadata: Mapping[str, Any]
) -> None:
"""Adds new key for 'role' and updates the key store. """
if keyid not in self.roles[role]["keyids"]:
self.roles[role]["keyids"].append(keyid)
if keyid not in self.roles[role].keyids:
self.roles[role].keyids.append(keyid)
self.keys[keyid] = key_metadata

# Remove key for a role.
def remove_key(self, role: str, keyid: str) -> None:
"""Removes key for 'role' and updates the key store. """
if keyid in self.roles[role]["keyids"]:
self.roles[role]["keyids"].remove(keyid)
if keyid in self.roles[role].keyids:
self.roles[role].keyids.remove(keyid)
for keyinfo in self.roles.values():
if keyid in keyinfo["keyids"]:
if keyid in keyinfo.keyids:
return

del self.keys[keyid]
Expand Down Expand Up @@ -521,8 +624,11 @@ def __init__(
spec_version: str,
expires: datetime,
meta: Mapping[str, Any],
unrecognized_fields: Optional[Mapping[str, Any]] = None,
) -> None:
super().__init__(_type, version, spec_version, expires)
super().__init__(
_type, version, spec_version, expires, unrecognized_fields
)
# TODO: Add class for meta
self.meta = meta

Expand All @@ -531,7 +637,9 @@ def from_dict(cls, timestamp_dict: Mapping[str, Any]) -> "Timestamp":
"""Creates Timestamp object from its dict representation. """
common_args = cls._common_fields_from_dict(timestamp_dict)
meta = timestamp_dict.pop("meta")
return cls(*common_args, meta)
# All fields left in the timestamp_dict are unrecognized.
unrecognized_fields = timestamp_dict
return cls(*common_args, meta, unrecognized_fields)

def to_dict(self) -> Dict[str, Any]:
"""Returns the dict representation of self. """
Expand Down Expand Up @@ -585,8 +693,11 @@ def __init__(
spec_version: str,
expires: datetime,
meta: Mapping[str, Any],
unrecognized_fields: Optional[Mapping[str, Any]] = None,
) -> None:
super().__init__(_type, version, spec_version, expires)
super().__init__(
_type, version, spec_version, expires, unrecognized_fields
)
# TODO: Add class for meta
self.meta = meta

Expand All @@ -595,7 +706,9 @@ def from_dict(cls, snapshot_dict: Mapping[str, Any]) -> "Snapshot":
"""Creates Snapshot object from its dict representation. """
common_args = cls._common_fields_from_dict(snapshot_dict)
meta = snapshot_dict.pop("meta")
return cls(*common_args, meta)
# All fields left in the snapshot_dict are unrecognized.
unrecognized_fields = snapshot_dict
return cls(*common_args, meta, unrecognized_fields)

def to_dict(self) -> Dict[str, Any]:
"""Returns the dict representation of self. """
Expand Down Expand Up @@ -688,8 +801,11 @@ def __init__(
expires: datetime,
targets: Mapping[str, Any],
delegations: Mapping[str, Any],
unrecognized_fields: Optional[Mapping[str, Any]] = None,
) -> None:
super().__init__(_type, version, spec_version, expires)
super().__init__(
_type, version, spec_version, expires, unrecognized_fields
)
# TODO: Add class for meta
self.targets = targets
self.delegations = delegations
Expand All @@ -700,7 +816,9 @@ def from_dict(cls, targets_dict: Mapping[str, Any]) -> "Targets":
common_args = cls._common_fields_from_dict(targets_dict)
targets = targets_dict.pop("targets")
delegations = targets_dict.pop("delegations")
return cls(*common_args, targets, delegations)
# All fields left in the targets_dict are unrecognized.
unrecognized_fields = targets_dict
return cls(*common_args, targets, delegations, unrecognized_fields)

def to_dict(self) -> Dict[str, Any]:
"""Returns the dict representation of self. """
Expand Down

0 comments on commit 3af506c

Please sign in to comment.