Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 32 additions & 30 deletions routers/user.py
Original file line number Diff line number Diff line change
@@ -1,39 +1,41 @@
from fastapi import status,APIRouter
from fastapi import status, APIRouter
from fastapi.responses import JSONResponse
from utils.jwt_manager import create_token
from schemas.user import User,UserBase,UserCreate
from schemas.user import User, UserBase, UserCreate
from config.database import Session
from services.user import UserService
from services.auth import Auth

user_router = APIRouter()
db = Session()


@user_router.post('/users', tags=['Auth'], response_model=User, status_code=status.HTTP_200_OK)
def create_user(user: UserCreate):
if check_user_exists(user):
return JSONResponse(status_code=status.HTTP_400_BAD_REQUEST, content={"message": "User already exists"})

@user_router.post('/users',tags=['Auth'],response_model=User,status_code=status.HTTP_200_OK)
def create_user(user:UserCreate):

db = Session()

result = UserService(db).get_user_by_email(email=user.email)

if result:

return JSONResponse(status_code=status.HTTP_400_BAD_REQUEST,content={"message":"User already exists"})

UserService(db).create_user(user)

return JSONResponse(status_code=status.HTTP_200_OK,content={"message":"User created"})


@user_router.post('/login',tags=['Auth'],status_code=status.HTTP_200_OK)
def login(user:UserCreate):

db = Session()
result = UserService(db).get_user_by_email(email=user.email)

if not (result and Auth().verify_password(user.password,result.password)):

return JSONResponse(status_code=status.HTTP_401_UNAUTHORIZED,content={"message":"Unauthorized"})

token:str = create_token(user.dict())

return JSONResponse(status_code=status.HTTP_200_OK,content=token)

return JSONResponse(status_code=status.HTTP_200_OK, content={"message": "User created"})


def check_user_exists(user):
return bool(UserService(db).get_user_by_email(email=user.email))


@user_router.post('/login', tags=['Auth'], status_code=status.HTTP_200_OK)
def login(user: UserCreate):

if validates_password(user):
return JSONResponse(status_code=status.HTTP_401_UNAUTHORIZED, content={"message": "Unauthorized"})

token: str = create_token(user.dict())
Copy link
Collaborator Author

@ArmCM ArmCM Sep 3, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

duda 🤔 , cual es el tipo de dato exacto que debe ser el token?

ya que aqui indicas que token debe ser de tipo string pero el método create_token retorna un dict.


y el IDE me marco un error, busque y me indica el dict esta deprecated para la versión 2 de pydantic

https://docs.pydantic.dev/latest/migration/#changes-to-pydanticbasemodel

pero desconozco si se hizo así por alguna razón .Aquí es a tu consideración bro


return JSONResponse(status_code=status.HTTP_200_OK, content=token)


def validates_password(user):
user_found = UserService(db).get_user_by_email(email=user.email)

return not bool(check_user_exists(user) and Auth().verify_password(user.password, user_found.password))
32 changes: 20 additions & 12 deletions utils/jwt_manager.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,29 @@
from jwt import encode,decode
from datetime import datetime,timedelta
from jwt import encode, decode
from datetime import datetime, timedelta
from utils.settings import Settings

settings = Settings()

def create_token(data:dict) -> dict:
payload = expire_token(data)
token:str = encode(payload,key=settings.MY_SECRET_KEY,algorithm="HS256")

def create_token(data: dict) -> dict:
payload = add_expiration_date(data)

token: str = encode(payload, key=settings.MY_SECRET_KEY, algorithm="HS256")
return token

def validate_token(token:str) -> dict:
data:dict = decode(token,key=settings.MY_SECRET_KEY,algorithms=["HS256"])

def validate_token(token: str) -> dict:
data: dict = decode(token, key=settings.MY_SECRET_KEY, algorithms=["HS256"])

return data

def expire_token(data:dict):

def calculate_token_expiration():
return datetime.utcnow() + timedelta(minutes=settings.TOKEN_EXPIRE_MINUTES)


def add_expiration_date(data: dict):
to_encode = data.copy()
token_expires = timedelta(minutes=settings.TOKEN_EXPIRE_MINUTES)
expire = datetime.utcnow() + token_expires
to_encode.update({'exp':expire})
return to_encode
to_encode['exp'] = calculate_token_expiration()

return to_encode