Skip to content

Commit

Permalink
MetadataBundle: Require load_local_metadata()
Browse files Browse the repository at this point in the history
Change the rules a bit: require calling load_local_metadata() before
update_metadata() is legal. This removes the need to run
load_local_metadata() "just to be sure" in update_metadata()

Improve comments

Signed-off-by: Jussi Kukkonen <jkukkonen@vmware.com>
  • Loading branch information
Jussi Kukkonen committed Apr 22, 2021
1 parent f081a48 commit 057649c
Showing 1 changed file with 55 additions and 21 deletions.
76 changes: 55 additions & 21 deletions tuf/client_rework/metadata_bundle.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,23 @@
care of persisting valid metadata on disk, loading local metadata from disk
and deleting invalid local metadata.
New metadata (downloaded from a remote repository) can be loaded using
'update_metadata()'. The type of accepted metadata depends on bundle state:
* Metadata is loadable only if all metadata it depends on is loaded
* Metadata is immutable if any metadata depending on it has been loaded
Loaded metadata can be accessed via the index access with rolename as key
or, in the case of top-level metadata using the helper properties like
'MetadataBundle.root'
Metadata can be loaded into bundle by two means:
* loading from local storage: load_local_metadata()
(and, in the case of root metadata, the constuctor)
* updating from remote repository: update_metadata()
The rules for top-level metadata are
* Metadata is loadable only if metadata it depends on is loaded
* Metadata is immutable if any metadata depending on it has been loaded
* Loading from local storage must be attempted before updating from remote
* Updating from remote is never required
Exceptions are raised if metadata fails to load in any way (except in the
case of local loads -- see locad_local_metadata()).
case of local loads -- see load_local_metadata()).
Example (with hypothetical download function):
Expand Down Expand Up @@ -52,14 +62,24 @@
TODO:
* Delegated targets not implement yet
* Delegated targets are implemented but they are not covered
by same immutability guarantees: the top-level metadata is handled
by hard-coded rules (can't update root if snapshot is loaded)
but delegations would require storing the delegation tree ...
* exceptions are all over the place and not thought out at all
* usefulness of root_update_finished() can be debated: it could be done
in the beginning of _load_timestamp()...
* there are some divergences from spec:
* 5.3.11: timestamp and snapshot are not deleted right away (only on next load):
the load functions will refuse to load the files when they are not signed by
current root keys. Deleting at the specified point is possible but means additional
code with some quirks..
* a bit of repetition
* No tests!
* Naming maybe not final?
* some metadata interactions might work better in Metadata itself
* Progress through Specification update process should be documented
(not sure yet how)
(not sure yet how: maybe a spec_logger that logs specification events?)
"""

from collections import abc
Expand All @@ -79,16 +99,16 @@
# This is a placeholder until ...
# TODO issue 1306: implement this in Metadata API
def verify_with_threshold(delegator: Metadata, role_name: str, unverified: Metadata):
if delegator.signed._type == 'root':
if delegator.signed._type == "root":
keys = delegator.signed.keys
role = delegator.signed.roles.get(role_name)
elif delegator.signed._type == 'targets':
elif delegator.signed._type == "targets":
keys = delegator.signed.delegations["keys"]
# role names are unique: first match is enough
roles = delegator.signed.delegations["roles"]
role = next((role for role in roles if role["name"] == role_name), None)
else:
raise ValueError('Call is valid only on delegator metadata')
raise ValueError("Call is valid only on delegator metadata")

if role is None:
raise exceptions.UnknownRoleError
Expand All @@ -110,10 +130,10 @@ def verify_with_threshold(delegator: Metadata, role_name: str, unverified: Metad

class MetadataBundle(abc.Mapping):
def __init__(self, path: str):
"""Initialize by loading root metadata from disk
"""
"""Initialize by loading root metadata from disk"""
self._path = path
self._bundle = {} # type: Dict[str: Metadata]
self._local_load_attempted = {}
self.reference_time = None

if not os.path.exists(path):
Expand All @@ -127,12 +147,14 @@ def __init__(self, path: str):

def load_local_metadata(self, role_name: str, delegator_name: str = None) -> bool:
"""Loads metadata from local storage and inserts into bundle
If bundle already contains 'role_name', nothing is loaded.
Failure to read the file, failure to parse it and failure to
load it as valid metadata will not raise exceptions: the function
will just fail.
Raises if 'role_name' cannot be loaded from local storage at this state
Returns True if 'role_name' is now in the bundle
"""
if self.get(role_name) is not None:
Expand All @@ -142,16 +164,17 @@ def load_local_metadata(self, role_name: str, delegator_name: str = None) -> boo
logger.debug("Loading local %s.json", role_name)

self._raise_on_unsupported_state(role_name)
self._local_load_attempted[role_name] = True

try:
with open(os.path.join(self._path, f"{role_name}.json"), "rb") as f:
data = f.read()

if role_name == "root":
self._load_intermediate_root(data)
elif role_name == "timestamp":
elif role_name == "timestamp":
self._load_timestamp(data)
elif role_name == "snapshot":
elif role_name == "snapshot":
self._load_snapshot(data)
elif role_name == "targets":
self._load_targets(data)
Expand All @@ -166,20 +189,26 @@ def load_local_metadata(self, role_name: str, delegator_name: str = None) -> boo
return False

def update_metadata(self, data: bytes, role_name: str, delegator_name: str = None):
"""Takes new metadata (from remote repository) and loads it into bundle
Raises if 'role_name' cannot be update from remote at this state
Raises if 'data' cannot be parsed or validated
Raises if the new metadata cannot be verified by the bundle
"""
logger.debug("Updating %s", role_name)

self._raise_on_unsupported_state(role_name)

if not self._local_load_attempted.get(role_name):
raise exceptions.RepositoryError

if role_name == "root":
self.load_local_metadata("root")
self._load_intermediate_root(data)
self.root.to_file(os.path.join(self._path, "root.json"))
elif role_name == "timestamp":
self.load_local_metadata("timestamp")
self._load_timestamp(data)
self.timestamp.to_file(os.path.join(self._path, "timestamp.json"))
elif role_name == "snapshot":
self.load_local_metadata("snapshot")
self._load_snapshot(data)
self.snapshot.to_file(os.path.join(self._path, "snapshot.json"))
elif role_name == "targets":
Expand All @@ -190,6 +219,11 @@ def update_metadata(self, data: bytes, role_name: str, delegator_name: str = Non
self[role_name].to_file(os.path.join(self._path, f"{role_name}.json"))

def root_update_finished(self):
"""Marks root update as finished, validates the root metadata
Raises if root update is not a valid operation at this state
Raises if validation fails
"""
if self.timestamp is not None:
# bundle does not support this order of ops
raise exceptions.RepositoryError
Expand Down Expand Up @@ -223,7 +257,7 @@ def _raise_on_unsupported_state(self, role_name: str):
if len(self) > 4:
# delegates have been loaded already
raise exceptions.RepositoryError
else: # delegated role
else: # delegated role
if self.targets is None:
raise exceptions.RepositoryError

Expand Down Expand Up @@ -413,4 +447,4 @@ def _load_delegated_targets(self, data: bytes, role_name: str, delegator_name: s
raise exceptions.ExpiredMetadataError

self._bundle[role_name] = new_delegate
logger.debug("Loaded {role_name}")
logger.debug(f"Loaded {role_name}")

0 comments on commit 057649c

Please sign in to comment.