In [1]:
# Kerberos Simulation (AES-GCM version) - Jupyter-ready single cell


import sys
import subprocess
import time
import json
import base64
import secrets
from pprint import pprint

# Import the third-party `cryptography` primitives, installing the package
# with pip on first use when it is not importable.
try:
    from cryptography.hazmat.primitives.ciphers.aead import AESGCM
    from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
    from cryptography.hazmat.primitives import hashes
    from cryptography.hazmat.backends import default_backend
except ImportError:
    # Only an import failure should trigger an install; any other exception
    # is a genuine error and must not be masked by a pip run.
    import importlib

    print("cryptography not found — installing...")
    subprocess.check_call([sys.executable, "-m", "pip", "install", "--quiet", "cryptography"])
    # Packages installed while the interpreter is running may be invisible to
    # the cached import finders until the caches are invalidated.
    importlib.invalidate_caches()
    from cryptography.hazmat.primitives.ciphers.aead import AESGCM
    from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
    from cryptography.hazmat.primitives import hashes
    from cryptography.hazmat.backends import default_backend

# ---------- Helpers ----------
def derive_key_from_password(
    password: str,
    salt: bytes = b"university_salt_v1",
    iterations: int = 200_000,
    key_len: int = 32,
) -> bytes:
    """Stretch a password into a symmetric key with PBKDF2-HMAC-SHA256.

    Args:
        password: Password text; it is encoded with ``str.encode()`` before
            being fed to the KDF.
        salt: PBKDF2 salt. The default is a single constant, so every caller
            that relies on it shares the same salt.
        iterations: PBKDF2 iteration count.
        key_len: Number of key bytes to produce.

    Returns:
        The derived key bytes.
    """
    stretcher = PBKDF2HMAC(
        algorithm=hashes.SHA256(),
        length=key_len,
        salt=salt,
        iterations=iterations,
        backend=default_backend(),
    )
    password_bytes = password.encode()
    return stretcher.derive(password_bytes)

def aesgcm_encrypt_struct(obj: dict, key: bytes) -> str:
    """Encrypt a JSON-serializable dict with AES-GCM.

    The dict is serialized as compact JSON with sorted keys, encrypted under
    a fresh random 12-byte nonce with no associated data, and returned as
    base64 text of ``nonce + AESGCM.encrypt(...)`` output.

    Args:
        obj: Structure to serialize and encrypt.
        key: AES key handed to ``AESGCM``.

    Returns:
        Base64 string containing the nonce followed by the encrypted payload.
    """
    cipher = AESGCM(key)
    iv = secrets.token_bytes(12)  # 96-bit nonce for AES-GCM
    canonical_json = json.dumps(obj, separators=(",", ":"), sort_keys=True)
    sealed = cipher.encrypt(iv, canonical_json.encode(), associated_data=None)
    envelope = iv + sealed
    return base64.b64encode(envelope).decode()

def aesgcm_decrypt_struct(b64string: str, key: bytes) -> dict:
    """Decrypt a base64 ``nonce + ciphertext`` envelope back into a dict.

    Args:
        b64string: Base64 text whose decoded form starts with a 12-byte nonce
            followed by the AES-GCM encrypted payload.
        key: AES key handed to ``AESGCM``.

    Returns:
        The object parsed from the decrypted JSON text.
    """
    envelope = base64.b64decode(b64string.encode())
    # The first 12 bytes are the nonce; everything after is the payload.
    iv = envelope[:12]
    sealed = envelope[12:]
    cipher = AESGCM(key)
    recovered = cipher.decrypt(iv, sealed, associated_data=None)
    return json.loads(recovered.decode())

def now_ts() -> int:
    """Return the current time as whole seconds since the epoch."""
    seconds = time.time()
    return int(seconds)

def pretty_b64(b: bytes) -> str:
    """Return the standard base64 encoding of *b* as text."""
    encoded = base64.b64encode(b)
    return encoded.decode()

# ---------- Simulation state (secrets/databases) ----------
# User accounts known to the AS, keyed by username.
user_db = {
    "alice": {
        "password": "alice_pass_123",
        "uid": 1001,
    },
    "bob": {
        "password": "bob_secure_pw",
        "uid": 1002,
    },
}

# Long-term key of the TGS (shared between AS and TGS); random per run.
tgs_key = secrets.token_bytes(32)

# Registered services, keyed by service name. Each entry carries the
# service's own long-term key (shared with the TGS) and its identifier.
service_db = {
    "university_portal": {
        "service_key": secrets.token_bytes(32),
        "service_id": "portal01.univ.edu",
    },
}

# ---------- AS (Authentication Server) ----------
def as_authenticate_and_issue_ticket_tgs(username: str, client_request: dict):
    """Issue a Ticket_TGS and a client reply for a known user (AS role).

    The ticket is encrypted under ``tgs_key``; the reply, which embeds the
    encrypted ticket and the new client<->TGS session key, is encrypted under
    a key derived from the user's stored password.

    Args:
        username: Account to look up in ``user_db``.
        client_request: The client's request dict. It is accepted but not
            read by this function.

    Returns:
        Base64 AES-GCM envelope of the reply intended for the client.

    Raises:
        ValueError: If ``username`` is not present in ``user_db``.
    """
    print(f"\n[AS] Authentication request received for '{username}'")
    if username not in user_db:
        raise ValueError("Unknown user")
    account = user_db[username]

    # Fresh client <-> TGS session key, valid for 5 minutes (demo value).
    c_tgs_key = secrets.token_bytes(32)
    c_tgs_key_b64 = pretty_b64(c_tgs_key)
    valid_until = now_ts() + 300

    # Ticket readable only by the holder of the TGS long-term key.
    sealed_ticket = aesgcm_encrypt_struct(
        {
            "username": username,
            "uid": account["uid"],
            "session_key_c_tgs": c_tgs_key_b64,
            "expiry": valid_until,
            "issued_by": "TGS",
        },
        tgs_key,
    )

    # Reply readable only by someone who can derive the user's password key.
    reply = {
        "session_key_c_tgs": c_tgs_key_b64,
        "expiry": valid_until,
        "tgs": "TGS-server",
        "ticket_tgs": sealed_ticket,
    }
    sealed_reply = aesgcm_encrypt_struct(
        reply,
        derive_key_from_password(account["password"]),
    )
    print("[AS] Issued Ticket_TGS and encrypted response for client.")
    return sealed_reply

# ---------- Client: request Ticket_TGS ----------
def client_request_ticket_tgs(username: str, password: str):
    """Ask the AS for a Ticket_TGS and decrypt its reply (client role).

    Args:
        username: Name sent to the AS.
        password: Password used to derive the key that decrypts the AS reply.

    Returns:
        The decrypted AS reply dict.
    """
    auth_request = {"username": username, "timestamp": now_ts()}
    print(f"\n[Client] Sending authentication request to AS: {auth_request}")
    sealed_reply = as_authenticate_and_issue_ticket_tgs(username, auth_request)
    # The reply can only be opened with the key derived from the password.
    as_reply = aesgcm_decrypt_struct(
        sealed_reply,
        derive_key_from_password(password),
    )
    print("[Client] Decrypted AS response; obtained session_key for TGS and Ticket_TGS.")
    return as_reply

# ---------- TGS ----------
def tgs_validate_and_issue_service_ticket(ticket_tgs_enc: str, authenticator_enc: str, requested_service: str):
    """Validate a Ticket_TGS + authenticator and issue a service ticket (TGS role).

    Args:
        ticket_tgs_enc: Base64 envelope of the Ticket_TGS, encrypted under
            ``tgs_key``.
        authenticator_enc: Base64 envelope of the client's authenticator,
            encrypted under the session key carried inside the ticket.
        requested_service: Key into ``service_db`` naming the target service.

    Returns:
        Base64 envelope, encrypted under the client<->TGS session key, holding
        the new client<->service session key, its expiry, the service name
        and the encrypted Ticket_Service.

    Raises:
        ValueError: If the ticket has expired, the authenticator's username
            does not match the ticket, the authenticator timestamp is more
            than 120 seconds away from the current time, or the requested
            service is unknown.

    Errors raised by ``aesgcm_decrypt_struct`` while opening the ticket or the
    authenticator propagate unchanged.
    """
    print(f"\n[TGS] Received Ticket_TGS + Authenticator for service '{requested_service}'")
    # Decrypt ticket with TGS key
    ticket_tgs = aesgcm_decrypt_struct(ticket_tgs_enc, tgs_key)
    # Reject tickets past their validity period before trusting anything else
    # they contain; without this check an expired ticket stays usable forever.
    if now_ts() > ticket_tgs["expiry"]:
        raise ValueError("Ticket_TGS has expired")
    session_key_c_tgs = base64.b64decode(ticket_tgs["session_key_c_tgs"].encode())
    # Decrypt authenticator using session_key_c_tgs
    authenticator = aesgcm_decrypt_struct(authenticator_enc, session_key_c_tgs)
    # Validate
    if authenticator["username"] != ticket_tgs["username"]:
        raise ValueError("Authenticator username mismatch")
    if abs(now_ts() - authenticator["timestamp"]) > 120:
        raise ValueError("Authenticator timestamp outside allowed window (possible replay)")
    print("[TGS] Authenticator validated.")
    # Resolve the service before generating any key material for it.
    service = service_db.get(requested_service)
    if not service:
        raise ValueError("Requested service unknown")
    # Create session key for client <-> service (5 minutes validity, demo value)
    session_key_c_s = secrets.token_bytes(32)
    expiry = now_ts() + 300
    ticket_service = {
        "username": ticket_tgs["username"],
        "uid": ticket_tgs["uid"],
        "session_key_c_s": pretty_b64(session_key_c_s),
        "expiry": expiry,
        "service_id": service["service_id"]
    }
    # Only the service can open its ticket: it is sealed with the service key.
    encrypted_ticket_service = aesgcm_encrypt_struct(ticket_service, service["service_key"])
    tgs_to_client = {
        "session_key_c_s": pretty_b64(session_key_c_s),
        "expiry": expiry,
        "service": requested_service,
        "ticket_service": encrypted_ticket_service
    }
    # The reply to the client is sealed with the client <-> TGS session key.
    encrypted_for_client = aesgcm_encrypt_struct(tgs_to_client, session_key_c_tgs)
    print("[TGS] Issued Ticket_Service and encrypted session info for client.")
    return encrypted_for_client

# ---------- Client: access service ----------
def client_access_service(username: str, password: str, service_name: str):
    """Run the full client flow: AS exchange, TGS exchange, service request.

    Args:
        username: Account name presented to the AS and placed in both
            authenticators.
        password: Password used to decrypt the AS reply.
        service_name: Name of the service requested from the TGS and then
            contacted.

    Returns:
        The decrypted confirmation dict returned by the service.
    """
    # Step 1: AS exchange yields the client <-> TGS session key and Ticket_TGS.
    as_reply = client_request_ticket_tgs(username, password)
    c_tgs_key = base64.b64decode(as_reply["session_key_c_tgs"].encode())
    sealed_tgt = as_reply["ticket_tgs"]

    # Step 2: prove possession of the session key to the TGS.
    sealed_tgs_auth = aesgcm_encrypt_struct(
        {"username": username, "timestamp": now_ts()},
        c_tgs_key,
    )
    sealed_tgs_reply = tgs_validate_and_issue_service_ticket(
        sealed_tgt, sealed_tgs_auth, service_name
    )
    tgs_reply = aesgcm_decrypt_struct(sealed_tgs_reply, c_tgs_key)
    c_s_key = base64.b64decode(tgs_reply["session_key_c_s"].encode())
    sealed_service_ticket = tgs_reply["ticket_service"]
    print("[Client] Received session key for service and Ticket_Service from TGS.")

    # Step 3: present the service ticket plus a fresh authenticator.
    sealed_service_auth = aesgcm_encrypt_struct(
        {"username": username, "timestamp": now_ts()},
        c_s_key,
    )
    print(f"\n[Client] Sending Ticket_Service + Authenticator to service '{service_name}'.")
    sealed_confirmation = service_validate_and_respond(
        sealed_service_ticket, sealed_service_auth, service_name
    )
    confirmation = aesgcm_decrypt_struct(sealed_confirmation, c_s_key)
    print(f"[Client] Service responded: {confirmation}")
    return confirmation

# ---------- Service ----------
def service_validate_and_respond(ticket_service_enc: str, authenticator_enc: str, service_name: str):
    """Validate a Ticket_Service + authenticator and return a confirmation (service role).

    Args:
        ticket_service_enc: Base64 envelope of the service ticket, encrypted
            under the service's long-term key.
        authenticator_enc: Base64 envelope of the client's authenticator,
            encrypted under the session key carried inside the ticket.
        service_name: Key into ``service_db`` identifying this service.

    Returns:
        Base64 envelope of the confirmation dict, encrypted under the
        client<->service session key.

    Raises:
        KeyError: If ``service_name`` is not present in ``service_db``.
        ValueError: If the ticket has expired, the authenticator's username
            does not match the ticket, or the authenticator timestamp is more
            than 120 seconds away from the current time.

    Errors raised by ``aesgcm_decrypt_struct`` while opening the ticket or the
    authenticator propagate unchanged.
    """
    print(f"\n[Service:{service_name}] Received Ticket_Service + Authenticator.")
    service = service_db[service_name]
    service_key = service["service_key"]
    ticket_service = aesgcm_decrypt_struct(ticket_service_enc, service_key)
    # Reject tickets past their validity period; without this check a service
    # ticket would remain usable indefinitely.
    if now_ts() > ticket_service["expiry"]:
        raise ValueError("Ticket_Service has expired")
    session_key_c_s = base64.b64decode(ticket_service["session_key_c_s"].encode())
    authenticator = aesgcm_decrypt_struct(authenticator_enc, session_key_c_s)
    if authenticator["username"] != ticket_service["username"]:
        raise ValueError("Service username mismatch")
    # Accept authenticators only within a 120-second window around "now".
    if abs(now_ts() - authenticator["timestamp"]) > 120:
        raise ValueError("Service authenticator timestamp expiry")
    print(f"[Service:{service_name}] Authenticator validated. Access granted to {ticket_service['username']}.")
    confirmation = {"status": "OK", "message": f"Welcome {ticket_service['username']} to {service['service_id']}", "timestamp": now_ts()}
    # The confirmation is sealed with the session key shared with the client.
    return aesgcm_encrypt_struct(confirmation, session_key_c_s)

# ---------- Demo run ----------
# Demo entry point: runs the complete AS -> TGS -> service flow once for
# user "alice" against the "university_portal" service and prints the result.
if __name__ == "__main__":
    print("Kerberos AES-GCM Simulation - Demo Run")
    # Credentials matching the "alice" entry in user_db.
    username = "alice"
    password = "alice_pass_123"
    # Must be a key of service_db.
    service_requested = "university_portal"
    confirmation = client_access_service(username, password, service_requested)
    print("\n--- Final Confirmation (decrypted) ---")
    pprint(confirmation)


Kerberos AES-GCM Simulation - Demo Run

[Client] Sending authentication request to AS: {'username': 'alice', 'timestamp': 1761226116}

[AS] Authentication request received for 'alice'
[AS] Issued Ticket_TGS and encrypted response for client.
[Client] Decrypted AS response; obtained session_key for TGS and Ticket_TGS.

[TGS] Received Ticket_TGS + Authenticator for service 'university_portal'
[TGS] Authenticator validated.
[TGS] Issued Ticket_Service and encrypted session info for client.
[Client] Received session key for service and Ticket_Service from TGS.

[Client] Sending Ticket_Service + Authenticator to service 'university_portal'.

[Service:university_portal] Received Ticket_Service + Authenticator.
[Service:university_portal] Authenticator validated. Access granted to alice.
[Client] Service responded: {'message': 'Welcome alice to portal01.univ.edu', 'status': 'OK', 'timestamp': 1761226117}

--- Final Confirmation (decrypted) ---
{'message': 'Welcome alice to portal01.univ.edu',