diff --git a/app/__init__.py b/app/__init__.py index dea58e1..48affb8 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -6,7 +6,7 @@ from flask_pymongo import PyMongo from app.config import Config -from app.extensions import mongo +from app.extensions import bcrypt, mongo def create_app(test_config=None): @@ -20,6 +20,8 @@ def create_app(test_config=None): # Connect Pymongo to our specific app instance mongo.init_app(app) + # Connect Flask-BCrypt + bcrypt.init_app(app) # Import blueprints inside the factory from app.routes.auth_routes import \ diff --git a/app/extensions.py b/app/extensions.py index b404e4c..f2cbb44 100644 --- a/app/extensions.py +++ b/app/extensions.py @@ -1,7 +1,9 @@ """Module for Flask extensions.""" +from flask_bcrypt import Bcrypt from flask_pymongo import PyMongo # Createempty PyMongo extension object globally # This way, we can import it in other files and avoid a code smell: tighly-coupled, cyclic error mongo = PyMongo() +bcrypt = Bcrypt() diff --git a/app/routes/auth_routes.py b/app/routes/auth_routes.py index 9cd7c3c..77aa763 100644 --- a/app/routes/auth_routes.py +++ b/app/routes/auth_routes.py @@ -1,12 +1,14 @@ # pylint: disable=cyclic-import """Routes for authorization for the JWT upgrade""" -import bcrypt +import datetime + +import jwt from email_validator import EmailNotValidError, validate_email -from flask import Blueprint, jsonify, request +from flask import Blueprint, current_app, jsonify, request from werkzeug.exceptions import BadRequest -from app.extensions import mongo +from app.extensions import bcrypt, mongo auth_bp = Blueprint("auth_bp", __name__, url_prefix="/auth") @@ -45,24 +47,20 @@ def register_user(): return jsonify({"message": "Invalid JSON format"}), 400 # Check for Duplicate User - # Easy access with Flask_PyMongo's 'mongo' if mongo.db.users.find_one({"email": email}): return jsonify({"message": "Email is already registered"}), 409 # Password Hashing - # Generate a salt and hash the password - # result is a byte object representing the final hash - hashed_password = bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt()) + hashed_password = bcrypt.generate_password_hash(password).decode("utf-8") # Database Insertion user_id = mongo.db.users.insert_one( { "email": email, # The hash is stored as a string in the DB - "password_hash": hashed_password.decode("utf-8"), + "password": hashed_password, } ).inserted_id - print(user_id) # Prepare response return ( @@ -74,3 +72,45 @@ def register_user(): ), 201, ) + + +# ----- LOGIN ------- +@auth_bp.route("/login", methods=["POST"]) +def login_user(): + """Authenticates a user and returns a JWT""" + # 1. Get the user's credentials from the request body + data = request.get_json() + + if not data or not data.get("email") or not data.get("password"): + return jsonify({"error": "Email and password are required"}), 400 + + email = data.get("email") + password = data.get("password") + + # 2. Find the user in the DB + user = mongo.db.users.find_one({"email": email}) + + # 3. Verify the user and password + if not user or not bcrypt.check_password_hash(user["password"], password): + return jsonify({"error": "Invalid credentials"}), 401 + + # 4. Generate the JWT payload + payload = { + "sub": str(user["_id"]), # sub (subject)- standard claim for user ID + "iat": datetime.datetime.now( + datetime.UTC + ), # iat (issued at)- when token was created + "exp": datetime.datetime.now(datetime.UTC) + + datetime.timedelta(hours=24), # expiration + } + + # 5. Encode the token with our app's SECRET_KEY + try: + token = jwt.encode( + payload, + current_app.config["SECRET_KEY"], + algorithm="HS256", # the standard signing algorithm + ) + return jsonify({"token": token}), 200 + except jwt.PyJWTError: + return jsonify({"error": "Token generation failed"}), 500 diff --git a/app/utils/decorators.py b/app/utils/decorators.py new file mode 100644 index 0000000..2ae26e9 --- /dev/null +++ b/app/utils/decorators.py @@ -0,0 +1,68 @@ +# pylint: disable=too-many-return-statements +""" +This module provides decorators for Flask routes, including JWT authentication. +""" +import functools +import jwt +from flask import current_app, g, jsonify, request +from bson.objectid import ObjectId +from bson.errors import InvalidId +from app.extensions import mongo + + +def require_jwt(f): + """Protects routes by verifying JWT tokens in the + 'Authorization: Bearer ' header, decoding and validating the token, + and attaching the authenticated user to the request context. + """ + + @functools.wraps(f) + def decorated_function(*args, **kwargs): + # 1. Get Authorization header + auth_header = request.headers.get("Authorization", "") + if not auth_header: + return jsonify({"error": "Authorization header missing"}), 401 + + # 2. Expect exactly: "Bearer " (case-insensitive) + parts = auth_header.split() + if len(parts) != 2 or parts[0].lower() != "bearer": + return jsonify({"error": "Malformed Authorization header"}), 401 + + token = parts[1] + + # 3. Decode & verify JWT + try: + payload = jwt.decode( + token, + current_app.config["SECRET_KEY"], + algorithms=["HS256"], + # options={"require": ["exp", "sub"]} # optional: force required claims + ) + except jwt.ExpiredSignatureError: + return jsonify({"error": "Token has expired"}), 401 + except jwt.InvalidTokenError: + return jsonify({"error": "Invalid token. Please log in again."}), 401 + + # 4. Extract user id from payload + user_id = payload.get("sub") + if not user_id: + return jsonify({"error": "Token missing subject (sub) claim"}), 401 + + # 5. Convert to ObjectId and fetch user + try: + oid = ObjectId(user_id) + except (InvalidId, TypeError): + return jsonify({"error": "Invalid user id in token"}), 401 + + # Exclude sensitive fields such as password + user = mongo.db.users.find_one({"_id": oid}, {"password": 0}) + if not user: + return jsonify({"error": "User not found"}), 401 + + # 6. Attach safe user object to request context + g.current_user = user + + # 7. Call original route + return f(*args, **kwargs) + + return decorated_function diff --git a/requirements.txt b/requirements.txt index 09cdefe..741455f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -10,4 +10,5 @@ black isort flask_pymongo flask-bcrypt -email-validator \ No newline at end of file +email-validator +PyJWT \ No newline at end of file diff --git a/scripts/seed_users.py b/scripts/seed_users.py index ae6b3a1..c6866d8 100644 --- a/scripts/seed_users.py +++ b/scripts/seed_users.py @@ -1,7 +1,7 @@ """Seeding user_data script""" -import os import json +import os import bcrypt @@ -39,7 +39,7 @@ def seed_users(users_to_seed: list) -> str: # insert to new user mongo.db.users.insert_one( - {"email": email, "password_hash": hashed_password.decode("utf-8")} + {"email": email, "password": hashed_password.decode("utf-8")} ) count += 1 print(f"Created user: {email}") diff --git a/tests/conftest.py b/tests/conftest.py index e761a4c..609d6ab 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -6,11 +6,13 @@ """ from unittest.mock import patch +import bcrypt import mongomock import pytest -from app import create_app, mongo +from app import create_app from app.datastore.mongo_db import get_book_collection +from app.extensions import bcrypt, mongo @pytest.fixture(name="_insert_book_to_db") @@ -73,6 +75,7 @@ def test_app(): "TESTING": True, "TRAP_HTTP_EXCEPTIONS": True, "API_KEY": "test-key-123", + "SECRET_KEY": "a-secure-key-for-testing-only", "MONGO_URI": "mongodb://localhost:27017/", "DB_NAME": "test_database", "COLLECTION_NAME": "test_books", @@ -134,3 +137,41 @@ def users_db_setup(test_app): # pylint: disable=redefined-outer-name with test_app.app_context(): users_collection = mongo.db.users users_collection.delete_many({}) + + +TEST_USER_ID = "6154b3a3e4a5b6c7d8e9f0a1" +PLAIN_PASSWORD = "a-secure-password" + + +@pytest.fixture(scope="session") # because this data never changes +def mock_user_data(): + """Provides a dictionary of a test user's data, with a hashed password.""" + # Use Flask-Bcrypt's function to CREATE the hash. + hashed_password = bcrypt.generate_password_hash(PLAIN_PASSWORD).decode("utf-8") + + return { + "_id": TEST_USER_ID, + "email": "testuser@example.com", + "password": hashed_password, + } + + +@pytest.fixture +def seeded_user_in_db( + test_app, mock_user_data, users_db_setup +): # pylint: disable=redefined-outer-name + """ + Ensures the test database is clean and contains exactly one predefined user. + Depends on: + - test_app: To get the application context and correct mongo.db object + - mock_user_data: To get the user data to insert. + - users_db_Setup: To ensure the users collection is empty before seeding. + """ + _ = users_db_setup + + with test_app.app_context(): + mongo.db.users.insert_one(mock_user_data) + + # yield the user data in case a test needs it + # but often we just need the side-effect of the user being in the DB + yield mock_user_data diff --git a/tests/scripts/test_seed_users.py b/tests/scripts/test_seed_users.py index 3e36e26..99aa0df 100644 --- a/tests/scripts/test_seed_users.py +++ b/tests/scripts/test_seed_users.py @@ -37,9 +37,9 @@ def test_seed_users_successfully(test_app): assert admin_user is not None # Verify the password was hashed - assert admin_user["password_hash"] != "AdminPassword123" + assert admin_user["password"] != "AdminPassword123" assert bcrypt.checkpw( - b"AdminPassword123", admin_user["password_hash"].encode("utf-8") + b"AdminPassword123", admin_user["password"].encode("utf-8") ) assert "Successfully seeded 2 users" in result_message diff --git a/tests/test_auth.py b/tests/test_auth.py index 2bed9fc..949cb27 100644 --- a/tests/test_auth.py +++ b/tests/test_auth.py @@ -1,9 +1,14 @@ """Tests for auth/JWT upgrade""" -import bcrypt +from unittest.mock import patch + +import jwt import pytest +from conftest import PLAIN_PASSWORD, TEST_USER_ID # pylint: disable=import-error + +from app import bcrypt, mongo -from app import mongo +# -------- /auth/register TESTS --------- def test_register_with_valid_data(client, users_db_setup): @@ -21,11 +26,9 @@ def test_register_with_valid_data(client, users_db_setup): assert response.status_code == 201 user_in_db = mongo.db.users.find_one({"email": "newuser@example.com"}) assert user_in_db is not None - assert "password_hash" in user_in_db - # You can even check if the password hashes match! - assert bcrypt.checkpw( - b"a-secure-password", user_in_db["password_hash"].encode("utf-8") - ) + assert "password" in user_in_db + # Check if the password hashes match! + assert bcrypt.check_password_hash(user_in_db["password"], "a-secure-password") def test_register_with_duplicate_email(client, users_db_setup): @@ -44,7 +47,7 @@ def test_register_with_duplicate_email(client, users_db_setup): # sanity-check user_in_db = mongo.db.users.find_one({"email": "newuser@example.com"}) assert user_in_db is not None - assert "password_hash" in user_in_db + assert "password" in user_in_db # Act: try to register with the same email again response = client.post("/auth/register", json=existing_user_data) @@ -149,3 +152,141 @@ def test_register_fails_with_invalid_email(client, users_db_setup, invalid_email assert isinstance(data, dict), "Expected JSON body" assert "message" in data assert "message" in data, "The error response should contain a 'message' key" + + +# ---------- /auth/login ----------------- + + +def test_login_user_returns_jwt_for_valid_credentials( + test_app, client, seeded_user_in_db +): + """ + GIVEN a user exists in the database (via seeded_user_in_db fixture) + WHEN a POST request is sent to /auth/login with correct credentials + THEN a 200 response with a valid JWT is returned + """ + # Arrange + _ = seeded_user_in_db + + login_credentials = { + "email": "testuser@example.com", + "password": PLAIN_PASSWORD, + } # plain-text password + + # ACT + response = client.post("/auth/login", json=login_credentials) + data = response.get_json() + + # Assert + assert response.status_code == 200 + assert "token" in data + + with test_app.app_context(): + # check token: we need the SECRET_KEY from the app config to decode it. + payload = jwt.decode( + data["token"], test_app.config["SECRET_KEY"], algorithms=["HS256"] + ) + assert payload["sub"] == TEST_USER_ID + + +def test_login_user_fails_for_wrong_password(client, seeded_user_in_db): + """ + GIVEN a user exists in the database + WHEN a POST request is sent to /auth/login with an incorrect password + THEN a 401 Unauthorized response is returned + """ + # Arrange + _ = seeded_user_in_db + + login_credentials = {"email": "testuser@example.com", "password": "wrong-password"} + + # Act + response = client.post("/auth/login", json=login_credentials) + + # Assert + assert response.status_code == 401 + assert response.get_json()["error"] == "Invalid credentials" + + +def test_login_user_fails_for_nonexistent_user(client, seeded_user_in_db): + """ + GIVEN a database (with or without users) + WHEN a POST request is sent to /auth/login for a user that doesn't exist + THEN a 401 Unauthorized response is returned + """ + # Arrange + _ = seeded_user_in_db + + login_credentials = {"email": "ghost@example.com", "password": "any-password"} + + # Act + response = client.post("/auth/login", json=login_credentials) + + # Assert + assert response.status_code == 401 + assert response.get_json()["error"] == "Invalid credentials" + + +@pytest.mark.parametrize( + "payload, expected_message", + [ + ("null", "Email and password are required"), # 'if not data' + ({}, "Email and password are required"), # empty JSON object + ( + {"password": "a-password"}, + "Email and password are required", + ), # Missing email + ( + {"email": "test@example.com"}, + "Email and password are required", + ), # Missing password + ], + ids=["null_payload", "empty_payload", "missing_email", "missing_password"], +) +def test_login_user_fails_with_missing_data(client, payload, expected_message): + """ + GIVEN a test client + WHEN a POST request is sent to /auth/login with incomplete data + THEN the response should be 400 BAD REQUEST with a specific error message. + """ + # Act + # For the special 'null' case, we send it as raw data. + # For all other cases (which are dicts), we send it as json. + if isinstance(payload, str): + response = client.post( + "/auth/login", data=payload, content_type="application/json" + ) + else: + response = client.post("/auth/login", json=payload) + + data = response.get_json() + + # Assert + assert response.status_code == 400 + assert data["error"] == expected_message + + +def test_login_handles_jwp_encoding_error(client, seeded_user_in_db): + """ + GIVEN a valid user is logging in + WHEN the internal PyJWT library fails to encode the token + THEN the server should catch the specific PyJWTError and return a 500 + """ + # Arrange + _ = seeded_user_in_db + login_credentials = { + "email": "testuser@example.com", + "password": "a-secure-password", + } + # Patch jwt.encode() to be a mock + with patch("app.routes.auth_routes.jwt.encode") as mock_jwt_encode: + # Configure the mock to raise the specific exception we want to test + mock_jwt_encode.side_effect = jwt.PyJWTError("Simulated library error") + + # Act + response = client.post("/auth/login", json=login_credentials) + data = response.get_json() + + # Assert + assert response.status_code == 500 + assert data["error"] == "Token generation failed" diff --git a/tests/test_decorators.py b/tests/test_decorators.py new file mode 100644 index 0000000..394e2d0 --- /dev/null +++ b/tests/test_decorators.py @@ -0,0 +1,222 @@ +# pylint: disable=redefined-outer-name +""" +Test suite for decorators + +To test the decorator in isolation, we'll create a tiny, temporary Flask app inside our test file. + +This app will have a single, simple route that does nothing but apply our decorator. +This way, we know that any success or failure is due to the decorator itself, not other application code. # pylint: disable=line-too-long +""" + +from unittest.mock import patch +from bson import ObjectId +from bson.errors import InvalidId +import pytest +import jwt +from flask import Flask, g, jsonify + +from app.utils.decorators import require_jwt +from app.utils import decorators + +# A dummy secret key for testing +TEST_SECRET_KEY = "test-secret-key" + +# Dummy route we'll protect +@require_jwt +def protected_route(): + """ + A minimal protected route function + decorated with require_jwt we are testing + + Returns: the current user's email from g.current_user as JSON + The idea is: if the decorator runs and attaches g.current_user, the route will be able to read it and return an email. If the decorator blocks the request (bad/missing token), the route body never runs. # pylint: disable=line-too-long + """ + return jsonify({"email": g.current_user["email"]}) + +@pytest.fixture +def client(): + """ + Creates a minimal app and + a test client + """ + app = Flask(__name__) + app.config["SECRET_KEY"] = TEST_SECRET_KEY + # register protected_route at /protected + app.add_url_rule("/protected", view_func=protected_route) + with app.test_client() as test_client: + yield test_client + + +def test_require_jwt_valid_token(client): + """ + GIVEN a request to a protected endpoint + WHEN the Authorization header contains a valid JWT + THEN it should succeed and the route should have access to the user in g.current_user + """ + # Arrange + # 1. Create a dummy user ID + user_id = ObjectId() + dummy_user = {"_id": user_id, "email": "test@example.com"} + + # Mock the database call + with patch("app.extensions.mongo.db.users.find_one", return_value=dummy_user), \ + patch("app.utils.decorators.jwt.decode", return_value={"sub": str(user_id)}): + + token = "valid-token" + response = client.get( + "/protected", + headers={"Authorization": f"Bearer {token}"} + ) + data = response.get_json() + + assert response.status_code == 200 + assert data["email"] == dummy_user["email"] + + +def test_require_jwt_authorization_header_missing(client): + """ + GIVEN a request to a protected endpoint + WHEN the Authorization header is missing + THEN it should return a 401 Unauthorized error + """ + # Act + response = client.get("/protected") + data = response.get_json() + + # Assert + assert response.status_code == 401 + assert data["error"] == "Authorization header missing" + +@pytest.mark.parametrize( + "auth_header", + [ + "Bearer", # Just the word "Bearer" + "Token 12345", # Wrong schema word ("Token" instead of "Bearer") + "Bearer token1 token2", # Too many parts + "JustAToken", # Only one part + ], +) +def test_require_jwt_malformed_header(client, auth_header): + """ + GIVEN a request to a protected endpoint + WHEN the Authorization header is malformed + THEN it should return a 401 Unauthorized error + """ + # Act + response = client.get("/protected", headers= {"Authorization": auth_header}) + data = response.get_json() + + # Assert + assert response.status_code == 401 + assert data["error"] == "Malformed Authorization header" + + +def test_require_jwt_expired_signature_error(client, monkeypatch): + """ + GIVEN a request to a protected endpoint + WHEN jwt.decode raises ExpiredSignatureError + THEN it should return a 401 expiration error + """ + # Arrange: patch jwt.decode where the decorator imports it + def fake_decode(*args, **kwargs): + raise jwt.ExpiredSignatureError() + + monkeypatch.setattr("app.utils.decorators.jwt.decode", fake_decode) + + # Act: send a request with any Bearer token + response = client.get("/protected", headers={"Authorization": "Bearer expired-token"}) + data = response.get_json() + + # Assert + assert response.status_code == 401 + assert data["error"] == "Token has expired" + + +def test_require_jwt_invalid_token_error(client): + """ + GIVEN a request to a protected endpoint + WHEN jwt.decode raises InvalidTokenError + THEN it should return a 401 invalid token error + """ + with patch("app.utils.decorators.jwt.decode", side_effect=jwt.InvalidTokenError()): + response = client.get("/protected", headers={"Authorization": "Bearer invalid-token"}) + data = response.get_json() + + assert response.status_code == 401 + assert data is not None + assert data["error"] == "Invalid token. Please log in again." + + +def test_require_jwt_missing_sub_claim(client): + """ + GIVEN jwt.decode returns a payload without 'sub' + WHEN we call the protected endpoint + THEN the decorator should respond 401 with a missing-sub error + """ + with patch("app.utils.decorators.jwt.decode", return_value={}): + response = client.get("/protected", headers={"Authorization": "Bearer "}) + data = response.get_json() + + assert response.status_code == 401 + assert data is not None + assert data["error"] == "Token missing subject (sub) claim" + + +@pytest.mark.parametrize( + "exc", + [ + InvalidId("bad id"), + TypeError("bad type") + ] +) +def test_require_jwt_invalid_user_id_in_token_returns_401(client, monkeypatch, exc): + """ + GIVEN jwt.decode returns a payload with a 'sub' value + WHEN converting that 'sub' into an ObjectId raises InvalidId or TypeError + THEN the endpoint should return 401 with a helpful error + """ + + # Arrange: make jwt.decode return a payload with a sub claim + monkeypatch.setattr( + "app.utils.decorators.jwt.decode", + lambda *a, **k: {"sub": "some-value"} + ) + + # Arrange: make ObjectId(...) raise the desired exception + def raise_exc(_value): + raise exc + + monkeypatch.setattr("app.utils.decorators.ObjectId", raise_exc) + + # Act + response = client.get("/protected", headers={"Authorization": "Bearer any-token"}) + data = response.get_json() + + # Assert + assert response.status_code == 401 + assert data is not None + assert data["error"] == "Invalid user id in token" + + +def test_require_jwt_user_not_found(client): + """ + GIVEN a valid token with a user_id + WHEN no matching user is found in the database + THEN it should return 401 with 'User not found' + """ + # 1. Create a valid JWT payload with a fake user_id + fake_user_id = str(ObjectId()) + token = jwt.encode({"sub": fake_user_id}, TEST_SECRET_KEY, algorithm="HS256") + + # 2. Patch mongo to simulate "no user found" + with patch.object(decorators.mongo.db.users, "find_one", return_value=None): + # Act + response = client.get( + "/protected", + headers={"Authorization": f"Bearer {token}"} + ) + data = response.get_json() + + # Assert + assert response.status_code == 401 + assert data["error"] == "User not found"