<a href="https://colab.research.google.com/github/saudaziz555/LLM-streamlit-app/blob/main/Finalstreamlit.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Updated

In [None]:
# Cell 1: Package Installation
%%capture
# Update package lists and install poppler-utils
!apt-get update
!apt-get install -y poppler-utils

# Install Python packages
!pip install streamlit google-generativeai pandas pillow pdf2image tenacity
!pip install PyMuPDF openpyxl bitsandbytes accelerate
!pip uninstall unsloth -y && pip install --upgrade --no-cache-dir --no-deps git+https://github.com/unslothai/unsloth.git
!pip install pyngrok
!pip install rapidfuzz
!pip install unsloth_zoo

In [None]:
%%writefile app.py
import streamlit as st
import os
import pandas as pd
from PIL import Image
import re
import time
import logging
import google.generativeai as genai
from unsloth import FastVisionModel
import torch
import fitz  # PyMuPDF
from concurrent.futures import ThreadPoolExecutor, as_completed
import tenacity
import io
import warnings
from rapidfuzz import fuzz
import sqlite3
from google.colab import userdata
import IPython
import ipykernel



# GEMINI API KEY
# Bind the name up front: if `userdata.get` itself raises, the assignment in
# the `try` body never happens and every later use of `gen_key` would fail
# with a NameError.
gen_key = None
try:
    gen_key = userdata.get('GENAI_API_KEY')
    if gen_key is None:
        raise ValueError("GENAI_API_KEY not found in secrets.")
except Exception as e:
    print(f"Error getting API key: {e}")
    # Fall back to an environment variable so the key can still be supplied
    # when the Colab secrets lookup is unavailable; stays None if unset.
    gen_key = os.environ.get('GENAI_API_KEY')


# Logging: INFO and above is appended to 'processing_log.log' (relative path,
# so it is created in the process's working directory).
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    filename='processing_log.log',
    filemode='a',
)
logger = logging.getLogger(__name__)
logger.info("Streamlit app started.")

# Storage locations: the mounted Drive root, the SQLite file kept inside it,
# and the table names used by the database helpers.
BASE_DIR = "/content/drive/MyDrive"
DB_FILE = f"{BASE_DIR}/results.db"
TABLE_NAME = "student_results"
LOG_TABLE = "process_logs"

# Initialize Gemini
# Any failure here (including a missing key variable) is logged and leaves
# `gemini_model` as None so callers can detect that Gemini is unavailable.
try:
    genai.configure(api_key=gen_key)
    gemini_model = genai.GenerativeModel("gemini-1.5-flash")
    print("Gemini model initialized successfully.")  # Optional success message
except Exception as exc:
    logging.error(f"Failed to initialize Gemini model: {exc}")
    gemini_model = None

# Initialize LLaMA
@st.cache_resource
def load_llama_model():
    """Load the 4-bit Llama 3.2 Vision model and switch it to inference mode.

    Returns:
        ``(model, tokenizer)`` on success. If anything raises while loading,
        the error is logged and shown in the Streamlit UI and
        ``(None, None)`` is returned instead.
    """
    model_id = "unsloth/Llama-3.2-11B-Vision-Instruct"
    try:
        model, tokenizer = FastVisionModel.from_pretrained(
            model_id,
            load_in_4bit=True,
            use_gradient_checkpointing="unsloth",
        )
        FastVisionModel.for_inference(model)
    except Exception as exc:
        logging.error(f"Failed to load LLaMA model: {exc}")
        st.error(f"Error loading LLaMA model: {exc}. Consider upgrading to Colab Pro+.")
        return None, None
    return model, tokenizer


# Loaded once at import; both names are None when loading failed.
llama_model, llama_tokenizer = load_llama_model()

# Retry decorator for Gemini API
@tenacity.retry(
    retry=tenacity.retry_if_exception_message(match="429"),
    wait=tenacity.wait_exponential(multiplier=1, min=2, max=10),
    stop=tenacity.stop_after_attempt(5),
)
def generate_gemini_content(contents):
    """Send ``contents`` to the module-level Gemini model and return its response.

    The call is retried (up to 5 attempts, exponential wait between 2 and 10
    seconds) only for exceptions whose message matches "429". Every attempt is
    preceded by a fixed 2-second pause.

    Raises:
        Exception: if ``gemini_model`` is None.
    """
    if gemini_model is None:
        raise Exception("Gemini model not initialized.")
    # Fixed pause before each request.
    time.sleep(2)
    response = gemini_model.generate_content(contents)
    return response

def load_file(file_path, zoom_x=2.0, zoom_y=2.0):
    """Load an image or PDF from disk as a list of PIL images.

    Args:
        file_path: Path to a .jpg/.jpeg/.png image or a .pdf document
            (the extension check is case-insensitive).
        zoom_x: Horizontal zoom factor used when rendering PDF pages.
        zoom_y: Vertical zoom factor used when rendering PDF pages.

    Returns:
        A one-element list holding the RGB image for image files, one image
        per page for PDFs, or an empty list when the format is unsupported or
        loading fails (the reason is printed).
    """
    try:
        lowered = file_path.lower()
        if lowered.endswith(('.jpg', '.jpeg', '.png')):
            # convert() returns a new image, so the source file handle can be
            # released as soon as the conversion is done.
            with Image.open(file_path) as source:
                return [source.convert("RGB")]
        if lowered.endswith('.pdf'):
            # The zoom matrix is the same for every page; build it once.
            mat = fitz.Matrix(zoom_x, zoom_y)
            images = []
            # Close the document even if rendering a page fails.
            with fitz.open(file_path) as doc:
                for page_num in range(doc.page_count):
                    page = doc.load_page(page_num)
                    pix = page.get_pixmap(matrix=mat, alpha=False)
                    # The rendered bytes live in memory, so the image stays
                    # usable after the document is closed.
                    images.append(Image.open(io.BytesIO(pix.tobytes())))
            return images
        print(f"Unsupported file format: {os.path.basename(file_path)}")
        return []
    except Exception as e:
        print(f"Error loading {file_path}: {e}")
        return []

# Prompt and patterns shared by the Gemini and LLaMA extractors, so both
# back-ends ask for and parse exactly the same fields.
_EXTRACTION_PROMPT = "Extract: Student ID (number, must be 7 digits), all Outcomes (number: value), Total Marks (number)."
_STUDENT_ID_RE = re.compile(r'Student ID[:\s]*(\d+)', re.IGNORECASE)
_OUTCOME_RE = re.compile(r'Outcome\s*(\d+)[:\s]*(\d+)', re.IGNORECASE)
_TOTAL_MARKS_RE = re.compile(r'Total Marks[:\s]*(\d+)', re.IGNORECASE)


def _update_row_from_text(row_dict, generated_text):
    """Parse one model response and store any fields found in ``row_dict`` (in place)."""
    student_id_match = _STUDENT_ID_RE.search(generated_text)
    if student_id_match:
        row_dict['Student ID'] = str(student_id_match.group(1))
    for num, mark in _OUTCOME_RE.findall(generated_text):
        row_dict[f'Outcome {num}'] = int(mark)
    total_marks_match = _TOTAL_MARKS_RE.search(generated_text)
    if total_marks_match:
        row_dict['Total Marks'] = int(total_marks_match.group(1))


def _row_is_complete(row_dict):
    """Return True when the row has a Student ID, at least one Outcome and Total Marks."""
    return (
        'Student ID' in row_dict
        and any('Outcome' in k for k in row_dict)
        and 'Total Marks' in row_dict
    )


def process_gemini_file(file_name, folder_path):
    """Extract student results from one file using Gemini.

    Every page/image of the file is sent to Gemini; fields found on later
    pages overwrite those found earlier.

    Args:
        file_name: Name of the file inside ``folder_path``.
        folder_path: Directory that contains the file.

    Returns:
        ``(row_dict, None)`` on success, or ``(row_dict, file_name)`` when the
        file could not be loaded, a response was empty, the extracted data was
        incomplete, or an exception occurred. ``row_dict`` may be partially
        filled in the failure case.
    """
    file_path = os.path.join(folder_path, file_name)
    row_dict = {}
    try:
        images = load_file(file_path)
        if not images:
            logger.warning(f"No images loaded from {file_name}")
            return row_dict, file_name
        for img in images:
            response = generate_gemini_content([img, _EXTRACTION_PROMPT])
            generated_text = response.text.replace("*", "").strip()
            if not generated_text:
                logger.warning(f"Empty response from Gemini for {file_name}")
                return row_dict, file_name
            _update_row_from_text(row_dict, generated_text)
            if not _row_is_complete(row_dict):
                logger.warning(f"Incomplete data extracted from {file_name}")
                return row_dict, file_name
        return row_dict, None
    except Exception as e:
        logger.error(f"Error processing {file_name} with Gemini: {str(e)}")
        return row_dict, file_name


def process_llama_file(file_name, folder_path):
    """Extract student results from one file using the local LLaMA vision model.

    Args:
        file_name: Name of the file inside ``folder_path``.
        folder_path: Directory that contains the file.

    Returns:
        ``(row_dict, None)`` on success, or ``(row_dict, file_name)`` when the
        file could not be loaded, the model is not available, the extracted
        data was incomplete, or an exception occurred.
    """
    file_path = os.path.join(folder_path, file_name)
    row_dict = {}
    try:
        images = load_file(file_path)
        if not images:
            logger.warning(f"No images loaded from {file_name}")
            return row_dict, file_name
        # Model availability does not change between pages; check it once.
        if llama_model is None or llama_tokenizer is None:
            logger.warning(f"LLaMA model not initialized for {file_name}")
            return row_dict, file_name
        for img in images:
            messages = [
                {"role": "user", "content": [
                    {"type": "text", "text": _EXTRACTION_PROMPT},
                    {"type": "image", "image": img},
                ]}
            ]
            input_text = llama_tokenizer.apply_chat_template(messages, add_generation_prompt=True)
            inputs = llama_tokenizer(
                img,
                input_text,
                add_special_tokens=False,
                return_tensors="pt",
            ).to("cuda")
            output_tokens = llama_model.generate(
                **inputs,
                max_new_tokens=100,
                use_cache=True,
                temperature=0.0,
                do_sample=False
            )
            generated_text = llama_tokenizer.decode(output_tokens[0], skip_special_tokens=True).strip()
            generated_text = generated_text.replace("*", "").replace("Course Performance:", "")
            _update_row_from_text(row_dict, generated_text)
            if not _row_is_complete(row_dict):
                logger.warning(f"Incomplete data extracted from {file_name}")
                return row_dict, file_name
        return row_dict, None
    except Exception as e:
        logger.error(f"Error processing {file_name} with LLaMA: {str(e)}")
        return row_dict, file_name

def process_files(model_name, selected_files, selected_folder, batch_size=5, sleep_time=0.5):
    """Run the chosen extractor over the selected files and tabulate the results.

    Files are submitted to a 4-worker thread pool in batches of ``batch_size``,
    with a pause of ``sleep_time`` seconds after each batch. The progress bar
    stored in ``st.session_state.progress_bar`` is advanced as files finish.

    Args:
        model_name: "Gemini" selects the Gemini extractor; any other value
            selects the LLaMA extractor.
        selected_files: File names inside the selected folder.
        selected_folder: Folder name relative to ``BASE_DIR``.
        batch_size: Number of files submitted per batch.
        sleep_time: Seconds to sleep after each batch.

    Returns:
        ``(df, failed_files)``. ``df`` has the columns Student ID, the Outcome
        columns in numeric order, Total Marks, then any other keys; it is an
        empty DataFrame when no row was extracted.
    """
    # NOTE(review): with the LLaMA extractor the worker threads all call the
    # same model object concurrently - confirm that this is intended.
    folder_path = os.path.join(BASE_DIR, selected_folder)
    total_files = len(selected_files)
    rows = []
    failures = []
    seen_keys = set()
    done = 0

    def process_single_file(file_name):
        if model_name == "Gemini":
            return process_gemini_file(file_name, folder_path)
        return process_llama_file(file_name, folder_path)

    batches = [
        selected_files[start:start + batch_size]
        for start in range(0, total_files, batch_size)
    ]
    with ThreadPoolExecutor(max_workers=4) as executor:
        for batch in batches:
            pending = [executor.submit(process_single_file, name) for name in batch]
            for finished in as_completed(pending):
                done += 1
                st.session_state.progress_bar.progress(min(done / total_files, 1.0))
                row_dict, failed_file = finished.result()
                if row_dict:
                    seen_keys.update(row_dict.keys())
                    rows.append(row_dict)
                if failed_file:
                    failures.append(failed_file)
            time.sleep(sleep_time)

    if not rows:
        return pd.DataFrame(), failures

    def outcome_number(key):
        # Sort "Outcome 10" after "Outcome 2" by comparing the numeric part.
        return int(re.search(r'\d+', key).group())

    key_pool = list(seen_keys)
    outcome_keys = sorted((k for k in key_pool if k.startswith('Outcome')), key=outcome_number)
    other_keys = [
        k for k in key_pool
        if k not in ['Student ID', 'Total Marks'] and not k.startswith('Outcome')
    ]
    ordered_keys = ['Student ID', *outcome_keys, 'Total Marks', *other_keys]

    # Give every row the full column set; fields a file did not yield are None.
    normalized = [{key: row.get(key, None) for key in ordered_keys} for row in rows]
    df = pd.DataFrame(normalized, columns=ordered_keys)
    df['Student ID'] = df['Student ID'].astype(str)
    logger.info(f"DataFrame created with {len(df)} rows and columns: {ordered_keys}")
    return df, failures

# --- MODIFIED DATABASE SECTION STARTS HERE ---

def init_db():
    """Create the SQLite tables used by the app if they do not exist yet.

    Creates ``students``, ``outcomes`` (one row per student and outcome
    number), ``total_marks`` (one row per student) and the log table named by
    ``LOG_TABLE`` in the database at ``DB_FILE``. Safe to call repeatedly.
    """
    conn = sqlite3.connect(DB_FILE)
    try:
        cursor = conn.cursor()
        # Create normalized tables
        cursor.execute("""CREATE TABLE IF NOT EXISTS students (
                    student_id TEXT PRIMARY KEY,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )""")
        cursor.execute("""CREATE TABLE IF NOT EXISTS outcomes (
                    outcome_id INTEGER PRIMARY KEY AUTOINCREMENT,
                    student_id TEXT,
                    outcome_number INTEGER,
                    score INTEGER,
                    recorded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    UNIQUE(student_id, outcome_number),
                    FOREIGN KEY(student_id) REFERENCES students(student_id)
                )""")
        cursor.execute("""CREATE TABLE IF NOT EXISTS total_marks (
                    mark_id INTEGER PRIMARY KEY AUTOINCREMENT,
                    student_id TEXT UNIQUE,
                    total INTEGER,
                    recorded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    FOREIGN KEY(student_id) REFERENCES students(student_id)
                )""")
        cursor.execute(f"""CREATE TABLE IF NOT EXISTS {LOG_TABLE} (
                    log_id INTEGER PRIMARY KEY AUTOINCREMENT,
                    timestamp TIMESTAMP,
                    action TEXT,
                    details TEXT
                )""")
        conn.commit()
    finally:
        # Release the connection even when one of the statements fails.
        conn.close()

# Create the schema at startup, but only when the Drive folder is present:
# connecting to a database file inside a missing directory raises, which
# would abort the script before main() can show its "Drive not mounted" hint.
if os.path.isdir(BASE_DIR):
    init_db()
else:
    logger.warning(f"Skipping database initialization: {BASE_DIR} not found.")

def log_action(action, details):
    """Append one record to the log table.

    Args:
        action: Short label for what happened.
        details: Free-text description stored alongside the action.

    The record is stamped with the current local time formatted as
    ``YYYY-MM-DD HH:MM:SS``.
    """
    timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
    conn = sqlite3.connect(DB_FILE)
    try:
        conn.execute(
            f"INSERT INTO {LOG_TABLE} (timestamp, action, details) VALUES (?, ?, ?)",
            (timestamp, action, details),
        )
        conn.commit()
    finally:
        # Release the connection even when the insert fails.
        conn.close()

def _to_native(value):
    """Convert numpy scalars to plain Python values; sqlite3 cannot bind numpy types."""
    return value.item() if hasattr(value, "item") else value


def save_to_sqlite(df):
    """Upsert extracted results into the normalized SQLite tables.

    For each row the student is inserted if new, every non-null ``Outcome N``
    column is upserted into ``outcomes`` and a non-null ``Total Marks`` is
    upserted into ``total_marks``. Rows without a usable Student ID are
    skipped. All rows are written in one transaction; on a database error the
    transaction is rolled back, the error is logged and shown in the UI.

    Args:
        df: DataFrame with a ``Student ID`` column, optional ``Outcome N``
            columns and an optional ``Total Marks`` column.
    """
    conn = sqlite3.connect(DB_FILE)
    try:
        for _, row in df.iterrows():
            raw_id = row.get('Student ID')
            student_id = "" if pd.isnull(raw_id) else str(raw_id).strip()
            # A missing ID that was cast to text shows up as "None"/"nan";
            # storing it would merge unrelated results under a bogus student.
            if student_id.lower() in ("", "none", "nan", "<na>"):
                logger.warning("Skipping row without a Student ID.")
                continue
            # Insert or ignore student
            conn.execute("INSERT OR IGNORE INTO students (student_id) VALUES (?)", (student_id,))
            # Insert outcomes with conflict handling
            outcomes = [(k.replace('Outcome ', ''), v) for k, v in row.items()
                        if k.startswith('Outcome') and pd.notnull(v)]
            for outcome_num, score in outcomes:
                conn.execute("""INSERT INTO outcomes (student_id, outcome_number, score)
                              VALUES (?, ?, ?)
                              ON CONFLICT(student_id, outcome_number) DO UPDATE SET
                              score = excluded.score,
                              recorded_at = CURRENT_TIMESTAMP""",
                             (student_id, outcome_num, _to_native(score)))
            # Insert total marks
            total = row.get('Total Marks')
            if pd.notnull(total):
                conn.execute("""INSERT INTO total_marks (student_id, total)
                              VALUES (?, ?)
                              ON CONFLICT(student_id) DO UPDATE SET
                              total = excluded.total,
                              recorded_at = CURRENT_TIMESTAMP""",
                             (student_id, _to_native(total)))
        conn.commit()
    except sqlite3.Error as e:
        conn.rollback()
        logger.error(f"Database error: {str(e)}")
        st.error("Failed to save data to database. Check logs for details.")
    finally:
        conn.close()



def load_from_sqlite():
    """Load stored results as one wide DataFrame.

    Returns:
        A DataFrame with ``Student ID``, one ``Outcome N`` column per stored
        outcome number (in numeric order) and ``Total Marks``. Students that
        appear in only one of the two tables are kept (outer merge). An empty
        DataFrame is returned when nothing is stored or the query fails.
    """
    conn = sqlite3.connect(DB_FILE)
    try:
        # Load all outcomes dynamically
        outcomes_df = pd.read_sql_query("""
            SELECT student_id, outcome_number, score
            FROM outcomes
        """, conn)

        if outcomes_df.empty:
            # Keep the merge key so merging still works with no outcomes stored.
            outcomes_pivot = pd.DataFrame({'student_id': pd.Series(dtype=object)})
        else:
            pivot = outcomes_df.pivot(index='student_id', columns='outcome_number', values='score')
            # Order by the numeric outcome number so "Outcome 10" follows "Outcome 2".
            pivot = pivot.sort_index(axis=1)
            pivot.columns = [f"Outcome {number}" for number in pivot.columns]
            outcomes_pivot = pivot.reset_index()

        # Load total marks
        total_marks_df = pd.read_sql_query("""
            SELECT student_id, total AS 'Total Marks'
            FROM total_marks
        """, conn)

        # Merge both
        df = pd.merge(outcomes_pivot, total_marks_df, on='student_id', how='outer')
        return df.rename(columns={'student_id': 'Student ID'})
    except (sqlite3.Error, pd.errors.DatabaseError) as e:
        # pandas wraps driver errors (e.g. a missing table) in its own DatabaseError.
        logger.error(f"Database error: {str(e)}")
        return pd.DataFrame()
    finally:
        conn.close()

# Adding a function to view the log records
def view_logs():
    """Return every record of the log table as a DataFrame."""
    conn = sqlite3.connect(DB_FILE)
    try:
        return pd.read_sql_query(f"SELECT * FROM {LOG_TABLE}", conn)
    finally:
        # Release the connection even when the query fails.
        conn.close()

def merge_student_ids(extracted_df, student_ids_df):
    """Attach extracted results to a roster of student IDs via fuzzy matching.

    Each roster ID is paired with the extracted ID that has the highest
    ``fuzz.ratio`` score, provided the score is at least 80. Roster IDs
    without such a match are kept, and all missing cells in the result are
    filled with ``'Not Found'``. The input frames are not modified.

    Args:
        extracted_df: Extracted results; must contain a ``Student ID`` column
            (surrounding whitespace in column names is ignored).
        student_ids_df: Roster; must contain a ``Student ID`` column.

    Returns:
        The merged DataFrame, or an empty DataFrame if merging failed (the
        error is logged and shown in the UI).
    """
    try:
        # Work on copies so the caller's frames keep their columns and dtypes.
        extracted_df = extracted_df.copy()
        student_ids_df = student_ids_df.copy()
        extracted_df.columns = extracted_df.columns.str.strip()
        student_ids_df.columns = student_ids_df.columns.str.strip()
        if 'Student ID' not in student_ids_df.columns:
            raise ValueError("Student IDs file must contain a 'Student ID' column.")
        if 'Student ID' not in extracted_df.columns:
            raise ValueError("Extracted data must contain a 'Student ID' column.")
        # Compare IDs as trimmed text regardless of how they were read in.
        student_ids_df['Student ID'] = student_ids_df['Student ID'].astype(str).str.strip()
        extracted_df['Student ID'] = extracted_df['Student ID'].astype(str).str.strip()
        logger.info(f"student_ids_df['Student ID'] dtype: {student_ids_df['Student ID'].dtype}")
        logger.info(f"extracted_df['Student ID'] dtype: {extracted_df['Student ID'].dtype}")

        threshold = 80
        # Materialize once instead of re-reading the column for every roster ID.
        extracted_ids = extracted_df['Student ID'].tolist()
        matches = []
        for student_id in student_ids_df['Student ID']:
            best_match = None
            highest_score = 0
            for extracted_id in extracted_ids:
                score = fuzz.ratio(student_id, extracted_id)
                if score > highest_score and score >= threshold:
                    highest_score = score
                    best_match = extracted_id
            matches.append((student_id, best_match))

        match_df = pd.DataFrame(matches, columns=['Student ID', 'Matched Student ID'])
        merged_df = pd.merge(student_ids_df, match_df, on='Student ID', how='left')
        final_df = pd.merge(merged_df, extracted_df, left_on='Matched Student ID', right_on='Student ID', how='left')
        # Keep the roster's ID column; drop the helper and the extracted-side ID.
        final_df = final_df.drop(columns=['Matched Student ID', 'Student ID_y']).rename(columns={'Student ID_x': 'Student ID'})
        return final_df.fillna('Not Found')
    except Exception as e:
        logger.error(f"Error during merging student IDs: {str(e)}")
        st.error(f"Error during merging student IDs: {str(e)}")
        return pd.DataFrame()

# --- MAIN APP ---

def main():
    """Render the Streamlit UI: file selection, extraction, merge and stored results."""
    st.title("Student Results Extractor")
    st.write("Extract Student IDs, Outcomes, and Total Marks from images or PDFs.")

    # Everything below reads from Drive (including the log database), so check
    # the mount before any section that touches it.
    if not os.path.exists(BASE_DIR):
        st.error("Google Drive not mounted. Run in Colab: `from google.colab import drive; drive.mount('/content/drive')`")
        return

    # View logs section
    st.subheader("📋 View Action Logs")
    if st.button("View Logs"):
        logs_df = view_logs()
        if not logs_df.empty:
            st.dataframe(logs_df)
        else:
            st.info("No logs available.")

    model_choice = st.selectbox("Select Model:", ["Gemini", "LLaMA"])

    available_folders = sorted(
        f for f in os.listdir(BASE_DIR) if os.path.isdir(os.path.join(BASE_DIR, f))
    )
    selected_folder = st.selectbox("Select Folder from MyDrive:", available_folders)

    if selected_folder:
        folder_path = os.path.join(BASE_DIR, selected_folder)
        available_files = sorted(
            f for f in os.listdir(folder_path)
            if f.lower().endswith(('.jpg', '.jpeg', '.png', '.pdf'))
        )
        if not available_files:
            # PNG files are accepted too, so name them in the message.
            st.warning(f"No JPG, PNG or PDF files found in '{selected_folder}'.")
            return

        select_all = st.checkbox("Select All Files", value=False)
        selected_files = st.multiselect(
            "Select Files:",
            available_files,
            default=available_files if select_all else []
        )

        student_ids_file = st.file_uploader("Upload Student IDs Excel File (for merging)", type=['xlsx'],
                                            help="Excel file with Student IDs to merge with extracted data.")

        if st.button("Process Files"):
            if not selected_files:
                st.warning("Please select at least one file.")
            else:
                with st.spinner(f"Processing {len(selected_files)} files with {model_choice}..."):
                    st.session_state.progress_bar = st.progress(0.0)
                    start_time = time.time()
                    df, failed_files = process_files(model_choice, selected_files, selected_folder)
                    end_time = time.time()

                if df.empty:
                    # Report the empty extraction here rather than on the merge branch.
                    st.error("No data extracted from files.")
                else:
                    st.success(f"Processing completed in {end_time - start_time:.2f} seconds")
                    st.write("Extracted Results:")
                    st.dataframe(df)
                    # NOTE(review): save_to_sqlite reports its own failures via
                    # st.error, but the success message below is shown either way.
                    save_to_sqlite(df)
                    st.success("Saved to SQLite database.")
                    csv_path = "student_results.csv"
                    df.to_csv(csv_path, index=False)
                    # Read the bytes inside a context manager so the handle is closed.
                    with open(csv_path, "rb") as f:
                        csv_bytes = f.read()
                    st.download_button(
                        label="Download Results as CSV",
                        data=csv_bytes,
                        file_name="student_results.csv",
                        mime="text/csv"
                    )

                    if student_ids_file:
                        st.subheader("Merged Results with Student IDs")
                        student_ids_df = pd.read_excel(student_ids_file)
                        merged_df = merge_student_ids(df, student_ids_df)
                        if not merged_df.empty:
                            st.dataframe(merged_df)
                            merged_csv_path = "merged_student_results.csv"
                            merged_df.to_csv(merged_csv_path, index=False)
                            with open(merged_csv_path, "rb") as f:
                                merged_bytes = f.read()
                            st.download_button(
                                label="Download Merged Results as CSV",
                                data=merged_bytes,
                                file_name="merged_student_results.csv",
                                mime="text/csv"
                            )
                        else:
                            st.warning("Merging with the Student IDs file produced no rows.")

                if failed_files:
                    st.warning("Failed to process or extract data from:")
                    for file in failed_files:
                        st.write(f"- {file}")
                    st.write("Check '/content/processing_log.log' for details.")
                else:
                    st.success("All files processed successfully.")

    st.subheader("📊 View Stored Results")

    if st.button("Load from Database"):
        stored_data = load_from_sqlite()
        if not stored_data.empty:
            st.session_state["loaded_data"] = stored_data
            st.success("Data loaded successfully.")
        else:
            st.warning("No data in database.")

    # Display after load; kept in session state so it survives reruns.
    if "loaded_data" in st.session_state:
        st.write("### Stored Results")
        st.dataframe(st.session_state["loaded_data"], use_container_width=True)


if __name__ == "__main__":
    main()

Writing app.py


In [None]:
from google.colab import drive
from google.colab import userdata
from pyngrok import ngrok
import subprocess
import time


# Retrieve ngrok token from Colab Secrets
ngrok_token = userdata.get('NGROK_TOKEN')
if not ngrok_token:
    raise ValueError("ngrok token not found. Please make sure it's added under Colab > Settings > Secrets.")

# Set ngrok auth token
ngrok.set_auth_token(ngrok_token)

# Mount Google Drive (the app reads its files and database from Drive)
drive.mount('/content/drive')

# Kill any previous ngrok tunnels
ngrok.kill()

# Launch the Streamlit app
process = subprocess.Popen(['streamlit', 'run', 'app.py', '--server.port', '8501'])
try:
    time.sleep(5)  # Wait for Streamlit to start
    if process.poll() is not None:
        # Streamlit already exited; a tunnel to a dead port would be useless.
        raise RuntimeError(f"Streamlit exited early with code {process.returncode}.")

    # Create a public URL using ngrok
    public_url = ngrok.connect(8501)
    print(f"Streamlit app is running at: {public_url}")

    # Keep the app alive for as long as the Streamlit process is running
    while process.poll() is None:
        time.sleep(60)
except KeyboardInterrupt:
    print("Stopping Streamlit and ngrok")
finally:
    # Clean up on every exit path, not only on a manual interrupt.
    process.terminate()
    ngrok.kill()


Mounted at /content/drive
Streamlit app is running at: NgrokTunnel: "https://900d-34-171-242-210.ngrok-free.app" -> "http://localhost:8501"
