Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import datetime
from unittest.mock import Mock

import pytest
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from starlette import status

import auth_backend.auth_plugins.email
from auth_backend.models import AuthMethod, User
from auth_backend.routes.base import app
from auth_backend.settings import get_settings

Expand All @@ -23,3 +26,47 @@ def dbsession():
engine = create_engine(settings.DB_DSN)
TestingSessionLocal = sessionmaker(autocommit=True, autoflush=False, bind=engine)
return TestingSessionLocal()


@pytest.fixture()
def user_id(client: TestClient, dbsession):
time = datetime.datetime.utcnow()
body = {
"email": f"user{time}@example.com",
"password": "string"
}
client.post("/email/registration", json=body)
db_user: AuthMethod = dbsession.query(AuthMethod).filter(AuthMethod.value == body['email'],
AuthMethod.param == 'email').one()
yield db_user.user_id
for row in dbsession.query(AuthMethod).filter(AuthMethod.user_id == db_user.user_id).all():
dbsession.delete(row)
dbsession.delete(dbsession.query(User).filter(User.id == db_user.user_id).one())
dbsession.flush()


@pytest.fixture()
def user(client: TestClient, dbsession):
url = "/email/login"
time = datetime.datetime.utcnow()
body = {
"email": f"user{time}@example.com",
"password": "string"
}
client.post("/email/registration", json=body)
db_user: AuthMethod = dbsession.query(AuthMethod).filter(AuthMethod.value == body['email'],
AuthMethod.param == 'email').one()
response = client.post(url, json=body)
assert response.status_code == status.HTTP_401_UNAUTHORIZED
token = dbsession.query(AuthMethod).filter(AuthMethod.user_id == db_user.user_id,
AuthMethod.param == "confirmation_token",
AuthMethod.auth_method == "email").one()
response = client.get(f"/email/approve?token={token.value}")
assert response.status_code == status.HTTP_200_OK
response = client.post(url, json=body)
assert response.status_code == status.HTTP_200_OK
yield {"user_id": db_user.user_id, "body": body, "login_json": response.json()}
for row in dbsession.query(AuthMethod).filter(AuthMethod.user_id == db_user.user_id).all():
dbsession.delete(row)
dbsession.delete(dbsession.query(User).filter(User.id == db_user.user_id).one())
dbsession.flush()
68 changes: 68 additions & 0 deletions tests/test_routes/test_change_email.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import pytest
from starlette import status
from fastapi.testclient import TestClient
from sqlalchemy.orm import Session
from auth_backend.models.db import AuthMethod, UserSession


url = "/email/reset/email/"


@pytest.mark.skip()
def test_main_scenario(client: TestClient, dbsession: Session, user):
user_id, body, login = user["user_id"], user["body"], user["login_json"]
conf_token_1 = dbsession.query(AuthMethod).filter(AuthMethod.user_id == user_id,
AuthMethod.param == "confirmation_token").one().value
response = client.post(f"{url}{user_id}/request", json={"token": login["token"], "email": "changed@mail.com"})
assert response.status_code == status.HTTP_200_OK

conf_token_2 = dbsession.query(AuthMethod).filter(AuthMethod.user_id == user_id, AuthMethod.param == "confirmation_token").one().value
assert conf_token_2 != conf_token_1

assert not dbsession.query(UserSession).filter(UserSession.token == login["token"]).one().expired

response = client.post(f"/email/login", json=body)
assert response.status_code == status.HTTP_200_OK

response = client.post(f"/email/login", json={"email": "changed@mail.com", "password": body["password"]})
assert response.status_code == status.HTTP_401_UNAUTHORIZED

response = client.get(f"{url}{user_id}?token={conf_token_1}&email=changed@mail.com")
assert response.status_code == status.HTTP_403_FORBIDDEN

response = client.get(f"{url}{user_id}?token={conf_token_2}&email=changed@mail.com")
assert response.status_code == status.HTTP_200_OK

response = client.post(f"/email/login", json=body)
assert response.status_code == status.HTTP_401_UNAUTHORIZED

response = client.post(f"/email/login", json={"email": "changed@mail.com", "password": body["password"]})
assert response.status_code == status.HTTP_200_OK


@pytest.mark.skip()
def test_invalid_jsons(client: TestClient, dbsession: Session, user):
user_id, body, login = user["user_id"], user["body"], user["login_json"]

response = client.post(f"{url}{user_id}/request", json={"token": "", "email": "changed@mail.com"})
assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY

response = client.post(f"{url}{user_id}/request", json={"token": login["token"], "email": ""})
assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY

response = client.post(f"{url}{user_id}/request", json={"token": "", "email": ""})
assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY


@pytest.mark.skip()
def test_expired_token(client: TestClient, dbsession: Session, user):
user_id, body, login = user["user_id"], user["body"], user["login_json"]
response = client.post("/logout", json={"token": login["token"]})
assert response.status_code == status.HTTP_200_OK

response = client.post(f"{url}{user_id}/request", json={"token": login["token"], "email": "changed@mail.com"})
assert response.status_code == status.HTTP_401_UNAUTHORIZED




100 changes: 100 additions & 0 deletions tests/test_routes/test_change_password.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
import pytest
from starlette import status
from fastapi.testclient import TestClient
from sqlalchemy.orm import Session
from auth_backend.models.db import AuthMethod


url = "/email/reset/password/"


@pytest.mark.skip()
def test_unprocessable_jsons_no_token(client: TestClient, dbsession: Session, user_id: int):
token = dbsession.query(AuthMethod).filter(AuthMethod.user_id == user_id,
AuthMethod.param == "confirmation_token",
AuthMethod.auth_method == "email").one()
response = client.get(f"/email/approve?token={token.value}")
assert response.status_code == status.HTTP_200_OK

response = client.post(f"{url}{user_id}/request")
assert response.status_code == status.HTTP_200_OK
reset_token = dbsession.query(AuthMethod).filter(AuthMethod.auth_method == "email",
AuthMethod.param == "reset_token", AuthMethod.user_id == user_id).one()
assert reset_token

response = client.post(f"{url}{user_id}", json={"reset_token": reset_token, "new_password": ""})
assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY

response = client.post(f"{url}{user_id}", json={"reset_token": "", "new_password": ""})
assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY

response = client.post(f"{url}{user_id}", json={"reset_token": "", "new_password": "changedstring3"})
assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY


@pytest.mark.skip()
def test_unprocessable_jsons_with_token(client: TestClient, dbsession: Session, user):
user_id, body, response = user["user_id"], user["body"], user["login_json"]
auth_token = response["token"]

response = client.post(f"{url}{user_id}/request", json={"token": auth_token, "password": ""})
assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY

response = client.post(f"{url}{user_id}/request", json={"token": "", "password": ""})
assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY

response = client.post(f"{url}{user_id}/request", json={"token": "", "password": "string"})
assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY

response = client.post(f"{url}{user_id}/request", json={"token": auth_token, "password": "string"})
assert response.status_code == status.HTTP_200_OK


@pytest.mark.skip()
def test_no_token(client: TestClient, dbsession: Session, user_id: str):
token = dbsession.query(AuthMethod).filter(AuthMethod.user_id == user_id,
AuthMethod.param == "confirmation_token",
AuthMethod.auth_method == "email").one()
response = client.post(f"{url}{user_id}/request")
assert response.status_code == status.HTTP_403_FORBIDDEN

response = client.get(f"/email/approve?token={token.value}")
assert response.status_code == status.HTTP_200_OK

response = client.post(f"{url}{user_id}/request")
assert response.status_code == status.HTTP_200_OK
reset_token = dbsession.query(AuthMethod).filter(AuthMethod.auth_method == "email", AuthMethod.param == "reset_token", AuthMethod.user_id == user_id).one()
assert reset_token

response = client.post(f"{url}{user_id}", json={"reset_token": reset_token, "new_password": "changedstring"})
assert response.status_code == status.HTTP_200_OK

response = client.post(f"{url}{user_id}", json={"reset_token": reset_token, "new_password": "changedstring2"})
assert response.status_code == status.HTTP_403_FORBIDDEN


@pytest.mark.skip()
def test_with_token(client: TestClient, dbsession: Session, user):
user_id, body, response = user["user_id"], user["body"], user["login_json"]
auth_token = response["token"]

response = client.post(f"{url}{user_id}/request", json={"token": auth_token, "password": "wrong"})
assert response.status_code == status.HTTP_403_FORBIDDEN

response = client.post(f"{url}{user_id}/request", json={"token": auth_token, "password": "string"})
assert response.status_code == status.HTTP_200_OK
reset_token = dbsession.query(AuthMethod).filter(AuthMethod.auth_method == "email",
AuthMethod.param == "reset_token", AuthMethod.user_id == user_id).one()
assert reset_token

response = client.post(f"{url}{user_id}", json={"reset_token": reset_token, "new_password": "changedstring"})
assert response.status_code == status.HTTP_200_OK

response = client.post(f"{url}{user_id}", json={"reset_token": reset_token, "new_password": "changedstring2"})
assert response.status_code == status.HTTP_403_FORBIDDEN






28 changes: 4 additions & 24 deletions tests/test_routes/test_login.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,34 +18,14 @@ def test_invalid_email(client: TestClient):
assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY


def test_main_scenario(client: TestClient, dbsession: Session):
time = datetime.datetime.utcnow()
body = {
"email": f"user{time}@example.com",
"password": "string"
}
client.post("/email/registration", json=body)
db_user: AuthMethod = dbsession.query(AuthMethod).filter(AuthMethod.value == body['email'],
AuthMethod.param == 'email').one()
id = db_user.user_id
response = client.post(url, json=body)
assert response.status_code == status.HTTP_401_UNAUTHORIZED
query = dbsession.query(AuthMethod).filter(AuthMethod.auth_method == "email", AuthMethod.param == "email", AuthMethod.value == body["email"]).one()
token = dbsession.query(AuthMethod).filter(AuthMethod.user_id == query.user.id, AuthMethod.param == "confirmation_token", AuthMethod.auth_method =="email").one()
response = client.get(f"/email/approve?token={token.value}")
assert response.status_code == status.HTTP_200_OK
response = client.post(url, json=body)
assert response.status_code == status.HTTP_200_OK
def test_main_scenario(client: TestClient, dbsession: Session, user):
user_id, body, response = user["user_id"], user["body"], user["login_json"]
body_with_uppercase = {
"email": f"User{time}@example.com",
"email": body["email"].replace("u", "U"),
"password": "string"
}
response = client.post(url, json=body_with_uppercase)
assert response.status_code == status.HTTP_200_OK
for row in dbsession.query(AuthMethod).filter(AuthMethod.user_id == id).all():
dbsession.delete(row)
dbsession.delete(dbsession.query(User).filter(User.id == id).one())
dbsession.flush()


def test_incorrect_data(client: TestClient, dbsession: Session):
Expand All @@ -65,7 +45,7 @@ def test_incorrect_data(client: TestClient, dbsession: Session):
"email": "wrong@example.com",
"password": "strong"
}
response = client.post("/email/registration", json=body1)
client.post("/email/registration", json=body1)
db_user: AuthMethod = dbsession.query(AuthMethod).filter(AuthMethod.value == body1['email'],
AuthMethod.param == 'email').one()
id = db_user.user_id
Expand Down