In [9]:
from cryptography.hazmat.primitives.asymmetric import rsa, ec, padding
from cryptography.hazmat.primitives import serialization, hashes
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
import os

### üîπ Step 1: Receiver Generates Keys (UNCHANGED)
def generate_receiver_keys():
    """Generates RSA and ECC key pairs for the receiver."""
    private_rsa = rsa.generate_private_key(
        public_exponent=65537,
        key_size=2048
    )
    public_rsa = private_rsa.public_key()

    private_ecc = ec.generate_private_key(ec.SECP256R1())
    public_ecc = private_ecc.public_key()

    return private_rsa, public_rsa, private_ecc, public_ecc

### üîπ Step 2: Sender Derives AES Key with ECDH + HKDF (FIXED)
def generate_shared_secret(private_ecc_sender, public_ecc_receiver):
    """Computes ECDH shared secret and derives AES key using HKDF."""
    shared_secret = private_ecc_sender.exchange(ec.ECDH(), public_ecc_receiver)
    
    # Secure key derivation with HKDF
    hkdf = HKDF(
        algorithm=hashes.SHA256(),
        length=32,
        salt=None,
        info=b'hybrid_file_encryption',
    )
    return hkdf.derive(shared_secret)

### üîπ Step 3: Encrypt AES Key with RSA (UNCHANGED)
def encrypt_aes_key(aes_key, public_rsa_receiver):
    """Encrypts the AES key using RSA-OAEP."""
    return public_rsa_receiver.encrypt(
        aes_key,
        padding.OAEP(
            mgf=padding.MGF1(algorithm=hashes.SHA256()),
            algorithm=hashes.SHA256(),
            label=None
        )
    )

### üîπ Step 4: Encrypt File with AES-GCM (UNCHANGED)
def encrypt_file(file_path, aes_key):
    """Encrypts a file using AES-GCM."""
    iv = os.urandom(12)
    cipher = Cipher(algorithms.AES(aes_key), modes.GCM(iv))
    encryptor = cipher.encryptor()

    with open(file_path, "rb") as f:
        plaintext = f.read()

    ciphertext = encryptor.update(plaintext) + encryptor.finalize()
    return iv, encryptor.tag, ciphertext

### üîπ Step 5: Decrypt AES Key with RSA (UNCHANGED)
def decrypt_aes_key(encrypted_aes_key, private_rsa_receiver):
    """Decrypts AES key using RSA private key."""
    return private_rsa_receiver.decrypt(
        encrypted_aes_key,
        padding.OAEP(
            mgf=padding.MGF1(algorithm=hashes.SHA256()),
            algorithm=hashes.SHA256(),
            label=None
        )
    )

### üîπ Step 6: Decrypt File with AES-GCM (UNCHANGED)
def decrypt_file(encrypted_data, aes_key, iv, tag):
    """Decrypts file using AES-GCM."""
    cipher = Cipher(algorithms.AES(aes_key), modes.GCM(iv, tag))
    decryptor = cipher.decryptor()
    return decryptor.update(encrypted_data) + decryptor.finalize()

### üîπ Full Secure Workflow with FIXES
def main():
    # ‚û§ Receiver Setup
    private_rsa, public_rsa, private_ecc, public_ecc = generate_receiver_keys()

    # ‚û§ Sender Setup
    sender_ecc_private = ec.generate_private_key(ec.SECP256R1())  # Ephemeral key
    sender_ecc_public = sender_ecc_private.public_key()

    # ‚û§ Sender: Derive AES Key via ECDH
    aes_key_sender = generate_shared_secret(sender_ecc_private, public_ecc)

    # ‚û§ Sender: Encrypt AES Key with RSA
    encrypted_aes_key = encrypt_aes_key(aes_key_sender, public_rsa)

    # ‚û§ Sender: Prepare test file
    file_path = "test_file.txt"
    with open(file_path, "w") as f:
        f.write("Secure hybrid encryption test!")

    # ‚û§ Sender: Encrypt File
    iv, tag, ciphertext = encrypt_file(file_path, aes_key_sender)

    # ‚û§ Simulate Transmission: Send these to receiver
    transmission_data = {
        "encrypted_aes_key": encrypted_aes_key,
        "sender_ecc_public": sender_ecc_public.public_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PublicFormat.SubjectPublicKeyInfo
        ),
        "iv": iv,
        "tag": tag,
        "ciphertext": ciphertext
    }

    # ‚û§ Receiver: Decrypt AES Key with RSA
    decrypted_aes_key = decrypt_aes_key(
        transmission_data["encrypted_aes_key"], private_rsa
    )

    # ‚û§ Receiver: Re-derive AES Key via ECDH (CRITICAL FIX)
    # Deserialize sender's public key
    sender_public_ecc = serialization.load_pem_public_key(
        transmission_data["sender_ecc_public"]
    )
    
    # Recompute shared secret and AES key
    aes_key_receiver = generate_shared_secret(private_ecc, sender_public_ecc)

    # Validate keys match (MITM prevention)
    if decrypted_aes_key != aes_key_receiver:
        raise ValueError("AES Key Mismatch! Potential MITM attack detected.")

    # ‚û§ Receiver: Decrypt File
    decrypted_data = decrypt_file(
        transmission_data["ciphertext"],
        aes_key_receiver,
        transmission_data["iv"],
        transmission_data["tag"]
    )

    # Save decrypted file
    with open("decrypted_file.txt", "wb") as f:
        f.write(decrypted_data)

    print("‚úÖ Secure hybrid encryption/decryption successful!")

if __name__ == "__main__":
    main()

‚úÖ Secure hybrid encryption/decryption successful!


In [11]:
import time
import os
import random
import pandas as pd
import matplotlib.pyplot as plt
from cryptography.hazmat.primitives.asymmetric import rsa, ec, padding
from cryptography.hazmat.primitives import serialization, hashes
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives.kdf.hkdf import HKDF

# ========================
# üîÑ Cryptographic Functions (Optimized)
# ========================
def generate_receiver_keys():
    """Generate long-term receiver keys once."""
    private_rsa = rsa.generate_private_key(public_exponent=65537, key_size=2048)
    public_rsa = private_rsa.public_key()
    private_ecc = ec.generate_private_key(ec.SECP256R1())
    public_ecc = private_ecc.public_key()
    return private_rsa, public_rsa, private_ecc, public_ecc

def encrypt_data(plaintext, public_rsa, public_ecc_receiver):
    """Full encryption process with timing."""
    # Preprocessing: Sender generates ephemeral ECC key
    start_preprocess = time.perf_counter()
    sender_ecc_private = ec.generate_private_key(ec.SECP256R1())
    
    # ECDH + HKDF
    shared_secret = sender_ecc_private.exchange(ec.ECDH(), public_ecc_receiver)
    hkdf = HKDF(algorithm=hashes.SHA256(), length=32, info=b'hybrid_encryption')
    aes_key = hkdf.derive(shared_secret)
    
    # Encrypt AES key with RSA
    encrypted_aes_key = public_rsa.encrypt(
        aes_key,
        padding.OAEP(mgf=padding.MGF1(hashes.SHA256()), algorithm=hashes.SHA256(), label=None)
    )
    preprocess_time = time.perf_counter() - start_preprocess
    
    # AES Encryption
    start_encrypt = time.perf_counter()
    iv = os.urandom(12)
    cipher = Cipher(algorithms.AES(aes_key), modes.GCM(iv))
    encryptor = cipher.encryptor()
    ciphertext = encryptor.update(plaintext) + encryptor.finalize()
    encrypt_time = time.perf_counter() - start_encrypt
    
    return (
        encrypted_aes_key,
        sender_ecc_private.public_key().public_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PublicFormat.SubjectPublicKeyInfo
        ),
        iv,
        encryptor.tag,
        ciphertext,
        preprocess_time,
        encrypt_time
    )

def decrypt_data(encrypted_aes_key, sender_ecc_public_bytes, iv, tag, ciphertext, private_rsa, private_ecc):
    """Full decryption process with timing."""
    start_decrypt = time.perf_counter()
    
    # Decrypt AES key
    aes_key = private_rsa.decrypt(
        encrypted_aes_key,
        padding.OAEP(mgf=padding.MGF1(hashes.SHA256()), algorithm=hashes.SHA256(), label=None)
    )
    
    # Deserialize sender's ECC public key
    sender_ecc_public = serialization.load_pem_public_key(sender_ecc_public_bytes)
    
    # Re-derive AES key via ECDH
    shared_secret = private_ecc.exchange(ec.ECDH(), sender_ecc_public)
    hkdf = HKDF(algorithm=hashes.SHA256(), length=32, info=b'hybrid_encryption')
    derived_aes_key = hkdf.derive(shared_secret)
    
    if aes_key != derived_aes_key:
        raise ValueError("MITM attack detected!")
    
    # AES Decryption
    cipher = Cipher(algorithms.AES(derived_aes_key), modes.GCM(iv, tag))
    decryptor = cipher.decryptor()
    plaintext = decryptor.update(ciphertext) + decryptor.finalize()
    
    decrypt_time = time.perf_counter() - start_decrypt
    return plaintext, decrypt_time

# ========================
# ‚è±Ô∏è Benchmarking Setup
# ========================
def generate_test_data(size_bytes):
    """Generate random test data of specified size."""
    return os.urandom(size_bytes)

def benchmark(sizes=[1 * 1024, 10 * 1024, 100 * 1024, 1 * 1024 * 1024], trials=3):
    """Run comparative analysis for different file sizes."""
    results = []
    
    # Generate receiver keys once (long-term)
    private_rsa, public_rsa, private_ecc, public_ecc = generate_receiver_keys()
    
    for size in sizes:
        print(f"\nBenchmarking {size / 1024:.1f} KB files...")
        total_preprocess = 0
        total_encrypt = 0
        total_decrypt = 0
        
        for _ in range(trials):
            plaintext = generate_test_data(size)
            
            # Encryption (Sender)
            (enc_aes_key, sender_ecc_pub, iv, tag, ciphertext, 
             preprocess_time, encrypt_time) = encrypt_data(plaintext, public_rsa, public_ecc)
            total_preprocess += preprocess_time
            total_encrypt += encrypt_time
            
            # Decryption (Receiver)
            _, decrypt_time = decrypt_data(enc_aes_key, sender_ecc_pub, iv, tag, ciphertext, 
                                          private_rsa, private_ecc)
            total_decrypt += decrypt_time
        
        # Average times
        avg_preprocess = total_preprocess / trials
        avg_encrypt = total_encrypt / trials
        avg_decrypt = total_decrypt / trials
        
        results.append({
            "File Size (KB)": size / 1024,
            "Preprocess + Encryption (s)": avg_preprocess + avg_encrypt,
            "Decryption (s)": avg_decrypt
        })
    
    return pd.DataFrame(results)

# ========================
# üìä Visualization
# ========================
def plot_results(df):
    """Plot comparative analysis."""
    plt.figure(figsize=(10, 6))
    plt.plot(df["File Size (KB)"], df["Preprocess + Encryption (s)"], marker='o', label="Encryption (Preprocess + AES)")
    plt.plot(df["File Size (KB)"], df["Decryption (s)"], marker='s', label="Decryption")
    plt.xlabel("File Size (KB)")
    plt.ylabel("Time (seconds)")
    plt.title("Hybrid RSA+ECC Encryption vs Decryption Performance")
    plt.legend()
    plt.grid(True)
    plt.show()

# ========================
# üöÄ Main Execution
# ========================
if __name__ == "__main__":
    # Benchmark for sizes: 1KB, 10KB, 100KB, 1MB
    df = benchmark(sizes=[1024, 10 * 1024, 100 * 1024, 1024 * 1024])
    
    print("\n=== üìà Results ===")
    print(df.to_string(index=False))
    
    # Generate visualization
    plot_results(df)


Benchmarking 1.0 KB files...


TypeError: HKDF.__init__() missing 1 required positional argument: 'salt'