In [2]:
!pip install flask_sqlalchemy

Collecting flask_sqlalchemy
  Downloading flask_sqlalchemy-3.1.1-py3-none-any.whl.metadata (3.4 kB)
Downloading flask_sqlalchemy-3.1.1-py3-none-any.whl (25 kB)
Installing collected packages: flask_sqlalchemy
Successfully installed flask_sqlalchemy-3.1.1


In [4]:
from flask import Flask, request, jsonify
from flask_sqlalchemy import SQLAlchemy
import threading

app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///users.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
db = SQLAlchemy(app)

class User(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False)
    password = db.Column(db.String(120), nullable=False)

with app.app_context():
  db.create_all()

@app.route('/')
def home():
    return "Welcome to the Security Testing Demo!"

@app.route('/users', methods=['GET'])
def get_users():
    users = User.query.all()
    return jsonify([{"id": user.id, "username": user.username, "password": user.password} for user in users])

@app.route('/user/<int:id>', methods=['GET'])
def get_user(id):
    user = User.query.get(id)
    if user:
        return jsonify({"id": user.id, "username": user.username, "password": user.password})
    return jsonify({"message": "User not found"}), 404

@app.route('/user', methods=['POST'])
def add_user():
    data = request.get_json()
    new_user = User(username=data['username'], password=data['password'])
    db.session.add(new_user)
    db.session.commit()
    return jsonify({"message": "User added successfully"}), 201

@app.route('/user/<int:id>', methods=['PUT'])
def update_user(id):
    data = request.get_json()
    user = User.query.get(id)
    if user:
        user.username = data['username']
        user.password = data['password']
        db.session.commit()
        return jsonify({"message": "User updated successfully"})
    return jsonify({"message": "User not found"}), 404

@app.route('/user/<int:id>', methods=['DELETE'])
def delete_user(id):
    user = User.query.get(id)
    if user:
        db.session.delete(user)
        db.session.commit()
        return jsonify({"message": "User deleted successfully"})
    return jsonify({"message": "User not found"}), 404


threading.Thread(target=app.run, kwargs={'host':'0.0.0.0','port':5000}).start()


 * Serving Flask app '__main__'
 * Debug mode: off


Address already in use
Port 5000 is in use by another program. Either identify and stop that program, or start the server with a different port.
On macOS, try disabling the 'AirPlay Receiver' service from System Preferences -> General -> AirDrop & Handoff.


In [5]:
!curl -X GET http://172.28.0.12:5000/users


curl: (28) Failed to connect to 172.28.0.12 port 5000 after 75006 ms: Couldn't connect to server


In [12]:
!curl -X GET http://localhost:5000/user/1

  user = User.query.get(id)
INFO:werkzeug:127.0.0.1 - - [20/Jul/2024 05:10:35] "[33mGET /user/1 HTTP/1.1[0m" 404 -


{"message":"User not found"}


In [5]:
!curl -X POST http://localhost:5000/user -H "Content-Type: application/json" -d '{"username":"testuser","password":"securepassword"}'


INFO:werkzeug:127.0.0.1 - - [20/Jul/2024 05:09:49] "[35m[1mPOST /user HTTP/1.1[0m" 201 -


{"message":"User added successfully"}


In [8]:
!curl -X PUT http://localhost:5000/user/1 -H "Content-Type: application/json" -d '{"username":"testuser","password":"newsecurepassword"}'

  user = User.query.get(id)
INFO:werkzeug:127.0.0.1 - - [20/Jul/2024 05:10:13] "PUT /user/1 HTTP/1.1" 200 -


{"message":"User updated successfully"}


In [10]:
!curl -X DELETE http://localhost:5000/user/1

  user = User.query.get(id)
INFO:werkzeug:127.0.0.1 - - [20/Jul/2024 05:10:27] "DELETE /user/1 HTTP/1.1" 200 -


{"message":"User deleted successfully"}


In [None]:
import unittest
import requests
import json

BASE_URL = "http://172.28.0.12:5000"

class FlaskAppTestCase(unittest.TestCase):

    def test_sql_injection(self):
        payload = {"username": "testuser'; DROP TABLE users; --", "password": "testpass"}
        response = requests.post(f"{BASE_URL}/user", json=payload)
        self.assertNotEqual(response.status_code, 201, "SQL Injection vulnerability detected")

    def test_xss(self):
        payload = {"username": "<script>alert('XSS');</script>", "password": "testpass"}
        response = requests.post(f"{BASE_URL}/user", json=payload)
        self.assertNotIn("<script>", response.text, "XSS vulnerability detected")

    def test_insecure_password_storage(self):
        payload = {"username": "testuser", "password": "testpass"}
        response = requests.post(f"{BASE_URL}/user", json=payload)
        self.assertEqual(response.status_code, 201)
        user_id = response.json()["id"]
        response = requests.get(f"{BASE_URL}/user/{user_id}")
        self.assertNotIn("testpass", response.json()["password"], "Insecure password storage detected")

    def test_authentication(self):
        response = requests.get(f"{BASE_URL}/users")
        self.assertEqual(response.status_code, 401, "No authentication detected")

    def test_authorization(self):
        payload = {"username": "admin", "password": "adminpass"}
        response = requests.post(f"{BASE_URL}/user", json=payload)
        user_id = response.json()["id"]
        payload = {"username": "normaluser", "password": "userpass"}
        response = requests.post(f"{BASE_URL}/user", json=payload)
        response = requests.put(f"{BASE_URL}/user/{user_id}", json={"username": "hacked", "password": "hackedpass"})
        self.assertEqual(response.status_code, 403, "No authorization detected")

    def test_insecure_direct_object_references(self):
        payload = {"username": "testuser1", "password": "testpass"}
        response = requests.post(f"{BASE_URL}/user", json=payload)
        user_id = response.json()["id"]
        response = requests.get(f"{BASE_URL}/user/{user_id}")
        self.assertNotEqual(response.status_code, 200, "Insecure direct object reference detected")

    def test_data_exposure(self):
        response = requests.get(f"{BASE_URL}/users")
        self.assertNotIn("password", response.json()[0], "Sensitive data exposure detected")

if __name__ == '__main__':
    unittest.main()

E
ERROR: /root/ (unittest.loader._FailedTest)
----------------------------------------------------------------------
AttributeError: module '__main__' has no attribute '/root/'

----------------------------------------------------------------------
Ran 1 test in 0.003s

FAILED (errors=1)


SystemExit: True

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)


In [None]:
a

NameError: name 'a' is not defined