From bcd8a454b7ab53ac6f531a9f8d88061217ff9d67 Mon Sep 17 00:00:00 2001 From: Paul Lettich Date: Tue, 5 Mar 2019 16:38:32 +0100 Subject: [PATCH 1/4] Migrate from PyCrypto to pyca/cryptography Part 1: AES - remove all AES encryption/decryption based on `PyCrypto` - add only one AES (CBC mode) encryption/decryption function which uses `pyca/cryptography` without padding - renamed yubikey AES (ECB mode) decryption function. - currently the (un)padding must be done in the calling functions - removed the `security/pkcs11.py` module since it is not used. - refactored some parameter names --- privacyidea/lib/crypto.py | 137 ++++++++++-------- privacyidea/lib/importotp.py | 10 +- privacyidea/lib/security/aeshsm.py | 14 +- privacyidea/lib/security/default.py | 80 +++++------ privacyidea/lib/security/pkcs11.py | 188 ------------------------- privacyidea/lib/subscriptions.py | 4 +- privacyidea/lib/tokens/yubikeytoken.py | 2 +- tests/test_lib_crypto.py | 6 +- 8 files changed, 134 insertions(+), 307 deletions(-) delete mode 100644 privacyidea/lib/security/pkcs11.py diff --git a/privacyidea/lib/crypto.py b/privacyidea/lib/crypto.py index 14d3eda561..4dc10e0129 100644 --- a/privacyidea/lib/crypto.py +++ b/privacyidea/lib/crypto.py @@ -53,8 +53,8 @@ import binascii import six import ctypes -from Crypto.Hash import SHA256 as HashFunc -from Crypto.Cipher import AES +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from Crypto.PublicKey import RSA import base64 try: @@ -71,7 +71,8 @@ from privacyidea.lib.error import HSMException from privacyidea.lib.framework import (get_app_local_store, get_app_config_value, get_app_config) -from privacyidea.lib.utils import to_unicode, to_bytes, hexlify_and_unicode, b64encode_and_unicode +from privacyidea.lib.utils import (to_unicode, to_bytes, hexlify_and_unicode, + b64encode_and_unicode) if not PY2: long = int @@ -107,16 +108,18 @@ def hmac_digest(self, data_input, hash_algo): self._clearKey_(preserve=self.preserve) return h - def aes_decrypt(self, data_input): + def aes_ecb_decrypt(self, enc_data): ''' - support inplace aes decryption for the yubikey + support inplace aes decryption for the yubikey (mode ECB) - :param data_input: data, that should be decrypted + :param enc_data: data, that should be decrypted :return: the decrypted data ''' self._setupKey_() - aes = AES.new(self.bkey, AES.MODE_ECB) - msg_bin = aes.decrypt(data_input) + backend = default_backend() + cipher = Cipher(algorithms.AES(self.bkey), modes.ECB(), backend=backend) + decryptor = cipher.decryptor() + msg_bin = decryptor.update(enc_data) + decryptor.finalize() self._clearKey_(preserve=self.preserve) return msg_bin @@ -307,70 +310,75 @@ def decryptPin(cryptPin): @log_with(log, log_entry=False) -def encrypt(data, iv, id=0): - ''' +def encrypt(data, iv, key_id=0): + """ encrypt a variable from the given input with an initialisation vector - :param data: buffer, which contains the value - :type data: bytes or str - :param iv: initialisation vector - :type iv: bytes or str - :param id: contains the key id of the keyset which should be used - :type id: int - :return: encrypted and hexlified data + :param data: buffer, which contains the value + :type data: bytes or str + :param iv: initialisation vector + :type iv: bytes or str + :param key_id: contains the key id of the keyset which should be used + :type key_id: int + :return: encrypted and hexlified data :rtype: str - - ''' + """ hsm = get_hsm() - ret = hsm.encrypt(to_bytes(data), to_bytes(iv), id) + ret = hsm.encrypt(to_bytes(data), to_bytes(iv), key_id=key_id) return hexlify_and_unicode(ret) @log_with(log, log_exit=False) -def decrypt(input, iv, id=0): - ''' - decrypt a variable from the given input with an initialiation vector - - :param input: buffer, which contains the crypted value - :type input: bytes or str - :param iv: initialisation vector - :type iv: bytes or str - :param id: contains the key id of the keyset which should be used - :type id: int - :return: decrypted buffer +def decrypt(enc_data, iv, key_id=0): + """ + decrypt a variable from the given input with an initialisation vector + + :param enc_data: buffer, which contains the crypted value + :type enc_data: bytes or str + :param iv: initialisation vector + :type iv: bytes or str + :param key_id: contains the key id of the keyset which should be used + :type key_id: int + :return: decrypted buffer :rtype: bytes - ''' + """ hsm = get_hsm() - res = hsm.decrypt(to_bytes(input), to_bytes(iv), id) + res = hsm.decrypt(to_bytes(enc_data), to_bytes(iv), key_id=key_id) return res @log_with(log, log_exit=False) -def aes_decrypt(key, iv, cipherdata, mode=AES.MODE_CBC): +def aes_cbc_decrypt(key, iv, enc_data): """ - Decrypts the given cipherdata with the key/iv. + Decrypts the given cipherdata with AES (CBC Mode) using the key/iv. + + Attention: This function returns the decrypted data as is, without removing + any padding. The calling function must take care of this! :param key: The encryption key :type key: bytes :param iv: The initialization vector :type iv: bytes - :param cipherdata: The cipher text - :type cipherdata: binary string + :param enc_data: The cipher text + :type enc_data: binary string :param mode: The AES MODE :return: plain text in binary data :rtype: bytes """ - aes = AES.new(key, mode, iv) - output = aes.decrypt(cipherdata) - padding = six.indexbytes(output, len(output) - 1) - # remove padding - output = output[0:-padding] + backend = default_backend() + mode = modes.CBC(iv) + cipher = Cipher(algorithms.AES(key), mode=mode, backend=backend) + decryptor = cipher.decryptor() + output = decryptor.update(enc_data) + decryptor.finalize() return output -def aes_encrypt(key, iv, data, mode=AES.MODE_CBC): +def aes_cbc_encrypt(key, iv, data): """ - encrypts the given data with key/iv + encrypts the given data with AES (CBC Mode) using key/iv. + + Attention: This function expects correctly padded input data (multiple of + AES block size). The calling function must take care of this! :param key: The encryption key :type key: binary string @@ -382,11 +390,13 @@ def aes_encrypt(key, iv, data, mode=AES.MODE_CBC): :return: plain text in binary data :rtype: bytes """ - aes = AES.new(key, mode, iv) - # pad data - num_pad = aes.block_size - (len(data) % aes.block_size) - data = data + six.int2byte(num_pad) * num_pad - output = aes.encrypt(data) + assert len(data) % (algorithms.AES.block_size // 8) == 0 + # do the encryption + backend = default_backend() + mode = modes.CBC(iv) + cipher = Cipher(algorithms.AES(key), mode=mode, backend=backend) + encryptor = cipher.encryptor() + output = encryptor.update(data) + encryptor.finalize() return output @@ -403,25 +413,34 @@ def aes_encrypt_b64(key, data): :return: base64 encrypted output, containing IV and encrypted data :rtype: str """ + # pad data + block_size = algorithms.AES.block_size // 8 + num_pad = block_size - (len(data) % block_size) + data = data + six.int2byte(num_pad) * num_pad iv = geturandom(16) - encdata = aes_encrypt(key, iv, data) + encdata = aes_cbc_encrypt(key, iv, data) return b64encode_and_unicode(iv + encdata) -def aes_decrypt_b64(key, data_b64): +def aes_decrypt_b64(key, enc_data_b64): """ This function decrypts base64 encoded data (containing the IV) using AES-128-CBC. Used for PSKC :param key: binary key - :param data_b64: base64 encoded data (IV + encdata) - :type data_b64: str + :param enc_data_b64: base64 encoded data (IV + encdata) + :type enc_data_b64: str :return: encrypted data """ - data_bin = base64.b64decode(data_b64) + data_bin = base64.b64decode(enc_data_b64) iv = data_bin[:16] encdata = data_bin[16:] - output = aes_decrypt(key, iv, encdata) + output = aes_cbc_decrypt(key, iv, encdata) + + # remove padding + padding = six.indexbytes(output, len(output) - 1) + output = output[0:-padding] + return output @@ -656,10 +675,10 @@ def sign(self, s): s = s.encode('utf8') RSAkey = RSA.importKey(self.private) if SIGN_WITH_RSA: - hashvalue = HashFunc.new(s).digest() + hashvalue = sha256(s).digest() signature = RSAkey.sign(hashvalue, 1) else: - hashvalue = HashFunc.new(s) + hashvalue = sha256(s) signature = pkcs1_15.new(RSAkey).sign(hashvalue) s_signature = str(signature[0]) return s_signature @@ -680,10 +699,10 @@ def verify(self, s, signature): RSAkey = RSA.importKey(self.public) signature = long(signature) if SIGN_WITH_RSA: - hashvalue = HashFunc.new(s).digest() + hashvalue = sha256(s).digest() r = RSAkey.verify(hashvalue, (signature,)) else: - hashvalue = HashFunc.new(s) + hashvalue = sha256(s) pkcs1_15.new(RSAkey).verify(hashvalue, signature) except Exception as _e: # pragma: no cover log.error("Failed to verify signature: {0!r}".format(s)) diff --git a/privacyidea/lib/importotp.py b/privacyidea/lib/importotp.py index 6821d2eea8..84dccf4b41 100644 --- a/privacyidea/lib/importotp.py +++ b/privacyidea/lib/importotp.py @@ -51,12 +51,14 @@ import binascii import base64 import cgi +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes + from privacyidea.lib.utils import (modhex_decode, modhex_encode, hexlify_and_unicode, to_unicode, to_utf8) from privacyidea.lib.config import get_token_class from privacyidea.lib.log import log_with from privacyidea.lib.crypto import (aes_decrypt_b64, aes_encrypt_b64, geturandom) -from Crypto.Cipher import AES from bs4 import BeautifulSoup import traceback from passlib.utils.pbkdf2 import pbkdf2 @@ -75,8 +77,10 @@ def _create_static_password(key_hex): ''' msg_hex = "000000000000ffffffffffffffff0f2e" msg_bin = binascii.unhexlify(msg_hex) - aes = AES.new(binascii.unhexlify(key_hex), AES.MODE_ECB) - password_bin = aes.encrypt(msg_bin) + cipher = Cipher(algorithms.AES(binascii.unhexlify(key_hex)), + modes.ECB(), default_backend()) + encryptor = cipher.encryptor() + password_bin = encryptor.update(msg_bin) + encryptor.finalize() password = modhex_encode(password_bin) return password diff --git a/privacyidea/lib/security/aeshsm.py b/privacyidea/lib/security/aeshsm.py index 5eec6b6a03..71775d42dc 100644 --- a/privacyidea/lib/security/aeshsm.py +++ b/privacyidea/lib/security/aeshsm.py @@ -207,20 +207,20 @@ def encrypt(self, data, iv, key_id=SecurityModule.TOKEN_KEY): return int_list_to_bytestring(r) - def decrypt(self, data, iv, key_id=SecurityModule.TOKEN_KEY): + def decrypt(self, enc_data, iv, key_id=SecurityModule.TOKEN_KEY): """ :rtype bytes """ - if len(data) == 0: + if len(enc_data) == 0: return bytes("") - log.debug("Decrypting {} bytes with key {}".format(len(data), key_id)) + log.debug("Decrypting {} bytes with key {}".format(len(enc_data), key_id)) m = PyKCS11.Mechanism(PyKCS11.CKM_AES_CBC_PAD, iv) retries = 0 while True: try: k = self.key_handles[key_id] - r = self.session.decrypt(k, bytes(data), m) + r = self.session.decrypt(k, bytes(enc_data), m) break except PyKCS11.PyKCS11Error as exx: log.warning(u"Decryption failed: {0!s}".format(exx)) @@ -309,13 +309,13 @@ def create_keys(self): p.setup_module({"password": "12345678"}) # random - iv = p.random(16) + tmp_iv = p.random(16) plain = p.random(128) log.info("random test successful") # generic encrypt / decrypt - cipher = p.encrypt(plain, iv) + cipher = p.encrypt(plain, tmp_iv) assert (plain != cipher) - text = p.decrypt(cipher, iv) + text = p.decrypt(cipher, tmp_iv) assert (text == plain) log.info("generic encrypt/decrypt test successful") diff --git a/privacyidea/lib/security/default.py b/privacyidea/lib/security/default.py index 9e10ef9f0a..b9b7e8ef23 100644 --- a/privacyidea/lib/security/default.py +++ b/privacyidea/lib/security/default.py @@ -41,11 +41,11 @@ import logging import binascii import os -import six -from Crypto.Cipher import AES from hashlib import sha256 -from privacyidea.lib.crypto import geturandom, zerome + +from privacyidea.lib.crypto import (geturandom, zerome, aes_cbc_encrypt, + aes_cbc_decrypt) from privacyidea.lib.error import HSMException from privacyidea.lib.utils import (is_true, to_unicode, to_bytes, hexlify_and_unicode) @@ -61,11 +61,11 @@ def create_key_from_password(password): This is used to encrypt and decrypt the enckey file. :param password: - :type password: bytes + :type password: str or bytes :return: the generated key :rtype: bytes """ - key = sha256(password).digest()[0:32] + key = sha256(to_bytes(password)).digest()[0:32] return key @@ -101,13 +101,13 @@ def random(self, length): "the method : %s " % (fname,)) raise NotImplementedError("Should have been implemented {0!s}".format(fname)) - def encrypt(self, value, iv=None): + def encrypt(self, data, iv, key_id=TOKEN_KEY): fname = 'encrypt' log.error("This is the base class. You should implement " "the method : %s " % (fname,)) raise NotImplementedError("Should have been implemented {0!s}".format(fname)) - def decrypt(self, value, iv=None): + def decrypt(self, enc_data, iv, key_id=TOKEN_KEY): fname = 'decrypt' log.error("This is the base class. You should implement " "the method : %s " % (fname,)) @@ -376,7 +376,7 @@ def random(length=32): """ return os.urandom(length) - def encrypt(self, data, iv, slot_id=SecurityModule.TOKEN_KEY): + def encrypt(self, data, iv, key_id=SecurityModule.TOKEN_KEY): """ security module methods: encrypt @@ -386,9 +386,9 @@ def encrypt(self, data, iv, slot_id=SecurityModule.TOKEN_KEY): :param iv: initialisation vector :type iv: bytes - :param slot_id: slot of the key array. The key file contains 96 + :param key_id: slot of the key array. The key file contains 96 bytes, which are made up of 3 32byte keys. - :type slot_id: int + :type key_id: int :return: encrypted data :rtype: bytes @@ -396,15 +396,15 @@ def encrypt(self, data, iv, slot_id=SecurityModule.TOKEN_KEY): if self.is_ready is False: raise HSMException('setup of security module incomplete') - key = self._get_secret(slot_id) - # convert input to ascii, so we can securely append bin data + key = self._get_secret(key_id) + + # convert input to ascii, so we can securely append bin data for padding input_data = binascii.b2a_hex(data) input_data += b"\x01\x02" padding = (16 - len(input_data) % 16) % 16 input_data += padding * b"\0" - aes = AES.new(key, AES.MODE_CBC, iv) - res = aes.encrypt(input_data) + res = aes_cbc_encrypt(key, iv, input_data) if self.crypted is False: zerome(key) @@ -412,81 +412,71 @@ def encrypt(self, data, iv, slot_id=SecurityModule.TOKEN_KEY): return res @staticmethod - def password_encrypt(text, password): + def password_encrypt(data, password): """ Encrypt the given text with the password. A key is derived from the password and used to encrypt the text in AES MODE_CBC. The IV is returned together with the cipher text. - :param text: The text to encrypt - :type text: str or bytes + :param data: The text to encrypt + :type data: str or bytes :param password: The password to derive a key from :type password: str or bytes :return: IV and cipher text :rtype: str """ - # TODO rewrite this when updating the crypto library (maybe externalise the AES) - # TODO replace with to_bytes() when available in utils.py - if isinstance(password, six.text_type): - password = password.encode('utf8') - if isinstance(text, six.text_type): - text = text.encode('utf8') bkey = create_key_from_password(password) - # convert input to ascii, so we can securely append bin data - input_data = binascii.hexlify(text) + # convert input to ascii, so we can securely append bin data for padding + input_data = binascii.hexlify(to_bytes(data)) input_data += b"\x01\x02" padding = (16 - len(input_data) % 16) % 16 input_data += padding * b"\0" iv = geturandom(16) - aes = AES.new(bkey, AES.MODE_CBC, iv) - cipher = aes.encrypt(input_data) + cipher = aes_cbc_encrypt(bkey, iv, input_data) iv_hex = hexlify_and_unicode(iv) cipher_hex = hexlify_and_unicode(cipher) return "{0!s}:{1!s}".format(iv_hex, cipher_hex) @staticmethod - def password_decrypt(data, password): + def password_decrypt(enc_data, password): """ Decrypt the given data with the password. A key is derived from the password. The data is hexlified data, the IV is the first part, separated with a ":". - :param data: The hexlified data - :type data: str + :param enc_data: The hexlified data + :type enc_data: str :param password: The password, that is used to decrypt the data - :type password: str + :type password: str or bytes :return: The clear test :rtype: bytes """ - # TODO replace with to_bytes() when available in utils.py - if isinstance(password, six.text_type): - password = password.encode('utf8') bkey = create_key_from_password(password) # split the input data - iv_hex, cipher_hex = data.strip().split(":") + iv_hex, cipher_hex = enc_data.strip().split(":") iv_bin = binascii.unhexlify(iv_hex) cipher_bin = binascii.unhexlify(cipher_hex) - aes = AES.new(bkey, AES.MODE_CBC, iv_bin) - output = aes.decrypt(cipher_bin) + output = aes_cbc_decrypt(bkey, iv_bin, cipher_bin) + # remove padding eof = output.rfind(b"\x01\x02") if eof >= 0: output = output[:eof] cleartext = binascii.unhexlify(output) return cleartext - def decrypt(self, input_data, iv, slot_id=SecurityModule.TOKEN_KEY): + def decrypt(self, enc_data, iv, key_id=SecurityModule.TOKEN_KEY): """ Decrypt the given data with the key from the key slot - :param input_data: the to be decrypted data - :type input_data: bytes + :param enc_data: the to be decrypted data + :type enc_data: bytes :param iv: initialisation vector (salt) :type iv: bytes - :param slot_id: slot of the key array - :type slot_id: int + :param key_id: slot of the key array + :type key_id: int :return: decrypted data :rtype: bytes @@ -494,9 +484,9 @@ def decrypt(self, input_data, iv, slot_id=SecurityModule.TOKEN_KEY): if self.is_ready is False: raise HSMException('setup of security module incomplete') - key = self._get_secret(slot_id) - aes = AES.new(key, AES.MODE_CBC, iv) - output = aes.decrypt(input_data) + key = self._get_secret(key_id) + output = aes_cbc_decrypt(key, iv, enc_data) + # remove padding eof = output.rfind(b"\x01\x02") if eof >= 0: output = output[:eof] diff --git a/privacyidea/lib/security/pkcs11.py b/privacyidea/lib/security/pkcs11.py deleted file mode 100644 index 09b8fc8084..0000000000 --- a/privacyidea/lib/security/pkcs11.py +++ /dev/null @@ -1,188 +0,0 @@ -# -*- coding: utf-8 -*- -# -# 2016-04-15 Cornelius Kölbel -# PKCS11 Security Module -# -# This code is free software; you can redistribute it and/or -# modify it under the terms of the GNU AFFERO GENERAL PUBLIC LICENSE -# License as published by the Free Software Foundation; either -# version 3 of the License, or any later version. -# -# This code is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU AFFERO GENERAL PUBLIC LICENSE for more details. -# -# You should have received a copy of the GNU Affero General Public -# License along with this program. If not, see . -# -# -__doc__ = """ -This is a PKCS11 Security module that encrypts and decrypts the data on a Smartcard/HSM that is connected via PKCS11. -""" -import logging -from privacyidea.lib.security.default import SecurityModule -from privacyidea.lib.security.password import PASSWORD -from privacyidea.lib.error import HSMException -from Crypto.PublicKey import RSA -from Crypto.Cipher import PKCS1_v1_5 - -log = logging.getLogger(__name__) - -try: - import PyKCS11 -except ImportError: - log.info("The python module PyKCS11 is not available. So we can not use the PKCS11 security module.") - - -# FIXME: At the moment this only works with one (1) wsgi process! - - -def int_list_to_bytestring(int_list): # pragma: no cover - r = "" - for i in int_list: - r += chr(i) - return r - - -class PKCS11SecurityModule(SecurityModule): # pragma: no cover - - def __init__(self, config=None): - """ - Initialize the PKCS11 Security Module. - The configuration needs to contain the pkcs11 module and the ID of the key. - - {"module": "/usr/lib/x86_64-linux-gnu/opensc-pkcs11.so", - "key_id": 10} - - The HSM is not directly ready, since the HSM is protected by a password. - The function setup_module({"password": "HSM User password"}) needs to be called. - - :param config: contains the HSM configuration - :type config: dict - - :return: The Security Module object - """ - config = config or {} - self.name = "PKCS11" - self.config = config - self.is_ready = False - - if "module" not in config: - log.error("No PKCS11 module defined!") - raise HSMException("No PKCS11 module defined.") - self.key_id = config.get("key_id", 16) - self.pkcs11 = PyKCS11.PyKCS11Lib() - self.pkcs11.load(config.get("module")) - self.session = None - self.key_handle = None - - def setup_module(self, params): - """ - callback, which is called during the runtime to initialze the - security module. - - Here the password for the PKCS11 HSM can be provided - - {"password": "top secreT"} - - :param params: The password for the HSM - :type params: dict - - :return: - - """ - if "password" in params: - PASSWORD = str(params.get("password")) - else: - raise HSMException("missing password") - - slotlist = self.pkcs11.getSlotList() - if not len(slotlist): - raise HSMException("No HSM connected") - slot_id = slotlist[0] - # If the HSM is not connected at this point, it will fail - self.session = self.pkcs11.openSession(slot=slot_id) - - slotinfo = self.pkcs11.getSlotInfo(1) - log.info("Setting up {}".format(slotinfo.fields.get("slotDescription"))) - # get the public key - objs = self.session.findObjects([(PyKCS11.CKA_CLASS, PyKCS11.CKO_PUBLIC_KEY), - (PyKCS11.CKA_ID, (self.key_id,))]) - self.key_handle = objs[0] - pubkey_list = self.session.getAttributeValue(self.key_handle, [PyKCS11.CKA_VALUE], allAsBinary=True) - pubkey_bin = int_list_to_bytestring(pubkey_list[0]) - self.rsa_pubkey = RSA.importKey(pubkey_bin) - - # We do a login and logout to see if - self.session.login(PASSWORD) - self.session.logout() - self.is_ready = True - log.info("Successfully setup the security module.") - - return self.is_ready - - def random(self, length): - """ - Return a random bytestring - :param length: length of the random bytestring - :return: - """ - r = '' - r_integers = self.session.generateRandom(length) - # convert the array of the random integers to a string - return int_list_to_bytestring(r_integers) - - def encrypt(self, value, iv=None): - cipher = PKCS1_v1_5.new(self.rsa_pubkey) - encrypted = cipher.encrypt(value) - return encrypted - - def decrypt(self, value, iv=None): - if not PASSWORD: - log.error("empty PASSWORD. Your security module is probably not " - "initialized.") - self.session.login(PASSWORD) - objs = self.session.findObjects([(PyKCS11.CKA_CLASS, PyKCS11.CKO_PRIVATE_KEY), - (PyKCS11.CKA_ID, (self.key_id,))]) - - key_handle_private = objs[0] - text = self.session.decrypt(key_handle_private, value) - text = int_list_to_bytestring(text) - self.session.logout() - return text - - -if __name__ == "__main__": # pragma: no cover - - module = "/usr/local/lib/opensc-pkcs11.so" - #module = "/usr/lib/x86_64-linux-gnu/opensc-pkcs11.so" - - p = PKCS11SecurityModule({"module": module, - "key_id": 17}) - p.setup_module({"password": "123456"}) - - cleartext = "Hello there!" - cipher = p.encrypt(cleartext) - text = p.decrypt(cipher) - print(text) - assert(text == cleartext) - - cleartext = "Hello, this is a really long text and so and and so on..." - cipher = p.encrypt(cleartext) - text = p.decrypt(cipher) - print(text) - assert (text == cleartext) - - # password - password = "topSekr3t" - crypted = p.encrypt_password(password) - text = p.decrypt_password(crypted) - print(text) - assert(text == password) - - # pin - password = "topSekr3t" - crypted = p.encrypt_pin(password) - text = p.decrypt_pin(crypted) - print(text) - assert (text == password) diff --git a/privacyidea/lib/subscriptions.py b/privacyidea/lib/subscriptions.py index 2a72cdf1ee..0ebcbdde8b 100644 --- a/privacyidea/lib/subscriptions.py +++ b/privacyidea/lib/subscriptions.py @@ -28,6 +28,7 @@ import logging import datetime import random +from hashlib import sha256 from .log import log_with from ..models import Subscription from privacyidea.lib.error import SubscriptionError @@ -35,7 +36,6 @@ import functools from privacyidea.lib.framework import get_app_config_value import os -from Crypto.Hash import SHA256 from Crypto.PublicKey import RSA import traceback from sqlalchemy import func @@ -290,7 +290,7 @@ def check_signature(subscription): subscription["date_till"] = subscription.get("date_till").strftime(SUBSCRIPTION_DATE_FORMAT) sign_string = SIGN_FORMAT.format(**subscription) RSAkey = RSA.importKey(public) - hashvalue = SHA256.new(sign_string.encode("utf-8")).digest() + hashvalue = sha256(sign_string.encode("utf-8")).digest() signature = long(subscription.get("signature") or "100") r = RSAkey.verify(hashvalue, (signature,)) subscription["date_from"] = datetime.datetime.strptime( diff --git a/privacyidea/lib/tokens/yubikeytoken.py b/privacyidea/lib/tokens/yubikeytoken.py index 45b35531df..4dcf93126e 100644 --- a/privacyidea/lib/tokens/yubikeytoken.py +++ b/privacyidea/lib/tokens/yubikeytoken.py @@ -262,7 +262,7 @@ def check_otp(self, anOtpVal, counter=None, window=None, options=None): # The OTP value is no yubikey aes otp value and can not be decoded return -4 - msg_bin = secret.aes_decrypt(otp_bin) + msg_bin = secret.aes_ecb_decrypt(otp_bin) msg_hex = hexlify_and_unicode(msg_bin) # The checksum is a CRC-16 (16-bit ISO 13239 1st complement) that diff --git a/tests/test_lib_crypto.py b/tests/test_lib_crypto.py index 7ba67bc409..b9fde3f5e0 100644 --- a/tests/test_lib_crypto.py +++ b/tests/test_lib_crypto.py @@ -36,8 +36,8 @@ def test_00_security_module_base_class(self): self.assertRaises(NotImplementedError, hsm.setup_module, {}) self.assertRaises(NotImplementedError, hsm.random, 20) - self.assertRaises(NotImplementedError, hsm.encrypt, "20") - self.assertRaises(NotImplementedError, hsm.decrypt, "20") + self.assertRaises(NotImplementedError, hsm.encrypt, "20", 'abcd') + self.assertRaises(NotImplementedError, hsm.decrypt, "20", 'abcd') def test_01_default_security_module(self): config = current_app.config @@ -45,6 +45,7 @@ def test_01_default_security_module(self): hsm.setup_module({"file": config.get("PI_ENCFILE")}) self.assertTrue(hsm is not None, hsm) self.assertTrue(hsm.secFile is not None, hsm.secFile) + self.assertTrue(hsm.is_ready) def test_01_no_file_in_config(self): self.assertRaises(Exception, DefaultSecurityModule, {}) @@ -55,6 +56,7 @@ def test_04_random(self): "crypted": True}) r = hsm.random(20) self.assertTrue(len(r) == 20, r) + self.assertFalse(hsm.is_ready) def test_05_encrypt_decrypt(self): config = current_app.config From f2e066a679e83abaccaa9b3eb1897f0fba72a948 Mon Sep 17 00:00:00 2001 From: Paul Lettich Date: Fri, 8 Mar 2019 15:32:04 +0100 Subject: [PATCH 2/4] Fix remarks from review - Switch to PKCS7 padding/unpadding in aes_(encrypt|decrypt)_b64 - Add tests for decrypting values generated with previous versions --- privacyidea/lib/crypto.py | 14 ++++++------ tests/test_lib_crypto.py | 47 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 54 insertions(+), 7 deletions(-) diff --git a/privacyidea/lib/crypto.py b/privacyidea/lib/crypto.py index 4dc10e0129..986bf048d7 100644 --- a/privacyidea/lib/crypto.py +++ b/privacyidea/lib/crypto.py @@ -55,6 +55,7 @@ import ctypes from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes +from cryptography.hazmat.primitives import padding from Crypto.PublicKey import RSA import base64 try: @@ -414,11 +415,10 @@ def aes_encrypt_b64(key, data): :rtype: str """ # pad data - block_size = algorithms.AES.block_size // 8 - num_pad = block_size - (len(data) % block_size) - data = data + six.int2byte(num_pad) * num_pad + padder = padding.PKCS7(algorithms.AES.block_size).padder() + padded_data = padder.update(data) + padder.finalize() iv = geturandom(16) - encdata = aes_cbc_encrypt(key, iv, data) + encdata = aes_cbc_encrypt(key, iv, padded_data) return b64encode_and_unicode(iv + encdata) @@ -435,11 +435,11 @@ def aes_decrypt_b64(key, enc_data_b64): data_bin = base64.b64decode(enc_data_b64) iv = data_bin[:16] encdata = data_bin[16:] - output = aes_cbc_decrypt(key, iv, encdata) + padded_data = aes_cbc_decrypt(key, iv, encdata) # remove padding - padding = six.indexbytes(output, len(output) - 1) - output = output[0:-padding] + unpadder = padding.PKCS7(algorithms.AES.block_size).unpadder() + output = unpadder.update(padded_data) + unpadder.finalize() return output diff --git a/tests/test_lib_crypto.py b/tests/test_lib_crypto.py index b9fde3f5e0..9c34a4d86a 100644 --- a/tests/test_lib_crypto.py +++ b/tests/test_lib_crypto.py @@ -145,6 +145,14 @@ def test_00_encrypt_decrypt_pin(self): pin = decryptPin(r) self.assertTrue(pin == "test", (r, pin)) + # decrypt some pins generated with 2.23 + pin1 = 'd2c920ad10513c8ea322b522751185a3:54f068cffb43ada1edd024087da614ec' + self.assertEquals(decryptPin(pin1), 'test') + pin2 = '223f414872122ad112eb9f17b05da0b8:123079d997cd18601414830ab7c97678' + self.assertEquals(decryptPin(pin2), 'test') + pin3 = '4af7590600286becde70b99b10493104:09e4133652c609f9697e1923cde72904' + self.assertEquals(decryptPin(pin3), '1234') + def test_01_encrypt_decrypt_pass(self): r = encryptPassword(u"passwörd".encode('utf8')) # encryptPassword returns unicode @@ -157,6 +165,16 @@ def test_01_encrypt_decrypt_pass(self): pin = decryptPassword(r) self.assertEqual(pin, u"passwörd") + # decrypt some passwords generated with 2.23 + pw1 = '3d1bf9db4c75469b4bb0bc7c70133181:2c27ac3839ed2213b8399d0471b17136' + self.assertEquals(decryptPassword(pw1), 'test123') + pw2 = '3a1be65a234f723fe5c6969b818582e1:08e51d1c65aa74c4988d094c40cb972c' + self.assertEquals(decryptPassword(pw2), 'test123') + pw3 = '7a4d5e2f26978394e33715bc3e8188a3:90b2782112ad7bbc5b48bd10e5c7c096cfe4ef7d9d11272595dc5b6c7f21d98a' + self.assertEquals(decryptPassword(pw3, ), u'passwörd') + + # TODO: add checks for broken paddings/encrypted values and malformed enc_data + not_valid_password = b"\x01\x02\x03\x04\xff" r = encryptPassword(not_valid_password) # A non valid password will raise an exception during decryption @@ -183,6 +201,18 @@ def test_02_encrypt_decrypt_eas_base64(self): d = aes_decrypt_b64(key, s) self.assertEqual(otp_seed, d) + # check some data generated with 2.23 + hex_key = 'f84c2ddb09dee2a88194d5ac2156a8e4' + data = b'secret data' + enc_data = 'WNfUSNBNZF5kaPfujW8ueUi5Afas47pQ/3FHc3VymWM=' + d = aes_decrypt_b64(binascii.unhexlify(hex_key), enc_data) + self.assertEquals(data, d) + enc_data = 'RDDvdAJhCnw/tlYscTxv+6idHAQnQFY5VpUK8SFflYQ=' + d = aes_decrypt_b64(binascii.unhexlify(hex_key), enc_data) + self.assertEquals(data, d) + + # TODO: add checks for broken paddings/encrypted values and malformed enc_data + def test_03_hash(self): import os val = os.urandom(16) @@ -206,6 +236,23 @@ def test_04_encrypt_decrypt_data(self): d = decrypt(binascii.unhexlify(c), iv) self.assertEqual(s, d.decode('utf8')) + # TODO: add checks for broken paddings/encrypted values and malformed enc_data + + # check some data generated with 2.23 + s = u'passwörd'.encode('utf8') + iv_hex = 'cd5245a2875007d30cc049c2e7eca0c5' + enc_data_hex = '7ea55168952b33131077f4249cf9e52b5f2b572214ace13194c436451fe3788c' + self.assertEquals(s, decrypt(binascii.unhexlify(enc_data_hex), + binascii.unhexlify(iv_hex))) + enc_data_hex = 'fb79a04d69e832aec8ffb4bbfe031b3bd28a2840150212d8c819e' \ + '362b1711cc389aed70eaf27af53131ea446095da80e88c4caf791' \ + 'c709e9581ff0a5f1e19228dc4c3c278d148951acaab9a164c1770' \ + '7166134f4ba6111055c65d72771c6f59c2dc150a53753f2cf4c47' \ + 'ec02901022f02a054d1fc7678fd4f66b47967a5d222a' + self.assertEquals(b'\x01\x02' * 30, + decrypt(binascii.unhexlify(enc_data_hex), + binascii.unhexlify(iv_hex))) + def test_05_encode_decode(self): b_str = b'Hello World' self.assertEqual(to_unicode(b_str), b_str.decode('utf8')) From 803fa69b528e2584cfee68bcb16218a61422be2b Mon Sep 17 00:00:00 2001 From: Paul Lettich Date: Mon, 11 Mar 2019 17:06:34 +0100 Subject: [PATCH 3/4] Migrate from PyCrypto to pyca/cryptography Part 2: RSA This commit removes all RSA related functionality based on PyCrypto and replaces them with `pyca/cryptography` The RSA functionality is changed to mitigate cryptographic weaknesses: - All Signatures are calculated using the PSS padding scheme [1] and are prefixed with a version string to identify the type. - Old style textbook-RSA signatures in the audit log can still be validated by setting `PI_CHECK_OLD_SIGNATURES` to `True` in the configuration - The `Sign` Object now accepts only the key data (not the file names) and is possible to use it either with only a private key or a public key - Some tests now check for old style (textbook RSA) signatures. [1] https://cryptography.io/en/latest/hazmat/primitives/asymmetric/rsa/#signing --- privacyidea/api/lib/postpolicy.py | 18 ++- privacyidea/lib/auditmodules/sqlaudit.py | 26 ++-- privacyidea/lib/crypto.py | 157 ++++++++++++++--------- privacyidea/lib/subscriptions.py | 17 +-- tests/test_api_lib_policy.py | 9 +- tests/test_lib_crypto.py | 68 +++++++++- tests/test_lib_subscriptions.py | 20 ++- 7 files changed, 228 insertions(+), 87 deletions(-) diff --git a/privacyidea/api/lib/postpolicy.py b/privacyidea/api/lib/postpolicy.py index 5a51d74518..3f7954c529 100644 --- a/privacyidea/api/lib/postpolicy.py +++ b/privacyidea/api/lib/postpolicy.py @@ -41,11 +41,10 @@ """ import datetime import logging -log = logging.getLogger(__name__) +import traceback from privacyidea.lib.error import PolicyError from flask import g, current_app, make_response from privacyidea.lib.policy import SCOPE, ACTION, AUTOASSIGNVALUE -from privacyidea.lib.user import get_user_from_param from privacyidea.lib.token import get_tokens, assign_token, get_realms_of_token, get_one_token from privacyidea.lib.machine import get_hostname, get_auth_items from .prepolicy import check_max_token_user, check_max_token_realm @@ -60,6 +59,7 @@ from privacyidea.lib.realm import get_default_realm from privacyidea.lib.subscriptions import subscription_status +log = logging.getLogger(__name__) optional = True required = False @@ -148,9 +148,17 @@ def after_request(response): if current_app.config.get("PI_NO_RESPONSE_SIGN"): return response - priv_file = current_app.config.get("PI_AUDIT_KEY_PRIVATE") - pub_file = current_app.config.get("PI_AUDIT_KEY_PUBLIC") - sign_object = Sign(priv_file, pub_file) + priv_file_name = current_app.config.get("PI_AUDIT_KEY_PRIVATE") + try: + with open(priv_file_name, 'rb') as priv_file: + priv_key = priv_file.read() + sign_object = Sign(priv_key, public_key=None) + except (IOError, ValueError, TypeError) as e: + log.info('Could not load private key from ' + 'file {0!s}: {1!r}!'.format(priv_file_name, e)) + log.debug(traceback.format_exc()) + return response + request.all_data = get_all_params(request.values, request.data) # response can be either a Response object or a Tuple (Response, ErrorID) response_value = 200 diff --git a/privacyidea/lib/auditmodules/sqlaudit.py b/privacyidea/lib/auditmodules/sqlaudit.py index 0670427ff1..313e2a0f89 100755 --- a/privacyidea/lib/auditmodules/sqlaudit.py +++ b/privacyidea/lib/auditmodules/sqlaudit.py @@ -45,23 +45,21 @@ from privacyidea.lib.utils import censor_connect_string from privacyidea.lib.lifecycle import register_finalizer from privacyidea.lib.utils import truncate_comma_list +from privacyidea.lib.framework import get_app_config_value from sqlalchemy import MetaData, cast, String from sqlalchemy import asc, desc, and_, or_ import datetime import traceback from six import string_types - - -log = logging.getLogger(__name__) - -metadata = MetaData() - from privacyidea.models import audit_column_length as column_length -from privacyidea.models import AUDIT_TABLE_NAME as TABLE_NAME from privacyidea.models import Audit as LogEntry from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker, scoped_session +log = logging.getLogger(__name__) + +metadata = MetaData() + class Audit(AuditBase): """ @@ -271,7 +269,16 @@ def read_keys(self, pub, priv): :type priv: string with filename :return: None """ - self.sign_object = Sign(priv, pub) + try: + with open(priv, "rb") as privkey_file: + private_key = privkey_file.read() + with open(pub, 'rb') as pubkey_file: + public_key = pubkey_file.read() + self.sign_object = Sign(private_key, public_key) + except Exception as e: + log.error("Error reading key file: {0!r})".format(e)) + log.debug(traceback.format_exc()) + raise e def _check_missing(self, audit_id): """ @@ -481,9 +488,10 @@ def clear(self): def audit_entry_to_dict(self, audit_entry): sig = None + verify_old_sig = get_app_config_value('PI_CHECK_OLD_SIGNATURES', False) if self.sign_data: sig = self.sign_object.verify(self._log_to_string(audit_entry), - audit_entry.signature) + audit_entry.signature, verify_old_sig) is_not_missing = self._check_missing(int(audit_entry.id)) # is_not_missing = True diff --git a/privacyidea/lib/crypto.py b/privacyidea/lib/crypto.py index 986bf048d7..db5cdf98fa 100644 --- a/privacyidea/lib/crypto.py +++ b/privacyidea/lib/crypto.py @@ -53,20 +53,18 @@ import binascii import six import ctypes + from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from cryptography.hazmat.primitives import padding -from Crypto.PublicKey import RSA +from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives.asymmetric import padding as asym_padding + import base64 -try: - from Crypto.Signature import pkcs1_15 - SIGN_WITH_RSA = False -except ImportError: - # Bummer the version of PyCrypto has no PKCS1_15 - SIGN_WITH_RSA = True import passlib.hash import traceback -from six import PY2, text_type +from six import PY2 from privacyidea.lib.log import log_with from privacyidea.lib.error import HSMException @@ -632,35 +630,55 @@ def zerome(bufferObject): return +def _slow_rsa_verify_raw(key, sig, msg): + assert isinstance(sig, six.integer_types) + assert isinstance(msg, six.integer_types) + if hasattr(key, 'private_numbers'): + pn = key.private_numbers().public_numbers + elif hasattr(key, 'public_numbers'): + pn = key.public_numbers() + else: + raise TypeError('No public key') + + # compute m**d (mod n) + return msg == pow(sig, pn.e, pn.n) + + class Sign(object): """ Signing class that is used to sign Audit Entries and to sign API responses. """ - def __init__(self, private_file, public_file): + sig_ver = 'rsa_sha256_pss' + + def __init__(self, private_key=None, public_key=None): """ - :param private_file: The privacy Key file - :type private_file: filename - :param public_file: The public key file - :type public_file: filename + :param private_key: The private Key data in PEM format + :type private_key: bytes or None + :param public_key: The public key data in PEM format + :type public_key: bytes or None :return: Sign Object """ - self.private = "" - self.public = "" - try: - f = open(private_file, "r") - self.private = f.read() - f.close() - except Exception as e: - log.error("Error reading private key {0!s}: ({1!r})".format(private_file, e)) - raise e - - try: - f = open(public_file, "r") - self.public = f.read() - f.close() - except Exception as e: - log.error("Error reading public key {0!s}: ({1!r})".format(public_file, e)) - raise e + self.private = None + self.public = None + backend = default_backend() + if private_key: + try: + self.private = serialization.load_pem_private_key(private_key, + password=None, + backend=backend) + except Exception as e: + log.error("Error loading private key: ({0!r})".format(e)) + log.debug(traceback.format_exc()) + raise e + + if public_key: + try: + self.public = serialization.load_pem_public_key(public_key, + backend=backend) + except Exception as e: + log.error("Error loading public key: ({0!r})".format(e)) + log.debug(traceback.format_exc()) + raise e def sign(self, s): """ @@ -668,45 +686,69 @@ def sign(self, s): :param s: String to sign :type s: str - :return: The signature of the string - :rtype: long + :return: The hexlified and versioned signature of the string + :rtype: str """ - if isinstance(s, text_type): - s = s.encode('utf8') - RSAkey = RSA.importKey(self.private) - if SIGN_WITH_RSA: - hashvalue = sha256(s).digest() - signature = RSAkey.sign(hashvalue, 1) - else: - hashvalue = sha256(s) - signature = pkcs1_15.new(RSAkey).sign(hashvalue) - s_signature = str(signature[0]) - return s_signature - - def verify(self, s, signature): + if not self.private: + log.info('Could not sign message {0!s}, no private key!'.format(s)) + return '' + + signature = self.private.sign( + to_bytes(s), + asym_padding.PSS( + mgf=asym_padding.MGF1(hashes.SHA256()), + salt_length=asym_padding.PSS.MAX_LENGTH), + hashes.SHA256()) + res = ':'.join([self.sig_ver, hexlify_and_unicode(signature)]) + return res + + def verify(self, s, signature, verify_old_sigs=False): """ Check the signature of the string s :param s: String to check - :type s: str + :type s: str or bytes :param signature: the signature to compare - :type signature: str + :type signature: str or int + :param verify_old_sigs: whether to check for old style signatures as well + :type verify_old_sigs: bool + :return: True if the signature is valid, false otherwise. + :rtype: bool """ - if isinstance(s, text_type): - s = s.encode('utf8') r = False + if not self.public: + log.info('Could not verify signature for message {0!s}, ' + 'no public key!'.format(s)) + return r + + sver = '' + try: + sver, signature = six.text_type(signature).split(':') + except ValueError: + # if the signature does not contain a colon we assume an old style signature. + pass + try: - RSAkey = RSA.importKey(self.public) - signature = long(signature) - if SIGN_WITH_RSA: - hashvalue = sha256(s).digest() - r = RSAkey.verify(hashvalue, (signature,)) + if sver == self.sig_ver: + self.public.verify( + binascii.unhexlify(signature), + to_bytes(s), + asym_padding.PSS( + mgf=asym_padding.MGF1(hashes.SHA256()), + salt_length=asym_padding.PSS.MAX_LENGTH), + hashes.SHA256()) + r = True else: - hashvalue = sha256(s) - pkcs1_15.new(RSAkey).verify(hashvalue, signature) - except Exception as _e: # pragma: no cover + if verify_old_sigs: + int_s = int(binascii.hexlify(sha256(to_bytes(s)).digest()), 16) + r = _slow_rsa_verify_raw(self.public, int(signature), int_s) + else: + log.debug('Could not verify old style signature {0!s} ' + 'for data {1:s}'.format(signature, s)) + except Exception: log.error("Failed to verify signature: {0!r}".format(s)) log.debug("{0!s}".format(traceback.format_exc())) + return r @@ -728,7 +770,6 @@ def create_hsm_object(config): package_name, class_name = hsm_module_name.rsplit(".", 1) hsm_class = get_module_class(package_name, class_name, "setup_module") log.info("initializing HSM class: {0!s}".format(hsm_class)) - hsm_parameters = {} if class_name == "DefaultSecurityModule": hsm_parameters = {"file": config.get("PI_ENCFILE")} else: diff --git a/privacyidea/lib/subscriptions.py b/privacyidea/lib/subscriptions.py index 0ebcbdde8b..7daf990e21 100644 --- a/privacyidea/lib/subscriptions.py +++ b/privacyidea/lib/subscriptions.py @@ -28,15 +28,14 @@ import logging import datetime import random -from hashlib import sha256 from .log import log_with from ..models import Subscription from privacyidea.lib.error import SubscriptionError from privacyidea.lib.token import get_tokens +from privacyidea.lib.crypto import Sign import functools from privacyidea.lib.framework import get_app_config_value import os -from Crypto.PublicKey import RSA import traceback from sqlalchemy import func from six import PY2, string_types @@ -280,26 +279,24 @@ def check_signature(subscription): dirname = os.path.dirname(enckey) # In dirname we are searching for .pem filename = u"{0!s}/{1!s}.pem".format(dirname, vendor) - with open(filename, "r") as file_handle: - public = file_handle.read() - r = False try: # remove the minutes 00:00:00 subscription["date_from"] = subscription.get("date_from").strftime(SUBSCRIPTION_DATE_FORMAT) subscription["date_till"] = subscription.get("date_till").strftime(SUBSCRIPTION_DATE_FORMAT) sign_string = SIGN_FORMAT.format(**subscription) - RSAkey = RSA.importKey(public) - hashvalue = sha256(sign_string.encode("utf-8")).digest() - signature = long(subscription.get("signature") or "100") - r = RSAkey.verify(hashvalue, (signature,)) + with open(filename, 'rb') as key_file: + sign_obj = Sign(private_key=None, public_key=key_file.read()) + + signature = subscription.get('signature', '100') + r = sign_obj.verify(sign_string, signature, verify_old_sigs=True) subscription["date_from"] = datetime.datetime.strptime( subscription.get("date_from"), SUBSCRIPTION_DATE_FORMAT) subscription["date_till"] = datetime.datetime.strptime( subscription.get("date_till"), SUBSCRIPTION_DATE_FORMAT) - except Exception as exx: + except Exception as _e: log.debug(traceback.format_exc()) raise SubscriptionError("Verifying the signature of your subscription " "failed.", diff --git a/tests/test_api_lib_policy.py b/tests/test_api_lib_policy.py index 0c81551f35..ca980d82ca 100644 --- a/tests/test_api_lib_policy.py +++ b/tests/test_api_lib_policy.py @@ -1850,13 +1850,16 @@ def test_07_sign_response(self): "id": 1} resp = jsonify(res) from privacyidea.lib.crypto import Sign - g.sign_object = Sign("tests/testdata/private.pem", - "tests/testdata/public.pem") + sign_object = Sign(private_key=None, + public_key=open("tests/testdata/public.pem", 'rb').read()) new_response = sign_response(req, resp) jresult = json.loads(new_response.data) self.assertEqual(jresult.get("nonce"), "12345678") - self.assertEqual(jresult.get("signature"), "11355158914966210201410734667484298031497086510917116878993822963793177737963323849914979806826759273431791474575057946263651613906587629736481370983420295626001055840803201448376203681672140726404056349423937599480275853513810616624349811159346536182220806878464577429106150903913526744093300868582898892977164229848617413618851794501457802670374543399415905458325601994002527427083792164898293507308423780001137468154518279116138010266341425663850327379848131113626641510715557748879427991785684858504631545256553961505159377600982900016536629720752767147086708626971940835730555782551222922985302674756190839458609") + # After switching to the PSS signature scheme, each signature will be + # different. So we have to verify the signature through the sign object + sig = jresult.pop('signature') + self.assertTrue(sign_object.verify(json.dumps(jresult, sort_keys=True), sig)) def test_08_get_webui_settings(self): # Test that a machine definition will return offline hashes diff --git a/tests/test_lib_crypto.py b/tests/test_lib_crypto.py index 9c34a4d86a..8c227dbcd4 100644 --- a/tests/test_lib_crypto.py +++ b/tests/test_lib_crypto.py @@ -14,13 +14,14 @@ geturandom, get_alphanum_str, hash_with_pepper, verify_with_pepper, aes_encrypt_b64, aes_decrypt_b64, get_hsm, init_hsm, set_hsm_password, hash, - encrypt, decrypt) + encrypt, decrypt, Sign) from privacyidea.lib.utils import to_bytes, to_unicode from privacyidea.lib.security.default import (SecurityModule, DefaultSecurityModule) from privacyidea.lib.security.aeshsm import AESHardwareSecurityModule from flask import current_app +import six from six import text_type from PyKCS11 import PyKCS11Error @@ -559,3 +560,68 @@ def test_01_set_password(self): self.assertTrue(ready) self.assertIs(hsm, init_hsm()) self.assertIs(get_hsm(), hsm) + + +class SignObjectTestCase(MyTestCase): + """ tests for the SignObject which signs/verifies using RSA """ + + def test_00_create_sign_object(self): + # test with invalid key data + with self.assertRaises(Exception): + Sign(b'This is not a private key', b'This is not a public key') + with self.assertRaises(Exception): + priv_key = open(current_app.config.get("PI_AUDIT_KEY_PRIVATE"), 'rb').read() + Sign(private_key=priv_key, + public_key=b'Still not a public key') + # this should work + priv_key = open(current_app.config.get("PI_AUDIT_KEY_PRIVATE"), 'rb').read() + pub_key = open(current_app.config.get("PI_AUDIT_KEY_PUBLIC"), 'rb').read() + so = Sign(priv_key, pub_key) + self.assertEquals(so.sig_ver, 'rsa_sha256_pss') + + def test_01_sign_and_verify_data(self): + priv_key = open(current_app.config.get("PI_AUDIT_KEY_PRIVATE"), 'rb').read() + pub_key = open(current_app.config.get("PI_AUDIT_KEY_PUBLIC"), 'rb').read() + so = Sign(priv_key, pub_key) + data = 'short text' + sig = so.sign(data) + self.assertTrue(sig.startswith(so.sig_ver), sig) + self.assertTrue(so.verify(data, sig)) + + data = b'A slightly longer text, this time in binary format.' + sig = so.sign(data) + self.assertTrue(so.verify(data, sig)) + + # test with text larger than RSA key size + data = b'\x01\x02' * 5000 + sig = so.sign(data) + self.assertTrue(so.verify(data, sig)) + + # now test a broken signature + data = 'short text' + sig = so.sign(data) + sig_broken = sig[:-1] + '{:x}'.format((int(sig[-1], 16) + 1) % 16) + self.assertFalse(so.verify(data, sig_broken)) + + # test with non hex string + sig_broken = sig[:-1] + 'x' + self.assertFalse(so.verify(data, sig_broken)) + + # now try to verify old signatures + # first without enabling old signatures in config + short_text_sig = 15197717811878792093921885389298262311612396877333963031070812155820116863657342817645537537961773450510020137791036591085713379948816070430789598146539509027948592633362217308056639775153575635684961642110792013775709164803544619582232081442445758263838942315386909453927493644845757192298617925455779136340217255670113943560463286896994555184188496806420559078552626485909484729552861477888246423469461421103010299470836507229490718177625822972845024556897040292571751452383573549412451282884349017186147757238775308192484937929135306435242403555592741059466194258607967889051881221759976135386624406095324595765010 + data = 'short text' + self.assertFalse(so.verify(data, short_text_sig)) + + # now we enable the checking of old signatures + short_text_sig = 15197717811878792093921885389298262311612396877333963031070812155820116863657342817645537537961773450510020137791036591085713379948816070430789598146539509027948592633362217308056639775153575635684961642110792013775709164803544619582232081442445758263838942315386909453927493644845757192298617925455779136340217255670113943560463286896994555184188496806420559078552626485909484729552861477888246423469461421103010299470836507229490718177625822972845024556897040292571751452383573549412451282884349017186147757238775308192484937929135306435242403555592741059466194258607967889051881221759976135386624406095324595765010 + data = 'short text' + self.assertTrue(so.verify(data, short_text_sig, verify_old_sigs=True)) + + # verify a broken old signature + broken_short_text_sig = short_text_sig + 1 + self.assertFalse(so.verify(data, broken_short_text_sig, verify_old_sigs=True)) + + long_data_sig = 991763198885165486007338893972384496025563436289154190056285376683148093829644985815692167116166669178171916463844829424162591848106824431299796818231239278958776853940831433819576852350691126984617641483209392489383319296267416823194661791079316704545017249491961092046751201670544843607206698682190381208022128216306635574292359600514603728560982584561531193227312370683851459162828981766836503134221347324867936277484738573153562229478151744446530191383660477390958159856842222437156763388859923477183453362567547792824054461704970820770533637185477922709297916275611571003099205429044820469679520819043851809079 + long_data = b'\x01\x02' * 5000 + self.assertTrue(so.verify(long_data, long_data_sig, verify_old_sigs=True)) diff --git a/tests/test_lib_subscriptions.py b/tests/test_lib_subscriptions.py index 149d9006da..982755a247 100644 --- a/tests/test_lib_subscriptions.py +++ b/tests/test_lib_subscriptions.py @@ -105,4 +105,22 @@ def test_03_check_subscription(self): init_token({"type": "spass"}, user=User("nopw", self.realm1)) # Now we have three users with tokens, subscription will fail self.assertRaises(SubscriptionError, check_subscription, - "demo_application") \ No newline at end of file + "demo_application") + + # try to save some broken sunbscriptions + sub1 = SUBSCRIPTION1.copy() + sub1['date_from'] = '1234' + with self.assertRaises(ValueError): + save_subscription(sub1) + + sub1 = SUBSCRIPTION1.copy() + sub1['by_name'] = 'unknown vendor' + with self.assertRaisesRegexp(SubscriptionError, 'Verifying the signature ' + 'of your subscription'): + save_subscription(sub1) + + sub1 = SUBSCRIPTION1.copy() + sub1['signature'] = str(int(sub1['signature']) + 1) + with self.assertRaisesRegexp(SubscriptionError, 'Signature of your ' + 'subscription does not'): + save_subscription(sub1) From 4453dfdca5c4d67947cebacdef7cd35d110eea0a Mon Sep 17 00:00:00 2001 From: Paul Lettich Date: Tue, 26 Mar 2019 15:00:58 +0100 Subject: [PATCH 4/4] Add some more tests and improve coverage - Add tests for missing crypto keys and external audit DBs - Fix warning about deprecated pbkdf2 module --- privacyidea/lib/crypto.py | 9 ++-- privacyidea/lib/importotp.py | 8 ++-- privacyidea/lib/resolvers/SQLIdResolver.py | 3 +- tests/base.py | 2 + tests/test_api_lib_policy.py | 15 +++++++ tests/test_lib_audit.py | 49 +++++++++++++++++++-- tests/test_lib_crypto.py | 31 ++++++++----- tests/testdata/audit.sqlite | Bin 0 -> 12288 bytes 8 files changed, 92 insertions(+), 25 deletions(-) create mode 100644 tests/testdata/audit.sqlite diff --git a/privacyidea/lib/crypto.py b/privacyidea/lib/crypto.py index db5cdf98fa..8a7a2c0ded 100644 --- a/privacyidea/lib/crypto.py +++ b/privacyidea/lib/crypto.py @@ -633,11 +633,11 @@ def zerome(bufferObject): def _slow_rsa_verify_raw(key, sig, msg): assert isinstance(sig, six.integer_types) assert isinstance(msg, six.integer_types) - if hasattr(key, 'private_numbers'): - pn = key.private_numbers().public_numbers - elif hasattr(key, 'public_numbers'): + if hasattr(key, 'public_numbers'): pn = key.public_numbers() - else: + elif hasattr(key, 'private_numbers'): # pragma: no cover + pn = key.private_numbers().public_numbers + else: # pragma: no cover raise TypeError('No public key') # compute m**d (mod n) @@ -691,6 +691,7 @@ def sign(self, s): """ if not self.private: log.info('Could not sign message {0!s}, no private key!'.format(s)) + # TODO: should we throw an exception in this case? return '' signature = self.private.sign( diff --git a/privacyidea/lib/importotp.py b/privacyidea/lib/importotp.py index 84dccf4b41..d6a1d2a7e6 100644 --- a/privacyidea/lib/importotp.py +++ b/privacyidea/lib/importotp.py @@ -61,7 +61,7 @@ from privacyidea.lib.crypto import (aes_decrypt_b64, aes_encrypt_b64, geturandom) from bs4 import BeautifulSoup import traceback -from passlib.utils.pbkdf2 import pbkdf2 +from passlib.crypto.digest import pbkdf2_hmac import gnupg import logging @@ -134,7 +134,7 @@ def parseOATHcsv(csv): csv_array = csv.split('\n') - m = re.match("^#\s*version:\s*(\d+)", csv_array[0]) + m = re.match(r"^#\s*version:\s*(\d+)", csv_array[0]) if m: version = m.group(1) log.debug("the file is version {0}.".format(version)) @@ -430,8 +430,8 @@ def derive_key(xml, password): salt = keymeth.find("salt").text.strip() keylength = keymeth.find("keylength").text.strip() rounds = keymeth.find("iterationcount").text.strip() - r = pbkdf2(to_utf8(password), base64.b64decode(salt), int(rounds), - int(keylength)) + r = pbkdf2_hmac('sha1', to_utf8(password), base64.b64decode(salt), + rounds=int(rounds), keylen=int(keylength)) return binascii.hexlify(r) diff --git a/privacyidea/lib/resolvers/SQLIdResolver.py b/privacyidea/lib/resolvers/SQLIdResolver.py index 57c6c1f1cf..f9dce39ba2 100644 --- a/privacyidea/lib/resolvers/SQLIdResolver.py +++ b/privacyidea/lib/resolvers/SQLIdResolver.py @@ -48,7 +48,8 @@ from privacyidea.lib.utils import (is_true, censor_connect_string, to_utf8) from passlib.context import CryptContext from base64 import b64decode, b64encode -from passlib.utils.binary import h64 +from passlib.utils import h64 + from passlib.utils.compat import uascii_to_str, u from passlib.utils.compat import unicode as pl_unicode from passlib.utils import to_unicode diff --git a/tests/base.py b/tests/base.py index 436724c7d2..5ef765462f 100644 --- a/tests/base.py +++ b/tests/base.py @@ -1,3 +1,5 @@ +# -*- coding: utf-8 -*- + import unittest import json from privacyidea.app import create_app diff --git a/tests/test_api_lib_policy.py b/tests/test_api_lib_policy.py index ca980d82ca..d848a3a552 100644 --- a/tests/test_api_lib_policy.py +++ b/tests/test_api_lib_policy.py @@ -1853,6 +1853,21 @@ def test_07_sign_response(self): sign_object = Sign(private_key=None, public_key=open("tests/testdata/public.pem", 'rb').read()) + # check that we don't sign if 'PI_NO_RESPONSE_SIGN' is set + current_app.config['PI_NO_RESPONSE_SIGN'] = True + new_response = sign_response(req, resp) + self.assertEqual(new_response, resp, new_response) + current_app.config['PI_NO_RESPONSE_SIGN'] = False + + # set a broken signing key path. The function should return without + # changing the response + orig_key_path = current_app.config['PI_AUDIT_KEY_PRIVATE'] + current_app.config['PI_AUDIT_KEY_PRIVATE'] = '/path/does/not/exist' + new_response = sign_response(req, resp) + self.assertEqual(new_response, resp, new_response) + current_app.config['PI_AUDIT_KEY_PRIVATE'] = orig_key_path + + # signing of API responses is the default new_response = sign_response(req, resp) jresult = json.loads(new_response.data) self.assertEqual(jresult.get("nonce"), "12345678") diff --git a/tests/test_lib_audit.py b/tests/test_lib_audit.py index f4a65c4025..07918a7086 100644 --- a/tests/test_lib_audit.py +++ b/tests/test_lib_audit.py @@ -11,12 +11,11 @@ from privacyidea.lib.auditmodules.sqlaudit import column_length import datetime import time -from privacyidea.models import db -from privacyidea.app import create_app PUBLIC = "tests/testdata/public.pem" PRIVATE = "tests/testdata/private.pem" +AUDIT_DB = 'sqlite:///tests/testdata//audit.sqlite' class AuditTestCase(MyTestCase): @@ -210,7 +209,13 @@ def test_06_truncate_data(self): u"Mannheim,Augsburg,Wiesbaden,Gelsenki+,Möncheng+,Braunsch+,Kiel,Chemnitz,Aachen,Magdeburg", self.Audit.audit_data.get("policies")) - def test_07_sign_non_ascii_entry(self): + def test_07_sign_and_verify(self): + # Test with broken key file paths + self.app.config["PI_AUDIT_KEY_PUBLIC"] = PUBLIC + self.app.config["PI_AUDIT_KEY_PRIVATE"] = '/path/not/valid' + with self.assertRaises(Exception): + getAudit(self.app.config) + self.app.config["PI_AUDIT_KEY_PRIVATE"] = PRIVATE # Log a username as unicode with a non-ascii character self.Audit.log({"serial": "1234", "action": "token/assign", @@ -221,6 +226,18 @@ def test_07_sign_non_ascii_entry(self): self.assertEqual(audit_log.total, 1) self.assertEqual(audit_log.auditdata[0].get("user"), u"kölbel") self.assertEqual(audit_log.auditdata[0].get("sig_check"), "OK") + # check the raw data from DB + db_entries = self.Audit.search_query({'user': u'kölbel'}) + db_entry = next(db_entries) + self.assertTrue(db_entry.signature.startswith('rsa_sha256_pss'), db_entry) + # modify the table data + db_entry.realm = 'realm1' + self.Audit.session.merge(db_entry) + self.Audit.session.commit() + # and check if we get a failed signature check + audit_log = self.Audit.search({"user": u"kölbel"}) + self.assertEquals(audit_log.total, 1) + self.assertEquals(audit_log.auditdata[0].get("sig_check"), "FAIL") def test_08_policies(self): self.Audit.log({"action": "validate/check"}) @@ -235,4 +252,28 @@ def test_08_policies(self): self.Audit.finalize_log() audit_log = self.Audit.search({"policies": "*rule4*"}) self.assertEqual(audit_log.total, 1) - self.assertEqual(audit_log.auditdata[0].get("policies"), "rule4,rule5") \ No newline at end of file + self.assertEqual(audit_log.auditdata[0].get("policies"), "rule4,rule5") + + def test_09_check_external_audit_db(self): + self.app.config["PI_AUDIT_SQL_URI"] = AUDIT_DB + audit = getAudit(self.app.config) + total = audit.get_count({}) + self.assertEquals(total, 5) + # check that we have old style signatures in the DB + db_entries = audit.search_query({"user": "testuser"}) + db_entry = next(db_entries) + self.assertTrue(db_entry.signature.startswith('213842441384'), db_entry) + # by default, PI_CHECK_OLD_SIGNATURES is false and thus the signature check fails + audit_log = audit.search({"user": "testuser"}) + self.assertEquals(audit_log.total, 1) + self.assertEquals(audit_log.auditdata[0].get("sig_check"), "FAIL") + # but they validate correctly when PI_CHECK_OLD_SIGNATURES is true + self.app.config['PI_CHECK_OLD_SIGNATURES'] = True + audit_log = audit.search({"user": "testuser"}) + self.assertEquals(audit_log.total, 1) + self.assertEquals(audit_log.auditdata[0].get("sig_check"), "OK") + # except for entry number 4 where the 'realm' was added afterwards + audit_log = audit.search({"realm": "realm1"}) + self.assertEquals(audit_log.total, 1) + self.assertEquals(audit_log.auditdata[0].get("sig_check"), "FAIL") + # TODO: add new audit entry and check for new style signature diff --git a/tests/test_lib_crypto.py b/tests/test_lib_crypto.py index 8c227dbcd4..b67d621c1f 100644 --- a/tests/test_lib_crypto.py +++ b/tests/test_lib_crypto.py @@ -21,7 +21,6 @@ from privacyidea.lib.security.aeshsm import AESHardwareSecurityModule from flask import current_app -import six from six import text_type from PyKCS11 import PyKCS11Error @@ -148,11 +147,11 @@ def test_00_encrypt_decrypt_pin(self): # decrypt some pins generated with 2.23 pin1 = 'd2c920ad10513c8ea322b522751185a3:54f068cffb43ada1edd024087da614ec' - self.assertEquals(decryptPin(pin1), 'test') + self.assertEqual(decryptPin(pin1), 'test') pin2 = '223f414872122ad112eb9f17b05da0b8:123079d997cd18601414830ab7c97678' - self.assertEquals(decryptPin(pin2), 'test') + self.assertEqual(decryptPin(pin2), 'test') pin3 = '4af7590600286becde70b99b10493104:09e4133652c609f9697e1923cde72904' - self.assertEquals(decryptPin(pin3), '1234') + self.assertEqual(decryptPin(pin3), '1234') def test_01_encrypt_decrypt_pass(self): r = encryptPassword(u"passwörd".encode('utf8')) @@ -168,11 +167,11 @@ def test_01_encrypt_decrypt_pass(self): # decrypt some passwords generated with 2.23 pw1 = '3d1bf9db4c75469b4bb0bc7c70133181:2c27ac3839ed2213b8399d0471b17136' - self.assertEquals(decryptPassword(pw1), 'test123') + self.assertEqual(decryptPassword(pw1), 'test123') pw2 = '3a1be65a234f723fe5c6969b818582e1:08e51d1c65aa74c4988d094c40cb972c' - self.assertEquals(decryptPassword(pw2), 'test123') + self.assertEqual(decryptPassword(pw2), 'test123') pw3 = '7a4d5e2f26978394e33715bc3e8188a3:90b2782112ad7bbc5b48bd10e5c7c096cfe4ef7d9d11272595dc5b6c7f21d98a' - self.assertEquals(decryptPassword(pw3, ), u'passwörd') + self.assertEqual(decryptPassword(pw3, ), u'passwörd') # TODO: add checks for broken paddings/encrypted values and malformed enc_data @@ -207,10 +206,10 @@ def test_02_encrypt_decrypt_eas_base64(self): data = b'secret data' enc_data = 'WNfUSNBNZF5kaPfujW8ueUi5Afas47pQ/3FHc3VymWM=' d = aes_decrypt_b64(binascii.unhexlify(hex_key), enc_data) - self.assertEquals(data, d) + self.assertEqual(data, d) enc_data = 'RDDvdAJhCnw/tlYscTxv+6idHAQnQFY5VpUK8SFflYQ=' d = aes_decrypt_b64(binascii.unhexlify(hex_key), enc_data) - self.assertEquals(data, d) + self.assertEqual(data, d) # TODO: add checks for broken paddings/encrypted values and malformed enc_data @@ -243,14 +242,14 @@ def test_04_encrypt_decrypt_data(self): s = u'passwörd'.encode('utf8') iv_hex = 'cd5245a2875007d30cc049c2e7eca0c5' enc_data_hex = '7ea55168952b33131077f4249cf9e52b5f2b572214ace13194c436451fe3788c' - self.assertEquals(s, decrypt(binascii.unhexlify(enc_data_hex), + self.assertEqual(s, decrypt(binascii.unhexlify(enc_data_hex), binascii.unhexlify(iv_hex))) enc_data_hex = 'fb79a04d69e832aec8ffb4bbfe031b3bd28a2840150212d8c819e' \ '362b1711cc389aed70eaf27af53131ea446095da80e88c4caf791' \ 'c709e9581ff0a5f1e19228dc4c3c278d148951acaab9a164c1770' \ '7166134f4ba6111055c65d72771c6f59c2dc150a53753f2cf4c47' \ 'ec02901022f02a054d1fc7678fd4f66b47967a5d222a' - self.assertEquals(b'\x01\x02' * 30, + self.assertEqual(b'\x01\x02' * 30, decrypt(binascii.unhexlify(enc_data_hex), binascii.unhexlify(iv_hex))) @@ -577,7 +576,15 @@ def test_00_create_sign_object(self): priv_key = open(current_app.config.get("PI_AUDIT_KEY_PRIVATE"), 'rb').read() pub_key = open(current_app.config.get("PI_AUDIT_KEY_PUBLIC"), 'rb').read() so = Sign(priv_key, pub_key) - self.assertEquals(so.sig_ver, 'rsa_sha256_pss') + self.assertEqual(so.sig_ver, 'rsa_sha256_pss') + + # test missing keys + so = Sign(public_key=pub_key) + res = so.sign('testdata') + self.assertEqual(res, '') + so = Sign(private_key=priv_key) + res = so.verify('testdata', 'testsig') + self.assertFalse(res) def test_01_sign_and_verify_data(self): priv_key = open(current_app.config.get("PI_AUDIT_KEY_PRIVATE"), 'rb').read() diff --git a/tests/testdata/audit.sqlite b/tests/testdata/audit.sqlite new file mode 100644 index 0000000000000000000000000000000000000000..744d06f3533fcf1914dd17c9cb2c79d03e10287a GIT binary patch literal 12288 zcmeI1&2C#&6os$j1louw41fxiQllc&NVGctKOGTT3o6nSLO^tYQe)qiD_lFWovNJ^ zA)bI2-~kvhWzZoHzzgsMOb`+SzJ24kfl?S(x~`fy*XQiB_gZVOebRpX(N;ApJ9oyD zd%ao57B3d9R`E`!Qxrvuzis{=zm|BjeEb97<@5i0wTjEH{jkQWrM2&hwLkdMycz@= z1R4Yy1R4Yy1R4Yy1R4Yy1R4Yy1WqDwd+E6|_Co7nH5!x;st0@f)u8O{^$rHrZ0}%N zPEP#XzPi)h-0gO*Z(r+v*7=vUotxX86YDz{CBVkl%dO|nT)5Es=H{&T#jrfF;KbXd zdF+f&Iv(CXv3Q~Rnys|Hyb|@=_u{JK{@MHkCU43 ztVVani&N&$_9xZ-UjJduA(@>#&v;*F+#gouX!g&I!|~l=dB1#o$h^M(uLN;WVl2tK#tDN@MCYK21olQCz z6BS2<>`e5E#a4%qdJd2iMocAtz>#Wh1r`}Nvv9r@A z=NOz)mdrG}!6I_Hq8?+(HF)P-wvL=ZLi@}Ut#kz0ttpjKQ%m%hu&Hk7Sx#jE)g(Z-3(2@;=<){Hk3LsdriP(Yn+EI^Jlr8zhptH(} z>~jEM8+B6DX@uTHL0Lmat{EX)%2Y~`xh4uBd>{unj{uyIfln&spxI|V5tGypsDcuQ z*_ecN?<1r|5ay}i$s5v&(Rp^|ac}nJ=+TdlzMuB1>hO)_g-nZAYn?t#rbhUpv|vSn zit2o-ArlOs6rdm@gSSRx@NFPJAzEz=sVR}PXMqn!NL4D3CPGEMU;+KfcxoYK3n__8 z6%n;j-q(UgxMTp-8b*i^jYtf#kQj|atsEDD2>ubeGz{?(U4|jD4*bk^9YYE(fpFHr zAs3e-+X0Ow(J5*QNE*E)L(T+3Qxwcrm(d5>A%;RaNEB80R0}Afa}O{i#%E+gJyAmn z>`P845Dg-W6rd7XgbTWxQ&AVpNnb<@P$+{?APb@#j4ud>bObw`CvO#u)=?A3p+9R( zaE`76DakX?o*af$opCsd;!M=4Ko&8gc}1}3 zj0jNR1R}wiKESQem+(S;@EvI%%mRTUZU~IfY^e&9P`E~h(v-jwfI)?X;K4=dBE;kp zN}#W>8dOA!O5YlhX?;u5hi|p#8a-Q_`)?~1p@BJIQM_O#wxvXdGz1ohC^;Bi(U>$1 zdW)Q>s1H}6`$=9KS|??PgNPJxM~fpkVj$dFZ<&5#D>jBhN=M@dxU@GeaH0Y?K;#tg zB83b$Uc519(8Ik+L_8MzG9YfV~lC{2}!fCMR)?8I=`Zjo&OmPn=mP6sn27!C=kAsQY?O_&tMNgsG|V}_6c zBZf-i2u@@#HNz(|SEv%B10P7#C_uJS8wPEdIZ{!kxiCdmB4g1gB-GYN(}}6v?>Tt4kfW0!07@lI>NmpmL;?Y zFaQ<8CeI_FbkYh;JhG-6(MoOA34(D^K4&B-nvodOz zm{~cU$tC~r^`(VS=XaK;nJLr&IK_$5@N@=o5TYm(*Ty3?jMd`;heF1%j*M^#DX4a0 zvOm|#QECmrW zikW&m0J7NPl5l z@{%hR@{r-c4GNutq>M`t5(&U6NrN>X$&H55uDQoB&BW~RSUARySpWyjSu6!xhkv<{ z0W}ki$MyV$0LBg%61vEuEJ8EEa9T{%GRaUihm$!WNCix&lRoE$Bs#`JxEKPZqAtpy zR2lKiICA0`A2pW4TiC^0Q3zedLBbNIR1d1{0OY4YD5Ij5TZ;eR4BEh;lVV$7~S*aX6$Uw^BCYpcEl9lKZR} zmCQgo3`g$$W^IEHji(P@p7_ z#<@Z=C-5mGf$r%ptPfzs=K%wSN)yYiNv<_=LBVBFCYs?KCPgj!DA#WuQzw@eQ8C>v zvFTwpVVUGYg=~_D$O^qt2m>AUaWsqBO9x;p+&RF8r*bhM6WEoT8NVrT$Hj$ZtjSbi zW>72uatA~m7$`1`Rq#6kV~Xj)B@2g;E1vYQ