Skip to content

Commit

Permalink
Replaced the Key signatures dict to Signers
Browse files Browse the repository at this point in the history
This commit adds a refactoring on the key signature used.
Instead of using from Key Storage Service keys as a dictionary, uses
that as a ``securesystemslib.signer.Signer``. It gives more
flexibility and uses the same data structure across the services,
repository and TUF.

Signed-off-by: Kairo de Araujo <kdearaujo@vmware.com>
  • Loading branch information
Kairo de Araujo committed Jun 25, 2022
1 parent a551965 commit cbd1e41
Show file tree
Hide file tree
Showing 4 changed files with 153 additions and 93 deletions.
103 changes: 80 additions & 23 deletions tests/unit/tuf/test_repository.py
Expand Up @@ -100,8 +100,13 @@ def test__create_delegated_targets_roles(self, tuf_repository, monkeypatch):
tuf_repository.load_role = pretend.call_recorder(
lambda role: fake_snapshot_md if role == Snapshot.type else None
)

tuf_repository._store = pretend.call_recorder(lambda *a, **kw: None)
fake_signers = [
pretend.stub(
key_dict={"keyid": "key1"},
sign=pretend.call_recorder(lambda *a: "key1"),
)
]

test_delegate_roles_parameters = [
(
Expand All @@ -112,7 +117,7 @@ def test__create_delegated_targets_roles(self, tuf_repository, monkeypatch):
False,
paths=["*/*"],
),
[{"keyid": "key1"}, {"keyid": "key2"}],
fake_signers,
fake_time,
)
]
Expand Down Expand Up @@ -149,6 +154,12 @@ def test__create_delegated_targets_roles_with_snapshot_md(
)
)
fake_snapshot_md = pretend.stub(signed=pretend.stub(meta={}))
fake_signers = [
pretend.stub(
key_dict={"keyid": "key1"},
sign=pretend.call_recorder(lambda *a: "key1"),
)
]

tuf_repository.load_role = pretend.call_recorder(
lambda role: fake_snapshot_md if role == Snapshot.type else None
Expand All @@ -164,7 +175,7 @@ def test__create_delegated_targets_roles_with_snapshot_md(
False,
paths=["*/*"],
),
[{"keyid": "key1"}, {"keyid": "key2"}],
fake_signers,
fake_time,
)
]
Expand Down Expand Up @@ -201,6 +212,12 @@ def test__create_delegated_targets_roles_has_delegations(
)
)
fake_snapshot_md = pretend.stub(signed=pretend.stub(meta={}))
fake_signers = [
pretend.stub(
key_dict={"keyid": "key1"},
sign=pretend.call_recorder(lambda *a: "key1"),
)
]

tuf_repository.load_role = pretend.call_recorder(
lambda role: fake_snapshot_md if role == Snapshot.type else None
Expand All @@ -216,7 +233,7 @@ def test__create_delegated_targets_roles_has_delegations(
False,
paths=["*/*"],
),
[{"keyid": "key1"}, {"keyid": "key2"}],
fake_signers,
fake_time,
)
]
Expand Down Expand Up @@ -281,10 +298,20 @@ def test_initialization(self, tuf_repository):
),
},
}
fake_signers = [
pretend.stub(
key_dict=fake_key,
sign=pretend.call_recorder(lambda *a: pretend.stub(keyid="key1")),
),
pretend.stub(
key_dict=fake_key,
sign=pretend.call_recorder(lambda *a: pretend.stub(keyid="key2")),
),
]

top_roles_payload = dict()
for role in TOP_LEVEL_ROLE_NAMES:
top_roles_payload[role] = [fake_key, fake_key]
top_roles_payload[role] = fake_signers

tuf_repository.load_role = pretend.call_recorder(lambda *a, **kw: None)
tuf_repository._store = pretend.call_recorder(lambda *a, **kw: None)
Expand Down Expand Up @@ -315,9 +342,20 @@ def test_initialization_store_false(self, tuf_repository):
),
},
}
fake_signers = [
pretend.stub(
key_dict=fake_key,
sign=pretend.call_recorder(lambda *a: pretend.stub(keyid="key1")),
),
pretend.stub(
key_dict=fake_key,
sign=pretend.call_recorder(lambda *a: pretend.stub(keyid="key2")),
),
]

top_roles_payload = dict()
for role in TOP_LEVEL_ROLE_NAMES:
top_roles_payload[role] = [fake_key, fake_key]
top_roles_payload[role] = fake_signers

tuf_repository.load_role = pretend.call_recorder(lambda *a, **kw: None)
tuf_repository._store = pretend.call_recorder(lambda *a, **kw: None)
Expand Down Expand Up @@ -365,9 +403,15 @@ def test_initialization_threshold_more_than_keys(self, tuf_repository):
),
},
}
fake_signers = [
pretend.stub(
key_dict=fake_key,
sign=pretend.call_recorder(lambda *a: pretend.stub(keyid="key1")),
)
]
top_roles_payload = dict()
for role in TOP_LEVEL_ROLE_NAMES:
top_roles_payload[role] = [fake_key]
top_roles_payload[role] = fake_signers

tuf_repository.load_role = pretend.call_recorder(lambda *a, **kw: None)
tuf_repository._store = pretend.call_recorder(lambda *a, **kw: None)
Expand Down Expand Up @@ -409,7 +453,12 @@ def test_delegate_targets_roles(self, tuf_repository):
),
},
}
payload = {"xxxx-yyyy": [fake_key]}
fake_signers = [
pretend.stub(
key_dict=fake_key, sign=pretend.call_recorder(lambda *a: "key1")
)
]
payload = {"xxxx-yyyy": fake_signers}
fake_targets_md = pretend.stub(
signed=pretend.stub(
delegations=None,
Expand Down Expand Up @@ -449,7 +498,7 @@ def test_delegate_targets_roles(self, tuf_repository):
rolename="xxxx-yyyy",
role_metadata=fake_targets_md,
role_expires=fake_time,
key_rolename=None,
signers=None,
store=True,
)
]
Expand All @@ -463,22 +512,27 @@ def test_delegate_targets_roles(self, tuf_repository):
def test_bump_role_version(self, tuf_repository):
fake_time = datetime.datetime(2019, 6, 16, 9, 5, 1)
fake_new_time = datetime.datetime(2022, 6, 16, 9, 5, 1)
fake_signers = [
pretend.stub(
key_dict={"keyid": "fake_id"},
sign=pretend.call_recorder(lambda *a: "key1"),
)
]
initial_version = 1983
fake_role_metadata = pretend.stub(
signed=pretend.stub(expires=fake_time, version=initial_version),
sign=lambda *a, **kw: None,
)

tuf_repository.key_backend = pretend.stub(
get=pretend.call_recorder(lambda role: [{"key": "key_data"}])
get=pretend.call_recorder(lambda role: fake_signers)
)

result = tuf_repository.bump_role_version(
"fake_role", fake_role_metadata, fake_new_time
"fake_role", fake_role_metadata, fake_new_time, fake_signers
)
assert result.signed.version == initial_version + 1
assert result.signed.expires == fake_new_time
assert tuf_repository.key_backend.get.calls == [pretend.call("fake_role")]

def test_bump_role_version_store_true(self, tuf_repository):
fake_time = datetime.datetime(2019, 6, 16, 9, 5, 1)
Expand All @@ -488,17 +542,19 @@ def test_bump_role_version_store_true(self, tuf_repository):
signed=pretend.stub(expires=fake_time, version=initial_version),
sign=lambda *a, **kw: None,
)
fake_signers = [
pretend.stub(
key_dict={"keyid": "fake_id"},
sign=pretend.call_recorder(lambda *a: "key1"),
)
]

tuf_repository.key_backend = pretend.stub(
get=pretend.call_recorder(lambda role: [{"key": "key_data"}])
)
tuf_repository._store = pretend.call_recorder(lambda rolename, role_md: None)
result = tuf_repository.bump_role_version(
"fake_role", fake_role_metadata, fake_new_time, store=True
"fake_role", fake_role_metadata, fake_new_time, fake_signers, store=True
)
assert result.signed.version == initial_version + 1
assert result.signed.expires == fake_new_time
assert tuf_repository.key_backend.get.calls == [pretend.call("fake_role")]
assert tuf_repository._store.calls == [
pretend.call("fake_role", fake_role_metadata)
]
Expand All @@ -511,17 +567,18 @@ def test_bump_role_version_with_key_rolename(self, tuf_repository):
signed=pretend.stub(expires=fake_time, version=initial_version),
sign=lambda *a, **kw: None,
)

tuf_repository.key_backend = pretend.stub(
get=pretend.call_recorder(lambda role: [{"key": "key_data"}])
)
fake_signers = [
pretend.stub(
key_dict={"keyid": "fake_id"},
sign=pretend.call_recorder(lambda *a: "key1"),
)
]

result = tuf_repository.bump_role_version(
"fake_role", fake_role_metadata, fake_new_time, "key_role_name"
"fake_role", fake_role_metadata, fake_new_time, fake_signers
)
assert result.signed.version == initial_version + 1
assert result.signed.expires == fake_new_time
assert tuf_repository.key_backend.get.calls == [pretend.call("key_role_name")]

def test_bump_timestamp_version(self, tuf_repository):
fake_time = datetime.datetime(2019, 6, 16, 9, 5, 1)
Expand Down
25 changes: 18 additions & 7 deletions tests/unit/tuf/test_services.py
Expand Up @@ -61,7 +61,7 @@ def test_get(self, db_request, monkeypatch):

root_keyid = service.get("root")

assert root_keyid == [expected_priv_key_dict]
assert root_keyid[0].key_dict == expected_priv_key_dict


class TestLocalStorageService:
Expand Down Expand Up @@ -420,10 +420,14 @@ def test_init_repository_already_initialized(self, db_request, monkeypatch):

def test_init_targets_delegation(self, db_request, monkeypatch):
fake_storage = pretend.stub()
fake_key_storage = pretend.stub(
get=pretend.call_recorder(
lambda role: [{"keyid": "key1"}, {"keyid": "key2"}]
fake_signers = [
pretend.stub(
key_dict={"keyid": "fake_id"},
sign=pretend.call_recorder(lambda *a: "key1"),
)
]
fake_key_storage = pretend.stub(
get=pretend.call_recorder(lambda role: fake_signers)
)

fake_time = datetime.datetime(2019, 6, 16, 9, 5, 1)
Expand Down Expand Up @@ -463,12 +467,12 @@ def test_init_targets_delegation(self, db_request, monkeypatch):
assert sorted(["targets", "bins"]) == sorted(list(call_args.keys()))
assert len(call_args["targets"]) == 1
assert type(call_args["targets"][0][0]) == services.DelegatedRole
assert call_args["targets"][0][1] == [{"keyid": "key1"}, {"keyid": "key2"}]
assert call_args["targets"][0][1][0].key_dict == {"keyid": "fake_id"}
assert (
len(call_args["bins"]) == 16384
) # PEP458 https://peps.python.org/pep-0458/#metadata-scalability
assert type(call_args["bins"][0][0]) == services.DelegatedRole
assert call_args["bins"][0][1] == [{"keyid": "key1"}, {"keyid": "key2"}]
assert call_args["bins"][0][1][0].key_dict == {"keyid": "fake_id"}
# 1 target + # PEP458 https://peps.python.org/pep-0458/#metadata-scalability
assert len(fake_metadata_repository._set_expiration_for_role.calls) == 16385

Expand Down Expand Up @@ -558,8 +562,14 @@ def test_bump_snapshot_specific_snapshot_metadata(self, db_request, monkeypatch)

def test_bump_bin_n_roles(self, db_request, monkeypatch):
fake_storage = pretend.stub()
fake_signers = [
pretend.stub(
key_dict={"keyid": "fake_id"},
sign=pretend.call_recorder(lambda *a: "key1"),
)
]
fake_key_storage = pretend.stub(
get=pretend.call_recorder(lambda role: "fake_key")
get=pretend.call_recorder(lambda role: fake_signers)
)

fake_time = datetime.datetime(2019, 6, 16, 9, 5, 1)
Expand Down Expand Up @@ -592,6 +602,7 @@ def test_bump_bin_n_roles(self, db_request, monkeypatch):
),
timestamp_bump_version=pretend.call_recorder(lambda *a, **kw: None),
_set_expiration_for_role=pretend.call_recorder(lambda *a: fake_datetime),
_key_storage_backend=pretend.call_recorder(lambda *a: fake_signers),
)
monkeypatch.setattr(
"warehouse.tuf.services.MetadataRepository",
Expand Down

0 comments on commit cbd1e41

Please sign in to comment.