-
-
Notifications
You must be signed in to change notification settings - Fork 5
/
repository_tool.py
1627 lines (1380 loc) · 63.4 KB
/
repository_tool.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import datetime
import json
import operator
import os
import shutil
from fnmatch import fnmatch
from functools import partial, reduce
from pathlib import Path
import securesystemslib
import tuf.roledb
from securesystemslib.exceptions import Error as SSLibError
from securesystemslib.interface import import_rsa_privatekey_from_file
from securesystemslib.util import get_file_details
from tuf.exceptions import Error as TUFError, RepositoryError
from tuf.repository_tool import (
METADATA_DIRECTORY_NAME,
TARGETS_DIRECTORY_NAME,
import_rsakey_from_pem,
load_repository,
)
from tuf.roledb import get_roleinfo
from taf import YubikeyMissingLibrary
from taf.constants import DEFAULT_RSA_SIGNATURE_SCHEME
from taf.exceptions import (
InvalidKeyError,
MetadataUpdateError,
RootMetadataUpdateError,
SigningError,
SnapshotMetadataUpdateError,
TargetsError,
TargetsMetadataUpdateError,
TimestampMetadataUpdateError,
YubikeyError,
InvalidRepositoryError,
KeystoreError,
)
from taf.git import GitRepository
from taf.utils import normalize_file_line_endings, on_rm_error
try:
import taf.yubikey as yk
except ImportError:
yk = YubikeyMissingLibrary()
# Default expiration intervals per role
expiration_intervals = {"root": 365, "targets": 90, "snapshot": 7, "timestamp": 1}
# Loaded keys cache
role_keys_cache = {}
DISABLE_KEYS_CACHING = False
HASH_FUNCTION = "sha256"
def get_role_metadata_path(role):
return f"{METADATA_DIRECTORY_NAME}/{role}.json"
def get_target_path(target_name):
return f"{TARGETS_DIRECTORY_NAME}/{target_name}"
def is_delegated_role(role):
return role not in ("root", "targets", "snapshot", "timestamp")
def is_auth_repo(repo_path):
"""Check if the given path contains a valid TUF repository"""
try:
Repository(repo_path)._repository
return True
except Exception:
return False
def load_role_key(keystore, role, password=None, scheme=DEFAULT_RSA_SIGNATURE_SCHEME):
"""Loads the specified role's key from a keystore file.
The keystore file can, but doesn't have to be password protected.
NOTE: Keys inside keystore should match a role name!
Args:
- keystore(str): Path to the keystore directory
- role(str): TUF role (root, targets, timestamp, snapshot or delegated one)
- password(str): (Optional) password used for PEM decryption
- scheme(str): A signature scheme used for signing.
Returns:
- An RSA key object, conformant to 'securesystemslib.RSAKEY_SCHEMA'.
Raises:
- securesystemslib.exceptions.FormatError: If the arguments are improperly formatted.
- securesystemslib.exceptions.CryptoError: If path is not a valid encrypted key file.
"""
if not password:
password = None
key = role_keys_cache.get(role)
if key is None:
try:
if password is not None:
key = import_rsa_privatekey_from_file(
str(Path(keystore, role)), password, scheme=scheme
)
else:
key = import_rsa_privatekey_from_file(
str(Path(keystore, role)), scheme=scheme
)
except FileNotFoundError:
raise KeystoreError(f"Cannot find {role} key in {keystore}")
if not DISABLE_KEYS_CACHING:
role_keys_cache[role] = key
return key
def root_signature_provider(signature_dict, key_id, _key, _data):
"""Root signature provider used to return signatures created remotely.
Args:
- signature_dict(dict): Dict where key is key_id and value is signature
- key_id(str): Key id from targets metadata file
- _key(securesystemslib.formats.RSAKEY_SCHEMA): Key info
- _data(dict): Data to sign (already signed remotely)
Returns:
Dictionary that comforms to `securesystemslib.formats.SIGNATURE_SCHEMA`
Raises:
- KeyError: If signature for key_id is not present in signature_dict
"""
from binascii import hexlify
return {"keyid": key_id, "sig": hexlify(signature_dict.get(key_id)).decode()}
def yubikey_signature_provider(name, key_id, key, data): # pylint: disable=W0613
"""
A signatures provider which asks the user to insert a yubikey
Useful if several yubikeys need to be used at the same time
"""
from binascii import hexlify
def _check_key_and_get_pin(expected_key_id):
try:
inserted_key = yk.get_piv_public_key_tuf()
if expected_key_id != inserted_key["keyid"]:
return None
serial_num = yk.get_serial_num(inserted_key)
pin = yk.get_key_pin(serial_num)
if pin is None:
pin = yk.get_and_validate_pin(name)
return pin
except Exception:
return None
while True:
# check if the needed YubiKey is inserted before asking the user to do so
# this allows us to use this signature provider inside an automated process
# assuming that all YubiKeys needed for signing are inserted
pin = _check_key_and_get_pin(key_id)
if pin is not None:
break
input(f"Insert {name} and press enter")
signature = yk.sign_piv_rsa_pkcs1v15(data, pin)
return {"keyid": key_id, "sig": hexlify(signature).decode()}
class Repository:
def __init__(self, path, repo_name="default"):
self.path = Path(path)
self.name = repo_name
_framework_files = ["repositories.json", "test-auth-repo"]
@property
def targets_path(self):
return self.path / TARGETS_DIRECTORY_NAME
@property
def metadata_path(self):
return self.path / METADATA_DIRECTORY_NAME
_tuf_repository = None
@property
def _repository(self):
if self._tuf_repository is None:
self._load_tuf_repository(self.path)
return self._tuf_repository
@property
def repo_id(self):
return GitRepository(path=self.path).initial_commit
@property
def certs_dir(self):
certs_dir = self.path / "certs"
certs_dir.mkdir(parents=True, exist_ok=True)
return str(certs_dir)
def _add_delegated_key(
self, role, keyid, pub_key, keytype="rsa", scheme=DEFAULT_RSA_SIGNATURE_SCHEME
):
"""
Adds public key of a new delegated role to the list of all keys of
delegated roles.
Args:
- role (str): parent target role's name
- keyid (str): keyid of the new signing key
- pub_key(str): public component of the new signing key
- keytype (str): key's type
- sheme (str): signature scheme
"""
roleinfo = tuf.roledb.get_roleinfo(role, self.name)
keysinfo = roleinfo["delegations"]["keys"]
if keyid in keysinfo:
return
key = {"public": pub_key.strip()}
key_metadata_format = securesystemslib.keys.format_keyval_to_metadata(
keytype, scheme, key
)
keysinfo[keyid] = key_metadata_format
tuf.roledb.update_roleinfo(role, roleinfo, repository_name=self.name)
def _add_target(self, targets_obj, file_path, custom=None):
"""
<Purpose>
Normalizes line endings (converts all line endings to unix style endings) and
registers the target file as a TUF target
<Arguments>
targets_obj: TUF targets objects (instance of TUF's targets role class)
file_path: full path of the target file
custom: custom target data
"""
file_path = str(Path(file_path).absolute())
targets_directory_length = len(targets_obj._targets_directory) + 1
relative_path = file_path[targets_directory_length:].replace("\\", "/")
normalize_file_line_endings(file_path)
targets_obj.add_target(relative_path, custom)
def add_metadata_key(self, role, pub_key_pem, scheme=DEFAULT_RSA_SIGNATURE_SCHEME):
"""Add metadata key of the provided role.
Args:
- role(str): TUF role (root, targets, timestamp, snapshot or delegated one)
- pub_key_pem(str|bytes): Public key in PEM format
Returns:
None
Raises:
- securesystemslib.exceptions.FormatError: If the arguments are improperly formatted.
- securesystemslib.exceptions.UnknownRoleError: If 'rolename' has not been delegated by this
targets object.
- securesystemslib.exceptions.UnknownKeyError: If 'key_id' is not found in the keydb database.
"""
if isinstance(pub_key_pem, bytes):
pub_key_pem = pub_key_pem.decode("utf-8")
if is_delegated_role(role):
parent_role = self.find_delegated_roles_parent(role)
tuf.roledb._roledb_dict[self.name][role]["keyids"] = self.get_role_keys(
role, parent_role
)
key = import_rsakey_from_pem(pub_key_pem, scheme)
self._role_obj(role).add_verification_key(key)
if is_delegated_role(role):
keyids = tuf.roledb.get_roleinfo(role, self.name)["keyids"]
self.set_delegated_role_property("keyids", role, keyids, parent_role)
self._add_delegated_key(parent_role, keyids[-1], pub_key_pem, scheme=scheme)
def _load_tuf_repository(self, path):
"""
Load tuf repository. Should only be called directly if a different set of metadata files
should be loaded (and not the one located at repo path/metadata)
"""
# before attempting to load tuf repository, create empty targets directory if it does not
# exist to avoid errors raised by tuf
targets_existed = True
if not self.targets_path.is_dir():
targets_existed = False
self.targets_path.mkdir(parents=True, exist_ok=True)
current_dir = self.metadata_path / "current"
previous_dir = self.metadata_path / "previous"
if current_dir.is_dir():
shutil.rmtree(current_dir)
if previous_dir.is_dir():
shutil.rmtree(previous_dir)
try:
self._tuf_repository = load_repository(str(path), self.name)
except RepositoryError:
if not targets_existed:
self.targets_path.rmdir()
raise InvalidRepositoryError(f"{self.name} is not a valid TUF repository!")
def reload_tuf_repository(self):
"""
Reload tuf repository. Should be called after content on the disk is called.
"""
tuf.roledb.remove_roledb(self.name)
self._load_tuf_repository(self.path)
def _role_obj(self, role):
"""Helper function for getting TUF's role object, given the role's name
Args:
- role(str): TUF role (root, targets, timestamp, snapshot or delegated one)
Returns:
One of metadata objects:
Root, Snapshot, Timestamp, Targets or delegated metadata
Raises:
- securesystemslib.exceptions.FormatError: If the arguments are improperly formatted.
- securesystemslib.exceptions.UnknownRoleError: If 'rolename' has not been delegated by this
targets object.
"""
if role == "targets":
return self._repository.targets
elif role == "snapshot":
return self._repository.snapshot
elif role == "timestamp":
return self._repository.timestamp
elif role == "root":
return self._repository.root
return self._repository.targets(role)
def _try_load_metadata_key(self, role, key):
"""Check if given key can be used to sign given role and load it.
Args:
- role(str): TUF role (root, targets, timestamp, snapshot or delegated one)
- key(securesystemslib.formats.RSAKEY_SCHEMA): Private key used to sign metadata
Returns:
None
Raises:
- securesystemslib.exceptions.FormatError: If the arguments are improperly formatted.
- securesystemslib.exceptions.UnknownRoleError: If 'rolename' has not been delegated by this
targets object.
- InvalidKeyError: If metadata cannot be signed with given key.
"""
if not self.is_valid_metadata_key(role, key):
raise InvalidKeyError(role)
self._role_obj(role).load_signing_key(key)
def add_existing_target(self, file_path, targets_role="targets", custom=None):
"""Registers new target files with TUF.
The files are expected to be inside the targets directory.
Args:
- file_path(str): Path to target file
- targets_role(str): Targets or delegated role: a targets role (the root targets role
or one of the delegated ones)
- custom(dict): Custom information for given file
Returns:
None
Raises:
- securesystemslib.exceptions.FormatError: If 'filepath' is improperly formatted.
- securesystemslib.exceptions.Error: If 'filepath' is not located in the repository's targets
directory.
"""
targets_obj = self._role_obj(targets_role)
self._add_target(targets_obj, file_path, custom)
def get_all_target_files_state(self):
"""Create dictionaries of added/modified and removed files by comparing current
file-system state with current signed targets (and delegations) metadata state.
Args:
- None
Returns:
- Dict of added/modified files and dict of removed target files (inputs for
`modify_targets` method.)
Raises:
- None
"""
added_target_files = {}
removed_target_files = {}
# current fs state
fs_target_files = self.all_target_files()
# current signed state
signed_target_files = self.get_signed_target_files()
# existing files with custom data and (modified) content
for file_name in fs_target_files:
target_file = self.targets_path / file_name
_, hashes = get_file_details(str(target_file))
# register only new or changed files
if hashes.get(HASH_FUNCTION) != self.get_target_file_hashes(file_name):
added_target_files[file_name] = {
"target": target_file.read_text(),
"custom": self.get_target_file_custom_data(file_name),
}
# removed files
for file_name in signed_target_files - fs_target_files:
removed_target_files[file_name] = {}
return added_target_files, removed_target_files
def get_signed_target_files(self):
"""Return all target files signed by all roles.
Args:
- None
Returns:
- Set of all target paths relative to targets directory
"""
all_roles = self.get_all_targets_roles()
return self.get_singed_target_files_of_roles(all_roles)
def get_singed_target_files_of_roles(self, roles):
"""Return all target files signed by the specified roles
Args:
- roles whose target files will be returned
Returns:
- Set of paths of target files of a role relative to targets directory
"""
if roles is None:
roles = self.get_all_targets_roles()
return set(
reduce(
operator.iconcat,
[self._role_obj(role).target_files for role in roles],
[],
)
)
def get_signed_targets_with_custom_data(self, roles):
"""Return all target files signed by the specified roles and and their custom data
as specified in the metadata files
Args:
- roles whose target files will be returned
Returns:
- A dictionary whose keys are parts of target files relative to the targets directory
and values are custom data dictionaries.
"""
if roles is None:
roles = self.get_all_targets_roles()
target_files = {}
for role in roles:
roles_targets = self._role_obj(role).target_files
for target_file, custom_data in roles_targets.items():
target_files.setdefault(target_file, {}).update(custom_data)
return target_files
def modify_targets(self, added_data=None, removed_data=None):
"""Creates a target.json file containing a repository's commit for each repository.
Adds those files to the tuf repository.
Args:
- added_data(dict): Dictionary of new data whose keys are target paths of repositories
(as specified in targets.json, relative to the targets dictionary).
The values are of form:
{
target: content of the target file
custom: {
custom_field1: custom_value1,
custom_field2: custom_value2
}
}
- removed_data(dict): Dictionary of the old data whose keys are target paths of
repositories
(as specified in targets.json, relative to the targets dictionary).
The values are not needed. This is just for consistency.
Content of the target file can be a dictionary, in which case a json file will be created.
If that is not the case, an ordinary textual file will be created.
If content is not specified and the file already exists, it will not be modified.
If it does not exist, an empty file will be created. To replace an existing file with an
empty file, specify empty content (target: '')
Custom is an optional property which, if present, will be used to specify a TUF target's
Returns:
- Role name used to update given targets
"""
added_data = {} if added_data is None else added_data
removed_data = {} if removed_data is None else removed_data
data = dict(added_data, **removed_data)
if not data:
raise TargetsError("Nothing to be modified!")
target_paths = list(data.keys())
targets_role = self.get_role_from_target_paths(data)
if targets_role is None:
raise TargetsError(
f"Could not find a common role for target paths:\n{'-'.join(target_paths)}"
)
targets_obj = self._role_obj(targets_role)
# add new target files
for path, target_data in added_data.items():
target_path = (self.targets_path / path).absolute()
self._create_target_file(target_path, target_data)
custom = target_data.get("custom", None)
self._add_target(targets_obj, str(target_path), custom)
# remove existing target files
for path in removed_data.keys():
target_path = (self.targets_path / path).absolute()
if target_path.exists():
if target_path.is_file():
target_path.unlink()
elif target_path.is_dir():
shutil.rmtree(target_path, onerror=on_rm_error)
try:
targets_obj.remove_target(path)
except Exception:
continue
return targets_role
def all_target_files(self):
"""
Return a set of relative paths of all files inside the targets
directory
"""
targets = []
for root, _, filenames in os.walk(str(self.targets_path)):
for filename in filenames:
filepath = Path(root) / filename
if filepath.is_file():
targets.append(
str(
Path(
os.path.relpath(str(filepath), str(self.targets_path))
).as_posix()
)
)
return set(targets)
def get_target_file_custom_data(self, target_path):
"""
Return a custom data of a given target.
"""
try:
role = self.get_role_from_target_paths([target_path])
roleinfo = get_roleinfo(role)
return roleinfo["paths"][target_path]
except Exception:
return None
def get_target_file_hashes(self, target_path, hash_func=HASH_FUNCTION):
"""
Return hashes of a given target path.
"""
hashes = {"sha256": None, "sha512": None}
try:
role = self.get_role_from_target_paths([target_path])
role_dict = json.loads((self.metadata_path / f"{role}.json").read_text())
hashes.update(role_dict["signed"]["targets"][target_path]["hashes"])
except Exception:
pass
return hashes.get(hash_func, hashes)
def get_role_from_target_paths(self, target_paths):
"""
Find a common role that can be used to sign given target paths.
NOTE: Currently each target has only one mapped role.
"""
targets_roles = self.map_signing_roles(target_paths)
roles = list(targets_roles.values())
try:
# all target files should have at least one common role
common_role = reduce(
set.intersection,
[set([r]) if isinstance(r, str) else set(r) for r in roles],
)
except TypeError:
return None
if not common_role:
return None
return common_role.pop()
def check_roles_expiration_dates(
self, interval=None, start_date=None, excluded_roles=None
):
"""Determines which metadata roles have expired, or will expire within a time frame.
Args:
- interval(int): Number of days to look ahead for expiration.
- start_date(datetime): Start date to look for expiration.
- excluded_roles(list): List of roles to exclude from the search.
Returns:
- A dictionary of roles that have expired, or will expire within the given time frame.
Results are sorted by expiration date.
"""
if start_date is None:
start_date = datetime.datetime.now()
if interval is None:
interval = 30
expiration_threshold = start_date + datetime.timedelta(days=interval)
if excluded_roles is None:
excluded_roles = []
target_roles = self.get_all_targets_roles()
main_roles = ["root", "targets", "snapshot", "timestamp"]
existing_roles = list(set(target_roles + main_roles) - set(excluded_roles))
expired_dict = {}
will_expire_dict = {}
for role in existing_roles:
expiry_date = self.get_expiration_date(role)
if start_date > expiry_date:
expired_dict[role] = expiry_date
elif expiration_threshold >= expiry_date:
will_expire_dict[role] = expiry_date
# sort by expiry date
expired_dict = {
k: v for k, v in sorted(expired_dict.items(), key=lambda item: item[1])
}
will_expire_dict = {
k: v for k, v in sorted(will_expire_dict.items(), key=lambda item: item[1])
}
return expired_dict, will_expire_dict
def _collect_target_paths_of_role(self, target_roles_paths):
all_target_relpaths = []
for target_role_path in target_roles_paths:
try:
if (self.targets_path / target_role_path).is_file():
all_target_relpaths.append(target_role_path)
continue
except OSError:
pass
for filepath in self.targets_path.rglob(target_role_path):
if filepath.is_file():
file_rel_path = str(
Path(filepath).relative_to(self.targets_path).as_posix()
)
all_target_relpaths.append(file_rel_path)
return all_target_relpaths
def _create_target_file(self, target_path, target_data):
# if the target's parent directory should not be "targets", create
# its parent directories if they do not exist
target_dir = target_path.parents[0]
target_dir.mkdir(parents=True, exist_ok=True)
# create the target file
content = target_data.get("target", None)
if content is None:
if not target_path.is_file():
target_path.touch()
else:
with open(str(target_path), "w") as f:
if isinstance(content, dict):
json.dump(content, f, indent=4)
else:
f.write(content)
def delete_unregistered_target_files(self, targets_role="targets"):
"""
Delete all target files not specified in targets.json
"""
targets_obj = self._role_obj(targets_role)
target_files_by_roles = self.sort_roles_targets_for_filenames()
if targets_role in target_files_by_roles:
for file_rel_path in target_files_by_roles[targets_role]:
if file_rel_path not in targets_obj.target_files:
(self.targets_path / file_rel_path).unlink()
def find_delegated_roles_parent(self, role_name):
"""
A simple implementation of finding a delegated targets role's parent
assuming that every delegated role is delegated by just one role
and that there won't be many delegations.
Args:
- role_name: Role
Returns:
Parent role's name
"""
def _find_delegated_role(parent_role_name, role_name):
delegations = self.get_delegations_info(parent_role_name)
if len(delegations):
for role_info in delegations.get("roles"):
# check if this role can sign target_path
delegated_role_name = role_info["name"]
if delegated_role_name == role_name:
return parent_role_name
parent = _find_delegated_role(delegated_role_name, role_name)
if parent is not None:
return parent
return None
return _find_delegated_role("targets", role_name)
def find_keys_roles(self, public_keys):
"""Find all roles that can be signed by the provided keys.
A role can be signed by the list of keys if at least the number
of keys that can sign that file is equal to or greater than the role's
threshold
"""
def _map_keys_to_roles(role_name, key_ids):
keys_roles = []
delegations = self.get_delegations_info(role_name)
if len(delegations):
for role_info in delegations.get("roles"):
# check if this role can sign target_path
delegated_role_name = role_info["name"]
delegated_roles_keyids = role_info["keyids"]
delegated_roles_threshold = role_info["threshold"]
num_of_signing_keys = len(
set(delegated_roles_keyids).intersection(key_ids)
)
if num_of_signing_keys >= delegated_roles_threshold:
keys_roles.append(delegated_role_name)
keys_roles.extend(_map_keys_to_roles(delegated_role_name, key_ids))
return keys_roles
keyids = [key["keyid"] for key in public_keys]
return _map_keys_to_roles("targets", keyids)
def get_all_targets_roles(self):
"""
Return a list containing names of all target roles
"""
def _traverse_targets_roles(role_name):
roles = [role_name]
delegations = self.get_delegations_info(role_name)
if len(delegations):
for role_info in delegations.get("roles"):
# check if this role can sign target_path
delegated_role_name = role_info["name"]
roles.extend(_traverse_targets_roles(delegated_role_name))
return roles
return _traverse_targets_roles("targets")
def get_delegated_role_property(self, property_name, role_name, parent_role=None):
"""
Extract value of the specified property of the provided delegated role from
its parent's role info.
Args:
- property_name: Name of the property (like threshold)
- role_name: Role
- parent_role: Parent role
Returns:
The specified property's value
"""
# TUF raises an error when asking for properties like threshold and signing keys
# of a delegated role (see https://github.com/theupdateframework/tuf/issues/574)
# The following workaround presumes that one every delegated role is a deegation
# of exactly one delegated role
if parent_role is None:
parent_role = self.find_delegated_roles_parent(role_name)
delegations = self.get_delegations_info(parent_role)
for delegated_role in delegations["roles"]:
if delegated_role["name"] == role_name:
return delegated_role[property_name]
return None
def get_expiration_date(self, role):
return self._role_obj(role).expiration
def get_role_keys(self, role, parent_role=None):
"""Get keyids of the given role
Args:
- role(str): TUF role (root, targets, timestamp, snapshot or delegated one)
- parent_role(str): Name of the parent role of the delegated role. If not specified,
it will be set automatically, but this might be slow if there
are many delegations.
Returns:
List of the role's keyids (i.e., keyids of the keys).
Raises:
- securesystemslib.exceptions.FormatError: If the arguments are improperly formatted.
- securesystemslib.exceptions.UnknownRoleError: If 'rolename' has not been delegated by this
targets object.
"""
role_obj = self._role_obj(role)
if role_obj is None:
return None
try:
return role_obj.keys
except KeyError:
pass
return self.get_delegated_role_property("keyids", role, parent_role)
def get_role_paths(self, role, parent_role=None):
"""Get paths of the given role
Args:
- role(str): TUF role (root, targets, timestamp, snapshot or delegated one)
- parent_role(str): Name of the parent role of the delegated role. If not specified,
it will be set automatically, but this might be slow if there
are many delegations.
Returns:
Defined delegated paths of delegate target role or * in case of targets
Raises:
- securesystemslib.exceptions.FormatError: If the arguments are improperly formatted.
- securesystemslib.exceptions.UnknownRoleError: If 'rolename' has not been delegated by this
"""
if role == "targets":
return "*"
return self.get_delegated_role_property("paths", role, parent_role)
def get_role_repositories(self, role, parent_role=None):
"""Get repositories of the given role
Args:
- role(str): TUF role (root, targets, timestamp, snapshot or delegated one)
- parent_role(str): Name of the parent role of the delegated role. If not specified,
it will be set automatically, but this might be slow if there
are many delegations.
Returns:
Repositories' path from repositories.json that matches given role paths
Raises:
- securesystemslib.exceptions.FormatError: If the arguments are improperly formatted.
- securesystemslib.exceptions.UnknownRoleError: If 'rolename' has not been delegated by this
"""
role_paths = self.get_role_paths(role, parent_role=parent_role)
target_repositories = self._get_target_repositories()
return [
repo
for repo in target_repositories
if any([fnmatch(repo, path) for path in role_paths])
]
def get_delegations_info(self, role_name):
# load repository is not already loaded
self._repository
return tuf.roledb.get_roleinfo(role_name, self.name).get("delegations")
def get_role_threshold(self, role, parent_role=None):
"""Get threshold of the given role
Args:
- role(str): TUF role (root, targets, timestamp, snapshot or delegated one)
- parent_role(str): Name of the parent role of the delegated role. If not specified,
it will be set automatically, but this might be slow if there
are many delegations.
Returns:
Role's signatures threshold
Raises:
- securesystemslib.exceptions.FormatError: If the arguments are improperly formatted.
- securesystemslib.exceptions.UnknownRoleError: If 'rolename' has not been delegated by this
"""
role_obj = self._role_obj(role)
if role_obj is None:
return None
try:
return role_obj.threshold
except KeyError:
pass
return self.get_delegated_role_property("threshold", role, parent_role)
def get_signable_metadata(self, role):
"""Return signable portion of newly generate metadata for given role.
Args:
- role(str): TUF role (root, targets, timestamp, snapshot or delegated one)
Returns:
A string representing the 'object' encoded in canonical JSON form or None
Raises:
None
"""
try:
from tuf.keydb import get_key
signable = None
role_obj = self._role_obj(role)
key = get_key(role_obj.keys[0])
def _provider(key, data):
nonlocal signable
signable = securesystemslib.formats.encode_canonical(data)
role_obj.add_external_signature_provider(key, _provider)
self.writeall()
except (IndexError, TUFError, SSLibError):
return signable
def _get_target_repositories(self):
repositories_path = self.targets_path / "repositories.json"
if repositories_path.exists():
repositories = repositories_path.read_text()
repositories = json.loads(repositories)["repositories"]
return [str(Path(target_path).as_posix()) for target_path in repositories]
def is_valid_metadata_key(self, role, key, scheme=DEFAULT_RSA_SIGNATURE_SCHEME):
"""Checks if metadata role contains key id of provided key.
Args:
- role(str): TUF role (root, targets, timestamp, snapshot or delegated one)
- key(securesystemslib.formats.RSAKEY_SCHEMA): Role's key.
Returns:
Boolean. True if key id is in metadata role key ids, False otherwise.
Raises:
- securesystemslib.exceptions.FormatError: If key does not match RSAKEY_SCHEMA
- securesystemslib.exceptions.UnknownRoleError: If role does not exist
"""
if isinstance(key, str):
key = import_rsakey_from_pem(key, scheme)
securesystemslib.formats.RSAKEY_SCHEMA.check_match(key)
return key["keyid"] in self.get_role_keys(role)
def is_valid_metadata_yubikey(self, role, public_key=None):
"""Checks if metadata role contains key id from YubiKey.
Args:
- role(str): TUF role (root, targets, timestamp, snapshot or delegated one
- public_key(securesystemslib.formats.RSAKEY_SCHEMA): RSA public key dict
Returns:
Boolean. True if smart card key id belongs to metadata role key ids
Raises:
- YubikeyError
- securesystemslib.exceptions.FormatError: If 'PEM' is improperly formatted.
- securesystemslib.exceptions.UnknownRoleError: If role does not exist
"""
securesystemslib.formats.NAME_SCHEMA.check_match(role)
if public_key is None:
public_key = yk.get_piv_public_key_tuf()
return self.is_valid_metadata_key(role, public_key)
def map_signing_roles(self, target_filenames):
"""
For each target file, find delegated role responsible for that target file based
on the delegated paths. The most specific role (meaning most deeply nested) whose
delegation path matches the target's path is returned as that file's matching role.
If there are no delegated roles with a path that matches the target file's path,
'targets' role will be returned as that file's matching role. Delegation path
is expected to be relative to the targets directory. It can be defined as a glob
pattern.
"""
def _map_targets_to_roles(role_name, target_filenames):
roles_targets = {}
delegations = self.get_delegations_info(role_name)
if len(delegations):
for role_info in delegations.get("roles"):
# check if this role can sign target_path
delegated_role_name = role_info["name"]
for path_pattern in role_info["paths"]:
for target_filename in target_filenames:
if fnmatch(
target_filename.lstrip(os.sep),
path_pattern.lstrip(os.sep),
):
roles_targets[target_filename] = delegated_role_name
roles_targets.update(
_map_targets_to_roles(delegated_role_name, target_filenames)
)
return roles_targets
roles_targets = {
target_filename: "targets" for target_filename in target_filenames
}