<a href="https://colab.research.google.com/github/khuwajashafique/khuwajashafique.github.io/blob/main/USER%20ROLES%20%26%20ACCESS%20MANAGEMENT.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [11]:
import time
from enum import Enum
from functools import lru_cache
from typing import List, Optional, Callable, Dict, Any

from fastapi import FastAPI, Request, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from pydantic import BaseModel, ValidationError
from pydantic_settings import BaseSettings, SettingsConfigDict

# ---------- Configuration (Loaded from Environment) ----------
class Settings(BaseSettings):
    """Application configuration loaded from environment variables.

    Values may also be supplied through a local ``.env`` file (see
    ``model_config`` below).
    """

    # Key used to sign and verify JWTs. The default is a demo placeholder
    # only; set a long random value in your .env file or environment.
    SECRET_KEY: str = "CHANGE_THIS_TO_A_LONG_RANDOM_SECRET_IN_YOUR_ENV"
    # Algorithm name handed to jose when encoding/decoding tokens.
    ALGORITHM: str = "HS256"
    # Default access-token lifetime, in minutes.
    ACCESS_TOKEN_EXPIRE_MINUTES: int = 60

    # pydantic v2 configuration style; replaces the deprecated inner
    # ``class Config``. Enables loading values from a .env file.
    model_config = SettingsConfigDict(env_file=".env")

@lru_cache()
def get_settings():
    """Build the ``Settings`` object on first use and reuse it afterwards."""
    # lru_cache on a zero-argument function means Settings() is constructed
    # only once per process.
    settings = Settings()
    return settings

# ---------- FastAPI App and Security Scheme ----------
# The API application object; the route decorators below register on it.
app = FastAPI(
    title="Admissions SaaS — RBAC Refactored",
)

# Bearer-token security scheme; tokenUrl names the token-issuing route.
oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl="/auth/token",
)


# ---------- Permission Enum & Role Mapping ----------
class Permission(str, Enum):
    """Permission identifiers written as ``resource:action`` strings.

    Members are also ``str`` instances, so a member compares equal to its
    string value.
    """

    # Applications
    APPS_CREATE = "applications:create"
    APPS_READ = "applications:read"
    APPS_UPDATE = "applications:update"
    APPS_DELETE = "applications:delete"

    # Reviews
    REVIEWS_CREATE = "reviews:create"
    REVIEWS_READ = "reviews:read"
    REVIEWS_UPDATE = "reviews:update"

    # Assignment and administration
    ASSIGN = "assignments:assign"
    USERS_MANAGE = "users:manage"
    INSTITUTION_CONFIG = "institution:configure"
    ANALYTICS_VIEW = "analytics:view"
    SYSTEM_MANAGE = "system:manage"

    # Payments
    PAYMENTS_VIEW = "payments:view"
    PAYMENTS_REFUND = "payments:refund"

# Static role -> granted-permissions table. The token endpoint copies a
# role's entries into the token's "scopes" claim.
# (In prod, load from DB / config.)
ROLE_PERMISSIONS: Dict[str, List[Permission]] = {
    # Every permission defined on the enum, in declaration order.
    "super_admin": [*Permission],
    "institution_admin": [
        Permission.USERS_MANAGE, Permission.INSTITUTION_CONFIG, Permission.ANALYTICS_VIEW,
        Permission.APPS_READ, Permission.APPS_UPDATE, Permission.APPS_DELETE,
        Permission.PAYMENTS_VIEW,
    ],
    "admission_officer": [
        Permission.APPS_CREATE, Permission.APPS_READ, Permission.APPS_UPDATE,
        Permission.ASSIGN,
        Permission.REVIEWS_READ,
    ],
    "reviewer": [Permission.REVIEWS_CREATE, Permission.REVIEWS_READ, Permission.REVIEWS_UPDATE],
    "student": [Permission.APPS_CREATE, Permission.APPS_READ, Permission.PAYMENTS_VIEW],
}

# ---------- Pydantic user model extracted from token ----------
class TokenUser(BaseModel):
    """Identity and authorization claims read out of an access token."""

    # Value of the token's "user_id" claim.
    user_id: str
    # Role name; access checks compare it against literals such as "super_admin".
    role: str
    # Institution the user belongs to; None when the token carries none.
    institution_id: Optional[str] = None
    # 'scopes' will contain the string values from the Permission enum
    scopes: List[str] = []

# ---------- JWT utilities ----------
def create_access_token(data: dict, expires_delta_seconds: Optional[int] = None):
    """Sign ``data`` as a JWT carrying an ``exp`` claim.

    Args:
        data: Claims to embed. A shallow copy is made, so the caller's dict
            is not modified.
        expires_delta_seconds: Token lifetime in seconds from now. When
            ``None``, ``ACCESS_TOKEN_EXPIRE_MINUTES`` from settings is used.

    Returns:
        The encoded token returned by ``jwt.encode``.
    """
    settings = get_settings()

    # Test against None, not truthiness: an explicit 0 must produce an
    # immediately-expiring token rather than silently falling back to the
    # (much longer) default lifetime.
    if expires_delta_seconds is None:
        lifetime_seconds = settings.ACCESS_TOKEN_EXPIRE_MINUTES * 60
    else:
        lifetime_seconds = int(expires_delta_seconds)

    to_encode = data.copy()
    to_encode["exp"] = int(time.time()) + lifetime_seconds
    return jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)

def decode_token(token: str) -> TokenUser:
    """Verify a JWT and build a ``TokenUser`` from its claims.

    Args:
        token: The encoded JWT string.

    Returns:
        A ``TokenUser`` populated from the ``user_id``, ``role``,
        ``institution_id`` and ``scopes`` claims.

    Raises:
        HTTPException: 401 when jose rejects the token (``JWTError``), when
            ``user_id`` or ``role`` is absent, or when the claims do not
            validate against ``TokenUser``.
    """
    settings = get_settings()
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    # Keep each try body down to the single call that can raise.
    try:
        payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
    except JWTError:
        raise credentials_exception from None

    user_id = payload.get("user_id")
    role = payload.get("role")
    if user_id is None or role is None:
        raise credentials_exception

    try:
        return TokenUser(
            user_id=user_id,
            role=role,
            institution_id=payload.get("institution_id"),
            scopes=payload.get("scopes", []),
        )
    except ValidationError:
        # A correctly signed token with mis-typed claims (e.g. a numeric
        # user_id or a non-list scopes value) is a credentials problem,
        # not a server error.
        raise credentials_exception from None

# ---------- Core dependency: get_current_user ----------
async def get_current_user(token: str = Depends(oauth2_scheme)) -> TokenUser:
    """Turn the bearer token supplied by ``oauth2_scheme`` into a ``TokenUser``."""
    # decode_token raises the 401 itself when the token is unusable.
    current_user = decode_token(token)
    return current_user

# ---------- Permission dependency factory ----------
def require_permissions(*required_perms: Permission):
    """
    Build a FastAPI dependency that enforces the given permissions.

    The returned dependency resolves the current user, responds with 403 if
    any of ``required_perms`` is absent from the token's scopes, and
    otherwise returns the user.
    """
    async def _require(user: TokenUser = Depends(get_current_user)):
        # Scopes embedded in the token are the single source of truth here;
        # there is no role-based bypass (a super_admin token is issued with
        # every scope, so it passes this check like any other).
        granted = set(user.scopes)

        missing = []
        for perm in required_perms:
            scope = str(perm.value)
            if scope not in granted:
                missing.append(scope)

        if not missing:
            return user

        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN,
                            detail=f"Missing permissions: {', '.join(missing)}")
    return _require

# ---------- Resource scope checks ----------
async def require_institution_scope(
    institution_id: str,
    user: TokenUser = Depends(get_current_user)
) -> TokenUser:
    """
    Tenancy guard (not an RBAC check): let the user through only when they
    are a super_admin or their institution matches ``institution_id``.

    Returns the user on success; raises a 403 HTTPException otherwise.
    """
    is_super_admin = user.role == "super_admin"
    if is_super_admin or user.institution_id == institution_id:
        return user

    raise HTTPException(status_code=status.HTTP_403_FORBIDDEN,
                        detail="Insufficient scope for institution resource")

def application_access_check(application: Dict[str, Any], user: TokenUser):
    """
    Attribute/ownership-based (ABAC) check of the user against one
    specific application record.

    Returns True when access is allowed; raises a 403 HTTPException with a
    role-specific message otherwise.
    """
    role = user.role

    if role == "super_admin":
        return True

    if role == "student":
        # Students may only touch applications they submitted themselves.
        allowed = application.get("applicant_user_id") == user.user_id
        denial = "Students can only access their own applications"
    elif role == "reviewer":
        # Reviewers may only touch applications assigned to them.
        allowed = application.get("assigned_reviewer_id") == user.user_id
        denial = "Reviewer not assigned to this application"
    else:
        # Institution staff: same institution and one of the staff roles.
        same_institution = user.institution_id == application.get("institution_id")
        allowed = same_institution and role in ("admission_officer", "institution_admin")
        denial = "Access to application denied"

    if allowed:
        return True
    raise HTTPException(status_code=403, detail=denial)

# ---------- Example data store (in-memory for demo) ----------
# Keyed by application id; seeded with a single sample record.
DB_APPLICATIONS: Dict[str, Dict[str, Any]] = {
    "app-100": {
        "id": "app-100",
        "institution_id": "inst-1",
        "applicant_user_id": "stu-1",
        "assigned_reviewer_id": "rev-1",
        "payload": {"program": "MSc"},
    },
}

# ---------- Example endpoints ----------

# !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
# !! WARNING: DEMO ONLY !!
# !!
# !! This endpoint is INSECURE. It allows anyone to generate a token
# !! for any role. In production, replace this with a real
# !! authentication endpoint (e.g., /auth/login) that validates
# !! a username and password against a database.
# !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
@app.post("/auth/token")
async def auth_token_for_demo(username: str, role: str, institution_id: Optional[str] = None):
    """
    Demo-only token generation. In prod authenticate with password / OAuth.
    """
    # A role missing from ROLE_PERMISSIONS gets an empty permission list.
    granted = ROLE_PERMISSIONS.get(role, [])

    claims = {
        # A real system would look the user id up in a user database.
        "user_id": f"user-{username}",
        "role": role,
        "institution_id": institution_id,
        # The JWT carries plain strings, so unwrap each enum member.
        "scopes": [perm.value for perm in granted],
    }

    return {"access_token": create_access_token(claims), "token_type": "bearer"}

def _next_application_id() -> str:
    """Return an ``app-N`` id that is not yet a key of ``DB_APPLICATIONS``."""
    # Start from the historical len+1 guess, but skip ids already in use so
    # an existing record (e.g. the seeded "app-100") is never overwritten.
    candidate = len(DB_APPLICATIONS) + 1
    while f"app-{candidate}" in DB_APPLICATIONS:
        candidate += 1
    return f"app-{candidate}"


@app.post("/applications", dependencies=[Depends(require_permissions(Permission.APPS_CREATE))])
async def create_application(payload: dict, user: TokenUser = Depends(get_current_user)):
    """Create an application for the caller (students) or for an applicant named in the payload (staff)."""
    # The route dependency has already enforced APPS_CREATE; what follows is
    # business logic and tenancy.
    if user.role == "student":
        # Students always create for themselves, inside their own institution.
        institution_id = user.institution_id
        applicant_user_id = user.user_id
    elif user.role in ("admission_officer", "institution_admin", "super_admin"):
        institution_id = payload.get("institution_id", user.institution_id)
        # Tenancy: only super_admin may create in an institution other than
        # the one on their token. This mirrors require_institution_scope.
        if user.role != "super_admin" and institution_id != user.institution_id:
            raise HTTPException(status_code=403,
                                detail="Insufficient scope for institution resource")
        applicant_user_id = payload.get("applicant_user_id")
    else:
        # Only reachable if a role is granted APPS_CREATE without being
        # handled above.
        raise HTTPException(status_code=403, detail="Role not allowed to create applications")

    new_id = _next_application_id()
    DB_APPLICATIONS[new_id] = {
        "id": new_id,
        "institution_id": institution_id,
        "applicant_user_id": applicant_user_id,
        "payload": payload,
    }
    return {"status": "created", "application_id": new_id}

@app.get("/applications/{app_id}")
async def get_application(app_id: str, user: TokenUser = Depends(get_current_user)):
    """Return one application, subject to permission, ownership and field-level redaction."""
    # require_permissions(Permission.APPS_READ) is not used as a route
    # dependency because reviewers read applications through REVIEWS_READ;
    # the RBAC and ABAC checks are done by hand below.
    app_data = DB_APPLICATIONS.get(app_id)
    if not app_data:
        raise HTTPException(status_code=404, detail="Application not found")

    # 1. RBAC: applications:read (or super_admin), with reviewers allowed in
    #    on the strength of reviews:read.
    can_read_apps = Permission.APPS_READ.value in user.scopes or user.role == "super_admin"
    reviewer_can_read = user.role == "reviewer" and Permission.REVIEWS_READ.value in user.scopes
    if not (can_read_apps or reviewer_can_read):
        raise HTTPException(status_code=403, detail="Missing required permission: applications:read")

    # 2. ABAC: ownership / assignment / institution checks; raises 403 itself.
    application_access_check(app_data, user)

    # 3. Field-level redaction. app_data.copy() is shallow, so the nested
    #    payload is still the stored dict; build a filtered copy instead of
    #    popping keys, otherwise the stored record would lose those fields
    #    for every later reader.
    response = app_data.copy()
    if user.role == "reviewer":
        stored_payload = response.get("payload")
        if isinstance(stored_payload, dict):
            hidden_fields = ("payment_details", "identity_docs")
            response["payload"] = {
                key: value
                for key, value in stored_payload.items()
                if key not in hidden_fields
            }

    return response

@app.post("/applications/{app_id}/assign")
async def assign_reviewer(
    app_id: str,
    reviewer_id: str,
    user: TokenUser = Depends(require_permissions(Permission.ASSIGN))
):
    """
    Admission Officer assigns reviewer to an application.
    Requires: ASSIGN permission and institution scope.
    """
    application = DB_APPLICATIONS.get(app_id)
    if application:
        # ASSIGN was enforced by the dependency that produced `user`; what
        # remains is the tenancy check, which raises 403 on mismatch.
        await require_institution_scope(application["institution_id"], user)

        application["assigned_reviewer_id"] = reviewer_id
        return {
            "status": "assigned",
            "application_id": app_id,
            "assigned_reviewer_id": reviewer_id,
        }

    raise HTTPException(status_code=404, detail="Application not found")


@app.post("/applications/{app_id}/reviews", dependencies=[Depends(require_permissions(Permission.REVIEWS_CREATE))])
async def create_review(app_id: str, review_payload: dict, user: TokenUser = Depends(get_current_user)):
    application = DB_APPLICATIONS.get(app_id)
    if not application:
        raise HTTPException(status_code=404, detail="Application not found")

    # REVIEWS_CREATE is enforced by the route dependency. The per-resource
    # check below raises 403 unless the caller may act on this application
    # (for a reviewer: it must be assigned to them).
    application_access_check(application, user)

    return {
        "status": "reviewed",
        "review_id": f"rev-{app_id}-{user.user_id}",
        "application": app_id,
    }


@app.get("/admin-only", dependencies=[Depends(require_permissions(Permission.SYSTEM_MANAGE))])
async def admin_only_route(user: TokenUser = Depends(get_current_user)):
    # The route dependency has already rejected any caller whose token
    # lacks SYSTEM_MANAGE, so reaching this point implies that permission.
    greeting = f"Hello system manager (or super_admin) {user.user_id}"
    return {"msg": greeting}


# ---------- Notes and best practices (updated) ----------
"""
Best practices to operationalize:
- SECRET_KEY: This is now loaded from the environment. Protect it.
- ROLE_PERMISSIONS: Persist this mapping in a database.
- RS256: Use an asymmetric algorithm (like RS256) in production.
  Your Auth service signs with the private key, and resource services (like this)
  verify with the public key. This prevents this service from being
  able to create valid tokens.
- Auditing: Log every state-changing operation (POST, PUT, PATCH, DELETE)
  with the user_id, timestamp, and resource ID.
- Refresh Tokens: Implement a refresh token flow to allow users to get
  new access tokens without re-entering credentials.
- Revocation: For high-security needs, implement a token revocation list
  (e.g., in Redis) to instantly de-authorize compromised tokens.
"""

'\nBest practices to operationalize:\n- SECRET_KEY: This is now loaded from the environment. Protect it.\n- ROLE_PERMISSIONS: Persist this mapping in a database.\n- RS256: Use an asymmetric algorithm (like RS256) in production.\n  Your Auth service signs with the private key, and resource services (like this)\n  verify with the public key. This prevents this service from being\n  able to create valid tokens.\n- Auditing: Log every state-changing operation (POST, PUT, PATCH, DELETE)\n  with the user_id, timestamp, and resource ID.\n- Refresh Tokens: Implement a refresh token flow to allow users to get\n  new access tokens without re-entering credentials.\n- Revocation: For high-security needs, implement a token revocation list\n  (e.g., in Redis) to instantly de-authorize compromised tokens.\n'

In [9]:
!pip install fastapi uvicorn "python-jose[cryptography]" pydantic-settings

