# Lab 10: Secure Authentication API (Flask Version)
**Course:** COME6304 - Special Topics in Software Engineering  
**Student:** Nchinde Tandjong Josue (UBa25EP063)  

## Objective
Build a **real Flask API** that implements secure authentication practices (bcrypt password hashing, JWT tokens, and rate limiting through account lockout) and test it within this notebook.

In [1]:
# CELL 1: Import Flask + Crypto (install first: pip install flask bcrypt pyjwt)
import sqlite3
import bcrypt
import jwt
import datetime
from flask import Flask, request, jsonify

# Configuration
app = Flask(__name__)
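# Demo-only secret: a real deployment should load it from the environment instead of hard-coding it
app.config['SECRET_KEY'] = "super_secret_key_2025"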
DB_NAME = "flask_users.db"

print("✅ Flask & Cryptography Loaded.")

✅ Flask & Cryptography Loaded.


In [2]:
# CELL 2: Database Setup (The Persistence Layer)
def init_db():
    conn = sqlite3.connect(DB_NAME)
    cursor = conn.cursor()
    # Security Columns: password_hash (No plaintext), failed_attempts, lockout_until
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS users (
            username TEXT PRIMARY KEY,
            password_hash BLOB,
            failed_attempts INTEGER DEFAULT 0,
            lockout_until TIMESTAMP
        )
    """)
    conn.commit()
    conn.close()

init_db()
print("✅ Database Initialized.")

✅ Database Initialized.
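The `lockout_until` column is declared `TIMESTAMP`, but because the connection is opened without `detect_types`, SQLite returns the value as plain text, which is why the login route later parses it with `fromisoformat`. The following is a minimal sketch of that round trip, assuming an in-memory database and a hypothetical `demo` user rather than the notebook's `flask_users.db`:

```python
import sqlite3
from datetime import datetime, timedelta

# In-memory database so the sketch does not touch flask_users.db
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (username TEXT PRIMARY KEY, lockout_until TIMESTAMP)")

# Write the lockout time as an explicit ISO-8601 string
lock_until = datetime(2025, 1, 1, 12, 0, 0) + timedelta(minutes=15)
conn.execute("INSERT INTO users VALUES (?, ?)", ("demo", lock_until.isoformat(" ")))

# Without detect_types, the value comes back as a str, not a datetime
stored = conn.execute(
    "SELECT lockout_until FROM users WHERE username=?", ("demo",)
).fetchone()[0]
parsed = datetime.fromisoformat(stored)
conn.close()

print(type(stored).__name__, stored)
print(parsed == lock_until)
```

Writing the string explicitly also avoids relying on the implicit `datetime` adapter in `sqlite3`, which is deprecated since Python 3.12.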


In [3]:
# CELL 3: Define The Flask Routes (The API)

# --- Helper: DB Connection ---
def get_db():
    return sqlite3.connect(DB_NAME)

# --- Route 1: Register ---
@app.route('/register', methods=['POST'])
def register():
    data = request.get_json()
    username = data['username']
    password = data['password']
    
    # 1. Hash the Password (Bcrypt automatically handles Salting)
    salt = bcrypt.gensalt()
    hashed = bcrypt.hashpw(password.encode('utf-8'), salt)
    
    # Open the connection before the try block so the finally clause never sees an unbound name
    conn = get_db()
    try:
        conn.execute("INSERT INTO users (username, password_hash) VALUES (?, ?)", (username, hashed))
        conn.commit()
        return jsonify({"message": "User registered successfully"}), 201
    except sqlite3.IntegrityError:
        return jsonify({"error": "User already exists"}), 400
    finally:
        conn.close()

# --- Route 2: Login (With Defense) ---
@app.route('/login', methods=['POST'])
def login():
    data = request.get_json()
    username = data['username']
    password = data['password']
    
    conn = get_db()
    cursor = conn.cursor()
    user = cursor.execute("SELECT * FROM users WHERE username=?", (username,)).fetchone()
    
    if not user:
        conn.close()  # Release the connection before the early return
        return jsonify({"error": "Invalid credentials"}), 401
    
    # Unpack user data
    stored_hash = user[1]
    attempts = user[2]
    lockout_str = user[3]
    
    # 1. CHECK LOCKOUT
    if lockout_str:
        lockout_time = datetime.datetime.fromisoformat(lockout_str)
        if datetime.datetime.now() < lockout_time:
            conn.close()  # Release the connection before the early return
            return jsonify({"error": f"Account locked until {lockout_time}"}), 403

    # 2. VERIFY PASSWORD
    if bcrypt.checkpw(password.encode('utf-8'), stored_hash):
        # Reset failures on success
        conn.execute("UPDATE users SET failed_attempts=0, lockout_until=NULL WHERE username=?", (username,))
        conn.commit()
        conn.close()
        
        # Generate Token
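        token = jwt.encode({
            'user': username,
            # Timezone-aware UTC expiry; datetime.utcnow() is deprecated since Python 3.12
            'exp': datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(hours=1)
        }, app.config['SECRET_KEY'], algorithm="HS256")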
        
        return jsonify({"token": token})
    else:
        # Increase failures on error
        new_attempts = attempts + 1
        new_lockout = None
        
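        if new_attempts >= 5:
            # Store an explicit ISO-8601 string; sqlite3's implicit datetime adapter is deprecated since Python 3.12
            lock_until = datetime.datetime.now() + datetime.timedelta(minutes=15)
            new_lockout = lock_until.isoformat(" ")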
        
        conn.execute("UPDATE users SET failed_attempts=?, lockout_until=? WHERE username=?", 
                     (new_attempts, new_lockout, username))
        conn.commit()
        conn.close()
        return jsonify({"error": "Invalid credentials"}), 401

print("✅ Flask App Defined (Ready for Testing).")

✅ Flask App Defined (Ready for Testing).
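The register route relies on bcrypt to salt each password, and bcrypt embeds that salt in its output, which is why the table needs only the single `password_hash` column. To illustrate what salting buys, here is a minimal sketch that uses the standard library's PBKDF2 as a stand-in for bcrypt; it is not the scheme the API uses, and `hash_password`, `verify_password`, and `ITERATIONS` are hypothetical names introduced for this example:

```python
import hashlib
import hmac
import os

ITERATIONS = 100_000  # Hypothetical work factor for this sketch

def hash_password(password, salt=None):
    """Return (salt, digest); a fresh random salt is generated when none is given."""
    if salt is None:
        salt = os.urandom(16)
    digest = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, ITERATIONS)
    return salt, digest

def verify_password(password, salt, expected):
    """Re-hash with the stored salt and compare in constant time."""
    _, digest = hash_password(password, salt)
    return hmac.compare_digest(digest, expected)

salt_a, hash_a = hash_password("securePass123")
salt_b, hash_b = hash_password("securePass123")

print(hash_a != hash_b)                                   # Same password, different salts
print(verify_password("securePass123", salt_a, hash_a))  # Correct password
print(verify_password("wrong1", salt_a, hash_a))         # Wrong password
```

Because every user gets a different salt, identical passwords produce different hashes, so a precomputed table of hashes cannot be reused across accounts.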


## Phase 1: Test Registration
We use the **Flask Test Client** to send requests without running a server.

In [4]:
# CELL 4: Register 'admin'
client = app.test_client()

response = client.post('/register', json={
    "username": "admin",
    "password": "securePass123"
})

print("Status Code:", response.status_code)
print("Response:", response.get_json())

Status Code: 201
Response: {'message': 'User registered successfully'}


## Phase 2: Test Valid Login
A login with the correct password should return status 200 and a signed JWT.

In [5]:
# CELL 5: Login
response = client.post('/login', json={
    "username": "admin",
    "password": "securePass123"
})

print("Status Code:", response.status_code)
data = response.get_json()
print("Full Response:", data)

if 'token' in data:
    # Verify token works
    decoded = jwt.decode(data['token'], app.config['SECRET_KEY'], algorithms=["HS256"])
    print("\n🔓 Decoded Token Payload:", decoded)

Status Code: 200
Full Response: {'token': 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VyIjoiYWRtaW4iLCJleHAiOjE3NjY0MzcwMzd9.OmsAbUpRP9m4eND7MzIhPeTTrsj929TlqSgJxHClgG8'}

🔓 Decoded Token Payload: {'user': 'admin', 'exp': 1766437037}
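The token above has three dot-separated segments: header, payload, and signature. The first two are only base64url-encoded JSON, so anyone holding the token can read them without the secret key. A minimal sketch using only the standard library, where `b64url_decode` is a hypothetical helper and the token string is copied from the output above:

```python
import base64
import json

token = (
    "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9."
    "eyJ1c2VyIjoiYWRtaW4iLCJleHAiOjE3NjY0MzcwMzd9."
    "OmsAbUpRP9m4eND7MzIhPeTTrsj929TlqSgJxHClgG8"
)

def b64url_decode(segment):
    # JWT segments drop the "=" padding, so restore it before decoding
    padded = segment + "=" * (-len(segment) % 4)
    return base64.urlsafe_b64decode(padded)

header_b64, payload_b64, signature_b64 = token.split(".")
header = json.loads(b64url_decode(header_b64))
payload = json.loads(b64url_decode(payload_b64))
signature = b64url_decode(signature_b64)

print(header)
print(payload)
print(len(signature), "signature bytes")
```

This only reads the claims; it does not verify the signature. Trusting a token still requires `jwt.decode` with the secret key, as in Cell 5, and the payload should never carry sensitive data because it is readable by anyone.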


## Phase 3: Brute Force Attack Simulation
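We hammer the API with wrong passwords. The fifth consecutive failure sets a 15-minute lockout, so the sixth attempt is rejected with a 403 before the password is even checked.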

In [6]:
# CELL 6: Attack Script
passwords = ["wrong1", "wrong2", "wrong3", "wrong4", "wrong5", "wrong6"]

print("👾 STARTING ATTACK...")

for i, pwd in enumerate(passwords):
    res = client.post('/login', json={"username": "admin", "password": pwd})
    
    print(f"Attempt {i+1}: {res.status_code} - {res.get_json()['error']}")
    
    if res.status_code == 403:
        print("\n🛡️ SYSTEM DEFENSE ACTIVATED: ACCOUNT LOCKED.")
        break

👾 STARTING ATTACK...
Attempt 1: 401 - Invalid credentials
Attempt 2: 401 - Invalid credentials
Attempt 3: 401 - Invalid credentials
Attempt 4: 401 - Invalid credentials
Attempt 5: 401 - Invalid credentials
Attempt 6: 403 - Account locked until 2025-12-22 12:12:58.031497

🛡️ SYSTEM DEFENSE ACTIVATED: ACCOUNT LOCKED.
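The output shows five 401 responses before the first 403 because the lockout is armed by the fifth failure and enforced from the next request on. The sketch below models that state machine without Flask or a database, assuming an in-memory `state` dict and an injected clock; `attempt_login`, `MAX_ATTEMPTS`, and `LOCKOUT` are hypothetical names that mirror the thresholds in the login route:

```python
from datetime import datetime, timedelta

MAX_ATTEMPTS = 5                 # Failures needed to arm the lockout
LOCKOUT = timedelta(minutes=15)  # How long the account stays locked

def attempt_login(state, password_ok, now):
    """Return the HTTP status the login route would produce, updating state in place."""
    # 1. An active lockout short-circuits before the password is checked
    if state["lockout_until"] and now < state["lockout_until"]:
        return 403
    # 2. A correct password resets the counters
    if password_ok:
        state["failed_attempts"] = 0
        state["lockout_until"] = None
        return 200
    # 3. A wrong password increments the counter and may arm the lockout
    state["failed_attempts"] += 1
    if state["failed_attempts"] >= MAX_ATTEMPTS:
        state["lockout_until"] = now + LOCKOUT
    return 401

state = {"failed_attempts": 0, "lockout_until": None}
start = datetime(2025, 1, 1, 12, 0, 0)

# Six wrong passwords, one second apart
statuses = [attempt_login(state, False, start + timedelta(seconds=i)) for i in range(6)]
print(statuses)  # [401, 401, 401, 401, 401, 403]

# The correct password is still rejected while the lock is active
locked_status = attempt_login(state, True, start + timedelta(minutes=1))
# After the 15-minute window the correct password works again
unlocked_status = attempt_login(state, True, start + timedelta(minutes=20))
print(locked_status, unlocked_status)  # 403 200
```

Passing `now` in as a parameter keeps the logic testable without waiting out a real lockout window.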
