Skip to content

Commit

Permalink
Merge 7e52fed into 513e165
Browse files Browse the repository at this point in the history
  • Loading branch information
tomato42 committed Oct 2, 2019
2 parents 513e165 + 7e52fed commit f7be560
Show file tree
Hide file tree
Showing 5 changed files with 174 additions and 78 deletions.
11 changes: 11 additions & 0 deletions tests/clientECCert.pem
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
-----BEGIN CERTIFICATE-----
MIIBfjCCASOgAwIBAgIUTxkWwfl73xnqhoJqWZ+R4RPCX54wCgYIKoZIzj0EAwIw
FDESMBAGA1UEAwwJbG9jYWxob3N0MB4XDTE5MDkyNTE2NDgzMloXDTI5MDkyMjE2
NDgzMlowFDESMBAGA1UEAwwJbG9jYWxob3N0MFkwEwYHKoZIzj0CAQYIKoZIzj0D
AQcDQgAEQNSgjtKQ2KEyIjcamNSSH83huV2aYpJMCRYAXvEBUNygmCwxCod2j97J
Z4rnBmd9ySlceT65nXFuDuKlftIlLaNTMFEwHQYDVR0OBBYEFM9448HkxS8PnjT2
CRzwBP3SWGK1MB8GA1UdIwQYMBaAFM9448HkxS8PnjT2CRzwBP3SWGK1MA8GA1Ud
EwEB/wQFMAMBAf8wCgYIKoZIzj0EAwIDSQAwRgIhAKCSuUb8eWvb3WCQcA8Wj2av
V8UXsXPk5N2HyDojhklvAiEA97Z8QDFxMg20hm8cPGMlQT1r3PtPwF0eRsL2rPu0
nxY=
-----END CERTIFICATE-----
5 changes: 5 additions & 0 deletions tests/clientECKey.pem
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-----BEGIN PRIVATE KEY-----
MIGHAgEAMBMGByqGSM49AgEGCCqGSM49AwEHBG0wawIBAQQgjIlFWfrrN8Rsn7RU
9mZsG6hTDEa/1pXw8oNBML7UjeihRANCAARA1KCO0pDYoTIiNxqY1JIfzeG5XZpi
kkwJFgBe8QFQ3KCYLDEKh3aP3slniucGZ33JKVx5PrmdcW4O4qV+0iUt
-----END PRIVATE KEY-----
36 changes: 32 additions & 4 deletions tests/tlstest.py
Original file line number Diff line number Diff line change
Expand Up @@ -568,12 +568,27 @@ def connect():

synchro.recv(1)
connection = connect()
# TODO add client certificate support in TLS 1.3
settings = HandshakeSettings()
settings.maxVersion = (3, 3)
connection.handshakeClientCert(x509Chain, x509Key, settings=settings)
connection.handshakeClientCert(x509Chain, x509Key)
testConnClient(connection)
assert isinstance(connection.session.serverCertChain, X509CertChain)
connection.close()

test_no += 1

print("Test {0} - good mutual ECDSA X.509".format(test_no))
with open(os.path.join(dir, "clientECCert.pem")) as f:
x509Cert = X509().parse(f.read())
x509Chain = X509CertChain([x509Cert])
with open(os.path.join(dir, "clientECKey.pem")) as f:
x509Key = parsePEMKey(f.read(), private=True)

synchro.recv(1)
connection = connect()
connection.handshakeClientCert(x509Chain, x509Key)
testConnClient(connection)
assert isinstance(connection.session.serverCertChain, X509CertChain)
assert len(connection.session.serverCertChain.getEndEntityPublicKey()) ==\
256
connection.close()

test_no += 1
Expand Down Expand Up @@ -1692,6 +1707,19 @@ def connect():

test_no += 1

print("Test {0} - good mutual ECDSA X.509".format(test_no))
synchro.send(b'R')
connection = connect()
connection.handshakeServer(certChain=x509ecdsaChain,
privateKey=x509ecdsaKey, reqCert=True)
testConnServer(connection)
assert(isinstance(connection.session.clientCertChain, X509CertChain))
assert len(connection.session.clientCertChain.getEndEntityPublicKey()) ==\
256
connection.close()

test_no += 1

print("Test {0} - good mutual X.509, TLSv1.3 no certs".format(test_no))
synchro.send(b'R')
connection = connect()
Expand Down
2 changes: 1 addition & 1 deletion tlslite/keyexchange.py
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ def makeCertificateVerify(version, handshakeHashes, validSigAlgs,
premasterSecret,
clientRandom,
serverRandom,
privateKey.key_type)
key_type=privateKey.key_type)
if signatureAlgorithm and \
signatureAlgorithm[1] == SignatureAlgorithm.ecdsa:
padding = None
Expand Down
198 changes: 125 additions & 73 deletions tlslite/tlsconnection.py
Original file line number Diff line number Diff line change
Expand Up @@ -1253,6 +1253,7 @@ def _clientTLS13Handshake(self, settings, session, clientHello,
assert isinstance(certificate_verify, CertificateVerify)

signature_scheme = certificate_verify.signatureAlgorithm
self.serverSigAlg = signature_scheme

signature_context = KeyExchange.calcVerifyBytes((3, 4),
srv_cert_verify_hh,
Expand All @@ -1268,20 +1269,27 @@ def _clientTLS13Handshake(self, settings, session, clientHello,
publicKey, serverCertChain, tackExt = result

if signature_scheme[1] == SignatureAlgorithm.ecdsa:
padType = None
hashName = HashAlgorithm.toRepr(signature_scheme[0])
saltLen = None
pad_type = None
hash_name = HashAlgorithm.toRepr(signature_scheme[0])
matching_hash = self._curve_name_to_hash_name(
publicKey.curve_name)
if hash_name != matching_hash:
raise TLSIllegalParameterException(
"server selected signature method invalid for the "
"certificate it presented (curve mismatch)")

salt_len = None
else:
scheme = SignatureScheme.toRepr(signature_scheme)
padType = SignatureScheme.getPadding(scheme)
hashName = SignatureScheme.getHash(scheme)
saltLen = getattr(hashlib, hashName)().digest_size
pad_type = SignatureScheme.getPadding(scheme)
hash_name = SignatureScheme.getHash(scheme)
salt_len = getattr(hashlib, hash_name)().digest_size

if not publicKey.verify(certificate_verify.signature,
signature_context,
padType,
hashName,
saltLen):
pad_type,
hash_name,
salt_len):
raise TLSDecryptionFailed("server Certificate Verify "
"signature "
"verification failed")
Expand Down Expand Up @@ -1470,7 +1478,7 @@ def _clientTLS13Handshake(self, settings, session, clientHello,
bytearray(b''), # no session_id in TLS 1.3
serverHello.cipher_suite,
None, # no SRP
None, # no client cert chain
clientCertChain,
certificate.cert_chain if certificate else None,
None, # no TACK
False, # no TACK in hello
Expand Down Expand Up @@ -1766,15 +1774,13 @@ def _clientFinished(self, premasterSecret, clientRandom, serverRandom,
yield result
yield masterSecret

def _clientGetKeyFromChain(self, certificate, settings, tackExt=None):
#Get and check cert chain from the Certificate message
cert_chain = certificate.cert_chain
if not cert_chain or cert_chain.getNumCerts() == 0:
for result in self._sendError(AlertDescription.illegal_parameter,
"Other party sent a Certificate message without "\
"certificates"):
yield result
def _check_certchain_with_settings(self, cert_chain, settings):
"""
Verify that the key parameters match enabled ones.
Checks if the certificate key size matches the minimum and maximum
sizes set or that it uses curves enabled in settings
"""
#Get and check public key from the cert chain
publicKey = cert_chain.getEndEntityPublicKey()
cert_type = cert_chain.x509List[0].certAlg
Expand Down Expand Up @@ -1826,23 +1832,42 @@ def _clientGetKeyFromChain(self, certificate, settings, tackExt=None):
"Other party's public key too large: %d" %
len(publicKey)):
yield result
yield publicKey

def _clientGetKeyFromChain(self, certificate, settings, tack_ext=None):
#Get and check cert chain from the Certificate message
cert_chain = certificate.cert_chain
if not cert_chain or cert_chain.getNumCerts() == 0:
for result in self._sendError(
AlertDescription.illegal_parameter,
"Other party sent a Certificate message without "\
"certificates"):
yield result

for result in self._check_certchain_with_settings(
cert_chain,
settings):
if result in (0, 1):
yield result
else: break
public_key = result

# If there's no TLS Extension, look for a TACK cert
if tackpyLoaded:
if not tackExt:
tackExt = cert_chain.getTackExt()
if not tack_ext:
tack_ext = cert_chain.getTackExt()

# If there's a TACK (whether via TLS or TACK Cert), check that it
# matches the cert chain
if tackExt and tackExt.tacks:
for tack in tackExt.tacks:
if tack_ext and tack_ext.tacks:
for tack in tack_ext.tacks:
if not cert_chain.checkTack(tack):
for result in self._sendError(
AlertDescription.illegal_parameter,
"Other party's TACK doesn't match their public key"):
yield result

yield publicKey, cert_chain, tackExt
yield public_key, cert_chain, tack_ext


#*********************************************************
Expand Down Expand Up @@ -2701,13 +2726,18 @@ def _serverTLS13Handshake(self, settings, clientHello, cipherSuite,
signature_scheme, None, None, None,
prf_name, b'client')

scheme = SignatureScheme.toRepr(signature_scheme)
pad_type = SignatureScheme.getPadding(scheme)
hash_name = SignatureScheme.getHash(scheme)
salt_len = getattr(hashlib, hash_name)().digest_size

public_key = client_cert_chain.getEndEntityPublicKey()

if signature_scheme[1] == SignatureAlgorithm.ecdsa:
hash_name = HashAlgorithm.toRepr(signature_scheme[0])
pad_type = None
salt_len = None
else:
scheme = SignatureScheme.toRepr(signature_scheme)
pad_type = SignatureScheme.getPadding(scheme)
hash_name = SignatureScheme.getHash(scheme)
salt_len = getattr(hashlib, hash_name)().digest_size

if not public_key.verify(certificate_verify.signature,
signature_context,
pad_type,
Expand Down Expand Up @@ -3704,7 +3734,8 @@ def _serverCertKeyExchange(self, clientHello, serverHello,
if not reqCAs:
reqCAs = []
valid_sig_algs = self._sigHashesToList(settings)
certificateRequest.create([ClientCertificateType.rsa_sign],
certificateRequest.create([ClientCertificateType.rsa_sign,
ClientCertificateType.ecdsa_sign],
reqCAs,
valid_sig_algs)
msgs.append(certificateRequest)
Expand Down Expand Up @@ -3792,46 +3823,56 @@ def _serverCertKeyExchange(self, clientHello, serverHello,
"Verify"):
yield result
signatureAlgorithm = certificateVerify.signatureAlgorithm
if not signatureAlgorithm and \
clientCertChain.x509List[0].certAlg == "ecdsa":
signatureAlgorithm = (HashAlgorithm.sha1,
SignatureAlgorithm.ecdsa)

cvhh = self._certificate_verify_handshake_hash
verifyBytes = KeyExchange.calcVerifyBytes(self.version,
cvhh,
signatureAlgorithm,
premasterSecret,
clientHello.random,
serverHello.random)
publicKey = clientCertChain.getEndEntityPublicKey()
if len(publicKey) < settings.minKeySize:
for result in self._sendError(\
AlertDescription.handshake_failure,
"Client's public key too small: %d" % len(publicKey)):
yield result

if len(publicKey) > settings.maxKeySize:
for result in self._sendError(\
AlertDescription.handshake_failure,
"Client's public key too large: %d" % len(publicKey)):
verify_bytes = KeyExchange.calcVerifyBytes(
self.version,
cvhh,
signatureAlgorithm,
premasterSecret,
clientHello.random,
serverHello.random,
key_type=clientCertChain.x509List[0].certAlg)

for result in self._check_certchain_with_settings(
clientCertChain,
settings):
if result in (0, 1):
yield result

scheme = SignatureScheme.toRepr(signatureAlgorithm)
# for pkcs1 signatures hash is used to add PKCS#1 prefix, but
# that was already done by calcVerifyBytes
hashName = None
saltLen = 0
if scheme is None:
padding = 'pkcs1'
else: break
public_key = result

if not signatureAlgorithm or \
signatureAlgorithm[1] != SignatureAlgorithm.ecdsa:
scheme = SignatureScheme.toRepr(signatureAlgorithm)
# for pkcs1 signatures hash is used to add PKCS#1 prefix, but
# that was already done by calcVerifyBytes
hash_name = None
salt_len = 0
if scheme is None:
padding = 'pkcs1'
else:
padding = SignatureScheme.getPadding(scheme)
if padding == 'pss':
hash_name = SignatureScheme.getHash(scheme)
salt_len = getattr(hashlib, hash_name)().digest_size
else:
padding = SignatureScheme.getPadding(scheme)
if padding == 'pss':
hashName = SignatureScheme.getHash(scheme)
saltLen = getattr(hashlib, hashName)().digest_size

if not publicKey.verify(certificateVerify.signature,
verifyBytes,
padding,
hashName,
saltLen):
for result in self._sendError(\
hash_name = HashAlgorithm.toStr(signatureAlgorithm[0])
verify_bytes = verify_bytes[
:public_key.public_key.curve.baselen]
padding = None
salt_len = None

if not public_key.verify(certificateVerify.signature,
verify_bytes,
padding,
hash_name,
salt_len):
for result in self._sendError(
AlertDescription.decrypt_error,
"Signature failed to verify"):
yield result
Expand Down Expand Up @@ -4090,14 +4131,10 @@ def _sigHashesToList(settings, privateKey=None, certList=None,

# in TLS 1.3 ECDSA key curve is bound to hash
if publicKey and version > (3, 3):
size = len(publicKey)
size, r = divmod(size, 8)
size += int(bool(r))
if size == 32 and hashName != "sha256":
continue
if size == 48 and hashName != "sha384":
continue
if size == 65 and hashName != "sha512":
curve = publicKey.curve_name
matching_hash = TLSConnection._curve_name_to_hash_name(
curve)
if hashName != matching_hash:
continue

sigAlgs.append((getattr(HashAlgorithm, hashName),
Expand Down Expand Up @@ -4146,3 +4183,18 @@ def _curveNamesToList(settings):
def _groupNamesToList(settings):
"""Convert list of acceptable ff groups to TLS identifiers."""
return [getattr(GroupName, val) for val in settings.dhGroups]

@staticmethod
def _curve_name_to_hash_name(curve_name):
"""Returns the matching hash for a given curve name, for TLS 1.3
expects the python-ecdsa curve names as parameter
"""
if curve_name == "NIST256p":
return "sha256"
if curve_name == "NIST384p":
return "sha384"
if curve_name == "NIST521p":
return "sha512"
raise TLSIllegalParameterException(
"Curve {0} is not supported in TLS 1.3".format(curve_name))

0 comments on commit f7be560

Please sign in to comment.