Merge pull request #1591 from MVrachev/consistent-targets
Fix handling consistent targets same as legacy updater
sechkova committed Oct 12, 2021
commit 88245f1 (2 parents: 5cdc7dc + c3e746a)
Showing 4 changed files with 100 additions and 84 deletions.
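Background for the commit title: when `consistent_snapshot` is enabled in root metadata, the TUF specification directs clients to fetch each target file under a hash-prefixed name, `<HASH>.<filename>`, which is how the legacy updater already behaved. Below is a minimal illustrative sketch of that path construction; it is not code from this PR, and the function name is hypothetical.

# Illustrative sketch (not from this PR): how a TUF client derives the
# repository path of a target file when consistent snapshots are enabled.
# Per the TUF specification, such targets are stored as "<HASH>.<filename>",
# where HASH is one of the digests listed for the target in targets metadata.
import posixpath
from typing import Dict

def consistent_target_path(
    target_path: str, hashes: Dict[str, str], consistent_snapshot: bool
) -> str:
    """Return the repository path a client should fetch for a target."""
    if not consistent_snapshot:
        return target_path
    # Only the basename is prefixed; directory components stay intact.
    directory, basename = posixpath.split(target_path)
    digest = next(iter(hashes.values()))  # any listed digest may be used
    return posixpath.join(directory, f"{digest}.{basename}")

# consistent_target_path("dir/file.txt", {"sha256": "abc123"}, True)
# returns "dir/abc123.file.txt"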
tests/test_metadata_serialization.py: 90 changes (37 additions & 53 deletions)
@@ -12,7 +12,7 @@
import unittest
import copy

-from typing import Dict, Callable
+from typing import Dict

from tests import utils

@@ -31,28 +31,12 @@

logger = logging.getLogger(__name__)

-# DataSet is only here so type hints can be used:
-# It is a dict of name to test dict
-DataSet = Dict[str, str]
-
-# Test runner decorator: Runs the test as a set of N SubTests,
-# (where N is number of items in dataset), feeding the actual test
-# function one test case at a time
-def run_sub_tests_with_dataset(dataset: DataSet):
-    def real_decorator(function: Callable[["TestSerialization", str], None]):
-        def wrapper(test_cls: "TestSerialization"):
-            for case, data in dataset.items():
-                with test_cls.subTest(case=case):
-                    function(test_cls, data)
-        return wrapper
-    return real_decorator


class TestSerialization(unittest.TestCase):

# Snapshot instances with meta = {} are valid, but for a full valid
# repository it's required that meta has at least one element inside it.
-    invalid_signed: DataSet = {
+    invalid_signed: utils.DataSet = {
"no _type": '{"spec_version": "1.0.0", "expires": "2030-01-01T00:00:00Z", "meta": {}}',
"no spec_version": '{"_type": "signed", "version": 1, "expires": "2030-01-01T00:00:00Z", "meta": {}}',
"no version": '{"_type": "signed", "spec_version": "1.0.0", "expires": "2030-01-01T00:00:00Z", "meta": {}}',
@@ -81,14 +65,14 @@ class TestSerialization(unittest.TestCase):
'{"_type": "signed", "spec_version": "1.0.0", "version": 1, "expires": "abc", "meta": {}}',
}

-    @run_sub_tests_with_dataset(invalid_signed)
+    @utils.run_sub_tests_with_dataset(invalid_signed)
def test_invalid_signed_serialization(self, test_case_data: Dict[str, str]):
case_dict = json.loads(test_case_data)
with self.assertRaises((KeyError, ValueError, TypeError)):
Snapshot.from_dict(copy.deepcopy(case_dict))


-    valid_keys: DataSet = {
+    valid_keys: utils.DataSet = {
"all": '{"keytype": "rsa", "scheme": "rsassa-pss-sha256", \
"keyval": {"public": "foo"}}',
"unrecognized field": '{"keytype": "rsa", "scheme": "rsassa-pss-sha256", \
@@ -97,14 +81,14 @@ def test_invalid_signed_serialization(self, test_case_data: Dict[str, str]):
"keyval": {"public": "foo", "foo": "bar"}}',
}

-    @run_sub_tests_with_dataset(valid_keys)
+    @utils.run_sub_tests_with_dataset(valid_keys)
def test_valid_key_serialization(self, test_case_data: str):
case_dict = json.loads(test_case_data)
key = Key.from_dict("id", copy.copy(case_dict))
self.assertDictEqual(case_dict, key.to_dict())


-    invalid_keys: DataSet = {
+    invalid_keys: utils.DataSet = {
"no keyid": '{"keytype": "rsa", "scheme": "rsassa-pss-sha256", "keyval": {"public": "abc"}}',
"no keytype": '{"keyid": "id", "scheme": "rsassa-pss-sha256", "keyval": {"public": "foo"}}',
"no scheme": '{"keyid": "id", "keytype": "rsa", "keyval": {"public": "foo"}}',
@@ -115,43 +99,43 @@ def test_valid_key_serialization(self, test_case_data: str):
"keyval wrong type": '{"keyid": "id", "keytype": "rsa", "scheme": "rsassa-pss-sha256", "keyval": 1}',
}

-    @run_sub_tests_with_dataset(invalid_keys)
+    @utils.run_sub_tests_with_dataset(invalid_keys)
def test_invalid_key_serialization(self, test_case_data: Dict[str, str]):
case_dict = json.loads(test_case_data)
with self.assertRaises((TypeError, KeyError)):
keyid = case_dict.pop("keyid")
Key.from_dict(keyid, copy.copy(case_dict))

-    invalid_roles: DataSet = {
+    invalid_roles: utils.DataSet = {
"no threshold": '{"keyids": ["keyid"]}',
"no keyids": '{"threshold": 3}',
"wrong threshold type": '{"keyids": ["keyid"], "threshold": "a"}',
"threshold below 1": '{"keyids": ["keyid"], "threshold": 0}',
"duplicate keyids": '{"keyids": ["keyid", "keyid"], "threshold": 3}',
}

-    @run_sub_tests_with_dataset(invalid_roles)
+    @utils.run_sub_tests_with_dataset(invalid_roles)
def test_invalid_role_serialization(self, test_case_data: Dict[str, str]):
case_dict = json.loads(test_case_data)
with self.assertRaises((KeyError, TypeError, ValueError)):
Role.from_dict(copy.deepcopy(case_dict))


-    valid_roles: DataSet = {
+    valid_roles: utils.DataSet = {
"all": '{"keyids": ["keyid"], "threshold": 3}',
"many keyids": '{"keyids": ["a", "b", "c", "d", "e"], "threshold": 1}',
"empty keyids": '{"keyids": [], "threshold": 1}',
"unrecognized field": '{"keyids": ["keyid"], "threshold": 3, "foo": "bar"}',
}

-    @run_sub_tests_with_dataset(valid_roles)
+    @utils.run_sub_tests_with_dataset(valid_roles)
def test_role_serialization(self, test_case_data: str):
case_dict = json.loads(test_case_data)
role = Role.from_dict(copy.deepcopy(case_dict))
self.assertDictEqual(case_dict, role.to_dict())


-    valid_roots: DataSet = {
+    valid_roots: utils.DataSet = {
"all": '{"_type": "root", "spec_version": "1.0.0", "version": 1, \
"expires": "2030-01-01T00:00:00Z", "consistent_snapshot": false, \
"keys": { \
@@ -178,14 +162,14 @@ def test_role_serialization(self, test_case_data: str):
"foo": "bar"}',
}

-    @run_sub_tests_with_dataset(valid_roots)
+    @utils.run_sub_tests_with_dataset(valid_roots)
def test_root_serialization(self, test_case_data: str):
case_dict = json.loads(test_case_data)
root = Root.from_dict(copy.deepcopy(case_dict))
self.assertDictEqual(case_dict, root.to_dict())


-    invalid_metafiles: DataSet = {
+    invalid_metafiles: utils.DataSet = {
"wrong length type": '{"version": 1, "length": "a", "hashes": {"sha256" : "abc"}}',
"length 0": '{"version": 1, "length": 0, "hashes": {"sha256" : "abc"}}',
"length below 0": '{"version": 1, "length": -1, "hashes": {"sha256" : "abc"}}',
@@ -194,53 +178,53 @@ def test_root_serialization(self, test_case_data: str):
"hashes values wrong type": '{"version": 1, "length": 1, "hashes": {"sha256": 1}}',
}

-    @run_sub_tests_with_dataset(invalid_metafiles)
+    @utils.run_sub_tests_with_dataset(invalid_metafiles)
def test_invalid_metafile_serialization(self, test_case_data: Dict[str, str]):
case_dict = json.loads(test_case_data)
with self.assertRaises((TypeError, ValueError, AttributeError)):
MetaFile.from_dict(copy.deepcopy(case_dict))


-    valid_metafiles: DataSet = {
+    valid_metafiles: utils.DataSet = {
"all": '{"hashes": {"sha256" : "abc"}, "length": 12, "version": 1}',
"no length": '{"hashes": {"sha256" : "abc"}, "version": 1 }',
"no hashes": '{"length": 12, "version": 1}',
"unrecognized field": '{"hashes": {"sha256" : "abc"}, "length": 12, "version": 1, "foo": "bar"}',
"many hashes": '{"hashes": {"sha256" : "abc", "sha512": "cde"}, "length": 12, "version": 1}',
}

-    @run_sub_tests_with_dataset(valid_metafiles)
+    @utils.run_sub_tests_with_dataset(valid_metafiles)
def test_metafile_serialization(self, test_case_data: str):
case_dict = json.loads(test_case_data)
metafile = MetaFile.from_dict(copy.copy(case_dict))
self.assertDictEqual(case_dict, metafile.to_dict())

-    invalid_timestamps: DataSet = {
+    invalid_timestamps: utils.DataSet = {
"no metafile": '{ "_type": "timestamp", "spec_version": "1.0.0", "version": 1, "expires": "2030-01-01T00:00:00Z"}',
}

-    @run_sub_tests_with_dataset(invalid_timestamps)
+    @utils.run_sub_tests_with_dataset(invalid_timestamps)
def test_invalid_timestamp_serialization(self, test_case_data: Dict[str, str]):
case_dict = json.loads(test_case_data)
with self.assertRaises((ValueError, KeyError)):
Timestamp.from_dict(copy.deepcopy(case_dict))


-    valid_timestamps: DataSet = {
+    valid_timestamps: utils.DataSet = {
"all": '{ "_type": "timestamp", "spec_version": "1.0.0", "version": 1, "expires": "2030-01-01T00:00:00Z", \
"meta": {"snapshot.json": {"hashes": {"sha256" : "abc"}, "version": 1}}}',
"unrecognized field": '{ "_type": "timestamp", "spec_version": "1.0.0", "version": 1, "expires": "2030-01-01T00:00:00Z", \
"meta": {"snapshot.json": {"hashes": {"sha256" : "abc"}, "version": 1}}, "foo": "bar"}',
}

-    @run_sub_tests_with_dataset(valid_timestamps)
+    @utils.run_sub_tests_with_dataset(valid_timestamps)
def test_timestamp_serialization(self, test_case_data: str):
case_dict = json.loads(test_case_data)
timestamp = Timestamp.from_dict(copy.deepcopy(case_dict))
self.assertDictEqual(case_dict, timestamp.to_dict())


-    valid_snapshots: DataSet = {
+    valid_snapshots: utils.DataSet = {
"all": '{ "_type": "snapshot", "spec_version": "1.0.0", "version": 1, "expires": "2030-01-01T00:00:00Z", \
"meta": { \
"file1.txt": {"hashes": {"sha256" : "abc"}, "version": 1}, \
@@ -253,14 +237,14 @@ def test_timestamp_serialization(self, test_case_data: str):
"meta": { "file.txt": { "hashes": {"sha256" : "abc"}, "version": 1 }}, "foo": "bar"}',
}

-    @run_sub_tests_with_dataset(valid_snapshots)
+    @utils.run_sub_tests_with_dataset(valid_snapshots)
def test_snapshot_serialization(self, test_case_data: str):
case_dict = json.loads(test_case_data)
snapshot = Snapshot.from_dict(copy.deepcopy(case_dict))
self.assertDictEqual(case_dict, snapshot.to_dict())


-    valid_delegated_roles: DataSet = {
+    valid_delegated_roles: utils.DataSet = {
# DelegatedRole inherits Role and some use cases can be found in the valid_roles.
"no hash prefix attribute":
'{"keyids": ["keyid"], "name": "a", "paths": ["fn1", "fn2"], \
@@ -279,14 +263,14 @@ def test_snapshot_serialization(self, test_case_data: str):
"terminating": false, "threshold": 1}',
}

-    @run_sub_tests_with_dataset(valid_delegated_roles)
+    @utils.run_sub_tests_with_dataset(valid_delegated_roles)
def test_delegated_role_serialization(self, test_case_data: str):
case_dict = json.loads(test_case_data)
deserialized_role = DelegatedRole.from_dict(copy.copy(case_dict))
self.assertDictEqual(case_dict, deserialized_role.to_dict())


-    invalid_delegated_roles: DataSet = {
+    invalid_delegated_roles: utils.DataSet = {
# DelegatedRole inherits Role and some use cases can be found in the invalid_roles.
"missing hash prefixes and paths":
'{"name": "a", "keyids": ["keyid"], "threshold": 1, "terminating": false}',
@@ -295,14 +279,14 @@ def test_delegated_role_serialization(self, test_case_data: str):
"paths": ["fn1", "fn2"], "path_hash_prefixes": ["h1", "h2"]}',
}

-    @run_sub_tests_with_dataset(invalid_delegated_roles)
+    @utils.run_sub_tests_with_dataset(invalid_delegated_roles)
def test_invalid_delegated_role_serialization(self, test_case_data: str):
case_dict = json.loads(test_case_data)
with self.assertRaises(ValueError):
DelegatedRole.from_dict(copy.copy(case_dict))


-    invalid_delegations: DataSet = {
+    invalid_delegations: utils.DataSet = {
"empty delegations": '{}',
"bad keys": '{"keys": "foo", \
"roles": [{"keyids": ["keyid"], "name": "a", "paths": ["fn1", "fn2"], "terminating": false, "threshold": 3}]}',
@@ -316,14 +300,14 @@ def test_invalid_delegated_role_serialization(self, test_case_data: str):
}',
}

-    @run_sub_tests_with_dataset(invalid_delegations)
+    @utils.run_sub_tests_with_dataset(invalid_delegations)
def test_invalid_delegation_serialization(self, test_case_data: str):
case_dict = json.loads(test_case_data)
with self.assertRaises((ValueError, KeyError, AttributeError)):
Delegations.from_dict(copy.deepcopy(case_dict))


-    valid_delegations: DataSet = {
+    valid_delegations: utils.DataSet = {
"all":
'{"keys": { \
"keyid1" : {"keytype": "rsa", "scheme": "rsassa-pss-sha256", "keyval": {"public": "foo"}}, \
@@ -341,43 +325,43 @@ def test_invalid_delegation_serialization(self, test_case_data: str):
}',
}

-    @run_sub_tests_with_dataset(valid_delegations)
+    @utils.run_sub_tests_with_dataset(valid_delegations)
def test_delegation_serialization(self, test_case_data: str):
case_dict = json.loads(test_case_data)
delegation = Delegations.from_dict(copy.deepcopy(case_dict))
self.assertDictEqual(case_dict, delegation.to_dict())


-    invalid_targetfiles: DataSet = {
+    invalid_targetfiles: utils.DataSet = {
"no hashes": '{"length": 1}',
"no length": '{"hashes": {"sha256": "abc"}}'
# The remaining cases are the same as for invalid_hashes and
# invalid_length datasets.
}

-    @run_sub_tests_with_dataset(invalid_targetfiles)
+    @utils.run_sub_tests_with_dataset(invalid_targetfiles)
def test_invalid_targetfile_serialization(self, test_case_data: Dict[str, str]):
case_dict = json.loads(test_case_data)
with self.assertRaises(KeyError):
TargetFile.from_dict(copy.deepcopy(case_dict), "file1.txt")


-    valid_targetfiles: DataSet = {
+    valid_targetfiles: utils.DataSet = {
"all": '{"length": 12, "hashes": {"sha256" : "abc"}, \
"custom" : {"foo": "bar"} }',
"no custom": '{"length": 12, "hashes": {"sha256" : "abc"}}',
"unrecognized field": '{"length": 12, "hashes": {"sha256" : "abc"}, \
"custom" : {"foo": "bar"}, "foo": "bar"}',
}

-    @run_sub_tests_with_dataset(valid_targetfiles)
+    @utils.run_sub_tests_with_dataset(valid_targetfiles)
def test_targetfile_serialization(self, test_case_data: str):
case_dict = json.loads(test_case_data)
target_file = TargetFile.from_dict(copy.copy(case_dict), "file1.txt")
self.assertDictEqual(case_dict, target_file.to_dict())


-    valid_targets: DataSet = {
+    valid_targets: utils.DataSet = {
"all attributes": '{"_type": "targets", "spec_version": "1.0.0", "version": 1, "expires": "2030-01-01T00:00:00Z", \
"targets": { \
"file.txt": {"length": 12, "hashes": {"sha256" : "abc"} }, \
@@ -403,7 +387,7 @@ def test_targetfile_serialization(self, test_case_data: str):
"targets": {}, "foo": "bar"}',
}

-    @run_sub_tests_with_dataset(valid_targets)
+    @utils.run_sub_tests_with_dataset(valid_targets)
def test_targets_serialization(self, test_case_data):
case_dict = json.loads(test_case_data)
targets = Targets.from_dict(copy.deepcopy(case_dict))
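The `DataSet` alias and `run_sub_tests_with_dataset` decorator removed at the top of this diff are referenced afterwards as `utils.DataSet` and `utils.run_sub_tests_with_dataset`, so the commit presumably moves them into `tests/utils.py` (one of the other changed files, not shown here). A sketch of what that shared helper would look like, with the type hints loosened from `"TestSerialization"` so any `unittest.TestCase` subclass can use it:

# Presumed contents added to tests/utils.py by this commit: the helper
# removed from test_metadata_serialization.py above, generalized to any
# unittest.TestCase subclass.
import unittest
from typing import Callable, Dict

# DataSet maps a test case name to its JSON test data string.
DataSet = Dict[str, str]

# Test runner decorator: runs the decorated test as N subtests (one per
# dataset item), feeding the test function one case's data at a time.
def run_sub_tests_with_dataset(dataset: DataSet):
    def real_decorator(function: Callable[[unittest.TestCase, str], None]):
        def wrapper(test_cls: unittest.TestCase):
            for case, data in dataset.items():
                with test_cls.subTest(case=case):
                    function(test_cls, data)
        return wrapper
    return real_decorator

Keeping the subtest plumbing in one shared module lets every dataset-driven serialization test above stay a plain one-argument method.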
