In [None]:
# ✅ Install required library
!pip install pycryptodome pandas --quiet


[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/2.3 MB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m2.3/2.3 MB[0m [31m92.1 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.3/2.3 MB[0m [31m45.0 MB/s[0m eta [36m0:00:00[0m
[?25h

In [None]:
# ✅ Hybrid AES + RSA Implementation
import os
import time
import json
import csv
from pathlib import Path
from base64 import b64encode, b64decode

import pandas as pd
from Crypto.Cipher import AES, PKCS1_OAEP
from Crypto.PublicKey import RSA
from Crypto.Random import get_random_bytes


# ---------------------------
# Key Management
# ---------------------------
def generate_rsa_keypair(bits=2048, outdir="keys"):
    os.makedirs(outdir, exist_ok=True)
    private_key = RSA.generate(bits)
    public_key = private_key.publickey()

    with open(f"{outdir}/rsa_private_{bits}.pem", "wb") as f:
        f.write(private_key.export_key())
    with open(f"{outdir}/rsa_public_{bits}.pem", "wb") as f:
        f.write(public_key.export_key())
    print(f"✅ RSA-{bits} keypair generated in {outdir}/")


def load_rsa_key(path):
    with open(path, "rb") as f:
        return RSA.import_key(f.read())


# ---------------------------
# AES Encryption / Decryption
# ---------------------------
def aes_encrypt_file(infile, outfile, rsa_pub_key_path, mode="GCM", key_size=32):
    rsa_key = load_rsa_key(rsa_pub_key_path)
    rsa_cipher = PKCS1_OAEP.new(rsa_key)

    aes_key = get_random_bytes(key_size)
    nonce = get_random_bytes(16)

    if mode == "GCM":
        cipher = AES.new(aes_key, AES.MODE_GCM, nonce=nonce[:12])
        with open(infile, "rb") as f:
            plaintext = f.read()
        ciphertext, tag = cipher.encrypt_and_digest(plaintext)
    elif mode == "CBC":
        cipher = AES.new(aes_key, AES.MODE_CBC, iv=nonce[:16])
        with open(infile, "rb") as f:
            plaintext = f.read()
        pad_len = 16 - (len(plaintext) % 16)
        padded = plaintext + bytes([pad_len]) * pad_len
        ciphertext = cipher.encrypt(padded)
        tag = b""
    elif mode == "CTR":
        cipher = AES.new(aes_key, AES.MODE_CTR, nonce=nonce[:8])
        with open(infile, "rb") as f:
            plaintext = f.read()
        ciphertext = cipher.encrypt(plaintext)
        tag = b""
    else:
        raise ValueError("Unsupported AES mode")

    enc_aes_key = rsa_cipher.encrypt(aes_key)

    package = {
        "rsa_key_size": rsa_key.size_in_bits(),
        "aes_key_size": key_size * 8,
        "aes_mode": mode,
        "enc_key": b64encode(enc_aes_key).decode(),
        "nonce": b64encode(nonce).decode(),
        "tag": b64encode(tag).decode(),
        "ciphertext": b64encode(ciphertext).decode(),
    }

    with open(outfile, "w") as f:
        json.dump(package, f)

    print(f"✅ Encrypted file saved to {outfile}")


def aes_decrypt_file(infile, outfile, rsa_priv_key_path):
    rsa_key = load_rsa_key(rsa_priv_key_path)
    rsa_cipher = PKCS1_OAEP.new(rsa_key)

    with open(infile, "r") as f:
        package = json.load(f)

    enc_aes_key = b64decode(package["enc_key"])
    aes_key = rsa_cipher.decrypt(enc_aes_key)
    nonce = b64decode(package["nonce"])
    tag = b64decode(package["tag"])
    ciphertext = b64decode(package["ciphertext"])
    mode = package["aes_mode"]

    if mode == "GCM":
        cipher = AES.new(aes_key, AES.MODE_GCM, nonce=nonce[:12])
        plaintext = cipher.decrypt_and_verify(ciphertext, tag)
    elif mode == "CBC":
        cipher = AES.new(aes_key, AES.MODE_CBC, iv=nonce[:16])
        padded = cipher.decrypt(ciphertext)
        pad_len = padded[-1]
        plaintext = padded[:-pad_len]
    elif mode == "CTR":
        cipher = AES.new(aes_key, AES.MODE_CTR, nonce=nonce[:8])
        plaintext = cipher.decrypt(ciphertext)
    else:
        raise ValueError("Unsupported AES mode")

    with open(outfile, "wb") as f:
        f.write(plaintext)

    print(f"✅ Decrypted file saved to {outfile}")


# ---------------------------
# Benchmarking
# ---------------------------
def benchmark(input_file="test.txt", out_csv="results.csv"):
    rsa_sizes = [2048, 3072]
    aes_key_sizes = [16, 24, 32]  # 128, 192, 256 bits
    modes = ["GCM", "CBC", "CTR"]

    results = []

    for rsa_bits in rsa_sizes:
        priv_path = f"keys/rsa_private_{rsa_bits}.pem"
        pub_path = f"keys/rsa_public_{rsa_bits}.pem"

        if not Path(priv_path).exists():
            generate_rsa_keypair(rsa_bits)

        for aes_size in aes_key_sizes:
            for mode in modes:
                enc_out = f"encrypted_{rsa_bits}_{aes_size*8}_{mode}.json"
                dec_out = f"decrypted_{rsa_bits}_{aes_size*8}_{mode}.txt"

                start = time.time()
                aes_encrypt_file(input_file, enc_out, pub_path, mode=mode, key_size=aes_size)
                enc_time = time.time() - start

                start = time.time()
                aes_decrypt_file(enc_out, dec_out, priv_path)
                dec_time = time.time() - start

                ct_size = os.path.getsize(enc_out)

                results.append({
                    "RSA_bits": rsa_bits,
                    "AES_bits": aes_size * 8,
                    "AES_mode": mode,
                    "Enc_time_sec": enc_time,
                    "Dec_time_sec": dec_time,
                    "Ciphertext_size_bytes": ct_size,
                })

    pd.DataFrame(results).to_csv(out_csv, index=False)
    print(f"✅ Benchmark results saved to {out_csv}")


# ---------------------------
# Demo Run
# ---------------------------
if __name__ == "__main__":
    # Create a test file
    with open("test.txt", "w") as f:
        f.write("This is a demo of AES + RSA Hybrid Encryption.\n" * 100)

    # Generate RSA keypairs
    generate_rsa_keypair(2048)
    generate_rsa_keypair(3072)

    # Encrypt / Decrypt Example
    aes_encrypt_file("test.txt", "encrypted.json", "keys/rsa_public_2048.pem", mode="GCM", key_size=32)
    aes_decrypt_file("encrypted.json", "decrypted.txt", "keys/rsa_private_2048.pem")

    # Run Benchmark
    benchmark("test.txt", "results.csv")


✅ RSA-2048 keypair generated in keys/
✅ RSA-3072 keypair generated in keys/
✅ Encrypted file saved to encrypted.json
✅ Decrypted file saved to decrypted.txt
✅ Encrypted file saved to encrypted_2048_128_GCM.json
✅ Decrypted file saved to decrypted_2048_128_GCM.txt
✅ Encrypted file saved to encrypted_2048_128_CBC.json
✅ Decrypted file saved to decrypted_2048_128_CBC.txt
✅ Encrypted file saved to encrypted_2048_128_CTR.json
✅ Decrypted file saved to decrypted_2048_128_CTR.txt
✅ Encrypted file saved to encrypted_2048_192_GCM.json
✅ Decrypted file saved to decrypted_2048_192_GCM.txt
✅ Encrypted file saved to encrypted_2048_192_CBC.json
✅ Decrypted file saved to decrypted_2048_192_CBC.txt
✅ Encrypted file saved to encrypted_2048_192_CTR.json
✅ Decrypted file saved to decrypted_2048_192_CTR.txt
✅ Encrypted file saved to encrypted_2048_256_GCM.json
✅ Decrypted file saved to decrypted_2048_256_GCM.txt
✅ Encrypted file saved to encrypted_2048_256_CBC.json
✅ Decrypted file saved to decrypted_2048

In [None]:
"""
Advanced Hybrid AES+RSA Secure File Transfer (fixed for CLI + Colab)

- Works as CLI: `python hybrid.py encrypt ...`
- Works in notebooks: call main([...]) with a list of args (see examples below)
"""

# Install pycryptodome if needed in notebook (uncomment if required)
# !pip install pycryptodome

import argparse
import csv
import hashlib
import json
import os
import sys
import time
import zipfile
from pathlib import Path
from typing import Optional, Tuple

from Crypto.Cipher import AES, PKCS1_OAEP
from Crypto.Hash import HMAC, SHA256
from Crypto.Protocol.KDF import HKDF, scrypt
from Crypto.PublicKey import RSA
from Crypto.Random import get_random_bytes
from Crypto.Signature import pkcs1_15
from Crypto.Util.Padding import pad, unpad

# --- Constants
CHUNK_SIZE = 1024 * 1024  # 1 MiB streaming chunks
HKDF_INFO_AES = b"hybrid-file-aes"
HKDF_INFO_HMAC = b"hybrid-file-hmac"


# --- Utilities
def save_bytes(path: Path, data: bytes):
    path.parent.mkdir(parents=True, exist_ok=True)
    with open(path, "wb") as f:
        f.write(data)


def load_bytes(path: Path) -> bytes:
    with open(path, "rb") as f:
        return f.read()


# --- RSA key handling with optional passphrase protection
def generate_rsa_keypair(bits: int = 2048, priv_passphrase: Optional[str] = None) -> Tuple[bytes, bytes]:
    key = RSA.generate(bits)
    if priv_passphrase:
        # PKCS#8 with scrypt+AES protection
        priv_pem = key.export_key(format="PEM", passphrase=priv_passphrase, pkcs=8, protection="scryptAndAES128-CBC")
    else:
        priv_pem = key.export_key(format="PEM")
    pub_pem = key.publickey().export_key(format="PEM")
    return priv_pem, pub_pem


# RSA encrypt/decrypt of session key
def rsa_encrypt_session_key(session_key: bytes, pub_pem: bytes) -> bytes:
    pub = RSA.import_key(pub_pem)
    cipher = PKCS1_OAEP.new(pub, hashAlgo=SHA256)
    return cipher.encrypt(session_key)


def rsa_decrypt_session_key(enc_session_key: bytes, priv_pem: bytes, passphrase: Optional[str] = None) -> bytes:
    priv = RSA.import_key(priv_pem, passphrase=passphrase)
    cipher = PKCS1_OAEP.new(priv, hashAlgo=SHA256)
    return cipher.decrypt(enc_session_key)


# Signing
def sign_bytes(data: bytes, priv_pem: bytes, passphrase: Optional[str] = None) -> bytes:
    priv = RSA.import_key(priv_pem, passphrase=passphrase)
    h = SHA256.new(data)
    sig = pkcs1_15.new(priv).sign(h)
    return sig


def verify_signature(data: bytes, signature: bytes, pub_pem: bytes) -> bool:
    pub = RSA.import_key(pub_pem)
    h = SHA256.new(data)
    try:
        pkcs1_15.new(pub).verify(h, signature)
        return True
    except (ValueError, TypeError):
        return False


# HKDF-based key derivation: from a random master session key produce separate AES key and HMAC key
def derive_keys(master_key: bytes, aes_bits: int) -> Tuple[bytes, bytes]:
    aes_key = HKDF(master_key, aes_bits // 8, salt=None, hashmod=SHA256, context=HKDF_INFO_AES)
    hmac_key = HKDF(master_key, 32, salt=None, hashmod=SHA256, context=HKDF_INFO_HMAC)
    return aes_key, hmac_key


# --- Streaming AES encrypt/decrypt
def encrypt_stream(in_path: Path, out_writer, mode: str, master_key: bytes, aes_bits: int) -> dict:
    """Encrypts file at in_path and writes ciphertext to out_writer (file-like write), returns metadata dict."""
    aes_key, hmac_key = derive_keys(master_key, aes_bits)
    mode = mode.upper()
    size = in_path.stat().st_size

    if mode == "GCM":
        nonce = get_random_bytes(12)
        cipher = AES.new(aes_key, AES.MODE_GCM, nonce=nonce)
        total_written = 0
        with open(in_path, "rb") as f:
            while True:
                chunk = f.read(CHUNK_SIZE)
                if not chunk:
                    break
                ct = cipher.encrypt(chunk)
                out_writer.write(ct)
                total_written += len(ct)
        tag = cipher.digest()
        metadata = {
            "mode": "GCM",
            "nonce": nonce.hex(),
            "tag": tag.hex(),
            "plaintext_bytes": size,
            "ciphertext_bytes": total_written,
        }
        return metadata

    elif mode == "CTR":
        nonce = get_random_bytes(16)
        cipher = AES.new(aes_key, AES.MODE_CTR, nonce=nonce)
        h = HMAC.new(hmac_key, digestmod=SHA256)
        h.update(nonce)
        total_written = 0
        with open(in_path, "rb") as f:
            while True:
                chunk = f.read(CHUNK_SIZE)
                if not chunk:
                    break
                ct = cipher.encrypt(chunk)
                out_writer.write(ct)
                h.update(ct)
                total_written += len(ct)
        mac = h.digest()
        metadata = {
            "mode": "CTR",
            "nonce": nonce.hex(),
            "hmac": mac.hex(),
            "plaintext_bytes": size,
            "ciphertext_bytes": total_written,
        }
        return metadata

    elif mode == "CBC":
        iv = get_random_bytes(16)
        cipher = AES.new(aes_key, AES.MODE_CBC, iv=iv)
        h = HMAC.new(hmac_key, digestmod=SHA256)
        h.update(iv)
        # For correctness we fallback to read whole file and pad once (safe, simpler).
        data = load_bytes(in_path)
        padded = pad(data, AES.block_size)
        ct = cipher.encrypt(padded)
        h.update(ct)
        out_writer.write(ct)
        total_written = len(ct)
        mac = h.digest()
        metadata = {
            "mode": "CBC",
            "iv": iv.hex(),
            "hmac": mac.hex(),
            "plaintext_bytes": size,
            "ciphertext_bytes": total_written,
        }
        return metadata

    else:
        raise ValueError("Unsupported AES mode")


def decrypt_stream(in_reader, out_path: Path, mode: str, master_key: bytes, metadata: dict, aes_bits: Optional[int] = None):
    """Decrypt ciphertext read from in_reader (file-like) and write plaintext to out_path.

    metadata may be the 'inner' metadata dict (nonce/tag/iv/hmac) or the outer package['metadata'] dict.
    aes_bits can be provided explicitly; otherwise we try to read from metadata.
    """
    # determine AES bits
    bits = None
    # metadata might be inner or outer with nested 'metadata'
    if metadata is None:
        bits = aes_bits or 256
        inner_meta = {}
    elif "aes_bits" in metadata:
        bits = metadata.get("aes_bits")
        inner_meta = metadata.get("metadata", {})
    elif "metadata" in metadata and isinstance(metadata["metadata"], dict) and "aes_bits" in metadata["metadata"]:
        bits = metadata["metadata"]["aes_bits"]
        inner_meta = metadata["metadata"]
    else:
        bits = aes_bits or 256
        inner_meta = metadata if isinstance(metadata, dict) else {}

    aes_key, hmac_key = derive_keys(master_key, bits)
    mode = mode.upper()

    if mode == "GCM":
        nonce = bytes.fromhex(inner_meta["nonce"])
        tag = bytes.fromhex(inner_meta["tag"])
        cipher = AES.new(aes_key, AES.MODE_GCM, nonce=nonce)
        with open(out_path, "wb") as out_f:
            while True:
                chunk = in_reader.read(CHUNK_SIZE)
                if not chunk:
                    break
                pt = cipher.decrypt(chunk)
                out_f.write(pt)
        # verify tag
        try:
            cipher.verify(tag)
        except ValueError as e:
            raise ValueError("GCM tag mismatch, data corrupted or wrong key") from e

    elif mode == "CTR":
        nonce = bytes.fromhex(inner_meta["nonce"])
        expected_hmac = bytes.fromhex(inner_meta["hmac"])
        cipher = AES.new(aes_key, AES.MODE_CTR, nonce=nonce)
        h = HMAC.new(hmac_key, digestmod=SHA256)
        h.update(nonce)
        with open(out_path, "wb") as out_f:
            while True:
                chunk = in_reader.read(CHUNK_SIZE)
                if not chunk:
                    break
                h.update(chunk)
                pt = cipher.decrypt(chunk)
                out_f.write(pt)
        h.verify(expected_hmac)

    elif mode == "CBC":
        iv = bytes.fromhex(inner_meta["iv"])
        expected_hmac = bytes.fromhex(inner_meta["hmac"])
        cipher = AES.new(aes_key, AES.MODE_CBC, iv=iv)
        h = HMAC.new(hmac_key, digestmod=SHA256)
        h.update(iv)
        data = in_reader.read()  # read full ciphertext to handle padding safely
        h.update(data)
        h.verify(expected_hmac)
        pt = unpad(cipher.decrypt(data), AES.block_size)
        save_bytes(out_path, pt)

    else:
        raise ValueError("Unsupported AES mode")


# --- Packaging
def package_to_zip(out_zip: Path, enc_session_key: bytes, ciphertext_path: Path, metadata: dict, signature: Optional[bytes] = None):
    out_zip.parent.mkdir(parents=True, exist_ok=True)
    with zipfile.ZipFile(out_zip, "w", compression=zipfile.ZIP_DEFLATED) as z:
        # store ciphertext as file to avoid loading into memory
        z.write(ciphertext_path, arcname="ciphertext.bin")
        z.writestr("enc_key.bin", enc_session_key)
        z.writestr("metadata.json", json.dumps(metadata))
        if signature:
            z.writestr("signature.sig", signature)


def unpack_zip_package(in_zip: Path, extract_dir: Path) -> Tuple[Path, bytes, dict, Optional[bytes]]:
    extract_dir.mkdir(parents=True, exist_ok=True)
    with zipfile.ZipFile(in_zip, "r") as z:
        z.extract("ciphertext.bin", path=extract_dir)
        ciphertext_path = extract_dir / "ciphertext.bin"
        enc_key = z.read("enc_key.bin")
        metadata = json.loads(z.read("metadata.json"))
        signature = z.read("signature.sig") if "signature.sig" in z.namelist() else None
    return ciphertext_path, enc_key, metadata, signature


# --- High-level commands
def cmd_gen_rsa(args):
    priv, pub = generate_rsa_keypair(args.bits, args.priv_passphrase)
    outdir = Path(args.outdir)
    outdir.mkdir(parents=True, exist_ok=True)
    save_bytes(outdir / "private.pem", priv)
    save_bytes(outdir / "public.pem", pub)
    print(f"Generated RSA {args.bits}-bit keys in {outdir} (private passphrase protected: {bool(args.priv_passphrase)})")


def cmd_encrypt(args):
    infile = Path(args.infile)
    outpkg = Path(args.out)
    pub_pem = load_bytes(Path(args.pubkey))

    # master session key (random) — will be RSA-encrypted
    master_key = get_random_bytes(32)  # 256-bit master, then HKDF derives AES/HMAC keys

    # Prepare a temporary ciphertext file (streaming)
    tmp_cipher = Path(outpkg.parent) / (outpkg.name + ".ctmp")
    if tmp_cipher.exists():
        tmp_cipher.unlink()

    t0 = time.perf_counter()
    with open(tmp_cipher, "wb") as ct_f:
        metadata_inner = encrypt_stream(infile, ct_f, args.aes_mode, master_key, args.aes_bits)
    t1 = time.perf_counter()
    aes_time = t1 - t0

    t0 = time.perf_counter()
    enc_session_key = rsa_encrypt_session_key(master_key, pub_pem)
    t1 = time.perf_counter()
    rsa_time = t1 - t0

    signature = None
    if args.sign and args.sign_privkey:
        priv_pem = load_bytes(Path(args.sign_privkey))
        # sign ciphertext file + enc_session_key
        h = SHA256.new()
        with open(tmp_cipher, "rb") as f:
            while True:
                chunk = f.read(CHUNK_SIZE)
                if not chunk:
                    break
                h.update(chunk)
        h.update(enc_session_key)
        priv = RSA.import_key(priv_pem, passphrase=args.sign_priv_passphrase)
        signature = pkcs1_15.new(priv).sign(h)

    package_metadata = {
        "original_filename": infile.name,
        "aes_mode": metadata_inner["mode"],
        "aes_bits": args.aes_bits,
        "rsa_bits": args.rsa_bits,
        "created_at": int(time.time()),
        "metadata": metadata_inner,
    }

    # write zip package
    package_to_zip(outpkg, enc_session_key, tmp_cipher, package_metadata, signature)

    # cleanup temp
    tmp_cipher.unlink()

    in_size = infile.stat().st_size
    out_size = outpkg.stat().st_size

    print(f"Encrypted -> {outpkg} (AES time {aes_time:.6f}s, RSA wrap {rsa_time:.6f}s)")
    print(f"Input {in_size} bytes -> Package {out_size} bytes")

    if getattr(args, "log", None):
        with open(args.log, "a", newline="") as csvf:
            writer = csv.writer(csvf)
            if csvf.tell() == 0:
                writer.writerow(["operation", "aes_mode", "aes_bits", "rsa_bits", "input_bytes", "package_bytes", "aes_time_s", "rsa_time_s", "timestamp"])
            writer.writerow(["encrypt", args.aes_mode, args.aes_bits, args.rsa_bits, in_size, out_size, round(aes_time, 6), round(rsa_time, 6), int(time.time())])


def cmd_decrypt(args):
    inpkg = Path(args.infile)
    priv_pem = load_bytes(Path(args.privkey))

    extract_dir = Path(inpkg.parent) / (inpkg.stem + "_extracted")
    ciphertext_path, enc_session_key, package_metadata, signature = unpack_zip_package(inpkg, extract_dir)

    t0 = time.perf_counter()
    master_key = rsa_decrypt_session_key(enc_session_key, priv_pem, passphrase=args.priv_passphrase)
    t1 = time.perf_counter()
    rsa_time = t1 - t0

    outdir = Path(args.outdir)
    outdir.mkdir(parents=True, exist_ok=True)
    out_path = outdir / package_metadata["original_filename"]

    t0 = time.perf_counter()
    with open(ciphertext_path, "rb") as ct_f:
        # pass aes_bits from package_metadata so derived keys match encryption
        decrypt_stream(ct_f, out_path, package_metadata["aes_mode"], master_key, package_metadata["metadata"], package_metadata.get("aes_bits"))
    t1 = time.perf_counter()
    aes_time = t1 - t0

    # signature verification
    if signature and getattr(args, "verify_pubkey", None):
        pub_pem = load_bytes(Path(args.verify_pubkey))
        h = SHA256.new()
        with open(ciphertext_path, "rb") as f:
            while True:
                chunk = f.read(CHUNK_SIZE)
                if not chunk:
                    break
                h.update(chunk)
        h.update(enc_session_key)
        try:
            pkcs1_15.new(RSA.import_key(pub_pem)).verify(h, signature)
            print("Signature: OK")
        except (ValueError, TypeError):
            print("Signature: FAILED")

    # cleanup extracted ciphertext file
    try:
        ciphertext_path.unlink()
        extract_dir.rmdir()
    except Exception:
        pass

    print(f"Decrypted -> {out_path} (RSA unwrap {rsa_time:.6f}s, AES time {aes_time:.6f}s)")

    if getattr(args, "log", None):
        with open(args.log, "a", newline="") as csvf:
            writer = csv.writer(csvf)
            if csvf.tell() == 0:
                writer.writerow(["operation", "aes_mode", "aes_bits", "rsa_bits", "package_bytes", "output_bytes", "rsa_time_s", "aes_time_s", "timestamp"])
            writer.writerow(["decrypt", package_metadata["aes_mode"], package_metadata["aes_bits"], package_metadata["rsa_bits"], inpkg.stat().st_size, out_path.stat().st_size, round(rsa_time, 6), round(aes_time, 6), int(time.time())])


# --- Benchmark harness (automatic experiments)
def run_benchmarks(args):
    aes_modes = args.modes
    aes_bits_list = args.aes_bits
    rsa_bits_list = args.rsa_bits
    sample_files = [Path(p) for p in args.samples]

    out_csv = Path(args.out_csv)
    out_csv.parent.mkdir(parents=True, exist_ok=True)

    header = ["aes_mode", "aes_bits", "rsa_bits", "sample_file", "input_bytes", "package_bytes", "aes_time_s", "rsa_wrap_s", "rsa_unwrap_s", "decrypt_aes_s"]
    with open(out_csv, "w", newline="") as csvf:
        writer = csv.writer(csvf)
        writer.writerow(header)

        for sample in sample_files:
            for mode in aes_modes:
                for a_bits in aes_bits_list:
                    for r_bits in rsa_bits_list:
                        # generate fresh RSA key for wrap/unwrap timings
                        priv, pub = generate_rsa_keypair(r_bits)
                        # encrypt
                        master_key = get_random_bytes(32)
                        tmp_ct = Path("tmp_bench_ct.bin")
                        if tmp_ct.exists():
                            tmp_ct.unlink()

                        t0 = time.perf_counter()
                        with open(tmp_ct, "wb") as f:
                            meta = encrypt_stream(sample, f, mode, master_key, a_bits)
                        t1 = time.perf_counter()
                        aes_time = t1 - t0

                        t0 = time.perf_counter()
                        enc_session = rsa_encrypt_session_key(master_key, pub)
                        t1 = time.perf_counter()
                        rsa_wrap = t1 - t0

                        # package
                        pkg = Path(f"bench_{mode}_{a_bits}_{r_bits}_{sample.name}.zip")
                        package_metadata = {"original_filename": sample.name, "aes_mode": mode, "aes_bits": a_bits, "rsa_bits": r_bits, "metadata": meta}
                        package_to_zip(pkg, enc_session, tmp_ct, package_metadata, None)

                        # decrypt - measure unwrap + aes decryption
                        t0 = time.perf_counter()
                        master_key2 = rsa_decrypt_session_key(enc_session, priv)
                        t1 = time.perf_counter()
                        rsa_unwrap = t1 - t0

                        # decrypt AES from package
                        t0 = time.perf_counter()
                        extract_dir = Path("tmp_bench_extract")
                        if extract_dir.exists():
                            for f in extract_dir.iterdir():
                                f.unlink()
                        else:
                            extract_dir.mkdir()
                        ciphertext_path, enc_key, pm, sig = unpack_zip_package(pkg, extract_dir)
                        with open(ciphertext_path, "rb") as ct_f:
                            decrypt_stream(ct_f, Path("tmp_bench_out.bin"), pm["aes_mode"], master_key2, pm["metadata"], pm["aes_bits"])
                        t1 = time.perf_counter()
                        decrypt_aes = t1 - t0

                        in_b = sample.stat().st_size
                        pkg_b = pkg.stat().st_size

                        writer.writerow([mode, a_bits, r_bits, sample.name, in_b, pkg_b, round(aes_time, 6), round(rsa_wrap, 6), round(rsa_unwrap, 6), round(decrypt_aes, 6)])

                        # cleanup
                        try:
                            tmp_ct.unlink()
                            pkg.unlink()
                            for f in extract_dir.iterdir():
                                f.unlink()
                            extract_dir.rmdir()
                            Path("tmp_bench_out.bin").unlink()
                        except Exception:
                            pass

    print(f"Benchmark completed -> {out_csv}")


# --- CLI
def build_parser():
    p = argparse.ArgumentParser(description="Advanced hybrid AES+RSA secure file transfer")
    sub = p.add_subparsers(dest="cmd", required=True)

    g = sub.add_parser("gen-rsa")
    g.add_argument("--bits", type=int, choices=[2048, 3072], default=2048)
    g.add_argument("--outdir", type=str, default="keys")
    g.add_argument("--priv-passphrase", type=str, default=None, help="Optional passphrase to protect private key PEM")
    g.set_defaults(func=cmd_gen_rsa)

    e = sub.add_parser("encrypt")
    e.add_argument("--infile", required=True)
    e.add_argument("--out", required=True)
    e.add_argument("--pubkey", required=True)
    e.add_argument("--aes-mode", choices=["GCM", "CTR", "CBC"], default="GCM")
    e.add_argument("--aes-bits", choices=[128, 192, 256], type=int, default=256)
    e.add_argument("--rsa-bits", choices=[2048, 3072], type=int, default=2048)
    e.add_argument("--sign", action="store_true")
    e.add_argument("--sign-privkey", help="Sender private key for signing (PEM with optional passphrase)")
    e.add_argument("--sign-priv-passphrase", help="Passphrase for sender private key if protected")
    e.add_argument("--log", help="CSV log file")
    e.set_defaults(func=cmd_encrypt)

    d = sub.add_parser("decrypt")
    d.add_argument("--infile", required=True)
    d.add_argument("--privkey", required=True)
    d.add_argument("--priv-passphrase", required=False, help="Passphrase if private key is protected")
    d.add_argument("--outdir", required=True)
    d.add_argument("--verify-pubkey", help="Sender public key to verify signature")
    d.add_argument("--log", help="CSV log file")
    d.set_defaults(func=cmd_decrypt)

    b = sub.add_parser("benchmark")
    b.add_argument("--modes", nargs="+", default=["GCM", "CTR", "CBC"])
    b.add_argument("--aes-bits", nargs="+", type=int, default=[128, 256])
    b.add_argument("--rsa-bits", nargs="+", type=int, default=[2048, 3072])
    b.add_argument("--samples", nargs="+", required=True, help="Paths to sample files to test")
    b.add_argument("--out-csv", default="benchmark_results.csv")
    b.set_defaults(func=run_benchmarks)

    return p


def _is_interactive_session() -> bool:
    # Detect common interactive environments (Jupyter/Colab/IPython)
    if "ipykernel" in sys.modules:
        return True
    if "google.colab" in sys.modules:
        return True
    try:
        # get_ipython exists in interactive shells
        if "get_ipython" in globals() and get_ipython is not None:
            return True
    except Exception:
        pass
    return False


def main(argv=None):
    parser = build_parser()

    # If user did not pass argv and we are in an interactive session, DO NOT consume notebook argv
    if argv is None:
        if _is_interactive_session():
            # safer: require explicit args in notebook; show help if none provided
            argv = []
        else:
            argv = sys.argv[1:]

    # parse_known_args is tolerant of unknown flags; but positional invalid choices are still checked.
    # Because in interactive mode we set argv = [], it won't try to parse kernel-injected args.
    args, _unknown = parser.parse_known_args(argv)

    if getattr(args, "func", None):
        args.func(args)
    else:
        parser.print_help()


if __name__ == "__main__":
    # CLI mode: uses real sys.argv
    main()


usage: colab_kernel_launcher.py [-h] {gen-rsa,encrypt,decrypt,benchmark} ...
colab_kernel_launcher.py: error: the following arguments are required: cmd
ERROR:root:Internal Python error in the inspect module.
Below is the traceback from this internal error.



Traceback (most recent call last):
  File "/usr/lib/python3.12/argparse.py", line 1943, in _parse_known_args2
    namespace, args = self._parse_known_args(args, namespace, intermixed)
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/argparse.py", line 2230, in _parse_known_args
    raise ArgumentError(None, _('the following arguments are required: %s') %
argparse.ArgumentError: the following arguments are required: cmd

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.12/dist-packages/IPython/core/interactiveshell.py", line 3553, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "/tmp/ipython-input-2004304160.py", line 574, in <cell line: 0>
    main()
  File "/tmp/ipython-input-2004304160.py", line 564, in main
    args, _unknown = parser.parse_known_args(argv)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/l

TypeError: object of type 'NoneType' has no len()

In [None]:
# ✅ Always put this at the top
import argparse
import os
import sys
import zipfile
from typing import Optional, Tuple

from Crypto.Cipher import AES, PKCS1_OAEP
from Crypto.Hash import HMAC, SHA256
from Crypto.Protocol.KDF import HKDF, scrypt
from Crypto.PublicKey import RSA
from Crypto.Random import get_random_bytes


# --------- RSA Key Generation ---------
def generate_rsa_keys(bits: int = 2048, outdir: str = "keys") -> None:
    key = RSA.generate(bits)
    private_key = key.export_key()
    public_key = key.publickey().export_key()

    os.makedirs(outdir, exist_ok=True)
    with open(os.path.join(outdir, "private.pem"), "wb") as f:
        f.write(private_key)
    with open(os.path.join(outdir, "public.pem"), "wb") as f:
        f.write(public_key)

    print(f"[+] RSA keys generated in {outdir}/")


# --------- Hybrid Encryption ---------
def encrypt_file(
    infile: str, outfile: str, pubkey_path: str, password: Optional[str] = None
) -> None:
    with open(infile, "rb") as f:
        plaintext = f.read()

    with open(pubkey_path, "rb") as f:
        recipient_key = RSA.import_key(f.read())

    # Generate AES session key
    session_key = get_random_bytes(32)

    # Optional password strengthening
    if password:
        session_key = scrypt(password.encode(), get_random_bytes(16), 32, N=2 ** 14, r=8, p=1)

    # AES-GCM for confidentiality & integrity
    cipher_aes = AES.new(session_key, AES.MODE_GCM)
    ciphertext, tag = cipher_aes.encrypt_and_digest(plaintext)

    # RSA-OAEP encrypt AES key
    cipher_rsa = PKCS1_OAEP.new(recipient_key)
    enc_session_key = cipher_rsa.encrypt(session_key)

    # Package into ZIP
    with zipfile.ZipFile(outfile, "w") as zf:
        zf.writestr("enc_session_key.bin", enc_session_key)
        zf.writestr("nonce.bin", cipher_aes.nonce)
        zf.writestr("tag.bin", tag)
        zf.writestr("ciphertext.bin", ciphertext)

    print(f"[+] File encrypted → {outfile}")


# --------- Hybrid Decryption ---------
def decrypt_file(infile: str, outfile: str, privkey_path: str, password: Optional[str] = None) -> None:
    with zipfile.ZipFile(infile, "r") as zf:
        enc_session_key = zf.read("enc_session_key.bin")
        nonce = zf.read("nonce.bin")
        tag = zf.read("tag.bin")
        ciphertext = zf.read("ciphertext.bin")

    with open(privkey_path, "rb") as f:
        private_key = RSA.import_key(f.read())

    cipher_rsa = PKCS1_OAEP.new(private_key)
    session_key = cipher_rsa.decrypt(enc_session_key)

    if password:
        session_key = scrypt(password.encode(), get_random_bytes(16), 32, N=2 ** 14, r=8, p=1)

    cipher_aes = AES.new(session_key, AES.MODE_GCM, nonce)
    plaintext = cipher_aes.decrypt_and_verify(ciphertext, tag)

    with open(outfile, "wb") as f:
        f.write(plaintext)

    print(f"[+] File decrypted → {outfile}")


# --------- CLI / Notebook Compatible ---------
def main(argv=None):
    parser = argparse.ArgumentParser(description="Hybrid AES + RSA Encryption Tool")
    subparsers = parser.add_subparsers(dest="command", required=True)

    # RSA keygen
    gen_parser = subparsers.add_parser("gen-rsa", help="Generate RSA key pair")
    gen_parser.add_argument("--bits", type=int, default=2048, help="RSA key size in bits")
    gen_parser.add_argument("--outdir", type=str, default="keys", help="Output directory for keys")

    # Encrypt
    enc_parser = subparsers.add_parser("encrypt", help="Encrypt a file")
    enc_parser.add_argument("--infile", type=str, required=True, help="Input file")
    enc_parser.add_argument("--outfile", type=str, required=True, help="Output file")
    enc_parser.add_argument("--pubkey", type=str, required=True, help="Recipient public key")
    enc_parser.add_argument("--password", type=str, help="Optional password")

    # Decrypt
    dec_parser = subparsers.add_parser("decrypt", help="Decrypt a file")
    dec_parser.add_argument("--infile", type=str, required=True, help="Encrypted file")
    dec_parser.add_argument("--outfile", type=str, required=True, help="Decrypted output")
    dec_parser.add_argument("--privkey", type=str, required=True, help="Private key path")
    dec_parser.add_argument("--password", type=str, help="Optional password")

    # Ignore Jupyter’s unwanted args
    args, _ = parser.parse_known_args(argv)

    if args.command == "gen-rsa":
        generate_rsa_keys(args.bits, args.outdir)
    elif args.command == "encrypt":
        encrypt_file(args.infile, args.outfile, args.pubkey, args.password)
    elif args.command == "decrypt":
        decrypt_file(args.infile, args.outfile, args.privkey, args.password)


if __name__ == "__main__":
    main()


usage: colab_kernel_launcher.py [-h] {gen-rsa,encrypt,decrypt} ...
colab_kernel_launcher.py: error: argument command: invalid choice: '/root/.local/share/jupyter/runtime/kernel-5c00d68e-2f5d-4e61-b752-bc21bd6c5c6f.json' (choose from gen-rsa, encrypt, decrypt)
ERROR:root:Internal Python error in the inspect module.
Below is the traceback from this internal error.



Traceback (most recent call last):
  File "/usr/lib/python3.12/argparse.py", line 1943, in _parse_known_args2
    namespace, args = self._parse_known_args(args, namespace, intermixed)
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/argparse.py", line 2188, in _parse_known_args
    stop_index = consume_positionals(start_index)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/argparse.py", line 2141, in consume_positionals
    take_action(action, args)
  File "/usr/lib/python3.12/argparse.py", line 2003, in take_action
    argument_values = self._get_values(action, argument_strings)
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/argparse.py", line 2523, in _get_values
    self._check_value(action, value[0])
  File "/usr/lib/python3.12/argparse.py", line 2573, in _check_value
    raise ArgumentError(action, msg % args)
argparse.ArgumentError: argument command: i

TypeError: object of type 'NoneType' has no len()