Skip to content

Commit

Permalink
MetadataBundle: Fix loads of linting issues
Browse files Browse the repository at this point in the history
Lots of fixes, mostly obvious ones. The trickier ones and pylint
disables have comments added to explain them.

Signed-off-by: Jussi Kukkonen <jkukkonen@vmware.com>
  • Loading branch information
Jussi Kukkonen committed May 6, 2021
1 parent c580da3 commit e839931
Showing 1 changed file with 59 additions and 36 deletions.
95 changes: 59 additions & 36 deletions tuf/client_rework/metadata_bundle.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,17 +61,14 @@
TODO:
* exceptions are all over the place: the idea is that client could just handle a
generic RepositoryError that covers every issue that server provided metadata
could inflict (other errors would be user errors), but this is not yet the case
* exceptions are all over the place: the idea is that client could just handle
a generic RepositoryError that covers every issue that server provided
metadata could inflict (other errors would be user errors), but this is not
yet the case
* usefulness of root_update_finished() can be debated: it could be done
in the beginning of load_timestamp()...
* there are some divergences from spec:
* 5.3.11: timestamp and snapshot are not deleted right away (only on next load):
the load functions will refuse to load the files when they are not signed by
current root keys. Deleting at the specified point is possible but means additional
code with some quirks..
* in general local metadata files are not deleted (they just won't succesfully load)
* there are some divergences from spec: in general local metadata files are
not deleted (they just won't succesfully load)
* a bit of repetition
* No tests!
* Naming maybe not final?
Expand All @@ -80,10 +77,10 @@
(not sure yet how: maybe a spec_logger that logs specification events?)
"""

from collections import abc
from datetime import datetime
import logging
import os
from collections import abc
from datetime import datetime
from typing import Dict, Iterator, Optional

from securesystemslib import hash as sslib_hash
Expand All @@ -93,11 +90,21 @@
from tuf.api.metadata import Metadata
from tuf.api.serialization import SerializationError

# TODO: Either enaable old-style logging in pylintc (issue #1334)
# or change this file to use f-strings for logging
# pylint: disable=logging-too-many-args

# TODO: signed._type really does not work issue #1375:
# pylint: disable=protected-access

logger = logging.getLogger(__name__)

# This is a placeholder until ...
# TODO issue 1306: implement this in Metadata API
def verify_with_threshold(delegator: Metadata, role_name: str, unverified: Metadata) -> bool:
def verify_with_threshold(
delegator: Metadata, role_name: str, unverified: Metadata
) -> bool:
"""Verify 'unverified' with keys and treshold defined in delegator"""
if delegator.signed._type == "root":
keys = delegator.signed.keys
role = delegator.signed.roles.get(role_name)
Expand All @@ -121,13 +128,20 @@ def verify_with_threshold(delegator: Metadata, role_name: str, unverified: Metad
try:
if unverified.verify(key):
unique_keys.add(key["keyval"]["public"])
except: # TODO specify the Exceptions
pass
except Exception as e: # pylint: disable=broad-except
# TODO specify the Exceptions (see issue #1351)
logger.info("verify failed: %s", e)

return len(unique_keys) >= role["threshold"]


class MetadataBundle(abc.Mapping):
"""Internal class to keep track of valid metadata in Updater
MetadataBundle ensures that metadata is valid. It provides easy ways to
update the metadata with the caller making decisions on what is updated.
"""

def __init__(self, repository_path: str):
"""Initialize by loading root metadata from disk"""
self._path = repository_path
Expand All @@ -141,7 +155,9 @@ def __init__(self, repository_path: str):
with open(os.path.join(self._path, "root.json"), "rb") as f:
self._load_intermediate_root(f.read())
except (OSError, exceptions.RepositoryError) as e:
raise exceptions.RepositoryError("Failed to load local root metadata")
raise exceptions.RepositoryError(
"Failed to load local root metadata"
) from e

# Implement Mapping
def __getitem__(self, key: str) -> Metadata:
Expand Down Expand Up @@ -242,7 +258,9 @@ def update_targets(self, data: bytes):
"""Update targets metadata with data from remote repository."""
self.update_delegated_targets(data, "targets", "root")

def load_local_delegated_targets(self, role_name: str, delegator_name: str) -> bool:
def load_local_delegated_targets(
self, role_name: str, delegator_name: str
) -> bool:
"""Load cached metadata for 'role_name' from local storage.
Metadata for 'delegator_name' must be loaded already.
Expand All @@ -256,13 +274,17 @@ def load_local_delegated_targets(self, role_name: str, delegator_name: str) -> b

try:
with open(os.path.join(self._path, f"{role_name}.json"), "rb") as f:
self._load_delegated_targets(f.read(), role_name, delegator_name)
self._load_delegated_targets(
f.read(), role_name, delegator_name
)
return True
except (OSError, exceptions.RepositoryError) as e:
logger.debug("Failed to load local %s: %s", role_name, e)
return False

def update_delegated_targets(self, data: bytes, role_name: str, delegator_name: str = None):
def update_delegated_targets(
self, data: bytes, role_name: str, delegator_name: str = None
):
"""Update 'rolename' metadata with data from remote repository.
Metadata for 'delegator_name' must be loaded already."""
Expand Down Expand Up @@ -296,7 +318,6 @@ def _load_intermediate_root(self, data: bytes):
)

if new_root.signed.version != self.root.signed.version + 1:
# TODO not a "Replayed Metadata attack": the version is just not what we expected
raise exceptions.ReplayedMetadataError(
"root", new_root.signed.version, self.root.signed.version
)
Expand Down Expand Up @@ -358,7 +379,8 @@ def _load_timestamp(self, data: bytes):
self._bundle["timestamp"] = new_timestamp
logger.debug("Loaded timestamp")

def _load_snapshot(self, data: bytes):
# TODO: remove pylint disable once the hash verification is in metadata.py
def _load_snapshot(self, data: bytes): # pylint: disable=too-many-branches
"""Verifies and loads 'data' as new snapshot metadata."""

if self.timestamp is None:
Expand All @@ -370,12 +392,12 @@ def _load_snapshot(self, data: bytes):

# Verify against the hashes in timestamp, if any
hashes = meta.get("hashes") or {}
for algo, _hash in hashes.items():
for algo, stored_hash in hashes.items():
digest_object = sslib_hash.digest(algo)
digest_object.update(data)
observed_hash = digest_object.hexdigest()
if observed_hash != _hash:
raise exceptions.BadHashError(_hash, observed_hash)
if observed_hash != stored_hash:
raise exceptions.BadHashError(stored_hash, observed_hash)

try:
new_snapshot = Metadata.from_bytes(data)
Expand Down Expand Up @@ -425,19 +447,19 @@ def _load_snapshot(self, data: bytes):
self._bundle["snapshot"] = new_snapshot
logger.debug("Loaded snapshot")

def _load_delegated_targets(self, data: bytes, role_name: str, delegator_name: str):
"""Verifies and loads 'data' as new metadata for delegated target 'role_name'.
def _load_delegated_targets(
self, data: bytes, role_name: str, delegator_name: str
):
"""Verifies and loads 'data' as new metadata for target 'role_name'.
Raises if verification fails
"""
if self.snapshot is None:
raise ValueError("Cannot load targets before snapshot")

delegator = self.get(delegator_name)
if delegator == None:
raise exceptions.ValueError(
"Cannot load delegated target before delegator"
)
if delegator is None:
raise ValueError("Cannot load targets before delegator")

# Verify against the hashes in snapshot, if any
meta = self.snapshot.signed.meta.get(f"{role_name}.json")
Expand All @@ -447,12 +469,12 @@ def _load_delegated_targets(self, data: bytes, role_name: str, delegator_name: s
)

hashes = meta.get("hashes") or {}
for algo, _hash in hashes.items():
for algo, stored_hash in hashes.items():
digest_object = sslib_hash.digest(algo)
digest_object.update(data)
observed_hash = digest_object.hexdigest()
if observed_hash != _hash:
raise exceptions.BadHashError(_hash, observed_hash)
if observed_hash != stored_hash:
raise exceptions.BadHashError(stored_hash, observed_hash)

try:
new_delegate = Metadata.from_bytes(data)
Expand All @@ -466,17 +488,18 @@ def _load_delegated_targets(self, data: bytes, role_name: str, delegator_name: s

if not verify_with_threshold(delegator, role_name, new_delegate):
raise exceptions.UnsignedMetadataError(
f"New {role_name} is not signed by {delegator_name}"
f"New {role_name} is not signed by {delegator_name}",
new_delegate,
)

if new_delegate.signed.version != meta["version"]:
raise exceptions.BadVersionNumberError(
f"Expected {role_name} version"
f"{meta['version']}, got {new_delegate.signed.version}"
f"Expected {role_name} version"
f"{meta['version']}, got {new_delegate.signed.version}"
)

if new_delegate.signed.is_expired(self.reference_time):
raise exceptions.ExpiredMetadataError(f"New {role_name} is expired")

self._bundle[role_name] = new_delegate
logger.debug(f"Loaded {role_name} delegated by {delegator_name}")
logger.debug("Loaded %s delegated by %s", role_name, delegator_name)

0 comments on commit e839931

Please sign in to comment.