forked from scionproto/scion
/
trc.py
540 lines (491 loc) · 20 KB
/
trc.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
# Copyright 2014 ETH Zurich
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
:mod:`trc` --- SCION TRC parser
===============================================
"""
# Stdlib
import base64
import copy
import json
import logging
import time
# External
import lz4
# SCION
from lib.crypto.asymcrypto import verify, sign
from lib.packet.scion_addr import ISD_AS
# Keys of the top-level TRC dictionary (the JSON representation of a TRC).
# These strings are part of the serialized format; changing one changes what
# is read from and written to TRC files.
ISDID_STRING = 'ISDID'
DESCRIPTION_STRING = 'Description'
VERSION_STRING = 'Version'
CREATION_TIME_STRING = 'CreationTime'
EXPIRATION_TIME_STRING = 'ExpirationTime'
# NOTE(review): the core AS entry is keyed 'CoreCAs' (not 'CoreASes') --
# confirm this matches the TRC files in use before touching it.
CORE_ASES_STRING = 'CoreCAs'
ROOT_CAS_STRING = 'RootCAs'
PKI_LOGS_STRING = 'PKILogs'
QUORUM_EEPKI_STRING = 'QuorumEEPKI'
RAINS_STRING = 'RAINS'
# Key inside the RAINS entry.
ROOT_RAINS_KEY_STRING = 'RootRainsKey'
QUORUM_OWN_TRC_STRING = 'QuorumOwnTRC'
QUORUM_CAS_STRING = 'QuorumCAs'
QUARANTINE_STRING = 'Quarantine'
SIGNATURES_STRING = 'Signatures'
GRACE_PERIOD_STRING = 'GracePeriod'
# Keys used inside the nested core AS / root CA / RAINS entries.
ONLINE_KEY_ALG_STRING = 'OnlineKeyAlg'
ONLINE_KEY_STRING = 'OnlineKey'
OFFLINE_KEY_ALG_STRING = 'OfflineKeyAlg'
OFFLINE_KEY_STRING = 'OfflineKey'
CERTIFICATE_STRING = 'Certificate'
class TRC(object):
    """
    The TRC class parses the TRC file of an ISD and stores such
    information for further use.

    Binary values (keys, certificates, signatures) are base64 text in the
    dictionary/JSON form and raw ``bytes`` on the instance.

    :ivar int isd: the ISD identifier.
    :ivar str description: is a human readable description of an ISD.
    :ivar int version: the TRC file version.
    :ivar int time: the TRC file creation timestamp.
    :ivar int exp_time: the time when TRC expires.
    :ivar dict core_ases: the set of core ASes and their online/offline keys.
    :ivar dict root_cas: the set of root CAs with their certificate and
        online key.
    :ivar dict pki_logs: is a dictionary of end entity certificate logs, and
        their addresses and public key certificates
    :ivar int quorum_eepki: is a threshold number (nonnegative integer) of
        CAs that have to sign a domain's policy
    :ivar dict rains: the RAINS entry (root public key and online key); may
        be empty.
    :ivar int quorum_own_trc: number of core ASes necessary to sign a new TRC.
    :ivar int quorum_cas: number of CAs necessary to change CA entries
    :ivar int grace_period: defines for how long this TRC is valid when a new
        TRC is available
    :ivar bool quarantine: flag defining whether TRC is valid(quarantine=false)
        or an early announcement(quarantine=true)
    :ivar dict signatures: signatures generated by a quorum of trust roots.
    """

    # Maps every TRC dictionary key to (instance attribute name, value type).
    FIELDS_MAP = {
        ISDID_STRING: ("isd", int),
        DESCRIPTION_STRING: ("description", str),
        VERSION_STRING: ("version", int),
        CREATION_TIME_STRING: ("time", int),
        EXPIRATION_TIME_STRING: ("exp_time", int),
        CORE_ASES_STRING: ("core_ases", dict),
        ROOT_CAS_STRING: ("root_cas", dict),
        PKI_LOGS_STRING: ("pki_logs", dict),
        QUORUM_EEPKI_STRING: ("quorum_eepki", int),
        RAINS_STRING: ("rains", dict),
        QUORUM_OWN_TRC_STRING: ("quorum_own_trc", int),
        QUORUM_CAS_STRING: ("quorum_cas", int),
        QUARANTINE_STRING: ("quarantine", bool),
        SIGNATURES_STRING: ("signatures", dict),
        GRACE_PERIOD_STRING: ("grace_period", int),
    }
    # Validity applied by from_values(): 365 days, expressed in seconds.
    DEFAULT_VALIDITY = 365 * 24 * 60 * 60

    def __init__(self, trc_dict):
        """
        :param dict trc_dict: TRC as dict. Must contain every key of
            FIELDS_MAP; binary values are expected as base64 strings.
        :raises KeyError: if a required key is missing.
        """
        for k, (name, type_) in self.FIELDS_MAP.items():
            val = trc_dict[k]
            if type_ is int:
                val = int(val)
            elif type_ is dict:
                # Deep copy so decoding below never mutates the caller's dict.
                val = copy.deepcopy(val)
            elif type_ is bytes:
                val = self._b64_to_bytes(val)
            setattr(self, name, val)
        # Replace the base64 text in the copied dicts with raw bytes.
        for subject, entry in trc_dict[CORE_ASES_STRING].items():
            for field in (ONLINE_KEY_STRING, OFFLINE_KEY_STRING):
                self.core_ases[subject][field] = self._b64_to_bytes(entry[field])
        for subject, sig in trc_dict[SIGNATURES_STRING].items():
            self.signatures[subject] = self._b64_to_bytes(sig)
        for subject, entry in trc_dict[ROOT_CAS_STRING].items():
            for field in (CERTIFICATE_STRING, ONLINE_KEY_STRING):
                self.root_cas[subject][field] = self._b64_to_bytes(entry[field])
        # The RAINS entry is optional: an empty dict is left as is.
        if trc_dict[RAINS_STRING]:
            for field in (ROOT_RAINS_KEY_STRING, ONLINE_KEY_STRING):
                self.rains[field] = self._b64_to_bytes(
                    trc_dict[RAINS_STRING][field])

    @staticmethod
    def _b64_to_bytes(text):
        """Decode a base64 string into raw bytes."""
        return base64.b64decode(text.encode('utf-8'))

    @staticmethod
    def _bytes_to_b64(raw):
        """Encode raw bytes as a base64 string."""
        return base64.b64encode(raw).decode('utf-8')

    def get_isd_ver(self):
        """Return the (isd, version) pair identifying this TRC."""
        return self.isd, self.version

    def get_core_ases(self):
        """Return the core AS subjects of this TRC as a list of ISD_AS."""
        return [ISD_AS(key) for key in self.core_ases]

    def dict(self, with_signatures):
        """
        Return the TRC information.

        The returned dict is new, but its values are the instance's own
        objects (binary values are raw bytes, not base64).

        :param bool with_signatures:
            If True, include signatures in the return value.
        :returns: the TRC information.
        :rtype: dict
        """
        trc_dict = {
            k: getattr(self, name) for k, (name, _) in self.FIELDS_MAP.items()
        }
        if not with_signatures:
            del trc_dict[SIGNATURES_STRING]
        return trc_dict

    @classmethod
    def from_raw(cls, trc_raw, lz4_=False):
        """
        Build a TRC from its JSON representation.

        :param trc_raw: JSON text, or lz4-compressed JSON if ``lz4_`` is set.
        :param bool lz4_: if True, decompress ``trc_raw`` with lz4 first.
        :returns: the parsed TRC.
        """
        if lz4_:
            trc_raw = lz4.loads(trc_raw).decode("utf-8")
        return cls(json.loads(trc_raw))

    @classmethod
    def from_values(cls, isd, description, version, core_ases, root_cas,
                    pki_logs, quorum_eepki, rains, quorum_own_trc,
                    quorum_cas, grace_period, quarantine, signatures):
        """
        Generate a TRC instance.

        The creation time is set to the current time and the expiration time
        to DEFAULT_VALIDITY seconds later. Binary values are passed through
        the constructor and therefore have to be base64 strings.
        """
        now = int(time.time())
        trc_dict = {
            ISDID_STRING: isd,
            DESCRIPTION_STRING: description,
            VERSION_STRING: version,
            CREATION_TIME_STRING: now,
            EXPIRATION_TIME_STRING: now + cls.DEFAULT_VALIDITY,
            CORE_ASES_STRING: core_ases,
            ROOT_CAS_STRING: root_cas,
            PKI_LOGS_STRING: pki_logs,
            QUORUM_EEPKI_STRING: quorum_eepki,
            RAINS_STRING: rains,
            QUORUM_OWN_TRC_STRING: quorum_own_trc,
            QUORUM_CAS_STRING: quorum_cas,
            GRACE_PERIOD_STRING: grace_period,
            QUARANTINE_STRING: quarantine,
            SIGNATURES_STRING: signatures,
        }
        return cls(trc_dict)

    def sign(self, isd_as, sig_priv_key):
        """
        Sign this TRC and store the signature under ``isd_as``.

        :param isd_as: the subject the signature is recorded for.
        :param sig_priv_key: the private key handed to ``sign``.
        """
        data = self._sig_input()
        self.signatures[isd_as] = sign(data, sig_priv_key)

    def verify(self, old_trc):
        """
        Perform signature verification for core signatures as defined
        in old TRC.

        Only signatures made by core ASes of ``old_trc`` are considered, and
        they are checked against the online keys listed in ``old_trc``. Core
        ASes of ``old_trc`` that did not sign are simply not counted.

        :param: old_trc: the previous TRC which has already been verified.
        :returns: True if verification succeeds, false otherwise.
        :rtype: bool
        """
        # Only look at signatures which are from core ASes as defined in old
        # TRC. A core AS without a signature must not raise, it just does not
        # contribute to the quorum.
        signatures = {
            signer: self.signatures[signer]
            for signer in old_trc.core_ases
            if signer in self.signatures
        }
        # We have more signatures than the number of core ASes in old TRC
        if len(signatures) < len(self.signatures):
            logging.warning("TRC has more signatures than number of core ASes.")
        valid_signature_signers = set()
        # Add every signer to this set whose signature was verified successfully
        for signer, signature in signatures.items():
            # Trust is anchored in the already verified TRC, so its key is used.
            public_key = old_trc.core_ases[signer][ONLINE_KEY_STRING]
            if self.verify_signature(signature, public_key):
                valid_signature_signers.add(signer)
            else:
                logging.warning("TRC contains a signature which could not be verified.")
        # We have fewer valid signatures for this TRC than quorum_own_trc
        if len(valid_signature_signers) < old_trc.quorum_own_trc:
            logging.error("TRC does not have the number of required valid signatures")
            return False
        logging.debug("TRC verified.")
        return True

    def verify_signature(self, signature, public_key):
        """
        Checks if the signature can be verified with the given public key for a
        single signature

        :returns: True if the given signature could be verified with the
            given key, False otherwise
        :rtype bool
        """
        return bool(verify(self._sig_input(), signature, public_key))

    def _sig_input(self):
        """
        Build the byte string that is signed / verified.

        It is the compact, key-sorted JSON of the TRC without signatures,
        with str fields base64-encoded and dict fields passed through
        _encode_dict().

        :rtype: bytes
        """
        encoded = {}
        for k, value in self.dict(False).items():
            type_ = self.FIELDS_MAP[k][1]
            if type_ is str:
                value = self._bytes_to_b64(value.encode('utf-8'))
            elif type_ is dict:
                value = self._encode_dict(value)
            elif type_ is bytes:
                value = self._bytes_to_b64(value)
            encoded[k] = value
        j = json.dumps(encoded, sort_keys=True, separators=(',', ':'))
        return j.encode('utf-8')

    def _encode_dict(self, dict_):
        """
        Return a copy of ``dict_`` holding only its str values, base64-encoded.

        NOTE(review): every non-str value (nested dicts, bytes) is dropped, so
        entries such as the per-AS key dicts in core_ases do not end up in the
        signature input. Changing this alters the signed data and would
        invalidate existing signatures, hence it is left as is -- confirm
        whether this is intended.
        """
        return {
            key_: self._bytes_to_b64(value.encode('utf-8'))
            for key_, value in dict_.items()
            if isinstance(value, str)
        }

    def to_json(self, with_signatures=True):
        """
        Convert the instance to json format.

        All binary values are base64-encoded; the instance itself is not
        modified.

        :param bool with_signatures: if True, include the signatures.
        :returns: the indented, key-sorted JSON string.
        :rtype: str
        """
        trc_dict = copy.deepcopy(self.dict(with_signatures))
        # The RAINS entry may be empty (see __init__), so only encode it when
        # it is present.
        rains = trc_dict[RAINS_STRING]
        if rains:
            for field in (ONLINE_KEY_STRING, ROOT_RAINS_KEY_STRING):
                rains[field] = self._bytes_to_b64(rains[field])
        for entry in trc_dict[CORE_ASES_STRING].values():
            for field in (ONLINE_KEY_STRING, OFFLINE_KEY_STRING):
                entry[field] = self._bytes_to_b64(entry[field])
        for entry in trc_dict[ROOT_CAS_STRING].values():
            for field in (ONLINE_KEY_STRING, CERTIFICATE_STRING):
                entry[field] = self._bytes_to_b64(entry[field])
        if with_signatures:
            trc_dict[SIGNATURES_STRING] = {
                subject: self._bytes_to_b64(signature)
                for subject, signature in trc_dict[SIGNATURES_STRING].items()
            }
        return json.dumps(trc_dict, sort_keys=True, indent=4)

    def _iter_parsed_signatures(self):
        """
        Yield ((type, id, name), signature) for every signature whose subject
        string could be parsed; unparsable subjects are skipped.
        """
        for subject, signature in self.signatures.items():
            res = self._parse_subject_str(subject)
            if res:
                yield res, signature

    def get_ca_sigs(self):
        """
        Returns a list of tuples (isd, ca name, ca signature) for all CA
        signatures on this TRC
        """
        return [
            (int(isd), ca_name, signature)
            for (type_, isd, ca_name), signature in self._iter_parsed_signatures()
            if type_ == "CA"
        ]

    def get_rains_sigs(self):
        """
        Returns a list of tuples (isd, rains signature) for all RAINS signatures
        on this TRC
        """
        return [
            (int(isd), signature)
            for (type_, isd, _), signature in self._iter_parsed_signatures()
            if type_ == "RAINS"
        ]

    def get_as_sigs(self):
        """
        Returns a list of tuples (isd_as, as signature) for all AS signatures
        on this TRC
        """
        return [
            (isd_as, signature)
            for (type_, isd_as, _), signature in self._iter_parsed_signatures()
            if type_ == "AS"
        ]

    def get_neighbors(self):
        """
        Parses the signature subjects and returns a set of all
        ISDs which signed this TRC.
        """
        neighbors = set()
        for subject in self.signatures:
            res = self._parse_subject_str(subject)
            if not res:
                continue
            _, isd, _ = res
            if isinstance(isd, ISD_AS):
                isd = isd[0]
            else:
                # CA and RAINS subjects carry the ISD as a digit string;
                # normalise it so it compares equal to the int ``TRC.isd``.
                isd = int(isd)
            neighbors.add(isd)
        return neighbors

    def _parse_subject_str(self, subject):
        """
        Parses the subject string only for cross signatures.
        The subject strings have the different forms depending on subject.
        CA entry begins with the string "ISD x, CA:", on which the CAs name follows.
        RAINS entry is the string "ISD x, RAINS".
        Core AS entry contains the SCION name of the AS.

        :returns: a tuple (type, id, name) where type is "CA", "RAINS" or
            "AS"; id is the ISD as a digit string for CA/RAINS and an ISD_AS
            for AS; name is the CA name or "". None if the subject cannot be
            parsed.
        """
        sub = subject.split(',', 1)
        head = sub[0].split(' ')
        # We have a CA or rains as subject
        if head[0] == "ISD":
            if len(head) < 2 or not head[1].isdigit() or len(sub) < 2:
                logging.error("Cannot parse subject: %s", subject)
                return None
            isd = head[1]
            rest = sub[1].strip()
            if rest == "RAINS":
                return "RAINS", isd, ""
            if rest.startswith('CA:'):
                # Split once only, so CA names containing ':' stay intact.
                return "CA", isd, rest.split(':', 1)[1].strip()
            logging.error("Cannot parse subject: %s", subject)
            return None
        # We have any AS
        try:
            isd_as = ISD_AS(sub[0])
        except Exception:
            logging.error("Cannot parse subject: %s", subject)
            return None
        return "AS", isd_as, ""

    def pack(self, lz4_=False):
        """
        Serialize the TRC (including signatures) to UTF-8 encoded JSON.

        :param bool lz4_: if True, return the lz4-compressed bytes instead.
        """
        ret = self.to_json().encode('utf-8')
        if lz4_:
            return lz4.dumps(ret)
        return ret

    def __str__(self):
        return self.to_json()

    def __eq__(self, other):  # pragma: no cover
        # Two TRCs are equal when their JSON representations are equal.
        return str(self) == str(other)
def verify_new_trc(old_trc, new_trc):
    """
    Check if update from current TRC to updated TRC is valid. Checks if update
    is correct and checks if the new TRC has enough valid signatures as defined
    in the current TRC.

    The update is rejected when the ISD differs, the version is not exactly
    one higher, the new creation time lies before the old one, either TRC has
    expired, or either TRC is flagged as quarantined.

    :param old_trc: the current, already verified TRC.
    :param new_trc: the TRC that should replace ``old_trc``.
    :returns: True if update is valid, False otherwise
    """
    now = time.time()
    # Check if update is correct
    if old_trc.isd != new_trc.isd:
        logging.error("TRC isdid mismatch")
        return False
    if old_trc.version + 1 != new_trc.version:
        logging.error("TRC versions mismatch")
        return False
    if new_trc.time < old_trc.time:
        logging.error("New TRC timestamp is not valid")
        return False
    # A TRC is expired once its expiration time lies in the past.
    if old_trc.exp_time < now:
        logging.error("Current TRC expired")
        return False
    if new_trc.exp_time < now:
        logging.error("New TRC expired")
        return False
    if new_trc.quarantine or old_trc.quarantine:
        logging.error("Early announcement")
        return False
    # Check if there are enough valid signatures for new TRC
    if not new_trc.verify(old_trc):
        logging.error("New TRC verification failed, missing or invalid signatures")
        return False
    logging.debug("New TRC verified")
    return True
def verify_trc_chain(local_trc, verified_rem_trcs, rem_trc):
    """
    Decide whether a trust chain exists between the local TRC and a remote
    TRC, either directly or through an already verified remote TRC.

    A TRC qualifies as anchor when its ISD is among the ISDs that signed
    ``rem_trc`` and the cross signatures verify in both directions.

    :param TRC local_trc: The local TRC to this ISD.
    :param List(TRC) verified_rem_trcs: Already verified remote TRCs.
    :param TRC rem_trc: Remote TRC to verify.
    :returns: True if rem_trc can be verified, false otherwise.
    """
    def _mutually_signed(anchor):
        # Cross signatures have to hold in both directions.
        return (verify_trc_xsigs(anchor, rem_trc) and
                verify_trc_xsigs(rem_trc, anchor))

    # ISDs that signed the remote TRC
    neighbor_isds = rem_trc.get_neighbors()
    # The local TRC is tried first
    if local_trc.isd in neighbor_isds and _mutually_signed(local_trc):
        return True
    # Then every verified TRC that is a neighbor of the remote TRC
    anchors = [cand for cand in verified_rem_trcs if cand.isd in neighbor_isds]
    return any(_mutually_signed(cand) for cand in anchors)
def verify_trc_xsigs(src_trc, dst_trc):
    """
    Check whether dst_trc carries valid cross signatures from the ISD that
    src_trc belongs to. A core AS signature, a RAINS signature and a CA
    signature are all required.

    Both arguments are type-checked with ``assert``.

    :param TRC src_trc: The signing ISD's TRC.
    :param TRC dst_trc: The TRC whose signatures need to be checked.
    :returns: True if dst_trc is signed correctly by src_trc, False otherwise.
    """
    assert isinstance(src_trc, TRC)
    assert isinstance(dst_trc, TRC)
    if src_trc.isd != dst_trc.isd:
        # Checked in this order; the first failing check ends the evaluation.
        checks = (verify_core_as_xsigs, verify_rains_xsigs, verify_ca_xsigs)
        return all(check(src_trc, dst_trc) for check in checks)
    # A TRC cannot be cross signed by its own ISD.
    logging.warning("TRCs are from the same ISD.")
    return False
def verify_core_as_xsigs(src_trc, dst_trc):
    """
    Checks if dst_trc is signed by a core AS in src_trc.

    Signatures of ASes from other ISDs are ignored. A signature whose signer
    belongs to the ISD of src_trc but is not listed among its core ASes is
    logged and skipped instead of raising.

    :param TRC src_trc: The signing ISD's TRC.
    :param TRC dst_trc: The TRC whose signatures need to be checked.
    :returns: True if dst_trc has a valid signature of a core AS in src_trc.
        False otherwise.
    """
    for isd_as, signature in dst_trc.get_as_sigs():
        if isd_as[0] != src_trc.isd:
            continue
        # The signer name comes from the TRC under test, so it may be unknown.
        core_as = src_trc.core_ases.get(str(isd_as))
        if core_as is None:
            logging.error("TRC(ISD %s) is signed by %s, which is not a core AS of ISD %s",
                          dst_trc.isd, isd_as, src_trc.isd)
            continue
        if dst_trc.verify_signature(signature, core_as[ONLINE_KEY_STRING]):
            return True
        logging.error("TRC(ISD %s) contains invalid signature from core AS (ISD %s)",
                      dst_trc.isd, src_trc.isd)
    return False
def verify_rains_xsigs(src_trc, dst_trc):
    """
    Checks if dst_trc is signed by RAINS in src_trc.

    RAINS signatures of other ISDs are ignored. If src_trc has no RAINS
    entry, a matching signature cannot be checked; this is logged and the
    signature is skipped instead of raising.

    :param TRC src_trc: The signing ISD's TRC.
    :param TRC dst_trc: The TRC whose signatures need to be checked.
    :returns: True if dst_trc has a valid signature of RAINS in src_trc.
        False otherwise.
    """
    for isd, signature in dst_trc.get_rains_sigs():
        if isd != src_trc.isd:
            continue
        # The RAINS entry of a TRC may be empty.
        if not src_trc.rains:
            logging.error("TRC(ISD %s) is signed by RAINS of ISD %s, which has no RAINS entry",
                          dst_trc.isd, src_trc.isd)
            continue
        pub_key = src_trc.rains[ONLINE_KEY_STRING]
        if dst_trc.verify_signature(signature, pub_key):
            return True
        logging.error("TRC(ISD %s) contains invalid signature from RAINS (ISD %s)",
                      dst_trc.isd, src_trc.isd)
    return False
def verify_ca_xsigs(src_trc, dst_trc):
    """
    Checks if dst_trc is signed by a CA in src_trc.

    CA signatures of other ISDs are ignored. A signature naming a CA that is
    not a root CA of src_trc is logged and skipped instead of raising.

    :param TRC src_trc: The signing ISD's TRC.
    :param TRC dst_trc: The TRC whose signatures need to be checked.
    :returns: True if dst_trc has a valid signature of a CA in src_trc.
        False otherwise.
    """
    for isd, ca_name, signature in dst_trc.get_ca_sigs():
        if isd != src_trc.isd:
            continue
        # The CA name comes from the TRC under test, so it may be unknown.
        root_ca = src_trc.root_cas.get(ca_name)
        if root_ca is None:
            logging.error("Remote TRC(ISD %s) is signed by CA %s, which is not a root CA of ISD %s",
                          dst_trc.isd, ca_name, src_trc.isd)
            continue
        if dst_trc.verify_signature(signature, root_ca[ONLINE_KEY_STRING]):
            return True
        logging.error("Remote TRC(ISD %s) contains invalid signature from CA (ISD %s)",
                      dst_trc.isd, src_trc.isd)
    return False