In [1]:
pip install cryptography


Note: you may need to restart the kernel to use updated packages.


In [2]:
import time
import hashlib
import hmac
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.asymmetric.padding import PKCS1v15
from cryptography.hazmat.primitives.hashes import SHA256
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
from cryptography.hazmat.primitives.kdf.concatkdf import ConcatKDFHash

# ---- Step 1: Key Generation ---- #
def generate_keys():
    """Generate RSA key pair."""
    private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
    public_key = private_key.public_key()
    return private_key, public_key

# Serialize keys
def serialize_key(key, is_private=False):
    """Serialize keys for storage or transmission."""
    if is_private:
        return key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.PKCS8,
            encryption_algorithm=serialization.NoEncryption()
        )
    else:
        return key.public_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PublicFormat.SubjectPublicKeyInfo
        )

# ---- Step 2: Time-Locked Authentication ---- #
def sign_time_nonce(private_key, timestamp, nonce):
    """Sign a timestamp + nonce with a private key."""
    message = f"{timestamp}:{nonce}".encode()
    signature = private_key.sign(message, PKCS1v15(), SHA256())
    return signature

def verify_signature(public_key, timestamp, nonce, signature):
    """Verify the timestamp + nonce signature."""
    message = f"{timestamp}:{nonce}".encode()
    try:
        public_key.verify(signature, message, PKCS1v15(), SHA256())
        return True
    except Exception as e:
        return False

# ---- Step 3: Blockchain-Inspired Integrity ---- #
class Packet:
    """Represents a single data packet in the session."""
    def __init__(self, data, prev_hash, timestamp, session_id):
        self.data = data
        self.prev_hash = prev_hash
        self.timestamp = timestamp
        self.session_id = session_id

    def compute_hash(self):
        """Compute the hash of the packet."""
        content = f"{self.data}:{self.prev_hash}:{self.timestamp}:{self.session_id}".encode()
        return hashlib.sha256(content).hexdigest()

class BlockchainSession:
    """Manages the session using a blockchain-inspired structure."""
    def __init__(self, session_id):
        self.session_id = session_id
        self.chain = []

    def add_packet(self, packet):
        """Add a packet to the blockchain session."""
        if len(self.chain) > 0:
            packet.prev_hash = self.chain[-1].compute_hash()
        else:
            packet.prev_hash = "0"  # Genesis block
        self.chain.append(packet)

    def verify_chain(self):
        """Verify the integrity of the blockchain session."""
        for i in range(1, len(self.chain)):
            current_packet = self.chain[i]
            prev_packet = self.chain[i - 1]
            if current_packet.prev_hash != prev_packet.compute_hash():
                return False
        return True

# ---- Step 4: Test the Prototype ---- #
if __name__ == "__main__":
    # Generate client and server keys
    client_private, client_public = generate_keys()
    server_private, server_public = generate_keys()

    # Step 1: Time-Locked Authentication
    timestamp = int(time.time())
    nonce = "random_nonce"
    signature = sign_time_nonce(server_private, timestamp, nonce)
    auth_verified = verify_signature(server_public, timestamp, nonce, signature)
    print(f"Authentication Verified: {auth_verified}")

    # Step 2: Blockchain-Inspired Integrity
    session = BlockchainSession(session_id="SESSION123")
    packet1 = Packet(data="Hello, World!", prev_hash="", timestamp=timestamp, session_id="SESSION123")
    session.add_packet(packet1)

    packet2 = Packet(data="Another message", prev_hash="", timestamp=timestamp + 1, session_id="SESSION123")
    session.add_packet(packet2)

    # Verify the blockchain integrity
    print(f"Blockchain Integrity Verified: {session.verify_chain()}")

    # Tamper with a packet
    session.chain[1].data = "Tampered message"
    print(f"Blockchain Integrity After Tampering: {session.verify_chain()}")


Authentication Verified: True
Blockchain Integrity Verified: True
Blockchain Integrity After Tampering: True


In [3]:
# ---- Step 5: Testing the Security Mechanism ---- #
def test_authentication():
    """Test the time-locked authentication process."""
    print("\n--- Authentication Test ---")
    timestamp = int(time.time())
    nonce = "auth_nonce"

    # Valid signature
    valid_signature = sign_time_nonce(server_private, timestamp, nonce)
    is_valid = verify_signature(server_public, timestamp, nonce, valid_signature)
    print(f"Valid Authentication Test Passed: {is_valid}")

    # Forged signature
    forged_signature = valid_signature[:-1] + b"0"  # Modify the valid signature
    is_forged_valid = verify_signature(server_public, timestamp, nonce, forged_signature)
    print(f"Invalid Authentication Test Passed: {not is_forged_valid}")

def test_integrity():
    """Test the blockchain-inspired integrity verification."""
    print("\n--- Integrity Test ---")
    session = BlockchainSession(session_id="TEST_SESSION")

    # Create packets
    packet1 = Packet(data="Packet 1 data", prev_hash="", timestamp=int(time.time()), session_id=session.session_id)
    session.add_packet(packet1)

    packet2 = Packet(data="Packet 2 data", prev_hash="", timestamp=int(time.time()), session_id=session.session_id)
    session.add_packet(packet2)

    # Verify integrity
    is_valid = session.verify_chain()
    print(f"Original Chain Integrity Verified: {is_valid}")

    # Tamper with a packet
    session.chain[1].data = "Tampered data"
    is_tampered_valid = session.verify_chain()
    print(f"Tampered Chain Integrity Verified: {is_tampered_valid == False}")

# Run tests
if __name__ == "__main__":
    test_authentication()
    test_integrity()



--- Authentication Test ---
Valid Authentication Test Passed: True
Invalid Authentication Test Passed: True

--- Integrity Test ---
Original Chain Integrity Verified: True
Tampered Chain Integrity Verified: False


In [4]:
# ---- Step 6: Developing and Demonstrating Attacks ---- #
def attack_authentication():
    """Simulate an attacker trying to forge a valid authentication signature."""
    print("\n--- Authentication Attack ---")
    timestamp = int(time.time())
    nonce = "attack_nonce"

    # Attacker does not have the server's private key
    forged_signature = b"random_data"
    is_valid = verify_signature(server_public, timestamp, nonce, forged_signature)
    print(f"Authentication Attack Successful: {not is_valid}")

def attack_integrity():
    """Simulate an attacker modifying the blockchain session data."""
    print("\n--- Integrity Attack ---")
    session = BlockchainSession(session_id="ATTACK_SESSION")

    # Create original packets
    packet1 = Packet(data="Original Packet 1", prev_hash="", timestamp=int(time.time()), session_id=session.session_id)
    session.add_packet(packet1)

    packet2 = Packet(data="Original Packet 2", prev_hash="", timestamp=int(time.time()), session_id=session.session_id)
    session.add_packet(packet2)

    # Verify original chain
    print(f"Original Chain Integrity Verified: {session.verify_chain()}")

    # Attacker modifies packet
    session.chain[1].data = "Modified Packet 2"
    print(f"Tampered Chain Integrity Verified: {session.verify_chain()} (Expected: False)")

# Run attacks
if __name__ == "__main__":
    attack_authentication()
    attack_integrity()



--- Authentication Attack ---
Authentication Attack Successful: True

--- Integrity Attack ---
Original Chain Integrity Verified: True
Tampered Chain Integrity Verified: True (Expected: False)
