-
Notifications
You must be signed in to change notification settings - Fork 266
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
5 changed files
with
399 additions
and
41 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,212 @@ | ||
#!/usr/bin/env python | ||
|
||
# Copyright 2021, New York University and the TUF contributors | ||
# SPDX-License-Identifier: MIT OR Apache-2.0 | ||
|
||
""""Test utility to simulate a repository | ||
RepositorySimulator provides methods to modify repository metadata so that it's | ||
easy to "publish" new repository versions with modified metadata, while serving | ||
the versions to client test code. | ||
RepositorySimulator implements FetcherInterface so Updaters in tests can use it | ||
as a way to "download" new metadata from remote: in practice no downloading, | ||
network connections or even file access happens as RepositorySimulator serves | ||
everything from memory. | ||
""" | ||
|
||
from collections import OrderedDict | ||
from datetime import datetime, timedelta | ||
import logging | ||
import os | ||
import tempfile | ||
from securesystemslib.keys import generate_ed25519_key | ||
from securesystemslib.signer import SSlibSigner | ||
from typing import Dict, Iterator, List, Optional, Tuple | ||
from urllib import parse | ||
|
||
from tuf.api.serialization.json import JSONSerializer | ||
from tuf.exceptions import FetcherHTTPError | ||
from tuf.api.metadata import( | ||
Key, | ||
Metadata, | ||
MetaFile, | ||
Role, | ||
Root, | ||
SPECIFICATION_VERSION, | ||
Snapshot, | ||
Targets, | ||
Timestamp | ||
) | ||
from tuf.ngclient.fetcher import FetcherInterface | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
SPEC_VER = ".".join(SPECIFICATION_VERSION) | ||
|
||
class RepositorySimulator(FetcherInterface): | ||
def __init__(self): | ||
self.md_root: Metadata[Root] = None | ||
self.md_timestamp: Metadata[Timestamp] = None | ||
self.md_snapshot: Metadata[Snapshot] = None | ||
self.md_targets: Metadata[Targets] = None | ||
self.md_delegates: Dict[str, Metadata[Targets]] = {} | ||
|
||
# other metadata is signed on-demand (when fetched) but roots must be | ||
# explicitly published with publish_root() which maintains this list | ||
self.signed_roots: List[bytes] = [] | ||
|
||
# signers are used on-demand at fetch time to sign metadata | ||
self.signers: Dict[str, List[SSlibSigner]] = {} | ||
|
||
self.dump_dir = None | ||
self.dump_version = 0 | ||
|
||
self._initialize() | ||
|
||
@property | ||
def root(self) -> Root: | ||
return self.md_root.signed | ||
|
||
@property | ||
def timestamp(self) -> Timestamp: | ||
return self.md_timestamp.signed | ||
|
||
@property | ||
def snapshot(self) -> Snapshot: | ||
return self.md_snapshot.signed | ||
|
||
@property | ||
def targets(self) -> Targets: | ||
return self.md_targets.signed | ||
|
||
def delegates(self) -> Iterator[Tuple[str, Targets]]: | ||
for role, md in self.md_delegates.items(): | ||
yield role, md.signed | ||
|
||
def create_key(self) -> Tuple[Key, SSlibSigner]: | ||
sslib_key = generate_ed25519_key() | ||
return Key.from_securesystemslib_key(sslib_key), SSlibSigner(sslib_key) | ||
|
||
def _initialize(self): | ||
"""Setup a minimal valid repository""" | ||
expiry = datetime.utcnow().replace(microsecond=0) + timedelta(days=30) | ||
|
||
targets = Targets(1, SPEC_VER, expiry, {}, None) | ||
self.md_targets = Metadata(targets, OrderedDict()) | ||
|
||
meta = {"targets.json": MetaFile(targets.version)} | ||
snapshot = Snapshot(1, SPEC_VER, expiry, meta) | ||
self.md_snapshot = Metadata(snapshot, OrderedDict()) | ||
|
||
meta = {"snapshot.json": MetaFile(snapshot.version)} | ||
timestamp = Timestamp(1, SPEC_VER, expiry, meta) | ||
self.md_timestamp = Metadata(timestamp, OrderedDict()) | ||
|
||
root = Root(1, SPEC_VER, expiry, {}, {}, True) | ||
for role in ["root", "timestamp", "snapshot", "targets"]: | ||
key, signer = self.create_key() | ||
root.roles[role] = Role([], 1) | ||
root.add_key(role, key) | ||
# store the private key | ||
if role not in self.signers: | ||
self.signers[role] = [] | ||
self.signers[role].append(signer) | ||
self.md_root = Metadata(root, OrderedDict()) | ||
self.publish_root() | ||
|
||
def publish_root(self): | ||
"""Sign and store a new serialized version of root""" | ||
self.md_root.signatures.clear() | ||
for signer in self.signers["root"]: | ||
self.md_root.sign(signer) | ||
|
||
self.signed_roots.append(self.md_root.to_bytes(JSONSerializer())) | ||
logger.debug("Published root v%d", self.root.version) | ||
|
||
def fetch(self, url: str) -> Iterator[bytes]: | ||
spliturl = parse.urlparse(url) | ||
if spliturl.path.startswith("/metadata/"): | ||
parts = spliturl.path[len("/metadata/"):].split(".") | ||
if len(parts) == 3: | ||
version = int(parts[0]) | ||
role = parts[1] | ||
else: | ||
version = None | ||
role = parts[0] | ||
yield self._fetch_metadata (role, version) | ||
else: | ||
raise FetcherHTTPError(f"Unknown path '{spliturl.path}'", 404) | ||
|
||
def _fetch_metadata(self, role: str, version: Optional[int] = None) -> bytes: | ||
if role == "root": | ||
# return a version previously serialized in publish_root() | ||
if version > len(self.signed_roots): | ||
raise FetcherHTTPError(f"Unknown root version {version}", 404) | ||
logger.debug("fetched root version %d", role, version) | ||
return self.signed_roots[version - 1] | ||
else: | ||
# sign and serialize the requested metadata | ||
if role == "timestamp": | ||
md = self.md_timestamp | ||
elif role == "snapshot": | ||
md = self.md_snapshot | ||
elif role == "targets": | ||
md = self.md_targets | ||
else: | ||
md = self.md_delegates.get(role) | ||
|
||
if md is None: | ||
raise FetcherHTTPError(f"Unknown role {role}", 404) | ||
if version is not None and version != md.signed.version: | ||
raise FetcherHTTPError(f"Unknown {role} version {version}", 404) | ||
|
||
md.signatures.clear() | ||
for signer in self.signers[role]: | ||
md.sign(signer,append=True) | ||
|
||
logger.debug( | ||
"fetched %s v%d with %d sigs", | ||
role, | ||
md.signed.version, | ||
len(self.signers[role])) | ||
return md.to_bytes(JSONSerializer()) | ||
|
||
def update_timestamp(self): | ||
self.timestamp.meta["snapshot.json"].version = self.snapshot.version | ||
|
||
self.timestamp.version += 1 | ||
|
||
def update_snapshot(self): | ||
self.snapshot.meta["targets.json"].version = self.targets.version | ||
for role, delegate in self.delegates(): | ||
self.snapshot.meta[f"{role}.json"].version = delegate.version | ||
|
||
self.snapshot.version += 1 | ||
self.update_timestamp() | ||
|
||
def write(self): | ||
"""Dump current repository metadata to self.dump_dir | ||
This is a debugging tool: dumping repository state before running | ||
Updater refresh may be useful while debugging a test. | ||
""" | ||
if self.dump_dir is None: | ||
self.dump_dir = tempfile.mkdtemp() | ||
print(f"Repository Simulator dumps in {self.dump_dir}") | ||
|
||
self.dump_version += 1 | ||
dir = os.path.join(self.dump_dir, str(self.dump_version)) | ||
os.makedirs(dir) | ||
|
||
for ver in range(1, len(self.signed_roots) + 1): | ||
with open(os.path.join(dir, f"{ver}.root.json"), "wb") as f: | ||
f.write(self._fetch_metadata("root", ver)) | ||
|
||
for role in ["timestamp", "snapshot", "targets"]: | ||
with open(os.path.join(dir, f"{role}.json"), "wb") as f: | ||
f.write(self._fetch_metadata(role)) | ||
|
||
for role in self.md_delegates.keys(): | ||
with open(os.path.join(dir, f"{role}.json"), "wb") as f: | ||
f.write(self._fetch_metadata(role)) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,125 @@ | ||
#!/usr/bin/env python | ||
|
||
# Copyright 2021, New York University and the TUF contributors | ||
# SPDX-License-Identifier: MIT OR Apache-2.0 | ||
|
||
"""Test ngclient Updater using the repository simulator | ||
""" | ||
|
||
import logging | ||
import os | ||
import sys | ||
import tempfile | ||
from tuf.exceptions import UnsignedMetadataError | ||
import unittest | ||
|
||
from tuf.ngclient import Updater | ||
|
||
from tests import utils | ||
from tests.repository_simulator import RepositorySimulator | ||
|
||
class TestUpdater(unittest.TestCase): | ||
# set dump_dir to trigger repository state dumps | ||
dump_dir:str = None | ||
|
||
def setUp(self): | ||
self.client_dir = tempfile.TemporaryDirectory() | ||
|
||
# Setup the repository, bootstrap client root.json | ||
self.sim = RepositorySimulator() | ||
with open(os.path.join(self.client_dir.name, "root.json"), "bw") as f: | ||
root = self.sim.download_bytes("https://example.com/metadata/1.root.json", 100000) | ||
f.write(root) | ||
|
||
if self.dump_dir is not None: | ||
# create test specific dump directory | ||
name = self.id().split('.')[-1] | ||
self.sim.dump_dir = os.path.join(self.dump_dir, name) | ||
os.mkdir(self.sim.dump_dir) | ||
|
||
def _run_refresh(self): | ||
if self.sim.dump_dir is not None: | ||
self.sim.write() | ||
|
||
updater = Updater( | ||
self.client_dir.name, | ||
"https://example.com/metadata/", | ||
"https://example.com/targets/", | ||
self.sim | ||
) | ||
updater.refresh() | ||
|
||
def test_refresh(self): | ||
# TODO: how do we ensure updater does what we expected it to? | ||
# TODO: how do we ensure updater state is correct after update()? | ||
|
||
# Update top level metadata | ||
self._run_refresh() | ||
|
||
# New root (root needs to be explicitly signed) | ||
self.sim.root.version += 1 | ||
self.sim.publish_root() | ||
|
||
self._run_refresh() | ||
|
||
# New timestamp | ||
self.sim.update_timestamp() | ||
|
||
self._run_refresh() | ||
|
||
# New targets, snapshot, timestamp version | ||
self.sim.targets.version += 1 | ||
self.sim.update_snapshot() | ||
|
||
self._run_refresh() | ||
|
||
def test_keys_and_signatures(self): | ||
"""Example of the two trickiest test areas: keys and root updates""" | ||
|
||
# Update top level metadata | ||
self._run_refresh() | ||
|
||
# New targets: signed with a new key that is not in roles keys | ||
old_signer = self.sim.signers["targets"].pop() | ||
key, signer = self.sim.create_key() | ||
self.sim.signers["targets"] = [signer] | ||
self.sim.targets.version += 1 | ||
self.sim.update_snapshot() | ||
|
||
with self.assertRaises(UnsignedMetadataError): | ||
self._run_refresh() | ||
|
||
# New root: Add the new key as targets role key | ||
# (root changes require explicit publishing) | ||
self.sim.root.add_key("targets", key) | ||
self.sim.root.version += 1 | ||
self.sim.publish_root() | ||
|
||
self._run_refresh() | ||
|
||
# New root: Raise targets threshold to 2 | ||
self.sim.root.roles["targets"].threshold = 2 | ||
self.sim.root.version += 1 | ||
self.sim.publish_root() | ||
|
||
with self.assertRaises(UnsignedMetadataError): | ||
self._run_refresh() | ||
|
||
# New targets: sign with both new and old key | ||
self.sim.signers["targets"] = [signer, old_signer] | ||
self.sim.targets.version += 1 | ||
self.sim.update_snapshot() | ||
|
||
self._run_refresh() | ||
|
||
def tearDown(self): | ||
self.client_dir.cleanup() | ||
|
||
if __name__ == "__main__": | ||
if "--dump" in sys.argv: | ||
TestUpdater.dump_dir = tempfile.mkdtemp() | ||
print(f"Repository Simulator dumps in {TestUpdater.dump_dir}") | ||
sys.argv.remove("--dump") | ||
|
||
utils.configure_test_logging(sys.argv) | ||
unittest.main() |
Oops, something went wrong.