Skip to content

Commit

Permalink
Add simple TUF role metadata model (WIP)
Browse files Browse the repository at this point in the history
Add metadata module with container classes for TUF role metadata, including
methods to read/serialize/write from and to JSON, perform TUF-compliant
metadata updates, and create and verify signatures.

The 'Metadata' class provides a container for inner TUF metadata objects (Root,
Timestamp, Snapshot, Targets) (i.e. OOP composition)

The 'Signed' class provides a base class to aggregate common attributes (i.e.
version, expires, spec_version) of the inner metadata classes. (i.e. OOP
inheritance). The name of the class also aligns with the 'signed' field of
the outer metadata container.

Based on prior observations in TUF's sister project in-toto, this architecture
seems to well represent the metadata model as it is defined in the
specification (see in-toto/in-toto#98 and in-toto/in-toto#142 for related
discussions).

This commits also adds tests.

**TODO: See doc header TODO list**

**Additional design considerations**
(also in regards to prior sketches of this module)

 - Aims at simplicity, brevity and recognizability of the wireline metadata
   format.

 - All attributes that correspond to fields in TUF JSON metadata are public.
   There doesn't seem to be a good reason to protect them with leading
   underscores and use setters/getters instead, it just adds more code, and
   impedes recognizability of the wireline metadata format.

 - Although, it might be convenient to have short-cuts on the Metadata class
   that point to methods and attributes that are common to all subclasses of
   the contained Signed class (e.g. Metadata.version instead of
   Metadata.signed.version, etc.), this also conflicts with goal of
   recognizability of the wireline metadata. Thus we won't add such short-cuts
   for now. See:
   theupdateframework#1060 (comment)

 - Signing keys and a 'consistent_snapshot' boolean are not on the targets
   metadata class. They are a better fit for management code. See:
   theupdateframework#1060 (comment),
   and theupdateframework#660.

 - Does not use sslib schema checks (see TODO notes about validation in
   doc header)

 - Does not use existing tuf utils, such as make_metadata_fileinfo,
   build_dict_conforming_to_schema, if it is easy and more explicit to
   just re-implement the desired behavior on the metadata classes.

 - All datetime's are treated as UTC. Since timezone info is not captured in
   the wireline metadata format it should not be captured in the internal
   representation either.

 - Does not use 3rd-party dateutil package, in order to minimize dependency
   footprint, which is especially important for update clients which often have
   to vendor their dependencies.
   However, compatibility between the more advanced dateutil.relativedelta (e.g
   handles leap years automatically) and timedelta is tested.

 - Uses PEP8 indentation (4 space) and Google-style doc string instead of
   sslab-style. See
   secure-systems-lab/code-style-guidelines#20

 - Does not support Python =< 3.5

Co-authored-by: Trishank Karthik Kuppusamy <trishank.kuppusamy@datadoghq.com>
Co-authored-by: Joshua Lock <jlock@vmware.com>
Co-authored-by: Teodora Sechkova <tsechkova@vmware.com>
Signed-off-by: Lukas Puehringer <lukas.puehringer@nyu.edu>
  • Loading branch information
4 people committed Aug 20, 2020
1 parent 5d16f91 commit 17f08ad
Show file tree
Hide file tree
Showing 2 changed files with 442 additions and 0 deletions.
165 changes: 165 additions & 0 deletions tests/test_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
#!/usr/bin/env python

# Copyright 2020, New York University and the TUF contributors
# SPDX-License-Identifier: MIT OR Apache-2.0
""" Unit tests for api/metdata.py
"""
import logging
import os
import shutil
import tempfile
import unittest

from datetime import timedelta
from dateutil.relativedelta import relativedelta

from tuf.api.metadata import (
Snapshot,
Timestamp,
)

logger = logging.getLogger(__name__)


class TestMetadata(unittest.TestCase):
# TODO: Start Vault in a dev mode, and export VAULT_ADDR as well as VAULT_TOKEN.
# TODO: Enable the Vault Transit secrets engine.
@classmethod
def setUpClass(cls):

# Create a temporary directory to store the repository, metadata, and target
# files. 'temporary_directory' must be deleted in TearDownClass() so that
# temporary files are always removed, even when exceptions occur.
cls.temporary_directory = tempfile.mkdtemp(dir=os.getcwd())

test_repo_data = os.path.join(
os.path.dirname(os.path.realpath(__file__)), 'repository_data')

cls.repo_dir = os.path.join(cls.temporary_directory, 'repository')
shutil.copytree(
os.path.join(test_repo_data, 'repository'), cls.repo_dir)

cls.keystore_dir = os.path.join(cls.temporary_directory, 'keystore')
shutil.copytree(
os.path.join(test_repo_data, 'keystore'), cls.keystore_dir)


# TODO: Shut down Vault.
@classmethod
def tearDownClass(cls):

# Remove the temporary repository directory, which should contain all the
# metadata, targets, and key files generated for the test cases.
shutil.rmtree(cls.temporary_directory)



# def _load_key_ring(self):
# key_list = []
# root_key = RAMKey.read_from_file(os.path.join(self.keystore_dir, 'root_key'),
# 'rsassa-pss-sha256', 'password')
# key_list.append(root_key)

# for key_file in os.listdir(self.keystore_dir):
# if key_file.endswith('.pub'):
# # ignore public keys
# continue

# if key_file.startswith('root_key'):
# # root key is loaded
# continue

# key = RAMKey.read_from_file(os.path.join(self.keystore_dir, key_file),
# 'ed25519', 'password')
# key_list.append(key)

# threshold = Threshold(1, 5)
# return KeyRing(threshold=threshold, keys=key_list)

def test_metadata_base(self):
# Use of Snapshot is arbitrary, we're just testing the base class features
# with real data
snapshot_path = os.path.join(
self.repo_dir, 'metadata', 'snapshot.json')
md = Snapshot.read_from_json(snapshot_path)

self.assertEqual(md.signed.version, 1)
md.signed.bump_version()
self.assertEqual(md.signed.version, 2)
self.assertEqual(md.signed.expires, '2030-01-01T00:00:00Z')
md.signed.bump_expiration()
self.assertEqual(md.signed.expires, '2030-01-02T00:00:00Z')
md.signed.bump_expiration(timedelta(days=365))
self.assertEqual(md.signed.expires, '2031-01-02T00:00:00Z')


def test_metadata_snapshot(self):
snapshot_path = os.path.join(
self.repo_dir, 'metadata', 'snapshot.json')
snapshot = Snapshot.read_from_json(snapshot_path)

# key_ring = self._load_key_ring()
# snapshot.verify(key_ring)

# Create a dict representing what we expect the updated data to be
fileinfo = snapshot.signed.meta
hashes = {'sha256': 'c2986576f5fdfd43944e2b19e775453b96748ec4fe2638a6d2f32f1310967095'}
fileinfo['role1.json']['version'] = 2
fileinfo['role1.json']['hashes'] = hashes
fileinfo['role1.json']['length'] = 123

snapshot.signed.update('role1', 2, 123, hashes)
self.assertEqual(snapshot.signed.meta, fileinfo)

# snapshot.signable()

# snapshot.sign()

# snapshot.verify()

# snapshot.write_to_json(os.path.join(cls.temporary_directory, 'api_snapshot.json'))


def test_metadata_timestamp(self):
timestamp_path = os.path.join(
self.repo_dir, 'metadata', 'timestamp.json')
timestamp = Timestamp.read_from_json(timestamp_path)

# key_ring = self._load_key_ring()
# timestamp.verify(key_ring)

self.assertEqual(timestamp.signed.version, 1)
timestamp.signed.bump_version()
self.assertEqual(timestamp.signed.version, 2)

self.assertEqual(timestamp.signed.expires, '2030-01-01T00:00:00Z')
timestamp.signed.bump_expiration()
self.assertEqual(timestamp.signed.expires, '2030-01-02T00:00:00Z')
timestamp.signed.bump_expiration(timedelta(days=365))
self.assertEqual(timestamp.signed.expires, '2031-01-02T00:00:00Z')

# Test whether dateutil.relativedelta works, this provides a much
# easier to use interface for callers
delta = relativedelta(days=1)
timestamp.signed.bump_expiration(delta)
self.assertEqual(timestamp.signed.expires, '2031-01-03T00:00:00Z')
delta = relativedelta(years=5)
timestamp.signed.bump_expiration(delta)
self.assertEqual(timestamp.signed.expires, '2036-01-03T00:00:00Z')

hashes = {'sha256': '0ae9664468150a9aa1e7f11feecb32341658eb84292851367fea2da88e8a58dc'}
fileinfo = timestamp.signed.meta['snapshot.json']
fileinfo['hashes'] = hashes
fileinfo['version'] = 2
fileinfo['length'] = 520
timestamp.signed.update(2, 520, hashes)
self.assertEqual(timestamp.signed.meta['snapshot.json'], fileinfo)

# timestamp.sign()

# timestamp.write_to_json()


# Run unit test.
if __name__ == '__main__':
unittest.main()
Loading

0 comments on commit 17f08ad

Please sign in to comment.