From c6a25bb27d9536542ca657432acb9d56bc04193c Mon Sep 17 00:00:00 2001 From: Ivan Nikolchev Date: Thu, 7 Nov 2019 13:09:17 +0100 Subject: [PATCH 1/3] AES-CCM implementation per rfc3610 --- tlslite/utils/aesccm.py | 181 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 181 insertions(+) create mode 100644 tlslite/utils/aesccm.py diff --git a/tlslite/utils/aesccm.py b/tlslite/utils/aesccm.py new file mode 100644 index 000000000..01f362e78 --- /dev/null +++ b/tlslite/utils/aesccm.py @@ -0,0 +1,181 @@ +# Copyright (c) 2019 Ivan Nikolchev +# +# See the LICENSE file for legal information regarding use of this file. +# + +from __future__ import division +from tlslite.utils.cryptomath import numberToByteArray +from tlslite.utils.python_aes import Python_AES +import sys +import array + + +class AESCCM(object): + # AES-CCM implementation per RFC3610 + + def __init__(self, key, implementation, rawAesEncrypt, tag_length=16): + self.isBlockCipher = False + self.isAEAD = True + self.key = key + self.tagLength = tag_length + if len(self.key) == 16 and self.tagLength == 8: + self.name = "aes128ccm_8" + elif len(self.key) == 16 and self.tagLength == 16: + self.name = "aes128ccm" + elif len(self.key) == 32 and self.tagLength == 8: + self.name = "aes256ccm_8" + else: + assert len(self.key) == 32 and self.tagLength == 16 + self.name = "aes256ccm" + self._rawAesEncrypt = rawAesEncrypt + self.implementation = implementation + self.nonceLength = 12 + + def _cbcmac_calc(self, nonce, aad, msg): + L = 15 - len(nonce) + mac_data = bytearray() + + # Flags constructed as in section 2.2 in the rfc + flags = 64 * (len(aad) > 0) + flags += 8 * ((self.tagLength - 2) // 2) + flags += 1 * (L - 1) + + # Construct B_0 + b_0 = bytearray([flags]) + nonce + numberToByteArray(len(msg), L) + + aad_len_encoded = bytearray() + if len(aad) > 0: + if len(aad) < (2 ** 16 - 2 ** 8): + oct_size = 2 + elif len(aad) < (2 ** 32): + oct_size = 4 + aad_len_encoded = b'\xFF\xFE' + else: + oct_size = 8 + aad_len_encoded = b'\xFF\xFF' + + aad_len_encoded += numberToByteArray(len(aad), oct_size) + + # Construct the bytearray that goes into the MAC + mac_data += b_0 + mac_data += aad_len_encoded + mac_data += aad + + # We need to pad with zeroes before and after msg blocks are added + self._pad_with_zeroes(mac_data, 16) + mac_data += msg + self._pad_with_zeroes(mac_data, 16) + + # The mac data is now constructed and + # we need to run in through AES-CBC with 0 IV + + cbc = Python_AES(self.key, 2, bytearray(b'\x00' * 16)) + cbcmac = cbc.encrypt(mac_data) + + # If the tagLength has default value 16, we return + # the whole last block. Otherwise we return only + # the first tagLength bytes from the last block + if self.tagLength == 16: + t = cbcmac[-16:] + else: + t = cbcmac[-16:-(16-self.tagLength)] + return t + + def seal(self, nonce, msg, aad): + if len(nonce) != 12: + raise ValueError("Bad nonce length") + + L = 15 - len(nonce) + auth_value = bytearray(self.tagLength) + + # We construct the key stream blocks. + # S_0 is not used for encrypting the message, it is only used + # to compute the authentication value. + # S_1..S_n are used to encrypt the message. + + flags = L - 1 + s_0 = self._rawAesEncrypt(bytearray([flags]) + + nonce + numberToByteArray(0, L)) + + s_n = self._construct_s_n(msg, flags, nonce, L) + + if sys.version_info[0] >= 3: + if len(msg) % 8 != 0: + msg_added = 8 - (len(msg) % 8) + self._pad_with_zeroes(msg, 8) + enc_msg = self._use_memoryview(msg, s_n)[:-msg_added] + msg = msg[:-msg_added] + else: + enc_msg = self._use_memoryview(msg, s_n) + else: + enc_msg = bytearray(i ^ j for i, j in zip(msg, s_n)) + + mac = self._cbcmac_calc(nonce, aad, msg) + + for i in range(0, self.tagLength): + auth_value[i] = mac[i] ^ s_0[i] + + ciphertext = enc_msg + auth_value + return ciphertext + + def open(self, nonce, ciphertext, aad): + + if len(nonce) != 12: + raise ValueError("Bad nonce length") + if len(ciphertext) < 16: + return None + + L = 15 - len(nonce) + received_mac = bytearray(self.tagLength) + flags = L - 1 + + # Same construction as in seal function + + s_0 = self._rawAesEncrypt(bytearray([flags]) + + nonce + numberToByteArray(0, L)) + + msg = bytearray(len(ciphertext) - self.tagLength) + + s_n = self._construct_s_n(ciphertext, flags, nonce, L) + + # We decrypt the message + for i in range(0, len(ciphertext) - self.tagLength): + msg[i] = ciphertext[i] ^ s_n[i] + + auth_value = ciphertext[-self.tagLength:] + computed_mac = self._cbcmac_calc(nonce, aad, msg) + + # We decrypt the auth value + for i in range(0, self.tagLength): + received_mac[i] = auth_value[i] ^ s_0[i] + + # Compare the mac vlaue is the same as the one we computed + if received_mac != computed_mac: + return None + return msg + + def _construct_s_n(self, ciphertext, flags, nonce, L): + s_n = bytearray() + if len(ciphertext) % 16 == 0: + counter_lmt = len(ciphertext) / 16 + else: + counter_lmt = (len(ciphertext) / 16) + 1 + + for i in range(1, int(counter_lmt) + 1): + s_n += self._rawAesEncrypt(bytearray([flags]) + + nonce + numberToByteArray(i, L)) + return s_n + + @staticmethod + def _pad_with_zeroes(data, size): + if len(data) % size != 0: + zeroes_to_add = size - (len(data) % size) + data += b'\x00' * zeroes_to_add + + @staticmethod + def _use_memoryview(msg, s_n): + msg_mv = memoryview(msg).cast('Q') + s_n_mv = memoryview(s_n).cast('Q') + enc_arr = array.array('Q', (i ^ j for i, j in zip(msg_mv, s_n_mv))) + enc_msg = bytearray(enc_arr.tobytes()) + return enc_msg From cf88000497b5c14fbcbabdd632ea2764f7df18b0 Mon Sep 17 00:00:00 2001 From: Ivan Nikolchev Date: Thu, 7 Nov 2019 15:25:13 +0100 Subject: [PATCH 2/3] Test coverage for AES-CCM and integration with the rest of tlslite --- tests/tlstest.py | 16 +- tlslite/constants.py | 95 +++++++ tlslite/handshakesettings.py | 5 +- tlslite/recordlayer.py | 20 +- tlslite/tlsconnection.py | 11 +- tlslite/utils/cipherfactory.py | 39 +++ tlslite/utils/python_aesccm.py | 10 + unit_tests/test_tlslite_constants.py | 3 +- unit_tests/test_tlslite_utils_aesccm.py | 350 ++++++++++++++++++++++++ 9 files changed, 539 insertions(+), 10 deletions(-) create mode 100644 tlslite/utils/python_aesccm.py create mode 100644 unit_tests/test_tlslite_utils_aesccm.py diff --git a/tests/tlstest.py b/tests/tlstest.py index 45470a782..903a133b9 100755 --- a/tests/tlstest.py +++ b/tests/tlstest.py @@ -767,7 +767,8 @@ def connect(): print("Test {0} - throughput test".format(test_no)) for implementation in implementations: - for cipher in ["aes128gcm", "aes256gcm", "aes128", "aes256", "3des", + for cipher in ["aes128ccm", "aes128ccm_8", "aes256ccm", "aes256ccm_8", + "aes128gcm", "aes256gcm", "aes128", "aes256", "3des", "rc4", "chacha20-poly1305_draft00", "chacha20-poly1305"]: # skip tests with implementations that don't support them @@ -778,7 +779,8 @@ def connect(): implementation not in ("pycrypto", "python"): continue - if cipher in ("chacha20-poly1305_draft00", "chacha20-poly1305") \ + if cipher in ("chacha20-poly1305_draft00", "chacha20-poly1305", + "aes128ccm", "aes128ccm_8", "aes256ccm", "aes256ccm_8") \ and implementation not in ("python", ): continue @@ -791,7 +793,8 @@ def connect(): settings = HandshakeSettings() settings.cipherNames = [cipher] settings.cipherImplementations = [implementation, "python"] - if cipher not in ("aes128gcm", "aes256gcm", "chacha20-poly1305"): + if cipher not in ("aes128ccm", "aes128ccm_8", "aes128gcm", + "aes256gcm", "chacha20-poly1305"): settings.maxVersion = (3, 3) connection.handshakeClientCert(settings=settings) print("%s %s:" % (connection.getCipherName(), connection.getCipherImplementation()), end=' ') @@ -1885,7 +1888,8 @@ def server_bind(self): print("Test {0} - throughput test".format(test_no)) for implementation in implementations: - for cipher in ["aes128gcm", "aes256gcm", "aes128", "aes256", "3des", + for cipher in ["aes128ccm", "aes128ccm_8", "aes256ccm", "aes256ccm_8", + "aes128gcm", "aes256gcm", "aes128", "aes256", "3des", "rc4", "chacha20-poly1305_draft00", "chacha20-poly1305"]: # skip tests with implementations that don't support them @@ -1896,7 +1900,9 @@ def server_bind(self): implementation not in ("pycrypto", "python"): continue - if cipher in ("chacha20-poly1305_draft00", "chacha20-poly1305") \ + if cipher in ("chacha20-poly1305_draft00", "chacha20-poly1305", + "aes128ccm", "aes128ccm_8", + "aes256ccm", "aes256ccm_8") \ and implementation not in ("python", ): continue diff --git a/tlslite/constants.py b/tlslite/constants.py index 2d37e145c..ddd9c9a97 100644 --- a/tlslite/constants.py +++ b/tlslite/constants.py @@ -643,6 +643,25 @@ class CipherSuite: TLS_DH_ANON_WITH_AES_256_GCM_SHA384 = 0x00A7 ietfNames[0x00A7] = 'TLS_DH_ANON_WITH_AES_256_GCM_SHA384' + # RFC 6655 - AES-CCM ciphers for TLSv1.2 + TLS_RSA_WITH_AES_128_CCM = 0xC09C + ietfNames[0xC09C] = 'TLS_RSA_WITH_AES_128_CCM' + TLS_RSA_WITH_AES_256_CCM = 0xC09D + ietfNames[0xC09D] = 'TLS_RSA_WITH_AES_256_CCM' + TLS_DHE_RSA_WITH_AES_128_CCM = 0xC09E + ietfNames[0xC09E] = 'TLS_DHE_RSA_WITH_AES_128_CCM' + TLS_DHE_RSA_WITH_AES_256_CCM = 0xC09F + ietfNames[0xC09F] = 'TLS_DHE_RSA_WITH_AES_256_CCM' + TLS_RSA_WITH_AES_128_CCM_8 = 0xC0A0 + ietfNames[0xC0A0] = 'TLS_RSA_WITH_AES_128_CCM_8' + TLS_RSA_WITH_AES_256_CCM_8 = 0xC0A1 + ietfNames[0xC0A1] = 'TLS_RSA_WITH_AES_256_CCM_8' + TLS_DHE_RSA_WITH_AES_128_CCM_8 = 0xC0A2 + ietfNames[0xC0A2] = 'TLS_DHE_RSA_WITH_AES_128_CCM_8' + TLS_DHE_RSA_WITH_AES_256_CCM_8 = 0xC0A3 + ietfNames[0xC0A3] = 'TLS_DHE_RSA_WITH_AES_256_CCM_8' + + # Weird pseudo-ciphersuite from RFC 5746 # Signals that "secure renegotiation" is supported # We actually don't do any renegotiation, but this @@ -657,6 +676,10 @@ class CipherSuite: ietfNames[0x1302] = 'TLS_AES_256_GCM_SHA384' TLS_CHACHA20_POLY1305_SHA256 = 0x1303 ietfNames[0x1303] = 'TLS_CHACHA20_POLY1305_SHA256' + TLS_AES_128_CCM_SHA256 = 0x1304 + ietfNames[0x1304] = 'TLS_AES_128_CCM_SHA256' + TLS_AES_128_CCM_8_SHA256 = 0x1305 + ietfNames[0x1305] = 'TLS_AES_128_CCM_8_SHA256' # RFC 7507 - Fallback Signaling Cipher Suite Value for Preventing Protocol # Downgrade Attacks @@ -794,6 +817,16 @@ class CipherSuite: TLS_DHE_RSA_WITH_CHACHA20_POLY1305_SHA256 = 0xCCAA ietfNames[0xCCAA] = 'TLS_DHE_RSA_WITH_CHACHA20_POLY1305_SHA256' + # RFC 7251 - AES-CCM ECC Ciphers for TLS + TLS_ECDHE_ECDSA_WITH_AES_128_CCM = 0xC0AC + ietfNames[0xC0AC] = 'TLS_ECDHE_ECDSA_WITH_AES_128_CCM' + TLS_ECDHE_ECDSA_WITH_AES_256_CCM = 0xC0AD + ietfNames[0xC0AD] = 'TLS_ECDHE_ECDSA_WITH_AES_256_CCM' + TLS_ECDHE_ECDSA_WITH_AES_128_CCM_8 = 0xC0AE + ietfNames[0xC0AE] = 'TLS_ECDHE_ECDSA_WITH_AES_128_CCM_8' + TLS_ECDHE_ECDSA_WITH_AES_256_CCM_8 = 0xC0AF + ietfNames[0xC0AF] = 'TLS_ECDHE_ECDSA_WITH_AES_256_CCM_8' + #pylint: enable = invalid-name # # Define cipher suite families below @@ -874,6 +907,32 @@ class CipherSuite: aes256GcmSuites.append(TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384) aes256GcmSuites.append(TLS_AES_256_GCM_SHA384) + #: AES-128 CCM_8 ciphers + aes128Ccm_8Suites = [] + aes128Ccm_8Suites.append(TLS_RSA_WITH_AES_128_CCM_8) + aes128Ccm_8Suites.append(TLS_DHE_RSA_WITH_AES_128_CCM_8) + aes128Ccm_8Suites.append(TLS_ECDHE_ECDSA_WITH_AES_128_CCM_8) + aes128Ccm_8Suites.append(TLS_AES_128_CCM_8_SHA256) + + #: AES-128 CCM ciphers + aes128CcmSuites = [] + aes128CcmSuites.append(TLS_RSA_WITH_AES_128_CCM) + aes128CcmSuites.append(TLS_DHE_RSA_WITH_AES_128_CCM) + aes128CcmSuites.append(TLS_ECDHE_ECDSA_WITH_AES_128_CCM) + aes128CcmSuites.append(TLS_AES_128_CCM_SHA256) + + #: AES-256 CCM_8 ciphers + aes256Ccm_8Suites = [] + aes256Ccm_8Suites.append(TLS_RSA_WITH_AES_256_CCM_8) + aes256Ccm_8Suites.append(TLS_DHE_RSA_WITH_AES_256_CCM_8) + aes256Ccm_8Suites.append(TLS_ECDHE_ECDSA_WITH_AES_256_CCM_8) + + # AES-256 CCM ciphers + aes256CcmSuites = [] + aes256CcmSuites.append(TLS_RSA_WITH_AES_256_CCM) + aes256CcmSuites.append(TLS_DHE_RSA_WITH_AES_256_CCM) + aes256CcmSuites.append(TLS_ECDHE_ECDSA_WITH_AES_256_CCM) + #: CHACHA20 cipher, 00'th IETF draft (implicit POLY1305 authenticator) chacha20draft00Suites = [] chacha20draft00Suites.append(TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305_draft_00) @@ -985,6 +1044,10 @@ class CipherSuite: aeadSuites = [] aeadSuites.extend(aes128GcmSuites) aeadSuites.extend(aes256GcmSuites) + aeadSuites.extend(aes128CcmSuites) + aeadSuites.extend(aes128Ccm_8Suites) + aeadSuites.extend(aes256CcmSuites) + aeadSuites.extend(aes256Ccm_8Suites) aeadSuites.extend(chacha20Suites) aeadSuites.extend(chacha20draft00Suites) @@ -1022,6 +1085,10 @@ class CipherSuite: tls12Suites.remove(TLS_AES_128_GCM_SHA256) tls13Suites.append(TLS_CHACHA20_POLY1305_SHA256) tls12Suites.remove(TLS_CHACHA20_POLY1305_SHA256) + tls13Suites.append(TLS_AES_128_CCM_SHA256) + tls12Suites.remove(TLS_AES_128_CCM_SHA256) + tls13Suites.append(TLS_AES_128_CCM_8_SHA256) + tls12Suites.remove(TLS_AES_128_CCM_8_SHA256) @staticmethod def filterForVersion(suites, minVersion, maxVersion): @@ -1085,6 +1152,14 @@ def _filterSuites(suites, settings, version=None): cipherSuites += CipherSuite.aes128GcmSuites if "aes256gcm" in cipherNames and version >= (3, 3): cipherSuites += CipherSuite.aes256GcmSuites + if "aes128ccm" in cipherNames and version >= (3, 3): + cipherSuites += CipherSuite.aes128CcmSuites + if "aes128ccm_8" in cipherNames and version >= (3, 3): + cipherSuites += CipherSuite.aes128Ccm_8Suites + if "aes256ccm" in cipherNames and version >= (3, 3): + cipherSuites += CipherSuite.aes256CcmSuites + if "aes256ccm_8" in cipherNames and version >= (3, 3): + cipherSuites += CipherSuite.aes256Ccm_8Suites if "aes128" in cipherNames: cipherSuites += CipherSuite.aes128Suites if "aes256" in cipherNames: @@ -1158,10 +1233,14 @@ def getSrpAllSuites(cls, settings, version=None): certSuites = [] certSuites.append(TLS_RSA_WITH_AES_256_GCM_SHA384) certSuites.append(TLS_RSA_WITH_AES_128_GCM_SHA256) + certSuites.append(TLS_RSA_WITH_AES_256_CCM) + certSuites.append(TLS_RSA_WITH_AES_128_CCM) certSuites.append(TLS_RSA_WITH_AES_256_CBC_SHA256) certSuites.append(TLS_RSA_WITH_AES_128_CBC_SHA256) certSuites.append(TLS_RSA_WITH_AES_256_CBC_SHA) certSuites.append(TLS_RSA_WITH_AES_128_CBC_SHA) + certSuites.append(TLS_RSA_WITH_AES_256_CCM_8) + certSuites.append(TLS_RSA_WITH_AES_128_CCM_8) certSuites.append(TLS_RSA_WITH_3DES_EDE_CBC_SHA) certSuites.append(TLS_RSA_WITH_RC4_128_SHA) certSuites.append(TLS_RSA_WITH_RC4_128_MD5) @@ -1180,10 +1259,14 @@ def getCertSuites(cls, settings, version=None): dheCertSuites.append(TLS_DHE_RSA_WITH_CHACHA20_POLY1305_draft_00) dheCertSuites.append(TLS_DHE_RSA_WITH_AES_256_GCM_SHA384) dheCertSuites.append(TLS_DHE_RSA_WITH_AES_128_GCM_SHA256) + dheCertSuites.append(TLS_DHE_RSA_WITH_AES_256_CCM) + dheCertSuites.append(TLS_DHE_RSA_WITH_AES_128_CCM) dheCertSuites.append(TLS_DHE_RSA_WITH_AES_256_CBC_SHA256) dheCertSuites.append(TLS_DHE_RSA_WITH_AES_128_CBC_SHA256) dheCertSuites.append(TLS_DHE_RSA_WITH_AES_256_CBC_SHA) dheCertSuites.append(TLS_DHE_RSA_WITH_AES_128_CBC_SHA) + dheCertSuites.append(TLS_DHE_RSA_WITH_AES_256_CCM_8) + dheCertSuites.append(TLS_DHE_RSA_WITH_AES_128_CCM_8) dheCertSuites.append(TLS_DHE_RSA_WITH_3DES_EDE_CBC_SHA) @classmethod @@ -1219,11 +1302,15 @@ def getEcdheCertSuites(cls, settings, version=None): ecdheEcdsaSuites.append(TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305_draft_00) ecdheEcdsaSuites.append(TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384) ecdheEcdsaSuites.append(TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256) + ecdheEcdsaSuites.append(TLS_ECDHE_ECDSA_WITH_AES_256_CCM) + ecdheEcdsaSuites.append(TLS_ECDHE_ECDSA_WITH_AES_128_CCM) ecdheEcdsaSuites.append(TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA384) ecdheEcdsaSuites.append(TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA256) ecdheEcdsaSuites.append(TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA) ecdheEcdsaSuites.append(TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA) ecdheEcdsaSuites.append(TLS_ECDHE_ECDSA_WITH_3DES_EDE_CBC_SHA) + ecdheEcdsaSuites.append(TLS_ECDHE_ECDSA_WITH_AES_256_CCM_8) + ecdheEcdsaSuites.append(TLS_ECDHE_ECDSA_WITH_AES_128_CCM_8) ecdheEcdsaSuites.append(TLS_ECDHE_ECDSA_WITH_RC4_128_SHA) ecdheEcdsaSuites.append(TLS_ECDHE_ECDSA_WITH_NULL_SHA) @@ -1274,6 +1361,14 @@ def canonicalCipherName(ciphersuite): return "aes128gcm" elif ciphersuite in CipherSuite.aes256GcmSuites: return "aes256gcm" + elif ciphersuite in CipherSuite.aes128Ccm_8Suites: + return "aes128ccm_8" + elif ciphersuite in CipherSuite.aes128CcmSuites: + return "aes128ccm" + elif ciphersuite in CipherSuite.aes256CcmSuites: + return "aes256ccm" + elif ciphersuite in CipherSuite.aes256Ccm_8Suites: + return "aes256ccm_8" elif ciphersuite in CipherSuite.aes128Suites: return "aes128" elif ciphersuite in CipherSuite.aes256Suites: diff --git a/tlslite/handshakesettings.py b/tlslite/handshakesettings.py index 2c0c501a5..1f3bea7de 100644 --- a/tlslite/handshakesettings.py +++ b/tlslite/handshakesettings.py @@ -14,9 +14,11 @@ CIPHER_NAMES = ["chacha20-poly1305", "aes256gcm", "aes128gcm", + "aes256ccm", "aes128ccm", "aes256", "aes128", "3des"] ALL_CIPHER_NAMES = CIPHER_NAMES + ["chacha20-poly1305_draft00", + "aes128ccm_8", "aes256ccm_8", "rc4", "null"] # Don't allow "md5" by default MAC_NAMES = ["sha", "sha256", "sha384", "aead"] @@ -45,7 +47,8 @@ "secp192r1": ('NIST192p', 'P-192'), "secp224r1": ('NIST224p', 'P-224')} KNOWN_VERSIONS = ((3, 0), (3, 1), (3, 2), (3, 3), (3, 4)) -TICKET_CIPHERS = ["chacha20-poly1305", "aes256gcm", "aes128gcm"] +TICKET_CIPHERS = ["chacha20-poly1305", "aes256gcm", "aes128gcm", "aes128ccm", + "aes128ccm_8", "aes256ccm", "aes256ccm_8"] PSK_MODES = ["psk_dhe_ke", "psk_ke"] diff --git a/tlslite/recordlayer.py b/tlslite/recordlayer.py index 9d7cc2a98..eace62622 100644 --- a/tlslite/recordlayer.py +++ b/tlslite/recordlayer.py @@ -21,8 +21,8 @@ from .utils import tlshashlib as hashlib from .constants import ContentType, CipherSuite from .messages import RecordHeader3, RecordHeader2, Message -from .utils.cipherfactory import createAESGCM, createAES, createRC4, \ - createTripleDES, createCHACHA20 +from .utils.cipherfactory import createAESCCM, createAESCCM_8, createAESGCM,\ + createAES, createRC4, createTripleDES, createCHACHA20 from .utils.codec import Parser, Writer from .utils.compat import compatHMAC from .utils.cryptomath import getRandomBytes, MD5, HKDF_expand_label @@ -1014,6 +1014,22 @@ def _getCipherSettings(cipherSuite): keyLength = 16 ivLength = 4 createCipherFunc = createAESGCM + elif cipherSuite in CipherSuite.aes256Ccm_8Suites: + keyLength = 32 + ivLength = 4 + createCipherFunc = createAESCCM_8 + elif cipherSuite in CipherSuite.aes256CcmSuites: + keyLength = 32 + ivLength = 4 + createCipherFunc = createAESCCM + elif cipherSuite in CipherSuite.aes128Ccm_8Suites: + keyLength = 16 + ivLength = 4 + createCipherFunc = createAESCCM_8 + elif cipherSuite in CipherSuite.aes128CcmSuites: + keyLength = 16 + ivLength = 4 + createCipherFunc = createAESCCM elif cipherSuite in CipherSuite.chacha20Suites: keyLength = 32 ivLength = 12 diff --git a/tlslite/tlsconnection.py b/tlslite/tlsconnection.py index 8ca80d5dd..9891a2b44 100644 --- a/tlslite/tlsconnection.py +++ b/tlslite/tlsconnection.py @@ -37,7 +37,8 @@ ECDHE_RSAKeyExchange, SRPKeyExchange, ADHKeyExchange, \ AECDHKeyExchange, FFDHKeyExchange, ECDHKeyExchange from .handshakehelpers import HandshakeHelpers -from .utils.cipherfactory import createAESGCM, createCHACHA20 +from .utils.cipherfactory import createAESCCM, createAESCCM_8, \ + createAESGCM, createCHACHA20 class TLSConnection(TLSRecordLayer): """ @@ -2348,6 +2349,10 @@ def _serverSendTickets(self, settings): if settings.ticketCipher in ("aes128gcm", "aes256gcm"): cipher = createAESGCM(key, settings.cipherImplementations) + elif settings.ticketCipher in ("aes128ccm", "aes256ccm"): + cipher = createAESCCM(key, settings.cipherImplementations) + elif settings.ticketCipher in ("aes128ccm_8", "aes256ccm_8"): + cipher = createAESCCM_8(key, settings.cipherImplementations) else: assert settings.ticketCipher == "chacha20-poly1305" cipher = createCHACHA20(key, @@ -2382,6 +2387,10 @@ def _tryDecrypt(self, settings, identity): key, iv = self._derive_key_iv(nonce, user_key, settings) if settings.ticketCipher in ("aes128gcm", "aes256gcm"): cipher = createAESGCM(key, settings.cipherImplementations) + elif settings.ticketCipher in ("aes128ccm", "aes256ccm"): + cipher = createAESCCM(key, settings.cipherImplementations) + elif settings.ticketCipher in ("aes128ccm_8", "aes256ccm_8"): + cipher = createAESCCM_8(key, settings.cipherImplementations) else: assert settings.ticketCipher == "chacha20-poly1305" cipher = createCHACHA20(key, settings.cipherImplementations) diff --git a/tlslite/utils/cipherfactory.py b/tlslite/utils/cipherfactory.py index 06e92d4de..ef418ace4 100644 --- a/tlslite/utils/cipherfactory.py +++ b/tlslite/utils/cipherfactory.py @@ -7,6 +7,7 @@ from tlslite.utils import python_aes from tlslite.utils import python_aesgcm +from tlslite.utils import python_aesccm from tlslite.utils import python_chacha20_poly1305 from tlslite.utils import python_rc4 from tlslite.utils import python_tripledes @@ -74,6 +75,44 @@ def createAESGCM(key, implList=None): return python_aesgcm.new(key) raise NotImplementedError() +def createAESCCM(key, implList=None): + """ Create a new AESCCM object. + + :type key: bytearray + :param key: A 16 or 32 byte byte array to serve as key. + + :rtype: tlslite.utils.AESCCM + :returns: An AESCCM object. + """ + + if implList is None: + implList = ["python"] + + for impl in implList: + if impl == "python": + return python_aesccm.new(key) + + raise NotImplementedError() + +def createAESCCM_8(key, implList=None): + """ Create a new AESCCM object with truncated tag. + + :type key: bytearray + :param key: A 16 or 32 byte byte array to serve as key. + + :rtype: tlslite.utils.AESCCM + :returns: An AESCCM object. + """ + + if implList is None: + implList = ["python"] + + for impl in implList: + if impl == "python": + return python_aesccm.new(key, 8) + + raise NotImplementedError() + def createCHACHA20(key, implList=None): """Create a new CHACHA20_POLY1305 object. diff --git a/tlslite/utils/python_aesccm.py b/tlslite/utils/python_aesccm.py new file mode 100644 index 000000000..9bb6fee4e --- /dev/null +++ b/tlslite/utils/python_aesccm.py @@ -0,0 +1,10 @@ +# Author: Ivan Nikolchev +# See the LICENSE file for legal information regarding use of this file. + +""" Pure Python AES-CCM implementation.""" + +from tlslite.utils.aesccm import AESCCM +from tlslite.utils.rijndael import Rijndael + +def new(key, tagLength=16): + return AESCCM(key, "python", Rijndael(key, 16).encrypt, tagLength) diff --git a/unit_tests/test_tlslite_constants.py b/unit_tests/test_tlslite_constants.py index af1602599..ab2e6847d 100644 --- a/unit_tests/test_tlslite_constants.py +++ b/unit_tests/test_tlslite_constants.py @@ -180,7 +180,8 @@ def test_getTLS13Suites(self): self.assertEqual(CipherSuite.getTLS13Suites(hs), [CipherSuite.TLS_AES_256_GCM_SHA384, CipherSuite.TLS_AES_128_GCM_SHA256, - CipherSuite.TLS_CHACHA20_POLY1305_SHA256]) + CipherSuite.TLS_CHACHA20_POLY1305_SHA256, + CipherSuite.TLS_AES_128_CCM_SHA256]) def test_getTLS13Suites_with_TLS1_2(self): hs = HandshakeSettings() diff --git a/unit_tests/test_tlslite_utils_aesccm.py b/unit_tests/test_tlslite_utils_aesccm.py new file mode 100644 index 000000000..b77461661 --- /dev/null +++ b/unit_tests/test_tlslite_utils_aesccm.py @@ -0,0 +1,350 @@ +# compatibility with Python 2.6, for that we need unittest2 package, +# which is not available on 3.3 or 3.4 +try: + import unittest2 as unittest +except ImportError: + import unittest + +from tlslite.utils.rijndael import Rijndael +from tlslite.utils.aesccm import AESCCM +from tlslite.utils.cipherfactory import createAESCCM, createAESCCM_8 + + +class TestAESCCM(unittest.TestCase): + def test___init__128(self): + key = bytearray(16) + aesCCM = AESCCM(key, "python", Rijndael(key, 16).encrypt) + + self.assertIsNotNone(aesCCM) + self.assertEqual(aesCCM.name, "aes128ccm") + + def test___init__128_8(self): + key = bytearray(16) + aesCCM = AESCCM(key, "python", Rijndael(key, 16).encrypt, 8) + + self.assertIsNotNone(aesCCM) + self.assertEqual(aesCCM.name, "aes128ccm_8") + + def test___init__256(self): + key = bytearray(32) + aesCCM = AESCCM(key, "python", Rijndael(key, 16).encrypt) + + self.assertIsNotNone(aesCCM) + self.assertEqual(aesCCM.name, "aes256ccm") + + def test___init__256_8(self): + key = bytearray(32) + aesCCM = AESCCM(key, "python", Rijndael(key, 16).encrypt, 8) + + self.assertIsNotNone(aesCCM) + self.assertEqual(aesCCM.name, "aes256ccm_8") + + def test___init___with_invalid_key(self): + key = bytearray(8) + + with self.assertRaises(AssertionError): + aesCCM = AESCCM(key, "python", Rijndael(bytearray(16), 16).encrypt) + + def test_default_implementation(self): + key = bytearray(16) + + aesCCM = createAESCCM(key) + self.assertEqual(aesCCM.implementation, "python") + + def test_default_implementation_small_tag(self): + key = bytearray(16) + + aesCCM = createAESCCM_8(key) + self.assertEqual(aesCCM.implementation, "python") + + def test_seal(self): + key = bytearray(b'\x01'*16) + aesCCM = AESCCM(key, "python", Rijndael(key, 16).encrypt) + + nonce = bytearray(b'\x02'*12) + + plaintext = bytearray(b'text to encrypt.') + self.assertEqual(len(plaintext), 16) + + encData = aesCCM.seal(nonce, plaintext, bytearray(0)) + + self.assertEqual(bytearray(b'%}Q.\x99\xa3\r\xae\xcbMc\xf2\x16,^\xff' + b'\xa0I\x8e\xf9\xc9F>\xbf\xa4\x00Y\x02p' + b'\xe3\xb8\xa2'), encData) + + def test_seal_256(self): + key = bytearray(b'\x01'*32) + aesCCM = AESCCM(key, "python", Rijndael(key, 16).encrypt) + + nonce = bytearray(b'\x02'*12) + + plaintext = bytearray(b'text to encrypt.') + self.assertEqual(len(plaintext), 16) + + encData = aesCCM.seal(nonce, plaintext, bytearray(0)) + + self.assertEqual(bytearray(b'IN\x1c\x06\xb8\x0b9SD<\xf8RL' + b'\xb4,=\xd6&d\xae^1\xf8\xbf' + b'\xfa8D\x98\xdd\x14\xb51'), encData) + + def test_seal_small_tag(self): + key = bytearray(b'\x01'*16) + aesCCM = AESCCM(key, "python", Rijndael(key, 16).encrypt, 8) + + nonce = bytearray(b'\x02'*12) + + plaintext = bytearray(b'text to encrypt.') + self.assertEqual(len(plaintext), 16) + + encData = aesCCM.seal(nonce, plaintext, bytearray(0)) + + self.assertEqual(bytearray(b'%}Q.\x99\xa3\r\xae\xcbMc\xf2\x16,^\xff' + b'\x14\xb8-?\x7f\xac\x8bI'), encData) + + def test_seal_256_small_tag(self): + key = bytearray(b'\x01'*32) + aesCCM = AESCCM(key, "python", Rijndael(key, 16).encrypt, 8) + + nonce = bytearray(b'\x02'*12) + + plaintext = bytearray(b'text to encrypt.') + self.assertEqual(len(plaintext), 16) + + encData = aesCCM.seal(nonce, plaintext, bytearray(0)) + + self.assertEqual(bytearray(b'IN\x1c\x06\xb8\x0b9SD<\xf8RL' + b'\xb4,=\xa2\x91\x84j1*\x0f\xeb'), encData) + + def test_seal_with_invalid_nonce(self): + key = bytearray(b'\x01'*16) + aesCCM = AESCCM(key, "python", Rijndael(key, 16).encrypt) + + nonce = bytearray(b'\x02'*11) + + plaintext = bytearray(b'text to encrypt.') + self.assertEqual(len(plaintext), 16) + + with self.assertRaises(ValueError) as err: + aesCCM.seal(nonce, plaintext, bytearray(0)) + self.assertEqual("Bad nonce length", str(err.exception)) + + def test_open(self): + key = bytearray(b'\x01'*16) + aesCCM = AESCCM(key, "python", Rijndael(key, 16).encrypt) + + nonce = bytearray(b'\x02'*12) + + ciphertext = bytearray(b'%}Q.\x99\xa3\r\xae\xcbMc\xf2\x16,^\xff\xa0I' + b'\x8e\xf9\xc9F>\xbf\xa4\x00Y\x02p\xe3\xb8\xa2') + + plaintext = aesCCM.open(nonce, ciphertext, bytearray(0)) + + self.assertEqual(plaintext, bytearray(b'text to encrypt.')) + + def test_open_256(self): + key = bytearray(b'\x01'*32) + aesCCM = AESCCM(key, "python", Rijndael(key, 16).encrypt) + + nonce = bytearray(b'\x02'*12) + + ciphertext = bytearray(b'IN\x1c\x06\xb8\x0b9SD<\xf8RL' + b'\xb4,=\xd6&d\xae^1\xf8\xbf' + b'\xfa8D\x98\xdd\x14\xb51') + + plaintext = aesCCM.open(nonce, ciphertext, bytearray(0)) + + self.assertEqual(plaintext, bytearray(b'text to encrypt.')) + + def test_open_small_tag(self): + key = bytearray(b'\x01'*16) + aesCCM = AESCCM(key, "python", Rijndael(key, 16).encrypt, 8) + + nonce = bytearray(b'\x02'*12) + + ciphertext = bytearray(b'%}Q.\x99\xa3\r\xae\xcbMc\xf2\x16,^\xff\x14' + b'\xb8-?\x7f\xac\x8bI') + + plaintext = aesCCM.open(nonce, ciphertext, bytearray(0)) + + self.assertEqual(plaintext, bytearray(b'text to encrypt.')) + + def test_open_256_small_tag(self): + key = bytearray(b'\x01'*32) + aesCCM = AESCCM(key, "python", Rijndael(key, 16).encrypt, 8) + + nonce = bytearray(b'\x02'*12) + + ciphertext = bytearray(b'IN\x1c\x06\xb8\x0b9SD<\xf8RL' + b'\xb4,=\xa2\x91\x84j1*\x0f\xeb') + plaintext = aesCCM.open(nonce, ciphertext, bytearray(0)) + + self.assertEqual(plaintext, bytearray(b'text to encrypt.')) + + def test_open_with_incorrect_key(self): + key = bytearray(b'\x01'*15 + b'\x00') + aesCCM = AESCCM(key, "python", Rijndael(key, 16).encrypt) + + nonce = bytearray(b'\x02'*12) + + ciphertext = bytearray( + b'\'\x81h\x17\xe6Z)\\\xf2\x8emF\xcb\x91\x0eu' + b'z1:\xf6}\xa7\\@\xba\x11\xd8r\xdf#K\xd4') + + plaintext = aesCCM.open(nonce, ciphertext, bytearray(0)) + + self.assertIsNone(plaintext) + + def test_open_with_incorrect_nonce(self): + key = bytearray(b'\x01'*16) + aesCCM = AESCCM(key, "python", Rijndael(key, 16).encrypt) + + nonce = bytearray(b'\x02'*11 + b'\x01') + + ciphertext = bytearray( + b'\'\x81h\x17\xe6Z)\\\xf2\x8emF\xcb\x91\x0eu' + b'z1:\xf6}\xa7\\@\xba\x11\xd8r\xdf#K\xd4') + + plaintext = aesCCM.open(nonce, ciphertext, bytearray(0)) + + self.assertIsNone(plaintext) + + def test_open_with_invalid_nonce(self): + key = bytearray(b'\x01'*16) + aesCCM = AESCCM(key, "python", Rijndael(key, 16).encrypt) + + nonce = bytearray(b'\x02'*11) + + ciphertext = bytearray( + b'\'\x81h\x17\xe6Z)\\\xf2\x8emF\xcb\x91\x0eu' + b'z1:\xf6}\xa7\\@\xba\x11\xd8r\xdf#K\xd4') + + with self.assertRaises(ValueError) as err: + aesCCM.open(nonce, ciphertext, bytearray(0)) + self.assertEqual("Bad nonce length", str(err.exception)) + + def test_open_with_invalid_ciphertext(self): + key = bytearray(b'\x01'*16) + aesCCM = AESCCM(key, "python", Rijndael(key, 16).encrypt) + + nonce = bytearray(b'\x02'*12) + + ciphertext = bytearray( + b'\xff'*15) + + self.assertIsNone(aesCCM.open(nonce, ciphertext, bytearray(0))) + + def test_seal_with_test_vector_1(self): + key = bytearray(b'\x00'*16) + aesCCM = AESCCM(key, "python", Rijndael(key, 16).encrypt) + + nonce = bytearray(b'\x00'*12) + + plaintext = bytearray(b'') + self.assertEqual(len(plaintext), 0) + + encData = aesCCM.seal(nonce, plaintext, bytearray(0)) + self.assertEqual(bytearray(b'\xb9\xf6P\xfb<9\xbb\x1b\xee\x0e)\x1d3' + b'\xf6\xae('), encData) + + def test_seal_with_test_vector_2(self): + key = bytearray(b'\x00'*16) + aesCCM = AESCCM(key, "python", Rijndael(key, 16).encrypt) + + nonce = bytearray(b'\x00'*12) + + plaintext = bytearray(b'\x00'*16) + self.assertEqual(len(plaintext), 16) + + encData = aesCCM.seal(nonce, plaintext, bytearray(0)) + + self.assertEqual(bytearray(b'n\xc7_\xb2\xe2\xb4\x87F\x1e\xdd\xcb\xb8' + b'\x97\x11\x92\xbaMO\xa3\xaf\x0b\xf6\xd3E' + b'Aq0o\xfa\xdd\x9a\xfd'), encData) + + def test_seal_with_test_vector_3(self): + key = bytearray(b'\xfe\xff\xe9\x92\x86\x65\x73\x1c' + b'\x6d\x6a\x8f\x94\x67\x30\x83\x08') + aesCCM = AESCCM(key, "python", Rijndael(key, 16).encrypt) + + nonce = bytearray(b'\xca\xfe\xba\xbe\xfa\xce\xdb\xad\xde\xca\xf8\x88') + + plaintext = bytearray(b'\xd9\x31\x32\x25\xf8\x84\x06\xe5' + b'\xa5\x59\x09\xc5\xaf\xf5\x26\x9a' + b'\x86\xa7\xa9\x53\x15\x34\xf7\xda' + b'\x2e\x4c\x30\x3d\x8a\x31\x8a\x72' + b'\x1c\x3c\x0c\x95\x95\x68\x09\x53' + b'\x2f\xcf\x0e\x24\x49\xa6\xb5\x25' + b'\xb1\x6a\xed\xf5\xaa\x0d\xe6\x57' + b'\xba\x63\x7b\x39\x1a\xaf\xd2\x55') + + self.assertEqual(len(plaintext), 4*16) + + encData = aesCCM.seal(nonce, plaintext, bytearray(0)) + + self.assertEqual(bytearray(b"\x08\x93\xe9K\x91H\x80\x1a\xf0\xf74&" + b"\xab\xb0\x0e<\xa4\x9b\xf0\x9dy\xa2" + b"\x01\'\xa7\xeb\x19&\xfa\x89\x057\x87" + b"\xff\x02\xd0}q\x81;\x88[\x85\xe7\xf9" + b"lN\xed\xf4 \xdb\x12j\x04Q\xce\x13\xbdA" + b"\xba\x01\x8d\x1b\xa7\xfc\xece\x99Dg\xa7" + b"{\x8b&B\xde\x91,\x01."), encData) + + def test_seal_with_test_vector_4(self): + key = bytearray(b'\xfe\xff\xe9\x92\x86\x65\x73\x1c' + + b'\x6d\x6a\x8f\x94\x67\x30\x83\x08') + + aesCCM = AESCCM(key, "python", Rijndael(key, 16).encrypt) + + nonce = bytearray(b'\xca\xfe\xba\xbe\xfa\xce\xdb\xad\xde\xca\xf8\x88') + + plaintext = bytearray(b'\xd9\x31\x32\x25\xf8\x84\x06\xe5' + b'\xa5\x59\x09\xc5\xaf\xf5\x26\x9a' + b'\x86\xa7\xa9\x53\x15\x34\xf7\xda' + b'\x2e\x4c\x30\x3d\x8a\x31\x8a\x72' + b'\x1c\x3c\x0c\x95\x95\x68\x09\x53' + b'\x2f\xcf\x0e\x24\x49\xa6\xb5\x25' + b'\xb1\x6a\xed\xf5\xaa\x0d\xe6\x57' + b'\xba\x63\x7b\x39') + + data = bytearray(b'\xfe\xed\xfa\xce\xde\xad\xbe\xef' + b'\xfe\xed\xfa\xce\xde\xad\xbe\xef' + b'\xab\xad\xda\xd2') + + encData = aesCCM.seal(nonce, plaintext, data) + + self.assertEqual(bytearray(b'\x08\x93\xe9K\x91H\x80\x1a\xf0\xf74&\xab' + b'\xb0\x0e<\xa4\x9b\xf0\x9dy\xa2\x01\'\xa7' + b'\xeb\x19&\xfa\x89\x057\x87\xff\x02\xd0}q' + b'\x81;\x88[\x85\xe7\xf9lN\xed\xf4 \xdb' + b'\x12j\x04Q\xce\x13\xbdA\xba\x028\xc3&' + b'\xb4{4\xf7\x8fe\x9eu' + b'\x10\x96\xcd"'), encData) + + def test_seal_with_test_vector_5(self): + key = bytearray(32) + + aesCCM = AESCCM(key, "python", Rijndael(key, 16).encrypt) + + nonce = bytearray(12) + plaintext = bytearray(0) + data = bytearray(0) + + encData = aesCCM.seal(nonce, plaintext, data) + + self.assertEqual(bytearray(b'\xa8\x90&^C\xa2hU\xf2i' + b'\xb9?\xf4\xdd\xde\xf6'), encData) + + def test_seal_with_test_vector_6(self): + key = bytearray(32) + + aesCCM = AESCCM(key, "python", Rijndael(key, 16).encrypt) + + nonce = bytearray(12) + plaintext = bytearray(16) + data = bytearray(0) + + encData = aesCCM.seal(nonce, plaintext, data) + + self.assertEqual(bytearray(b'\xc1\x94@D\xc8\xe7\xaa\x95\xd2\xde\x95' + b'\x13\xc7\xf3\xdd\x8cK\n>^Q\xf1Q\xeb\x0f' + b'\xfa\xe7\xc4=\x01\x0f\xdb'), encData) From 49865c5bfe087a8bc390e636c17c213c4985815b Mon Sep 17 00:00:00 2001 From: Ivan Nikolchev Date: Thu, 7 Nov 2019 15:25:37 +0100 Subject: [PATCH 3/3] Remove python 3.2 from Travis --- .travis.yml | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/.travis.yml b/.travis.yml index c69e5e91b..72edf8a37 100644 --- a/.travis.yml +++ b/.travis.yml @@ -18,7 +18,6 @@ addons: python: - 2.6 - 2.7 - - 3.2 - 3.3 - 3.4 - 3.5 @@ -136,14 +135,13 @@ script: fi - make test-local # --appends is supported only in the new coverage (>4) - - if [[ $TRAVIS_PYTHON_VERSION != '3.2' ]]; then coverage combine --append; fi + - coverage combine --append - coverage report -m - ./setup.py install - make test # pylint doesn't work on 2.6: https://bitbucket.org/logilab/pylint/issue/390/py26-compatiblity-broken - # diff-quality doesn't work on 3.2: https://github.com/edx/diff-cover/issues/94 - | - if [[ $TRAVIS_PYTHON_VERSION != '2.6' ]] && [[ $TRAVIS_PYTHON_VERSION != '3.2' ]]; then + if [[ $TRAVIS_PYTHON_VERSION != '2.6' ]]; then pylint --msg-template="{path}:{line}: [{msg_id}({symbol}), {obj}] {msg}" tlslite > pylint_report.txt || : diff-quality --violations=pylint --fail-under=90 pylint_report.txt fi