Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

experimental client: Add MetadataBundle #1355

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
3b161bf
experimental client: Add MetadataBundle
Apr 20, 2021
5e1fe0d
MetadataBundle: Update outdated docstring
Apr 20, 2021
3f7c405
MetadataBundle: Modify state handling
Apr 20, 2021
6228454
MetadataBundle: implement delegates support
Apr 21, 2021
f503ebe
MetadataBundle: use Metadata.from_bytes()
Apr 21, 2021
c1afe57
MetadataBundle: Require load_local_metadata()
Apr 22, 2021
e7a0feb
Improve loading rules checks
Apr 22, 2021
cadbd84
Comment and variable name improvements
Apr 22, 2021
7e457ec
Exceptions refactoring
Apr 27, 2021
53f5ccb
MetadataBundle: Exception refactor
Apr 27, 2021
71793b6
Update MetadataBundle with named methods
May 4, 2021
5596f77
MetadataBundle: Handle targets like delegated targets
May 5, 2021
766f494
MetadataBundle: Avoid loading targets twice
May 5, 2021
9051358
MetadataBundle: fix import name
May 6, 2021
1d22d5a
MetadataBundle: Improve hints and docs
May 6, 2021
e26772c
Remove unnecessary directory check at startup
May 6, 2021
800b088
MetadataBundle: Fix loads of linting issues
May 6, 2021
b681788
Improve documentation
May 11, 2021
66fa37b
MetadataBundle: Update to API changes
May 14, 2021
0bbfe03
tests: Add minimal test case for Bundle
May 14, 2021
112b333
Metadata API: Fix DelegatedRole serialization issue
May 14, 2021
f8b714d
Metadata API: Don't do equality comparisons on containers
May 14, 2021
2d155fa
MetadataBundle: Change ValueErrors to RuntimeErrors
May 14, 2021
eb648d1
MetadataBundle: Save original files on disk
May 14, 2021
3b30d08
MetadataBundle: Store reference time earlier
May 14, 2021
8d0245a
MetadataBundle: Use type, not _type
May 14, 2021
876fda1
MetadataBundle: Add comments about the process
May 14, 2021
112f3b6
MetadataBundle: Handle Deserialization errors
May 14, 2021
b86d1f7
MetadataBundle: Raise instead of returning bool
May 14, 2021
a371258
MetadataBundle: Use builtin errors when possible
May 14, 2021
6b53ac7
Make BadHashError derive from RepositoryError
May 16, 2021
f2cff95
MetadataBundle: Don't do any file IO
May 16, 2021
377eac1
MetadataBundle: Improve docstrings
May 17, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 25 additions & 8 deletions tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -450,21 +450,23 @@ def test_delegated_role_class(self):
with self.assertRaises(ValueError):
DelegatedRole.from_dict(role.copy())

# Test creating DelegatedRole only with "path_hash_prefixes"
# Test creating DelegatedRole only with "path_hash_prefixes" (an empty one)
del role["paths"]
DelegatedRole.from_dict(role.copy())
role["paths"] = "foo"
role["path_hash_prefixes"] = []
role_obj = DelegatedRole.from_dict(role.copy())
self.assertEqual(role_obj.to_dict(), role)

# Test creating DelegatedRole only with "paths"
# Test creating DelegatedRole only with "paths" (now an empty one)
del role["path_hash_prefixes"]
DelegatedRole.from_dict(role.copy())
role["path_hash_prefixes"] = "foo"
role["paths"] = []
role_obj = DelegatedRole.from_dict(role.copy())
self.assertEqual(role_obj.to_dict(), role)

# Test creating DelegatedRole without "paths" and
# "path_hash_prefixes" set
del role["paths"]
del role["path_hash_prefixes"]
DelegatedRole.from_dict(role)
role_obj = DelegatedRole.from_dict(role.copy())
self.assertEqual(role_obj.to_dict(), role)


def test_delegation_class(self):
Expand Down Expand Up @@ -494,6 +496,21 @@ def test_delegation_class(self):
delegations = Delegations.from_dict(copy.deepcopy(delegations_dict))
self.assertEqual(delegations_dict, delegations.to_dict())

# empty keys and roles
delegations_dict = {"keys":{}, "roles":[]}
delegations = Delegations.from_dict(delegations_dict.copy())
self.assertEqual(delegations_dict, delegations.to_dict())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: assertDictEqual() gives more meaningful messages when this fails.


# Test some basic missing or broken input
invalid_delegations_dicts = [
{},
{"keys":None, "roles":None},
{"keys":{"foo":0}, "roles":[]},
{"keys":{}, "roles":["foo"]},
]
for d in invalid_delegations_dicts:
with self.assertRaises((KeyError, AttributeError)):
Delegations.from_dict(d)

def test_metadata_targets(self):
targets_path = os.path.join(
Expand Down
124 changes: 124 additions & 0 deletions tests/test_metadata_bundle.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
import json
import logging
import os
import shutil
import sys
import tempfile
import unittest

from tuf import exceptions
from tuf.api.metadata import Metadata
from tuf.client_rework.metadata_bundle import MetadataBundle

from tests import utils

logger = logging.getLogger(__name__)

class TestMetadataBundle(unittest.TestCase):

def test_update(self):
repo_dir = os.path.join(os.getcwd(), 'repository_data', 'repository', 'metadata')

with open(os.path.join(repo_dir, "root.json"), "rb") as f:
bundle = MetadataBundle(f.read())
bundle.root_update_finished()

with open(os.path.join(repo_dir, "timestamp.json"), "rb") as f:
bundle.update_timestamp(f.read())
with open(os.path.join(repo_dir, "snapshot.json"), "rb") as f:
bundle.update_snapshot(f.read())
with open(os.path.join(repo_dir, "targets.json"), "rb") as f:
bundle.update_targets(f.read())
with open(os.path.join(repo_dir, "role1.json"), "rb") as f:
bundle.update_delegated_targets(f.read(), "role1", "targets")
with open(os.path.join(repo_dir, "role2.json"), "rb") as f:
bundle.update_delegated_targets(f.read(), "role2", "role1")

def test_out_of_order_ops(self):
repo_dir = os.path.join(os.getcwd(), 'repository_data', 'repository', 'metadata')
data={}
for md in ["root", "timestamp", "snapshot", "targets", "role1"]:
with open(os.path.join(repo_dir, f"{md}.json"), "rb") as f:
data[md] = f.read()

bundle = MetadataBundle(data["root"])

# Update timestamp before root is finished
with self.assertRaises(RuntimeError):
bundle.update_timestamp(data["timestamp"])

bundle.root_update_finished()
with self.assertRaises(RuntimeError):
bundle.root_update_finished()

# Update snapshot before timestamp
with self.assertRaises(RuntimeError):
bundle.update_snapshot(data["snapshot"])

bundle.update_timestamp(data["timestamp"])

# Update targets before snapshot
with self.assertRaises(RuntimeError):
bundle.update_targets(data["targets"])

bundle.update_snapshot(data["snapshot"])

#update timestamp after snapshot
with self.assertRaises(RuntimeError):
bundle.update_timestamp(data["timestamp"])

# Update delegated targets before targets
with self.assertRaises(RuntimeError):
bundle.update_delegated_targets(data["role1"], "role1", "targets")

bundle.update_targets(data["targets"])
bundle.update_delegated_targets(data["role1"], "role1", "targets")

def test_update_with_invalid_json(self):
repo_dir = os.path.join(os.getcwd(), 'repository_data', 'repository', 'metadata')
data={}
for md in ["root", "timestamp", "snapshot", "targets", "role1"]:
with open(os.path.join(repo_dir, f"{md}.json"), "rb") as f:
data[md] = f.read()

# root.json not a json file at all
with self.assertRaises(exceptions.RepositoryError):
MetadataBundle(b"")
# root.json is invalid
root = Metadata.from_bytes(data["root"])
root.signed.version += 1
with self.assertRaises(exceptions.RepositoryError):
MetadataBundle(json.dumps(root.to_dict()).encode())

bundle = MetadataBundle(data["root"])
bundle.root_update_finished()

top_level_md = [
(data["timestamp"], bundle.update_timestamp),
(data["snapshot"], bundle.update_snapshot),
(data["targets"], bundle.update_targets),
]
for metadata, update_func in top_level_md:
# metadata is not json
with self.assertRaises(exceptions.RepositoryError):
update_func(b"")
# metadata is invalid
md = Metadata.from_bytes(metadata)
md.signed.version += 1
with self.assertRaises(exceptions.RepositoryError):
update_func(json.dumps(md.to_dict()).encode())

# metadata is of wrong type
with self.assertRaises(exceptions.RepositoryError):
update_func(data["root"])

update_func(metadata)


# TODO test updating over initial metadata (new keys, newer timestamp, etc)
# TODO test the actual specification checks


if __name__ == '__main__':
utils.configure_test_logging(sys.argv)
unittest.main()
17 changes: 10 additions & 7 deletions tuf/api/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -770,7 +770,7 @@ def __init__(
super().__init__(keyids, threshold, unrecognized_fields)
self.name = name
self.terminating = terminating
if paths and path_hash_prefixes:
if paths is not None and path_hash_prefixes is not None:
raise ValueError(
"Only one of the attributes 'paths' and"
"'path_hash_prefixes' can be set!"
Expand Down Expand Up @@ -806,9 +806,9 @@ def to_dict(self) -> Dict[str, Any]:
"terminating": self.terminating,
**base_role_dict,
}
if self.paths:
if self.paths is not None:
res_dict["paths"] = self.paths
elif self.path_hash_prefixes:
elif self.path_hash_prefixes is not None:
res_dict["path_hash_prefixes"] = self.path_hash_prefixes
return res_dict

Expand Down Expand Up @@ -911,17 +911,20 @@ def from_dict(cls, targets_dict: Dict[str, Any]) -> "Targets":
"""Creates Targets object from its dict representation."""
common_args = cls._common_fields_from_dict(targets_dict)
targets = targets_dict.pop("targets")
delegations = targets_dict.pop("delegations", None)
if delegations:
delegations = Delegations.from_dict(delegations)
try:
delegations_dict = targets_dict.pop("delegations")
except KeyError:
delegations = None
else:
delegations = Delegations.from_dict(delegations_dict)
# All fields left in the targets_dict are unrecognized.
return cls(*common_args, targets, delegations, targets_dict)

def to_dict(self) -> Dict[str, Any]:
"""Returns the dict representation of self."""
targets_dict = self._common_fields_to_dict()
targets_dict["targets"] = self.targets
if self.delegations:
if self.delegations is not None:
targets_dict["delegations"] = self.delegations.to_dict()
return targets_dict

Expand Down
Loading