Skip to content

Commit

Permalink
WIP: Update to new securesystemslib API
Browse files Browse the repository at this point in the history
Signed-off-by: Jussi Kukkonen <jkukkonen@google.com>
  • Loading branch information
jku committed Apr 25, 2024
1 parent 411505d commit e119a12
Show file tree
Hide file tree
Showing 26 changed files with 245 additions and 313 deletions.
55 changes: 21 additions & 34 deletions examples/manual_repo/basic_repo.py
Expand Up @@ -25,10 +25,9 @@
import tempfile
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Dict
from typing import Dict

from securesystemslib.keys import generate_ed25519_key
from securesystemslib.signer import SSlibKey, SSlibSigner
from securesystemslib.signer import CryptoSigner, Signer

from tuf.api.metadata import (
SPECIFICATION_VERSION,
Expand Down Expand Up @@ -89,7 +88,7 @@ def _in(days: float) -> datetime:
# Define containers for role objects and cryptographic keys created below. This
# allows us to sign and write metadata in a batch more easily.
roles: Dict[str, Metadata] = {}
keys: Dict[str, Dict[str, Any]] = {}
signers: Dict[str, Signer] = {}


# Targets (integrity)
Expand Down Expand Up @@ -157,10 +156,8 @@ def _in(days: float) -> datetime:
# See https://github.com/secure-systems-lab/securesystemslib for more details
# about key handling, and don't forget to password-encrypt your private keys!
for name in ["targets", "snapshot", "timestamp", "root"]:
keys[name] = generate_ed25519_key()
roles["root"].signed.add_key(
SSlibKey.from_securesystemslib_key(keys[name]), name
)
signers[name] = CryptoSigner.generate_ecdsa()
roles["root"].signed.add_key(signers[name].public_key, name)

# NOTE: We only need the public part to populate root, so it is possible to use
# out-of-band mechanisms to generate key pairs and only expose the public part
Expand All @@ -173,10 +170,8 @@ def _in(days: float) -> datetime:
# threshold of multiple keys to sign root metadata. For this example we
# generate another root key (you can pretend it's out-of-band) and increase the
# required signature threshold.
another_root_key = generate_ed25519_key()
roles["root"].signed.add_key(
SSlibKey.from_securesystemslib_key(another_root_key), "root"
)
another_root_signer = CryptoSigner.generate_ecdsa()
roles["root"].signed.add_key(another_root_signer.public_key, "root")
roles["root"].signed.roles["root"].threshold = 2


Expand All @@ -185,9 +180,7 @@ def _in(days: float) -> datetime:
# In this example we have access to all top-level signing keys, so we can use
# them to create and add a signature for each role metadata.
for name in ["targets", "snapshot", "timestamp", "root"]:
key = keys[roles[name].signed.type]
signer = SSlibSigner(key)
roles[name].sign(signer)
roles[name].sign(signers[name])


# Persist metadata (consistent snapshot)
Expand Down Expand Up @@ -227,9 +220,9 @@ def _in(days: float) -> datetime:
# file, sign it, and write it back to the same file, and this can be repeated
# until the threshold is satisfied.
root_path = os.path.join(TMP_DIR, "1.root.json")
roles["root"].from_file(root_path)
roles["root"].sign(SSlibSigner(another_root_key), append=True)
roles["root"].to_file(root_path, serializer=PRETTY)
root = Metadata.from_file(root_path)
root.sign(another_root_signer, append=True)
root.to_file(root_path, serializer=PRETTY)


# Targets delegation
Expand All @@ -243,7 +236,7 @@ def _in(days: float) -> datetime:
# In this example the top-level targets role trusts a new "python-scripts"
# targets role to provide integrity for any target file that ends with ".py".
delegatee_name = "python-scripts"
keys[delegatee_name] = generate_ed25519_key()
signers[delegatee_name] = CryptoSigner.generate_ecdsa()

# Delegatee
# ---------
Expand Down Expand Up @@ -271,16 +264,13 @@ def _in(days: float) -> datetime:
# delegatee is responsible for, e.g. a list of path patterns. For details about
# all configuration parameters see
# https://theupdateframework.github.io/specification/latest/#delegations
delegatee_key = signers[delegatee_name].public_key
roles["targets"].signed.delegations = Delegations(
keys={
keys[delegatee_name]["keyid"]: SSlibKey.from_securesystemslib_key(
keys[delegatee_name]
)
},
keys={delegatee_key.keyid: delegatee_key},
roles={
delegatee_name: DelegatedRole(
name=delegatee_name,
keyids=[keys[delegatee_name]["keyid"]],
keyids=[delegatee_key.keyid],
threshold=1,
terminating=True,
paths=["*.py"],
Expand Down Expand Up @@ -319,8 +309,7 @@ def _in(days: float) -> datetime:

# Sign and write metadata for all changed roles, i.e. all but root
for role_name in ["targets", "python-scripts", "snapshot", "timestamp"]:
signer = SSlibSigner(keys[role_name])
roles[role_name].sign(signer)
roles[role_name].sign(signers[role_name])

# Prefix all but timestamp with version number (see consistent snapshot)
filename = f"{role_name}.json"
Expand All @@ -343,17 +332,15 @@ def _in(days: float) -> datetime:
# In this example we will replace a root key, and sign a new version of root
# with the threshold of old and new keys. Since one of the previous root keys
# remains in place, it can be used to count towards the old and new threshold.
new_root_key = generate_ed25519_key()
new_root_signer = CryptoSigner.generate_ecdsa()

roles["root"].signed.revoke_key(keys["root"]["keyid"], "root")
roles["root"].signed.add_key(
SSlibKey.from_securesystemslib_key(new_root_key), "root"
)
roles["root"].signed.revoke_key(signers["root"].public_key.keyid, "root")
roles["root"].signed.add_key(new_root_signer.public_key, "root")
roles["root"].signed.version += 1

roles["root"].signatures.clear()
for key in [keys["root"], another_root_key, new_root_key]:
roles["root"].sign(SSlibSigner(key), append=True)
for signer in [signers["root"], another_root_signer, new_root_signer]:
roles["root"].sign(signer, append=True)

roles["root"].to_file(
os.path.join(TMP_DIR, f"{roles['root'].signed.version}.root.json"),
Expand Down
16 changes: 7 additions & 9 deletions examples/manual_repo/hashed_bin_delegation.py
Expand Up @@ -21,10 +21,9 @@
import tempfile
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Dict, Iterator, List, Tuple
from typing import Dict, Iterator, List, Tuple

from securesystemslib.keys import generate_ed25519_key
from securesystemslib.signer import SSlibKey, SSlibSigner
from securesystemslib.signer import CryptoSigner, Signer

from tuf.api.metadata import (
DelegatedRole,
Expand All @@ -44,7 +43,7 @@ def _in(days: float) -> datetime:


roles: Dict[str, Metadata[Targets]] = {}
keys: Dict[str, Dict[str, Any]] = {}
signers: Dict[str, Signer] = {}

# Hash bin delegation
# ===================
Expand Down Expand Up @@ -138,7 +137,7 @@ def find_hash_bin(path: str) -> str:
# NOTE: See "Targets delegation" and "Signature thresholds" paragraphs in
# 'basic_repo.py' for more details
for name in ["bin-n", "bins"]:
keys[name] = generate_ed25519_key()
signers[name] = CryptoSigner.generate_ecdsa()


# Targets roles
Expand All @@ -149,7 +148,7 @@ def find_hash_bin(path: str) -> str:
# Create preliminary delegating targets role (bins) and add public key for
# delegated targets (bin_n) to key store. Delegation details are update below.
roles["bins"] = Metadata(Targets(expires=_in(365)))
bin_n_key = SSlibKey.from_securesystemslib_key(keys["bin-n"])
bin_n_key = signers["bin-n"].public_key
roles["bins"].signed.delegations = Delegations(
keys={bin_n_key.keyid: bin_n_key},
roles={},
Expand All @@ -169,7 +168,7 @@ def find_hash_bin(path: str) -> str:
# delegated targets role (bin_n).
roles["bins"].signed.delegations.roles[bin_n_name] = DelegatedRole(
name=bin_n_name,
keyids=[keys["bin-n"]["keyid"]],
keyids=[signers["bin-n"].public_key.keyid],
threshold=1,
terminating=False,
path_hash_prefixes=bin_n_hash_prefixes,
Expand Down Expand Up @@ -210,8 +209,7 @@ def find_hash_bin(path: str) -> str:
TMP_DIR = tempfile.mkdtemp(dir=os.getcwd())

for role_name, role in roles.items():
key = keys["bins"] if role_name == "bins" else keys["bin-n"]
signer = SSlibSigner(key)
signer = signers["bins"] if role_name == "bins" else signers["bin-n"]
role.sign(signer)

filename = f"1.{role_name}.json"
Expand Down
16 changes: 5 additions & 11 deletions examples/manual_repo/succinct_hash_bin_delegations.py
Expand Up @@ -23,10 +23,9 @@
import tempfile
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Dict, Tuple
from typing import Dict

from securesystemslib.keys import generate_ed25519_key
from securesystemslib.signer import SSlibKey, SSlibSigner
from securesystemslib.signer import CryptoSigner

from tuf.api.metadata import (
Delegations,
Expand Down Expand Up @@ -80,15 +79,10 @@
THRESHOLD = 1


def create_key() -> Tuple[Key, SSlibSigner]:
"""Generates a new Key and Signer."""
sslib_key = generate_ed25519_key()
return SSlibKey.from_securesystemslib_key(sslib_key), SSlibSigner(sslib_key)


# Create one signing key for all bins, and one for the delegating targets role.
bins_key, bins_signer = create_key()
_, targets_signer = create_key()
bins_signer = CryptoSigner.generate_ecdsa()
bins_key = bins_signer.public_key
targets_signer = CryptoSigner.generate_ecdsa()

# Delegating targets role
# -----------------------
Expand Down
9 changes: 4 additions & 5 deletions examples/repository/_simplerepo.py
Expand Up @@ -10,8 +10,7 @@
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Union

from securesystemslib import keys
from securesystemslib.signer import Key, Signer, SSlibKey, SSlibSigner
from securesystemslib.signer import CryptoSigner, Key, Signer

from tuf.api.exceptions import RepositoryError
from tuf.api.metadata import (
Expand Down Expand Up @@ -76,9 +75,9 @@ def __init__(self) -> None:
# setup a basic repository, generate signing key per top-level role
with self.edit_root() as root:
for role in ["root", "timestamp", "snapshot", "targets"]:
key = keys.generate_ed25519_key()
self.signer_cache[role].append(SSlibSigner(key))
root.add_key(SSlibKey.from_securesystemslib_key(key), role)
signer = CryptoSigner.generate_ecdsa()
self.signer_cache[role].append(signer)
root.add_key(signer.public_key, role)

for role in ["timestamp", "snapshot", "targets"]:
with self.edit(role):
Expand Down
37 changes: 25 additions & 12 deletions examples/uploader/_localrepo.py
Expand Up @@ -12,8 +12,12 @@
from typing import Dict

import requests
from securesystemslib import keys
from securesystemslib.signer import SSlibKey, SSlibSigner
from cryptography.hazmat.primitives.serialization import (
Encoding,
NoEncryption,
PrivateFormat,
)
from securesystemslib.signer import CryptoSigner, Signer

from tuf.api.exceptions import RepositoryError
from tuf.api.metadata import Metadata, MetaFile, TargetFile, Targets
Expand Down Expand Up @@ -75,18 +79,22 @@ def open(self, role: str) -> Metadata:
md.signed.version = 0
return md

def close(self, role: str, md: Metadata) -> None:
def close(self, role_name: str, md: Metadata) -> None:
"""Store a version of metadata. Handle version bumps, expiry, signing"""
targets = self.targets()
role = targets.get_delegated_role(role_name)
public_key = targets.get_key(role.keyids[0])
uri = f"file2:{self.key_dir}/{role_name}"

signer = Signer.from_priv_key_uri(uri, public_key)

md.signed.version += 1
md.signed.expires = datetime.now(timezone.utc) + self.expiry_period

with open(f"{self.key_dir}/{role}", encoding="utf-8") as f:
signer = SSlibSigner(json.loads(f.read()))

md.sign(signer, append=False)

# Upload using "api/role"
uri = f"{self.base_url}/api/role/{role}"
uri = f"{self.base_url}/api/role/{role_name}"
r = requests.post(uri, data=md.to_bytes(JSONSerializer()), timeout=5)
r.raise_for_status()

Expand Down Expand Up @@ -115,19 +123,24 @@ def add_target(self, role: str, targetpath: str) -> bool:

def add_delegation(self, role: str) -> bool:
"""Use the (unauthenticated) delegation adding API endpoint"""
keydict = keys.generate_ed25519_key()
pubkey = SSlibKey.from_securesystemslib_key(keydict)
signer = CryptoSigner.generate_ecdsa()

data = {pubkey.keyid: pubkey.to_dict()}
data = {signer.public_key.keyid: signer.public_key.to_dict()}
url = f"{self.base_url}/api/delegation/{role}"
r = requests.post(url, data=json.dumps(data), timeout=5)
if r.status_code != 200:
print(f"delegation failed with {r}")
return False

# Store the private key using rolename as filename
with open(f"{self.key_dir}/{role}", "w", encoding="utf-8") as f:
f.write(json.dumps(keydict))
with open(f"{self.key_dir}/{role}", "wb") as f:
# TODO this is dumb and needs to be securesystemslibs job...
priv_key = signer._private_key.private_bytes(
encoding=Encoding.PEM,
format=PrivateFormat.PKCS8,
encryption_algorithm=NoEncryption(),
)
f.write(priv_key)

print(f"Uploaded new delegation, stored key in {self.key_dir}/{role}")
return True
2 changes: 1 addition & 1 deletion requirements/pinned.txt
Expand Up @@ -6,5 +6,5 @@ idna==3.7 # via requests
pycparser==2.22 # via cffi
pynacl==1.5.0 # via securesystemslib
requests==2.31.0
securesystemslib[crypto,pynacl]==0.31.0
securesystemslib[crypto,pynacl] @ git+https://github.com/secure-systems-lab/securesystemslib@34a42954b5064610ec98406bb55719fd19874089
urllib3==2.2.1 # via requests

0 comments on commit e119a12

Please sign in to comment.