In [2]:
!pip install python-jose passlib[bcrypt]


Defaulting to user installation because normal site-packages is not writeable



[notice] A new release of pip is available: 25.1.1 -> 25.3
[notice] To update, run: python.exe -m pip install --upgrade pip


In [6]:
from datetime import datetime, timedelta
from jose import jwt, JWTError
from passlib.context import CryptContext


In [8]:
import hashlib

def hash_pwd(password: str) -> str:
    return hashlib.sha256(password.encode()).hexdigest()

def verify_pwd(password: str, hashed: str) -> bool:
    return hashlib.sha256(password.encode()).hexdigest() == hashed


In [9]:
USERS = {
    "intern": {
        "password": hash_pwd("intern123"),
        "role": "intern"
    },
    "finance": {
        "password": hash_pwd("finance123"),
        "role": "finance"
    },
    "admin": {
        "password": hash_pwd("admin123"),
        "role": "admin"
    }
}


In [10]:
SECRET_KEY = "secret"
ALGORITHM = "HS256"

def create_token(data, expires_delta):
    payload = data.copy()
    payload["exp"] = datetime.utcnow() + expires_delta
    return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)

def decode_token(token):
    return jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])


In [11]:
def login(username, password):
    user = USERS.get(username)

    if not user or not verify_pwd(password, user["password"]):
        return {"status": 401, "error": "Invalid credentials"}

    access_token = create_token(
        {"sub": username, "role": user["role"]},
        timedelta(minutes=15)
    )

    refresh_token = create_token(
        {"sub": username},
        timedelta(days=7)
    )

    return {
        "status": 200,
        "access_token": access_token,
        "refresh_token": refresh_token
    }


In [12]:
def refresh(refresh_token):
    try:
        payload = decode_token(refresh_token)
        username = payload["sub"]
        role = USERS[username]["role"]
    except Exception:
        return {"status": 401, "error": "Invalid refresh token"}

    new_access = create_token(
        {"sub": username, "role": role},
        timedelta(minutes=15)
    )

    return {"status": 200, "access_token": new_access}


In [13]:
PERMISSION_MAP = {
    "search": {"intern", "finance", "admin"},
    "finance_view": {"finance", "admin"},
    "admin_only": {"admin"},
}

def require_permission(token, permission):
    if not token:
        return {"status": 401, "error": "Missing token"}

    try:
        payload = decode_token(token)
        role = payload.get("role")
    except JWTError:
        return {"status": 401, "error": "Invalid token"}

    if role not in PERMISSION_MAP.get(permission, set()):
        return {"status": 403, "error": "Forbidden"}

    return {"status": 200, "user": payload}


In [14]:
def search_endpoint(token):
    return require_permission(token, "search")

def finance_endpoint(token):
    return require_permission(token, "finance_view")

def admin_endpoint(token):
    return require_permission(token, "admin_only")


In [15]:
intern_login = login("intern", "intern123")
finance_login = login("finance", "finance123")
admin_login = login("admin", "admin123")

intern_login


  payload["exp"] = datetime.utcnow() + expires_delta


{'status': 200,
 'access_token': 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJpbnRlcm4iLCJyb2xlIjoiaW50ZXJuIiwiZXhwIjoxNzY2NzY4OTg0fQ.Fz_BhloEdQn3HFn4OWBkhV-fSdzGkyLTj9wS8Im0UQk',
 'refresh_token': 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJpbnRlcm4iLCJleHAiOjE3NjczNzI4ODR9.Rb93qaqqYDHflXLK0G10G0rfP-EebgaLyz0cHOppncI'}

In [16]:
refresh(intern_login["refresh_token"])


  payload["exp"] = datetime.utcnow() + expires_delta


{'status': 200,
 'access_token': 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJpbnRlcm4iLCJyb2xlIjoiaW50ZXJuIiwiZXhwIjoxNzY2NzY5MDA1fQ.iFlPqovl3FK7S6TyD3deZgMfaV1mHDTP32Flz_9c-_I'}

In [17]:
search_endpoint(None)


{'status': 401, 'error': 'Missing token'}

In [18]:
search_endpoint("invalid.token.here")


{'status': 401, 'error': 'Invalid token'}

In [19]:
search_endpoint(intern_login["access_token"])


{'status': 200, 'user': {'sub': 'intern', 'role': 'intern', 'exp': 1766768984}}

In [20]:
finance_endpoint(intern_login["access_token"])


{'status': 403, 'error': 'Forbidden'}

In [22]:
fake_token = create_token(
    {"sub": "hacker", "role": "unknown"},
    timedelta(minutes=5))
search_endpoint(fake_token)


  payload["exp"] = datetime.utcnow() + expires_delta


{'status': 403, 'error': 'Forbidden'}

In [23]:
admin_endpoint(admin_login["access_token"])


{'status': 200, 'user': {'sub': 'admin', 'role': 'admin', 'exp': 1766768984}}