Skip to content

Commit

Permalink
Merge b70d644 into 4521c5c
Browse files Browse the repository at this point in the history
  • Loading branch information
inikolcev committed Aug 12, 2020
2 parents 4521c5c + b70d644 commit f2d1ab2
Show file tree
Hide file tree
Showing 4 changed files with 240 additions and 50 deletions.
8 changes: 4 additions & 4 deletions tests/tlstest.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,8 +297,8 @@ def connect():
try:
connection.handshakeClientCert(settings=settings)
assert False
except TLSLocalAlert as e:
assert "certificate with curve" in str(e)
except TLSRemoteAlert as e:
assert "handshake_failure" in str(e)
connection.close()

test_no += 1
Expand Down Expand Up @@ -1665,8 +1665,8 @@ def connect():
connection.handshakeServer(certChain=x509ecdsaChain,
privateKey=x509ecdsaKey, settings=settings)
assert False
except TLSRemoteAlert as e:
assert "handshake_failure" in str(e)
except TLSLocalAlert as e:
assert "curve in the public key is not supported by the client" in str(e)
connection.close()

test_no += 1
Expand Down
47 changes: 45 additions & 2 deletions tlslite/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
# while supporting TLS 1.3 or greater
TLS_1_2_DOWNGRADE_SENTINEL = a2b_hex("444F574E47524401")

RSA_PSS_OID = bytes(a2b_hex('06092a864886f70d01010a'))


class TLSEnum(object):
"""Base class for different enums of TLS IDs"""
Expand Down Expand Up @@ -216,6 +218,8 @@ class SignatureScheme(TLSEnum):
rsa_pkcs1_sha256 = (4, 1)
rsa_pkcs1_sha384 = (5, 1)
rsa_pkcs1_sha512 = (6, 1)
ecdsa_sha1 = (2, 3)
ecdsa_sha224 = (3, 3)
ecdsa_secp256r1_sha256 = (4, 3)
ecdsa_secp384r1_sha384 = (5, 3)
ecdsa_secp521r1_sha512 = (6, 3)
Expand Down Expand Up @@ -287,15 +291,54 @@ def getHash(scheme):
except AttributeError:
raise ValueError("\"{0}\" scheme is unknown".format(scheme))
vals = scheme.split('_', 4)
assert len(vals) in (3, 4)
if len(vals) == 3:
assert len(vals) in (2, 3, 4)
if len(vals) == 2:
kType, hName = vals
elif len(vals) == 3:
kType, _, hName = vals
else:
kType, _, _, hName = vals
assert kType in ('rsa', 'ecdsa')
return hName


class AlgorithmOID(TLSEnum):
"""
Algorithm OIDs as defined in rfc5758(ecdsa),
rfc5754(rsa, sha), rfc3447(rss-pss).
The key is the DER encoded OID in hex and
the value is the algorithm id.
"""
oid = {}

oid[bytes(a2b_hex('06072a8648ce3d0401'))] = \
SignatureScheme.ecdsa_sha1
oid[bytes(a2b_hex('06082a8648ce3d040301'))] = \
SignatureScheme.ecdsa_sha224
oid[bytes(a2b_hex('06082a8648ce3d040302'))] = \
SignatureScheme.ecdsa_secp256r1_sha256
oid[bytes(a2b_hex('06082a8648ce3d040303'))] = \
SignatureScheme.ecdsa_secp384r1_sha384
oid[bytes(a2b_hex('06082a8648ce3d040304'))] = \
SignatureScheme.ecdsa_secp521r1_sha512
oid[bytes(a2b_hex('06092a864886f70d010105'))] = \
SignatureScheme.rsa_pkcs1_sha1
oid[bytes(a2b_hex('06092a864886f70d01010e'))] = \
SignatureScheme.rsa_pkcs1_sha224
oid[bytes(a2b_hex('06092a864886f70d01010b'))] = \
SignatureScheme.rsa_pkcs1_sha256
oid[bytes(a2b_hex('06092a864886f70d01010c'))] = \
SignatureScheme.rsa_pkcs1_sha384
oid[bytes(a2b_hex('06092a864886f70d01010d'))] = \
SignatureScheme.rsa_pkcs1_sha512
oid[bytes(a2b_hex('300b0609608648016503040201'))] = \
SignatureScheme.rsa_pss_rsae_sha256
oid[bytes(a2b_hex('300b0609608648016503040202'))] = \
SignatureScheme.rsa_pss_rsae_sha384
oid[bytes(a2b_hex('300b0609608648016503040203'))] = \
SignatureScheme.rsa_pss_rsae_sha512


class GroupName(TLSEnum):
"""Name of groups supported for (EC)DH key exchange"""

Expand Down
208 changes: 167 additions & 41 deletions tlslite/tlsconnection.py
Original file line number Diff line number Diff line change
Expand Up @@ -3255,19 +3255,6 @@ def _serverGetClientHello(self, settings, private_key, cert_chain,
for result in self._sendMsg(alert):
yield result

try:
sig_scheme, cert_chain, private_key = \
self._pickServerKeyExchangeSig(settings,
clientHello,
cert_chain,
private_key,
version)
except TLSHandshakeFailure as alert:
for result in self._sendError(
AlertDescription.handshake_failure,
str(alert)):
yield result

#Check if there's intersection between supported curves by client and
#server
clientGroups = clientHello.getExtension(ExtensionType.supported_groups)
Expand Down Expand Up @@ -3379,8 +3366,7 @@ def _serverGetClientHello(self, settings, private_key, cert_chain,
cipherSuites = CipherSuite.filterForVersion(cipherSuites,
minVersion=version,
maxVersion=version)
cipherSuites = CipherSuite.filter_for_certificate(cipherSuites,
cert_chain)

#If resumption was requested and we have a session cache...
if clientHello.session_id and sessionCache:
session = None
Expand Down Expand Up @@ -3552,29 +3538,26 @@ def _serverGetClientHello(self, settings, private_key, cert_chain,
#
#Given the current ciphersuite ordering, this means we prefer SRP
#over non-SRP.
for cipherSuite in cipherSuites:
if cipherSuite in clientHello.cipher_suites:
break
else:
if clientGroups and \
any(i in range(256, 512) for i in clientGroups) and \
any(i in CipherSuite.dhAllSuites
for i in clientHello.cipher_suites):
for result in self._sendError(
AlertDescription.insufficient_security,
"FFDHE groups not acceptable and no other common "
"ciphers"):
yield result
else:
for result in self._sendError(\
AlertDescription.handshake_failure,
"No mutual ciphersuite"):
yield result
if cipherSuite in CipherSuite.srpAllSuites and \
not clientHello.srp_username:
for result in self._sendError(\
AlertDescription.unknown_psk_identity,
"Client sent a hello, but without the SRP username"):

try:
cipherSuite, sig_scheme, cert_chain, private_key = \
self._server_select_certificate(settings, clientHello,
cipherSuites, cert_chain,
private_key, version)
except TLSHandshakeFailure as err:
for result in self._sendError(
AlertDescription.handshake_failure,
str(err)):
yield result
except TLSInsufficientSecurity as err:
for result in self._sendError(
AlertDescription.insufficient_security,
str(err)):
yield result
except TLSIllegalParameterException as err:
for result in self._sendError(
AlertDescription.illegal_parameter,
str(err)):
yield result

#If an RSA suite is chosen, check for certificate type intersection
Expand Down Expand Up @@ -3845,6 +3828,145 @@ def _serverSRPKeyExchange(self, clientHello, serverHello, verifierDB,

yield premasterSecret, privateKey, serverCertChain

def _server_select_certificate(self, settings, client_hello,
cipher_suites, cert_chain,
private_key, version):
"""
This method makes the decision on which certificate/key pair,
signature algorithm and cipher to use based on the certificate.
"""

last_cert = False
possible_certs = []

# Get client groups
client_groups = client_hello. \
getExtension(ExtensionType.supported_groups)
if client_groups is not None:
client_groups = client_groups.groups

# If client did send signature_algorithms_cert use it,
# otherwise fallback to signature_algorithms.
# Client can also decide not to send sigalg extension
client_sigalgs = \
client_hello. \
getExtension(ExtensionType.signature_algorithms_cert)
if client_sigalgs is not None:
client_sigalgs = \
client_hello. \
getExtension(ExtensionType.signature_algorithms_cert). \
sigalgs
else:
client_sigalgs = \
client_hello. \
getExtension(ExtensionType.signature_algorithms)
if client_sigalgs is not None:
client_sigalgs = \
client_hello. \
getExtension(ExtensionType.signature_algorithms). \
sigalgs
else:
client_sigalgs = []

# Get all the certificates we can offer
alt_certs = ((X509CertChain(i.certificates), i.key) for vh in
settings.virtual_hosts for i in vh.keys)
certs = [(cert, key)
for cert, key in chain([(cert_chain, private_key)], alt_certs)]

for cert, key in certs:

# Check if this is the last (cert, key) pair we have to check
if (cert, key) == certs[-1]:
last_cert = True

# Mandatory checks. If any one of these checks fail, the certificate
# is not usuable.
try:
# Find a suitable ciphersuite based on the certificate
ciphers = CipherSuite.filter_for_certificate(cipher_suites, cert)
for cipher in ciphers:
if cipher in client_hello.cipher_suites:
break
else:
if client_groups and \
any(i in range(256, 512) for i in client_groups) and \
any(i in CipherSuite.dhAllSuites
for i in client_hello.cipher_suites):
raise TLSInsufficientSecurity(
"FFDHE groups not acceptable and no other common "
"ciphers")
raise TLSHandshakeFailure("No mutual ciphersuite")

# Find a signature algorithm based on the certificate
try:
sig_scheme, _, _ = \
self._pickServerKeyExchangeSig(settings,
client_hello,
cert,
key,
version,
False)
except TLSHandshakeFailure:
raise TLSHandshakeFailure(
"No common signature algorithms")

# If the certificate is ECDSA, we must check curve compatibility
if cert and cert.x509List[0].certAlg == 'ecdsa' and \
client_groups and client_sigalgs:
public_key = cert.getEndEntityPublicKey()
curve = public_key.curve_name
for name, aliases in CURVE_ALIASES.items():
if curve in aliases:
curve = getattr(GroupName, name)
break

if version <= (3, 3) and curve not in client_groups:
raise TLSHandshakeFailure(
"The curve in the public key is not "
"supported by the client: {0}" \
.format(GroupName.toRepr(curve)))

if version >= (3, 4):
if GroupName.toRepr(curve) not in \
('secp256r1', 'secp384r1', 'secp521r1'):
raise TLSIllegalParameterException(
"Curve in public key is not supported "
"in TLS1.3")

# If all mandatory checks passed add
# this as possible certificate we can use.
possible_certs.append((cipher, sig_scheme, cert, key))

except Exception:
if last_cert and not possible_certs:
raise
continue

# Non-mandatory checks, if these fail the certificate is still usable
# but we should try to find one that passes all the checks

# Check if every certificate(except the self-signed root CA)
# in the certificate chain is signed with a signature algorithm
# supported by the client.
if cert:
cert_chain_ok = True
for i in range(len(cert.x509List)):
if cert.x509List[i].issuer != cert.x509List[i].subject:
if cert.x509List[i].sigalg not in client_sigalgs:
cert_chain_ok = False
break
if not cert_chain_ok:
break

# If all mandatory and non-mandatory checks passed
# return the (cert, key) pair, cipher and sig_scheme
return cipher, sig_scheme, cert, key

# If we can't find cert that passed all the checks, return the first usable one.
return possible_certs[0]


def _serverCertKeyExchange(self, clientHello, serverHello, sigHashAlg,
serverCertChain, keyExchange,
reqCert, reqCAs, cipherSuite,
Expand Down Expand Up @@ -4231,7 +4353,7 @@ def _handshakeWrapperAsync(self, handshaker, checker):
@staticmethod
def _pickServerKeyExchangeSig(settings, clientHello, certList=None,
private_key=None,
version=(3, 3)):
version=(3, 3), check_alt=True):
"""Pick a hash that matches most closely the supported ones"""
hashAndAlgsExt = clientHello.getExtension(
ExtensionType.signature_algorithms)
Expand All @@ -4247,8 +4369,12 @@ def _pickServerKeyExchangeSig(settings, clientHello, certList=None,
# sha1 should be picked
return "sha1", certList, private_key

alt_certs = ((X509CertChain(i.certificates), i.key) for vh in
settings.virtual_hosts for i in vh.keys)
if check_alt:
alt_certs = ((X509CertChain(i.certificates), i.key) for vh in
settings.virtual_hosts for i in vh.keys)
else:
alt_certs = ()


for certs, key in chain([(certList, private_key)], alt_certs):
supported = TLSConnection._sigHashesToList(settings,
Expand Down
Loading

0 comments on commit f2d1ab2

Please sign in to comment.