In [None]:
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.backends import default_backend
from cryptography import x509
from cryptography.x509.oid import NameOID
from datetime import datetime, timedelta
import os

def generate_test_files():
    """Generate a throwaway PKI and a signed sample application on disk.

    Creates the ``ca``, ``certs`` and ``apps`` directories (if missing)
    relative to the current working directory and writes:

    * ``ca/root_ca.crt`` - self-signed root CA certificate (365 days)
    * ``certs/intermediate.crt`` - intermediate CA signed by the root
      key (180 days, ``path_length=0``)
    * ``certs/developer.crt`` - end-entity certificate signed by the
      intermediate key (90 days)
    * ``apps/secure_app.bin`` and ``apps/secure_app.sig`` - a sample payload
      and its RSA-PSS/SHA-256 signature made with the developer key
    * ``ca/test_revocation.crl`` - a CRL with no entries, signed by the root
      key (next update in 1 day)

    The generated private keys live in memory only; none is written to disk.
    """
    # Imported locally because the surrounding imports only bring in
    # ``datetime`` and ``timedelta``.
    from datetime import timezone

    os.makedirs("ca", exist_ok=True)
    os.makedirs("certs", exist_ok=True)
    os.makedirs("apps", exist_ok=True)

    # A single timezone-aware timestamp anchors every validity window.
    # datetime.utcnow() is deprecated and returns a naive value.
    now = datetime.now(timezone.utc)

    # 1. Root key and self-signed root certificate
    root_key = rsa.generate_private_key(
        public_exponent=65537,
        key_size=2048,
        backend=default_backend()
    )

    root_subject = x509.Name([
        x509.NameAttribute(NameOID.COMMON_NAME, "Test Root CA"),
    ])

    root_cert = x509.CertificateBuilder().subject_name(
        root_subject
    ).issuer_name(
        root_subject
    ).public_key(
        root_key.public_key()
    ).serial_number(
        x509.random_serial_number()
    ).not_valid_before(
        now
    ).not_valid_after(
        now + timedelta(days=365)
    ).add_extension(
        x509.BasicConstraints(ca=True, path_length=None),
        critical=True,
    ).sign(root_key, hashes.SHA256(), default_backend())

    with open("ca/root_ca.crt", "wb") as f:
        f.write(root_cert.public_bytes(serialization.Encoding.PEM))

    # 2. Intermediate key and certificate (issued by the root)
    intermediate_key = rsa.generate_private_key(
        public_exponent=65537,
        key_size=2048,
        backend=default_backend()
    )

    intermediate_subject = x509.Name([
        x509.NameAttribute(NameOID.COMMON_NAME, "Test Intermediate CA"),
    ])

    intermediate_cert = x509.CertificateBuilder().subject_name(
        intermediate_subject
    ).issuer_name(
        root_subject
    ).public_key(
        intermediate_key.public_key()
    ).serial_number(
        x509.random_serial_number()
    ).not_valid_before(
        now
    ).not_valid_after(
        now + timedelta(days=180)
    ).add_extension(
        x509.BasicConstraints(ca=True, path_length=0),
        critical=True,
    ).sign(root_key, hashes.SHA256(), default_backend())

    with open("certs/intermediate.crt", "wb") as f:
        f.write(intermediate_cert.public_bytes(serialization.Encoding.PEM))

    # 3. End-entity (developer) key and certificate (issued by the intermediate)
    dev_key = rsa.generate_private_key(
        public_exponent=65537,
        key_size=2048,
        backend=default_backend()
    )

    dev_subject = x509.Name([
        x509.NameAttribute(NameOID.COMMON_NAME, "Test Developer"),
    ])

    dev_cert = x509.CertificateBuilder().subject_name(
        dev_subject
    ).issuer_name(
        intermediate_subject
    ).public_key(
        dev_key.public_key()
    ).serial_number(
        x509.random_serial_number()
    ).not_valid_before(
        now
    ).not_valid_after(
        now + timedelta(days=90)
    ).sign(intermediate_key, hashes.SHA256(), default_backend())

    with open("certs/developer.crt", "wb") as f:
        f.write(dev_cert.public_bytes(serialization.Encoding.PEM))

    # 4. Sample application payload and its signature
    app_data = b"This is a test application data for verification"
    with open("apps/secure_app.bin", "wb") as f:
        f.write(app_data)

    signature = dev_key.sign(
        app_data,
        padding.PSS(
            mgf=padding.MGF1(hashes.SHA256()),
            salt_length=padding.PSS.MAX_LENGTH
        ),
        hashes.SHA256()
    )

    with open("apps/secure_app.sig", "wb") as f:
        f.write(signature)

    # 5. Test CRL (no revoked entries), issued and signed by the root
    builder = x509.CertificateRevocationListBuilder()
    builder = builder.issuer_name(root_subject)
    builder = builder.last_update(now)
    builder = builder.next_update(now + timedelta(days=1))

    crl = builder.sign(
        private_key=root_key,
        algorithm=hashes.SHA256(),
        backend=default_backend()
    )

    with open("ca/test_revocation.crl", "wb") as f:
        f.write(crl.public_bytes(serialization.Encoding.PEM))

    print("Тестовые данные успешно сгенерированы!")

# Generate the fixture files only when this code runs as the main module,
# not when it is imported.
if __name__ == "__main__":
    generate_test_files()

Тестовые данные успешно сгенерированы!


  datetime.utcnow()
  datetime.utcnow() + timedelta(days=365)
  datetime.utcnow()
  datetime.utcnow() + timedelta(days=180)
  datetime.utcnow()
  datetime.utcnow() + timedelta(days=90)
  builder = builder.last_update(datetime.utcnow())
  builder = builder.next_update(datetime.utcnow() + timedelta(days=1))


In [None]:
import os
from datetime import datetime, timedelta, timezone
from unittest.mock import MagicMock, patch
import unittest
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding, rsa
from cryptography.x509 import load_pem_x509_certificate, load_pem_x509_crl
from cryptography.exceptions import InvalidSignature

class CertificateTrustSystem:
    """Verify signed applications against a chain of trusted certificates.

    The trust chain starts with a root certificate loaded from a PEM file and
    can be extended with intermediate CA certificates. An optional PEM CRL is
    consulted for revoked serial numbers. Every verification failure is
    reported as ``TrustVerificationError``.
    """

    def __init__(self, root_cert_path, revocation_list_path=None):
        """Load the root certificate and, if a path is given, the CRL.

        Args:
            root_cert_path: Path to the PEM-encoded root certificate.
            revocation_list_path: Optional path to a PEM-encoded CRL.

        Raises:
            TrustVerificationError: If either file cannot be read or parsed.
        """
        self.root_cert = self._load_x509_certificate(root_cert_path)
        self.revocation_list = self._load_crl(revocation_list_path) if revocation_list_path else None
        self.trust_chain = [self.root_cert]

    def add_certificate_authority(self, cert_path):
        """Validate an intermediate certificate and append it to the trust chain.

        Raises:
            TrustVerificationError: If the certificate cannot be loaded or
                fails chain, validity-period or revocation checks.
        """
        intermediate_cert = self._load_x509_certificate(cert_path)
        self._validate_certificate_chain(intermediate_cert)
        self.trust_chain.append(intermediate_cert)

    def verify_signed_application(self, app_path, signature_path, cert_path):
        """Run the full verification pipeline for a signed application.

        Args:
            app_path: Path to the application file.
            signature_path: Path to the detached signature over the file.
            cert_path: Path to the signer's PEM certificate.

        Returns:
            True when every check passes.

        Raises:
            TrustVerificationError: If any file is missing or any check fails.
        """
        self._verify_file_accessibility(app_path)
        self._verify_file_accessibility(signature_path)
        self._verify_file_accessibility(cert_path)

        end_entity_cert = self._load_x509_certificate(cert_path)
        self._validate_certificate_chain(end_entity_cert)
        self._validate_digital_signature(app_path, signature_path, end_entity_cert)
        # The returned digest is not compared against anything here; the call
        # is a hook that subclasses may override to raise.
        self._check_application_integrity(app_path)
        return True

    def _validate_certificate_chain(self, certificate):
        """Check that ``certificate`` was signed by a trusted certificate.

        Trusted certificates are tried from the most recently added back to
        the first. Each check uses RSA PKCS#1 v1.5 padding together with the
        hash algorithm recorded in the certificate being checked. On success
        the validity period and revocation status of ``certificate`` itself
        are checked as well.

        Raises:
            TrustVerificationError: If no trusted certificate verifies the
                signature, or the period/revocation check fails.
        """
        current_cert = certificate
        chain_verified = False

        for trusted_cert in reversed(self.trust_chain):
            try:
                trusted_cert.public_key().verify(
                    current_cert.signature,
                    current_cert.tbs_certificate_bytes,
                    padding.PKCS1v15(),
                    current_cert.signature_hash_algorithm
                )
                # Continue the walk upward from the issuer that matched.
                current_cert = trusted_cert
                chain_verified = True
            except InvalidSignature:
                # A mismatch never resets an earlier success; it just moves on
                # to the next trusted certificate.
                continue

        if not chain_verified:
            raise TrustVerificationError("Certificate chain validation unsuccessful")
        self._validate_certificate_period(certificate)

    def _validate_certificate_period(self, certificate):
        """Check the validity window and revocation status of ``certificate``.

        Raises:
            TrustVerificationError: If the current UTC time is outside the
                certificate's validity window, or its serial number is listed
                in the loaded CRL.
        """
        current_time = datetime.now(timezone.utc)
        valid_from = certificate.not_valid_before_utc
        valid_to = certificate.not_valid_after_utc

        if not (valid_from <= current_time <= valid_to):
            raise TrustVerificationError(
                f"Certificate not valid. Validity period: {valid_from} to {valid_to}"
            )

        if (self.revocation_list and
                self.revocation_list.get_revoked_certificate_by_serial_number(certificate.serial_number)):
            raise TrustVerificationError(
                f"Certificate revoked (serial: {certificate.serial_number})"
            )

    def _validate_digital_signature(self, data_path, signature_path, certificate):
        """Verify the detached signature over the file at ``data_path``.

        PKCS#1 v1.5 with SHA-256 is tried first; if that fails, RSA-PSS
        (MGF1/SHA-256, maximum salt length) is tried.

        Raises:
            TrustVerificationError: If neither scheme verifies the signature.
        """
        with open(data_path, "rb") as data_file:
            application_data = data_file.read()

        with open(signature_path, "rb") as sig_file:
            signature_data = sig_file.read()

        try:
            certificate.public_key().verify(
                signature_data,
                application_data,
                padding.PKCS1v15(),
                hashes.SHA256()
            )
        except InvalidSignature:
            try:
                certificate.public_key().verify(
                    signature_data,
                    application_data,
                    padding.PSS(
                        mgf=padding.MGF1(hashes.SHA256()),
                        salt_length=padding.PSS.MAX_LENGTH
                    ),
                    hashes.SHA256()
                )
            except InvalidSignature as err:
                raise TrustVerificationError(
                    f"Digital signature validation failed: {str(err)}"
                ) from err

    def _check_application_integrity(self, file_path):
        """Return the SHA-256 digest (bytes) of the file at ``file_path``."""
        with open(file_path, "rb") as file:
            application_data = file.read()

        hash_processor = hashes.Hash(hashes.SHA256())
        hash_processor.update(application_data)
        return hash_processor.finalize()

    def _load_x509_certificate(self, file_path):
        """Load a PEM certificate, wrapping any failure in TrustVerificationError."""
        try:
            with open(file_path, "rb") as cert_file:
                return load_pem_x509_certificate(cert_file.read())
        except Exception as err:
            # Keep the original error attached as the explicit cause.
            raise TrustVerificationError(f"Certificate loading error: {str(err)}") from err

    def _load_crl(self, file_path):
        """Load a PEM CRL, wrapping any failure in TrustVerificationError."""
        try:
            with open(file_path, "rb") as crl_file:
                return load_pem_x509_crl(crl_file.read())
        except Exception as err:
            # Keep the original error attached as the explicit cause.
            raise TrustVerificationError(f"CRL loading error: {str(err)}") from err

    def _verify_file_accessibility(self, path):
        """Ensure ``path`` exists and is a regular file.

        Raises:
            TrustVerificationError: If the path is missing or is not a file.
        """
        if not os.path.exists(path):
            raise TrustVerificationError(f"Required file not found: {path}")
        # A directory would otherwise slip through and fail later inside
        # open() with an OSError instead of a TrustVerificationError.
        if not os.path.isfile(path):
            raise TrustVerificationError(f"Required path is not a regular file: {path}")

class TrustVerificationError(Exception):
    """Raised when a certificate, CRL, signature or required file fails verification."""

class MockTrustSystem(CertificateTrustSystem):
    """Filesystem-free stand-in for CertificateTrustSystem used by unit tests.

    Certificates and the CRL are injected as objects and looked up by fixed
    test file names instead of being read from disk.
    """

    # Maps the recognised test file names to the attribute holding the object.
    _CERT_ATTRIBUTE_BY_PATH = (
        ("test_root.crt", "root_cert"),
        ("test_intermediate.crt", "test_intermediate_cert"),
        ("test_entity.crt", "test_end_entity_cert"),
    )

    def __init__(self, root_cert=None, crl=None):
        """Store the injected objects; the parent initializer is not called."""
        self.simulate_integrity_failure = False
        self.test_end_entity_cert = None
        self.test_intermediate_cert = None
        self.revocation_list = crl
        self.root_cert = root_cert
        if root_cert:
            self.trust_chain = [self.root_cert]
        else:
            self.trust_chain = []

    def _load_x509_certificate(self, path):
        """Return the injected certificate registered under ``path``."""
        for known_path, attribute in self._CERT_ATTRIBUTE_BY_PATH:
            if path == known_path:
                return getattr(self, attribute)
        raise TrustVerificationError(f"Test certificate {path} not available")

    def _load_crl(self, path):
        """Return the injected CRL for the one recognised test CRL name."""
        if path != "test_revocation.crl":
            raise TrustVerificationError(f"Test CRL {path} not available")
        return self.revocation_list

    def _verify_file_accessibility(self, path):
        """Skip the existence check; test paths do not exist on disk."""
        return None

    def _check_application_integrity(self, path):
        """Return a fixed digest, or raise when a failure is being simulated."""
        if not self.simulate_integrity_failure:
            return b'valid_hash_value'
        raise TrustVerificationError("Application integrity verification failed")

class TrustSystemTests(unittest.TestCase):
    """Unit tests for the trust system, driven entirely by mock certificates."""

    @staticmethod
    def _build_certificate_mock(serial, tbs_bytes, signature, valid_from, valid_to):
        """Create a mock certificate whose public key accepts any signature."""
        cert = MagicMock()
        cert.public_key.return_value.verify.return_value = None
        cert.serial_number = serial
        cert.tbs_certificate_bytes = tbs_bytes
        cert.signature = signature
        cert.signature_hash_algorithm = hashes.SHA256()
        cert.not_valid_before_utc = valid_from
        cert.not_valid_after_utc = valid_to
        return cert

    def _assert_rejected(self, trust_system, app_path="app.exe"):
        """Assert that verifying the test entity certificate is rejected."""
        with self.assertRaises(TrustVerificationError):
            trust_system.verify_signed_application(app_path, "app.sig", "test_entity.crt")

    def setUp(self):
        """Build three currently-valid mock certificates and an empty mock CRL."""
        current_time = datetime.now(timezone.utc)
        window = (current_time - timedelta(days=1), current_time + timedelta(days=1))

        self.mock_root = self._build_certificate_mock(
            1001, b'root_cert_data', b'root_signature', *window)
        self.mock_intermediate = self._build_certificate_mock(
            1002, b'intermediate_cert_data', b'intermediate_signature', *window)
        self.mock_end_entity = self._build_certificate_mock(
            1003, b'end_entity_data', b'end_entity_signature', *window)

        self.mock_crl = MagicMock()
        # By default no serial number is reported as revoked.
        self.mock_crl.get_revoked_certificate_by_serial_number.return_value = None

    def test_trust_chain_initialization(self):
        """Verify system initializes with root certificate"""
        trust_system = MockTrustSystem(root_cert=self.mock_root)
        self.assertEqual(len(trust_system.trust_chain), 1)

    def test_intermediate_ca_addition(self):
        """Test successful addition of intermediate CA"""
        trust_system = MockTrustSystem(root_cert=self.mock_root)
        trust_system.test_intermediate_cert = self.mock_intermediate
        trust_system.add_certificate_authority("test_intermediate.crt")
        self.assertEqual(len(trust_system.trust_chain), 2)

    def test_expired_certificate_verification(self):
        """Verify rejection of expired certificates"""
        trust_system = MockTrustSystem(root_cert=self.mock_root)
        self.mock_end_entity.not_valid_after_utc = datetime.now(timezone.utc) - timedelta(days=1)
        trust_system.test_end_entity_cert = self.mock_end_entity

        with patch.object(trust_system, '_validate_digital_signature', return_value=None), \
                patch.object(trust_system, '_check_application_integrity', return_value=b'hash'):
            self._assert_rejected(trust_system)

    def test_revoked_certificate_verification(self):
        """Verify rejection of revoked certificates"""
        trust_system = MockTrustSystem(root_cert=self.mock_root, crl=self.mock_crl)
        trust_system.test_end_entity_cert = self.mock_end_entity
        # Any truthy object from the CRL lookup marks the serial as revoked.
        self.mock_crl.get_revoked_certificate_by_serial_number.return_value = MagicMock()

        with patch.object(trust_system, '_validate_digital_signature', return_value=None), \
                patch.object(trust_system, '_check_application_integrity', return_value=b'hash'):
            self._assert_rejected(trust_system)

    def test_invalid_signature_verification(self):
        """Test rejection of invalid signatures"""
        trust_system = MockTrustSystem(root_cert=self.mock_root)
        trust_system.test_end_entity_cert = self.mock_end_entity

        with patch.object(trust_system, '_validate_digital_signature',
                          side_effect=TrustVerificationError("Invalid signature")), \
                patch.object(trust_system, '_check_application_integrity', return_value=b'hash'):
            self._assert_rejected(trust_system)

    def test_untrusted_certificate_verification(self):
        """Verify rejection of untrusted certificates"""
        trust_system = MockTrustSystem(root_cert=self.mock_root)
        trust_system.test_end_entity_cert = self.mock_end_entity

        with patch.object(trust_system, '_validate_certificate_chain',
                          side_effect=TrustVerificationError("Untrusted certificate")), \
                patch.object(trust_system, '_validate_digital_signature', return_value=None), \
                patch.object(trust_system, '_check_application_integrity', return_value=b'hash'):
            self._assert_rejected(trust_system)

    def test_corrupted_application_verification(self):
        """Verify rejection of corrupted applications"""
        trust_system = MockTrustSystem(root_cert=self.mock_root)
        trust_system.test_end_entity_cert = self.mock_end_entity
        trust_system.simulate_integrity_failure = True

        with patch.object(trust_system, '_validate_digital_signature', return_value=None):
            self._assert_rejected(trust_system, app_path="bad_app.exe")

    
class IntegrationTests(unittest.TestCase):
    """End-to-end checks that run against real certificate and application files.

    Every test is skipped when any of the required files is missing from the
    current working directory.
    """

    def setUp(self):
        """Record the fixture paths and skip when any of them is missing."""
        self.root_cert_file = "ca/root_ca.crt"
        self.intermediate_cert_file = "certs/intermediate.crt"
        self.dev_cert_file = "certs/developer.crt"
        self.app_file = "apps/secure_app.bin"
        self.signature_file = "apps/secure_app.sig"

        # The application and signature files are needed too: without them the
        # tests would fail or error instead of being skipped.
        required_files = [
            self.root_cert_file,
            self.intermediate_cert_file,
            self.dev_cert_file,
            self.app_file,
            self.signature_file,
        ]
        missing_files = [path for path in required_files if not os.path.exists(path)]
        if missing_files:
            raise unittest.SkipTest(
                f"Required test files not available: {', '.join(missing_files)}"
            )

    def test_application_verification_with_real_files(self):
        """Verify the sample application through the full pipeline."""
        trust_system = CertificateTrustSystem(self.root_cert_file)

        # The developer certificate is issued by the intermediate CA, so it
        # must be part of the trust chain.
        trust_system.add_certificate_authority(self.intermediate_cert_file)

        try:
            result = trust_system.verify_signed_application(
                self.app_file,
                self.signature_file,
                self.dev_cert_file
            )
            self.assertTrue(result)
        except TrustVerificationError as err:
            self.fail(f"Verification process failed: {str(err)}")

    def test_certificate_chain_validation(self):
        """Validate the developer certificate against root + intermediate."""
        with open(self.dev_cert_file, "rb") as dev_file:
            dev_cert = load_pem_x509_certificate(dev_file.read())

        # The trust system loads the root and intermediate files itself.
        trust_system = CertificateTrustSystem(self.root_cert_file)
        trust_system.add_certificate_authority(self.intermediate_cert_file)

        try:
            trust_system._validate_certificate_chain(dev_cert)
        except TrustVerificationError as err:
            self.fail(f"Certificate chain validation unsuccessful: {str(err)}")

    def test_digital_signature_validation(self):
        """Validate the sample signature with the developer certificate."""
        with open(self.dev_cert_file, "rb") as dev_file:
            dev_cert = load_pem_x509_certificate(dev_file.read())

        trust_system = CertificateTrustSystem(self.root_cert_file)
        try:
            trust_system._validate_digital_signature(
                self.app_file,
                self.signature_file,
                dev_cert
            )
        except TrustVerificationError as err:
            self.fail(f"Digital signature validation failed: {str(err)}")

def execute_test_suite():
    """Run the unit and integration test cases and print failure details.

    Returns:
        The result object produced by ``unittest.TextTestRunner.run``.
    """
    test_loader = unittest.TestLoader()
    test_suite = unittest.TestSuite()

    # Loading a test case only instantiates it; setUp runs later, inside the
    # runner. A SkipTest raised by IntegrationTests.setUp is therefore reported
    # by the runner as a skipped test and never propagates from the loader.
    for test_case in (TrustSystemTests, IntegrationTests):
        test_suite.addTests(test_loader.loadTestsFromTestCase(test_case))

    test_runner = unittest.TextTestRunner(verbosity=2)
    results = test_runner.run(test_suite)

    if not results.wasSuccessful():
        print("\n=== Test Failure Details ===")
        for failed_test, error_trace in results.failures + results.errors:
            print(f"\nFAILED TEST: {failed_test.id()}")
            print(error_trace)

    return results

# Run the test suite only when this code runs as the main module; the result
# object is kept in a module-level name for later inspection.
if __name__ == '__main__':
    test_results = execute_test_suite()

test_corrupted_application_verification (__main__.TrustSystemTests.test_corrupted_application_verification)
Verify rejection of corrupted applications ... ok
test_expired_certificate_verification (__main__.TrustSystemTests.test_expired_certificate_verification)
Verify rejection of expired certificates ... ok
test_intermediate_ca_addition (__main__.TrustSystemTests.test_intermediate_ca_addition)
Test successful addition of intermediate CA ... ok
test_invalid_signature_verification (__main__.TrustSystemTests.test_invalid_signature_verification)
Test rejection of invalid signatures ... ok
test_revoked_certificate_verification (__main__.TrustSystemTests.test_revoked_certificate_verification)
Verify rejection of revoked certificates ... ok
test_trust_chain_initialization (__main__.TrustSystemTests.test_trust_chain_initialization)
Verify system initializes with root certificate ... ok
test_untrusted_certificate_verification (__main__.TrustSystemTests.test_untrusted_certificate_verification)
