In [None]:
# Initialize workspace, folders, and runtime config
import os, sys, json, time, random

# Workspace layout: one base folder with four sub-folders
BASE_DIR = "/content/offsec_workspace"
ART_DIR, DATA_DIR, LOGS_DIR, SBX_DIR = (
    os.path.join(BASE_DIR, leaf) for leaf in ("artifacts", "data", "logs", "sandbox")
)

# Make sure every folder exists (no error when it already does)
for p in (BASE_DIR, ART_DIR, DATA_DIR, LOGS_DIR, SBX_DIR):
    os.makedirs(p, exist_ok=True)

# Runtime settings that get persisted next to the other artifacts
config = dict(
    project_name="Offensive Security in a Controlled Sandbox (Ethical Hacking)",
    base_dir=BASE_DIR,
    random_seed=42,
    created_at=time.strftime("%Y-%m-%d %H:%M:%S"),
)

# Seed Python's random module from the configured value
random.seed(config["random_seed"])

# Write the settings as indented JSON
cfg_path = os.path.join(ART_DIR, "config.json")
with open(cfg_path, "w") as f:
    f.write(json.dumps(config, indent=2))

# Report interpreter version and the key paths
print("Python:", sys.version.split()[0])
print("Base dir:", BASE_DIR)
print("Config:", cfg_path)


Python: 3.12.12
Base dir: /content/offsec_workspace
Config: /content/offsec_workspace/artifacts/config.json


In [None]:
# Install runtime packages
!pip -q install flask requests


In [None]:
# Start a local login service with throttling and logging
import os, time, threading, secrets, logging
from collections import defaultdict, deque
from flask import Flask, request, jsonify, session
from werkzeug.security import generate_password_hash, check_password_hash

# Security events are written to a log file inside the workspace.
BASE_DIR = "/content/offsec_workspace"
LOG_FILE = os.path.join(BASE_DIR, "logs", "security_events.log")
os.makedirs(os.path.dirname(LOG_FILE), exist_ok=True)

# Attach the file handler only once so re-running the cell does not
# duplicate every log line.
logger = logging.getLogger("security")
logger.setLevel(logging.INFO)
if len(logger.handlers) == 0:
    fh = logging.FileHandler(LOG_FILE)
    fh.setFormatter(logging.Formatter(fmt="%(asctime)s %(levelname)s %(message)s"))
    logger.addHandler(fh)

# Flask app with a fresh random key for signing session cookies.
app = Flask(__name__)
app.secret_key = secrets.token_hex(32)

# Demo accounts: salted password hash plus the epoch second until which
# the account is locked (0 = not locked).
users = {
    name: {"pwd_hash": generate_password_hash(secret), "locked_until": 0}
    for name, secret in (("alice", "Alice#Pass123"), ("bob", "Bob#Pass123"))
}

# Failure timestamps per "ip|username" key, capped at 200 entries each.
attempts = defaultdict(lambda: deque(maxlen=200))

# Throttle: at most this many failures per key inside the window.
MAX_ATTEMPTS_PER_WINDOW = 10
WINDOW_SEC = 60

# Lockout: this many failures inside the lock window locks the account.
LOCK_FAILS = 20
LOCK_WINDOW_SEC = 15 * 60
LOCK_SEC = 10 * 60

def _now():
    return int(time.time())

def _key(ip, username):
    return f"{ip}|{username}"

def _count_recent(dq, sec):
    """Count the timestamps in ``dq`` that are at most ``sec`` seconds old."""
    current = _now()
    recent = [stamp for stamp in dq if current - stamp <= sec]
    return len(recent)

@app.post("/login")
def login():
    """Authenticate a JSON ``{"username", "password"}`` login attempt.

    Responses:
        200 ``{"ok": True}`` - password matched; ``session["user"]`` is set.
        401 ``{"ok": False}`` - unknown user or wrong password.
        423 ``reason="locked"`` - account is locked, or this failure set the lock.
        429 ``reason="throttled"`` - too many recent failures for this ip/user key.
    """
    # Identify the caller by the socket peer address only. This app is served
    # directly by app.run(), so an X-Forwarded-For header is whatever the
    # client chose to send: honouring it would let a caller rotate the value to
    # get a fresh throttle/lockout bucket on every request and to place
    # arbitrary text into the security log.
    ip = request.remote_addr or "unknown"
    username = (request.json or {}).get("username", "")
    password = (request.json or {}).get("password", "")

    user = users.get(username)
    if not user:
        logger.info(f"LOGIN user=UNKNOWN ip={ip} result=FAIL")
        return jsonify({"ok": False}), 401

    # A locked account is refused before the password is even checked.
    if _now() < user["locked_until"]:
        logger.warning(f"LOGIN user={username} ip={ip} result=LOCKED")
        return jsonify({"ok": False, "reason": "locked"}), 423

    # Only failures are recorded in the deque, so this limits failed guesses.
    dq = attempts[_key(ip, username)]
    if _count_recent(dq, WINDOW_SEC) >= MAX_ATTEMPTS_PER_WINDOW:
        logger.warning(f"LOGIN user={username} ip={ip} result=THROTTLED")
        return jsonify({"ok": False, "reason": "throttled"}), 429

    if check_password_hash(user["pwd_hash"], password):
        session["user"] = username
        logger.info(f"LOGIN user={username} ip={ip} result=OK")
        return jsonify({"ok": True}), 200

    # Record the failure, then lock the account if the longer window is exceeded.
    dq.append(_now())
    recent_fails = _count_recent(dq, LOCK_WINDOW_SEC)
    if recent_fails >= LOCK_FAILS:
        user["locked_until"] = _now() + LOCK_SEC
        logger.warning(f"LOGIN user={username} ip={ip} result=LOCK_SET fails={recent_fails}")
        return jsonify({"ok": False, "reason": "locked"}), 423

    logger.info(f"LOGIN user={username} ip={ip} result=FAIL fails_window={recent_fails}")
    return jsonify({"ok": False}), 401

@app.get("/me")
def me():
    """Return the username stored in the session, or 401 when there is none."""
    current_user = session.get("user")
    if current_user:
        return jsonify({"ok": True, "user": current_user}), 200
    return jsonify({"ok": False}), 401

def run_service():
    """Serve ``app`` on 127.0.0.1:8000 with threading on and the reloader off."""
    server_options = dict(
        host="127.0.0.1",
        port=8000,
        debug=False,
        use_reloader=False,
        threaded=True,
    )
    app.run(**server_options)

# Serve from a daemon thread so this cell returns while the app keeps running.
t = threading.Thread(daemon=True, target=run_service)
t.start()

# Pause for a second before announcing the endpoint.
time.sleep(1)
print("Service ready on http://127.0.0.1:8000")
print(f"Log file: {LOG_FILE}")


 * Serving Flask app '__main__'
 * Debug mode: off


 * Running on http://127.0.0.1:8000
INFO:werkzeug:[33mPress CTRL+C to quit[0m


Service ready on http://127.0.0.1:8000
Log file: /content/offsec_workspace/logs/security_events.log


In [None]:
# Technique 1: testing password-guessing/brute-force behavior against a login endpoint and validating defenses (throttling, backoff, lockout)
# Start upgraded login service with per-IP throttling and backoff
import os, time, threading, secrets, logging, math
from collections import defaultdict, deque
from flask import Flask, request, jsonify, session, make_response
from werkzeug.security import generate_password_hash, check_password_hash

# Log destination (same file as the first service)
BASE_DIR = "/content/offsec_workspace"
LOG_FILE = os.path.join(BASE_DIR, "logs", "security_events.log")
os.makedirs(os.path.dirname(LOG_FILE), exist_ok=True)

# Dedicated logger; the handler is attached only once per kernel
logger = logging.getLogger("security_v2")
logger.setLevel(logging.INFO)
if len(logger.handlers) == 0:
    fh = logging.FileHandler(LOG_FILE)
    fh.setFormatter(logging.Formatter(fmt="%(asctime)s %(levelname)s %(message)s"))
    logger.addHandler(fh)

# Flask app with a fresh random key for signing session cookies
app = Flask(__name__)
app.secret_key = secrets.token_hex(32)

# Demo accounts: salted hash plus lock expiry (epoch seconds, 0 = unlocked)
users = {
    name: {"pwd_hash": generate_password_hash(secret), "locked_until": 0}
    for name, secret in (("alice", "Alice#Pass123"), ("bob", "Bob#Pass123"))
}

# Failure timestamps, bucketed per "ip|username" key and per IP
attempts_user = defaultdict(lambda: deque(maxlen=500))
attempts_ip = defaultdict(lambda: deque(maxlen=2000))

# Backoff state per "ip|username" key: consecutive failures and the
# earliest epoch second at which the next attempt is accepted
fail_streak = defaultdict(int)
next_allowed = defaultdict(int)

# Per-key throttle
USER_MAX_PER_WINDOW = 10
USER_WINDOW_SEC = 60

# Per-IP throttle
IP_MAX_PER_WINDOW = 40
IP_WINDOW_SEC = 60

# Account lockout
LOCK_FAILS = 20
LOCK_WINDOW_SEC = 15 * 60
LOCK_SEC = 10 * 60

# Exponential backoff bounds (seconds)
BACKOFF_BASE_SEC = 1
BACKOFF_CAP_SEC = 30

def _now():
    return int(time.time())

def _key(ip, username):
    return f"{ip}|{username}"

def _count_recent(dq, sec):
    """Count the timestamps in ``dq`` that are at most ``sec`` seconds old."""
    current = _now()
    recent = [stamp for stamp in dq if current - stamp <= sec]
    return len(recent)

def _retry_after(sec):
    return max(1, int(sec))

@app.post("/login")
def login():
    """Authenticate a JSON login attempt behind lockout, backoff and throttling.

    Checks run in this order: payload shape, known user, account lock,
    per-(ip, user) backoff, per-IP throttle, per-(ip, user) throttle, password.

    Responses:
        200 ``{"ok": True}`` - password matched; ``session["user"]`` is set.
        400 ``reason="bad_request"`` - body is not a JSON object, or
            username/password is not a string.
        401 ``{"ok": False}`` - unknown user or wrong password.
        423 ``reason="locked"`` - account is locked, or this failure set the lock.
        429 ``reason`` in ``backoff`` / ``ip_throttled`` / ``user_throttled``,
            with a ``Retry-After`` header.
    """
    # Read input
    ip = request.remote_addr or "unknown"

    # Reject malformed payloads up front. A JSON array/scalar body, or a
    # non-string username/password, would otherwise raise inside .get(), the
    # dict lookup or the hash check and surface as an HTTP 500.
    body = request.json or {}
    username = body.get("username", "") if isinstance(body, dict) else None
    password = body.get("password", "") if isinstance(body, dict) else None
    if not isinstance(username, str) or not isinstance(password, str):
        logger.info(f"LOGIN user=UNKNOWN ip={ip} result=BAD_REQUEST")
        return jsonify({"ok": False, "reason": "bad_request"}), 400

    # Validate user
    user = users.get(username)
    if not user:
        logger.info(f"LOGIN user=UNKNOWN ip={ip} result=FAIL")
        return jsonify({"ok": False}), 401

    # One clock reading for the lock and backoff checks, so the wait that is
    # logged and returned always agrees with the decision that was taken.
    now = _now()

    # Enforce lock
    if now < user["locked_until"]:
        logger.warning(f"LOGIN user={username} ip={ip} result=LOCKED")
        return jsonify({"ok": False, "reason": "locked"}), 423

    # Enforce backoff
    k = _key(ip, username)
    wait = next_allowed[k] - now
    if wait > 0:
        logger.warning(f"LOGIN user={username} ip={ip} result=BACKOFF wait={wait}s streak={fail_streak[k]}")
        resp = make_response(jsonify({"ok": False, "reason": "backoff"}), 429)
        resp.headers["Retry-After"] = str(_retry_after(wait))
        return resp

    # Enforce per-IP throttling
    dq_ip = attempts_ip[ip]
    if _count_recent(dq_ip, IP_WINDOW_SEC) >= IP_MAX_PER_WINDOW:
        logger.warning(f"LOGIN user={username} ip={ip} result=IP_THROTTLED")
        resp = make_response(jsonify({"ok": False, "reason": "ip_throttled"}), 429)
        resp.headers["Retry-After"] = str(_retry_after(10))
        return resp

    # Enforce per-user throttling
    dq_user = attempts_user[k]
    if _count_recent(dq_user, USER_WINDOW_SEC) >= USER_MAX_PER_WINDOW:
        logger.warning(f"LOGIN user={username} ip={ip} result=USER_THROTTLED")
        resp = make_response(jsonify({"ok": False, "reason": "user_throttled"}), 429)
        resp.headers["Retry-After"] = str(_retry_after(10))
        return resp

    # Check password; success clears the backoff state for this key
    if check_password_hash(user["pwd_hash"], password):
        session["user"] = username
        fail_streak[k] = 0
        next_allowed[k] = 0
        logger.info(f"LOGIN user={username} ip={ip} result=OK")
        return jsonify({"ok": True}), 200

    # Register failure in both windows and extend the streak
    t = _now()
    dq_user.append(t)
    dq_ip.append(t)
    fail_streak[k] += 1

    # Apply exponential backoff: base * 2**(streak - 1), capped
    backoff = min(BACKOFF_CAP_SEC, BACKOFF_BASE_SEC * (2 ** max(0, fail_streak[k] - 1)))
    next_allowed[k] = t + int(backoff)

    # Apply lock window
    recent_fails = _count_recent(dq_user, LOCK_WINDOW_SEC)
    if recent_fails >= LOCK_FAILS:
        user["locked_until"] = t + LOCK_SEC
        logger.warning(f"LOGIN user={username} ip={ip} result=LOCK_SET fails={recent_fails}")
        return jsonify({"ok": False, "reason": "locked"}), 423

    logger.info(f"LOGIN user={username} ip={ip} result=FAIL streak={fail_streak[k]} backoff={int(backoff)}s")
    return make_response(jsonify({"ok": False}), 401)

@app.get("/me")
def me():
    """Return the username stored in the session, or 401 when there is none."""
    current_user = session.get("user")
    if current_user:
        return jsonify({"ok": True, "user": current_user}), 200
    return jsonify({"ok": False}), 401

def run_service():
    """Serve ``app`` on 127.0.0.1:8001 with threading on and the reloader off."""
    server_options = dict(
        host="127.0.0.1",
        port=8001,
        debug=False,
        use_reloader=False,
        threaded=True,
    )
    app.run(**server_options)

# Serve from a daemon thread so this cell returns while the app keeps running
t = threading.Thread(daemon=True, target=run_service)
t.start()

# Pause for a second before announcing the endpoint
time.sleep(1)
print("Service ready on http://127.0.0.1:8001")
print(f"Log file: {LOG_FILE}")


 * Serving Flask app '__main__'
 * Debug mode: off


 * Running on http://127.0.0.1:8001
INFO:werkzeug:[33mPress CTRL+C to quit[0m


Service ready on http://127.0.0.1:8001
Log file: /content/offsec_workspace/logs/security_events.log


In [None]:
# Send repeated failed logins and observe throttling/backoff
import requests, time, os

url = "http://127.0.0.1:8001/login"
payload = {"username": "alice", "password": "wrong-password"}

# Fire 12 wrong-password attempts 0.2 s apart and show how the service answers.
for i in range(12):
    r = requests.post(url, json=payload, timeout=5)
    ra = r.headers.get("Retry-After")
    print(i + 1, "status=", r.status_code, "retry_after=", ra, "body=", r.json())
    time.sleep(0.2)

# Show the tail of the security log.
log_path = "/content/offsec_workspace/logs/security_events.log"
if os.path.exists(log_path):
    with open(log_path, "r") as f:
        # readlines() keeps each line's trailing "\n", so concatenate the lines
        # as-is; joining with "\n" printed a blank line after every entry.
        print("".join(f.readlines()[-15:]), end="")


INFO:security_v2:LOGIN user=alice ip=127.0.0.1 result=FAIL streak=1 backoff=1s
INFO:werkzeug:127.0.0.1 - - [15/Dec/2025 11:22:21] "[31m[1mPOST /login HTTP/1.1[0m" 401 -


1 status= 401 retry_after= None body= {'ok': False}


INFO:werkzeug:127.0.0.1 - - [15/Dec/2025 11:22:21] "[31m[1mPOST /login HTTP/1.1[0m" 429 -


2 status= 429 retry_after= 1 body= {'ok': False, 'reason': 'backoff'}


INFO:werkzeug:127.0.0.1 - - [15/Dec/2025 11:22:21] "[31m[1mPOST /login HTTP/1.1[0m" 429 -


3 status= 429 retry_after= 1 body= {'ok': False, 'reason': 'backoff'}


INFO:werkzeug:127.0.0.1 - - [15/Dec/2025 11:22:21] "[31m[1mPOST /login HTTP/1.1[0m" 429 -


4 status= 429 retry_after= 1 body= {'ok': False, 'reason': 'backoff'}


INFO:security_v2:LOGIN user=alice ip=127.0.0.1 result=FAIL streak=2 backoff=2s
INFO:werkzeug:127.0.0.1 - - [15/Dec/2025 11:22:22] "[31m[1mPOST /login HTTP/1.1[0m" 401 -


5 status= 401 retry_after= None body= {'ok': False}


INFO:werkzeug:127.0.0.1 - - [15/Dec/2025 11:22:22] "[31m[1mPOST /login HTTP/1.1[0m" 429 -


6 status= 429 retry_after= 2 body= {'ok': False, 'reason': 'backoff'}


INFO:werkzeug:127.0.0.1 - - [15/Dec/2025 11:22:22] "[31m[1mPOST /login HTTP/1.1[0m" 429 -


7 status= 429 retry_after= 2 body= {'ok': False, 'reason': 'backoff'}


INFO:werkzeug:127.0.0.1 - - [15/Dec/2025 11:22:22] "[31m[1mPOST /login HTTP/1.1[0m" 429 -


8 status= 429 retry_after= 2 body= {'ok': False, 'reason': 'backoff'}


INFO:werkzeug:127.0.0.1 - - [15/Dec/2025 11:22:23] "[31m[1mPOST /login HTTP/1.1[0m" 429 -


9 status= 429 retry_after= 1 body= {'ok': False, 'reason': 'backoff'}


INFO:werkzeug:127.0.0.1 - - [15/Dec/2025 11:22:23] "[31m[1mPOST /login HTTP/1.1[0m" 429 -


10 status= 429 retry_after= 1 body= {'ok': False, 'reason': 'backoff'}


INFO:werkzeug:127.0.0.1 - - [15/Dec/2025 11:22:23] "[31m[1mPOST /login HTTP/1.1[0m" 429 -


11 status= 429 retry_after= 1 body= {'ok': False, 'reason': 'backoff'}


INFO:werkzeug:127.0.0.1 - - [15/Dec/2025 11:22:23] "[31m[1mPOST /login HTTP/1.1[0m" 429 -


12 status= 429 retry_after= 1 body= {'ok': False, 'reason': 'backoff'}
2025-12-15 11:22:21,278 INFO LOGIN user=alice ip=127.0.0.1 result=FAIL streak=1 backoff=1s




2025-12-15 11:22:22,267 INFO LOGIN user=alice ip=127.0.0.1 result=FAIL streak=2 backoff=2s










In [None]:
# Technique 2: Harden session cookies and rotate session on sign-in
from flask import Flask, request, jsonify, session
import threading, time, secrets

# Separate Flask app used for the session-handling demo
session_app = Flask(__name__)
session_app.secret_key = secrets.token_hex(32)

# Cookie policy: hidden from scripts, never sent cross-site.
# Secure stays off because the demo is served over plain HTTP on localhost.
session_app.config["SESSION_COOKIE_HTTPONLY"] = True
session_app.config["SESSION_COOKIE_SAMESITE"] = "Strict"
session_app.config["SESSION_COOKIE_SECURE"] = False

@session_app.post("/login")
def login_hardened():
    """Check credentials and start a fresh session on success.

    Returns 401 ``{"ok": False}`` for an unknown user or wrong password,
    otherwise 200 ``{"ok": True}`` after replacing the session contents with
    ``user``, a new random ``sid`` and ``issued_at``.
    """
    body = request.json or {}
    username = body.get("username", "")
    password = body.get("password", "")

    # Short-circuits: the hash is only checked when the account exists
    account = users.get(username)
    credentials_ok = bool(account) and check_password_hash(account["pwd_hash"], password)
    if not credentials_ok:
        return jsonify({"ok": False}), 401

    # Drop whatever the old session held before writing the new identity
    session.clear()
    session["user"] = username
    session["sid"] = secrets.token_urlsafe(24)
    session["issued_at"] = int(time.time())

    return jsonify({"ok": True}), 200

@session_app.get("/me")
def me_hardened():
    """Return the session's user and the first 8 characters of its sid, or 401."""
    current_user = session.get("user")
    session_id = session.get("sid")
    if current_user and session_id:
        return jsonify({"ok": True, "user": current_user, "sid_prefix": session_id[:8]}), 200
    return jsonify({"ok": False}), 401

def run_session_service():
    """Serve ``session_app`` on 127.0.0.1:8002 with threading on and the reloader off."""
    server_options = dict(
        host="127.0.0.1",
        port=8002,
        debug=False,
        use_reloader=False,
        threaded=True,
    )
    session_app.run(**server_options)

# Serve from a daemon thread so this cell returns while the app keeps running
t2 = threading.Thread(daemon=True, target=run_session_service)
t2.start()

# Pause for a second before announcing the endpoint
time.sleep(1)
print("Service ready on http://127.0.0.1:8002")


 * Serving Flask app '__main__'
 * Debug mode: off


 * Running on http://127.0.0.1:8002
INFO:werkzeug:[33mPress CTRL+C to quit[0m


Service ready on http://127.0.0.1:8002


In [None]:
# Verify cookie flags and session rotation
import requests

# One client session so the cookie from /login is replayed on /me
s = requests.Session()
_login_url = "http://127.0.0.1:8002/login"
_me_url = "http://127.0.0.1:8002/me"
_alice = {"username": "alice", "password": "Alice#Pass123"}

# First sign-in: show the status and the raw Set-Cookie header
r1 = s.post(_login_url, json=_alice, timeout=5)
print("login status:", r1.status_code)
print("set-cookie:", r1.headers.get("Set-Cookie"))

r2 = s.get(_me_url, timeout=5)
print("me status:", r2.status_code, "body:", r2.json())

# Second sign-in: the sid prefix reported by /me should differ from the first
r3 = s.post(_login_url, json=_alice, timeout=5)
r4 = s.get(_me_url, timeout=5)
print("me after re-login:", r4.status_code, "body:", r4.json())


INFO:werkzeug:127.0.0.1 - - [15/Dec/2025 11:22:25] "POST /login HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [15/Dec/2025 11:22:25] "GET /me HTTP/1.1" 200 -


login status: 200
set-cookie: session=eyJpc3N1ZWRfYXQiOjE3NjU3OTc3NDUsInNpZCI6ImJrc3ZoUmhpV1g4R0FpMk81LXZKbzc0X3JIOTVTMEIyIiwidXNlciI6ImFsaWNlIn0.aT_vcQ.lDP4c4oDUA9RpbeoCqDEpSWaWC4; HttpOnly; Path=/; SameSite=Strict
me status: 200 body: {'ok': True, 'sid_prefix': 'bksvhRhi', 'user': 'alice'}


INFO:werkzeug:127.0.0.1 - - [15/Dec/2025 11:22:25] "POST /login HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [15/Dec/2025 11:22:25] "GET /me HTTP/1.1" 200 -


me after re-login: 200 body: {'ok': True, 'sid_prefix': '6zwvkyZn', 'user': 'alice'}


In [None]:
# Technique 3: Start access service with insecure vs enforced authorization
# Import unconditionally: re-importing is a no-op, whereas the previous
# `if "Flask" not in globals()` guard skipped request/jsonify/session whenever
# only Flask happened to be defined already.
from flask import Flask, request, jsonify, session
import secrets
import socket
import threading
import time

# Ask the OS for an unused port by binding to port 0, then release it
_sock = socket.socket()
_sock.bind(("127.0.0.1", 0))
ACCESS_PORT = _sock.getsockname()[1]
_sock.close()

# Flask app for the authorization demo
access_app_v2 = Flask(__name__)
access_app_v2.secret_key = secrets.token_hex(32)

# Cookie policy: hidden from scripts, never sent cross-site.
# Secure stays off because the demo is served over plain HTTP on localhost.
access_app_v2.config["SESSION_COOKIE_HTTPONLY"] = True
access_app_v2.config["SESSION_COOKIE_SAMESITE"] = "Strict"
access_app_v2.config["SESSION_COOKIE_SECURE"] = False

# Sample orders keyed by id: two owned by alice, one by bob
orders_v2 = {
    order_id: {"id": order_id, "owner": owner, "item": item, "amount_usd": amount}
    for order_id, owner, item, amount in (
        ("1001", "alice", "Router", 120),
        ("1002", "alice", "Switch", 340),
        ("2001", "bob", "Firewall", 900),
    )
}

@access_app_v2.post("/login")
def login_access_v2():
    """Check credentials; on success reset the session to ``user`` plus a new ``sid``.

    Returns 401 ``{"ok": False}`` on failure and 200 ``{"ok": True}`` on success.
    """
    body = request.json or {}
    username = body.get("username", "")
    password = body.get("password", "")

    # Short-circuits: the hash is only checked when the account exists
    account = users.get(username)
    credentials_ok = bool(account) and check_password_hash(account["pwd_hash"], password)
    if not credentials_ok:
        return jsonify({"ok": False}), 401

    # Start from an empty session before storing the new identity
    session.clear()
    session["user"] = username
    session["sid"] = secrets.token_urlsafe(24)
    return jsonify({"ok": True}), 200

@access_app_v2.get("/orders/<order_id>")
def get_order_insecure_v2(order_id):
    """Return any existing order to any logged-in user.

    Deliberately insecure for the demo: the order's owner is never compared
    with the session user. Returns 401 without a session and 404 for an
    unknown id.
    """
    if not session.get("user"):
        return jsonify({"ok": False}), 401

    record = orders_v2.get(order_id)
    if record:
        return jsonify({"ok": True, "order": record}), 200
    return jsonify({"ok": False}), 404

@access_app_v2.get("/orders_secure/<order_id>")
def get_order_secure_v2(order_id):
    """Return an order only to the user who owns it.

    Returns 401 without a session, 404 for an unknown id and 403 when the
    order belongs to someone else.
    """
    viewer = session.get("user")
    if not viewer:
        return jsonify({"ok": False}), 401

    record = orders_v2.get(order_id)
    if not record:
        return jsonify({"ok": False}), 404

    # Ownership check: this is what the insecure route leaves out
    if record["owner"] == viewer:
        return jsonify({"ok": True, "order": record}), 200
    return jsonify({"ok": False}), 403

def run_access_service_v2():
    """Serve ``access_app_v2`` on 127.0.0.1:ACCESS_PORT with threading on and the reloader off."""
    server_options = dict(
        host="127.0.0.1",
        port=ACCESS_PORT,
        debug=False,
        use_reloader=False,
        threaded=True,
    )
    access_app_v2.run(**server_options)

# Serve from a daemon thread so this cell returns while the app keeps running
t_access = threading.Thread(daemon=True, target=run_access_service_v2)
t_access.start()

# Pause for a second before announcing the endpoint
time.sleep(1)
print(f"Service ready on http://127.0.0.1:{ACCESS_PORT}")


 * Serving Flask app '__main__'
 * Debug mode: off


 * Running on http://127.0.0.1:42983
INFO:werkzeug:[33mPress CTRL+C to quit[0m


Service ready on http://127.0.0.1:42983


In [None]:
# Technique 3: Validate insecure access vs enforced authorization
import requests

base = f"http://127.0.0.1:{ACCESS_PORT}"
s = requests.Session()


def _report(label, response):
    """Print a label followed by the response's status code and JSON body."""
    print(label, response.status_code, response.json())


# Sign in as alice
r_login = s.post(f"{base}/login", json={"username": "alice", "password": "Alice#Pass123"}, timeout=5)
_report("login:", r_login)

# Alice reads her own order through the unchecked route
r_ok = s.get(f"{base}/orders/1001", timeout=5)
_report("insecure own:", r_ok)

# Alice reads bob's order through the unchecked route
r_idor = s.get(f"{base}/orders/2001", timeout=5)
_report("insecure other:", r_idor)

# Same order through the ownership-checked route
r_block = s.get(f"{base}/orders_secure/2001", timeout=5)
_report("secure other:", r_block)


INFO:werkzeug:127.0.0.1 - - [15/Dec/2025 11:22:26] "POST /login HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [15/Dec/2025 11:22:26] "GET /orders/1001 HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [15/Dec/2025 11:22:26] "GET /orders/2001 HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [15/Dec/2025 11:22:26] "[31m[1mGET /orders_secure/2001 HTTP/1.1[0m" 403 -


login: 200 {'ok': True}
insecure own: 200 {'ok': True, 'order': {'amount_usd': 120, 'id': '1001', 'item': 'Router', 'owner': 'alice'}}
insecure other: 200 {'ok': True, 'order': {'amount_usd': 900, 'id': '2001', 'item': 'Firewall', 'owner': 'bob'}}
secure other: 403 {'ok': False}


In [None]:
# Technique 4: Build a SQL-backed endpoint and show why parameterized queries matter
import os, sqlite3

DB_PATH = "/content/offsec_workspace/data/app.db"
os.makedirs(os.path.dirname(DB_PATH), exist_ok=True)

# Rebuild the orders table from scratch so the cell can be re-run.
conn = sqlite3.connect(DB_PATH)
try:
    cur = conn.cursor()

    cur.execute("DROP TABLE IF EXISTS orders")
    cur.execute("""
CREATE TABLE orders (
  id TEXT PRIMARY KEY,
  owner TEXT NOT NULL,
  item TEXT NOT NULL,
  amount_usd INTEGER NOT NULL
)
""")

    # Seed rows are inserted with bound parameters.
    cur.executemany(
        "INSERT INTO orders (id, owner, item, amount_usd) VALUES (?, ?, ?, ?)",
        [
            ("1001", "alice", "Router", 120),
            ("1002", "alice", "Switch", 340),
            ("2001", "bob",   "Firewall", 900),
        ],
    )

    conn.commit()
finally:
    # Close even when a statement above raises, so a failed run does not
    # leave an open connection to the database file behind in the kernel.
    conn.close()

print("DB ready:", DB_PATH)


DB ready: /content/offsec_workspace/data/app.db


In [None]:
# Technique 4: Expose an unsafe query and then enforce parameterized SQL
import socket, threading, time, secrets
import sqlite3
from flask import Flask, request, jsonify, session

# Ask the OS for an unused port by binding to port 0, then release it
_sock = socket.socket()
_sock.bind(("127.0.0.1", 0))
SQL_PORT = _sock.getsockname()[1]
_sock.close()

# Flask app for the SQL demo, with a fresh key for signing session cookies
sql_app = Flask(__name__)
sql_app.secret_key = secrets.token_hex(32)

# Cookie policy: hidden from scripts, never sent cross-site.
# Secure stays off because the demo is served over plain HTTP on localhost.
sql_app.config["SESSION_COOKIE_HTTPONLY"] = True
sql_app.config["SESSION_COOKIE_SAMESITE"] = "Strict"
sql_app.config["SESSION_COOKIE_SECURE"] = False

def db_conn():
    """Open and return a new SQLite connection to ``DB_PATH``."""
    connection = sqlite3.connect(DB_PATH)
    return connection

@sql_app.post("/login")
def sql_login():
    """Check credentials; on success reset the session to ``user`` plus a new ``sid``.

    Returns 401 ``{"ok": False}`` on failure and 200 ``{"ok": True}`` on success.
    """
    body = request.json or {}
    username = body.get("username", "")
    password = body.get("password", "")

    # Short-circuits: the hash is only checked when the account exists
    account = users.get(username)
    credentials_ok = bool(account) and check_password_hash(account["pwd_hash"], password)
    if not credentials_ok:
        return jsonify({"ok": False}), 401

    session.clear()
    session["user"] = username
    session["sid"] = secrets.token_urlsafe(24)
    return jsonify({"ok": True}), 200

@sql_app.get("/orders_search_insecure")
def orders_search_insecure():
    """Search orders with a query string spliced directly into the SQL text.

    Deliberately vulnerable for the demo: ``q`` is interpolated into the
    statement, so a crafted value can change the WHERE clause. Returns 401
    without a session, otherwise 200 with ``count`` and ``orders``.
    """
    user = session.get("user")
    if not user:
        return jsonify({"ok": False}), 401

    q = request.args.get("q", "")
    # NOTE: intentionally string-built SQL (injection demo); do not copy.
    sql = f"SELECT id, owner, item, amount_usd FROM orders WHERE owner = '{user}' AND item LIKE '%{q}%'"

    con = db_conn()
    try:
        cursor = con.execute(sql)
        rows = cursor.fetchall()
    finally:
        con.close()

    # Each row carries the four selected columns in this order
    columns = ("id", "owner", "item", "amount_usd")
    data = [dict(zip(columns, row)) for row in rows]
    return jsonify({"ok": True, "count": len(data), "orders": data}), 200

@sql_app.get("/orders_search_secure")
def orders_search_secure():
    """Search the session user's orders using bound SQL parameters.

    The owner and the ``%q%`` pattern are passed as parameters, so the query
    text itself never contains user input. Returns 401 without a session,
    otherwise 200 with ``count`` and ``orders``.
    """
    user = session.get("user")
    if not user:
        return jsonify({"ok": False}), 401

    pattern = "%" + request.args.get("q", "") + "%"
    statement = "SELECT id, owner, item, amount_usd FROM orders WHERE owner = ? AND item LIKE ?"

    con = db_conn()
    try:
        cursor = con.execute(statement, (user, pattern))
        rows = cursor.fetchall()
    finally:
        con.close()

    # Each row carries the four selected columns in this order
    columns = ("id", "owner", "item", "amount_usd")
    data = [dict(zip(columns, row)) for row in rows]
    return jsonify({"ok": True, "count": len(data), "orders": data}), 200

def run_sql_service():
    """Serve ``sql_app`` on 127.0.0.1:SQL_PORT with threading on and the reloader off."""
    server_options = dict(
        host="127.0.0.1",
        port=SQL_PORT,
        debug=False,
        use_reloader=False,
        threaded=True,
    )
    sql_app.run(**server_options)

# Serve from a daemon thread so this cell returns while the app keeps running
t4 = threading.Thread(daemon=True, target=run_sql_service)
t4.start()

# Pause for a second before announcing the endpoint
time.sleep(1)
print(f"Service ready on http://127.0.0.1:{SQL_PORT}")


 * Serving Flask app '__main__'
 * Debug mode: off


 * Running on http://127.0.0.1:51325
INFO:werkzeug:[33mPress CTRL+C to quit[0m


Service ready on http://127.0.0.1:51325


In [None]:
# Technique 4: Call both endpoints with normal input and with a crafted input string
import requests

base = f"http://127.0.0.1:{SQL_PORT}"
s = requests.Session()


def _search(endpoint, q):
    """GET ``{base}/{endpoint}`` with ``q`` as the search parameter."""
    return s.get(f"{base}/{endpoint}", params={"q": q}, timeout=5)


# Sign in as alice
print("login:", s.post(f"{base}/login", json={"username": "alice", "password": "Alice#Pass123"}, timeout=5).status_code)

# Ordinary search term: both endpoints should report the same count
q_normal = "Router"
r1 = _search("orders_search_insecure", q_normal)
r2 = _search("orders_search_secure", q_normal)
print("normal insecure:", r1.status_code, r1.json()["count"])
print("normal secure:  ", r2.status_code, r2.json()["count"])

# Crafted term that closes the quoted pattern and appends OR 1=1
q_crafted = "%' OR 1=1 --"
r3 = _search("orders_search_insecure", q_crafted)
r4 = _search("orders_search_secure", q_crafted)
print("crafted insecure:", r3.status_code, r3.json()["count"])
print("crafted secure:  ", r4.status_code, r4.json()["count"])


INFO:werkzeug:127.0.0.1 - - [15/Dec/2025 11:22:28] "POST /login HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [15/Dec/2025 11:22:28] "GET /orders_search_insecure?q=Router HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [15/Dec/2025 11:22:28] "GET /orders_search_secure?q=Router HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [15/Dec/2025 11:22:28] "GET /orders_search_insecure?q=%25'+OR+1%3D1+-- HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [15/Dec/2025 11:22:28] "GET /orders_search_secure?q=%25'+OR+1%3D1+-- HTTP/1.1" 200 -


login: 200
normal insecure: 200 1
normal secure:   200 1
crafted insecure: 200 3
crafted secure:   200 0


In [None]:
# Technique 5: Render user input safely to prevent XSS
import socket, threading, time, secrets
from flask import Flask, request, jsonify, session, Response
from markupsafe import escape

# This cell reuses the account table and hash checker from earlier cells
if not {"users", "check_password_hash"}.issubset(globals()):
    raise RuntimeError("Define users and password hashing first.")

# Ask the OS for an unused port by binding to port 0, then release it
_sock = socket.socket()
_sock.bind(("127.0.0.1", 0))
XSS_PORT = _sock.getsockname()[1]
_sock.close()

# Flask app for the XSS demo, with a fresh key for signing session cookies
xss_app = Flask(__name__)
xss_app.secret_key = secrets.token_hex(32)

# Cookie policy: hidden from scripts, never sent cross-site.
# Secure stays off because the demo is served over plain HTTP on localhost.
xss_app.config["SESSION_COOKIE_HTTPONLY"] = True
xss_app.config["SESSION_COOKIE_SAMESITE"] = "Strict"
xss_app.config["SESSION_COOKIE_SECURE"] = False

@xss_app.post("/login")
def xss_login():
    """Check credentials; on success reset the session to ``user`` plus a new ``sid``.

    Returns 401 ``{"ok": False}`` on failure and 200 ``{"ok": True}`` on success.
    """
    body = request.json or {}
    username = body.get("username", "")
    password = body.get("password", "")

    # Short-circuits: the hash is only checked when the account exists
    account = users.get(username)
    credentials_ok = bool(account) and check_password_hash(account["pwd_hash"], password)
    if not credentials_ok:
        return jsonify({"ok": False}), 401

    # Start from an empty session before storing the new identity
    session.clear()
    session["user"] = username
    session["sid"] = secrets.token_urlsafe(24)
    return jsonify({"ok": True}), 200

@xss_app.get("/feedback_insecure")
def feedback_insecure():
    """Echo the ``msg`` query parameter into an HTML page without escaping.

    Deliberately vulnerable for the demo: markup in ``msg`` reaches the page
    unchanged. Returns 401 without a session.
    """
    if session.get("user"):
        # NOTE: intentionally unescaped (reflected XSS demo); do not copy.
        msg = request.args.get("msg", "")
        html = f"<html><body><h3>Feedback</h3><div>{msg}</div></body></html>"
        return Response(html, mimetype="text/html")
    return jsonify({"ok": False}), 401

@xss_app.get("/feedback_secure")
def feedback_secure():
    """Echo the ``msg`` query parameter into an HTML page after escaping it.

    Returns 401 without a session.
    """
    if session.get("user"):
        # markupsafe.escape turns HTML metacharacters into entities
        safe = escape(request.args.get("msg", ""))
        html = f"<html><body><h3>Feedback</h3><div>{safe}</div></body></html>"
        return Response(html, mimetype="text/html")
    return jsonify({"ok": False}), 401

def run_xss_service():
    """Serve ``xss_app`` on 127.0.0.1:XSS_PORT with threading on and the reloader off."""
    server_options = dict(
        host="127.0.0.1",
        port=XSS_PORT,
        debug=False,
        use_reloader=False,
        threaded=True,
    )
    xss_app.run(**server_options)

# Serve from a daemon thread so this cell returns while the app keeps running
t5 = threading.Thread(daemon=True, target=run_xss_service)
t5.start()

# Pause for a second before announcing the endpoint
time.sleep(1)
print(f"Service ready on http://127.0.0.1:{XSS_PORT}")


 * Serving Flask app '__main__'
 * Debug mode: off


 * Running on http://127.0.0.1:43059
INFO:werkzeug:[33mPress CTRL+C to quit[0m


Service ready on http://127.0.0.1:43059


In [None]:
# Technique 5: Validate unsafe rendering vs escaped output
import requests

base = f"http://127.0.0.1:{XSS_PORT}"
s = requests.Session()

# Sign in as alice
print("login:", s.post(f"{base}/login", json={"username": "alice", "password": "Alice#Pass123"}, timeout=5).status_code)

# The same script tag is sent to both endpoints
payload = "<script>alert(1)</script>"
_query = {"msg": payload}

r1 = s.get(f"{base}/feedback_insecure", params=_query, timeout=5)
r2 = s.get(f"{base}/feedback_secure", params=_query, timeout=5)

# Does the literal tag survive in each response body?
print("insecure status:", r1.status_code, "contains <script>:", "<script>" in r1.text)
print("secure status:  ", r2.status_code, "contains <script>:", "<script>" in r2.text)

# Show the start of each page for a side-by-side look
print("\nINSECURE HTML (first 180 chars):")
print(r1.text[:180])

print("\nSECURE HTML (first 180 chars):")
print(r2.text[:180])


INFO:werkzeug:127.0.0.1 - - [15/Dec/2025 11:26:57] "POST /login HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [15/Dec/2025 11:26:57] "GET /feedback_insecure?msg=<script>alert(1)</script> HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [15/Dec/2025 11:26:57] "GET /feedback_secure?msg=<script>alert(1)</script> HTTP/1.1" 200 -


login: 200
insecure status: 200 contains <script>: True
secure status:   200 contains <script>: False

INSECURE HTML (first 180 chars):
<html><body><h3>Feedback</h3><div><script>alert(1)</script></div></body></html>

SECURE HTML (first 180 chars):
<html><body><h3>Feedback</h3><div>&lt;script&gt;alert(1)&lt;/script&gt;</div></body></html>


In [None]:
# Technique 6: Block cross-site state changes using CSRF tokens
import socket, threading, time, secrets
from flask import Flask, request, jsonify, session

# Ask the OS for an unused port by binding to port 0, then release it
_sock = socket.socket()
_sock.bind(("127.0.0.1", 0))
CSRF_PORT = _sock.getsockname()[1]
_sock.close()

# Flask app for the CSRF demo, with a fresh key for signing session cookies
csrf_app = Flask(__name__)
csrf_app.secret_key = secrets.token_hex(32)

# Cookie policy: this app uses SameSite=Lax rather than Strict.
# Secure stays off because the demo is served over plain HTTP on localhost.
csrf_app.config["SESSION_COOKIE_HTTPONLY"] = True
csrf_app.config["SESSION_COOKIE_SAMESITE"] = "Lax"
csrf_app.config["SESSION_COOKIE_SECURE"] = False

# Both demo accounts start with the same balance
balances = dict.fromkeys(("alice", "bob"), 1000)

@csrf_app.post("/login")
def csrf_login():
    """Check credentials; on success reset the session to ``user`` plus a new ``csrf`` token.

    Returns 401 ``{"ok": False}`` on failure and 200 ``{"ok": True}`` on success.
    """
    body = request.json or {}
    username = body.get("username", "")
    password = body.get("password", "")

    # Short-circuits: the hash is only checked when the account exists
    account = users.get(username)
    credentials_ok = bool(account) and check_password_hash(account["pwd_hash"], password)
    if not credentials_ok:
        return jsonify({"ok": False}), 401

    # Fresh session carrying the identity and a random per-session token
    session.clear()
    session["user"] = username
    session["csrf"] = secrets.token_urlsafe(24)
    return jsonify({"ok": True}), 200

@csrf_app.get("/csrf")
def get_csrf():
    """Return the session's CSRF token to a logged-in caller, or 401."""
    if session.get("user"):
        return jsonify({"ok": True, "csrf": session["csrf"]}), 200
    return jsonify({"ok": False}), 401

@csrf_app.get("/balance")
def get_balance():
    """Return the session user's balance (0 if the user has none), or 401."""
    account = session.get("user")
    if account:
        return jsonify({"ok": True, "user": account, "balance": balances.get(account, 0)}), 200
    return jsonify({"ok": False}), 401

@csrf_app.post("/transfer_insecure")
def transfer_insecure():
    """Move ``amount`` from the session user to ``to_user`` with no CSRF check.

    Deliberately insecure for the demo: the session cookie alone authorizes
    the transfer. Returns 401 without a session and 400 when the recipient is
    unknown, the amount is not positive, or the sender's balance is too low.
    """
    sender = session.get("user")
    if not sender:
        return jsonify({"ok": False}), 401

    body = request.json or {}
    to_user = body.get("to_user", "")
    amount = int(body.get("amount", 0))

    # Evaluated left to right, so the balance is only read for a valid request
    rejected = to_user not in balances or amount <= 0 or balances[sender] < amount
    if rejected:
        return jsonify({"ok": False}), 400

    balances[sender] -= amount
    balances[to_user] += amount
    return jsonify({"ok": True, "from": sender, "to": to_user, "amount": amount}), 200

@csrf_app.post("/transfer_secure")
def transfer_secure():
    """Move ``amount`` from the session user to ``to_user`` after a CSRF check.

    The ``X-CSRF-Token`` request header must equal the token stored in the
    session at login.

    Responses:
        200 ``{"ok": True, "from", "to", "amount"}`` - transfer applied.
        400 ``{"ok": False}`` - malformed body, unknown recipient, amount not
            a positive integer, or insufficient balance.
        401 ``{"ok": False}`` - no logged-in session.
        403 ``{"ok": False}`` - CSRF token missing or wrong.
    """
    u = session.get("user")
    if not u:
        return jsonify({"ok": False}), 401

    # Compare in constant time so response timing does not reveal how much of
    # a guessed token matched. Both sides are encoded to bytes because
    # compare_digest rejects non-ASCII str input.
    token = request.headers.get("X-CSRF-Token", "")
    expected = session.get("csrf") or ""
    if not token or not expected or not secrets.compare_digest(
        token.encode("utf-8"), expected.encode("utf-8")
    ):
        return jsonify({"ok": False}), 403

    # Malformed input is a client error (400), not a server crash (500).
    body = request.json or {}
    if not isinstance(body, dict):
        return jsonify({"ok": False}), 400
    to_user = body.get("to_user", "")
    try:
        amount = int(body.get("amount", 0))
    except (TypeError, ValueError, OverflowError):
        return jsonify({"ok": False}), 400

    # The isinstance check also keeps unhashable values out of the dict lookup.
    if not isinstance(to_user, str) or to_user not in balances or amount <= 0 or balances[u] < amount:
        return jsonify({"ok": False}), 400

    balances[u] -= amount
    balances[to_user] += amount
    return jsonify({"ok": True, "from": u, "to": to_user, "amount": amount}), 200

def run_csrf_service():
    """Serve ``csrf_app`` on 127.0.0.1:CSRF_PORT with threading on and the reloader off."""
    server_options = dict(
        host="127.0.0.1",
        port=CSRF_PORT,
        debug=False,
        use_reloader=False,
        threaded=True,
    )
    csrf_app.run(**server_options)

# Serve from a daemon thread so this cell returns while the app keeps running
t6 = threading.Thread(daemon=True, target=run_csrf_service)
t6.start()

# Pause for a second before announcing the endpoint
time.sleep(1)
print(f"Service ready on http://127.0.0.1:{CSRF_PORT}")


 * Serving Flask app '__main__'
 * Debug mode: off


 * Running on http://127.0.0.1:45231
INFO:werkzeug:[33mPress CTRL+C to quit[0m


Service ready on http://127.0.0.1:45231


In [None]:
# Technique 6: Compare a transfer sent without a CSRF token on the unprotected
# endpoint with the same request on the token-protected endpoint.
# Each step prints the HTTP status and the balance so the effect is visible.
import requests

base = f"http://127.0.0.1:{CSRF_PORT}"
# A Session keeps the login cookie, so every later call is authenticated.
s = requests.Session()

print("login:", s.post(f"{base}/login", json={"username": "alice", "password": "Alice#Pass123"}, timeout=5).status_code)
print("balance:", s.get(f"{base}/balance", timeout=5).json())

# Step 1: the insecure endpoint only needs the session cookie, so the transfer goes through.
r1 = s.post(f"{base}/transfer_insecure", json={"to_user": "bob", "amount": 50}, timeout=5)
print("insecure transfer:", r1.status_code, r1.json())
print("balance:", s.get(f"{base}/balance", timeout=5).json())

# Step 2: the same request against the secure endpoint has no X-CSRF-Token header,
# so it is expected to be refused with 403 and the balance should not change.
r2 = s.post(f"{base}/transfer_secure", json={"to_user": "bob", "amount": 50}, timeout=5)
print("secure transfer no token:", r2.status_code, r2.json())
print("balance:", s.get(f"{base}/balance", timeout=5).json())

# Step 3: read the token from /csrf (endpoint defined earlier in the notebook;
# presumably it returns the token stored in this session) and send it in the header.
csrf_token = s.get(f"{base}/csrf", timeout=5).json()["csrf"]
r3 = s.post(f"{base}/transfer_secure", json={"to_user": "bob", "amount": 50}, headers={"X-CSRF-Token": csrf_token}, timeout=5)
print("secure transfer with token:", r3.status_code, r3.json())
print("balance:", s.get(f"{base}/balance", timeout=5).json())


INFO:werkzeug:127.0.0.1 - - [15/Dec/2025 11:45:01] "POST /login HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [15/Dec/2025 11:45:01] "GET /balance HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [15/Dec/2025 11:45:01] "POST /transfer_insecure HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [15/Dec/2025 11:45:01] "GET /balance HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [15/Dec/2025 11:45:01] "[31m[1mPOST /transfer_secure HTTP/1.1[0m" 403 -
INFO:werkzeug:127.0.0.1 - - [15/Dec/2025 11:45:01] "GET /balance HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [15/Dec/2025 11:45:01] "GET /csrf HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [15/Dec/2025 11:45:01] "POST /transfer_secure HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [15/Dec/2025 11:45:01] "GET /balance HTTP/1.1" 200 -


login: 200
balance: {'balance': 1000, 'ok': True, 'user': 'alice'}
insecure transfer: 200 {'amount': 50, 'from': 'alice', 'ok': True, 'to': 'bob'}
balance: {'balance': 950, 'ok': True, 'user': 'alice'}
secure transfer no token: 403 {'ok': False}
balance: {'balance': 950, 'ok': True, 'user': 'alice'}
secure transfer with token: 200 {'amount': 50, 'from': 'alice', 'ok': True, 'to': 'bob'}
balance: {'balance': 900, 'ok': True, 'user': 'alice'}


In [None]:
# Technique 7: Prevent path traversal by validating and constraining file paths
import os, socket, threading, time, secrets
from pathlib import Path
from flask import Flask, request, jsonify, session, Response

# This cell reuses the `users` table and `check_password_hash` defined by the
# earlier auth cells; fail fast with a clear message if they have not been run.
if "users" not in globals() or "check_password_hash" not in globals():
    raise RuntimeError("Run the auth setup cells first.")

# Two sibling directories under data/: one meant to be downloadable, one not.
# Because they are siblings, "../secrets/secret.txt" relative to PUBLIC_DIR
# points at the sensitive file.
PUBLIC_DIR = "/content/offsec_workspace/data/public"
SECRETS_DIR = "/content/offsec_workspace/data/secrets"
os.makedirs(PUBLIC_DIR, exist_ok=True)
os.makedirs(SECRETS_DIR, exist_ok=True)

# Fixture files; mode "w" truncates, so re-running the cell rewrites them.
with open(os.path.join(PUBLIC_DIR, "readme.txt"), "w") as f:
    f.write("Public file: readme.txt\n")
with open(os.path.join(SECRETS_DIR, "secret.txt"), "w") as f:
    f.write("Sensitive file: secret.txt\n")

# Binding to port 0 asks the OS for any free port; the number is recorded and
# the probe socket is closed so Flask can bind it later.
# NOTE(review): the port is unreserved between close() and Flask's bind, so
# another process could in principle take it in that window.
_sock = socket.socket()
_sock.bind(("127.0.0.1", 0))
TRAV_PORT = _sock.getsockname()[1]
_sock.close()

trav_app = Flask(__name__)
# Fresh random signing key per run, so session cookies from a previous run of
# this cell are no longer valid.
trav_app.secret_key = secrets.token_hex(32)

# Session cookie policy. SECURE is left off; presumably because the service is
# reached over plain http://127.0.0.1 in this notebook.
trav_app.config.update(
    SESSION_COOKIE_HTTPONLY=True,
    SESSION_COOKIE_SAMESITE="Strict",
    SESSION_COOKIE_SECURE=False,
)

@trav_app.post("/login")
def trav_login():
    """Authenticate against the shared ``users`` table and open a session.

    Reads ``username`` and ``password`` from the JSON body. On a match the
    existing session is cleared and ``session["user"]`` is set, answering 200;
    an unknown user or a wrong password answers 401.
    """
    body = request.json or {}
    username = body.get("username", "")
    password = body.get("password", "")

    account = users.get(username)
    credentials_ok = bool(account) and check_password_hash(account["pwd_hash"], password)
    if credentials_ok:
        # Drop whatever the previous session held before marking the user as signed in.
        session.clear()
        session["user"] = username
        return jsonify({"ok": True}), 200
    return jsonify({"ok": False}), 401

@trav_app.get("/download_insecure")
def download_insecure():
    """Return the text of the file named by the ``name`` query parameter.

    Intentionally vulnerable: ``name`` is joined onto ``PUBLIC_DIR`` without
    any validation, so ``..`` segments or an absolute path escape the
    directory. It is kept as the baseline for the traversal demonstration.

    Answers 401 without a signed-in user, 404 when reading fails for any
    reason, otherwise the file contents as ``text/plain``.
    """
    if session.get("user"):
        requested = request.args.get("name", "")
        unchecked_path = os.path.join(PUBLIC_DIR, requested)
        try:
            contents = Path(unchecked_path).read_text()
            return Response(contents, mimetype="text/plain")
        except Exception:
            # Every failure is reported as "not found".
            return jsonify({"ok": False}), 404
    return jsonify({"ok": False}), 401

@trav_app.get("/download_secure")
def download_secure():
    """Serve a text file from ``PUBLIC_DIR`` only, rejecting path traversal.

    The ``name`` query parameter is joined onto ``PUBLIC_DIR`` and fully
    resolved (``..`` segments and symlinks collapsed) before it is checked,
    so the containment test applies to the location that would be read.

    Responses:
        401 -- no ``user`` in the session.
        400 -- ``name`` cannot be resolved to a path at all.
        403 -- the resolved path lies outside ``PUBLIC_DIR``.
        404 -- the resolved path is not a regular file, or it cannot be read
               as text.
        200 -- file contents as ``text/plain``.
    """
    # Constrain path to public directory
    if not session.get("user"):
        return jsonify({"ok": False}), 401
    name = request.args.get("name", "")

    try:
        base = Path(PUBLIC_DIR).resolve()
        target = (base / name).resolve()
    except (OSError, RuntimeError, ValueError):
        # resolve() can raise on hostile input, e.g. ValueError for an
        # embedded NUL byte (?name=%00) or RuntimeError for a symlink loop on
        # Python versions before 3.13. Answer 400 instead of crashing.
        return jsonify({"ok": False}), 400

    # True when target is base itself or anything beneath it.
    if not target.is_relative_to(base):
        return jsonify({"ok": False}), 403
    if not target.is_file():
        return jsonify({"ok": False}), 404

    try:
        data = target.read_text()
    except (OSError, UnicodeError):
        # The file vanished or became unreadable after the is_file() check,
        # or it is not decodable text. Report it as not found, as the
        # insecure handler does for read failures.
        return jsonify({"ok": False}), 404
    return Response(data, mimetype="text/plain")

def run_trav_service():
    """Serve ``trav_app`` on the loopback interface at ``TRAV_PORT``.

    Debug mode and the reloader are switched off and requests are handled on
    separate threads. The call blocks for as long as the server runs.
    """
    server_options = dict(
        host="127.0.0.1",
        port=TRAV_PORT,
        debug=False,
        use_reloader=False,
        threaded=True,
    )
    trav_app.run(**server_options)

# Run the blocking Flask server on a background daemon thread so the cell can
# finish; a daemon thread does not keep the interpreter alive at shutdown.
t7 = threading.Thread(target=run_trav_service, daemon=True)
t7.start()

# Fixed one-second pause before announcing the URL. This only waits; it does
# not verify that the server is actually accepting connections yet.
time.sleep(1)
print(f"Service ready on http://127.0.0.1:{TRAV_PORT}")


 * Serving Flask app '__main__'
 * Debug mode: off


 * Running on http://127.0.0.1:41653
INFO:werkzeug:Press CTRL+C to quit


Service ready on http://127.0.0.1:41653


In [None]:
# Technique 7: Compare the unvalidated download endpoint with the one that
# constrains paths to the public directory.
import requests

base = f"http://127.0.0.1:{TRAV_PORT}"
# A Session keeps the login cookie, so every later call is authenticated.
s = requests.Session()

print("login:", s.post(f"{base}/login", json={"username": "alice", "password": "Alice#Pass123"}, timeout=5).status_code)

# Baseline: a plain file name inside the public directory.
r1 = s.get(f"{base}/download_insecure", params={"name": "readme.txt"}, timeout=5)
print("insecure public:", r1.status_code, r1.text.strip())

# Attack: "../secrets/secret.txt" climbs out of public/ into the sibling secrets/ directory.
r2 = s.get(f"{base}/download_insecure", params={"name": "../secrets/secret.txt"}, timeout=5)
print("insecure traversal:", r2.status_code, r2.text.strip())

# Same attack against the constrained endpoint; the reply is read as JSON
# because a refusal carries a JSON body rather than file text.
r3 = s.get(f"{base}/download_secure", params={"name": "../secrets/secret.txt"}, timeout=5)
print("secure traversal:", r3.status_code, r3.json())

# Legitimate request against the constrained endpoint must still work.
r4 = s.get(f"{base}/download_secure", params={"name": "readme.txt"}, timeout=5)
print("secure public:", r4.status_code, r4.text.strip())


INFO:werkzeug:127.0.0.1 - - [15/Dec/2025 11:54:47] "POST /login HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [15/Dec/2025 11:54:47] "GET /download_insecure?name=readme.txt HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [15/Dec/2025 11:54:47] "GET /download_insecure?name=../secrets/secret.txt HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [15/Dec/2025 11:54:47] "[31m[1mGET /download_secure?name=../secrets/secret.txt HTTP/1.1[0m" 403 -
INFO:werkzeug:127.0.0.1 - - [15/Dec/2025 11:54:47] "GET /download_secure?name=readme.txt HTTP/1.1" 200 -


login: 200
insecure public: 200 Public file: readme.txt
insecure traversal: 200 Sensitive file: secret.txt
secure traversal: 403 {'ok': False}
secure public: 200 Public file: readme.txt


In [None]:
# Technique 8: Restrict file uploads and serve them safely
import os, socket, threading, time, secrets, uuid
from pathlib import Path
from flask import Flask, request, jsonify, session, send_file
from werkzeug.utils import secure_filename

# This cell reuses the `users` table and `check_password_hash` defined by the
# earlier auth cells; fail fast with a clear message if they have not been run.
if "users" not in globals() or "check_password_hash" not in globals():
    raise RuntimeError("Define users and password hashing first.")

# Separate storage directories for the two upload endpoints, so files written
# by the unrestricted endpoint are kept apart from those of the restricted one.
UPLOAD_ROOT = "/content/offsec_workspace/uploads"
INSECURE_DIR = os.path.join(UPLOAD_ROOT, "insecure")
SECURE_DIR = os.path.join(UPLOAD_ROOT, "secure")
os.makedirs(INSECURE_DIR, exist_ok=True)
os.makedirs(SECURE_DIR, exist_ok=True)

# Binding to port 0 asks the OS for any free port; the number is recorded and
# the probe socket is closed so Flask can bind it later.
# NOTE(review): the port is unreserved between close() and Flask's bind, so
# another process could in principle take it in that window.
_sock = socket.socket()
_sock.bind(("127.0.0.1", 0))
UPLOAD_PORT = _sock.getsockname()[1]
_sock.close()

upload_app = Flask(__name__)
# Fresh random signing key per run, so session cookies from a previous run of
# this cell are no longer valid.
upload_app.secret_key = secrets.token_hex(32)

# Enforce request size limit (1 MiB, applied to the whole request body)
upload_app.config["MAX_CONTENT_LENGTH"] = 1 * 1024 * 1024

# Set cookie policy. SECURE is left off; presumably because the service is
# reached over plain http://127.0.0.1 in this notebook.
upload_app.config.update(
    SESSION_COOKIE_HTTPONLY=True,
    SESSION_COOKIE_SAMESITE="Strict",
    SESSION_COOKIE_SECURE=False,
)

# Lower-case extensions (without the dot) that /upload_secure accepts.
ALLOWED_EXT = {"txt", "pdf", "png", "jpg", "jpeg"}

def _ext(name):
    return (Path(name).suffix.lower().lstrip(".")) if name else ""

@upload_app.post("/login")
def upload_login():
    """Authenticate against the shared ``users`` table and open a session.

    Reads ``username`` and ``password`` from the JSON body. On a match the
    existing session is cleared and ``session["user"]`` is set, answering 200;
    an unknown user or a wrong password answers 401.
    """
    body = request.json or {}
    username = body.get("username", "")
    password = body.get("password", "")

    account = users.get(username)
    if account and check_password_hash(account["pwd_hash"], password):
        # Drop whatever the previous session held before marking the user as signed in.
        session.clear()
        session["user"] = username
        return jsonify({"ok": True}), 200
    return jsonify({"ok": False}), 401

@upload_app.post("/upload_insecure")
def upload_insecure():
    """Store the uploaded ``file`` part under the client-supplied file name.

    Intentionally vulnerable baseline: the name sent by the client is used
    as-is to build the destination path (missing parent directories are
    created), and no file-type restriction is applied.

    Answers 401 without a signed-in user, 400 when no named file part is
    present, otherwise 200 with the name the file was saved as.
    """
    if not session.get("user"):
        return jsonify({"ok": False}), 401

    upload = request.files.get("file")
    client_name = upload.filename if upload else None
    if not client_name:
        return jsonify({"ok": False}), 400

    destination = os.path.join(INSECURE_DIR, client_name)
    parent_dir = os.path.dirname(destination)
    os.makedirs(parent_dir, exist_ok=True)
    upload.save(destination)

    return jsonify({"ok": True, "saved_as": client_name}), 200

@upload_app.get("/files_insecure/<path:name>")
def files_insecure(name):
    """Send back a file stored under ``INSECURE_DIR`` without sanitising ``name``.

    Answers 401 without a signed-in user and 404 when the joined path is not
    a regular file; otherwise the file is sent with the
    ``application/octet-stream`` MIME type.
    """
    if session.get("user"):
        candidate = Path(INSECURE_DIR).joinpath(name)
        if candidate.is_file():
            return send_file(str(candidate), mimetype="application/octet-stream")
        return jsonify({"ok": False}), 404
    return jsonify({"ok": False}), 401

@upload_app.post("/upload_secure")
def upload_secure():
    """Accept an upload only if its sanitised name has an allowed extension.

    The client file name goes through ``secure_filename``, its extension is
    checked against ``ALLOWED_EXT``, and the file is stored in ``SECURE_DIR``
    under a name prefixed with a random UUID hex string.

    Answers 401 without a signed-in user, 400 when no named file part is
    present or the extension is not allowed, otherwise 200 with the stored
    name.
    """
    if not session.get("user"):
        return jsonify({"ok": False}), 401

    upload = request.files.get("file")
    sanitized = secure_filename(upload.filename) if upload and upload.filename else None
    # A missing upload and a disallowed extension are both answered with 400.
    if sanitized is None or _ext(sanitized) not in ALLOWED_EXT:
        return jsonify({"ok": False}), 400

    # The random prefix keeps two uploads with the same client name apart.
    stored_name = "_".join((uuid.uuid4().hex, sanitized))
    upload.save(str(Path(SECURE_DIR, stored_name)))

    return jsonify({"ok": True, "stored_name": stored_name}), 200

@upload_app.get("/files_secure/<path:name>")
def files_secure(name):
    """Send a file from ``SECURE_DIR`` as an attachment.

    ``name`` must already be in the form ``secure_filename`` produces; any
    name that sanitising would alter is refused.

    Answers 401 without a signed-in user, 403 when ``name`` differs from its
    sanitised form, 404 when no such regular file exists, otherwise the file
    as an attachment with ``name`` as its download name.
    """
    def _refuse(status):
        # All refusals share the same JSON body and differ only in status.
        return jsonify({"ok": False}), status

    if not session.get("user"):
        return _refuse(401)

    sanitized = secure_filename(name)
    if sanitized != name:
        return _refuse(403)

    stored_file = Path(SECURE_DIR).joinpath(sanitized)
    if stored_file.is_file():
        return send_file(str(stored_file), as_attachment=True, download_name=sanitized)
    return _refuse(404)

def run_upload_service():
    """Serve ``upload_app`` on the loopback interface at ``UPLOAD_PORT``.

    Debug mode and the reloader are switched off and requests are handled on
    separate threads. The call blocks for as long as the server runs.
    """
    server_options = dict(
        host="127.0.0.1",
        port=UPLOAD_PORT,
        debug=False,
        use_reloader=False,
        threaded=True,
    )
    upload_app.run(**server_options)

# Run the blocking Flask server on a background daemon thread so the cell can
# finish; a daemon thread does not keep the interpreter alive at shutdown.
t8 = threading.Thread(target=run_upload_service, daemon=True)
t8.start()

# Fixed one-second pause before announcing the URL. This only waits; it does
# not verify that the server is actually accepting connections yet.
time.sleep(1)
print(f"Service ready on http://127.0.0.1:{UPLOAD_PORT}")


 * Serving Flask app '__main__'
 * Debug mode: off


 * Running on http://127.0.0.1:52985
INFO:werkzeug:Press CTRL+C to quit


Service ready on http://127.0.0.1:52985


In [None]:
# Technique 8: Compare the unrestricted upload endpoint with the one that
# enforces an extension allow-list and serves files as attachments.
import requests

base = f"http://127.0.0.1:{UPLOAD_PORT}"
# A Session keeps the login cookie, so every later call is authenticated.
s = requests.Session()

print("login:", s.post(f"{base}/login", json={"username": "alice", "password": "Alice#Pass123"}, timeout=5).status_code)

# An HTML document with an inline script stands in for active content.
html_payload = b"<h1>Report</h1><script>console.log('x')</script>"
# Step 1: the insecure endpoint stores it under the client-chosen name.
r1 = s.post(
    f"{base}/upload_insecure",
    files={"file": ("report.html", html_payload, "text/html")},
    timeout=10,
)
print("upload insecure:", r1.status_code, r1.json())

# Step 2: the stored HTML can be fetched back unchanged.
r2 = s.get(f"{base}/files_insecure/report.html", timeout=5)
print("fetch insecure:", r2.status_code, "content_type=", r2.headers.get("Content-Type"))
print("body starts:", r2.text[:80])

# Step 3: the same upload to the secure endpoint should be refused, because
# "html" is not in ALLOWED_EXT.
r3 = s.post(
    f"{base}/upload_secure",
    files={"file": ("report.html", html_payload, "text/html")},
    timeout=10,
)
print("upload secure blocked:", r3.status_code, r3.json())

# Step 4: a .txt upload is on the allow-list and should be accepted.
txt_payload = b"notes: ok\n"
r4 = s.post(
    f"{base}/upload_secure",
    files={"file": ("notes.txt", txt_payload, "text/plain")},
    timeout=10,
)
print("upload secure allowed:", r4.status_code, r4.json())

# Step 5: fetch it back under the server-generated name and show the
# Content-Disposition header set for the download.
stored = r4.json().get("stored_name", "")
r5 = s.get(f"{base}/files_secure/{stored}", timeout=5)
print("fetch secure:", r5.status_code, "disposition=", r5.headers.get("Content-Disposition"))
print("body starts:", r5.text[:40])


INFO:werkzeug:127.0.0.1 - - [15/Dec/2025 12:14:11] "POST /login HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [15/Dec/2025 12:14:11] "POST /upload_insecure HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [15/Dec/2025 12:14:11] "GET /files_insecure/report.html HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [15/Dec/2025 12:14:11] "[31m[1mPOST /upload_secure HTTP/1.1[0m" 400 -
INFO:werkzeug:127.0.0.1 - - [15/Dec/2025 12:14:11] "POST /upload_secure HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [15/Dec/2025 12:14:11] "GET /files_secure/7d72d7d8bb784d199e33d225f582f481_notes.txt HTTP/1.1" 200 -


login: 200
upload insecure: 200 {'ok': True, 'saved_as': 'report.html'}
fetch insecure: 200 content_type= application/octet-stream
body starts: <h1>Report</h1><script>console.log('x')</script>
upload secure blocked: 400 {'ok': False}
upload secure allowed: 200 {'ok': True, 'stored_name': '7d72d7d8bb784d199e33d225f582f481_notes.txt'}
fetch secure: 200 disposition= attachment; filename=7d72d7d8bb784d199e33d225f582f481_notes.txt
body starts: notes: ok

