From 2bc8b61cbc36b1e7aea46798a87b10a1c83dac33 Mon Sep 17 00:00:00 2001
From: Martin Vrachev
Date: Mon, 18 Oct 2021 16:52:44 +0300
Subject: [PATCH 1/3] Linting tests with black and exclude old tests

Currently, we are using 4 tools: black, pylint, isort and mypy. We want
to run all 4 of them on the tests for the new codebase to make those
tests more readable and maintainable, but not on the old test files,
which still follow the old style guidelines.

To achieve that, we use an exclusion list instead of an inclusion list:
the new codebase will keep evolving and new test files will appear. On
the other hand, we don't expect any additional test files for the old
code, so the list is static.

I decided to hardcode the names we want to exclude because that way we
don't lose the Git history, as we would by renaming or moving the old
test files. Even though the list is big (around 30 files), I think this
solution is fine: the list only contains files testing the old code, so
its content will stay static.

I will apply each of the linters in a separate PR.

Signed-off-by: Martin Vrachev
---
 pyproject.toml | 36 ++++++++++++++++++++++++++++++++++++
 tox.ini        |  2 +-
 2 files changed, 37 insertions(+), 1 deletion(-)
 create mode 100644 pyproject.toml

diff --git a/pyproject.toml b/pyproject.toml
new file mode 100644
index 0000000000..7ed3e472a6
--- /dev/null
+++ b/pyproject.toml
@@ -0,0 +1,36 @@
+[tool.black]
+force-exclude="""
+(tests/aggregate_tests.py|
+tests/fast_server_exit.py|
+tests/simple_https_server.py|
+tests/simple_server.py|
+tests/slow_retrieval_server.py|
+tests/utils.py|
+tests/test_utils.py|
+tests/test_repository_lib.py|
+tests/test_arbitrary_package_attack.py|
+tests/test_developer_tool.py|
+tests/test_download.py|
+tests/test_endless_data_attack.py|
+tests/test_extraneous_dependencies_attack.py|
+tests/test_formats.py|
+tests/test_indefinite_freeze_attack.py|
+tests/test_key_revocation_integration.py|
+tests/test_keydb.py|
+tests/test_log.py|
+tests/test_mirrors.py|
+tests/test_mix_and_match_attack.py|
+tests/test_multiple_repositories_integration.py|
+tests/test_repository_tool.py|
+tests/test_replay_attack.py|
+tests/test_roledb.py|
+tests/test_root_versioning_integration.py|
+tests/test_sig.py|
+tests/test_slow_retrieval_attack.py|
+tests/test_tutorial.py|
+tests/test_unittest_toolbox.py|
+tests/test_updater.py|
+tests/test_updater_root_rotation_integration.py|
+tests/repository_data/generate_project_data.py|
+tests/repository_data/generate.py)
+"""
diff --git a/tox.ini b/tox.ini
index 48475539be..b1fba0d71d 100644
--- a/tox.ini
+++ b/tox.ini
@@ -41,7 +41,7 @@ changedir = {toxinidir}
 commands =
     # Use different configs for new (tuf/api/*) and legacy code
     # TODO: configure black and isort args in pyproject.toml (see #1161)
-    black --check --diff --line-length 80 tuf/api tuf/ngclient
+    black --check --diff --line-length 80 --config pyproject.toml tests/ tuf/api tuf/ngclient
     isort --check --diff --line-length 80 --profile black -p tuf tuf/api tuf/ngclient
     pylint -j 0 tuf/api tuf/ngclient --rcfile=tuf/api/pylintrc

From 37f686ad2133e531f2be26e579a732d186e5e64b Mon Sep 17 00:00:00 2001
From: Martin Vrachev
Date: Tue, 12 Oct 2021 16:14:24 +0300
Subject: [PATCH 2/3] Apply black on the tests of the new code

All of the changes included are the result of applying black to our
tests for the new code.
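For reference, the reformatting can be reproduced with an invocation
along these lines (the exact command is an assumption; it mirrors the
black configuration and the tox.ini check added in the previous
commit):

    black --line-length 80 --config pyproject.toml tests/
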
Signed-off-by: Martin Vrachev --- tests/repository_simulator.py | 11 +- tests/test_api.py | 367 ++++++++++++++------------- tests/test_fetcher.py | 204 +++++++-------- tests/test_fetcher_ng.py | 13 +- tests/test_metadata_serialization.py | 88 +++---- tests/test_trusted_metadata_set.py | 51 ++-- tests/test_updater_with_simulator.py | 21 +- 7 files changed, 378 insertions(+), 377 deletions(-) diff --git a/tests/repository_simulator.py b/tests/repository_simulator.py index 4bd43bbad5..4ab9338e0e 100644 --- a/tests/repository_simulator.py +++ b/tests/repository_simulator.py @@ -76,12 +76,15 @@ SPEC_VER = ".".join(SPECIFICATION_VERSION) + @dataclass class RepositoryTarget: """Contains actual target data and the related target metadata""" + data: bytes target_file: TargetFile + class RepositorySimulator(FetcherInterface): def __init__(self): self.md_root: Metadata[Root] = None @@ -186,7 +189,7 @@ def fetch(self, url: str) -> Iterator[bytes]: elif spliturl.path.startswith("/targets/"): # figure out target path and hash prefix path = spliturl.path[len("/targets/") :] - dir_parts, sep , prefixed_filename = path.rpartition("/") + dir_parts, sep, prefixed_filename = path.rpartition("/") prefix, _, filename = prefixed_filename.partition(".") target_path = f"{dir_parts}{sep}{filename}" @@ -208,7 +211,9 @@ def _fetch_target(self, target_path: str, hash: Optional[str]) -> bytes: logger.debug("fetched target %s", target_path) return repo_target.data - def _fetch_metadata(self, role: str, version: Optional[int] = None) -> bytes: + def _fetch_metadata( + self, role: str, version: Optional[int] = None + ) -> bytes: """Return signed metadata for 'role', using 'version' if it is given. If version is None, non-versioned metadata is being requested @@ -253,7 +258,7 @@ def _compute_hashes_and_length( data = self._fetch_metadata(role) digest_object = sslib_hash.digest(sslib_hash.DEFAULT_HASH_ALGORITHM) digest_object.update(data) - hashes = {sslib_hash.DEFAULT_HASH_ALGORITHM: digest_object.hexdigest()} + hashes = {sslib_hash.DEFAULT_HASH_ALGORITHM: digest_object.hexdigest()} return hashes, len(data) def update_timestamp(self): diff --git a/tests/test_api.py b/tests/test_api.py index d3b3eddec8..ff7b60a498 100755 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -32,35 +32,24 @@ DelegatedRole, ) -from tuf.api.serialization import ( - DeserializationError -) +from tuf.api.serialization import DeserializationError -from tuf.api.serialization.json import ( - JSONSerializer, - CanonicalJSONSerializer -) +from tuf.api.serialization.json import JSONSerializer, CanonicalJSONSerializer from securesystemslib.interface import ( import_ed25519_publickey_from_file, - import_ed25519_privatekey_from_file + import_ed25519_privatekey_from_file, ) from securesystemslib import hash as sslib_hash -from securesystemslib.signer import ( - SSlibSigner, - Signature -) +from securesystemslib.signer import SSlibSigner, Signature -from securesystemslib.keys import ( - generate_ed25519_key -) +from securesystemslib.keys import generate_ed25519_key logger = logging.getLogger(__name__) class TestMetadata(unittest.TestCase): - @classmethod def setUpClass(cls): # Create a temporary directory to store the repository, metadata, and @@ -70,62 +59,65 @@ def setUpClass(cls): cls.temporary_directory = tempfile.mkdtemp(dir=os.getcwd()) test_repo_data = os.path.join( - os.path.dirname(os.path.realpath(__file__)), 'repository_data') + os.path.dirname(os.path.realpath(__file__)), "repository_data" + ) - cls.repo_dir = os.path.join(cls.temporary_directory, 
'repository') + cls.repo_dir = os.path.join(cls.temporary_directory, "repository") shutil.copytree( - os.path.join(test_repo_data, 'repository'), cls.repo_dir) + os.path.join(test_repo_data, "repository"), cls.repo_dir + ) - cls.keystore_dir = os.path.join(cls.temporary_directory, 'keystore') + cls.keystore_dir = os.path.join(cls.temporary_directory, "keystore") shutil.copytree( - os.path.join(test_repo_data, 'keystore'), cls.keystore_dir) + os.path.join(test_repo_data, "keystore"), cls.keystore_dir + ) # Load keys into memory cls.keystore = {} - for role in ['delegation', 'snapshot', 'targets', 'timestamp']: + for role in ["delegation", "snapshot", "targets", "timestamp"]: cls.keystore[role] = import_ed25519_privatekey_from_file( - os.path.join(cls.keystore_dir, role + '_key'), - password="password" + os.path.join(cls.keystore_dir, role + "_key"), + password="password", ) - @classmethod def tearDownClass(cls): # Remove the temporary repository directory, which should contain all # the metadata, targets, and key files generated for the test cases. shutil.rmtree(cls.temporary_directory) - def test_generic_read(self): for metadata, inner_metadata_cls in [ - ('root', Root), - ('snapshot', Snapshot), - ('timestamp', Timestamp), - ('targets', Targets)]: + ("root", Root), + ("snapshot", Snapshot), + ("timestamp", Timestamp), + ("targets", Targets), + ]: # Load JSON-formatted metdata of each supported type from file # and from out-of-band read JSON string - path = os.path.join(self.repo_dir, 'metadata', metadata + '.json') + path = os.path.join(self.repo_dir, "metadata", metadata + ".json") metadata_obj = Metadata.from_file(path) - with open(path, 'rb') as f: + with open(path, "rb") as f: metadata_obj2 = Metadata.from_bytes(f.read()) # Assert that both methods instantiate the right inner class for # each metadata type and ... + self.assertTrue(isinstance(metadata_obj.signed, inner_metadata_cls)) self.assertTrue( - isinstance(metadata_obj.signed, inner_metadata_cls)) - self.assertTrue( - isinstance(metadata_obj2.signed, inner_metadata_cls)) + isinstance(metadata_obj2.signed, inner_metadata_cls) + ) # ... 
and return the same object (compared by dict representation) self.assertDictEqual( - metadata_obj.to_dict(), metadata_obj2.to_dict()) + metadata_obj.to_dict(), metadata_obj2.to_dict() + ) # Assert that it chokes correctly on an unknown metadata type - bad_metadata_path = 'bad-metadata.json' - bad_metadata = {'signed': {'_type': 'bad-metadata'}} - bad_string = json.dumps(bad_metadata).encode('utf-8') - with open(bad_metadata_path, 'wb') as f: + bad_metadata_path = "bad-metadata.json" + bad_metadata = {"signed": {"_type": "bad-metadata"}} + bad_string = json.dumps(bad_metadata).encode("utf-8") + with open(bad_metadata_path, "wb") as f: f.write(bad_string) with self.assertRaises(DeserializationError): @@ -135,35 +127,33 @@ def test_generic_read(self): os.remove(bad_metadata_path) - def test_compact_json(self): - path = os.path.join(self.repo_dir, 'metadata', 'targets.json') + path = os.path.join(self.repo_dir, "metadata", "targets.json") metadata_obj = Metadata.from_file(path) self.assertTrue( - len(JSONSerializer(compact=True).serialize(metadata_obj)) < - len(JSONSerializer().serialize(metadata_obj))) - + len(JSONSerializer(compact=True).serialize(metadata_obj)) + < len(JSONSerializer().serialize(metadata_obj)) + ) def test_read_write_read_compare(self): - for metadata in ['root', 'snapshot', 'timestamp', 'targets']: - path = os.path.join(self.repo_dir, 'metadata', metadata + '.json') + for metadata in ["root", "snapshot", "timestamp", "targets"]: + path = os.path.join(self.repo_dir, "metadata", metadata + ".json") metadata_obj = Metadata.from_file(path) - path_2 = path + '.tmp' + path_2 = path + ".tmp" metadata_obj.to_file(path_2) metadata_obj_2 = Metadata.from_file(path_2) self.assertDictEqual( - metadata_obj.to_dict(), - metadata_obj_2.to_dict()) + metadata_obj.to_dict(), metadata_obj_2.to_dict() + ) os.remove(path_2) - def test_to_from_bytes(self): for metadata in ["root", "snapshot", "timestamp", "targets"]: - path = os.path.join(self.repo_dir, 'metadata', metadata + '.json') - with open(path, 'rb') as f: + path = os.path.join(self.repo_dir, "metadata", metadata + ".json") + with open(path, "rb") as f: metadata_bytes = f.read() metadata_obj = Metadata.from_bytes(metadata_bytes) # Comparate that from_bytes/to_bytes doesn't change the content @@ -177,13 +167,10 @@ def test_to_from_bytes(self): # Case 2: test compact by using the default serializer. obj_bytes = metadata_obj.to_bytes() metadata_obj_2 = Metadata.from_bytes(obj_bytes) - self.assertEqual( - metadata_obj_2.to_bytes(), obj_bytes - ) - + self.assertEqual(metadata_obj_2.to_bytes(), obj_bytes) def test_sign_verify(self): - root_path = os.path.join(self.repo_dir, 'metadata', 'root.json') + root_path = os.path.join(self.repo_dir, "metadata", "root.json") root = Metadata[Root].from_file(root_path).signed # Locate the public keys we need from root @@ -195,7 +182,7 @@ def test_sign_verify(self): timestamp_key = root.keys[timestamp_keyid] # Load sample metadata (targets) and assert ... - path = os.path.join(self.repo_dir, 'metadata', 'targets.json') + path = os.path.join(self.repo_dir, "metadata", "targets.json") metadata_obj = Metadata.from_file(path) # ... it has a single existing signature, @@ -210,7 +197,7 @@ def test_sign_verify(self): with self.assertRaises(exceptions.UnsignedMetadataError): targets_key.verify_signature(metadata_obj, JSONSerializer()) - sslib_signer = SSlibSigner(self.keystore['snapshot']) + sslib_signer = SSlibSigner(self.keystore["snapshot"]) # Append a new signature with the unrelated key and assert that ... 
sig = metadata_obj.sign(sslib_signer, append=True) # ... there are now two signatures, and @@ -221,7 +208,7 @@ def test_sign_verify(self): # ... the returned (appended) signature is for snapshot key self.assertEqual(sig.keyid, snapshot_keyid) - sslib_signer = SSlibSigner(self.keystore['timestamp']) + sslib_signer = SSlibSigner(self.keystore["timestamp"]) # Create and assign (don't append) a new signature and assert that ... metadata_obj.sign(sslib_signer, append=False) # ... there now is only one signature, @@ -253,7 +240,7 @@ def test_sign_verify(self): timestamp_key.verify_signature(metadata_obj) # Test failure with valid but incorrect signature - sig.signature = "ff"*64 + sig.signature = "ff" * 64 with self.assertRaises(exceptions.UnsignedMetadataError): timestamp_key.verify_signature(metadata_obj) sig.signature = correct_sig @@ -261,8 +248,7 @@ def test_sign_verify(self): def test_metadata_base(self): # Use of Snapshot is arbitrary, we're just testing the base class features # with real data - snapshot_path = os.path.join( - self.repo_dir, 'metadata', 'snapshot.json') + snapshot_path = os.path.join(self.repo_dir, "metadata", "snapshot.json") md = Metadata.from_file(snapshot_path) self.assertEqual(md.signed.version, 1) @@ -295,33 +281,35 @@ def test_metadata_base(self): # Test deserializing metadata with non-unique signatures: data = md.to_dict() - data["signatures"].append({"keyid": data["signatures"][0]["keyid"], "sig": "foo"}) + data["signatures"].append( + {"keyid": data["signatures"][0]["keyid"], "sig": "foo"} + ) with self.assertRaises(ValueError): Metadata.from_dict(data) - def test_metadata_snapshot(self): - snapshot_path = os.path.join( - self.repo_dir, 'metadata', 'snapshot.json') + snapshot_path = os.path.join(self.repo_dir, "metadata", "snapshot.json") snapshot = Metadata[Snapshot].from_file(snapshot_path) # Create a MetaFile instance representing what we expect # the updated data to be. - hashes = {'sha256': 'c2986576f5fdfd43944e2b19e775453b96748ec4fe2638a6d2f32f1310967095'} + hashes = { + "sha256": "c2986576f5fdfd43944e2b19e775453b96748ec4fe2638a6d2f32f1310967095" + } fileinfo = MetaFile(2, 123, hashes) self.assertNotEqual( - snapshot.signed.meta['role1.json'].to_dict(), fileinfo.to_dict() + snapshot.signed.meta["role1.json"].to_dict(), fileinfo.to_dict() ) - snapshot.signed.update('role1', fileinfo) + snapshot.signed.update("role1", fileinfo) self.assertEqual( - snapshot.signed.meta['role1.json'].to_dict(), fileinfo.to_dict() + snapshot.signed.meta["role1.json"].to_dict(), fileinfo.to_dict() ) - def test_metadata_timestamp(self): timestamp_path = os.path.join( - self.repo_dir, 'metadata', 'timestamp.json') + self.repo_dir, "metadata", "timestamp.json" + ) timestamp = Metadata[Timestamp].from_file(timestamp_path) self.assertEqual(timestamp.signed.version, 1) @@ -345,7 +333,9 @@ def test_metadata_timestamp(self): # Create a MetaFile instance representing what we expect # the updated data to be. 
- hashes = {'sha256': '0ae9664468150a9aa1e7f11feecb32341658eb84292851367fea2da88e8a58dc'} + hashes = { + "sha256": "0ae9664468150a9aa1e7f11feecb32341658eb84292851367fea2da88e8a58dc" + } fileinfo = MetaFile(2, 520, hashes) self.assertNotEqual( @@ -356,102 +346,99 @@ def test_metadata_timestamp(self): timestamp.signed.snapshot_meta.to_dict(), fileinfo.to_dict() ) - def test_metadata_verify_delegate(self): - root_path = os.path.join(self.repo_dir, 'metadata', 'root.json') + root_path = os.path.join(self.repo_dir, "metadata", "root.json") root = Metadata[Root].from_file(root_path) - snapshot_path = os.path.join( - self.repo_dir, 'metadata', 'snapshot.json') + snapshot_path = os.path.join(self.repo_dir, "metadata", "snapshot.json") snapshot = Metadata[Snapshot].from_file(snapshot_path) - targets_path = os.path.join( - self.repo_dir, 'metadata', 'targets.json') + targets_path = os.path.join(self.repo_dir, "metadata", "targets.json") targets = Metadata[Targets].from_file(targets_path) - role1_path = os.path.join( - self.repo_dir, 'metadata', 'role1.json') + role1_path = os.path.join(self.repo_dir, "metadata", "role1.json") role1 = Metadata[Targets].from_file(role1_path) - role2_path = os.path.join( - self.repo_dir, 'metadata', 'role2.json') + role2_path = os.path.join(self.repo_dir, "metadata", "role2.json") role2 = Metadata[Targets].from_file(role2_path) # test the expected delegation tree - root.verify_delegate('root', root) - root.verify_delegate('snapshot', snapshot) - root.verify_delegate('targets', targets) - targets.verify_delegate('role1', role1) - role1.verify_delegate('role2', role2) + root.verify_delegate("root", root) + root.verify_delegate("snapshot", snapshot) + root.verify_delegate("targets", targets) + targets.verify_delegate("role1", role1) + role1.verify_delegate("role2", role2) # only root and targets can verify delegates with self.assertRaises(TypeError): - snapshot.verify_delegate('snapshot', snapshot) + snapshot.verify_delegate("snapshot", snapshot) # verify fails for roles that are not delegated by delegator with self.assertRaises(ValueError): - root.verify_delegate('role1', role1) + root.verify_delegate("role1", role1) with self.assertRaises(ValueError): - targets.verify_delegate('targets', targets) + targets.verify_delegate("targets", targets) # verify fails when delegator has no delegations with self.assertRaises(ValueError): - role2.verify_delegate('role1', role1) + role2.verify_delegate("role1", role1) # verify fails when delegate content is modified expires = snapshot.signed.expires snapshot.signed.bump_expiration() with self.assertRaises(exceptions.UnsignedMetadataError): - root.verify_delegate('snapshot', snapshot) + root.verify_delegate("snapshot", snapshot) snapshot.signed.expires = expires # verify fails if roles keys do not sign the metadata with self.assertRaises(exceptions.UnsignedMetadataError): - root.verify_delegate('timestamp', snapshot) + root.verify_delegate("timestamp", snapshot) # Add a key to snapshot role, make sure the new sig fails to verify ts_keyid = next(iter(root.signed.roles["timestamp"].keyids)) root.signed.add_key("snapshot", root.signed.keys[ts_keyid]) - snapshot.signatures[ts_keyid] = Signature(ts_keyid, "ff"*64) + snapshot.signatures[ts_keyid] = Signature(ts_keyid, "ff" * 64) # verify succeeds if threshold is reached even if some signatures # fail to verify - root.verify_delegate('snapshot', snapshot) + root.verify_delegate("snapshot", snapshot) # verify fails if threshold of signatures is not reached - 
root.signed.roles['snapshot'].threshold = 2 + root.signed.roles["snapshot"].threshold = 2 with self.assertRaises(exceptions.UnsignedMetadataError): - root.verify_delegate('snapshot', snapshot) + root.verify_delegate("snapshot", snapshot) # verify succeeds when we correct the new signature and reach the # threshold of 2 keys - snapshot.sign(SSlibSigner(self.keystore['timestamp']), append=True) - root.verify_delegate('snapshot', snapshot) - + snapshot.sign(SSlibSigner(self.keystore["timestamp"]), append=True) + root.verify_delegate("snapshot", snapshot) def test_key_class(self): # Test if from_securesystemslib_key removes the private key from keyval # of a securesystemslib key dictionary. sslib_key = generate_ed25519_key() key = Key.from_securesystemslib_key(sslib_key) - self.assertFalse('private' in key.keyval.keys()) - + self.assertFalse("private" in key.keyval.keys()) def test_root_add_key_and_remove_key(self): - root_path = os.path.join( - self.repo_dir, 'metadata', 'root.json') + root_path = os.path.join(self.repo_dir, "metadata", "root.json") root = Metadata[Root].from_file(root_path) # Create a new key - root_key2 = import_ed25519_publickey_from_file( - os.path.join(self.keystore_dir, 'root_key2.pub')) - keyid = root_key2['keyid'] - key_metadata = Key(keyid, root_key2['keytype'], root_key2['scheme'], - root_key2['keyval']) + root_key2 = import_ed25519_publickey_from_file( + os.path.join(self.keystore_dir, "root_key2.pub") + ) + keyid = root_key2["keyid"] + key_metadata = Key( + keyid, + root_key2["keytype"], + root_key2["scheme"], + root_key2["keyval"], + ) # Assert that root does not contain the new key - self.assertNotIn(keyid, root.signed.roles['root'].keyids) + self.assertNotIn(keyid, root.signed.roles["root"].keyids) self.assertNotIn(keyid, root.signed.keys) # Add new root key - root.signed.add_key('root', key_metadata) + root.signed.add_key("root", key_metadata) # Assert that key is added - self.assertIn(keyid, root.signed.roles['root'].keyids) + self.assertIn(keyid, root.signed.roles["root"].keyids) self.assertIn(keyid, root.signed.keys) # Confirm that the newly added key does not break @@ -459,31 +446,31 @@ def test_root_add_key_and_remove_key(self): root.to_dict() # Try adding the same key again and assert its ignored. - pre_add_keyid = root.signed.roles['root'].keyids.copy() - root.signed.add_key('root', key_metadata) - self.assertEqual(pre_add_keyid, root.signed.roles['root'].keyids) + pre_add_keyid = root.signed.roles["root"].keyids.copy() + root.signed.add_key("root", key_metadata) + self.assertEqual(pre_add_keyid, root.signed.roles["root"].keyids) # Add the same key to targets role as well - root.signed.add_key('targets', key_metadata) + root.signed.add_key("targets", key_metadata) # Add the same key to a nonexistent role. 
with self.assertRaises(ValueError): root.signed.add_key("nosuchrole", key_metadata) # Remove the key from root role (targets role still uses it) - root.signed.remove_key('root', keyid) - self.assertNotIn(keyid, root.signed.roles['root'].keyids) + root.signed.remove_key("root", keyid) + self.assertNotIn(keyid, root.signed.roles["root"].keyids) self.assertIn(keyid, root.signed.keys) # Remove the key from targets as well - root.signed.remove_key('targets', keyid) - self.assertNotIn(keyid, root.signed.roles['targets'].keyids) + root.signed.remove_key("targets", keyid) + self.assertNotIn(keyid, root.signed.roles["targets"].keyids) self.assertNotIn(keyid, root.signed.keys) with self.assertRaises(ValueError): - root.signed.remove_key('root', 'nosuchkey') + root.signed.remove_key("root", "nosuchkey") with self.assertRaises(ValueError): - root.signed.remove_key('nosuchrole', keyid) + root.signed.remove_key("nosuchrole", keyid) def test_is_target_in_pathpattern(self): supported_use_cases = [ @@ -505,28 +492,26 @@ def test_is_target_in_pathpattern(self): invalid_use_cases = [ ("targets/foo.tgz", "*.tgz"), - ("/foo.tgz", "*.tgz",), + ("/foo.tgz", "*.tgz"), ("targets/foo.tgz", "*"), ("foo-version-alpha.tgz", "foo-version-?.tgz"), ("foo//bar", "*/bar"), - ("foo/bar", "f?/bar") + ("foo/bar", "f?/bar"), ] for targetpath, pathpattern in invalid_use_cases: self.assertFalse( DelegatedRole._is_target_in_pathpattern(targetpath, pathpattern) ) - def test_metadata_targets(self): - targets_path = os.path.join( - self.repo_dir, 'metadata', 'targets.json') + targets_path = os.path.join(self.repo_dir, "metadata", "targets.json") targets = Metadata[Targets].from_file(targets_path) # Create a fileinfo dict representing what we expect the updated data to be - filename = 'file2.txt' + filename = "file2.txt" hashes = { "sha256": "141f740f53781d1ca54b8a50af22cbf74e44c21a998fa2a8a05aaac2c002886b", - "sha512": "ef5beafa16041bcdd2937140afebd485296cd54f7348ecd5a4d035c09759608de467a7ac0eb58753d0242df873c305e8bffad2454aa48f44480f15efae1cacd0" + "sha512": "ef5beafa16041bcdd2937140afebd485296cd54f7348ecd5a4d035c09759608de467a7ac0eb58753d0242df873c305e8bffad2454aa48f44480f15efae1cacd0", } fileinfo = TargetFile(length=28, hashes=hashes, path=filename) @@ -543,18 +528,19 @@ def test_metadata_targets(self): ) def test_targets_key_api(self): - targets_path = os.path.join( - self.repo_dir, 'metadata', 'targets.json') + targets_path = os.path.join(self.repo_dir, "metadata", "targets.json") targets: Targets = Metadata[Targets].from_file(targets_path).signed # Add a new delegated role "role2" in targets - delegated_role = DelegatedRole.from_dict({ + delegated_role = DelegatedRole.from_dict( + { "keyids": [], "name": "role2", "paths": ["fn3", "fn4"], "terminating": False, - "threshold": 1 - }) + "threshold": 1, + } + ) targets.delegations.roles["role2"] = delegated_role key_dict = { @@ -562,7 +548,7 @@ def test_targets_key_api(self): "keyval": { "public": "edcd0a32a07dce33f7c7873aaffbff36d20ea30787574ead335eefd337e4dacd" }, - "scheme": "ed25519" + "scheme": "ed25519", } key = Key.from_dict("id2", key_dict) @@ -618,19 +604,18 @@ def test_targets_key_api(self): targets.remove_key("role1", key.keyid) self.assertTrue(targets.delegations is None) - - def test_length_and_hash_validation(self): + def test_length_and_hash_validation(self): # Test metadata files' hash and length verification. # Use timestamp to get a MetaFile object and snapshot # for untrusted metadata file to verify. 
timestamp_path = os.path.join( - self.repo_dir, 'metadata', 'timestamp.json') + self.repo_dir, "metadata", "timestamp.json" + ) timestamp = Metadata[Timestamp].from_file(timestamp_path) snapshot_metafile = timestamp.signed.snapshot_meta - snapshot_path = os.path.join( - self.repo_dir, 'metadata', 'snapshot.json') + snapshot_path = os.path.join(self.repo_dir, "metadata", "snapshot.json") with open(snapshot_path, "rb") as file: # test with data as a file object @@ -643,36 +628,49 @@ def test_length_and_hash_validation(self): # test exceptions expected_length = snapshot_metafile.length snapshot_metafile.length = 2345 - self.assertRaises(exceptions.LengthOrHashMismatchError, - snapshot_metafile.verify_length_and_hashes, data) + self.assertRaises( + exceptions.LengthOrHashMismatchError, + snapshot_metafile.verify_length_and_hashes, + data, + ) snapshot_metafile.length = expected_length - snapshot_metafile.hashes = {'sha256': 'incorrecthash'} - self.assertRaises(exceptions.LengthOrHashMismatchError, - snapshot_metafile.verify_length_and_hashes, data) + snapshot_metafile.hashes = {"sha256": "incorrecthash"} + self.assertRaises( + exceptions.LengthOrHashMismatchError, + snapshot_metafile.verify_length_and_hashes, + data, + ) - snapshot_metafile.hashes = {'unsupported-alg': "8f88e2ba48b412c3843e9bb26e1b6f8fc9e98aceb0fbaa97ba37b4c98717d7ab"} - self.assertRaises(exceptions.LengthOrHashMismatchError, - snapshot_metafile.verify_length_and_hashes, data) + snapshot_metafile.hashes = { + "unsupported-alg": "8f88e2ba48b412c3843e9bb26e1b6f8fc9e98aceb0fbaa97ba37b4c98717d7ab" + } + self.assertRaises( + exceptions.LengthOrHashMismatchError, + snapshot_metafile.verify_length_and_hashes, + data, + ) # Test wrong algorithm format (sslib.FormatError) - snapshot_metafile.hashes = { 256: "8f88e2ba48b412c3843e9bb26e1b6f8fc9e98aceb0fbaa97ba37b4c98717d7ab"} - self.assertRaises(exceptions.LengthOrHashMismatchError, - snapshot_metafile.verify_length_and_hashes, data) + snapshot_metafile.hashes = { + 256: "8f88e2ba48b412c3843e9bb26e1b6f8fc9e98aceb0fbaa97ba37b4c98717d7ab" + } + self.assertRaises( + exceptions.LengthOrHashMismatchError, + snapshot_metafile.verify_length_and_hashes, + data, + ) # test optional length and hashes snapshot_metafile.length = None snapshot_metafile.hashes = None snapshot_metafile.verify_length_and_hashes(data) - # Test target files' hash and length verification - targets_path = os.path.join( - self.repo_dir, 'metadata', 'targets.json') + targets_path = os.path.join(self.repo_dir, "metadata", "targets.json") targets = Metadata[Targets].from_file(targets_path) - file1_targetfile = targets.signed.targets['file1.txt'] - filepath = os.path.join( - self.repo_dir, 'targets', 'file1.txt') + file1_targetfile = targets.signed.targets["file1.txt"] + filepath = os.path.join(self.repo_dir, "targets", "file1.txt") with open(filepath, "rb") as file1: file1_targetfile.verify_length_and_hashes(file1) @@ -680,50 +678,58 @@ def test_length_and_hash_validation(self): # test exceptions expected_length = file1_targetfile.length file1_targetfile.length = 2345 - self.assertRaises(exceptions.LengthOrHashMismatchError, - file1_targetfile.verify_length_and_hashes, file1) + self.assertRaises( + exceptions.LengthOrHashMismatchError, + file1_targetfile.verify_length_and_hashes, + file1, + ) file1_targetfile.length = expected_length - file1_targetfile.hashes = {'sha256': 'incorrecthash'} - self.assertRaises(exceptions.LengthOrHashMismatchError, - file1_targetfile.verify_length_and_hashes, file1) + file1_targetfile.hashes = 
{"sha256": "incorrecthash"} + self.assertRaises( + exceptions.LengthOrHashMismatchError, + file1_targetfile.verify_length_and_hashes, + file1, + ) def test_targetfile_from_file(self): # Test with an existing file and valid hash algorithm - file_path = os.path.join(self.repo_dir, 'targets', 'file1.txt') + file_path = os.path.join(self.repo_dir, "targets", "file1.txt") targetfile_from_file = TargetFile.from_file( - file_path, file_path, ['sha256'] + file_path, file_path, ["sha256"] ) with open(file_path, "rb") as file: targetfile_from_file.verify_length_and_hashes(file) # Test with a non-existing file - file_path = os.path.join(self.repo_dir, 'targets', 'file123.txt') + file_path = os.path.join(self.repo_dir, "targets", "file123.txt") self.assertRaises( - FileNotFoundError, - TargetFile.from_file, - file_path, + FileNotFoundError, + TargetFile.from_file, + file_path, file_path, - [sslib_hash.DEFAULT_HASH_ALGORITHM] + [sslib_hash.DEFAULT_HASH_ALGORITHM], ) # Test with an unsupported algorithm - file_path = os.path.join(self.repo_dir, 'targets', 'file1.txt') + file_path = os.path.join(self.repo_dir, "targets", "file1.txt") self.assertRaises( exceptions.UnsupportedAlgorithmError, - TargetFile.from_file, - file_path, + TargetFile.from_file, + file_path, file_path, - ['123'] + ["123"], ) def test_targetfile_from_data(self): data = b"Inline test content" - target_file_path = os.path.join(self.repo_dir, 'targets', 'file1.txt') - + target_file_path = os.path.join(self.repo_dir, "targets", "file1.txt") + # Test with a valid hash algorithm - targetfile_from_data = TargetFile.from_data(target_file_path, data, ['sha256']) + targetfile_from_data = TargetFile.from_data( + target_file_path, data, ["sha256"] + ) targetfile_from_data.verify_length_and_hashes(data) # Test with no algorithms specified @@ -753,7 +759,8 @@ def test_is_delegated_role(self): self.assertFalse(role.is_delegated_path("a/non-matching path")) self.assertTrue(role.is_delegated_path("a/path")) + # Run unit test. -if __name__ == '__main__': +if __name__ == "__main__": utils.configure_test_logging(sys.argv) unittest.main() diff --git a/tests/test_fetcher.py b/tests/test_fetcher.py index bf94f252d8..c575f40f3a 100644 --- a/tests/test_fetcher.py +++ b/tests/test_fetcher.py @@ -25,107 +25,109 @@ class TestFetcher(unittest_toolbox.Modified_TestCase): - def setUp(self): - """ - Create a temporary file and launch a simple server in the - current working directory. - """ - - unittest_toolbox.Modified_TestCase.setUp(self) - - # Making a temporary file. - current_dir = os.getcwd() - target_filepath = self.make_temp_data_file(directory=current_dir) - self.target_fileobj = open(target_filepath, 'r') - self.file_contents = self.target_fileobj.read() - self.file_length = len(self.file_contents) - - # Launch a SimpleHTTPServer (serves files in the current dir). - self.server_process_handler = utils.TestServerProcess(log=logger) - - rel_target_filepath = os.path.basename(target_filepath) - self.url = 'http://' + utils.TEST_HOST_ADDRESS + ':' \ - + str(self.server_process_handler.port) + '/' + rel_target_filepath - - # Create a temporary file where the target file chunks are written - # during fetching - self.temp_file = tempfile.TemporaryFile() - self.fetcher = tuf.requests_fetcher.RequestsFetcher() - - - # Stop server process and perform clean up. - def tearDown(self): - # Cleans the resources and flush the logged lines (if any). 
- self.server_process_handler.clean() - - self.target_fileobj.close() - self.temp_file.close() - - # Remove temporary directory - unittest_toolbox.Modified_TestCase.tearDown(self) - - - # Test: Normal case. - def test_fetch(self): - for chunk in self.fetcher.fetch(self.url, self.file_length): - self.temp_file.write(chunk) - - self.temp_file.seek(0) - temp_file_data = self.temp_file.read().decode('utf-8') - self.assertEqual(self.file_contents, temp_file_data) - - # Test if fetcher downloads file up to a required length - def test_fetch_restricted_length(self): - for chunk in self.fetcher.fetch(self.url, self.file_length-4): - self.temp_file.write(chunk) - - self.temp_file.seek(0, io.SEEK_END) - self.assertEqual(self.temp_file.tell(), self.file_length-4) - - - # Test that fetcher does not download more than actual file length - def test_fetch_upper_length(self): - for chunk in self.fetcher.fetch(self.url, self.file_length+4): - self.temp_file.write(chunk) - - self.temp_file.seek(0, io.SEEK_END) - self.assertEqual(self.temp_file.tell(), self.file_length) - - - # Test incorrect URL parsing - def test_url_parsing(self): - with self.assertRaises(tuf.exceptions.URLParsingError): - self.fetcher.fetch(self.random_string(), self.file_length) - - - # Test: Normal case with url data downloaded in more than one chunk - def test_fetch_in_chunks(self): - # Set smaller chunk size to ensure that the file will be downloaded - # in more than one chunk - default_chunk_size = tuf.settings.CHUNK_SIZE - tuf.settings.CHUNK_SIZE = 4 - - # expected_chunks_count: 3 - expected_chunks_count = math.ceil(self.file_length/tuf.settings.CHUNK_SIZE) - self.assertEqual(expected_chunks_count, 3) - - chunks_count = 0 - for chunk in self.fetcher.fetch(self.url, self.file_length): - self.temp_file.write(chunk) - chunks_count+=1 - - self.temp_file.seek(0) - temp_file_data = self.temp_file.read().decode('utf-8') - self.assertEqual(self.file_contents, temp_file_data) - # Check that we calculate chunks as expected - self.assertEqual(chunks_count, expected_chunks_count) - - # Restore default settings - tuf.settings.CHUNK_SIZE = default_chunk_size - + def setUp(self): + """ + Create a temporary file and launch a simple server in the + current working directory. + """ + + unittest_toolbox.Modified_TestCase.setUp(self) + + # Making a temporary file. + current_dir = os.getcwd() + target_filepath = self.make_temp_data_file(directory=current_dir) + self.target_fileobj = open(target_filepath, "r") + self.file_contents = self.target_fileobj.read() + self.file_length = len(self.file_contents) + + # Launch a SimpleHTTPServer (serves files in the current dir). + self.server_process_handler = utils.TestServerProcess(log=logger) + + rel_target_filepath = os.path.basename(target_filepath) + self.url = ( + "http://" + + utils.TEST_HOST_ADDRESS + + ":" + + str(self.server_process_handler.port) + + "/" + + rel_target_filepath + ) + + # Create a temporary file where the target file chunks are written + # during fetching + self.temp_file = tempfile.TemporaryFile() + self.fetcher = tuf.requests_fetcher.RequestsFetcher() + + # Stop server process and perform clean up. + def tearDown(self): + # Cleans the resources and flush the logged lines (if any). + self.server_process_handler.clean() + + self.target_fileobj.close() + self.temp_file.close() + + # Remove temporary directory + unittest_toolbox.Modified_TestCase.tearDown(self) + + # Test: Normal case. 
+ def test_fetch(self): + for chunk in self.fetcher.fetch(self.url, self.file_length): + self.temp_file.write(chunk) + + self.temp_file.seek(0) + temp_file_data = self.temp_file.read().decode("utf-8") + self.assertEqual(self.file_contents, temp_file_data) + + # Test if fetcher downloads file up to a required length + def test_fetch_restricted_length(self): + for chunk in self.fetcher.fetch(self.url, self.file_length - 4): + self.temp_file.write(chunk) + + self.temp_file.seek(0, io.SEEK_END) + self.assertEqual(self.temp_file.tell(), self.file_length - 4) + + # Test that fetcher does not download more than actual file length + def test_fetch_upper_length(self): + for chunk in self.fetcher.fetch(self.url, self.file_length + 4): + self.temp_file.write(chunk) + + self.temp_file.seek(0, io.SEEK_END) + self.assertEqual(self.temp_file.tell(), self.file_length) + + # Test incorrect URL parsing + def test_url_parsing(self): + with self.assertRaises(tuf.exceptions.URLParsingError): + self.fetcher.fetch(self.random_string(), self.file_length) + + # Test: Normal case with url data downloaded in more than one chunk + def test_fetch_in_chunks(self): + # Set smaller chunk size to ensure that the file will be downloaded + # in more than one chunk + default_chunk_size = tuf.settings.CHUNK_SIZE + tuf.settings.CHUNK_SIZE = 4 + + # expected_chunks_count: 3 + expected_chunks_count = math.ceil( + self.file_length / tuf.settings.CHUNK_SIZE + ) + self.assertEqual(expected_chunks_count, 3) + + chunks_count = 0 + for chunk in self.fetcher.fetch(self.url, self.file_length): + self.temp_file.write(chunk) + chunks_count += 1 + + self.temp_file.seek(0) + temp_file_data = self.temp_file.read().decode("utf-8") + self.assertEqual(self.file_contents, temp_file_data) + # Check that we calculate chunks as expected + self.assertEqual(chunks_count, expected_chunks_count) + + # Restore default settings + tuf.settings.CHUNK_SIZE = default_chunk_size # Run unit test. -if __name__ == '__main__': - utils.configure_test_logging(sys.argv) - unittest.main() +if __name__ == "__main__": + utils.configure_test_logging(sys.argv) + unittest.main() diff --git a/tests/test_fetcher_ng.py b/tests/test_fetcher_ng.py index d384ee3132..652fa4cbb8 100644 --- a/tests/test_fetcher_ng.py +++ b/tests/test_fetcher_ng.py @@ -25,7 +25,6 @@ class TestFetcher(unittest_toolbox.Modified_TestCase): - @classmethod def setUpClass(cls): # Launch a SimpleHTTPServer (serves files in the current dir). @@ -111,10 +110,14 @@ def test_http_error(self): self.assertEqual(cm.exception.status_code, 404) # Response read timeout error - @patch.object(requests.Session, 'get') + @patch.object(requests.Session, "get") def test_response_read_timeout(self, mock_session_get): mock_response = Mock() - attr = {'raw.read.side_effect': urllib3.exceptions.ReadTimeoutError(None, None, "Read timed out.")} + attr = { + "raw.read.side_effect": urllib3.exceptions.ReadTimeoutError( + None, None, "Read timed out." 
+ ) + } mock_response.configure_mock(**attr) mock_session_get.return_value = mock_response @@ -123,7 +126,9 @@ def test_response_read_timeout(self, mock_session_get): mock_response.raw.read.assert_called_once() # Read/connect session timeout error - @patch.object(requests.Session, 'get', side_effect=urllib3.exceptions.TimeoutError) + @patch.object( + requests.Session, "get", side_effect=urllib3.exceptions.TimeoutError + ) def test_session_get_timeout(self, mock_session_get): with self.assertRaises(exceptions.SlowRetrievalError): self.fetcher.fetch(self.url) diff --git a/tests/test_metadata_serialization.py b/tests/test_metadata_serialization.py index 13bb55003a..d030801375 100644 --- a/tests/test_metadata_serialization.py +++ b/tests/test_metadata_serialization.py @@ -41,28 +41,17 @@ class TestSerialization(unittest.TestCase): "no spec_version": '{"_type": "signed", "version": 1, "expires": "2030-01-01T00:00:00Z", "meta": {}}', "no version": '{"_type": "signed", "spec_version": "1.0.0", "expires": "2030-01-01T00:00:00Z", "meta": {}}', "no expires": '{"_type": "signed", "spec_version": "1.0.0", "version": 1, "meta": {}}', - "empty str _type": - '{"_type": "", "spec_version": "1.0.0", "version": 1, "expires": "2030-01-01T00:00:00Z", "meta": {}}', - "empty str spec_version": - '{"_type": "signed", "spec_version": "", "version": 1, "expires": "2030-01-01T00:00:00Z", "meta": {}}', - "_type wrong type": - '{"_type": "foo", "spec_version": "1.0.0", "version": 1, "expires": "2030-01-01T00:00:00Z", "meta": {}}', - "version wrong type": - '{"_type": "signed", "spec_version": "1.0.0", "version": "a", "expires": "2030-01-01T00:00:00Z", "meta": {}}', - "invalid spec_version str": - '{"_type": "signed", "spec_version": "abc", "version": 1, "expires": "2030-01-01T00:00:00Z", "meta": {}}', - "two digit spec_version": - '{"_type": "signed", "spec_version": "1.2.a", "version": 1, "expires": "2030-01-01T00:00:00Z", "meta": {}}', - "no digit spec_version": - '{"_type": "signed", "spec_version": "a.b.c", "version": 1, "expires": "2030-01-01T00:00:00Z", "meta": {}}', - "different major spec_version": - '{"_type": "signed", "spec_version": "0.0.0", "version": 1, "expires": "2030-01-01T00:00:00Z", "meta": {}}', - "version 0": - '{"_type": "signed", "spec_version": "1.0.0", "version": 0, "expires": "2030-01-01T00:00:00Z", "meta": {}}', - "version below 0": - '{"_type": "signed", "spec_version": "1.0.0", "version": -1, "expires": "2030-01-01T00:00:00Z", "meta": {}}', - "wrong datetime string": - '{"_type": "signed", "spec_version": "1.0.0", "version": 1, "expires": "abc", "meta": {}}', + "empty str _type": '{"_type": "", "spec_version": "1.0.0", "version": 1, "expires": "2030-01-01T00:00:00Z", "meta": {}}', + "empty str spec_version": '{"_type": "signed", "spec_version": "", "version": 1, "expires": "2030-01-01T00:00:00Z", "meta": {}}', + "_type wrong type": '{"_type": "foo", "spec_version": "1.0.0", "version": 1, "expires": "2030-01-01T00:00:00Z", "meta": {}}', + "version wrong type": '{"_type": "signed", "spec_version": "1.0.0", "version": "a", "expires": "2030-01-01T00:00:00Z", "meta": {}}', + "invalid spec_version str": '{"_type": "signed", "spec_version": "abc", "version": 1, "expires": "2030-01-01T00:00:00Z", "meta": {}}', + "two digit spec_version": '{"_type": "signed", "spec_version": "1.2.a", "version": 1, "expires": "2030-01-01T00:00:00Z", "meta": {}}', + "no digit spec_version": '{"_type": "signed", "spec_version": "a.b.c", "version": 1, "expires": "2030-01-01T00:00:00Z", "meta": {}}', + "different major 
spec_version": '{"_type": "signed", "spec_version": "0.0.0", "version": 1, "expires": "2030-01-01T00:00:00Z", "meta": {}}', + "version 0": '{"_type": "signed", "spec_version": "1.0.0", "version": 0, "expires": "2030-01-01T00:00:00Z", "meta": {}}', + "version below 0": '{"_type": "signed", "spec_version": "1.0.0", "version": -1, "expires": "2030-01-01T00:00:00Z", "meta": {}}', + "wrong datetime string": '{"_type": "signed", "spec_version": "1.0.0", "version": 1, "expires": "abc", "meta": {}}', } @utils.run_sub_tests_with_dataset(invalid_signed) @@ -71,7 +60,6 @@ def test_invalid_signed_serialization(self, test_case_data: Dict[str, str]): with self.assertRaises((KeyError, ValueError, TypeError)): Snapshot.from_dict(copy.deepcopy(case_dict)) - valid_keys: utils.DataSet = { "all": '{"keytype": "rsa", "scheme": "rsassa-pss-sha256", \ "keyval": {"public": "foo"}}', @@ -87,7 +75,6 @@ def test_valid_key_serialization(self, test_case_data: str): key = Key.from_dict("id", copy.copy(case_dict)) self.assertDictEqual(case_dict, key.to_dict()) - invalid_keys: utils.DataSet = { "no keyid": '{"keytype": "rsa", "scheme": "rsassa-pss-sha256", "keyval": {"public": "abc"}}', "no keytype": '{"keyid": "id", "scheme": "rsassa-pss-sha256", "keyval": {"public": "foo"}}', @@ -120,7 +107,6 @@ def test_invalid_role_serialization(self, test_case_data: Dict[str, str]): with self.assertRaises((KeyError, TypeError, ValueError)): Role.from_dict(copy.deepcopy(case_dict)) - valid_roles: utils.DataSet = { "all": '{"keyids": ["keyid"], "threshold": 3}', "many keyids": '{"keyids": ["a", "b", "c", "d", "e"], "threshold": 1}', @@ -134,7 +120,6 @@ def test_role_serialization(self, test_case_data: str): role = Role.from_dict(copy.deepcopy(case_dict)) self.assertDictEqual(case_dict, role.to_dict()) - valid_roots: utils.DataSet = { "all": '{"_type": "root", "spec_version": "1.0.0", "version": 1, \ "expires": "2030-01-01T00:00:00Z", "consistent_snapshot": false, \ @@ -168,7 +153,6 @@ def test_root_serialization(self, test_case_data: str): root = Root.from_dict(copy.deepcopy(case_dict)) self.assertDictEqual(case_dict, root.to_dict()) - invalid_metafiles: utils.DataSet = { "wrong length type": '{"version": 1, "length": "a", "hashes": {"sha256" : "abc"}}', "length 0": '{"version": 1, "length": 0, "hashes": {"sha256" : "abc"}}', @@ -179,12 +163,13 @@ def test_root_serialization(self, test_case_data: str): } @utils.run_sub_tests_with_dataset(invalid_metafiles) - def test_invalid_metafile_serialization(self, test_case_data: Dict[str, str]): + def test_invalid_metafile_serialization( + self, test_case_data: Dict[str, str] + ): case_dict = json.loads(test_case_data) with self.assertRaises((TypeError, ValueError, AttributeError)): MetaFile.from_dict(copy.deepcopy(case_dict)) - valid_metafiles: utils.DataSet = { "all": '{"hashes": {"sha256" : "abc"}, "length": 12, "version": 1}', "no length": '{"hashes": {"sha256" : "abc"}, "version": 1 }', @@ -204,12 +189,13 @@ def test_metafile_serialization(self, test_case_data: str): } @utils.run_sub_tests_with_dataset(invalid_timestamps) - def test_invalid_timestamp_serialization(self, test_case_data: Dict[str, str]): + def test_invalid_timestamp_serialization( + self, test_case_data: Dict[str, str] + ): case_dict = json.loads(test_case_data) with self.assertRaises((ValueError, KeyError)): Timestamp.from_dict(copy.deepcopy(case_dict)) - valid_timestamps: utils.DataSet = { "all": '{ "_type": "timestamp", "spec_version": "1.0.0", "version": 1, "expires": "2030-01-01T00:00:00Z", \ "meta": {"snapshot.json": 
{"hashes": {"sha256" : "abc"}, "version": 1}}}', @@ -223,7 +209,6 @@ def test_timestamp_serialization(self, test_case_data: str): timestamp = Timestamp.from_dict(copy.deepcopy(case_dict)) self.assertDictEqual(case_dict, timestamp.to_dict()) - valid_snapshots: utils.DataSet = { "all": '{ "_type": "snapshot", "spec_version": "1.0.0", "version": 1, "expires": "2030-01-01T00:00:00Z", \ "meta": { \ @@ -243,23 +228,18 @@ def test_snapshot_serialization(self, test_case_data: str): snapshot = Snapshot.from_dict(copy.deepcopy(case_dict)) self.assertDictEqual(case_dict, snapshot.to_dict()) - valid_delegated_roles: utils.DataSet = { # DelegatedRole inherits Role and some use cases can be found in the valid_roles. - "no hash prefix attribute": - '{"keyids": ["keyid"], "name": "a", "paths": ["fn1", "fn2"], \ + "no hash prefix attribute": '{"keyids": ["keyid"], "name": "a", "paths": ["fn1", "fn2"], \ "terminating": false, "threshold": 1}', - "no path attribute": - '{"keyids": ["keyid"], "name": "a", "terminating": false, \ + "no path attribute": '{"keyids": ["keyid"], "name": "a", "terminating": false, \ "path_hash_prefixes": ["h1", "h2"], "threshold": 99}', "empty paths": '{"keyids": ["keyid"], "name": "a", "paths": [], \ "terminating": false, "threshold": 1}', "empty path_hash_prefixes": '{"keyids": ["keyid"], "name": "a", "terminating": false, \ "path_hash_prefixes": [], "threshold": 99}', - "unrecognized field": - '{"keyids": ["keyid"], "name": "a", "terminating": true, "paths": ["fn1"], "threshold": 3, "foo": "bar"}', - "many keyids": - '{"keyids": ["keyid1", "keyid2"], "name": "a", "paths": ["fn1", "fn2"], \ + "unrecognized field": '{"keyids": ["keyid"], "name": "a", "terminating": true, "paths": ["fn1"], "threshold": 3, "foo": "bar"}', + "many keyids": '{"keyids": ["keyid1", "keyid2"], "name": "a", "paths": ["fn1", "fn2"], \ "terminating": false, "threshold": 1}', } @@ -269,13 +249,10 @@ def test_delegated_role_serialization(self, test_case_data: str): deserialized_role = DelegatedRole.from_dict(copy.copy(case_dict)) self.assertDictEqual(case_dict, deserialized_role.to_dict()) - invalid_delegated_roles: utils.DataSet = { # DelegatedRole inherits Role and some use cases can be found in the invalid_roles. 
- "missing hash prefixes and paths": - '{"name": "a", "keyids": ["keyid"], "threshold": 1, "terminating": false}', - "both hash prefixes and paths": - '{"name": "a", "keyids": ["keyid"], "threshold": 1, "terminating": false, \ + "missing hash prefixes and paths": '{"name": "a", "keyids": ["keyid"], "threshold": 1, "terminating": false}', + "both hash prefixes and paths": '{"name": "a", "keyids": ["keyid"], "threshold": 1, "terminating": false, \ "paths": ["fn1", "fn2"], "path_hash_prefixes": ["h1", "h2"]}', } @@ -285,9 +262,8 @@ def test_invalid_delegated_role_serialization(self, test_case_data: str): with self.assertRaises(ValueError): DelegatedRole.from_dict(copy.copy(case_dict)) - invalid_delegations: utils.DataSet = { - "empty delegations": '{}', + "empty delegations": "{}", "bad keys": '{"keys": "foo", \ "roles": [{"keyids": ["keyid"], "name": "a", "paths": ["fn1", "fn2"], "terminating": false, "threshold": 3}]}', "bad roles": '{"keys": {"keyid" : {"keytype": "rsa", "scheme": "rsassa-pss-sha256", "keyval": {"public": "foo"}}}, \ @@ -306,18 +282,15 @@ def test_invalid_delegation_serialization(self, test_case_data: str): with self.assertRaises((ValueError, KeyError, AttributeError)): Delegations.from_dict(copy.deepcopy(case_dict)) - valid_delegations: utils.DataSet = { - "all": - '{"keys": { \ + "all": '{"keys": { \ "keyid1" : {"keytype": "rsa", "scheme": "rsassa-pss-sha256", "keyval": {"public": "foo"}}, \ "keyid2" : {"keytype": "ed25519", "scheme": "ed25519", "keyval": {"public": "bar"}}}, \ "roles": [ \ {"keyids": ["keyid"], "name": "a", "terminating": true, "paths": ["fn1"], "threshold": 3}, \ {"keyids": ["keyid2"], "name": "b", "terminating": true, "paths": ["fn2"], "threshold": 4} ] \ }', - "unrecognized field": - '{"keys": {"keyid" : {"keytype": "rsa", "scheme": "rsassa-pss-sha256", "keyval": {"public": "foo"}}}, \ + "unrecognized field": '{"keys": {"keyid" : {"keytype": "rsa", "scheme": "rsassa-pss-sha256", "keyval": {"public": "foo"}}}, \ "roles": [ {"keyids": ["keyid"], "name": "a", "paths": ["fn1", "fn2"], "terminating": true, "threshold": 3} ], \ "foo": "bar"}', "empty keys and roles": '{"keys": {}, \ @@ -331,7 +304,6 @@ def test_delegation_serialization(self, test_case_data: str): delegation = Delegations.from_dict(copy.deepcopy(case_dict)) self.assertDictEqual(case_dict, delegation.to_dict()) - invalid_targetfiles: utils.DataSet = { "no hashes": '{"length": 1}', "no length": '{"hashes": {"sha256": "abc"}}' @@ -340,12 +312,13 @@ def test_delegation_serialization(self, test_case_data: str): } @utils.run_sub_tests_with_dataset(invalid_targetfiles) - def test_invalid_targetfile_serialization(self, test_case_data: Dict[str, str]): + def test_invalid_targetfile_serialization( + self, test_case_data: Dict[str, str] + ): case_dict = json.loads(test_case_data) with self.assertRaises(KeyError): TargetFile.from_dict(copy.deepcopy(case_dict), "file1.txt") - valid_targetfiles: utils.DataSet = { "all": '{"length": 12, "hashes": {"sha256" : "abc"}, \ "custom" : {"foo": "bar"} }', @@ -360,7 +333,6 @@ def test_targetfile_serialization(self, test_case_data: str): target_file = TargetFile.from_dict(copy.copy(case_dict), "file1.txt") self.assertDictEqual(case_dict, target_file.to_dict()) - valid_targets: utils.DataSet = { "all attributes": '{"_type": "targets", "spec_version": "1.0.0", "version": 1, "expires": "2030-01-01T00:00:00Z", \ "targets": { \ @@ -395,6 +367,6 @@ def test_targets_serialization(self, test_case_data): # Run unit test. 
-if __name__ == '__main__': +if __name__ == "__main__": utils.configure_test_logging(sys.argv) unittest.main() diff --git a/tests/test_trusted_metadata_set.py b/tests/test_trusted_metadata_set.py index 925b16a935..12e429b25a 100644 --- a/tests/test_trusted_metadata_set.py +++ b/tests/test_trusted_metadata_set.py @@ -13,22 +13,22 @@ Timestamp, Snapshot, MetaFile, - Targets + Targets, ) from tuf.ngclient._internal.trusted_metadata_set import TrustedMetadataSet from securesystemslib.signer import SSlibSigner -from securesystemslib.interface import( +from securesystemslib.interface import ( import_ed25519_privatekey_from_file, - import_rsa_privatekey_from_file + import_rsa_privatekey_from_file, ) from tests import utils logger = logging.getLogger(__name__) -class TestTrustedMetadataSet(unittest.TestCase): +class TestTrustedMetadataSet(unittest.TestCase): def modify_metadata( self, rolename: str, modification_func: Callable[["Signed"], None] ) -> bytes: @@ -48,24 +48,29 @@ def modify_metadata( @classmethod def setUpClass(cls): cls.repo_dir = os.path.join( - os.getcwd(), 'repository_data', 'repository', 'metadata' + os.getcwd(), "repository_data", "repository", "metadata" ) cls.metadata = {} - for md in ["root", "timestamp", "snapshot", "targets", "role1", "role2"]: + for md in [ + "root", + "timestamp", + "snapshot", + "targets", + "role1", + "role2", + ]: with open(os.path.join(cls.repo_dir, f"{md}.json"), "rb") as f: cls.metadata[md] = f.read() - keystore_dir = os.path.join(os.getcwd(), 'repository_data', 'keystore') + keystore_dir = os.path.join(os.getcwd(), "repository_data", "keystore") cls.keystore = {} root_key_dict = import_rsa_privatekey_from_file( - os.path.join(keystore_dir, "root" + '_key'), - password="password" + os.path.join(keystore_dir, "root" + "_key"), password="password" ) cls.keystore["root"] = SSlibSigner(root_key_dict) for role in ["delegation", "snapshot", "targets", "timestamp"]: key_dict = import_ed25519_privatekey_from_file( - os.path.join(keystore_dir, role + '_key'), - password="password" + os.path.join(keystore_dir, role + "_key"), password="password" ) cls.keystore[role] = SSlibSigner(key_dict) @@ -80,7 +85,6 @@ def hashes_length_modifier(timestamp: Timestamp) -> None: def setUp(self) -> None: self.trusted_set = TrustedMetadataSet(self.metadata["root"]) - def _update_all_besides_targets( self, timestamp_bytes: Optional[bytes] = None, @@ -103,7 +107,6 @@ def _update_all_besides_targets( snapshot_bytes = snapshot_bytes or self.metadata["snapshot"] self.trusted_set.update_snapshot(snapshot_bytes) - def test_update(self): self.trusted_set.update_timestamp(self.metadata["timestamp"]) self.trusted_set.update_snapshot(self.metadata["snapshot"]) @@ -224,7 +227,7 @@ def test_update_root_new_root_ver_same_as_trusted_root_ver(self): def test_root_expired_final_root(self): def root_expired_modifier(root: Root) -> None: root.expires = datetime(1970, 1, 1) - + # intermediate root can be expired root = self.modify_metadata("root", root_expired_modifier) tmp_trusted_set = TrustedMetadataSet(root) @@ -232,12 +235,11 @@ def root_expired_modifier(root: Root) -> None: with self.assertRaises(exceptions.ExpiredMetadataError): tmp_trusted_set.update_timestamp(self.metadata["timestamp"]) - def test_update_timestamp_new_timestamp_ver_below_trusted_ver(self): # new_timestamp.version < trusted_timestamp.version def version_modifier(timestamp: Timestamp) -> None: timestamp.version = 3 - + timestamp = self.modify_metadata("timestamp", version_modifier) 
self.trusted_set.update_timestamp(timestamp) with self.assertRaises(exceptions.ReplayedMetadataError): @@ -261,7 +263,9 @@ def timestamp_expired_modifier(timestamp: Timestamp) -> None: timestamp.expires = datetime(1970, 1, 1) # expired intermediate timestamp is loaded but raises - timestamp = self.modify_metadata("timestamp", timestamp_expired_modifier) + timestamp = self.modify_metadata( + "timestamp", timestamp_expired_modifier + ) with self.assertRaises(exceptions.ExpiredMetadataError): self.trusted_set.update_timestamp(timestamp) @@ -291,7 +295,9 @@ def test_update_snapshot_version_different_timestamp_snapshot_version(self): def timestamp_version_modifier(timestamp: Timestamp) -> None: timestamp.snapshot_meta.version = 2 - timestamp = self.modify_metadata("timestamp", timestamp_version_modifier) + timestamp = self.modify_metadata( + "timestamp", timestamp_version_modifier + ) self.trusted_set.update_timestamp(timestamp) # if intermediate snapshot version is incorrect, load it but also raise @@ -302,9 +308,9 @@ def timestamp_version_modifier(timestamp: Timestamp) -> None: with self.assertRaises(exceptions.BadVersionNumberError): self.trusted_set.update_targets(self.metadata["targets"]) - def test_update_snapshot_file_removed_from_meta(self): self._update_all_besides_targets(self.metadata["timestamp"]) + def remove_file_from_meta(snapshot: Snapshot) -> None: del snapshot.meta["targets.json"] @@ -327,6 +333,7 @@ def version_meta_modifier(snapshot: Snapshot) -> None: def test_update_snapshot_expired_new_snapshot(self): self.trusted_set.update_timestamp(self.metadata["timestamp"]) + def snapshot_expired_modifier(snapshot: Snapshot) -> None: snapshot.expires = datetime(1970, 1, 1) @@ -406,6 +413,6 @@ def target_expired_modifier(target: Targets) -> None: # TODO test updating over initial metadata (new keys, newer timestamp, etc) -if __name__ == '__main__': - utils.configure_test_logging(sys.argv) - unittest.main() +if __name__ == "__main__": + utils.configure_test_logging(sys.argv) + unittest.main() diff --git a/tests/test_updater_with_simulator.py b/tests/test_updater_with_simulator.py index f7037cc1bd..53ba2e84a3 100644 --- a/tests/test_updater_with_simulator.py +++ b/tests/test_updater_with_simulator.py @@ -22,7 +22,7 @@ class TestUpdater(unittest.TestCase): # set dump_dir to trigger repository state dumps - dump_dir:Optional[str] = None + dump_dir: Optional[str] = None def setUp(self): self.temp_dir = tempfile.TemporaryDirectory() @@ -34,12 +34,14 @@ def setUp(self): # Setup the repository, bootstrap client root.json self.sim = RepositorySimulator() with open(os.path.join(self.metadata_dir, "root.json"), "bw") as f: - root = self.sim.download_bytes("https://example.com/metadata/1.root.json", 100000) + root = self.sim.download_bytes( + "https://example.com/metadata/1.root.json", 100000 + ) f.write(root) if self.dump_dir is not None: # create test specific dump directory - name = self.id().split('.')[-1] + name = self.id().split(".")[-1] self.sim.dump_dir = os.path.join(self.dump_dir, name) os.mkdir(self.sim.dump_dir) @@ -54,7 +56,7 @@ def _run_refresh(self) -> Updater: self.metadata_dir, "https://example.com/metadata/", "https://example.com/targets/", - self.sim + self.sim, ) updater.refresh() return updater @@ -83,7 +85,11 @@ def test_refresh(self): targets: utils.DataSet = { "standard case": ("targetpath", b"content", "targetpath"), "non-asci case": ("åäö", b"more content", "%C3%A5%C3%A4%C3%B6"), - "subdirectory case": ("a/b/c/targetpath", b"dir target content", 
"a%2Fb%2Fc%2Ftargetpath"), + "subdirectory case": ( + "a/b/c/targetpath", + b"dir target content", + "a%2Fb%2Fc%2Ftargetpath", + ), } @utils.run_sub_tests_with_dataset(targets) @@ -103,8 +109,7 @@ def test_targets(self, test_case_data: Tuple[str, bytes, str]): file_info = updater.get_one_valid_targetinfo(targetpath) self.assertIsNotNone(file_info) self.assertEqual( - updater.updated_targets([file_info], self.targets_dir), - [file_info] + updater.updated_targets([file_info], self.targets_dir), [file_info] ) # Assert consistent_snapshot is True and downloaded targets have prefix. @@ -123,8 +128,6 @@ def test_targets(self, test_case_data: Tuple[str, bytes, str]): encoded_absolute_path = os.path.join(self.targets_dir, encoded_path) self.assertEqual(local_path, encoded_absolute_path) - - def test_keys_and_signatures(self): """Example of the two trickiest test areas: keys and root updates""" From 8dd6d8c8a531b1463a2842533a54f5af0ab5079b Mon Sep 17 00:00:00 2001 From: Martin Vrachev Date: Tue, 12 Oct 2021 16:46:52 +0300 Subject: [PATCH 3/3] Rename & simplify a couple of tests in test_api.py Signed-off-by: Martin Vrachev --- tests/test_api.py | 77 +++++++++++++++++++++-------------------------- 1 file changed, 34 insertions(+), 43 deletions(-) diff --git a/tests/test_api.py b/tests/test_api.py index ff7b60a498..73e270ac3c 100755 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -97,21 +97,17 @@ def test_generic_read(self): # Load JSON-formatted metdata of each supported type from file # and from out-of-band read JSON string path = os.path.join(self.repo_dir, "metadata", metadata + ".json") - metadata_obj = Metadata.from_file(path) + md_obj = Metadata.from_file(path) with open(path, "rb") as f: - metadata_obj2 = Metadata.from_bytes(f.read()) + md_obj2 = Metadata.from_bytes(f.read()) # Assert that both methods instantiate the right inner class for # each metadata type and ... - self.assertTrue(isinstance(metadata_obj.signed, inner_metadata_cls)) - self.assertTrue( - isinstance(metadata_obj2.signed, inner_metadata_cls) - ) + self.assertTrue(isinstance(md_obj.signed, inner_metadata_cls)) + self.assertTrue(isinstance(md_obj2.signed, inner_metadata_cls)) # ... 
and return the same object (compared by dict representation) - self.assertDictEqual( - metadata_obj.to_dict(), metadata_obj2.to_dict() - ) + self.assertDictEqual(md_obj.to_dict(), md_obj2.to_dict()) # Assert that it chokes correctly on an unknown metadata type bad_metadata_path = "bad-metadata.json" @@ -129,24 +125,21 @@ def test_generic_read(self): def test_compact_json(self): path = os.path.join(self.repo_dir, "metadata", "targets.json") - metadata_obj = Metadata.from_file(path) + md_obj = Metadata.from_file(path) self.assertTrue( - len(JSONSerializer(compact=True).serialize(metadata_obj)) - < len(JSONSerializer().serialize(metadata_obj)) + len(JSONSerializer(compact=True).serialize(md_obj)) + < len(JSONSerializer().serialize(md_obj)) ) def test_read_write_read_compare(self): for metadata in ["root", "snapshot", "timestamp", "targets"]: path = os.path.join(self.repo_dir, "metadata", metadata + ".json") - metadata_obj = Metadata.from_file(path) + md_obj = Metadata.from_file(path) path_2 = path + ".tmp" - metadata_obj.to_file(path_2) - metadata_obj_2 = Metadata.from_file(path_2) - - self.assertDictEqual( - metadata_obj.to_dict(), metadata_obj_2.to_dict() - ) + md_obj.to_file(path_2) + md_obj_2 = Metadata.from_file(path_2) + self.assertDictEqual(md_obj.to_dict(), md_obj_2.to_dict()) os.remove(path_2) @@ -155,17 +148,15 @@ def test_to_from_bytes(self): path = os.path.join(self.repo_dir, "metadata", metadata + ".json") with open(path, "rb") as f: metadata_bytes = f.read() - metadata_obj = Metadata.from_bytes(metadata_bytes) + md_obj = Metadata.from_bytes(metadata_bytes) # Comparate that from_bytes/to_bytes doesn't change the content # for two cases for the serializer: noncompact and compact. # Case 1: test noncompact by overriding the default serializer. - self.assertEqual( - metadata_obj.to_bytes(JSONSerializer()), metadata_bytes - ) + self.assertEqual(md_obj.to_bytes(JSONSerializer()), metadata_bytes) # Case 2: test compact by using the default serializer. - obj_bytes = metadata_obj.to_bytes() + obj_bytes = md_obj.to_bytes() metadata_obj_2 = Metadata.from_bytes(obj_bytes) self.assertEqual(metadata_obj_2.to_bytes(), obj_bytes) @@ -183,66 +174,66 @@ def test_sign_verify(self): # Load sample metadata (targets) and assert ... path = os.path.join(self.repo_dir, "metadata", "targets.json") - metadata_obj = Metadata.from_file(path) + md_obj = Metadata.from_file(path) # ... it has a single existing signature, - self.assertEqual(len(metadata_obj.signatures), 1) + self.assertEqual(len(md_obj.signatures), 1) # ... which is valid for the correct key. - targets_key.verify_signature(metadata_obj) + targets_key.verify_signature(md_obj) with self.assertRaises(exceptions.UnsignedMetadataError): - snapshot_key.verify_signature(metadata_obj) + snapshot_key.verify_signature(md_obj) # Test verifying with explicitly set serializer - targets_key.verify_signature(metadata_obj, CanonicalJSONSerializer()) + targets_key.verify_signature(md_obj, CanonicalJSONSerializer()) with self.assertRaises(exceptions.UnsignedMetadataError): - targets_key.verify_signature(metadata_obj, JSONSerializer()) + targets_key.verify_signature(md_obj, JSONSerializer()) sslib_signer = SSlibSigner(self.keystore["snapshot"]) # Append a new signature with the unrelated key and assert that ... - sig = metadata_obj.sign(sslib_signer, append=True) + sig = md_obj.sign(sslib_signer, append=True) # ... there are now two signatures, and - self.assertEqual(len(metadata_obj.signatures), 2) + self.assertEqual(len(md_obj.signatures), 2) # ... 
both are valid for the corresponding keys. - targets_key.verify_signature(metadata_obj) - snapshot_key.verify_signature(metadata_obj) + targets_key.verify_signature(md_obj) + snapshot_key.verify_signature(md_obj) # ... the returned (appended) signature is for snapshot key self.assertEqual(sig.keyid, snapshot_keyid) sslib_signer = SSlibSigner(self.keystore["timestamp"]) # Create and assign (don't append) a new signature and assert that ... - metadata_obj.sign(sslib_signer, append=False) + md_obj.sign(sslib_signer, append=False) # ... there now is only one signature, - self.assertEqual(len(metadata_obj.signatures), 1) + self.assertEqual(len(md_obj.signatures), 1) # ... valid for that key. - timestamp_key.verify_signature(metadata_obj) + timestamp_key.verify_signature(md_obj) with self.assertRaises(exceptions.UnsignedMetadataError): - targets_key.verify_signature(metadata_obj) + targets_key.verify_signature(md_obj) # Test failure on unknown scheme (securesystemslib UnsupportedAlgorithmError) scheme = timestamp_key.scheme timestamp_key.scheme = "foo" with self.assertRaises(exceptions.UnsignedMetadataError): - timestamp_key.verify_signature(metadata_obj) + timestamp_key.verify_signature(md_obj) timestamp_key.scheme = scheme # Test failure on broken public key data (securesystemslib CryptoError) public = timestamp_key.keyval["public"] timestamp_key.keyval["public"] = "ffff" with self.assertRaises(exceptions.UnsignedMetadataError): - timestamp_key.verify_signature(metadata_obj) + timestamp_key.verify_signature(md_obj) timestamp_key.keyval["public"] = public # Test failure with invalid signature (securesystemslib FormatError) - sig = metadata_obj.signatures[timestamp_keyid] + sig = md_obj.signatures[timestamp_keyid] correct_sig = sig.signature sig.signature = "foo" with self.assertRaises(exceptions.UnsignedMetadataError): - timestamp_key.verify_signature(metadata_obj) + timestamp_key.verify_signature(md_obj) # Test failure with valid but incorrect signature sig.signature = "ff" * 64 with self.assertRaises(exceptions.UnsignedMetadataError): - timestamp_key.verify_signature(metadata_obj) + timestamp_key.verify_signature(md_obj) sig.signature = correct_sig def test_metadata_base(self):