Skip to content

Commit

Permalink
Merge e53b07d into 513e165
Browse files Browse the repository at this point in the history
  • Loading branch information
tomato42 committed Sep 30, 2019
2 parents 513e165 + e53b07d commit c8ad4fc
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 55 deletions.
11 changes: 11 additions & 0 deletions tests/clientECCert.pem
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
-----BEGIN CERTIFICATE-----
MIIBfjCCASOgAwIBAgIUTxkWwfl73xnqhoJqWZ+R4RPCX54wCgYIKoZIzj0EAwIw
FDESMBAGA1UEAwwJbG9jYWxob3N0MB4XDTE5MDkyNTE2NDgzMloXDTI5MDkyMjE2
NDgzMlowFDESMBAGA1UEAwwJbG9jYWxob3N0MFkwEwYHKoZIzj0CAQYIKoZIzj0D
AQcDQgAEQNSgjtKQ2KEyIjcamNSSH83huV2aYpJMCRYAXvEBUNygmCwxCod2j97J
Z4rnBmd9ySlceT65nXFuDuKlftIlLaNTMFEwHQYDVR0OBBYEFM9448HkxS8PnjT2
CRzwBP3SWGK1MB8GA1UdIwQYMBaAFM9448HkxS8PnjT2CRzwBP3SWGK1MA8GA1Ud
EwEB/wQFMAMBAf8wCgYIKoZIzj0EAwIDSQAwRgIhAKCSuUb8eWvb3WCQcA8Wj2av
V8UXsXPk5N2HyDojhklvAiEA97Z8QDFxMg20hm8cPGMlQT1r3PtPwF0eRsL2rPu0
nxY=
-----END CERTIFICATE-----
5 changes: 5 additions & 0 deletions tests/clientECKey.pem
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-----BEGIN PRIVATE KEY-----
MIGHAgEAMBMGByqGSM49AgEGCCqGSM49AwEHBG0wawIBAQQgjIlFWfrrN8Rsn7RU
9mZsG6hTDEa/1pXw8oNBML7UjeihRANCAARA1KCO0pDYoTIiNxqY1JIfzeG5XZpi
kkwJFgBe8QFQ3KCYLDEKh3aP3slniucGZ33JKVx5PrmdcW4O4qV+0iUt
-----END PRIVATE KEY-----
36 changes: 32 additions & 4 deletions tests/tlstest.py
Original file line number Diff line number Diff line change
Expand Up @@ -568,12 +568,27 @@ def connect():

synchro.recv(1)
connection = connect()
# TODO add client certificate support in TLS 1.3
settings = HandshakeSettings()
settings.maxVersion = (3, 3)
connection.handshakeClientCert(x509Chain, x509Key, settings=settings)
connection.handshakeClientCert(x509Chain, x509Key)
testConnClient(connection)
assert isinstance(connection.session.serverCertChain, X509CertChain)
connection.close()

test_no += 1

print("Test {0} - good mutual ECDSA X.509".format(test_no))
with open(os.path.join(dir, "clientECCert.pem")) as f:
x509Cert = X509().parse(f.read())
x509Chain = X509CertChain([x509Cert])
with open(os.path.join(dir, "clientECKey.pem")) as f:
x509Key = parsePEMKey(f.read(), private=True)

synchro.recv(1)
connection = connect()
connection.handshakeClientCert(x509Chain, x509Key)
testConnClient(connection)
assert isinstance(connection.session.serverCertChain, X509CertChain)
assert len(connection.session.serverCertChain.getEndEntityPublicKey()) ==\
256
connection.close()

test_no += 1
Expand Down Expand Up @@ -1692,6 +1707,19 @@ def connect():

test_no += 1

print("Test {0} - good mutual ECDSA X.509".format(test_no))
synchro.send(b'R')
connection = connect()
connection.handshakeServer(certChain=x509ecdsaChain,
privateKey=x509ecdsaKey, reqCert=True)
testConnServer(connection)
assert(isinstance(connection.session.clientCertChain, X509CertChain))
assert len(connection.session.clientCertChain.getEndEntityPublicKey()) ==\
256
connection.close()

test_no += 1

print("Test {0} - good mutual X.509, TLSv1.3 no certs".format(test_no))
synchro.send(b'R')
connection = connect()
Expand Down
2 changes: 1 addition & 1 deletion tlslite/keyexchange.py
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ def makeCertificateVerify(version, handshakeHashes, validSigAlgs,
premasterSecret,
clientRandom,
serverRandom,
privateKey.key_type)
key_type=privateKey.key_type)
if signatureAlgorithm and \
signatureAlgorithm[1] == SignatureAlgorithm.ecdsa:
padding = None
Expand Down
139 changes: 89 additions & 50 deletions tlslite/tlsconnection.py
Original file line number Diff line number Diff line change
Expand Up @@ -1253,6 +1253,7 @@ def _clientTLS13Handshake(self, settings, session, clientHello,
assert isinstance(certificate_verify, CertificateVerify)

signature_scheme = certificate_verify.signatureAlgorithm
self.serverSigAlg = signature_scheme

signature_context = KeyExchange.calcVerifyBytes((3, 4),
srv_cert_verify_hh,
Expand All @@ -1270,6 +1271,16 @@ def _clientTLS13Handshake(self, settings, session, clientHello,
if signature_scheme[1] == SignatureAlgorithm.ecdsa:
padType = None
hashName = HashAlgorithm.toRepr(signature_scheme[0])
if publicKey.curve_name == "NIST256p" and \
hashName != "sha256" or \
publicKey.curve_name == "NIST384p" and \
hashName != "sha384" or \
publicKey.curve_name == "NIST521p" and \
hashName != "sha512":
raise TLSIllegalParameterException(
"server selected signature method invalid for the "
"certificate it presented (curve mismatch)")

saltLen = None
else:
scheme = SignatureScheme.toRepr(signature_scheme)
Expand Down Expand Up @@ -1470,7 +1481,7 @@ def _clientTLS13Handshake(self, settings, session, clientHello,
bytearray(b''), # no session_id in TLS 1.3
serverHello.cipher_suite,
None, # no SRP
None, # no client cert chain
clientCertChain,
certificate.cert_chain if certificate else None,
None, # no TACK
False, # no TACK in hello
Expand Down Expand Up @@ -1766,15 +1777,13 @@ def _clientFinished(self, premasterSecret, clientRandom, serverRandom,
yield result
yield masterSecret

def _clientGetKeyFromChain(self, certificate, settings, tackExt=None):
#Get and check cert chain from the Certificate message
cert_chain = certificate.cert_chain
if not cert_chain or cert_chain.getNumCerts() == 0:
for result in self._sendError(AlertDescription.illegal_parameter,
"Other party sent a Certificate message without "\
"certificates"):
yield result
def _check_certchain_with_settings(self, cert_chain, settings):
"""
Verify that the key parameters match enabled ones.
Checks if the certificate key size matches the minimum and maximum
sizes set or that it uses curves enabled in settings
"""
#Get and check public key from the cert chain
publicKey = cert_chain.getEndEntityPublicKey()
cert_type = cert_chain.x509List[0].certAlg
Expand Down Expand Up @@ -1826,6 +1835,23 @@ def _clientGetKeyFromChain(self, certificate, settings, tackExt=None):
"Other party's public key too large: %d" %
len(publicKey)):
yield result
yield publicKey

def _clientGetKeyFromChain(self, certificate, settings, tackExt=None):
#Get and check cert chain from the Certificate message
cert_chain = certificate.cert_chain
if not cert_chain or cert_chain.getNumCerts() == 0:
for result in self._sendError(AlertDescription.illegal_parameter,
"Other party sent a Certificate message without "\
"certificates"):
yield result

for result in self._check_certchain_with_settings(cert_chain,
settings):
if result in (0, 1):
yield result
else: break
publicKey = result

# If there's no TLS Extension, look for a TACK cert
if tackpyLoaded:
Expand Down Expand Up @@ -2701,13 +2727,18 @@ def _serverTLS13Handshake(self, settings, clientHello, cipherSuite,
signature_scheme, None, None, None,
prf_name, b'client')

scheme = SignatureScheme.toRepr(signature_scheme)
pad_type = SignatureScheme.getPadding(scheme)
hash_name = SignatureScheme.getHash(scheme)
salt_len = getattr(hashlib, hash_name)().digest_size

public_key = client_cert_chain.getEndEntityPublicKey()

if signature_scheme[1] == SignatureAlgorithm.ecdsa:
hash_name = HashAlgorithm.toRepr(signature_scheme[0])
pad_type = None
salt_len = None
else:
scheme = SignatureScheme.toRepr(signature_scheme)
pad_type = SignatureScheme.getPadding(scheme)
hash_name = SignatureScheme.getHash(scheme)
salt_len = getattr(hashlib, hash_name)().digest_size

if not public_key.verify(certificate_verify.signature,
signature_context,
pad_type,
Expand Down Expand Up @@ -3704,7 +3735,8 @@ def _serverCertKeyExchange(self, clientHello, serverHello,
if not reqCAs:
reqCAs = []
valid_sig_algs = self._sigHashesToList(settings)
certificateRequest.create([ClientCertificateType.rsa_sign],
certificateRequest.create([ClientCertificateType.rsa_sign,
ClientCertificateType.ecdsa_sign],
reqCAs,
valid_sig_algs)
msgs.append(certificateRequest)
Expand Down Expand Up @@ -3792,46 +3824,55 @@ def _serverCertKeyExchange(self, clientHello, serverHello,
"Verify"):
yield result
signatureAlgorithm = certificateVerify.signatureAlgorithm
if not signatureAlgorithm and \
clientCertChain.x509List[0].certAlg == "ecdsa":
signatureAlgorithm = (HashAlgorithm.sha1,
SignatureAlgorithm.ecdsa)

cvhh = self._certificate_verify_handshake_hash
verifyBytes = KeyExchange.calcVerifyBytes(self.version,
cvhh,
signatureAlgorithm,
premasterSecret,
clientHello.random,
serverHello.random)
publicKey = clientCertChain.getEndEntityPublicKey()
if len(publicKey) < settings.minKeySize:
for result in self._sendError(\
AlertDescription.handshake_failure,
"Client's public key too small: %d" % len(publicKey)):
yield result
verifyBytes = KeyExchange.calcVerifyBytes(
self.version,
cvhh,
signatureAlgorithm,
premasterSecret,
clientHello.random,
serverHello.random,
key_type=clientCertChain.x509List[0].certAlg)

if len(publicKey) > settings.maxKeySize:
for result in self._sendError(\
AlertDescription.handshake_failure,
"Client's public key too large: %d" % len(publicKey)):
for result in self._check_certchain_with_settings(
clientCertChain,
settings):
if result in (0, 1):
yield result

scheme = SignatureScheme.toRepr(signatureAlgorithm)
# for pkcs1 signatures hash is used to add PKCS#1 prefix, but
# that was already done by calcVerifyBytes
hashName = None
saltLen = 0
if scheme is None:
padding = 'pkcs1'
else: break
publicKey = result

if not signatureAlgorithm or \
signatureAlgorithm[1] != SignatureAlgorithm.ecdsa:
scheme = SignatureScheme.toRepr(signatureAlgorithm)
# for pkcs1 signatures hash is used to add PKCS#1 prefix, but
# that was already done by calcVerifyBytes
hashName = None
saltLen = 0
if scheme is None:
padding = 'pkcs1'
else:
padding = SignatureScheme.getPadding(scheme)
if padding == 'pss':
hashName = SignatureScheme.getHash(scheme)
saltLen = getattr(hashlib, hashName)().digest_size
else:
padding = SignatureScheme.getPadding(scheme)
if padding == 'pss':
hashName = SignatureScheme.getHash(scheme)
saltLen = getattr(hashlib, hashName)().digest_size
hashName = HashAlgorithm.toStr(signatureAlgorithm[0])
verifyBytes = verifyBytes[:publicKey.public_key.curve.baselen]
padding = None
saltLen = None

if not publicKey.verify(certificateVerify.signature,
verifyBytes,
padding,
hashName,
saltLen):
for result in self._sendError(\
for result in self._sendError(
AlertDescription.decrypt_error,
"Signature failed to verify"):
yield result
Expand Down Expand Up @@ -4090,14 +4131,12 @@ def _sigHashesToList(settings, privateKey=None, certList=None,

# in TLS 1.3 ECDSA key curve is bound to hash
if publicKey and version > (3, 3):
size = len(publicKey)
size, r = divmod(size, 8)
size += int(bool(r))
if size == 32 and hashName != "sha256":
curve = publicKey.curve_name
if curve == "NIST256p" and hashName != "sha256":
continue
if size == 48 and hashName != "sha384":
if curve == "NIST384p" and hashName != "sha384":
continue
if size == 65 and hashName != "sha512":
if curve == "NIST521p" and hashName != "sha512":
continue

sigAlgs.append((getattr(HashAlgorithm, hashName),
Expand Down

0 comments on commit c8ad4fc

Please sign in to comment.