Skip to content

Commit

Permalink
Merge pull request #398 from inikolcev/clean_key_mat_extractors
Browse files Browse the repository at this point in the history
Move keying material extractors to one place
  • Loading branch information
tomato42 committed Apr 3, 2020
2 parents c32629e + 3e94cef commit 9912a51
Show file tree
Hide file tree
Showing 8 changed files with 255 additions and 134 deletions.
12 changes: 6 additions & 6 deletions tlslite/keyexchange.py
Expand Up @@ -6,7 +6,7 @@

import ecdsa
from .mathtls import goodGroupParameters, makeK, makeU, makeX, \
calcMasterSecret, paramStrength, RFC7919_GROUPS
paramStrength, RFC7919_GROUPS, calc_key
from .errors import TLSInsufficientSecurity, TLSUnknownPSKIdentity, \
TLSIllegalParameterException, TLSDecryptionFailed, TLSInternalError, \
TLSDecodeError
Expand Down Expand Up @@ -260,11 +260,11 @@ def calcVerifyBytes(version, handshakeHashes, signatureAlg,
prf_name = None, peer_tag=b'client', key_type="rsa"):
"""Calculate signed bytes for Certificate Verify"""
if version == (3, 0):
masterSecret = calcMasterSecret(version,
0,
premasterSecret,
clientRandom,
serverRandom)
masterSecret = calc_key(version, premasterSecret,
0, b"master secret",
client_random=clientRandom,
server_random=serverRandom,
output_length=48)
verifyBytes = handshakeHashes.digestSSL(masterSecret, b"")
elif version in ((3, 1), (3, 2)):
if key_type != "ecdsa":
Expand Down
87 changes: 87 additions & 0 deletions tlslite/mathtls.py
Expand Up @@ -13,6 +13,7 @@
from .constants import CipherSuite
from .utils import tlshashlib as hashlib
from .utils import tlshmac as hmac
from .utils.deprecations import deprecated_method

# 1024, 1536, 2048, 3072, 4096, 6144, and 8192 bit groups
# Formatted to match lines in RFC
Expand Down Expand Up @@ -509,6 +510,7 @@ def PRF_SSL(secret, seed, length):
index += 1
return bytes

@deprecated_method("Please use calcKey method instead.")
def calcExtendedMasterSecret(version, cipherSuite, premasterSecret,
handshakeHashes):
"""Derive Extended Master Secret from premaster and handshake msgs"""
Expand All @@ -532,6 +534,7 @@ def calcExtendedMasterSecret(version, cipherSuite, premasterSecret,
return masterSecret


@deprecated_method("Please use calcKey method instead.")
def calcMasterSecret(version, cipherSuite, premasterSecret, clientRandom,
serverRandom):
"""Derive Master Secret from premaster secret and random values"""
Expand All @@ -556,6 +559,7 @@ def calcMasterSecret(version, cipherSuite, premasterSecret, clientRandom,
raise AssertionError()
return masterSecret

@deprecated_method("Please use calcKey method instead.")
def calcFinished(version, masterSecret, cipherSuite, handshakeHashes,
isClient):
"""Calculate the Handshake protocol Finished value
Expand Down Expand Up @@ -595,6 +599,89 @@ def calcFinished(version, masterSecret, cipherSuite, handshakeHashes,

return verifyData

def calc_key(version, secret, cipher_suite, label, handshake_hashes=None,
client_random=None, server_random=None, output_length=None):
"""
Method for calculating different keys depending on input.
It can be used to calculate finished value, master secret,
extended master secret or key expansion.
:param version: TLS protocol version
:type version: tuple(int, int)
:param bytearray secret: master secret or premasterSecret which will be
used in the PRF.
:param int cipher_suite: Negotiated cipher suite of the connection.
:param bytes label: label for the key you want to calculate
(ex. 'master secret', 'extended master secret', etc).
:param handshake_hashes: running hash of the handshake messages
needed for calculating extended master secret or finished value.
:type handshake_hashes: ~tlslite.handshakehashes.HandshakeHashes
:param bytearray client_random: client random needed for calculating
master secret or key expansion.
:param bytearray server_random: server random needed for calculating
master secret or key expansion.
:param int output_length: Number of bytes to output.
"""


# SSL3 calculations.
if version == (3, 0):
# Calculating Finished value, either for message sent
# by server or by client
if label == b"client finished":
senderStr = b"\x43\x4C\x4E\x54"
return handshake_hashes.digestSSL(secret, senderStr)
elif label == b"server finished":
senderStr = b"\x53\x52\x56\x52"
return handshake_hashes.digestSSL(secret, senderStr)
else:
assert label in [b"key expansion", b"master secret"]
func = PRF_SSL

# TLS1.0 or TLS1.1 calculations.
elif version in ((3, 1), (3, 2)):
func = PRF
# Seed needed for calculating extended master secret
if label == b"extended master secret":
seed = handshake_hashes.digest('md5') + \
handshake_hashes.digest('sha1')
# Seed needed for calculating Finished value
elif label in [b"server finished", b"client finished"]:
seed = handshake_hashes.digest()
else:
assert label in [b"key expansion", b"master secret"]

# TLS1.2 calculations.
else:
assert version == (3, 3)
if cipher_suite in CipherSuite.sha384PrfSuites:
func = PRF_1_2_SHA384
# Seed needed for calculating Finished value or extended master
# secret
if label in [b"extended master secret", b"server finished",
b"client finished"]:
seed = handshake_hashes.digest('sha384')
else:
assert label in [b"key expansion", b"master secret"]
else:
# Same as above, just using sha256
func = PRF_1_2
if label in [b"extended master secret", b"server finished",
b"client finished"]:
seed = handshake_hashes.digest('sha256')
else:
assert label in [b"key expansion", b"master secret"]

# Seed needed for calculating key expansion or master secret
if label == b"key expansion":
seed = server_random + client_random
if label == b"master secret":
seed = client_random + server_random

if func == PRF_SSL:
return func(secret, seed, output_length)
return func(secret, label, seed, output_length)

def makeX(salt, username, password):
if len(username)>=256:
raise ValueError("username too long")
Expand Down
37 changes: 5 additions & 32 deletions tlslite/recordlayer.py
Expand Up @@ -30,8 +30,7 @@
from .errors import TLSRecordOverflow, TLSIllegalParameterException,\
TLSAbruptCloseError, TLSDecryptionFailed, TLSBadRecordMAC, \
TLSUnexpectedMessage
from .mathtls import createMAC_SSL, createHMAC, PRF_SSL, PRF, PRF_1_2, \
PRF_1_2_SHA384
from .mathtls import createMAC_SSL, createHMAC, calc_key

class RecordSocket(object):
"""
Expand Down Expand Up @@ -1097,34 +1096,6 @@ def _getHMACMethod(version):

return createMACFunc

def _calcKeyBlock(self, cipherSuite, masterSecret, clientRandom,
serverRandom, outputLength):
"""Calculate the overall key to slice up"""
if self.version == (3, 0):
keyBlock = PRF_SSL(masterSecret,
serverRandom + clientRandom,
outputLength)
elif self.version in ((3, 1), (3, 2)):
keyBlock = PRF(masterSecret,
b"key expansion",
serverRandom + clientRandom,
outputLength)
elif self.version == (3, 3):
if cipherSuite in CipherSuite.sha384PrfSuites:
keyBlock = PRF_1_2_SHA384(masterSecret,
b"key expansion",
serverRandom + clientRandom,
outputLength)
else:
keyBlock = PRF_1_2(masterSecret,
b"key expansion",
serverRandom + clientRandom,
outputLength)
else:
raise AssertionError()

return keyBlock

def calcSSL2PendingStates(self, cipherSuite, masterSecret, clientRandom,
serverRandom, implementations):
"""
Expand Down Expand Up @@ -1215,8 +1186,10 @@ def calcPendingStates(self, cipherSuite, masterSecret, clientRandom,
outputLength = (macLength*2) + (keyLength*2) + (ivLength*2)

#Calculate Keying Material from Master Secret
keyBlock = self._calcKeyBlock(cipherSuite, masterSecret, clientRandom,
serverRandom, outputLength)
keyBlock = calc_key(self.version, masterSecret, cipherSuite,
b"key expansion", client_random=clientRandom,
server_random=serverRandom,
output_length=outputLength)

#Slice up Keying Material
clientPendingState = ConnectionState()
Expand Down
65 changes: 37 additions & 28 deletions tlslite/tlsconnection.py
Expand Up @@ -1788,16 +1788,16 @@ def _clientFinished(self, premasterSecret, clientRandom, serverRandom,
# use the certificate authentication, the hashes are the same
if not cvhh:
cvhh = self._handshake_hash
masterSecret = calcExtendedMasterSecret(self.version,
cipherSuite,
premasterSecret,
cvhh)
masterSecret = calc_key(self.version, premasterSecret,
cipherSuite, b"extended master secret",
handshake_hashes=cvhh,
output_length=48)
else:
masterSecret = calcMasterSecret(self.version,
cipherSuite,
premasterSecret,
clientRandom,
serverRandom)
masterSecret = calc_key(self.version, premasterSecret,
cipherSuite, b"master secret",
client_random=clientRandom,
server_random=serverRandom,
output_length=48)
self._calcPendingStates(cipherSuite, masterSecret,
clientRandom, serverRandom,
cipherImplementations)
Expand Down Expand Up @@ -4069,16 +4069,16 @@ def _serverFinished(self, premasterSecret, clientRandom, serverRandom,
# to regular handshake hash
if not cvhh:
cvhh = self._handshake_hash
masterSecret = calcExtendedMasterSecret(self.version,
cipherSuite,
premasterSecret,
cvhh)
masterSecret = calc_key(self.version, premasterSecret,
cipherSuite, b"extended master secret",
handshake_hashes=cvhh,
output_length=48)
else:
masterSecret = calcMasterSecret(self.version,
cipherSuite,
premasterSecret,
clientRandom,
serverRandom)
masterSecret = calc_key(self.version, premasterSecret,
cipherSuite, b"master secret",
client_random=clientRandom,
server_random=serverRandom,
output_length=48)

#Calculate pending connection states
self._calcPendingStates(cipherSuite, masterSecret,
Expand Down Expand Up @@ -4125,12 +4125,16 @@ def _sendFinished(self, masterSecret, cipherSuite=None, nextProto=None,
for result in self._sendMsg(nextProtoMsg):
yield result

#Figure out the correct label to use
if self._client:
label = b"client finished"
else:
label = b"server finished"
#Calculate verification data
verifyData = calcFinished(self.version,
masterSecret,
cipherSuite,
self._handshake_hash,
self._client)
verifyData = calc_key(self.version, masterSecret,
cipherSuite, label,
handshake_hashes=self._handshake_hash,
output_length=12)
if self.fault == Fault.badFinished:
verifyData[0] = (verifyData[0]+1)%256

Expand Down Expand Up @@ -4175,12 +4179,17 @@ def _getFinished(self, masterSecret, cipherSuite=None,
if nextProto:
self.next_proto = nextProto

#Figure out which label to use.
if self._client:
label = b"server finished"
else:
label = b"client finished"

#Calculate verification data
verifyData = calcFinished(self.version,
masterSecret,
cipherSuite,
self._handshake_hash,
not self._client)
verifyData = calc_key(self.version, masterSecret,
cipherSuite, label,
handshake_hashes=self._handshake_hash,
output_length=12)

#Get and check Finished message under new state
for result in self._getMsg(ContentType.handshake,
Expand Down
14 changes: 14 additions & 0 deletions tlslite/utils/deprecations.py
Expand Up @@ -202,3 +202,17 @@ def wrapper(cls):
orig_vars.pop('__weakref__', None)
return metaclass(cls.__name__, cls.__bases__, orig_vars)
return wrapper

def deprecated_method(message):
"""Decorator for deprecating methods.
:param ste message: The message you want to display.
"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
warnings.warn("{0} is a deprecated method. {1}".format(func.__name__, message),
DeprecationWarning, stacklevel=2)
return func(*args, **kwargs)
return wrapper
return decorator

0 comments on commit 9912a51

Please sign in to comment.