Skip to content
Closed
41 changes: 40 additions & 1 deletion tests/repository_simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
Metadata,
MetaFile,
Root,
Rotate,
Snapshot,
SuccinctRoles,
TargetFile,
Expand Down Expand Up @@ -180,6 +181,7 @@ def _initialize(self) -> None:
self.md_snapshot = Metadata(Snapshot(expires=self.safe_expiry))
self.md_timestamp = Metadata(Timestamp(expires=self.safe_expiry))
self.md_root = Metadata(Root(expires=self.safe_expiry))
self.md_rotate: Dict[str, List[Metadata]] = {}

for role in TOP_LEVEL_ROLE_NAMES:
key, signer = self.create_key()
Expand All @@ -197,6 +199,33 @@ def publish_root(self) -> None:
self.signed_roots.append(self.md_root.to_bytes(JSONSerializer()))
logger.debug("Published root v%d", self.root.version)

def add_rotate_file(
self,
rolename: str,
new_keys: Dict[str, Key],
new_threshold: int,
signers: Dict[str, SSlibSigner],
) -> None:
"""Add rotate file"""
if rolename in self.md_rotate:
rotate_version = len(self.md_rotate[rolename])
inner_rotate = Rotate(
rotate_version, rolename, new_keys, new_threshold
)
rotate_file = Metadata(inner_rotate)
self.md_rotate[rolename].append(rotate_file)
else:
rotate_version = 0
inner_rotate = Rotate(
rotate_version, rolename, new_keys, new_threshold
)
rotate_file = Metadata(inner_rotate)
self.md_rotate[rolename] = [rotate_file]

rotate_rolename = f"rotate/{rolename}.rotate.{rotate_version}"

self.signers[rotate_rolename] = signers

def _fetch(self, url: str) -> Iterator[bytes]:
"""Fetches data from the given url and returns an Iterator (or yields
bytes).
Expand All @@ -208,7 +237,9 @@ def _fetch(self, url: str) -> Iterator[bytes]:
version_str, _, role = ver_and_name.partition(".")
# root is always version-prefixed while timestamp is always NOT
if role == Root.type or (
self.root.consistent_snapshot and ver_and_name != Timestamp.type
self.root.consistent_snapshot
and ver_and_name != Timestamp.type
and not version_str.startswith("rotate")
):
version: Optional[int] = int(version_str)
else:
Expand Down Expand Up @@ -277,6 +308,14 @@ def fetch_metadata(self, role: str, version: Optional[int] = None) -> bytes:
md = self.md_snapshot
elif role == Targets.type:
md = self.md_targets
elif role.startswith("rotate/"):
rotate_parts = role.split(".")
rotate_role = rotate_parts[0][len("rotate/") :]
rotate_version = rotate_parts[2]
try:
md = self.md_rotate[rotate_role][int(rotate_version)]
except (KeyError, IndexError) as e:
raise DownloadHTTPError(f"Unknown role {role}", 404) from e
else:
md = self.md_delegates.get(role)

Expand Down
17 changes: 17 additions & 0 deletions tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
Key,
Metadata,
Root,
Rotate,
Snapshot,
SuccinctRoles,
TargetFile,
Expand Down Expand Up @@ -780,6 +781,22 @@ def test_get_roles_in_succinct_roles(self) -> None:
expected_bin_suffix = f"{bin_numer:0{expected_suffix_length}x}"
self.assertEqual(role_name, f"bin-{expected_bin_suffix}")

def test_rotate_generate(self) -> None:
# create a valid rotate file
rotate = Rotate(0, "timestamp", {}, 1)

# version automatically set to 0
rotate = Rotate(None, "timestamp", {}, 1)
self.assertEqual(rotate.version, 0)

# version must not be negative
with self.assertRaises(ValueError):
rotate = Rotate(-1, "timestamp", {}, 1)

# rotate file needs a role
with self.assertRaises(ValueError):
rotate = Rotate(0, None, {}, 1)


# Run unit test.
if __name__ == "__main__":
Expand Down
5 changes: 5 additions & 0 deletions tests/test_metadata_eq_.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
Metadata,
MetaFile,
Role,
Rotate,
SuccinctRoles,
TargetFile,
)
Expand Down Expand Up @@ -63,6 +64,9 @@ def setUpClass(cls) -> None:
cls.objects["TargetFile"] = TargetFile(
1, {"sha256": "abc"}, "file1.txt"
)
cls.objects["Rotate"] = Rotate(
0, "timestamp", {"keyid": cls.objects["Key"]}, 1
)

# Keys are class names.
# Values are dictionaries containing attribute names and their new values.
Expand All @@ -85,6 +89,7 @@ def setUpClass(cls) -> None:
"Delegations": {"keys": {}, "roles": {}},
"TargetFile": {"length": 0, "hashes": {}, "path": ""},
"Targets": {"targets": {}, "delegations": []},
"Rotate": {"version": 1, "role": "", "keys": {}, "threshold": 0},
}

@utils.run_sub_tests_with_dataset(classes_attributes_modifications)
Expand Down
108 changes: 108 additions & 0 deletions tests/test_rotate_files.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
#!/usr/bin/env python

# Copyright 2023, New York University and the TUF contributors
# SPDX-License-Identifier: MIT OR Apache-2.0

""" Test ngclient handling of rotate files"""

import os
import sys
import tempfile
import unittest
from typing import ClassVar, List, Optional

from securesystemslib.signer import SSlibSigner

from tests import utils
from tests.repository_simulator import RepositorySimulator
from tuf.api import exceptions
from tuf.api.metadata import Key
from tuf.ngclient import Updater


class TestRotateFiles(unittest.TestCase):
"""Test ngclient handling of rotate files"""

# set dump_dir to trigger repository state dumps
dump_dir: Optional[str] = None
temp_dir: ClassVar[tempfile.TemporaryDirectory]
keys: ClassVar[List[Key]]
signers: ClassVar[List[SSlibSigner]]

@classmethod
def setUpClass(cls) -> None:
# pylint: disable-next=consider-using-with
cls.temp_dir = tempfile.TemporaryDirectory()

# pre-create keys and signers
cls.keys = []
cls.signers = []
for _ in range(10):
key, signer = RepositorySimulator.create_key()
cls.keys.append(key)
cls.signers.append(signer)

@classmethod
def tearDownClass(cls) -> None:
cls.temp_dir.cleanup()

def setUp(self) -> None:
self.metadata_dir = os.path.join(self.temp_dir.name, "metadata")
self.targets_dir = os.path.join(self.temp_dir.name, "targets")
os.mkdir(self.metadata_dir)
os.mkdir(self.targets_dir)

self.sim = RepositorySimulator()
with open(os.path.join(self.metadata_dir, "root.json"), "bw") as f:
f.write(self.sim.signed_roots[0])

if self.dump_dir is not None:
# create subtest dumpdir
# pylint: disable=no-member
name = f"{self.id().split('.')[-1]}-{self.case_name}"
self.sim.dump_dir = os.path.join(self.dump_dir, name)
os.mkdir(self.sim.dump_dir)

def _init_updater(self) -> Updater:
"""Creates a new updater instance."""
if self.sim.dump_dir is not None:
self.sim.write()

updater = Updater(
self.metadata_dir,
"https://example.com/metadata/",
self.targets_dir,
"https://example.com/targets/",
self.sim,
)
return updater

def test_read_rotate_file(self) -> None:
root = self.sim.root
new_keyids = root.roles["snapshot"].keyids
new_keys = {k: v for (k, v) in root.keys.items() if k in new_keyids}
self.sim.add_rotate_file(
"timestamp", new_keys, 1, self.sim.signers["timestamp"]
)
self.sim.update_snapshot()

updater = self._init_updater()
with self.assertRaises(exceptions.UnsignedMetadataError):
updater.refresh()

old_keyids = root.roles["timestamp"].keyids
old_keys = {k: v for (k, v) in root.keys.items() if k in old_keyids}
self.sim.add_rotate_file(
"timestamp", old_keys, 1, self.sim.signers["snapshot"]
)
updater.refresh()


if __name__ == "__main__":
if "--dump" in sys.argv:
TestRotateFiles.dump_dir = tempfile.mkdtemp()
print(f"Repository dumps in {TestRotateFiles.dump_dir}")
sys.argv.remove("--dump")

utils.configure_test_logging(sys.argv)
unittest.main()
Loading