Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 152 additions & 18 deletions stem/descriptor/certificate.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,16 @@
Purpose of Ed25519 certificate. As new certificate versions are added this
enumeration will expand.

============== ===========
CertType Description
============== ===========
**SIGNING** signing a signing key with an identity key
**LINK_CERT** TLS link certificate signed with ed25519 signing key
**AUTH** authentication key signed with ed25519 signing key
============== ===========
============== ===========
CertType Description
============== ===========
**SIGNING** signing a signing key with an identity key
**LINK_CERT** TLS link certificate signed with ed25519 signing key
**AUTH** authentication key signed with ed25519 signing key
**HS_V3_DESC_SIGNING_KEY** onion service v3 descriptor signing key cert (see rend-spec-v3.txt)
**HS_V3_INTRO_POINT_AUTH_KEY** onion service v3 intro point authentication key cert (see rend-spec-v3.txt)
**HS_V3_INTRO_POINT_ENC_KEY** onion service v3 intro point encryption key cert (see rend-spec-v3.txt)
============== ===========

.. data:: ExtensionType (enum)

Expand Down Expand Up @@ -65,12 +68,19 @@
import stem.prereq
import stem.util.enum
import stem.util.str_tools
import stem.util

from cryptography.hazmat.primitives import serialization

ED25519_HEADER_LENGTH = 40
ED25519_SIGNATURE_LENGTH = 64
ED25519_ROUTER_SIGNATURE_PREFIX = b'Tor router descriptor signature v1'

CertType = stem.util.enum.UppercaseEnum('SIGNING', 'LINK_CERT', 'AUTH')
CertType = stem.util.enum.UppercaseEnum(
'RESERVED_0', 'RESERVED_1', 'RESERVED_2', 'RESERVED_3', 'SIGNING', 'LINK_CERT',
'AUTH', 'RESERVED_RSA', 'HS_V3_DESC_SIGNING_KEY', 'HS_V3_INTRO_POINT_AUTH_KEY',
'HS_V3_INTRO_POINT_ENC_KEY', 'HS_V3_NTOR_ENC')

ExtensionType = stem.util.enum.Enum(('HAS_SIGNING_KEY', 4),)
ExtensionFlag = stem.util.enum.UppercaseEnum('AFFECTS_VALIDATION', 'UNKNOWN')

Expand Down Expand Up @@ -147,19 +157,16 @@ def __init__(self, version, encoded, decoded):
raise ValueError('Ed25519 certificate was %i bytes, but should be at least %i' % (len(decoded), ED25519_HEADER_LENGTH + ED25519_SIGNATURE_LENGTH))

cert_type = stem.util.str_tools._to_int(decoded[1:2])
try:
self.type = CertType.keys()[cert_type]
except IndexError:
raise ValueError('Certificate has wrong cert type')

if cert_type in (0, 1, 2, 3):
# Catch some invalid cert types
if self.type in ('RESERVED_0', 'RESERVED_1', 'RESERVED_2', 'RESERVED_3'):
raise ValueError('Ed25519 certificate cannot have a type of %i. This is reserved to avoid conflicts with tor CERTS cells.' % cert_type)
elif cert_type == 4:
self.type = CertType.SIGNING
elif cert_type == 5:
self.type = CertType.LINK_CERT
elif cert_type == 6:
self.type = CertType.AUTH
elif cert_type == 7:
elif self.type == ('RESERVED_RSA'):
raise ValueError('Ed25519 certificate cannot have a type of 7. This is reserved for RSA identity cross-certification.')
else:
raise ValueError("BUG: Ed25519 certificate type is decoded from one byte. It shouldn't be possible to have a value of %i." % cert_type)

# expiration time is in hours since epoch
try:
Expand Down Expand Up @@ -214,6 +221,9 @@ def is_expired(self):

return datetime.datetime.now() > self.expiration

# ATAGAR XXX certificates are generic and not just for descriptor, however
# this function assumes they are. this needs to be moved to the descriptor
# module. the new verify() function is more generic and should be used.
def validate(self, server_descriptor):
"""
Validates our signing key and that the given descriptor content matches its
Expand Down Expand Up @@ -269,3 +279,127 @@ def validate(self, server_descriptor):
verify_key.verify(signature_bytes, descriptor_sha256_digest)
except InvalidSignature:
raise ValueError('Descriptor Ed25519 certificate signature invalid (Signature was forged or corrupt)')

def get_signing_key(self):
"""
Get the signing key for this certificate. This is included in the extensions.
WARNING: This is the key that signed the certificate, not the key that got
certified.

:returns: Raw bytes of an ed25519 key.

:raises: **ValueError** if the signing key cannot be found.
"""
signing_key_extension = None

for extension in self.extensions:
if extension.type == ExtensionType.HAS_SIGNING_KEY:
signing_key_extension = extension
break

if not signing_key_extension:
raise ValueError('Signing key extension could not be found')

if (len(signing_key_extension.data) != 32):
raise ValueError('Signing key extension has malformed key')

return signing_key_extension.data

class MyED25519Certificate(object):
def __init__(self, version, cert_type, expiration_date,
cert_key_type, certified_pub_key,
signing_priv_key, include_signing_key):
"""
:var int version
:var int cert_type
:var CertType cert_type
:var datetime expiration_date
:var int cert_key_type
:var ED25519PublicKey certified_pub_key
:var ED25519PrivateKey signing_priv_key
:var bool include_signing_key
"""
self.version = version
self.cert_type = cert_type
self.expiration_date = expiration_date
self.cert_key_type = cert_key_type
self.certified_pub_key = certified_pub_key

self.signing_priv_key = signing_priv_key
self.signing_pub_key = signing_priv_key.public_key()

self.include_signing_key = include_signing_key
# XXX validate params

def _get_certificate_signature(self, msg_body):
return self.signing_priv_key.sign(msg_body)

def _get_cert_extensions_bytes(self):
n_extensions = 0

# If we need to include the signing key, let's create the extension body
# ExtLength [2 bytes]
# ExtType [1 byte]
# ExtFlags [1 byte]
# ExtData [ExtLength bytes]
if self.include_signing_key:
n_extensions += 1

signing_pubkey_bytes = self.signing_pub_key.public_bytes(encoding=serialization.Encoding.Raw,
format=serialization.PublicFormat.Raw)

ext_length = len(signing_pubkey_bytes)
ext_type = 4
ext_flags = 0
ext_data = signing_pubkey_bytes

# Now build the actual byte representation of any extensions
ext_obj = bytes()
ext_obj += n_extensions.to_bytes(1, 'big')

if self.include_signing_key:
ext_obj += ext_length.to_bytes(2, 'big')
ext_obj += ext_type.to_bytes(1, 'big')
ext_obj += ext_flags.to_bytes(1, 'big')
ext_obj += ext_data

return ext_obj

def encode(self):
"""Return a bytes representation of this certificate."""
obj = bytes()

# Encode VERSION
obj += self.version.to_bytes(1, 'big')

# Encode CERT_TYPE
try:
cert_type_int = CertType.index_of(self.cert_type)
except ValueError:
raise ValueError("Bad cert type %s" % self.cert_type)

obj += cert_type_int.to_bytes(1, 'big')

# Encode EXPIRATION_DATE
expiration_seconds_since_epoch = stem.util.datetime_to_unix(self.expiration_date)
expiration_hours_since_epoch = int(expiration_seconds_since_epoch) // 3600
obj += expiration_hours_since_epoch.to_bytes(4, 'big')

# Encode CERT_KEY_TYPE
obj += self.cert_key_type.to_bytes(1, 'big')

# Encode CERTIFIED_KEY
certified_pub_key_bytes = self.certified_pub_key.public_bytes(encoding=serialization.Encoding.Raw,
format=serialization.PublicFormat.Raw)
assert(len(certified_pub_key_bytes) == 32)
obj += certified_pub_key_bytes

# Encode N_EXTENSIONS and EXTENSIONS
obj += self._get_cert_extensions_bytes()

# Do the signature on the body we have so far
obj += self._get_certificate_signature(obj)

return obj


108 changes: 106 additions & 2 deletions stem/descriptor/hidden_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,9 @@ def _parse_introduction_points(content):

return introduction_points

import stem.descriptor.certificate
import stem.descriptor.hsv3_crypto as hsv3_crypto
from cryptography.hazmat.primitives import serialization

class HiddenServiceDescriptorV3(BaseHiddenServiceDescriptor):
"""
Expand Down Expand Up @@ -509,27 +512,92 @@ class HiddenServiceDescriptorV3(BaseHiddenServiceDescriptor):
}

@classmethod
def content(cls, attr = None, exclude = (), sign = False):
def content(cls, attr = None, exclude = (), sign = False, ed25519_private_identity_key = None):
"""
Creates descriptor content with the given attributes. Mandatory fields are
filled with dummy information unless data is supplied. This doesn't yet
create a valid signature.

.. versionadded:: 1.6.0

:param dict attr: keyword/value mappings to be included in the descriptor
:param list exclude: mandatory keywords to exclude from the descriptor, this
results in an invalid descriptor
:param bool sign: includes cryptographic signatures and digests if True
:param ED25519PrivateKey ed25519_private_identity_key: the private identity key of
this onion service

:returns: **str** with the content of a descriptor

:raises:
* **ImportError** if cryptography is unavailable and sign is True
* **NotImplementedError** if not implemented for this descriptor type
"""
if sign:
raise NotImplementedError('Signing of %s not implemented' % cls.__name__)

# We need an private identity key for the onion service to create its
# descriptor. We could make a new one on the spot, but we also need to
# return it to the caller, otherwise the caller will have no way to decode
# the descriptor without knowing the private key or the onion address, so
# for now we consider it a mandatory argument.
if not ed25519_private_identity_key:
raise ValueError('Need to provide a private ed25519 identity key to create a descriptor')


return _descriptor_content(attr, exclude, (
('hs-descriptor', '3'),
('descriptor-lifetime', '180'),
# here we need to write a crypto blob
('descriptor-signing-key-cert', _random_crypto_blob('ED25519 CERT')),
# here we need the OEP scheme
('revision-counter', '15'),
# here we need the encrypted blob
('superencrypted', _random_crypto_blob('MESSAGE')),
# here we need an actual signature
('signature', 'wdc7ffr+dPZJ/mIQ1l4WYqNABcmsm6SHW/NL3M3wG7bjjqOJWoPR5TimUXxH52n5Zk0Gc7hl/hz3YYmAx5MvAg'),
), ())

@classmethod
def create(cls, attr = None, exclude = (), validate = True, sign = False):
"""
Creates a descriptor with the given attributes. Mandatory fields are filled
with dummy information unless data is supplied. This doesn't yet create a
valid signature.

.. versionadded:: 1.6.0

:param dict attr: keyword/value mappings to be included in the descriptor
:param list exclude: mandatory keywords to exclude from the descriptor, this
results in an invalid descriptor
:param bool validate: checks the validity of the descriptor's content if
**True**, skips these checks otherwise
:param bool sign: includes cryptographic signatures and digests if True

:returns: :class:`~stem.descriptor.Descriptor` subclass

:raises:
* **ValueError** if the contents is malformed and validate is True
* **ImportError** if cryptography is unavailable and sign is True
* **NotImplementedError** if not implemented for this descriptor type
"""
# Create a string-representation of the descriptor and then parse it
# immediately to create an object.
return cls(cls.content(attr, exclude, sign), validate = validate, skip_crypto_validation = not sign)

def __init__(self, raw_contents, validate = False, skip_crypto_validation = False):
def __init__(self, raw_contents, validate = False, onion_address = None, skip_crypto_validation = False):
"""
The onion_address is needed so that we can decrypt the descriptor, which is
impossible without the full onion address.
"""
super(HiddenServiceDescriptorV3, self).__init__(raw_contents, lazy_load = not validate)
entries = _descriptor_components(raw_contents, validate)

if onion_address == None:
raise ValueError("The onion address MUST be provided to parse a V3 descriptor")
self.onion_address = onion_address

# XXX Do this parsing in its own function
if validate:
for keyword in REQUIRED_V3_FIELDS:
if keyword not in entries:
Expand All @@ -546,6 +614,42 @@ def __init__(self, raw_contents, validate = False, skip_crypto_validation = Fals
else:
self._entries = entries

# ATAGAR XXX need to do this cert extraction in the parsing handler
assert(self.signing_cert)
cert_lines = self.signing_cert.split('\n')
assert(cert_lines[0] == '-----BEGIN ED25519 CERT-----' and cert_lines[-1] == '-----END ED25519 CERT-----')
desc_signing_cert = stem.descriptor.certificate.Ed25519Certificate.parse(''.join(cert_lines[1:-1]))

# crypto validation (check skip_crypto_validation)
# ASN XXX need to verify descriptor signing certificate (for now we trust Tor to do it)
# ASN XXX need to verify descriptor signature (for now we trust Tor to do it)

plaintext = self.decrypt_descriptor(desc_signing_cert)

def decrypt_descriptor(self, desc_signing_cert):
# Get crypto material.
# ASN XXX Extract to its own function and assign them to class variables
blinded_key_bytes = desc_signing_cert.get_signing_key()
identity_public_key = hsv3_crypto.decode_address(self.onion_address)
identity_public_key_bytes = identity_public_key.public_bytes(encoding=serialization.Encoding.Raw,
format=serialization.PublicFormat.Raw)
assert(len(identity_public_key_bytes) == 32)
assert(len(blinded_key_bytes) == 32)

subcredential_bytes = hsv3_crypto.get_subcredential(identity_public_key_bytes, blinded_key_bytes)

####################################### Do the decryption ###################################

outter_layer_plaintext = hsv3_crypto.decrypt_outter_layer(self.superencrypted, self.revision_counter,
identity_public_key_bytes, blinded_key_bytes, subcredential_bytes)

# ATAGAR XXX this parsing function is a hack. need to replace it with some stem parsing.
inner_layer_ciphertext = hsv3_crypto.parse_superencrypted_plaintext(outter_layer_plaintext)

inner_layer_plaintext = hsv3_crypto.decrypt_inner_layer(inner_layer_ciphertext, self.revision_counter,
identity_public_key_bytes, blinded_key_bytes, subcredential_bytes)

print(inner_layer_plaintext)

# TODO: drop this alias in stem 2.x

Expand Down
Loading