In [None]:
!pip install flask_sqlalchemy

In [None]:
from flask import Flask, request, jsonify
from flask_sqlalchemy import SQLAlchemy
import threading

app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///users.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
db = SQLAlchemy(app)

class User(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False)
    password = db.Column(db.String(120), nullable=False)

with app.app_context():
  db.create_all()

@app.route('/')
def home():
    return "Welcome to the Security Testing Demo!"

@app.route('/users', methods=['GET'])
def get_users():
    users = User.query.all()
    return jsonify([{"id": user.id, "username": user.username, "password": user.password} for user in users])

@app.route('/user/<int:id>', methods=['GET'])
def get_user(id):
    user = User.query.get(id)
    if user:
        return jsonify({"id": user.id, "username": user.username, "password": user.password})
    return jsonify({"message": "User not found"}), 404

@app.route('/user', methods=['POST'])
def add_user():
    data = request.get_json()
    new_user = User(username=data['username'], password=data['password'])
    db.session.add(new_user)
    db.session.commit()
    return jsonify({"message": "User added successfully"}), 201

@app.route('/user/<int:id>', methods=['PUT'])
def update_user(id):
    data = request.get_json()
    user = User.query.get(id)
    if user:
        user.username = data['username']
        user.password = data['password']
        db.session.commit()
        return jsonify({"message": "User updated successfully"})
    return jsonify({"message": "User not found"}), 404

@app.route('/user/<int:id>', methods=['DELETE'])
def delete_user(id):
    user = User.query.get(id)
    if user:
        db.session.delete(user)
        db.session.commit()
        return jsonify({"message": "User deleted successfully"})
    return jsonify({"message": "User not found"}), 404


threading.Thread(target=app.run, kwargs={'host':'0.0.0.0','port':8080}).start()


In [None]:
# !curl -X GET http://172.28.0.12:5000/users
!curl -X GET http://127.0.0.1:8080

In [None]:
!curl -X GET http://localhost:8080/user/1

In [None]:
!curl -X POST http://localhost:8080/user -H "Content-Type: application/json" -d '{"username":"testuser","password":"securepassword"}'


In [None]:
!curl -X PUT http://localhost:8080/user/1 -H "Content-Type: application/json" -d '{"username":"testuser","password":"newsecurepassword"}'

In [None]:
!curl -X DELETE http://localhost:8080/user/1

In [None]:
import unittest
import requests
import json

# BASE_URL = "http://172.28.0.12:5000"
BASE_URL = "http://localhost:8080"

class FlaskAppTestCase(unittest.TestCase):

    def test_sql_injection(self):
        payload = {"username": "testuser'; DROP TABLE users; --", "password": "testpass"}
        response = requests.post(f"{BASE_URL}/user", json=payload)
        self.assertNotEqual(response.status_code, 201, "SQL Injection vulnerability detected")

    def test_xss(self):
        payload = {"username": "<script>alert('XSS');</script>", "password": "testpass"}
        response = requests.post(f"{BASE_URL}/user", json=payload)
        self.assertNotIn("<script>", response.text, "XSS vulnerability detected")

    def test_insecure_password_storage(self):
        payload = {"username": "testuser", "password": "testpass"}
        response = requests.post(f"{BASE_URL}/user", json=payload)
        self.assertEqual(response.status_code, 201)
        user_id = response.json()["id"]
        response = requests.get(f"{BASE_URL}/user/{user_id}")
        self.assertNotIn("testpass", response.json()["password"], "Insecure password storage detected")

    def test_authentication(self):
        response = requests.get(f"{BASE_URL}/users")
        self.assertEqual(response.status_code, 401, "No authentication detected")

    def test_authorization(self):
        payload = {"username": "admin", "password": "adminpass"}
        response = requests.post(f"{BASE_URL}/user", json=payload)
        user_id = response.json()["id"]
        payload = {"username": "normaluser", "password": "userpass"}
        response = requests.post(f"{BASE_URL}/user", json=payload)
        response = requests.put(f"{BASE_URL}/user/{user_id}", json={"username": "hacked", "password": "hackedpass"})
        self.assertEqual(response.status_code, 403, "No authorization detected")

    def test_insecure_direct_object_references(self):
        payload = {"username": "testuser1", "password": "testpass"}
        response = requests.post(f"{BASE_URL}/user", json=payload)
        user_id = response.json()["id"]
        response = requests.get(f"{BASE_URL}/user/{user_id}")
        self.assertNotEqual(response.status_code, 200, "Insecure direct object reference detected")

    def test_data_exposure(self):
        response = requests.get(f"{BASE_URL}/users")
        self.assertNotIn("password", response.json()[0], "Sensitive data exposure detected")

if __name__ == '__main__':
    unittest.main()