# CodeGenie AI - Colab Edition (Local HF Models)

In [None]:
## Step 1: Install Dependencies

In [None]:
%%capture
!pip install -q transformers accelerate torch streamlit pyngrok python-dotenv PyJWT vaderSentiment Pillow pandas matplotlib requests fastapi uvicorn nest_asyncio bitsandbytes sentencepiece

In [None]:
## Step 2: Setup Environment

In [None]:
#@title Setup Environment Variables
import os
import secrets

#@markdown Enter your tokens:
HF_TOKEN = "your_token" #@param {type:"string"}
NGROK_TOKEN = "your_token" #@param {type:"string"}

# Generate a fresh random signing key for this session. A fixed key that is
# published with the notebook would let anyone forge valid JWTs for the app.
JWT_SECRET_KEY = secrets.token_hex(32)

# Write to .env file (overwrites any previous .env in the working directory)
with open('.env', 'w') as f:
    f.write(f"HF_TOKEN={HF_TOKEN}\n")
    f.write(f"NGROK_TOKEN={NGROK_TOKEN}\n")
    f.write(f"JWT_SECRET_KEY={JWT_SECRET_KEY}\n")
    # SMTP placeholders: replace with real credentials to enable OTP emails.
    f.write("SMTP_EMAIL=yourmailaddress\n")
    f.write("SMTP_PASSWORD=yourappassword\n")
    f.write("SMTP_SERVER=smtp.gmail.com\n")
    f.write("SMTP_PORT=587\n")

print("✅ Environment variables set.")


In [None]:
## Step 3: Create Directory Structure

In [None]:
!mkdir -p backend streamlit_app/avatars

In [None]:
## Step 4: Backend Modules

In [None]:
%%writefile backend/model_loader.py
import streamlit as st
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM, AutoModel, BitsAndBytesConfig
import os
from dotenv import load_dotenv

load_dotenv()

# Models Configuration
# Maps the short model name (the key callers pass to get_model) to the
# Hugging Face Hub repository id that load_models downloads.
MODELS_CONFIG = {
    "gemma": "google/gemma-2b-it",
    "deepseek": "deepseek-ai/deepseek-coder-1.3b-instruct",
    "phi-2": "microsoft/phi-2"
}

@st.cache_resource
def load_models():
    """Load every model listed in MODELS_CONFIG together with its tokenizer.

    The result is cached by Streamlit through ``st.cache_resource``. On a CUDA
    device the models are loaded with 4-bit NF4 quantization and
    ``device_map="auto"``; otherwise they are loaded unquantized and moved to
    the CPU.

    Returns:
        A ``(models, tokenizers)`` pair of dicts keyed by the short model
        name, or ``(None, None)`` when HF_TOKEN is not set. A model that fails
        to load is reported (stdout and ``st.error``) and left out of both
        dicts.
    """
    hf_token = os.getenv("HF_TOKEN")
    if not hf_token:
        st.error("HF_TOKEN not found in environment variables.")
        return None, None

    on_gpu = torch.cuda.is_available()
    device = "cuda" if on_gpu else "cpu"
    print(f"Loading models on {device}...")

    # Quantization Config (4-bit) is only built when a GPU is present.
    qconf = None
    if on_gpu:
        qconf = BitsAndBytesConfig(
            load_in_4bit=True,
            bnb_4bit_compute_dtype=torch.bfloat16,
            bnb_4bit_quant_type="nf4"
        )

    # Chat template installed on the Gemma tokenizer; it only renders
    # messages whose role is 'user' or 'model'.
    gemma_chat_template = (
        "{% for message in messages %}"
        "{% if message['role'] == 'user' %}"
        "<start_of_turn>user\n{{ message['content'] }}<end_of_turn>\n"
        "{% elif message['role'] == 'model' %}"
        "<start_of_turn>model\n{{ message['content'] }}<end_of_turn>\n"
        "{% endif %}"
        "{% endfor %}"
        "{% if add_generation_prompt %}"
        "<start_of_turn>model\n"
        "{% endif %}"
    )

    models = {}
    tokenizers = {}
    for name, model_id in MODELS_CONFIG.items():
        try:
            print(f"Loading {name} ({model_id})...")
            tokenizer = AutoTokenizer.from_pretrained(model_id, token=hf_token, trust_remote_code=True)

            if name == "gemma":
                tokenizer.chat_template = gemma_chat_template

            if on_gpu:
                model = AutoModelForCausalLM.from_pretrained(
                    model_id,
                    token=hf_token,
                    quantization_config=qconf,
                    device_map="auto",
                    trust_remote_code=True
                )
            else:
                cpu_model = AutoModelForCausalLM.from_pretrained(
                    model_id,
                    token=hf_token,
                    trust_remote_code=True
                )
                model = cpu_model.to(device)

            models[name] = model
            tokenizers[name] = tokenizer
            print(f"✅ {name} loaded successfully.")

        except Exception as e:
            # One failing model must not prevent the others from loading.
            print(f"❌ Failed to load {name}: {e}")
            st.error(f"Failed to load {name}: {e}")

    return models, tokenizers

def get_model(model_name):
    """Return the ``(model, tokenizer)`` pair loaded under ``model_name``.

    Returns ``(None, None)`` when no models are available or the name is not
    among the loaded models.
    """
    models, tokenizers = load_models()
    if not models or model_name not in models:
        return None, None
    return models[model_name], tokenizers[model_name]


In [None]:
%%writefile backend/jwt_utils.py
# -*- coding: utf-8 -*-
"""JWT Utilities Module

Handles generation and verification of JSON Web Tokens.
"""

import jwt
import os
import datetime
from typing import Dict, Any, Optional

# Secret key for signing tokens
# In production, this should be a strong secret loaded from env
# NOTE: read once at import time; if JWT_SECRET_KEY is not in the process
# environment by then, the hard-coded development fallback below is used.
SECRET_KEY = os.environ.get("JWT_SECRET_KEY", "super-secret-dev-key-change-in-prod")
# Signing algorithm passed to jwt.encode / jwt.decode.
ALGORITHM = "HS256"
# Default lifetime of an access token, in minutes.
ACCESS_TOKEN_EXPIRE_MINUTES = 30
# Lifetime of a refresh token, in days.
REFRESH_TOKEN_EXPIRE_DAYS = 7

def create_access_token(data: Dict[str, Any], expires_delta: Optional[datetime.timedelta] = None) -> str:
    """Create a new signed access token.

    Args:
        data: Claims to embed in the token. The dict is copied, not mutated.
            Any existing "exp" or "type" keys are overwritten.
        expires_delta: Lifetime of the token. When ``None`` the default of
            ACCESS_TOKEN_EXPIRE_MINUTES minutes is used. A zero timedelta is
            honoured (the token expires immediately) instead of being
            silently replaced by the default.

    Returns:
        The encoded JWT string.
    """
    to_encode = data.copy()
    # Compare against None explicitly: timedelta(0) is falsy.
    if expires_delta is None:
        expires_delta = datetime.timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)

    # Timezone-aware UTC replaces the deprecated, naive datetime.utcnow().
    expire = datetime.datetime.now(datetime.timezone.utc) + expires_delta

    to_encode.update({"exp": expire, "type": "access"})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)

def create_refresh_token(data: Dict[str, Any]) -> str:
    """Create a new signed refresh token.

    Args:
        data: Claims to embed in the token. The dict is copied, not mutated.
            Any existing "exp" or "type" keys are overwritten.

    Returns:
        The encoded JWT string, expiring REFRESH_TOKEN_EXPIRE_DAYS days from
        now.
    """
    to_encode = data.copy()
    # Timezone-aware UTC replaces the deprecated, naive datetime.utcnow().
    now_utc = datetime.datetime.now(datetime.timezone.utc)
    expire = now_utc + datetime.timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)

    to_encode.update({"exp": expire, "type": "refresh"})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)

def verify_token(token: str) -> Optional[Dict[str, Any]]:
    """Decode ``token`` and return its payload, or ``None`` if it is rejected.

    Both an expired token and any other PyJWT error produce ``None``; the
    caller cannot tell the two cases apart.
    """
    try:
        return jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except (jwt.ExpiredSignatureError, jwt.PyJWTError):
        return None


In [None]:
%%writefile backend/user_management_module.py
# -*- coding: utf-8 -*-
"""User Management Module

Handles user registration, login tracking, member replacement, and activity history.
Now supports RBAC (Admin/User), Security Questions, and SMTP-based OTP.
"""

import json
import os
import logging
import threading
import smtplib
import re
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any

# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Resolve to the project's streamlit_app folder
# (the parent of this file's directory, then 'streamlit_app').
CURRENT_FILE_DIR = os.path.dirname(os.path.abspath(__file__))
PROJECT_ROOT = os.path.abspath(os.path.join(CURRENT_FILE_DIR, '..'))
STREAMLIT_APP_DIR = os.path.join(PROJECT_ROOT, 'streamlit_app')
# JSON file holding the list of user records.
USERS_FILE = os.path.join(STREAMLIT_APP_DIR, 'users.json')
# JSON file holding the list of activity entries.
USER_ACTIVITY_FILE = os.path.join(STREAMLIT_APP_DIR, 'user_activity.json')

# Use a lock for thread-safe file operations
# NOTE: threading.Lock is not reentrant and only coordinates threads inside
# this process; functions that hold it must not call others that take it.
file_lock = threading.Lock()

# SMTP Configuration (Load from env in production)
# Values are read once at import time. An empty SMTP_EMAIL or SMTP_PASSWORD
# makes send_otp_email skip sending.
SMTP_EMAIL = os.environ.get("SMTP_EMAIL", "")
SMTP_PASSWORD = os.environ.get("SMTP_PASSWORD", "")
SMTP_SERVER = os.environ.get("SMTP_SERVER", "smtp.gmail.com")
SMTP_PORT = int(os.environ.get("SMTP_PORT", 587))


def _ensure_files_exist():
    """Create the storage directory and seed missing JSON stores with ``[]``.

    Existing files are left untouched.
    """
    os.makedirs(os.path.dirname(USERS_FILE), exist_ok=True)

    for store_path in (USERS_FILE, USER_ACTIVITY_FILE):
        if os.path.exists(store_path):
            continue
        with open(store_path, 'w', encoding='utf-8') as handle:
            json.dump([], handle, indent=4)


def validate_email(email: str) -> bool:
    """
    Validate email format:
    - No whitespace anywhere in the address
    - Exactly one '@'
    - Min 3 chars before @
    - Allowed domains: @gmail.com, @mail.com
    - Allowed TLDs: .edu, .in
    - No empty domain labels: leading, trailing or consecutive dots
      (e.g. abc@.com and abc@foo..in are invalid)

    Returns:
        True when the address satisfies every rule above, else False.
    """
    if not email:
        return False

    # Whitespace would otherwise count toward the 3-character minimum, so
    # "   @gmail.com" used to be accepted.
    if any(ch.isspace() for ch in email):
        return False

    parts = email.split('@')
    if len(parts) != 2:
        return False

    local_part, domain_part = parts

    if len(local_part) < 3:
        return False

    domain_part = domain_part.lower()

    # An empty label means a leading dot, a trailing dot or ".." in the domain.
    if '' in domain_part.split('.'):
        return False

    # Whitelisted full domains
    if domain_part in ('gmail.com', 'mail.com'):
        return True

    # Whitelisted TLDs
    return domain_part.endswith(('.edu', '.in'))

def validate_password(password: str) -> bool:
    """
    Check a password against the policy.

    The password passes when it is at least 8 characters long and contains
    at least one letter, at least one digit, and at least one character that
    is neither a letter nor a digit.
    """
    if len(password) < 8:
        return False

    has_letter = False
    has_digit = False
    has_symbol = False
    for ch in password:
        if ch.isalpha():
            has_letter = True
        if ch.isdigit():
            has_digit = True
        if not ch.isalnum():  # anything non-alphanumeric counts as a symbol
            has_symbol = True

    return has_letter and has_digit and has_symbol


def register_user(username: str, email: str, 
                 security_question: str = "", security_answer: str = "") -> Dict[str, Any]:
    """
    Register a new user with an auto-generated, unique user_id.

    The id is ``user_<n>``, where n starts at (number of stored users + 1)
    and is increased until the id is not used by any stored record. The
    count alone is not enough: after a deletion it would hand out an id that
    still belongs to another user.

    Args:
        username: Username for display (must be unique, case-insensitive)
        email: Email address (mandatory, must pass validate_email and be unique)
        security_question: Question for password recovery
        security_answer: Answer for password recovery; stored as a SHA-256
            hash of the lower-cased, stripped text

    Returns:
        ``{'success': True, 'user_id': ..., 'message': ...}`` on success,
        otherwise ``{'success': False, 'error': ...}``.
    """
    _ensure_files_exist()
    
    # Email Validation
    if not email or not validate_email(email):
        return {'success': False, 'error': 'Invalid email format. Must have 3+ chars before @, 3+ chars after @, and valid extension.'}
    
    with file_lock:
        try:
            # Read existing users; an unreadable or non-list file counts as empty
            with open(USERS_FILE, 'r', encoding='utf-8') as f:
                try:
                    users = json.load(f)
                    if not isinstance(users, list):
                        users = []
                except json.JSONDecodeError:
                    users = []
            
            # Duplicate Check (Username and Email)
            for u in users:
                if u.get('username', '').lower() == username.lower():
                    return {'success': False, 'error': f"Username '{username}' is already taken."}
                if u.get('email', '').lower() == email.lower():
                    return {'success': False, 'error': f"Email '{email}' is already registered."}

            # Auto-generate User ID based on arrival (count), skipping any id
            # that is already taken (possible after deletions or replacements).
            taken_ids = {u.get('user_id') for u in users}
            next_number = len(users) + 1
            while f"user_{next_number}" in taken_ids:
                next_number += 1
            user_id = f"user_{next_number}"
            
            # Hash security answer if provided
            sec_answer_hash = None
            if security_answer:
                import hashlib
                sec_answer_hash = hashlib.sha256(security_answer.lower().strip().encode()).hexdigest()

            now_iso = datetime.now().isoformat()
            user_entry = {
                'user_id': user_id,
                'username': username,
                'email': email,
                'role': 'user',  # Default role
                'security_question': security_question,
                'security_answer_hash': sec_answer_hash,
                'created_at': now_iso,
                'last_login': now_iso,
                'total_logins': 1,
                'total_queries': 0,
                'average_rating': 0.0,
                'total_feedback_entries': 0
            }
            
            # Add new user
            users.append(user_entry)
            
            # Write back
            with open(USERS_FILE, 'w', encoding='utf-8') as f:
                json.dump(users, f, indent=4)
            
            logger.info(f"Successfully registered user {user_id}")
            return {'success': True, 'user_id': user_id, 'message': 'User registered successfully'}
            
        except Exception as e:
            logger.error(f"Failed to register user: {e}")
            return {'success': False, 'error': str(e)}


def log_user_activity(user_id: str, activity_type: str, query: str = "", 
                     language: str = "", rating: int = 0, comments: str = "", model_name: str = "", explanation: str = "", generated_code: str = "") -> bool:
    """
    Append one activity record (query, feedback, etc.) to the activity file.

    Returns:
        True when the record was written, False when any step failed (the
        failure is logged).
    """
    _ensure_files_exist()

    record = {
        'timestamp': datetime.now().isoformat(),
        'user_id': user_id,
        'activity_type': activity_type,
        'query': query,
        'language': language,
        'rating': rating,
        'comments': comments,
        'model': model_name,
        'explanation': explanation,
        'generated_code': generated_code
    }

    with file_lock:
        try:
            # Start from an empty history unless the file holds a valid list.
            history = []
            if os.path.exists(USER_ACTIVITY_FILE):
                with open(USER_ACTIVITY_FILE, 'r', encoding='utf-8') as src:
                    try:
                        loaded = json.load(src)
                    except json.JSONDecodeError:
                        loaded = None
                    if isinstance(loaded, list):
                        history = loaded

            history.append(record)

            # Rewrite the whole file with the extended history.
            with open(USER_ACTIVITY_FILE, 'w', encoding='utf-8') as dst:
                json.dump(history, dst, indent=4)

            logger.info(f"Successfully logged activity for user {user_id}")
            return True

        except Exception as e:
            logger.error(f"Failed to log activity: {e}")
            return False


def get_all_users() -> List[Dict[str, Any]]:
    """Return every stored user record.

    An empty list is returned when the file cannot be read, is not valid
    JSON, or does not contain a list.
    """
    _ensure_files_exist()

    loaded = None
    try:
        with open(USERS_FILE, 'r', encoding='utf-8') as handle:
            loaded = json.load(handle)
    except json.JSONDecodeError:
        # Corrupt JSON is treated as "no users" without logging.
        loaded = None
    except Exception as e:
        logger.error(f"Failed to read users: {e}")

    return loaded if isinstance(loaded, list) else []


def get_user_by_username(username: str) -> Optional[Dict[str, Any]]:
    """Find a user by username, ignoring case and surrounding whitespace.

    Returns the first matching record, or None when there is no match.
    """
    users = get_all_users()
    wanted = (username or '').strip().lower()
    for record in users:
        if str(record.get('username', '')).strip().lower() == wanted:
            return record
    return None


def _save_users(users: List[Dict[str, Any]]) -> bool:
    """Save users list to USERS_FILE atomically.

    The data is written to a temporary file next to USERS_FILE and then
    swapped in with ``os.replace``. An interrupted or failed write therefore
    never leaves a truncated users file behind, which readers would treat as
    an empty user list.

    Args:
        users: The complete list of user records to persist.

    Returns:
        True on success, False when writing failed (the error is logged).
    """
    tmp_path = f"{USERS_FILE}.tmp"
    try:
        with open(tmp_path, 'w', encoding='utf-8') as f:
            json.dump(users, f, indent=4)
            # Push the bytes to disk before the rename makes them visible.
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp_path, USERS_FILE)
        return True
    except Exception as e:
        logger.error(f"Failed to save users: {e}")
        # Best-effort cleanup of the partial temp file; the original
        # USERS_FILE is still intact at this point.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        return False


def _hash_password(password: str, salt: Optional[bytes] = None) -> Dict[str, str]:
    """Derive a PBKDF2-HMAC-SHA256 hash (100,000 iterations) for ``password``.

    A random 16-byte salt is generated when ``salt`` is None. The result is a
    dict with the hex-encoded ``'salt'`` and ``'hash'``.
    """
    import hashlib
    import os

    salt_bytes = os.urandom(16) if salt is None else salt
    derived = hashlib.pbkdf2_hmac('sha256', password.encode('utf-8'), salt_bytes, 100_000)
    return {'salt': salt_bytes.hex(), 'hash': derived.hex()}


def set_password_for_user(user_id: str, password: str) -> Dict[str, Any]:
    """Set or replace a user's password (stores hash+salt).

    Args:
        user_id: Id of the user whose password is replaced.
        password: The new plain-text password; only its salted hash is stored.

    Returns:
        ``{'success': True}`` once the new hash has been persisted, otherwise
        ``{'success': False, 'error': ...}`` (unknown user, failed write, or
        any unexpected error).
    """
    _ensure_files_exist()
    with file_lock:
        try:
            with open(USERS_FILE, 'r', encoding='utf-8') as f:
                try:
                    users = json.load(f)
                    if not isinstance(users, list):
                        users = []
                except json.JSONDecodeError:
                    users = []

            idx = next((i for i, u in enumerate(users) if u['user_id'] == user_id), None)
            if idx is None:
                return {'success': False, 'error': 'User not found'}

            ph = _hash_password(password)
            users[idx]['password_salt'] = ph['salt']
            users[idx]['password_hash'] = ph['hash']

            # _save_users reports failure through its return value; do not
            # claim success when nothing was written.
            if not _save_users(users):
                return {'success': False, 'error': 'Failed to save password'}

            logger.info(f"Password set for user {user_id}")
            return {'success': True}
        except Exception as e:
            logger.error(f"Failed to set password: {e}")
            return {'success': False, 'error': str(e)}


def register_user_with_password(username: str, password: str, email: str, 
                               security_question: str = "", security_answer: str = "") -> Dict[str, Any]:
    """Convenience: create a user and set its password as one operation.

    The password policy is checked before anything is written. If the user
    record is created but storing the password fails, the new record is
    deleted again, so a failed call does not leave a password-less account
    occupying the username and email.

    Returns:
        ``{'success': True, 'user_id': ..., 'message': ...}`` on success,
        otherwise the failing step's ``{'success': False, 'error': ...}``.
    """
    
    # Password Policy Check
    if not validate_password(password):
        return {'success': False, 'error': 'Password must be at least 8 characters long and contain letters, numbers, and symbols.'}
        
    res = register_user(username, email, security_question, security_answer)
    if not res.get('success'):
        return res

    pw_res = set_password_for_user(res['user_id'], password)
    if not pw_res.get('success'):
        # Roll back the half-created account.
        rollback = delete_user(res['user_id'])
        if not rollback.get('success'):
            logger.error(f"Failed to roll back user {res['user_id']}: {rollback.get('error')}")
        return pw_res

    return {'success': True, 'user_id': res['user_id'], 'message': 'User registered successfully'}


def verify_user_password(user_identifier: str, password: str) -> Dict[str, Any]:
    """
    Verify a user's password. user_identifier may be user_id or username.

    On a match the user's ``last_login`` and ``total_logins`` are updated in
    the users file.

    Returns:
        ``{'success': True, 'user_id': ..., 'role': ...}`` on success,
        otherwise ``{'success': False, 'error': ...}``.
    """
    try:
        # Try user_id first
        user = get_user_by_id(user_identifier)
        if not user:
            user = get_user_by_username(user_identifier)
        if not user:
            return {'success': False, 'error': 'User not found'}

        salt_hex = user.get('password_salt')
        hash_hex = user.get('password_hash')
        if not salt_hex or not hash_hex:
            return {'success': False, 'error': 'Password not set for user'}

        import binascii
        import hashlib
        import hmac

        salt = binascii.unhexlify(salt_hex.encode('ascii'))
        attempt = hashlib.pbkdf2_hmac('sha256', password.encode('utf-8'), salt, 100_000)

        # Constant-time comparison so the check does not leak, through
        # timing, how many leading characters of the hash matched.
        attempt_hex = binascii.hexlify(attempt)
        stored_hex = str(hash_hex).encode('utf-8')
        if not hmac.compare_digest(attempt_hex, stored_hex):
            return {'success': False, 'error': 'Invalid password'}

        # update last_login and total_logins
        with file_lock:
            with open(USERS_FILE, 'r', encoding='utf-8') as f:
                try:
                    users = json.load(f)
                except json.JSONDecodeError:
                    users = []
            # Only rewrite the file when it held a usable list; otherwise
            # skip the bookkeeping rather than clobber the store.
            if isinstance(users, list):
                for u in users:
                    if u.get('user_id') == user.get('user_id'):
                        u['last_login'] = datetime.now().isoformat()
                        u['total_logins'] = u.get('total_logins', 0) + 1
                        break
                _save_users(users)

        return {'success': True, 'user_id': user.get('user_id'), 'role': user.get('role', 'user')}
    except Exception as e:
        logger.error(f"Error verifying password: {e}")
        return {'success': False, 'error': str(e)}


def send_otp_email(to_email: str, otp: str) -> bool:
    """Send the password-reset OTP to ``to_email`` via SMTP (STARTTLS).

    Args:
        to_email: Recipient address.
        otp: The one-time password to include in the message body.

    Returns:
        True when the message was handed to the SMTP server, False when SMTP
        credentials are not configured or sending failed (the failure is
        logged).
    """
    if not SMTP_EMAIL or not SMTP_PASSWORD:
        logger.warning("SMTP credentials not set. Skipping email sending.")
        return False
        
    try:
        msg = MIMEMultipart()
        msg['From'] = SMTP_EMAIL
        msg['To'] = to_email
        msg['Subject'] = "Password Reset OTP - CodeGenie AI"
        
        # Real newlines: the doubled backslashes used previously put the
        # literal characters "\n\n" into the email body.
        body = f"Your OTP for password reset is: {otp}\n\nThis OTP is valid for 10 minutes."
        msg.attach(MIMEText(body, 'plain'))
        
        # The context manager closes the connection even if STARTTLS, login
        # or sending raises.
        with smtplib.SMTP(SMTP_SERVER, SMTP_PORT) as server:
            server.starttls()
            server.login(SMTP_EMAIL, SMTP_PASSWORD)
            server.sendmail(SMTP_EMAIL, to_email, msg.as_string())

        logger.info(f"OTP sent to {to_email}")
        return True
    except Exception as e:
        logger.error(f"Failed to send email: {e}")
        return False


def generate_password_reset_otp(user_identifier: str, valid_minutes: int = 10) -> Dict[str, Any]:
    """Generate a 6-digit OTP for password reset and try to email it.

    Args:
        user_identifier: user_id or username of the account.
        valid_minutes: How long the OTP stays valid.

    Returns:
        ``{'success': True, 'otp': ..., 'expiry': ..., 'email_sent': bool}``
        on success, otherwise ``{'success': False, 'error': ...}``.
    """
    try:
        user = get_user_by_id(user_identifier) or get_user_by_username(user_identifier)
        if not user:
            return {'success': False, 'error': 'User not found'}

        # The OTP is an authentication secret, so draw it from the OS CSPRNG
        # rather than the predictable `random` module.
        import secrets
        otp = str(secrets.randbelow(900000) + 100000)  # 100000..999999
        expiry = (datetime.now() + timedelta(minutes=valid_minutes)).isoformat()

        with file_lock:
            with open(USERS_FILE, 'r', encoding='utf-8') as f:
                try:
                    users = json.load(f)
                except json.JSONDecodeError:
                    users = []
            for u in users:
                if u.get('user_id') == user.get('user_id'):
                    u['password_reset_otp'] = otp
                    u['password_reset_otp_expiry'] = expiry
                    break
            # An OTP that was not stored can never be verified; report it.
            if not _save_users(users):
                return {'success': False, 'error': 'Failed to store OTP'}

        # Try to send email
        email_sent = False
        if user.get('email'):
            email_sent = send_otp_email(user['email'], otp)

        # Return otp for development/testing, but indicate if email was sent
        # SECURITY NOTE(review): callers must not expose 'otp' to the end
        # user in production, or the email step provides no protection.
        return {'success': True, 'otp': otp, 'expiry': expiry, 'email_sent': email_sent}
    except Exception as e:
        logger.error(f"Failed to generate OTP: {e}")
        return {'success': False, 'error': str(e)}


def reset_password_with_otp(user_identifier: str, otp: str, new_password: str) -> Dict[str, Any]:
    """Verify OTP and reset the user's password.

    The OTP must match the stored one and must not be expired. The new
    password must satisfy validate_password, the same policy enforced at
    registration. On success the stored OTP is cleared.

    Args:
        user_identifier: user_id or username of the account.
        otp: The one-time password supplied by the user.
        new_password: The replacement plain-text password.

    Returns:
        ``{'success': True}`` on success, otherwise
        ``{'success': False, 'error': ...}``.
    """
    try:
        import hmac

        user = get_user_by_id(user_identifier) or get_user_by_username(user_identifier)
        if not user:
            return {'success': False, 'error': 'User not found'}

        with file_lock:
            with open(USERS_FILE, 'r', encoding='utf-8') as f:
                try:
                    users = json.load(f)
                except json.JSONDecodeError:
                    users = []

            target = None
            for u in users:
                if u.get('user_id') == user.get('user_id'):
                    target = u
                    break

            if not target:
                return {'success': False, 'error': 'User not found during reset'}

            stored_otp = target.get('password_reset_otp')
            expiry = target.get('password_reset_otp_expiry')
            
            # Constant-time comparison on bytes; str() guards against a
            # non-string value in either position.
            if not stored_otp or not hmac.compare_digest(
                str(stored_otp).encode('utf-8'), str(otp).encode('utf-8')
            ):
                return {'success': False, 'error': 'Invalid OTP'}
            if expiry and datetime.fromisoformat(expiry) < datetime.now():
                return {'success': False, 'error': 'OTP expired'}

            # Enforce the registration password policy on resets too. The
            # OTP is left in place so the user can retry.
            if not validate_password(new_password):
                return {'success': False, 'error': 'Password must be at least 8 characters long and contain letters, numbers, and symbols.'}

            # Set new password
            ph = _hash_password(new_password)
            target['password_salt'] = ph['salt']
            target['password_hash'] = ph['hash']
            # Clear OTP
            target.pop('password_reset_otp', None)
            target.pop('password_reset_otp_expiry', None)

            if not _save_users(users):
                return {'success': False, 'error': 'Failed to save new password'}

            logger.info(f"Password reset for user {user.get('user_id')}")
            return {'success': True}
    except Exception as e:
        logger.error(f"Failed to reset password: {e}")
        return {'success': False, 'error': str(e)}


def verify_security_answer(user_identifier: str, answer: str) -> bool:
    """Check ``answer`` against the user's stored security-answer hash.

    The answer is lower-cased and stripped before hashing with SHA-256.
    Returns False when the user is unknown or has no stored answer hash.
    """
    import hashlib

    user = get_user_by_id(user_identifier) or get_user_by_username(user_identifier)
    stored_hash = user.get('security_answer_hash') if user else None
    if not stored_hash:
        return False

    normalized = answer.lower().strip()
    return hashlib.sha256(normalized.encode()).hexdigest() == stored_hash


def reset_password_with_security_question(user_identifier: str, answer: str, new_password: str) -> Dict[str, Any]:
    """Reset password using security question.

    The security answer is verified first. The new password must then
    satisfy validate_password, the same policy enforced at registration.

    Args:
        user_identifier: user_id or username of the account.
        answer: The user's answer to the stored security question.
        new_password: The replacement plain-text password.

    Returns:
        ``{'success': True}`` on success, otherwise
        ``{'success': False, 'error': ...}``.
    """
    if not verify_security_answer(user_identifier, answer):
        return {'success': False, 'error': 'Incorrect security answer'}

    if not validate_password(new_password):
        return {'success': False, 'error': 'Password must be at least 8 characters long and contain letters, numbers, and symbols.'}

    user = get_user_by_id(user_identifier) or get_user_by_username(user_identifier)
    if not user:
        # The record can disappear between the two lookups (e.g. a
        # concurrent delete); avoid dereferencing None.
        return {'success': False, 'error': 'User not found'}
    return set_password_for_user(user['user_id'], new_password)


def promote_user_to_admin(user_id: str) -> Dict[str, Any]:
    """Promote a user to admin role (max 2 admins).

    Args:
        user_id: Id of the user to promote.

    Returns:
        ``{'success': True, 'message': ...}`` once the role change has been
        persisted, otherwise ``{'success': False, 'error': ...}`` (admin
        limit reached, unknown user, failed write, or unexpected error).
    """
    _ensure_files_exist()
    with file_lock:
        try:
            with open(USERS_FILE, 'r', encoding='utf-8') as f:
                users = json.load(f)
            
            # Count existing admins
            admin_count = sum(1 for u in users if u.get('role') == 'admin')
            if admin_count >= 2:
                return {'success': False, 'error': 'Maximum number of admins (2) reached.'}
            
            target = next((u for u in users if u['user_id'] == user_id), None)
            if not target:
                return {'success': False, 'error': 'User not found'}
            
            target['role'] = 'admin'
            # Do not report a promotion that was never written to disk.
            if not _save_users(users):
                return {'success': False, 'error': 'Failed to save role change'}
            return {'success': True, 'message': f"User {user_id} promoted to admin."}
        except Exception as e:
            logger.error(f"Failed to promote user {user_id}: {e}")
            return {'success': False, 'error': str(e)}

def get_user_by_id(user_id: str) -> Optional[Dict[str, Any]]:
    """Return the first user record whose 'user_id' equals ``user_id``, or None."""
    for record in get_all_users():
        if record['user_id'] == user_id:
            return record
    return None


def get_user_activity(user_id: str = None) -> List[Dict[str, Any]]:
    """Return stored activity entries, optionally restricted to one user.

    With a falsy ``user_id`` every entry is returned. An empty list is
    returned when the file cannot be read, is not valid JSON, or does not
    contain a list.
    """
    _ensure_files_exist()
    try:
        with open(USER_ACTIVITY_FILE, 'r', encoding='utf-8') as handle:
            try:
                loaded = json.load(handle)
            except json.JSONDecodeError:
                return []
            if not isinstance(loaded, list):
                return []
            if not user_id:
                return loaded
            return [entry for entry in loaded if entry['user_id'] == user_id]
    except Exception as e:
        logger.error(f"Failed to read activities: {e}")
    return []


def replace_user(old_user_id: str, new_user_id: str, new_username: str, new_email: str = "") -> Dict[str, Any]:
    """Replace an old user with a new user (transfer/reassign).

    The old record is copied and its id, username, email and last_login are
    overwritten. Activity entries that reference the old id are then
    re-pointed to the new id.

    Args:
        old_user_id: Id of the record to replace.
        new_user_id: Id for the replacement; must not belong to another user.
        new_username: Username for the replacement.
        new_email: Email for the replacement.

    Returns:
        ``{'success': True, 'message': ...}`` on success, otherwise
        ``{'success': False, 'error': ...}``.
    """
    _ensure_files_exist()
    with file_lock:
        try:
            with open(USERS_FILE, 'r', encoding='utf-8') as f:
                users = json.load(f)
            
            user_idx = next((i for i, u in enumerate(users) if u['user_id'] == old_user_id), None)
            if user_idx is None:
                return {'success': False, 'error': f'User {old_user_id} not found'}

            # Reusing another user's id would create two records with the
            # same id and merge their activity histories.
            if new_user_id != old_user_id and any(u.get('user_id') == new_user_id for u in users):
                return {'success': False, 'error': f'User ID {new_user_id} is already in use'}
            
            old_user = users[user_idx]
            new_user = old_user.copy()
            new_user.update({
                'user_id': new_user_id,
                'username': new_username,
                'email': new_email,
                'last_login': datetime.now().isoformat()
            })
            
            users[user_idx] = new_user
            # Stop before touching activities if the user record was not saved.
            if not _save_users(users):
                return {'success': False, 'error': 'Failed to save users'}
            
            # Update activities
            with open(USER_ACTIVITY_FILE, 'r', encoding='utf-8') as f:
                try:
                    activities = json.load(f)
                except json.JSONDecodeError:
                    activities = []

            # Only rewrite the activity file when it holds a usable list.
            if isinstance(activities, list):
                for activity in activities:
                    if isinstance(activity, dict) and activity.get('user_id') == old_user_id:
                        activity['user_id'] = new_user_id

                with open(USER_ACTIVITY_FILE, 'w', encoding='utf-8') as f:
                    json.dump(activities, f, indent=4)
            
            return {'success': True, 'message': 'User replaced successfully'}
        except Exception as e:
            logger.error(f"Failed to replace user {old_user_id}: {e}")
            return {'success': False, 'error': str(e)}


def delete_user(user_id: str) -> Dict[str, Any]:
    """Delete a user from the system.

    Deleting an id that is not present is not an error; the call still
    reports success.

    Args:
        user_id: Id of the user record to remove.

    Returns:
        ``{'success': True, 'message': ...}`` once the updated list has been
        persisted, otherwise ``{'success': False, 'error': ...}``.
    """
    _ensure_files_exist()
    with file_lock:
        try:
            with open(USERS_FILE, 'r', encoding='utf-8') as f:
                users = json.load(f)
            
            users = [u for u in users if u['user_id'] != user_id]
            # Do not report a deletion that was never written to disk.
            if not _save_users(users):
                return {'success': False, 'error': 'Failed to save users'}
            return {'success': True, 'message': 'User deleted successfully'}
        except Exception as e:
            logger.error(f"Failed to delete user {user_id}: {e}")
            return {'success': False, 'error': str(e)}


def get_user_stats(user_id: str) -> Dict[str, Any]:
    """Return the user's record merged with statistics from their activity.

    The statistics are the number of 'query' and 'feedback' activities and
    the mean of the positive feedback ratings, rounded to two decimals.
    An empty dict is returned when the user does not exist.
    """
    user = get_user_by_id(user_id)
    activities = get_user_activity(user_id)

    if not user:
        return {}

    query_count = len([a for a in activities if a['activity_type'] == 'query'])
    feedback_entries = [a for a in activities if a['activity_type'] == 'feedback']
    feedback_count = len(feedback_entries)

    # Ratings of 0 or below are treated as "no rating" and left out.
    positive_ratings = [a['rating'] for a in feedback_entries if a['rating'] > 0]
    if positive_ratings:
        mean_rating = sum(positive_ratings) / len(positive_ratings)
    else:
        mean_rating = 0.0

    stats = dict(user)
    stats.update({
        'total_queries': query_count,
        'total_feedback': feedback_count,
        'average_rating': round(mean_rating, 2),
        'activities': {
            'queries': query_count,
            'feedbacks': feedback_count,
            'logins': user.get('total_logins', 0)
        }
    })
    return stats

# Script entry point: currently only prints a banner; no checks are run.
if __name__ == "__main__":
    print("--- Testing User Management Module ---")


In [None]:
%%writefile backend/feedback_logger_module.py
import json
import os
import logging
from datetime import datetime
from typing import Dict, Any, Optional

# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Helper to resolve log file path similar to admin module
# Directory that contains this module; the search paths are built from it.
CURRENT_FILE_DIR = os.path.dirname(os.path.abspath(__file__))
def _resolve_log_file(filename: str) -> str:
    """Return the absolute, normalized path to use for ``filename``.

    Candidate locations are checked in order and the first one that exists
    is returned:
      1. <project root>/streamlit_app/<filename>
      2. <current working directory>/streamlit_app/<filename>
      3. <project root>/<filename>
      4. <this module's directory>/<filename>
    When none exists, location 1 is returned.
    """
    project_root = os.path.abspath(os.path.join(CURRENT_FILE_DIR, '..'))
    default_location = os.path.join(project_root, 'streamlit_app', filename)
    search_order = (
        default_location,
        os.path.join(os.getcwd(), 'streamlit_app', filename),
        os.path.join(project_root, filename),
        os.path.join(CURRENT_FILE_DIR, filename),
    )

    for candidate in search_order:
        resolved = os.path.normpath(os.path.abspath(candidate))
        if os.path.exists(resolved):
            return resolved

    # Default to streamlit_app/filename if not found
    return os.path.normpath(os.path.abspath(default_location))

# Path of the feedback log, resolved once at import time.
FEEDBACK_FILE = _resolve_log_file('feedback_log.json')

def log_feedback(user_id: str, query: str, rating: int, comments: str, feedback_type: str = "general", metadata: Dict[str, Any] = None) -> bool:
    """
    Append one feedback entry to the JSON feedback log.

    Args:
        user_id: The ID of the user providing feedback.
        query: The query or context associated with the feedback.
        rating: Integer rating (e.g., 1-5).
        comments: Text comments.
        feedback_type: Type of feedback (e.g., 'generation', 'explanation', 'general').
        metadata: Additional context (e.g., model used, language); an empty
            dict is stored when omitted.

    Returns:
        True when the entry was written.

    Raises:
        Exception: Any error raised while reading or writing the log is
            logged and then re-raised to the caller.
    """
    extra = {} if metadata is None else metadata

    entry = {
        "timestamp": datetime.now().isoformat(),
        "user_id": user_id,
        "query": query,
        "rating": rating,
        "comments": comments,
        "feedback_type": feedback_type,
        "metadata": extra
    }

    try:
        # Start from an empty log unless the file holds a valid JSON list.
        entries = []
        if os.path.exists(FEEDBACK_FILE):
            with open(FEEDBACK_FILE, 'r', encoding='utf-8') as src:
                try:
                    loaded = json.load(src)
                except json.JSONDecodeError:
                    loaded = None
                if isinstance(loaded, list):
                    entries = loaded

        entries.append(entry)

        # Ensure directory exists, then rewrite the whole log.
        os.makedirs(os.path.dirname(FEEDBACK_FILE), exist_ok=True)
        with open(FEEDBACK_FILE, 'w', encoding='utf-8') as dst:
            json.dump(entries, dst, indent=2)

        logger.info(f"Feedback logged for user {user_id}")
        return True

    except Exception as e:
        logger.error(f"Error logging feedback: {e}")
        raise


In [None]:
%%writefile backend/user_history_module.py
import json
import os
import logging
from datetime import datetime
import threading

# Configure logging for this process. logging.basicConfig does nothing when the
# root logger already has handlers, so only the first call in a process takes effect.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Paths are derived from this file's location:
# HISTORY_FILE = <parent of this module's directory>/streamlit_app/user_history.json
CURRENT_FILE_DIR = os.path.dirname(os.path.abspath(__file__))
PROJECT_ROOT = os.path.abspath(os.path.join(CURRENT_FILE_DIR, '..'))
STREAMLIT_APP_DIR = os.path.join(PROJECT_ROOT, 'streamlit_app')
HISTORY_FILE = os.path.join(STREAMLIT_APP_DIR, 'user_history.json')

# Serializes the read-modify-write of HISTORY_FILE between threads of this process.
file_lock = threading.Lock()

def log_user_query(user_id: str, query: str, language: str = "", generated_code: str = "", explanation: str = "", model_name: str = "") -> None:
    """
    Logs user interaction to history file.

    The entry is appended to HISTORY_FILE under ``file_lock`` and then mirrored
    to the user-activity log through
    ``backend.user_management_module.log_user_activity``. The history list is
    serialized in memory before the file is opened for writing, so a value that
    cannot be JSON-encoded leaves the existing history untouched instead of
    truncating it.

    This function is best-effort: every failure is logged and swallowed, and a
    failed activity sync only produces a warning.

    Args:
        user_id: ID of the user who issued the query.
        query: The text the user submitted.
        language: Programming language associated with the request.
        generated_code: Code produced for the query, if any.
        explanation: Explanation produced for the query, if any.
        model_name: Name of the model used; stored under the ``model`` key.
    """
    entry = {
        "timestamp": datetime.now().isoformat(),
        "user_id": user_id,
        "query": query,
        "language": language,
        "generated_code": generated_code,
        "explanation": explanation,
        "model": model_name,
        "activity_type": "query"
    }

    with file_lock:
        try:
            # Start from an empty history unless the file holds a valid JSON array.
            data = []
            if os.path.exists(HISTORY_FILE):
                with open(HISTORY_FILE, 'r', encoding='utf-8') as f:
                    try:
                        loaded = json.load(f)
                    except json.JSONDecodeError:
                        loaded = None
                if isinstance(loaded, list):
                    data = loaded

            data.append(entry)

            # Serialize before opening with 'w' (which truncates) so an encoding
            # error cannot destroy the stored history.
            payload = json.dumps(data, indent=4)

            os.makedirs(os.path.dirname(HISTORY_FILE), exist_ok=True)
            with open(HISTORY_FILE, 'w', encoding='utf-8') as f:
                f.write(payload)

            # Sync to user_activity.json via user_management_module
            try:
                # Imported lazily; PROJECT_ROOT is put on sys.path so the
                # 'backend' package can be resolved from here.
                import sys
                if PROJECT_ROOT not in sys.path:
                    sys.path.append(PROJECT_ROOT)
                from backend.user_management_module import log_user_activity

                log_user_activity(
                    user_id=user_id,
                    activity_type='query',
                    query=query,
                    language=language,
                    model_name=model_name,
                    explanation=explanation,
                    generated_code=generated_code,
                    comments="Interaction logged."
                )
            except Exception as e:
                logger.warning(f"Could not sync to user activity: {e}")

        except Exception as e:
            logger.error(f"Error logging history: {e}")


In [None]:
%%writefile backend/feedback_analysis_module.py
from typing import List, Tuple, Dict
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
from PIL import Image, ImageDraw, ImageFont
import io
import hashlib
import random
import os
from pathlib import Path

# Avatar storage directory: <parent of this module's directory>/streamlit_app/avatars
AVATAR_STORAGE_DIR = Path(__file__).resolve().parents[1] / 'streamlit_app' / 'avatars'
# Created (with parents) at import time; a no-op when it already exists.
AVATAR_STORAGE_DIR.mkdir(parents=True, exist_ok=True)


def get_avatar_path(user_id: str) -> Path:
    """Return the path of the avatar file for ``user_id`` inside AVATAR_STORAGE_DIR.

    The file name is ``<user_id>_avatar.png``; the file itself may not exist.
    """
    avatar_filename = f"{user_id}_avatar.png"
    return AVATAR_STORAGE_DIR.joinpath(avatar_filename)


def save_user_avatar(user_id: str, image_bytes: bytes) -> bool:
    """Write ``image_bytes`` to the user's avatar file.

    Returns True when the bytes were written and False when any error
    occurred (the error is printed, not raised).
    """
    try:
        get_avatar_path(user_id).write_bytes(image_bytes)
    except Exception as e:
        print(f"Error saving avatar for {user_id}: {e}")
        return False
    return True


def load_user_avatar(user_id: str) -> Image.Image | None:
    """Load a user's avatar from disk if it exists.

    Args:
        user_id: ID whose avatar file (see ``get_avatar_path``) should be read.

    Returns:
        An RGBA copy of the stored image, or None when the file does not
        exist or cannot be read (the error is printed, not raised).
    """
    try:
        path = get_avatar_path(user_id)
        if path.exists():
            # Image.open is lazy and holds the file handle; the context manager
            # closes it once convert() has produced an independent copy.
            with Image.open(path) as stored:
                return stored.convert('RGBA')
    except Exception as e:
        print(f"Error loading avatar for {user_id}: {e}")
    return None


def generate_wordcloud_image(texts: List[str], width: int = 800, height: int = 400,
                            min_font_size: int = 18, max_font_size: int = 300) -> Image.Image:
    """Generate a simple word-cloud-like PIL Image from texts without external C deps.

    This fallback creates a visual by placing the top words with sizes proportional
    to frequency. It is not as sophisticated as the `wordcloud` package but avoids
    compiled dependencies so it works on environments without build tools.

    Placement is deterministic: a private ``random.Random(42)`` drives positions
    and colours, so the same input always yields the same image and the
    process-wide ``random`` state is left untouched.

    Args:
        texts: Strings to count words from; non-string items are ignored.
        width: Image width in pixels.
        height: Image height in pixels.
        min_font_size: Font size used for the least frequent words.
        max_font_size: Font size used for the most frequent word.

    Returns:
        An RGB image. When ``texts`` is empty or yields no usable words, a
        plain light-grey image of the requested size is returned.
    """
    if not texts:
        return Image.new('RGB', (width, height), color=(240, 240, 240))

    # Basic tokenization and stopwords
    stopwords = {
        'the','and','is','in','to','of','a','for','it','on','this','that','with','as','are','was','but','be','or'
    }

    counts = {}
    for text in texts:
        if not isinstance(text, str):
            continue
        for raw in text.lower().split():
            # Keep alphabetic characters only; drop very short tokens and stopwords.
            token = ''.join(ch for ch in raw if ch.isalpha())
            if len(token) < 3 or token in stopwords:
                continue
            counts[token] = counts.get(token, 0) + 1

    if not counts:
        return Image.new('RGB', (width, height), color=(240, 240, 240))

    # Top 40 words by frequency; the stable sort keeps first-seen order for ties.
    top = sorted(counts.items(), key=lambda item: item[1], reverse=True)[:40]
    max_count = top[0][1]

    img = Image.new('RGB', (width, height), color='white')
    draw = ImageDraw.Draw(img)

    # Prefer DejaVu (bundled on most Linux distros) for consistent TTF rendering.
    font_paths = [
        '/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf',
        '/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf',
        'arial.ttf'
    ]
    # Probe the candidate fonts once and remember which file loads.
    font_path = None
    base_font = None
    for fp in font_paths:
        try:
            base_font = ImageFont.truetype(fp, 20)
            font_path = fp
            break
        except Exception:
            continue
    if base_font is None:
        base_font = ImageFont.load_default()

    # Local generator: same sequence as seeding the module RNG with 42, but
    # without resetting the global random state for the rest of the process.
    rng = random.Random(42)
    placed_rects = []
    for word, count in top:
        # Scale font size between min and max so frequent words are much larger.
        size = int(min_font_size + (count / max_count) * (max_font_size - min_font_size))
        size = max(min_font_size, min(size, max_font_size))

        font = base_font
        if font_path is not None:
            try:
                font = ImageFont.truetype(font_path, size)
            except Exception:
                font = base_font

        # Use textbbox instead of textsize (deprecated in Pillow 10+)
        bbox = draw.textbbox((0, 0), word, font=font)
        w = bbox[2] - bbox[0]
        h = bbox[3] - bbox[1]

        # Try up to 200 random positions; a word that never fits is skipped.
        for _ in range(200):
            x = rng.randint(0, max(0, width - w))
            y = rng.randint(0, max(0, height - h))
            rect = (x, y, x + w, y + h)
            overlap = any(
                not (rect[2] < p[0] or rect[0] > p[2] or rect[3] < p[1] or rect[1] > p[3])
                for p in placed_rects
            )
            if overlap:
                continue
            placed_rects.append(rect)
            color = (rng.randint(30, 180), rng.randint(30, 180), rng.randint(30, 180))
            draw.text((x, y), word, fill=color, font=font)
            break

    return img


def analyze_sentiments(texts: List[str]) -> Dict[str, object]:
    """Score every non-blank string with VADER and summarise the results.

    Items that are not strings, or that are empty/whitespace-only, are skipped.
    A text counts as positive when its compound score is >= 0.05, negative
    when it is <= -0.05, and neutral otherwise.

    Returns dict with keys: ``total``, ``positive``, ``neutral``, ``negative``
    (counts) and ``avg_compound`` (mean compound score, 0.0 when nothing was
    scored).
    """
    analyzer = SentimentIntensityAnalyzer()
    tally = {'positive': 0, 'neutral': 0, 'negative': 0}
    compound_scores = []

    for text in texts:
        if not (isinstance(text, str) and text.strip()):
            continue
        compound = analyzer.polarity_scores(text).get('compound', 0.0)
        compound_scores.append(compound)
        if compound >= 0.05:
            tally['positive'] += 1
        elif compound <= -0.05:
            tally['negative'] += 1
        else:
            tally['neutral'] += 1

    scored = len(compound_scores)
    return {
        'total': scored,
        'positive': tally['positive'],
        'neutral': tally['neutral'],
        'negative': tally['negative'],
        'avg_compound': sum(compound_scores) / scored if compound_scores else 0.0,
    }


def _make_circular(img: Image.Image, size: int) -> Image.Image:
    """Return ``img`` resized to ``size`` x ``size`` and cropped to a circle.

    The result is an RGBA image that is fully transparent outside the circle.
    """
    square = img.convert('RGBA').resize((size, size), resample=Image.LANCZOS)

    # 8-bit mask: 255 inside the inscribed ellipse, 0 elsewhere.
    circle_mask = Image.new('L', (size, size), 0)
    ImageDraw.Draw(circle_mask).ellipse((0, 0, size - 1, size - 1), fill=255)

    # Paste through the mask onto a transparent canvas.
    canvas = Image.new('RGBA', (size, size), (0, 0, 0, 0))
    canvas.paste(square, (0, 0), mask=circle_mask)
    return canvas


def generate_avatar_image(name: str, size: int = 80, email: str | None = None) -> Image.Image:
    """Generate an avatar image.

    Behavior:
    - If `email` is provided and a Gravatar image is available, fetch and use it.
    - Otherwise, generate an initials image with deterministic background color.
    - Always return a circular RGBA image of `size` x `size`.

    Args:
        name: Display name; used for the initials and, when no email is
            given, for the background colour.
        size: Width and height of the result in pixels.
        email: Optional address used for the Gravatar lookup and, when given,
            for the background colour.

    Returns:
        A circular RGBA image. Network or decoding failures during the
        Gravatar lookup are ignored and the initials avatar is used instead.
    """
    # Try Gravatar when email provided
    if email and isinstance(email, str) and '@' in email:
        try:
            import requests
            # Gravatar identifies accounts by the MD5 of the trimmed, lower-cased address.
            digest = hashlib.md5(email.strip().lower().encode('utf-8')).hexdigest()
            # d=404 asks for a 404 response instead of a default image.
            url = f'https://www.gravatar.com/avatar/{digest}?s={size}&d=404&r=g'
            resp = requests.get(url, timeout=5)
            if resp.status_code == 200:
                gimg = Image.open(io.BytesIO(resp.content)).convert('RGBA')
                return _make_circular(gimg, size)
        except Exception:
            # requests missing, network error or undecodable image:
            # fall back to the generated avatar
            pass

    # Fallback: generate initials avatar
    initials = "?"
    try:
        parts = (name or '').strip().split()
        if len(parts) == 1:
            initials = parts[0][:2].upper()
        elif len(parts) > 1:
            initials = (parts[0][0] + parts[-1][0]).upper()
    except Exception:
        initials = "?"

    # Deterministic colour from the email/name hash. A private Random instance
    # gives the same values as seeding the module RNG, without resetting the
    # process-wide random state as a side effect.
    seed_str = str(email or name or '')
    digest = hashlib.md5(seed_str.encode('utf-8')).hexdigest()
    rng = random.Random(int(digest[:8], 16))
    r = rng.randint(40, 200)
    g = rng.randint(40, 200)
    b = rng.randint(40, 200)

    img = Image.new('RGB', (size, size), color=(r, g, b))
    draw = ImageDraw.Draw(img)

    # Choose a font size relative to image; use a moderate scale so initials stay inside the circle
    font_size = int(size * 0.45)
    # prefer the regular DejaVu Sans (not bold) for better fit, then fallbacks
    font = None
    font_paths = [
        '/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf',
        '/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf',
        'arial.ttf'
    ]
    for fp in font_paths:
        try:
            font = ImageFont.truetype(fp, font_size)
            break
        except Exception:
            continue
    if font is None:
        font = ImageFont.load_default()

    # Use textbbox instead of textsize (deprecated in Pillow 10+)
    left, top, right, bottom = draw.textbbox((0, 0), initials, font=font)
    text_w = right - left
    text_h = bottom - top
    # The box of text drawn at (0, 0) starts at (left, top), not at the origin;
    # subtract that offset so the glyphs, not the drawing origin, are centred.
    text_x = (size - text_w) / 2 - left
    text_y = (size - text_h) / 2 - top
    draw.text((text_x, text_y), initials, fill='white', font=font)

    return _make_circular(img, size)


def pil_image_to_bytes(img: Image.Image, fmt: str = 'PNG') -> bytes:
    """Encode ``img`` in format ``fmt`` and return the encoded file contents as bytes."""
    stream = io.BytesIO()
    img.save(stream, format=fmt)
    return stream.getvalue()


In [None]:
%%writefile backend/admin_dashboard_module.py
# -*- coding: utf-8 -*-
"""Admin Dashboard Module

Automatically generated by Colab.

Original file is located at
    https://colab.research.google.com/drive/1v2lo141LBE6Pf0rWSM673nvuhu6zfjX0
"""

import pandas as pd
import json
import logging
import os
from datetime import datetime
import ast
import re
from typing import Dict, List, Any

# Set up logging. logging.basicConfig does nothing when the root logger already
# has handlers, so only the first call in a process takes effect.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Absolute path of the directory where this module lives
CURRENT_FILE_DIR = os.path.dirname(os.path.abspath(__file__))


# Helper: try multiple candidate locations for the log files so the dashboard
# works regardless of current working directory or how Streamlit was launched.
def _resolve_log_file(filename: str) -> str:
    """Return the path to use for ``filename``, preferring one that already exists.

    Locations are probed in this order and the first existing one wins:
    1) <project_root>/streamlit_app/<filename>
    2) <cwd>/streamlit_app/<filename>
    3) <project_root>/<filename>
    4) next to this module (<module dir>/<filename>)

    ``project_root`` is the parent of this module's directory. When no
    candidate exists, a warning listing the probed paths is logged and
    location 1 is returned so callers have a path at which the file can
    later be created.

    Args:
        filename: Bare file name of the log file, e.g. ``'users.json'``.

    Returns:
        A normalized absolute path.
    """
    # Determine the most likely project root: parent of the backend folder
    project_root = os.path.abspath(os.path.join(CURRENT_FILE_DIR, '..'))

    raw_candidates = [
        os.path.join(project_root, 'streamlit_app', filename),
        os.path.join(os.getcwd(), 'streamlit_app', filename),
        os.path.join(project_root, filename),
        os.path.join(CURRENT_FILE_DIR, filename),
    ]

    # Normalize and drop duplicates (e.g. cwd == project_root) while keeping
    # priority order, so each location is probed and reported only once.
    candidates = []
    for raw in raw_candidates:
        path = os.path.normpath(os.path.abspath(raw))
        if path not in candidates:
            candidates.append(path)

    for path in candidates:
        if os.path.exists(path):
            logger.info(f"Resolved log file for '{filename}' -> {path}")
            return path

    logger.warning(f"Could not find existing file for '{filename}'. Tried: {candidates}")
    # Return the most likely path (project_root/streamlit_app/<filename>) as a fallback
    return candidates[0]


# Define file paths by resolving candidates. Resolution happens once, at import
# time; when a file does not exist yet, _resolve_log_file logs a warning and
# returns its fallback path.
FEEDBACK_FILE = _resolve_log_file('feedback_log.json')
HISTORY_FILE = _resolve_log_file('user_history.json')
USERS_FILE = _resolve_log_file('users.json')

def _load_data(filename: str) -> pd.DataFrame:
    """Helper function to load a JSON log file into a DataFrame.

    The file must contain a JSON array of records. When a ``timestamp``
    column is present it is converted to datetimes; values that cannot be
    parsed become ``NaT`` instead of discarding the whole log.

    Args:
        filename: Path of the JSON file to read.

    Returns:
        A DataFrame with one row per record, or an empty DataFrame when the
        file is missing, empty, malformed, not a JSON array, or any other
        error occurs (the reason is logged; nothing is raised).
    """
    logger.info(f"Loading data from {filename}")
    logger.info(f"File exists: {os.path.exists(filename)}")
    try:
        with open(filename, 'r', encoding='utf-8') as f:
            data = json.load(f)

        if not isinstance(data, list):
            logger.warning(f"{filename} does not contain a JSON array.")
            return pd.DataFrame()

        if not data:  # Empty list
            logger.warning(f"{filename} contains an empty array.")
            return pd.DataFrame()

        df = pd.DataFrame.from_records(data)

        if 'timestamp' in df.columns:
            # errors='coerce': a single bad timestamp yields NaT for that row
            # rather than raising and making the entire log look empty.
            df['timestamp'] = pd.to_datetime(df['timestamp'], errors='coerce')

        return df
    except FileNotFoundError:
        logger.warning(f"File not found: {filename}. Returning empty DataFrame.")
        return pd.DataFrame()
    except json.JSONDecodeError as e:
        logger.warning(f"Could not parse {filename} (it might be empty or malformed): {e}")
        return pd.DataFrame()
    except Exception as e:
        logger.error(f"An unexpected error occurred loading {filename}: {e}")
        return pd.DataFrame()

# Matches generated "code" that is really an error report. "Error:" has its own
# alternative without a trailing \b: there is no word boundary between ':' and
# a following space, so a trailing \b would never match "Error: ...".
_FAILURE_PATTERN = re.compile(r"\bError:|\b(?:Traceback|Exception|SyntaxError)\b", re.IGNORECASE)


def _default_dashboard_stats() -> dict:
    """Return the zeroed stats structure used as starting point and error fallback."""
    return {
        "average_rating": 0.0,
        "total_feedback": 0,
        "top_queries": {},
        "language_stats": {},
        "total_queries": 0,
        "code_quality": {
            'execution_success_pct': 0.0,
            'execution_failure_count': 0,
            'syntax': {
                'python_checked': 0,
                'python_syntax_ok': 0,
                'pct_ok': 0.0,
            },
            'runtime': {
                'Good': 0,
                'Average': 0,
                'Poor': 0,
            }
        },
        "model_stats": {},
    }


def _compute_model_stats(feedback_df: pd.DataFrame, history_df: pd.DataFrame) -> dict:
    """Combine per-model feedback ratings with per-model usage counts from history."""
    ratings_by_model = {}
    if not feedback_df.empty:
        for _, row in feedback_df.iterrows():
            meta = row.get('metadata')
            model = meta.get('model', 'Unknown') if isinstance(meta, dict) else "Unknown"
            ratings = ratings_by_model.setdefault(model, [])
            # Skip missing ratings so one NaN does not turn the average into NaN.
            if 'rating' in row and pd.notna(row['rating']):
                ratings.append(row['rating'])

    model_stats = {
        model: {
            'avg_rating': float(round(sum(ratings) / len(ratings), 2)),
            'feedback_count': len(ratings)
        }
        for model, ratings in ratings_by_model.items()
        if ratings
    }

    if not history_df.empty and 'model' in history_df.columns:
        # FILTER: only usage of these models (gemma, deepseek, phi-2) is reported
        allowed_models = {'gemma', 'deepseek', 'phi-2'}
        for raw_name, count in history_df['model'].value_counts().items():
            key = str(raw_name).lower()
            if key not in allowed_models:
                continue
            entry = model_stats.setdefault(key, {'avg_rating': 0.0, 'feedback_count': 0})
            # Accumulate: names differing only by case map to the same key.
            entry['usage_count'] = entry.get('usage_count', 0) + int(count)

    return model_stats


def _compute_code_quality(history_df: pd.DataFrame) -> dict:
    """Derive heuristic code-quality figures from the generated code in history."""
    total = history_df.shape[0]
    failures = 0
    py_checked = 0
    py_ok = 0
    runtime_counts = {'Good': 0, 'Average': 0, 'Poor': 0}

    for _, row in history_df.iterrows():
        code = str(row.get('generated_code', '') or '')
        lang = str(row.get('language', '') or '').strip().lower()

        # Execution success heuristic: count as failure when the stored text
        # looks like an error message or traceback.
        if _FAILURE_PATTERN.search(code):
            failures += 1

        # Syntax correctness: only attempted for Python snippets.
        if lang == 'python':
            py_checked += 1
            try:
                # ast.parse detects syntax errors without executing anything
                ast.parse(code)
            except Exception:
                # any parse failure counts as "not ok"
                pass
            else:
                py_ok += 1

        # Runtime performance heuristic: naive proxy using code size.
        # Shorter snippets are considered 'Good', medium 'Average', long 'Poor'.
        line_count = code.count('\n') + 1 if code else 0
        if line_count <= 20:
            runtime_counts['Good'] += 1
        elif line_count <= 100:
            runtime_counts['Average'] += 1
        else:
            runtime_counts['Poor'] += 1

    return {
        'execution_success_pct': float(round((1 - (failures / total)) * 100, 2)) if total > 0 else 0.0,
        'execution_failure_count': int(failures),
        'syntax': {
            'python_checked': int(py_checked),
            'python_syntax_ok': int(py_ok),
            'pct_ok': float(round((py_ok / py_checked) * 100, 2)) if py_checked > 0 else 0.0,
        },
        'runtime': runtime_counts,
    }


def get_dashboard_stats() -> dict:
    """
    Reads from log files and computes dashboard statistics.

    Returns:
        A dictionary containing:
        - average_rating: Mean of all ratings (float)
        - total_feedback: Total number of feedback entries (int)
        - top_queries: Dict of top 5 trending queries and their counts
        - language_stats: Dict of language usage and their counts
        - total_queries: Total number of queries processed (int)
        - code_quality: Dict of heuristic figures (execution success,
          Python syntax check, size-based runtime buckets)
        - model_stats: Dict keyed by model name with avg_rating,
          feedback_count and, for known models found in history, usage_count

        Every key is always present; on an unexpected error the zeroed
        structure is returned and the error is logged.
    """
    try:
        feedback_df = _load_data(FEEDBACK_FILE)
        history_df = _load_data(HISTORY_FILE)

        stats = _default_dashboard_stats()

        # 1. Average Feedback Rating
        if not feedback_df.empty and 'rating' in feedback_df.columns:
            stats['average_rating'] = float(round(feedback_df['rating'].mean(), 2))
            stats['total_feedback'] = int(feedback_df['rating'].count())

        # 2. Top 5 Trending Queries
        if not history_df.empty and 'query' in history_df.columns:
            # Standardize queries for better grouping
            queries = history_df['query'].str.lower().str.strip()
            stats['top_queries'] = {
                str(k): int(v)
                for k, v in queries.value_counts().nlargest(5).items()
            }
            stats['total_queries'] = int(history_df.shape[0])

        # 3. Model evaluation stats. These depend on feedback and on the
        # history 'model' column, not on the 'query' column, so they are
        # computed independently of step 2.
        stats['model_stats'] = _compute_model_stats(feedback_df, history_df)

        # 4. Code quality heuristics (derived from history entries). Best
        # effort: on failure the zeroed placeholder stays in place.
        if not history_df.empty:
            try:
                stats['code_quality'] = _compute_code_quality(history_df)
            except Exception as e:
                logger.warning(f"Failed to compute code quality heuristics: {e}")

        # 5. Language Usage Stats
        if not history_df.empty and 'language' in history_df.columns:
            # Normalize language names (e.g., 'python' -> 'Python'), fill missing
            langs = history_df['language'].fillna('Unknown').astype(str).str.strip()
            langs = langs.replace({'': 'Unknown'})
            langs = langs.str.title()
            stats['language_stats'] = {
                str(k): int(v)
                for k, v in langs.value_counts().items()
            }

        logger.info("Successfully computed dashboard stats.")
        return stats
    except Exception as e:
        logger.error(f"Error computing dashboard stats: {e}")
        return _default_dashboard_stats()


def _search_frame(df: pd.DataFrame, columns: List[str], needle: str) -> List[Dict[str, Any]]:
    """Return the records of ``df`` in which any of ``columns`` contains ``needle``.

    ``needle`` must already be lower-cased. It is matched as a literal
    substring (never as a regular expression) against the lower-cased string
    form of each value. Columns absent from ``df`` are ignored and missing
    values never match.
    """
    if df.empty:
        return []

    mask = pd.Series(False, index=df.index)
    for column in columns:
        if column not in df.columns:
            continue
        values = df[column]
        # astype(str) lets non-string columns be searched; notna() keeps the
        # textual form of missing values ("nan"/"None") from matching.
        hits = values.astype(str).str.lower().str.contains(needle, regex=False, na=False)
        mask |= hits & values.notna()
    return df[mask].to_dict('records')


def global_search(query: str) -> Dict[str, List[Dict[str, Any]]]:
    """
    Search across all users, history, and feedback.

    The search is a case-insensitive literal substring match, so characters
    such as ``+``, ``(`` or ``*`` in the query are matched as typed. Each
    source is searched independently: a failure in one is logged and leaves
    that result list empty without affecting the others.

    Args:
        query: Search term

    Returns:
        Dictionary with lists of matches for 'users', 'history', 'feedback'
    """
    results = {
        'users': [],
        'history': [],
        'feedback': []
    }

    if not query:
        return results

    needle = query.lower()

    # (result key, log file, columns searched)
    sources = (
        ('users', USERS_FILE, ['username', 'email', 'user_id']),
        ('history', HISTORY_FILE, ['query', 'generated_code', 'language']),
        ('feedback', FEEDBACK_FILE, ['comments', 'query']),
    )
    for key, path, columns in sources:
        try:
            results[key] = _search_frame(_load_data(path), columns, needle)
        except Exception as e:
            logger.error(f"Search {key} failed: {e}")

    return results


In [None]:
%%writefile backend/code_generator_module.py
import logging
import torch
from backend.model_loader import get_model

# Configure logging; logging.basicConfig has no effect when the root logger
# already has handlers (e.g. another module configured it first).
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

def generate_code(prompt: str, language: str, model_name: str = "gemma") -> str:
    """Generate ``language`` code for ``prompt`` with the selected local model.

    The prompt is wrapped in the format used for ``model_name`` ('gemma' and
    'deepseek' go through the tokenizer's chat template, 'phi-2' uses an
    "Instruct:/Output:" layout, anything else a plain instruction), sampled
    with temperature 0.2 for at most 300 new tokens, and only the newly
    generated text is returned.

    Returns:
        The stripped completion, or a string starting with "Error:" when the
        model is unavailable or generation fails (nothing is raised).
    """
    model, tokenizer = get_model(model_name)
    if not (model and tokenizer):
        return "Error: Model not loaded. Please check logs."

    try:
        device = "cuda" if torch.cuda.is_available() else "cpu"

        # Build the model-specific prompt
        if model_name == 'phi-2':
            # Phi-2 format: Instruct: <prompt>\nOutput:
            formatted_prompt = f"Instruct: Write {language} code for {prompt}\nOutput:"
        elif model_name in ('gemma', 'deepseek'):
            if model_name == 'gemma':
                request = f"Write {language} code for:\n{prompt}"
            else:
                request = f"You are an expert coding assistant. Write {language} code for: {prompt}"
            formatted_prompt = tokenizer.apply_chat_template(
                [{"role": "user", "content": request}],
                tokenize=False,
                add_generation_prompt=True,
            )
        else:
            formatted_prompt = f"Generate {language} code: {prompt}"

        encoded = tokenizer(formatted_prompt, return_tensors="pt").to(device)
        prompt_length = encoded.input_ids.shape[1]

        generation_kwargs = dict(
            max_new_tokens=300,
            do_sample=True,
            temperature=0.2,
            pad_token_id=tokenizer.eos_token_id,
        )
        with torch.no_grad():
            generated = model.generate(**encoded, **generation_kwargs)

        # Drop the echoed prompt tokens; decode only what the model added.
        completion = tokenizer.decode(generated[0][prompt_length:], skip_special_tokens=True)
        return completion.strip()

    except Exception as e:
        logger.error(f"Error generating code: {e}")
        return f"Error: {str(e)}"


In [None]:
%%writefile backend/code_explainer_module.py
import logging
import torch
from backend.model_loader import get_model

# Configure logging; logging.basicConfig has no effect when the root logger
# already has handlers (e.g. another module configured it first).
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

def explain_code(code: str, style: str, model_name: str = "deepseek") -> str:
    """Explain ``code`` in the requested ``style`` with the selected local model.

    ``style`` selects one of the predefined instructions; an unrecognised
    value is interpolated into a generic "Explain this ... code." instruction.
    The prompt is wrapped in the format used for ``model_name``, sampled with
    temperature 0.7 for at most 1024 new tokens, and only the newly generated
    text is returned.

    Returns:
        The stripped explanation, or a string starting with "Error:" when the
        model is unavailable or generation fails (nothing is raised).
    """
    model, tokenizer = get_model(model_name)
    if not (model and tokenizer):
        return "Error: Model not loaded."

    try:
        device = "cuda" if torch.cuda.is_available() else "cpu"

        # Instruction text per explanation style
        known_styles = (
            ("Beginner-Friendly", "Explain this code simply for a beginner. Avoid jargon."),
            ("Technical Deep-Dive", "Provide a detailed technical analysis of logic, complexity, and potential optimizations."),
            ("Step-by-Step Guide", "Explain the code execution flow step-by-step."),
            ("Real-World Examples", "Explain what this code does and provide a real-world usage scenario."),
            ("Bullet Points", "Summarize the key functionality in concise bullet points."),
        )
        for label, instruction in known_styles:
            if style == label:
                style_instruction = instruction
                break
        else:
            style_instruction = f"Explain this {style} code."

        prompt_content = f"{style_instruction}\n\nCode:\n{code}\n\nExplanation:"

        if model_name in ('gemma', 'deepseek'):
            # Both chat models take the same single-turn user message.
            formatted_prompt = tokenizer.apply_chat_template(
                [{"role": "user", "content": prompt_content}],
                tokenize=False,
                add_generation_prompt=True,
            )
        elif model_name == 'phi-2':
            # Phi-2 Instruct format
            formatted_prompt = f"Instruct: {prompt_content}\nOutput:"
        else:
            formatted_prompt = prompt_content

        encoded = tokenizer(formatted_prompt, return_tensors="pt").to(device)
        prompt_length = encoded.input_ids.shape[1]

        generation_kwargs = dict(
            max_new_tokens=1024,  # Increased from 250 to avoid truncation
            do_sample=True,
            temperature=0.7,
            pad_token_id=tokenizer.eos_token_id,
        )
        with torch.no_grad():
            generated = model.generate(**encoded, **generation_kwargs)

        # Drop the echoed prompt tokens; decode only what the model added.
        explanation = tokenizer.decode(generated[0][prompt_length:], skip_special_tokens=True)
        return explanation.strip()

    except Exception as e:
        logger.error(f"Error explaining code: {e}")
        return f"Error: {str(e)}"


In [None]:
%%writefile backend/chatbot_module.py
import logging
import torch
from backend.model_loader import get_model
from backend.user_management_module import get_user_activity

# Configure logging; logging.basicConfig has no effect when the root logger
# already has handlers (e.g. another module configured it first).
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

def chat_with_ai(user_id: str, user_query: str, model_name: str = "gemma") -> str:
    """
    Chat with the AI assistant using user history as context.
    RESTRICTED: Does not generate code.

    The prompt is assembled from a fixed restrictive system instruction, a
    summary of the user's five most recent activities (when any exist) and
    the user's query.

    Args:
        user_id: Identifier passed to ``get_user_activity`` to fetch history.
        user_query: The message typed by the user.
        model_name: Key passed to ``get_model``. ``'gemma'`` and ``'deepseek'``
            go through the tokenizer's chat template, ``'phi-2'`` uses the
            ``Instruct:/Output:`` format, anything else gets the raw prompt.

    Returns:
        The stripped model reply, or a human-readable error string when the
        model is unavailable or generation raises.
    """
    model, tokenizer = get_model(model_name)
    if not model or not tokenizer:
        return "Error: Model not loaded."

    try:
        device = "cuda" if torch.cuda.is_available() else "cpu"

        # 1. Retrieve Context (Last 5 activities)
        activities = get_user_activity(user_id)
        context_str = ""
        if activities:
            history_lines = []
            for act in activities[-5:]:
                atype = act.get('activity_type', 'unknown')
                # Either field may be missing or stored as None; always end up
                # with a string so slicing below cannot raise.
                details = str(act.get('query') or act.get('comments') or '')
                # Only mark the entry as truncated when it really was.
                snippet = details[:100] + ("..." if len(details) > 100 else "")
                history_lines.append(f"- {atype}: {snippet}\n")
            context_str = "User History Context:\n" + "".join(history_lines)

        # 2. Construct Prompt (RESTRICTIVE)
        system_prompt = (
            "You are CodeGenie's helpful AI Assistant. "
            "Your role is STRICTLY to help the user navigate the application and understand their usage history. "
            "You must NOT generate code snippets or answer general programming questions. "
            "If asked to write code, politely refuse and guide them to use the 'CodeGenie' code generation feature in the sidebar. "
            "Be concise and friendly.\n\n"
        )

        full_prompt = f"{system_prompt}{context_str}\nUser: {user_query}\nAssistant:"

        # 3. Generate
        if model_name in ('gemma', 'deepseek'):
            # Both models are prompted through their tokenizer's chat template.
            messages = [{"role": "user", "content": full_prompt}]
            formatted_prompt = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
        elif model_name == 'phi-2':
            formatted_prompt = f"Instruct: {full_prompt}\nOutput:"
        else:
            formatted_prompt = full_prompt

        inputs = tokenizer(formatted_prompt, return_tensors="pt").to(device)

        with torch.no_grad():
            outputs = model.generate(
                **inputs,
                max_new_tokens=200,
                do_sample=True,
                temperature=0.7,
                pad_token_id=tokenizer.eos_token_id
            )

        # Drop the echoed prompt tokens and decode only the newly generated ones.
        prompt_len = inputs.input_ids.shape[1]
        text = tokenizer.decode(outputs[0][prompt_len:], skip_special_tokens=True)
        return text.strip()

    except Exception as e:
        # Boundary handler: the UI expects a string, so report instead of raising.
        logger.exception("Error in chat_with_ai: %s", e)
        return f"I encountered an error: {str(e)}"


In [None]:
## Step 5: Streamlit App

In [None]:
%%writefile streamlit_app/style.css
/* Import Google Font */
/* Loads the Outfit and Inter families used by the font-family rule below. */
@import url('https://fonts.googleapis.com/css2?family=Outfit:wght@300;400;500;600;700&family=Inter:wght@300;400;600&display=swap');

/* Shared design tokens referenced via var(--...) throughout this sheet. */
:root {
    --primary-gradient: linear-gradient(135deg, #6366f1 0%, #a855f7 50%, #ec4899 100%);
    --glass-bg: rgba(30, 41, 59, 0.7);
    --glass-border: rgba(255, 255, 255, 0.08);
    --text-primary: #f8fafc;
    --text-secondary: #94a3b8;
}

/* Apply the imported fonts to the page and to any element whose class contains "css". */
html, body, [class*="css"] {
    font-family: 'Outfit', 'Inter', sans-serif;
}

/* Modern Dark Gradient Background */
/* Solid dark base with four faint radial glows, one anchored at each corner. */
.stApp {
    background-color: #0f172a;
    background-image: 
        radial-gradient(at 0% 0%, rgba(99, 102, 241, 0.15) 0px, transparent 50%),
        radial-gradient(at 100% 0%, rgba(236, 72, 153, 0.15) 0px, transparent 50%),
        radial-gradient(at 100% 100%, rgba(168, 85, 247, 0.15) 0px, transparent 50%),
        radial-gradient(at 0% 100%, rgba(56, 189, 248, 0.15) 0px, transparent 50%);
    background-attachment: fixed;
}

/* Glassmorphism Containers */
/* Translucent, blurred panels for forms and bordered containers. */
[data-testid="stForm"], [data-testid="stVerticalBlockBorderWrapper"] > div {
    background-color: var(--glass-bg) !important;
    backdrop-filter: blur(16px);
    -webkit-backdrop-filter: blur(16px);
    border: 1px solid var(--glass-border) !important;
    border-radius: 16px !important;
    box-shadow: 0 4px 30px rgba(0, 0, 0, 0.1);
    transition: transform 0.2s ease, box-shadow 0.2s ease;
}

/* Hover state: deeper shadow and a brighter border. */
[data-testid="stVerticalBlockBorderWrapper"] > div:hover {
    box-shadow: 0 10px 40px rgba(0, 0, 0, 0.2);
    border-color: rgba(255, 255, 255, 0.15) !important;
}

/* Sidebar */
[data-testid="stSidebar"] {
    background-color: rgba(15, 23, 42, 0.98) !important;
    border-right: 1px solid var(--glass-border);
}

/* Buttons - Pill Shaped & Sleek */
.stButton > button {
    background: var(--primary-gradient);
    color: white !important;
    border: none !important;
    border-radius: 50px !important; /* Pill shape */
    padding: 0.6rem 1.5rem !important;
    font-weight: 600 !important;
    letter-spacing: 0.8px;
    text-transform: uppercase;
    font-size: 0.85rem !important;
    transition: all 0.3s cubic-bezier(0.4, 0, 0.2, 1) !important;
    box-shadow: 0 4px 15px rgba(99, 102, 241, 0.3) !important;
}

/* Hover: lift and slightly enlarge the button, with a stronger glow. */
.stButton > button:hover {
    transform: translateY(-2px) scale(1.02);
    box-shadow: 0 8px 25px rgba(99, 102, 241, 0.4) !important;
    filter: brightness(1.1);
}

/* Pressed: return to the baseline position and shrink slightly. */
.stButton > button:active {
    transform: translateY(0) scale(0.98);
}

/* Secondary Buttons */
/* Outline-only variant; overrides the gradient fill set above. */
button[kind="secondary"] {
    background: transparent !important;
    border: 1px solid rgba(255, 255, 255, 0.2) !important;
    box-shadow: none !important;
    border-radius: 50px !important;
}
/* Hover: red border, red text and a faint red tint. */
button[kind="secondary"]:hover {
    border-color: #ef4444 !important;
    color: #ef4444 !important;
    background: rgba(239, 68, 68, 0.1) !important;
}

/* Typography */
/* Gradient headings: the background is clipped to the glyph shapes and the
   text fill is made transparent so the gradient shows through the letters. */
h1, h2, h3 {
    background: linear-gradient(to right, #e0e7ff, #c084fc);
    -webkit-background-clip: text;
    background-clip: text; /* standard property, declared after the prefixed form */
    -webkit-text-fill-color: transparent;
    font-weight: 700 !important;
    letter-spacing: -0.5px;
}

/* Inputs */
/* Dark translucent fields for text inputs, text areas and select boxes. */
.stTextInput > div > div > input, 
.stTextArea > div > div > textarea, 
.stSelectbox > div > div > div {
    background-color: rgba(15, 23, 42, 0.6) !important;
    color: var(--text-primary) !important;
    border: 1px solid rgba(255, 255, 255, 0.1) !important;
    border-radius: 12px !important;
    transition: all 0.2s ease;
}

/* Focus: violet border with a soft ring and a slightly more opaque background. */
.stTextInput > div > div > input:focus, 
.stTextArea > div > div > textarea:focus {
    border-color: #8b5cf6 !important;
    box-shadow: 0 0 0 2px rgba(139, 92, 246, 0.2) !important;
    background-color: rgba(15, 23, 42, 0.8) !important;
}

/* Metrics Cards */
[data-testid="stMetric"] {
    background-color: rgba(255, 255, 255, 0.03);
    padding: 15px;
    border-radius: 12px;
    border: 1px solid rgba(255, 255, 255, 0.05);
}

/* Chat Messages */
.stChatMessage {
    background-color: rgba(30, 41, 59, 0.4) !important;
    border: 1px solid rgba(255, 255, 255, 0.05);
    border-radius: 12px;
}

/* User avatar: solid blue. */
[data-testid="stChatMessageAvatarUser"] {
    background-color: #3b82f6 !important;
}

/* Assistant avatar: the shared primary gradient. */
[data-testid="stChatMessageAvatarAssistant"] {
    background: var(--primary-gradient) !important;
}


In [None]:
%%writefile streamlit_app/app.py
# -*- coding: utf-8 -*-
"""Streamlit AI Assistant App

Automatically generated by Colab.

Original file is located at
    https://colab.research.google.com/drive/1FCQ6I4E0exthH-GDwhRDyReEWOaEHkwU
"""

import streamlit as st
import pandas as pd
import uuid
import os
import json
from typing import Dict, Any
from dotenv import load_dotenv
from PIL import Image

# Use matplotlib for plotting if available; avoid st.bar_chart which imports Altair
try:
    import matplotlib.pyplot as plt
    _HAS_MATPLOTLIB = True
except Exception:
    _HAS_MATPLOTLIB = False

# Pull the variables defined in the .env file into the process environment.
load_dotenv()

# The app cannot work without a Hugging Face token: report it and halt this run.
_hf_token = os.getenv("HF_TOKEN")
if not _hf_token:
    st.error("HF_TOKEN not found in environment variables. Please check your .env file.")
    st.stop()

# Import the backend functions from your modules
import sys
import os

# Add the parent directory to Python path so we can import from backend
backend_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
sys.path.append(backend_dir)

try:
    from backend.code_generator_module import generate_code
    from backend.code_explainer_module import explain_code
    from backend.feedback_logger_module import log_feedback
    from backend.user_history_module import log_user_query
    from backend.admin_dashboard_module import get_dashboard_stats, global_search
    from backend.user_management_module import (
        register_user, register_user_with_password, set_password_for_user,
        verify_user_password, generate_password_reset_otp, reset_password_with_otp,
        reset_password_with_security_question, promote_user_to_admin,
        get_all_users, get_user_by_id, get_user_by_username, get_user_activity,
        get_user_stats, replace_user, log_user_activity, delete_user
    )
    from backend.chatbot_module import chat_with_ai
    from backend.feedback_analysis_module import (
        generate_wordcloud_image, analyze_sentiments,
        generate_avatar_image, pil_image_to_bytes,
        save_user_avatar, load_user_avatar, get_avatar_path
    )
    from backend.jwt_utils import create_access_token, verify_token
except ImportError as e:
    st.error(f"Failed to import backend modules. Make sure all .py files are in the same directory. Error: {e}")
    st.stop()

# --- Page Configuration ---
# Sets the browser tab title and icon, and uses the wide page layout.
st.set_page_config(
    page_title="CodeGenie AI",
    page_icon="🤖",
    layout="wide",
)

# --- Load Custom CSS ---
def local_css(file_name: str) -> None:
    """Inject the contents of a CSS file into the page inside a <style> tag.

    Args:
        file_name: Path of the stylesheet to read.

    Raises:
        FileNotFoundError: If ``file_name`` does not exist (from ``open``).
    """
    # Read as UTF-8 explicitly so the result does not depend on the
    # platform's default encoding.
    with open(file_name, encoding="utf-8") as f:
        st.markdown(f'<style>{f.read()}</style>', unsafe_allow_html=True)

try:
    # style.css lives next to this script; resolving it from __file__ keeps
    # the lookup working regardless of the directory Streamlit is launched from.
    local_css(os.path.join(os.path.dirname(os.path.abspath(__file__)), "style.css"))
except FileNotFoundError:
    st.warning("Style file not found. Using default theme.")

# --- Session State Initialization ---
# Seed every key the app relies on exactly once per browser session; keys that
# already exist keep their current value across reruns.
_SESSION_DEFAULTS = {
    'token': None,
    'user_id': None,
    'username': None,
    'role': None,
    'generated_code': "",
    'explanation': "",
    'last_query': "",
    'last_language': "",
}
for _state_key, _state_default in _SESSION_DEFAULTS.items():
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _state_default

# --- Authentication Helper ---
def authenticate_user(token):
    """Return the payload ``verify_token`` yields for ``token``, or None.

    Any falsy result from ``verify_token`` is normalised to ``None``.
    """
    return verify_token(token) or None

# Check token validity on reload
if st.session_state.token:
    payload = authenticate_user(st.session_state.token)
    if not payload:
        # Token no longer verifies: drop every piece of identity state so the
        # auth gate below is shown again.
        st.session_state.token = None
        st.session_state.user_id = None
        st.session_state.username = None
        st.session_state.role = None
        # Forget the last visited page as well; otherwise the next login
        # (possibly with a different role) would land on the old user's page.
        if 'page' in st.session_state:
            del st.session_state['page']
        st.warning("Session expired. Please login again.")

# --- Authentication gate: require login/signup before using the app ---
if not st.session_state.token:
    # Show ONLY login/signup page - NO SIDEBAR
    st.title("Welcome to CodeGenie — please sign in or register")
    auth_tabs = st.tabs(["Login", "Sign up", "Forgot Password"])

    # ---- Tab 1: login with username + password ----
    with auth_tabs[0]:
        st.subheader("Login")
        with st.form("login_form"):
            identifier = st.text_input("Username")
            password = st.text_input("Password", type="password")
            if st.form_submit_button("Login"):
                if not identifier or not password:
                    st.warning("Please provide User ID/Username and password.")
                else:
                    res = verify_user_password(identifier, password)
                    if res.get('success'):
                        # Generate JWT
                        user_id = res.get('user_id')
                        role = res.get('role', 'user')
                        token = create_access_token({"sub": user_id, "role": role})

                        st.session_state.token = token
                        st.session_state.user_id = user_id
                        st.session_state.role = role

                        user = get_user_by_id(user_id)
                        st.session_state.username = user.get('username') if user else None
                        # Activity logging is best-effort; a logging failure
                        # must not block the login itself.
                        try:
                            log_user_activity(st.session_state.user_id, 'login')
                        except Exception:
                            pass
                        st.success("Logged in successfully")
                        st.rerun()
                    else:
                        st.error(res.get('error', 'Login failed'))

    # ---- Tab 2: registration followed by automatic login ----
    with auth_tabs[1]:
        st.subheader("Sign up")
        with st.form("signup_form"):
            new_username = st.text_input("Username:")
            new_email = st.text_input("Email:")
            new_password = st.text_input("Password:", type="password")
            confirm = st.text_input("Confirm Password:", type="password")
            st.markdown("---")
            st.write("**Security Question (for password recovery)**")
            sec_q = st.selectbox("Select a question:", [
                "What is your mother's maiden name?",
                "What was the name of your first pet?",
                "What city were you born in?",
                "What is your favorite food?"
            ])
            sec_a = st.text_input("Answer:")

            if st.form_submit_button("Sign up"):
                if not new_username or not new_password or not new_email:
                    # Message names exactly the fields this branch checks.
                    st.warning("Please provide Username, Email and Password.")
                elif new_password != confirm:
                    st.warning("Passwords do not match.")
                elif not sec_a:
                    st.warning("Please provide a security answer.")
                else:
                    res = register_user_with_password(
                        new_username, new_password, new_email,
                        security_question=sec_q, security_answer=sec_a
                    )
                    if res.get('success'):
                        # Auto login
                        new_user_id = res.get('user_id')
                        token = create_access_token({"sub": new_user_id, "role": "user"})
                        st.session_state.token = token
                        st.session_state.user_id = new_user_id
                        st.session_state.role = "user"
                        st.session_state.username = new_username
                        # Best-effort activity log, same as the login tab.
                        try:
                            log_user_activity(new_user_id, 'login')
                        except Exception:
                            pass
                        st.success("Account created and logged in")
                        st.rerun()
                    else:
                        st.error(res.get('error', 'Signup failed'))

    # ---- Tab 3: password recovery via OTP or security question ----
    with auth_tabs[2]:
        st.subheader("Forgot Password")
        recovery_method = st.radio("Recovery Method:", ["OTP (Email)", "Security Question"])

        if recovery_method == "OTP (Email)":
            with st.form("forgot_otp_form"):
                fid = st.text_input("User ID or Username")
                if st.form_submit_button("Send OTP"):
                    if not fid:
                        st.warning("Please provide User ID or Username.")
                    else:
                        otp_res = generate_password_reset_otp(fid)
                        if otp_res.get('success'):
                            msg = "OTP generated."
                            if otp_res.get('email_sent'):
                                msg += " Sent to your registered email."
                                st.info(msg)
                            else:
                                msg += " (Email not sent - SMTP not configured)."
                                st.info(msg)
                                # SECURITY: the OTP is echoed on screen only as
                                # a development fallback when no email could be
                                # sent. When the email was delivered it is never
                                # shown, otherwise anyone could reset any account.
                                st.write(f"**DEV ONLY - OTP:** {otp_res.get('otp')}")
                        else:
                            st.error(otp_res.get('error', 'Could not generate OTP'))

            st.markdown("---")
            with st.form("reset_otp_final"):
                fid2 = st.text_input("User ID or Username (again)")
                otp_input = st.text_input("OTP")
                new_pw = st.text_input("New password", type="password")
                if st.form_submit_button("Reset Password"):
                    # Reject incomplete submissions up front so an empty
                    # password can never be sent to the backend.
                    if not fid2 or not otp_input or not new_pw:
                        st.warning("Please provide User ID/Username, OTP and a new password.")
                    elif reset_password_with_otp(fid2, otp_input, new_pw).get('success'):
                        st.success("Password reset successful. Please login.")
                    else:
                        st.error("Reset failed. Invalid OTP or User.")

        else:  # Security Question
            with st.form("forgot_sec_form"):
                fid3 = st.text_input("User ID or Username")
                sec_ans_input = st.text_input("Security Answer")
                new_pw_sec = st.text_input("New password", type="password")

                if st.form_submit_button("Reset Password"):
                    # Same up-front validation as the OTP reset form.
                    if not fid3 or not sec_ans_input or not new_pw_sec:
                        st.warning("Please provide User ID/Username, security answer and a new password.")
                    elif reset_password_with_security_question(fid3, sec_ans_input, new_pw_sec).get('success'):
                        st.success("Password reset successful. Please login.")
                    else:
                        st.error("Reset failed. Incorrect answer or User not found.")

    # Stop here until user authenticates
    st.stop()

# --- User is authenticated - now show sidebar with navigation ---
with st.sidebar:
    st.title("Navigation")

    # Display User Avatar in Sidebar (fall back to a generated one)
    user_avatar = load_user_avatar(st.session_state.user_id)
    if not user_avatar:
        user_avatar = generate_avatar_image(st.session_state.username or "User")

    col_av, col_info = st.columns([1, 2])
    with col_av:
        st.image(user_avatar, width=60)
    with col_info:
        st.write(f"**{st.session_state.username}**")
        st.caption(f"Role: {st.session_state.role}")

    def switch_page(new_page):
        """Store ``new_page`` as the active page and rerun the script."""
        st.session_state.page = new_page
        st.rerun()

    # Initialize page in session state if not exists: admins start on the
    # dashboard, everyone else on the code generator.
    if 'page' not in st.session_state:
        if st.session_state.role == 'admin':
            st.session_state.page = "AdminDashboard"
        else:
            st.session_state.page = "CodeGenie"

    # Create bordered containers for each navigation option
    with st.container(border=True):
        st.write("### Navigate To")

        # CodeGenie Option (hidden for admins)
        if st.session_state.role != 'admin':
            with st.container(border=True):
                col1, col2 = st.columns([1, 4])
                with col1:
                    st.write("🤖")
                with col2:
                    if st.button("CodeGenie", use_container_width=True, key="nav_codegenie"):
                        switch_page("CodeGenie")

        # My Profile Option
        with st.container(border=True):
            col1, col2 = st.columns([1, 4])
            with col1:
                st.write("👤")
            with col2:
                if st.button("My Profile", use_container_width=True, key="nav_profile"):
                    switch_page("MyProfile")

        # Admin Dashboard Option (RBAC Check)
        if st.session_state.role == 'admin':
            with st.container(border=True):
                col1, col2 = st.columns([1, 4])
                with col1:
                    st.write("📊")
                with col2:
                    if st.button("Admin Dashboard", use_container_width=True, key="nav_admin"):
                        switch_page("AdminDashboard")

        # Code Explainer Option (hidden for admins)
        if st.session_state.role != 'admin':
            with st.container(border=True):
                col1, col2 = st.columns([1, 4])
                with col1:
                    st.write("🔍")
                with col2:
                    if st.button("Code Explainer", use_container_width=True, key="nav_code_explainer"):
                        switch_page("CodeExplainer")

        # AI Chatbot Option (hidden for admins)
        if st.session_state.role != 'admin':
            with st.container(border=True):
                col1, col2 = st.columns([1, 4])
                with col1:
                    st.write("💬")
                with col2:
                    if st.button("AI Assistant", use_container_width=True, key="nav_chatbot"):
                        switch_page("AIAssistant")

    # --- Share URL Section ---
    with st.container(border=True):
        st.write("### 🔗 Share URL")
        # ... (Keep existing share logic) ...
        st.caption("Localhost:8501")

    # Download History
    with st.container(border=True):
        # NOTE(review): the download button below is rendered only on the run
        # triggered by this click, because st.button is True for a single run.
        if st.button("Download History (CSV)", use_container_width=True, key="download_history"):
            try:
                activities = get_user_activity(st.session_state.user_id)
                if activities:
                    df = pd.DataFrame(activities)
                    csv = df.to_csv(index=False).encode('utf-8')
                    st.download_button(
                        label="Click to Download CSV",
                        data=csv,
                        file_name=f"history_{st.session_state.user_id}.csv",
                        mime="text/csv",
                        key="download_csv_btn"
                    )
                else:
                    st.info("No history to download.")
            except Exception as e:
                st.error(f"Failed to download history: {e}")

    # Logout button with border
    with st.container(border=True):
        if st.button("Logout", use_container_width=True, key="nav_logout", type="secondary"):
            # Record the logout BEFORE wiping the session, while user_id still
            # identifies the user. Logging stays best-effort.
            try:
                log_user_activity(st.session_state.user_id, 'logout')
            except Exception:
                pass

            st.session_state.token = None
            st.session_state.user_id = None
            st.session_state.username = None
            st.session_state.role = None

            # Drop per-user UI state so the next account used in this browser
            # session neither inherits the previous page (e.g. the admin
            # dashboard) nor sees the previous user's generated content.
            if 'page' in st.session_state:
                del st.session_state['page']
            st.session_state.generated_code = ""
            st.session_state.explanation = ""
            st.session_state.last_query = ""
            st.session_state.last_language = ""

            st.success("Logged out successfully!")
            st.rerun()

# Get current page from session state
page = st.session_state.page

# ======================================================================================
# --- Page 1: CodeGenie (Main Application) ---
# ======================================================================================
if st.session_state.page == "CodeGenie":
    st.title("CodeGenie AI Assistant")
    st.subheader("Your AI-powered partner for code generation and explanation.")

    # --- Code Generation Section ---
    with st.container(border=True):
        st.header("Generate Code")
        prompt = st.text_area("Enter your code prompt:", height=100, placeholder="e.g., A Python function to check for palindromes")
        language = st.selectbox(
            "Select language:",
            ["Python", "C++", "JavaScript", "SQL", "HTML", "CSS", "Java"]
        )

        model_choice = st.selectbox(
            "Select Model:",
            ["gemma", "deepseek", "phi-2"],
            help="Note: CodeBERT cannot generate code."
        )

        if st.button("Generate Code"):
            if not prompt:
                st.warning("Please enter a prompt.")
            else:
                with st.spinner(f"Generating code using {model_choice}... Please wait."):
                    generated_code = generate_code(prompt, language, model_choice)
                    st.session_state.generated_code = generated_code
                    st.session_state.last_query = prompt  # Save for logging
                    st.session_state.last_language = language  # Save for logging
                    # Remember which model produced this code: the selectbox
                    # may be changed before feedback is submitted.
                    st.session_state.last_model = model_choice
                    st.session_state.explanation = ""  # Reset explanation

                    try:
                        log_user_query(
                            user_id=st.session_state.user_id,
                            query=st.session_state.last_query,
                            language=st.session_state.last_language,
                            generated_code=st.session_state.generated_code,
                            explanation=st.session_state.explanation,
                            model_name=model_choice
                        )
                    except Exception as e:
                        st.warning(f"Warning: failed to log query immediately: {e}")

                    # The generator reports failures as strings starting with "Error:".
                    if isinstance(generated_code, str) and generated_code.strip().startswith("Error:"):
                        st.error(generated_code)

    # --- Code Display and Explanation Section ---
    # Shown only when there is generated code and it is not an error message.
    if st.session_state.generated_code and not (isinstance(st.session_state.generated_code, str) and st.session_state.generated_code.strip().startswith("Error:")):
        st.header("Generated Code")
        st.code(st.session_state.generated_code, language=st.session_state.last_language.lower())

        # --- Feedback Section (Generation) ---
        st.markdown("---")
        with st.container(border=True):
            # Attractive Header for Generation Feedback
            st.markdown('<h3 style="color: #a855f7; margin-bottom: 10px;">✨ Rate this Generation</h3>', unsafe_allow_html=True)
            st.write("How satisfied are you with the *generated code*?")

            col1, col2 = st.columns([1, 2])
            with col1:
                # Shift the widget's value up by one; 0 means "no rating given".
                rating = st.feedback("stars", key="gen_rating")
                if rating is None:
                    rating = 0
                else:
                    rating += 1
            with col2:
                comments = st.text_input("Comments:", placeholder="e.g., Efficient and clean...", key="gen_comments")

            if st.button("Submit Generation Feedback", key="btn_gen_feedback"):
                try:
                    log_feedback(
                        user_id=st.session_state.user_id,
                        query=st.session_state.last_query,
                        rating=rating,
                        comments=comments,
                        feedback_type='generation',
                        metadata={
                            # Attribute the rating to the model that generated
                            # the displayed code, not the current selection.
                            'model': st.session_state.get('last_model', model_choice),
                            'language': st.session_state.last_language
                        }
                    )
                    st.success("Thank you! Your feedback on the code generation has been logged.")
                except Exception as e:
                    st.error(f"Failed to log feedback: {e}")

        with st.container(border=True):
            st.header("Explain Code")
            style = st.selectbox(
                "Select explanation style:",
                ["Beginner-Friendly", "Technical Deep-Dive", "Step-by-Step Guide", "Real-World Examples", "Bullet Points"]
            )

            if st.button("Explain This Code"):
                with st.spinner(f"Generating {style} explanation..."):
                    code_to_explain = st.session_state.generated_code
                    # Explanations always use deepseek, independent of the generation model.
                    exp_model = "deepseek"
                    explanation = explain_code(code_to_explain, style, exp_model)
                    st.session_state.explanation = explanation
                    st.session_state.last_explanation_model = exp_model
                    # Remember the style actually used, for the feedback metadata.
                    st.session_state.last_explanation_style = style

                    try:
                        log_user_query(
                            user_id=st.session_state.user_id,
                            query=st.session_state.last_query,
                            language=st.session_state.last_language,
                            generated_code=code_to_explain,
                            explanation=explanation
                        )
                        st.toast("Interaction logged to history.")
                    except Exception as e:
                        st.error(f"Failed to log user history: {e}")

        # Display explanation if it exists
        if st.session_state.explanation:
            st.markdown(st.session_state.explanation)

            # --- Feedback Section (Explanation) ---
            st.markdown("---")
            with st.container(border=True):
                st.markdown('<h3 style="color: #2dd4bf; margin-bottom: 10px;">🧠 Rate this Explanation</h3>', unsafe_allow_html=True)
                st.write("How helpful was this explanation?")

                col1, col2 = st.columns([1, 2])
                with col1:
                    # Same conversion as above: value shifted up by one, 0 = unrated.
                    exp_rating = st.feedback("stars", key="exp_rating_inline")
                    if exp_rating is None:
                        exp_rating = 0
                    else:
                        exp_rating += 1
                with col2:
                    exp_comments = st.text_input("Comments:", placeholder="e.g., Very clear...", key="exp_comments_inline")

                if st.button("Submit Explanation Feedback", key="btn_exp_feedback_inline"):
                    try:
                        log_feedback(
                            user_id=st.session_state.user_id,
                            query=st.session_state.last_query,
                            rating=exp_rating,
                            comments=exp_comments,
                            feedback_type='explanation',
                            metadata={
                                'model': st.session_state.get('last_explanation_model', 'deepseek'),
                                # Style that produced the shown explanation;
                                # falls back to the current selection.
                                'style': st.session_state.get('last_explanation_style', style)
                            }
                        )
                        st.success("Thank you! Your feedback on the explanation has been logged.")
                    except Exception as e:
                        st.error(f"Failed to log feedback: {e}")



# ======================================================================================
# --- Page 2: My Profile ---
# ======================================================================================
elif st.session_state.page == "MyProfile":
    st.title("My Profile")
    st.subheader("Manage your account settings and avatar.")

    with st.container(border=True):
        st.header("Profile Picture")
        
        col1, col2 = st.columns([1, 3])
        
        with col1:
            # Display current avatar
            current_avatar = load_user_avatar(st.session_state.user_id)
            if not current_avatar:
                current_avatar = generate_avatar_image(st.session_state.username or "User")
            st.image(current_avatar, width=150, caption="Current Avatar")
        
        with col2:
            st.write("Upload a new profile picture:")
            uploaded_file = st.file_uploader("Choose an image...", type=['png', 'jpg', 'jpeg'])
            
            if uploaded_file is not None:
                if st.button("Save New Avatar"):
                    try:
                        image_bytes = uploaded_file.getvalue()
                        if save_user_avatar(st.session_state.user_id, image_bytes):
                            st.success("Avatar updated successfully!")
                            st.rerun()
                        else:
                            st.error("Failed to save avatar.")
                    except Exception as e:
                        st.error(f"Error processing image: {e}")
            
            st.markdown("---")
            if st.button("Remove Avatar (Reset to Default)", type="secondary"):
                try:
                    path = get_avatar_path(st.session_state.user_id)
                    if os.path.exists(path):
                        os.remove(path)
                        st.success("Avatar reset to default.")
                        st.rerun()
                    else:
                        st.info("You are already using the default avatar.")
                except Exception as e:
                    st.error(f"Error removing avatar: {e}")

    with st.container(border=True):
        st.header("Account Details")
        st.write(f"**User ID:** {st.session_state.user_id}")
        st.write(f"**Username:** {st.session_state.username}")
        st.write(f"**Role:** {st.session_state.role}")

    # Activity history: tabulates whatever get_user_activity() returns for the
    # signed-in user, newest first when a timestamp column is available.
    with st.container(border=True):
        st.header("My Activity History")

        try:
            activities = get_user_activity(st.session_state.user_id)
            if activities:
                df_activity = pd.DataFrame(activities)

                # Preferred display order; columns missing from the data are skipped.
                preferred_cols = ['timestamp', 'activity_type', 'query', 'language', 'model', 'rating', 'explanation']
                cols_to_show = [c for c in preferred_cols if c in df_activity.columns]

                # Only format and sort by timestamp when the column exists.
                # Sorting unconditionally raised KeyError for records without
                # a timestamp and hid the whole table behind an error message.
                if 'timestamp' in df_activity.columns:
                    df_activity['timestamp'] = pd.to_datetime(df_activity['timestamp']).dt.strftime('%Y-%m-%d %H:%M:%S')
                    # The zero-padded format sorts lexicographically in
                    # chronological order, so sorting the strings is enough.
                    df_activity = df_activity.sort_values('timestamp', ascending=False)

                st.dataframe(
                    df_activity[cols_to_show],
                    use_container_width=True,
                    hide_index=True
                )
            else:
                st.info("No activity history found.")
        except Exception as e:
            # Best-effort panel: a history failure must not break the profile page.
            st.error(f"Failed to load history: {e}")

# ======================================================================================
# --- Page 3: Code Explainer (Standalone) ---
# ======================================================================================
elif st.session_state.page == "CodeExplainer":
    st.title("Code Explainer")
    st.subheader("Paste your code here and get a detailed explanation")

    with st.container(border=True):
        st.header("1. Paste Code")
        code_to_explain = st.text_area("Enter your code:", height=150, placeholder="e.g., def is_palindrome(s):\n    return s == s[::-1]")
        
        language = st.selectbox(
            "Select programming language:",
            ["Python", "C++", "JavaScript", "SQL", "Other"]
        )
        
        exp_model_choice = st.selectbox(
            "Select Model:",
            ["deepseek", "gemma", "phi-2"],
            key="explainer_model_select",
            help="Phi-2 is a powerful small model."
        )

    if code_to_explain and code_to_explain.strip():
        with st.container(border=True):
            st.header("2. Explanation Style")
            style = st.selectbox(
                "Select explanation style:",
                ["Beginner-Friendly", "Technical Deep-Dive", "Step-by-Step Guide", "Real-World Examples", "Bullet Points"],
                key="explainer_style"
            )

            if st.button("Explain Code"):
                with st.spinner(f"Generating explanation using {exp_model_choice}..."):
                    try:
                        explanation = explain_code(code_to_explain, style, exp_model_choice)
                        st.session_state.explanation_result = explanation
                        st.session_state.last_explained_code = code_to_explain
                        st.session_state.last_explained_language = language

                        try:
                            log_user_query(
                                user_id=st.session_state.user_id,
                                query=None,
                                language=language,
                                generated_code=code_to_explain,
                                explanation=explanation
                            )
                        except Exception as e:
                            st.warning(f"Note: Could not log to history: {e}")
                    except Exception as e:
                        st.error(f"Failed to generate explanation: {e}")

        if hasattr(st.session_state, 'explanation_result') and st.session_state.explanation_result:
            st.markdown(st.session_state.explanation_result)

            # --- Feedback Section (Standalone Explanation) ---
            st.markdown("---")
            with st.container(border=True):
                st.markdown('<h3 style="color: #2dd4bf; margin-bottom: 10px;">🧠 Rate this Explanation</h3>', unsafe_allow_html=True)
                st.write("How helpful was this explanation?")

                col1, col2 = st.columns([1, 2])
                with col1:
                    rating = st.slider("Rating (1=Not Helpful, 5=Very Helpful):", 1, 5, 3, key="explainer_rating_new")
                with col2:
                    comments = st.text_input("Comments:", placeholder="e.g., Could be clearer...", key="explainer_comments_new")

                if st.button("Submit Explanation Feedback", key="submit_explainer_feedback_new"):
                    try:
                        log_feedback(
                            user_id=st.session_state.user_id,
                            query=st.session_state.last_explained_code[:100] if hasattr(st.session_state, 'last_explained_code') else "Code snippet",
                            rating=rating,
                            comments=comments,
                            feedback_type='explanation',
                            metadata={
                                'model': exp_model_choice,
                                'style': style
                            }
                        )
                        st.success("Thank you! Your feedback has been logged.")
                    except Exception as e:
                        st.error(f"Failed to log feedback: {e}")

# ======================================================================================
# --- Page 4: Admin Dashboard (implemented further below, after the AI Assistant page) ---
# ======================================================================================
# ======================================================================================
# --- Page 5: AI Assistant ---
# ======================================================================================
elif st.session_state.page == "AIAssistant":
    st.title("AI Assistant")
    st.subheader("Your personal guide to CodeGenie.")

    # Initialize chat history with user isolation
    if "chat_user_id" not in st.session_state or st.session_state.chat_user_id != st.session_state.user_id:
        st.session_state.chat_user_id = st.session_state.user_id
        st.session_state.messages = []

    if "messages" not in st.session_state:
        st.session_state.messages = []

    # Display chat messages from history on app rerun
    for message in st.session_state.messages:
        with st.chat_message(message["role"]):
            st.markdown(message["content"])

    # React to user input
    if prompt := st.chat_input("Ask me anything about your code or this app..."):
        # Display user message in chat message container
        st.chat_message("user").markdown(prompt)
        # Add user message to chat history
        st.session_state.messages.append({"role": "user", "content": prompt})

        # Display assistant response in chat message container
        with st.chat_message("assistant"):
            with st.spinner("Thinking..."):
                # Use a default model for chat, e.g., gemma or deepseek
                response = chat_with_ai(st.session_state.user_id, prompt, model_name="gemma")
                st.markdown(response)
        
        # Add assistant response to chat history
        st.session_state.messages.append({"role": "assistant", "content": response})

elif st.session_state.page == "AdminDashboard":
    # Double check RBAC
    if st.session_state.role != 'admin':
        st.error("Access Denied. Admins only.")
        st.stop()

    st.title("Admin Dashboard")
    st.subheader("Comprehensive admin panel for monitoring and management.")

    if 'admin_dashboard_page' not in st.session_state:
        st.session_state.admin_dashboard_page = "Overview"
    
    # --- Admin Dashboard Navigation ---
    st.markdown("---")
    col1, col2, col3, col4, col5, col6, col7, col8, col9 = st.columns(9)
    
    with col1:
        if st.button("Overview", use_container_width=True, key="admin_overview"):
            st.session_state.admin_dashboard_page = "Overview"
            st.rerun()
    with col2:
        if st.button("Feedback", use_container_width=True, key="admin_feedback"):
            st.session_state.admin_dashboard_page = "Feedback"
            st.rerun()
    with col3:
        if st.button("Queries", use_container_width=True, key="admin_queries"):
            st.session_state.admin_dashboard_page = "Queries"
            st.rerun()
    with col4:
        if st.button("Languages", use_container_width=True, key="admin_languages"):
            st.session_state.admin_dashboard_page = "Languages"
            st.rerun()
    with col5:
        if st.button("Quality", use_container_width=True, key="admin_quality"):
            st.session_state.admin_dashboard_page = "CodeQuality"
            st.rerun()
    with col6:
        if st.button("Members", use_container_width=True, key="admin_members"):
            st.session_state.admin_dashboard_page = "Members"
            st.rerun()
    with col7:
        if st.button("Search", use_container_width=True, key="admin_search"):
            st.session_state.admin_dashboard_page = "GlobalSearch"
            st.rerun()
    with col8:
        if st.button("Refresh", use_container_width=True, key="admin_refresh"):
            st.cache_data.clear()
            st.rerun()
    
    with col9:
        if st.button("Eval", use_container_width=True, key="admin_eval"):
            st.session_state.admin_dashboard_page = "ModelEvaluation"
            st.rerun()
    st.markdown("---")

    # Load dashboard stats
    try:
        stats = get_dashboard_stats()
    except Exception as e:
        st.warning("Could not compute dashboard stats; showing fallback values.")
        stats = {}

    # --- PAGE 1: OVERVIEW METRICS ---
    if st.session_state.admin_dashboard_page == "Overview":
        st.header("Overview Metrics")
        # Four headline numbers side by side; missing stats fall back to zero.
        with st.container(border=True):
            queries_col, feedback_col, rating_col, users_col = st.columns(4)

            queries_col.metric("Total Queries", stats.get('total_queries', 0))
            feedback_col.metric("Total Feedback", stats.get('total_feedback', 0))

            avg_rating = stats.get('average_rating', 0.0)
            rating_col.metric("Avg Rating", f"{avg_rating:.2f}/5.0")

            # Shown as "Active Users", but this is the count of all users
            # returned by get_all_users().
            user_count = len(get_all_users())
            users_col.metric("Active Users", user_count)

    # --- PAGE 2: RATINGS & FEEDBACK ---
    elif st.session_state.admin_dashboard_page == "Feedback":
        st.header("Ratings & Feedback")
        
        # Load feedback data directly for detailed analysis
        try:
            with open('streamlit_app/feedback_log.json', 'r') as f:
                feedback_data = json.load(f)
        except (FileNotFoundError, json.JSONDecodeError):
            feedback_data = []

        if not feedback_data:
            st.info("No feedback data available yet.")
        else:
            # Prepare data
            comments = [item.get('comments', '') for item in feedback_data if item.get('comments')]
            ratings = [item.get('rating', 0) for item in feedback_data]
            
            # 1. Sentiment Analysis
            st.subheader("Sentiment Analysis")
            sentiment_stats = analyze_sentiments(comments)
            
            col1, col2 = st.columns([1, 2])
            with col1:
                st.metric("Avg Sentiment Score", f"{sentiment_stats.get('avg_compound', 0):.2f}", help="-1 (Neg) to +1 (Pos)")
                st.write("**Distribution:**")
                st.write(f"🟢 Positive: {sentiment_stats.get('positive', 0)}")
                st.write(f"⚪ Neutral: {sentiment_stats.get('neutral', 0)}")
                st.write(f"🔴 Negative: {sentiment_stats.get('negative', 0)}")
            
            with col2:
                # Pie chart for sentiment
                sent_df = pd.DataFrame({
                    'Sentiment': ['Positive', 'Neutral', 'Negative'],
                    'Count': [sentiment_stats.get('positive', 0), sentiment_stats.get('neutral', 0), sentiment_stats.get('negative', 0)]
                })
                if _HAS_MATPLOTLIB:
                    fig, ax = plt.subplots()
                    ax.pie(sent_df['Count'], labels=sent_df['Sentiment'], autopct='%1.1f%%', colors=['#66b3ff', '#99ff99', '#ff9999'])
                    st.pyplot(fig)
                else:
                    st.bar_chart(sent_df.set_index('Sentiment'))

            st.markdown("---")

            # 2. Word Cloud
            st.subheader("Feedback Word Cloud")
            if comments:
                wc_img = generate_wordcloud_image(comments, width=800, height=400)
                st.image(wc_img, use_container_width=True)
            else:
                st.info("Not enough comments to generate a word cloud.")

            st.markdown("---")
            
            # 3. Recent Feedback Table
            st.subheader("Recent Feedback")
            df_feedback = pd.DataFrame(feedback_data)
            if not df_feedback.empty:
                st.dataframe(df_feedback[['timestamp', 'user_id', 'rating', 'comments']].sort_values('timestamp', ascending=False), use_container_width=True)

    # --- PAGE 3: QUERIES ---
    elif st.session_state.admin_dashboard_page == "Queries":
        st.header("Queries & Charts")
        top_queries = stats.get('top_queries', {})
        
        if top_queries:
            st.subheader("Top 5 Trending Queries")
            df_queries = pd.DataFrame(list(top_queries.items()), columns=['Query', 'Count'])
            st.bar_chart(df_queries.set_index('Query'))
        else:
            st.info("No query data available.")

    # --- PAGE 4: LANGUAGES ---
    elif st.session_state.admin_dashboard_page == "Languages":
        st.header("Language Usage")
        lang_stats = stats.get('language_stats', {})
        
        if lang_stats:
            df_lang = pd.DataFrame(list(lang_stats.items()), columns=['Language', 'Count'])
            
            col1, col2 = st.columns([2, 1])
            with col1:
                st.bar_chart(df_lang.set_index('Language'))
            with col2:
                st.write("Details:")
                st.dataframe(df_lang, hide_index=True)
        else:
            st.info("No language data available.")

    # --- PAGE 5: CODE QUALITY ---
    elif st.session_state.admin_dashboard_page == "CodeQuality":
        st.header("Code Quality Metrics")
        cq = stats.get('code_quality', {})
        
        # Metrics Row
        col1, col2, col3 = st.columns(3)
        col1.metric("Execution Success Rate", f"{cq.get('execution_success_pct', 0)}%")
        col2.metric("Python Syntax Pass Rate", f"{cq.get('syntax', {}).get('pct_ok', 0)}%")
        col3.metric("Failures Detected", cq.get('execution_failure_count', 0))
        
        st.markdown("---")
        
        # Runtime Performance Chart
        st.subheader("Estimated Runtime Performance")
        runtime = cq.get('runtime', {})
        if runtime:
            df_runtime = pd.DataFrame(list(runtime.items()), columns=['Category', 'Count'])
            st.bar_chart(df_runtime.set_index('Category'))
        else:
            st.info("No runtime data.")

    # --- PAGE 6: MEMBERS ---
    elif st.session_state.admin_dashboard_page == "Members":
        st.header("Member Management")
        
        member_tabs = st.tabs(["All Members", "Promote Admin", "Delete Member"])
        
        with member_tabs[0]:
            st.subheader("Registered Members")
            users = get_all_users()
            if users:
                df_users = pd.DataFrame(users)
                # Handle missing columns gracefully
                cols = ['user_id', 'username', 'email', 'role', 'last_login']
                available_cols = [c for c in cols if c in df_users.columns]
                st.dataframe(df_users[available_cols], use_container_width=True)
            else:
                st.info("No members found.")

        with member_tabs[1]:
            st.subheader("Promote to Admin")
            st.info("Note: Maximum 2 admins allowed.")
            users = get_all_users()
            user_opts = {u['username']: u['user_id'] for u in users if u.get('role') != 'admin'}
            
            if user_opts:
                u_promote = st.selectbox("Select User to Promote", list(user_opts.keys()))
                if st.button("Promote to Admin"):
                    res = promote_user_to_admin(user_opts[u_promote])
                    if res.get('success'):
                        st.success(res.get('message'))
                        st.rerun()
                    else:
                        st.error(res.get('error'))
            else:
                st.info("No eligible users to promote.")

        with member_tabs[2]:
            st.subheader("Delete Member")
            users = get_all_users()
            user_del_opts = {u['username']: u['user_id'] for u in users if u['user_id'] != st.session_state.user_id} # Cannot delete self
            
            if user_del_opts:
                u_del = st.selectbox("Select User to Delete", list(user_del_opts.keys()))
                if st.button("Delete User", type="secondary"):
                    res = delete_user(user_del_opts[u_del])
                    if res.get('success'):
                        st.success(res.get('message'))
                        st.rerun()
                    else:
                        st.error(res.get('error'))
            else:
                st.info("No other users to delete.")

    # --- PAGE 7: GLOBAL SEARCH (its branch follows the Model Evaluation branch below) ---
    # --- PAGE 8: MODEL EVALUATION ---
    elif st.session_state.admin_dashboard_page == "ModelEvaluation":
        st.header("Model Evaluation & Comparison")
        
        # --- Visualization Section ---
        st.subheader("📊 Model Performance Analytics")
        
        model_stats = stats.get('model_stats', {})
        if model_stats:
            # Prepare data for charts
            data = []
            for m, s in model_stats.items():
                data.append({
                    'Model': m,
                    'Avg Rating': s.get('avg_rating', 0.0),
                    'Usage Count': s.get('usage_count', 0),
                    'Feedback Count': s.get('feedback_count', 0)
                })
            
            df_models = pd.DataFrame(data)
            
            col1, col2 = st.columns(2)
            
            with col1:
                st.write("**Average User Rating**")
                st.bar_chart(df_models.set_index('Model')['Avg Rating'], color="#a855f7")
            
            with col2:
                st.write("**Usage Distribution**")
                if _HAS_MATPLOTLIB:
                    fig, ax = plt.subplots()
                    ax.pie(df_models['Usage Count'], labels=df_models['Model'], autopct='%1.1f%%', startangle=90)
                    ax.axis('equal')  # Equal aspect ratio ensures that pie is drawn as a circle.
                    st.pyplot(fig)
                else:
                    st.bar_chart(df_models.set_index('Model')['Usage Count'])
            
            st.dataframe(df_models, use_container_width=True)
            st.markdown("---")
        else:
            st.info("No model statistics available yet.")

        st.subheader("🧪 Live Model Testing")
        with st.container(border=True):
            eval_prompt = st.text_area("Test Prompt:", height=100, placeholder="e.g., Write a factorial function")
            eval_lang = st.selectbox("Language:", ["Python", "C++", "JavaScript", "SQL", "HTML", "CSS", "Java"], key="eval_lang")
            if st.button("Run Evaluation"):
                if not eval_prompt:
                    st.warning("Please enter a prompt.")
                else:
                    col1, col2, col3 = st.columns(3)
                    with col1:
                        st.subheader("Gemma")
                        with st.spinner("Gemma generating..."):
                            res = generate_code(eval_prompt, eval_lang, "gemma")
                            st.code(res, language=eval_lang.lower())
                    with col2:
                        st.subheader("DeepSeek")
                        with st.spinner("DeepSeek generating..."):
                            res = generate_code(eval_prompt, eval_lang, "deepseek")
                            st.code(res, language=eval_lang.lower())
                    with col3:
                        st.subheader("Phi-2")
                        with st.spinner("Phi-2 generating..."):
                            res = generate_code(eval_prompt, eval_lang, "phi-2")
                            st.code(res, language=eval_lang.lower())
    elif st.session_state.admin_dashboard_page == "GlobalSearch":
        st.header("Global Search")
        search_query = st.text_input("Search users, history, or feedback...", placeholder="Enter keyword")
        
        if search_query:
            results = global_search(search_query)
            
            st.subheader("Users")
            if results['users']:
                st.dataframe(pd.DataFrame(results['users']))
            else:
                st.write("No matching users.")
                
            st.subheader("History")
            if results['history']:
                st.dataframe(pd.DataFrame(results['history']))
            else:
                st.write("No matching history.")
                
            st.subheader("Feedback")
            if results['feedback']:
                st.dataframe(pd.DataFrame(results['feedback']))
            else:
                st.write("No matching feedback.")


In [None]:
## Step 6: Initialize Admin User

In [None]:
%%writefile create_admin_user.py
import sys
import os

# Add backend to path
sys.path.append(os.path.abspath("backend"))

try:
    from backend.user_management_module import register_user_with_password, promote_user_to_admin, get_user_by_username
except ImportError as e:
    print(f"❌ Import failed: {e}")
    sys.exit(1)

def create_admin():
    """Ensure the default ``admin`` account exists and has the admin role.

    Safe to run repeatedly (e.g. when the notebook cell is re-executed):

    * no such user      -> register it, then promote it;
    * user, not admin   -> promote it;
    * user, already admin -> nothing to do.

    The default password is only printed when this call created the account,
    because a pre-existing account may have a different password.
    """
    print("--- Creating Default Admin User ---")

    username = "admin"
    password = "Admin@123"
    email = "admin@example.com"

    # Check if admin exists
    user = get_user_by_username(username)
    created_now = False

    if user:
        if user.get('role') == 'admin':
            # Promoting again is unnecessary and could be reported as a failure.
            print(f"User '{username}' already exists and is already an admin.")
            res = {'success': True}
        else:
            print(f"User '{username}' already exists. Promoting to admin...")
            res = promote_user_to_admin(user['user_id'])
    else:
        print(f"Registering new admin user '{username}'...")
        # New Signature: username, password, email, sec_q, sec_a
        res = register_user_with_password(username, password, email, "Secret Code", "1234")

        if res.get('success'):
            created_now = True
            # Promote to admin
            res = promote_user_to_admin(res['user_id'])

    if res.get('success'):
        print("✅ Admin user 'admin' is ready.")
        print(f"Username: {username}")
        if created_now:
            print(f"Password: {password}")
    else:
        print(f"❌ Failed to setup admin: {res.get('error')}")

if __name__ == "__main__":
    create_admin()


In [None]:
!python create_admin_user.py

In [None]:
## Step 7: Start Model Server

In [None]:
# Shell escape: kill any process whose command line mentions "uvicorn".
# NOTE(review): the server below runs on a thread inside this kernel, so this
# may not stop a server started by an earlier run of this cell — confirm.
!pkill -f uvicorn
import gc
import torch
# Collect Python garbage and release cached CUDA memory before loading models.
gc.collect()
if torch.cuda.is_available():
    torch.cuda.empty_cache()

import threading
import uvicorn
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import nest_asyncio
# torch is already imported above; the repeat is harmless.
import torch
from backend.model_loader import load_models

# Allow nested event loops (needed to run the server from inside the notebook).
nest_asyncio.apply()

# The FastAPI application the /generate and /explain routes are registered on.
app = FastAPI()

# Load models globally for the server.
# load_models() returns (None, None) when HF_TOKEN is not set, in which case
# MODELS and TOKENIZERS are both None.
print("Initializing models for server...")
MODELS, TOKENIZERS = load_models()

# Request body for POST /generate.
class GenRequest(BaseModel):
    prompt: str    # what the code should do; inserted into the model prompt
    language: str  # target programming language named in the model prompt
    model: str     # key into MODELS / TOKENIZERS selecting which model to run

# Request body for POST /explain.
class ExplainRequest(BaseModel):
    code: str   # source code to be explained; appended to the model prompt
    style: str  # explanation style; inserted into the model prompt
    model: str  # key into MODELS / TOKENIZERS selecting which model to run

@app.post("/generate")
async def generate(req: GenRequest):
    """Generate code for ``req.prompt`` in ``req.language`` with ``req.model``.

    Returns:
        ``{"code": <generated text>}`` with the prompt tokens stripped.

    Raises:
        HTTPException(400): the requested model is not loaded (including the
            case where no models were loaded at all).
        HTTPException(500): any failure during prompt formatting or generation.
    """
    # Validate outside the try block. Raised inside it, this 400 was caught by
    # the generic handler below and re-reported as a 500. `not MODELS` also
    # covers load_models() having returned None.
    if not MODELS or req.model not in MODELS:
        raise HTTPException(status_code=400, detail=f"Model {req.model} not loaded")

    try:
        model = MODELS[req.model]
        tokenizer = TOKENIZERS[req.model]
        device = "cuda" if torch.cuda.is_available() else "cpu"

        # Format prompt: chat models go through their chat template, phi-2
        # uses an "Instruct:/Output:" layout, anything else a plain instruction.
        if req.model == 'gemma':
            messages = [{"role": "user", "content": f"Write {req.language} code for:\n{req.prompt}"}]
            formatted_prompt = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
        elif req.model == 'deepseek':
            messages = [{"role": "user", "content": f"You are an expert coding assistant. Write {req.language} code for: {req.prompt}"}]
            formatted_prompt = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
        elif req.model == 'phi-2':
            formatted_prompt = f"Instruct: Write {req.language} code for {req.prompt}\nOutput:"
        else:
            formatted_prompt = f"Generate {req.language} code: {req.prompt}"

        inputs = tokenizer(formatted_prompt, return_tensors="pt").to(device)

        with torch.no_grad():
            outputs = model.generate(
                **inputs,
                max_new_tokens=300,
                temperature=0.2,
                do_sample=True,
                pad_token_id=tokenizer.eos_token_id
            )

        # Decode only the newly generated tokens (everything after the prompt).
        prompt_len = inputs.input_ids.shape[1]
        text = tokenizer.decode(outputs[0][prompt_len:], skip_special_tokens=True)
        return {"code": text.strip()}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e

@app.post("/explain")
async def explain(req: ExplainRequest):
    """Explain ``req.code`` in the requested ``req.style`` using ``req.model``.

    Returns:
        ``{"explanation": <generated text>}`` with the prompt tokens stripped.

    Raises:
        HTTPException(400): the requested model is not loaded (including the
            case where no models were loaded at all).
        HTTPException(500): any failure during prompt formatting or generation.
    """
    # Validate outside the try block. Raised inside it, this 400 was caught by
    # the generic handler below and re-reported as a 500. `not MODELS` also
    # covers load_models() having returned None.
    if not MODELS or req.model not in MODELS:
        raise HTTPException(status_code=400, detail=f"Model {req.model} not loaded")

    try:
        model = MODELS[req.model]
        tokenizer = TOKENIZERS[req.model]
        device = "cuda" if torch.cuda.is_available() else "cpu"

        prompt_content = f"Explain this {req.style} code:\n\n{req.code}"

        # gemma and deepseek both take the same single-turn chat template
        # here; phi-2 uses an "Instruct:/Output:" layout; anything else gets
        # the raw prompt.
        if req.model in ('gemma', 'deepseek'):
            messages = [{"role": "user", "content": prompt_content}]
            formatted_prompt = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
        elif req.model == 'phi-2':
            formatted_prompt = f"Instruct: {prompt_content}\nOutput:"
        else:
            formatted_prompt = prompt_content

        inputs = tokenizer(formatted_prompt, return_tensors="pt").to(device)

        with torch.no_grad():
            outputs = model.generate(
                **inputs,
                max_new_tokens=250,
                temperature=0.7,
                do_sample=True,
                pad_token_id=tokenizer.eos_token_id
            )

        # Decode only the newly generated tokens (everything after the prompt).
        prompt_len = inputs.input_ids.shape[1]
        text = tokenizer.decode(outputs[0][prompt_len:], skip_special_tokens=True)
        return {"explanation": text.strip()}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e

def run_server():
    """Serve the FastAPI ``app`` with uvicorn on 0.0.0.0:8000."""
    bind_host, bind_port = "0.0.0.0", 8000
    uvicorn.run(app, host=bind_host, port=bind_port)

# Run the server on a daemon thread so the cell returns immediately.
thread = threading.Thread(daemon=True, target=run_server)
thread.start()
print("✅ Model Server running at http://localhost:8000")


In [None]:
## Step 8: Launch Application

In [None]:
#@title Launch App
from pyngrok import ngrok
import os

# Kill existing ngrok
!pkill -f streamlit
!pkill ngrok

# Authenticate ngrok
token = os.getenv("NGROK_TOKEN")
if token:
    ngrok.set_auth_token(token)
else:
    print("⚠️ NGROK_TOKEN not found in .env")

# Start Streamlit in background
get_ipython().system_raw('streamlit run streamlit_app/app.py &')

# Open Tunnel
try:
    public_url = ngrok.connect(8501).public_url
    print(f"🚀 Streamlit App is live at: {public_url}")
except Exception as e:
    print(f"Error starting ngrok: {e}")
