In [1]:
%pip install transformers

Defaulting to user installation because normal site-packages is not writeable
Note: you may need to restart the kernel to use updated packages.


In [2]:
%pip install -U PyPDF2
%pip install python-docs

Defaulting to user installation because normal site-packages is not writeable
Note: you may need to restart the kernel to use updated packages.
Defaulting to user installation because normal site-packages is not writeable
Note: you may need to restart the kernel to use updated packages.


In [3]:
%pip install sentence-transformers scikit-learn
%pip install numpy
%pip install pandas

Defaulting to user installation because normal site-packages is not writeable
Note: you may need to restart the kernel to use updated packages.
Defaulting to user installation because normal site-packages is not writeable
Note: you may need to restart the kernel to use updated packages.
Defaulting to user installation because normal site-packages is not writeable
Note: you may need to restart the kernel to use updated packages.


In [4]:
# Cell 2: imports & global setup
import os
import re
import json
import sqlite3
import subprocess
from io import BytesIO
from typing import Dict, Tuple

from PyPDF2 import PdfReader
from pdf2image import convert_from_path
from PIL import Image

import numpy as np
from sklearn.metrics.pairwise import cosine_similarity

from transformers import TrOCRProcessor, VisionEncoderDecoderModel
from sentence_transformers import SentenceTransformer, util
import torch
print(torch.version.cuda)
device = "cuda" if torch.cuda.is_available() else "cpu"
print("Device:", device)

  from .autonotebook import tqdm as notebook_tqdm


None
Device: cpu


In [5]:
def read_pdf_text(file_path) -> str:
    if isinstance(file_path, (str, os.PathLike)):
        reader = PdfReader(str(file_path))
    else:
        reader = PdfReader(file_path)
    text = []
    for page in reader.pages:
        page_text = page.extract_text() or ""
        text.append(page_text)
    return "\n".join(text)

def read_text_file(file_path) -> str:
    if isinstance(file_path, (str, os.PathLike)):
        with open(file_path, "r", encoding="utf-8") as f:
            return f.read()
    else:
        return file_path.read().decode("utf-8")

In [6]:
QA_REGEX = r"(Q\d+):\s*(.+?)\s*A\d+:\s*(.+?)(?=\nQ\d+:|$)"
MARKS_IN_Q = r"\((\d+)\)" 

def parse_exam(text: str, default_marks: int = 5) -> Tuple[Dict[str, dict], Dict[str, str]]:
    
    matches = re.findall(QA_REGEX, text, flags=re.DOTALL)
    questions = {}
    model_answers = {}
    for q_id, q_text, a_text in matches:
        m = re.search(MARKS_IN_Q, q_text)
        marks = int(m.group(1)) if m else default_marks
        clean_q_text = re.sub(MARKS_IN_Q, "", q_text).strip()
        questions[q_id] = {"text": clean_q_text, "marks": marks}
        model_answers[q_id] = a_text.strip()
    if not questions:
        raise ValueError("No Q/A pairs detected. Ensure format 'Q1: ... A1: ...'.")
    return questions, model_answers


In [7]:
trocr_processor = TrOCRProcessor.from_pretrained("microsoft/trocr-base-handwritten")
trocr_model = VisionEncoderDecoderModel.from_pretrained("microsoft/trocr-base-handwritten")

Fetching 1 files: 100%|██████████| 1/1 [00:00<?, ?it/s]
Using a slow image processor as `use_fast` is unset and a slow processor was saved with this model. `use_fast=True` will be the default behavior in v4.52, even if the model was saved with a slow processor. This will result in minor differences in outputs. You'll still be able to use a slow processor with `use_fast=False`.
Some weights of VisionEncoderDecoderModel were not initialized from the model checkpoint at microsoft/trocr-base-handwritten and are newly initialized: ['encoder.pooler.dense.bias', 'encoder.pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [8]:
from pdf2image import convert_from_path
POPPLER_PATH = r"C:\poppler-25.07.0\Library\bin"
def ocr_pdf_to_text(pdf_path: str) -> str:
    """
    Converts a multi-page PDF to text using TrOCR.
    """
    images = convert_from_path(pdf_path, poppler_path=POPPLER_PATH)
    lines = []
    for img in images:
        pixel_values = trocr_processor(images=img, return_tensors="pt").pixel_values
        gen_ids = trocr_model.generate(pixel_values)
        text = trocr_processor.batch_decode(gen_ids, skip_special_tokens=True)[0]
        lines.append(text)
    return "\n".join(lines)

In [9]:
embedder = SentenceTransformer("all-MiniLM-L6-v2")

In [10]:
def chunk_text(text: str, chunk_size: int = 200, overlap: int = 50):
    words = text.split()
    if not words:
        return []
    chunks, i = [], 0
    while i < len(words):
        chunks.append(" ".join(words[i:i+chunk_size]))
        i += max(chunk_size - overlap, 1)
    return chunks

def map_answers_to_questions(questions: Dict[str, dict], ocr_text: str,
                             chunk_size=200, overlap=50, threshold=0.6) -> Dict[str, str]:
    """
    Returns a dict: { "Q1": "student text...", "Q2": "...", "Flagged": "low-confidence chunks" }
    """
    chunks = chunk_text(ocr_text, chunk_size=chunk_size, overlap=overlap)
    q_keys = list(questions.keys())
    q_texts = [q["text"] for q in questions.values()]
    q_emb = embedder.encode(q_texts)
    mapped = {k: "" for k in q_keys}
    flagged = []

    for ch in chunks:
        ch_emb = embedder.encode([ch])
        sims = cosine_similarity(ch_emb, q_emb)[0]
        best_idx = int(np.argmax(sims))
        best_sim = float(sims[best_idx])
        if best_sim >= threshold:
            mapped[q_keys[best_idx]] += ch + " "
        else:
            flagged.append(ch)

    if flagged:
        mapped["Flagged"] = " ".join(flagged)
    return mapped

In [None]:
def grade_with_mistral(question_text: str,
                       model_answer: str,
                       student_answer: str,
                       max_marks: int) -> dict:
    """
    Calls `ollama run mistral` and expects JSON { "Marks": number, "Feedback": "..." }.
    Uses proportional scoring based on max_marks and expected completeness.
    """
    prompt = f"""
        You are an exam evaluator.

        Question: {question_text}
        Model Answer: {model_answer}
        Student Answer: {student_answer}
        Max Marks: {max_marks}

        Scoring instructions:
        - Grade strictly out of {max_marks}.
        - Higher-mark questions require more coverage of key points for receiving full marks.
        - Give partial marks proportionally based on correctness, completeness, specificity, and relevance.
        - Reduce some marks if there are factual errors and irrelevant content.
        - If the student's answer is unclear or unreadable, return Marks: 0 and Feedback: "Answer Unclear!!! manual review is required".
        - Respond as strict JSON with keys "Marks" (number) and "Feedback" (string), nothing else.
        JSON:
        """.strip()

    proc = subprocess.run(
        ["ollama", "run", "mistral"],
        input=prompt.encode("utf-8"),
        capture_output=True,
        check=False
    )
    raw = proc.stdout.decode("utf-8", errors="ignore").strip()
    try:
        start = raw.find("{")
        end = raw.rfind("}")
        payload = raw[start:end+1] if start != -1 and end != -1 else raw
        data = json.loads(payload)
        # normalize
        marks = float(data.get("Marks", 0))
        feedback = str(data.get("Feedback", "")).strip()
        # clamp marks
        if marks < 0: marks = 0
        if marks > max_marks: marks = max_marks
        return {"Marks": marks, "Feedback": feedback}
    except Exception:
        return {"Marks": 0.0, "Feedback": "Unclear – manual review required (invalid LLM JSON)."}

In [12]:
DB_PATH = "grades.db"

def init_db(db_path=DB_PATH):
    conn = sqlite3.connect(db_path)
    cur = conn.cursor()
    cur.execute("""
    CREATE TABLE IF NOT EXISTS students (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        name TEXT,
        uploaded_pdf TEXT
    );""")
    cur.execute("""
    CREATE TABLE IF NOT EXISTS answers (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        student_id INTEGER,
        question_number TEXT,
        raw_ocr TEXT,
        mapped_answer TEXT,
        FOREIGN KEY (student_id) REFERENCES students(id)
    );""")
    cur.execute("""
    CREATE TABLE IF NOT EXISTS grades (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        answer_id INTEGER,
        marks REAL,
        feedback TEXT,
        FOREIGN KEY (answer_id) REFERENCES answers(id)
    );""")
    conn.commit()
    conn.close()

init_db()

In [13]:
def insert_student(name: str, pdf_name: str, db_path=DB_PATH) -> int:
    conn = sqlite3.connect(db_path)
    cur = conn.cursor()
    cur.execute("INSERT INTO students (name, uploaded_pdf) VALUES (?,?)", (name, pdf_name))
    sid = cur.lastrowid
    conn.commit()
    conn.close()
    return sid

def insert_answer(student_id: int, q_num: str, raw_ocr: str, mapped: str, db_path=DB_PATH) -> int:
    conn = sqlite3.connect(db_path)
    cur = conn.cursor()
    cur.execute(
        "INSERT INTO answers (student_id, question_number, raw_ocr, mapped_answer) VALUES (?,?,?,?)",
        (student_id, q_num, raw_ocr, mapped)
    )
    aid = cur.lastrowid
    conn.commit()
    conn.close()
    return aid

def insert_grade(answer_id: int, marks: float, feedback: str, db_path=DB_PATH):
    conn = sqlite3.connect(db_path)
    cur = conn.cursor()
    cur.execute(
        "INSERT INTO grades (answer_id, marks, feedback) VALUES (?,?,?)",
        (answer_id, marks, feedback)
    )
    conn.commit()
    conn.close()

In [14]:
def load_exam(exam_path: str) -> Tuple[Dict[str, dict], Dict[str, str]]:
    ext = os.path.splitext(exam_path)[1].lower()
    if ext == ".pdf":
        text = read_pdf_text(exam_path)
    elif ext == ".txt":
        text = read_text_file(exam_path)
    else:
        raise ValueError("Unsupported exam file type; use .pdf or .txt")
    return parse_exam(text)

In [15]:
def grade_batch(exam_path: str, student_pdf_paths: list) -> dict:
    questions, model_answers = load_exam(exam_path)
    results = {}
    for pdf_path in student_pdf_paths:
        # 1) OCR
        raw = ocr_pdf_to_text(pdf_path)

        # 2) Map to questions
        mapped = map_answers_to_questions(questions, raw, chunk_size=220, overlap=80, threshold=0.6)

        # 3) Store student
        student_name = os.path.basename(pdf_path)
        sid = insert_student(student_name, student_name)

        # 4) Grade per question
        student_result = {}
        for q_id, q_meta in questions.items():
            student_ans = mapped.get(q_id, "").strip()
            aid = insert_answer(sid, q_id, raw, student_ans)
            graded = grade_with_mistral(q_meta["text"], model_answers[q_id], student_ans, q_meta["marks"])
            insert_grade(aid, graded["Marks"], graded["Feedback"])
            student_result[q_id] = graded

        # optionally capture flagged chunks
        if "Flagged" in mapped and mapped["Flagged"].strip():
            student_result["Flagged"] = mapped["Flagged"].strip()

        results[student_name] = student_result
    return results

In [16]:
exam_path = "./data/exam_files/qna.txt"  # or .txt
student_pdf_paths = [
    "data/student_pdfs/s1.pdf"
]
results = grade_batch(exam_path, student_pdf_paths)
print(json.dumps(results, indent=2))


{
  "s1.pdf": {
    "Q1": {
      "Marks": 4.5,
      "Feedback": "The student has correctly differentiated between e-Government and e-Governance, explaining their definitions, focus, communication, scope, and objectives with examples. However, the stages of e-Governance are explained but not described in detail as per Gartner\u2019s Four-Stage Model."
    },
    "Q2": {
      "Marks": 4.0,
      "Feedback": "The answer provides a good definition of e-Government and outlines some key components when viewed as an Information System. However, it could be more specific about software applications, data and databases, processes and workflows, people and stakeholders, security and governance aspects."
    },
    "Flagged": "to the next time a b What you should be too much to be a good sense of the"
  }
}
