#### ==========================================================
#### 📘 RAG-BASED AUTO GRADER
#### ==========================================================
#### Requirements:
#### pip install sentence-transformers openai PyPDF2 python-docx python-pptx numpy pandas tiktoken tqdm


In [1]:
# !pip install PyPDF2
# !pip install sentence-transformers openai numpy pandas tqdm genai anthropic 
# ! pip install PyPDF2 python-docx python-pptx pandas sentence-transformers tqdm openai


In [2]:

import os, re, json
import pandas as pd
from tqdm import tqdm
from PyPDF2 import PdfReader
from docx import Document
from pptx import Presentation
from sentence_transformers import SentenceTransformer, util
import openai
import numpy as np


  from .autonotebook import tqdm as notebook_tqdm


In [3]:
# Directory that holds the course notes and the rubric file loaded further down.
DATA_PATH = ".././Srinivasan/data/"
# Chunk length and overlap, counted in whitespace-separated tokens
# (same values as the defaults of chunk_text / load_documents).
CHUNK_SIZE = 550
CHUNK_OVERLAP = 50
# Model name handed to SentenceTransformer(...) when the embedder is created.
EMBED_MODEL = "all-MiniLM-L6-v2"

In [None]:
# ----------- CONFIG -------------



# Shared embedding model: used to encode the note chunks, the retrieval query,
# and the student's sentences during grading.
embedder = SentenceTransformer(EMBED_MODEL)

In [5]:
# ==========================================================
# 🌐 CLIENT INITIALIZATION
# ==========================================================
# Pick the LLM backend named by ACTIVE_LLM_PROVIDER and define a matching
# `ask_llm(prompt, model=..., temperature=...)` helper that returns the reply text, stripped.
# ACTIVE_LLM_PROVIDER and the provider's API key/token are read here but not assigned in
# this cell, so they must already be defined when it runs.
if ACTIVE_LLM_PROVIDER == "openai":
    # openai is already imported in cell 2
    openai.api_key = OPENAI_API_KEY

    def ask_llm(prompt, model="gpt-4o-mini", temperature=0.2):
        """Send `prompt` as the user turn of an OpenAI chat completion and return the reply text."""
        # NOTE(review): `openai.ChatCompletion` is the pre-1.0 SDK interface — confirm the
        # installed openai version still provides it.
        resp = openai.ChatCompletion.create(
            model=model,
            messages=[{"role": "system", "content": "Return STRICT JSON only."},
                      {"role": "user", "content": prompt}],
            temperature=temperature
        )
        return resp["choices"][0]["message"]["content"].strip()

elif ACTIVE_LLM_PROVIDER == "gemini":
    import google.generativeai as genai
    genai.configure(api_key=GOOGLE_API_KEY)

    def ask_llm(prompt, model="gemini-2.0-flash-lite", temperature=0.2):
        """Generate a reply to `prompt` with a Gemini model and return its text."""
        # Keep the `model` argument (a name) separate from the client object built from it.
        gemini_model = genai.GenerativeModel(model)
        # Forward the requested temperature; previously it was accepted but never sent,
        # so Gemini always ran with its own default sampling settings.
        resp = gemini_model.generate_content(
            prompt,
            generation_config={"temperature": temperature},
        )
        return resp.text.strip()

elif ACTIVE_LLM_PROVIDER == "claude":
    import anthropic
    client = anthropic.Anthropic(api_key=ANTHROPIC_API_KEY)

    def ask_llm(prompt, model="claude-3-5-sonnet-20240620", temperature=0.2):
        """Send `prompt` as a single user message to Claude and return the first content block's text."""
        msg = client.messages.create(
            model=model,
            max_tokens=1024,
            messages=[{"role": "user", "content": prompt}],
            temperature=temperature
        )
        return msg.content[0].text.strip()

elif ACTIVE_LLM_PROVIDER == "llama":
    from huggingface_hub import InferenceClient
    client = InferenceClient(token=HUGGINGFACE_TOKEN)

    def ask_llm(prompt, model="meta-llama/Meta-Llama-3-8B-Instruct", temperature=0.2):
        """
        Uses conversational mode since Llama models are registered under that task.
        """
        messages = [
            {"role": "system", "content": "Return STRICT JSON only."},
            {"role": "user", "content": prompt}
        ]
        resp = client.chat_completion(
            model=model,
            messages=messages,
            max_tokens=1024,
            temperature=temperature,
        )
        return resp["choices"][0]["message"]["content"].strip()


elif ACTIVE_LLM_PROVIDER == "copilot":
    # Example for Azure OpenAI / GitHub Copilot-like API
    import openai
    openai.api_type = "azure"
    openai.api_key = AZURE_API_KEY
    # Placeholder endpoint: replace with the real Azure resource URL before use.
    openai.api_base = "https://your-azure-endpoint.openai.azure.com"
    openai.api_version = "2024-03-01-preview"

    def ask_llm(prompt, model="gpt-4", temperature=0.2):
        """Send `prompt` to an Azure OpenAI deployment (`model` is passed as the engine) and return the reply text."""
        resp = openai.ChatCompletion.create(
            engine=model,
            messages=[{"role": "system", "content": "Return STRICT JSON only."},
                      {"role": "user", "content": prompt}],
            temperature=temperature
        )
        return resp["choices"][0]["message"]["content"].strip()

else:
    raise ValueError(f"Unsupported provider: {ACTIVE_LLM_PROVIDER}")

print(f"‚úÖ Active LLM provider: {ACTIVE_LLM_PROVIDER}")

‚úÖ Active LLM provider: openai


In [6]:
# ---------- GENERIC TEXT EXTRACTORS ----------
def extract_text_from_pdf(path):
    """Return the text of every page of the PDF at `path`, joined with newlines.

    A page whose `extract_text()` result is falsy contributes an empty string.
    """
    page_texts = []
    for page in PdfReader(path).pages:
        page_texts.append(page.extract_text() or "")
    return "\n".join(page_texts)

def extract_text_from_docx(path):
    """Return the non-blank paragraphs of the .docx file at `path`, one per line."""
    kept = []
    for paragraph in Document(path).paragraphs:
        # Skip paragraphs that are empty or whitespace-only.
        if paragraph.text.strip():
            kept.append(paragraph.text)
    return "\n".join(kept)

def extract_text_from_pptx(path):
    """Return the text of every shape exposing a `text` attribute, across all slides of `path`.

    Shape texts are emitted in slide order, then shape order, separated by newlines.
    """
    deck = Presentation(path)
    return "\n".join(
        shape.text
        for slide in deck.slides
        for shape in slide.shapes
        if hasattr(shape, "text")
    )

def extract_text_from_csv(path):
    """Flatten the CSV at `path` into one space-separated string of cell values.

    Cells are emitted row by row. Missing cells contribute an empty string.
    Previously every cell was cast to `str` before `fillna("")`, which turned
    missing values into the literal text "nan" and left nothing to fill.

    Parameters
    ----------
    path : str
        Path of the CSV file to read.

    Returns
    -------
    str
        All cell values joined by single spaces ("" for a CSV without data rows).
    """
    df = pd.read_csv(path)
    # Object dtype keeps each column's own values intact; ravel() walks them row-major.
    cells = df.to_numpy(dtype=object).ravel()
    return " ".join("" if pd.isna(cell) else str(cell) for cell in cells)

def extract_text_from_txt(path):
    """Read the file at `path` as UTF-8 text and return its full contents.

    Bytes that are not valid UTF-8 are dropped rather than raising.
    """
    with open(path, "r", encoding="utf-8", errors="ignore") as handle:
        contents = handle.read()
    return contents

In [7]:
# ---------- UNIVERSAL LOADER ----------
def extract_text_from_any(path):
    """Extract text from `path` using the extractor that matches its file extension.

    The extension is compared case-insensitively. Supported types are
    .pdf, .csv, .docx, .pptx and .txt; any other extension prints a warning
    and yields an empty string.
    """
    _, suffix = os.path.splitext(path)
    suffix = suffix.lower()

    if suffix == ".pdf":
        return extract_text_from_pdf(path)
    if suffix == ".csv":
        return extract_text_from_csv(path)
    if suffix == ".docx":
        return extract_text_from_docx(path)
    if suffix == ".pptx":
        return extract_text_from_pptx(path)
    if suffix == ".txt":
        return extract_text_from_txt(path)

    # Nothing matched: report the file and fall back to empty text.
    print(f"‚ö†Ô∏è Unsupported file type: {path}")
    return ""

In [8]:
# -------------------- CHUNKING -----------------------------
def chunk_text(text, chunk_size=550, overlap=50):
    """Split `text` into overlapping chunks of whitespace-separated tokens.

    Consecutive chunks start `chunk_size - overlap` tokens apart, so each chunk
    repeats the last `overlap` tokens of the previous one.

    Parameters
    ----------
    text : str
        Text to split.
    chunk_size : int
        Maximum number of tokens per chunk; must be positive.
    overlap : int
        Number of tokens shared by neighbouring chunks; must satisfy
        ``0 <= overlap < chunk_size``.

    Returns
    -------
    list[str]
        The chunks in document order; empty when `text` has no tokens.

    Raises
    ------
    ValueError
        If `chunk_size` or `overlap` is out of range. Previously an overlap larger
        than the chunk size silently produced no chunks at all.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be a positive integer")
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must satisfy 0 <= overlap < chunk_size")

    # str.split() drops the empty tokens that leading/trailing whitespace used to create.
    tokens = text.split()
    step = chunk_size - overlap

    chunks = []
    for start in range(0, len(tokens), step):
        chunks.append(" ".join(tokens[start:start + chunk_size]))
        # Once a chunk reaches the last token, any further window would only repeat
        # the tail already contained in this chunk, so stop here.
        if start + chunk_size >= len(tokens):
            break
    return chunks

In [9]:
def _chunk_file(fpath, source, chunk_size, overlap):
    """Extract the text of one file and return its chunks, each tagged with `source`."""
    text = extract_text_from_any(fpath)
    return [
        {"source": source, "content": chunk}
        for chunk in chunk_text(text, chunk_size, overlap)
    ]


def load_documents(path, chunk_size=550, overlap=50):
    """Load a file, or every file directly inside a folder, as a list of text chunks.

    Parameters
    ----------
    path : str
        A single file or a directory. Sub-directories are not descended into.
    chunk_size, overlap : int
        Forwarded to `chunk_text`.

    Returns
    -------
    list[dict]
        One ``{"source": <file name>, "content": <chunk text>}`` entry per chunk.
        An invalid path prints a warning and returns an empty list.
    """
    if os.path.isfile(path):
        # Handle single file
        return _chunk_file(path, os.path.basename(path), chunk_size, overlap)

    if os.path.isdir(path):
        # Handle folder with multiple files
        all_chunks = []
        # os.listdir returns names in arbitrary order; sorting keeps the chunk order
        # (and therefore the corpus ids used at retrieval time) reproducible.
        for name in sorted(os.listdir(path)):
            fpath = os.path.join(path, name)
            if os.path.isfile(fpath):
                all_chunks.extend(_chunk_file(fpath, name, chunk_size, overlap))
        return all_chunks

    print(f"‚ö†Ô∏è Invalid path: {path}")
    return []

In [10]:
# -------------------- LOAD RUBRIC -----------------------


def load_rubric_text(path="./data/rubric.csv"):
    """Return the grading rubric at `path` as plain text.

    A .csv rubric is rendered as one ``Criterion | Weight | Level | Description``
    line per row, with missing cells shown as empty strings. Any other file type
    is passed to `extract_text_from_any`.
    """
    suffix = os.path.splitext(path)[1].lower()
    if suffix != ".csv":
        return extract_text_from_any(path)

    rubric = pd.read_csv(path).fillna("")
    rendered = []
    for row in rubric.itertuples():
        rendered.append(
            f"{row.Criterion} | {row.Weight} | {row.Level} | {row.Description}"
        )
    return "\n".join(rendered)

In [11]:
# ==========================================================
# RETRIEVAL
# ==========================================================
# Build the retrieval corpus: chunk the course notes with the configured chunking
# parameters (they were defined above but never passed through before), load the
# rubric, and embed every chunk once.
notes_docs = load_documents(
    os.path.join(DATA_PATH, "notes.pdf"),
    chunk_size=CHUNK_SIZE,
    overlap=CHUNK_OVERLAP,
)
if not notes_docs:
    # Fail here with a clear message instead of later, inside grading, on an empty corpus.
    raise ValueError(
        f"No text chunks could be loaded from {os.path.join(DATA_PATH, 'notes.pdf')}"
    )
rubric_text = load_rubric_text(os.path.join(DATA_PATH, "rubric.csv"))
all_texts = [d["content"] for d in notes_docs]
embeddings = embedder.encode(all_texts, convert_to_tensor=True)



In [12]:
# -------------------- RETRIEVAL ----------------------------
def retrieve_context(query, top_k=3):
    """Return the note chunks most similar to `query`.

    The query is embedded with the shared `embedder` and matched against the
    pre-computed `embeddings`; up to `top_k` entries of `notes_docs` are returned
    in the order produced by the semantic search.
    """
    query_vec = embedder.encode(query, convert_to_tensor=True)
    ranked = util.semantic_search(query_vec, embeddings, top_k=top_k)
    matches = []
    # Only one query was submitted, so its hits are the first (and only) result list.
    for hit in ranked[0]:
        matches.append(notes_docs[hit["corpus_id"]])
    return matches

def prepare_indexed_context(retrieved):
    """Format retrieved chunks as ``[R1] ...``, ``[R2] ...`` blocks separated by blank lines."""
    labelled = []
    for position, doc in enumerate(retrieved, start=1):
        labelled.append(f"[R{position}] {doc['content']}")
    return "\n\n".join(labelled)


In [13]:
# ==========================================================
# COVERAGE CHECK (GROUNDING)
# ==========================================================
def split_sentences(txt):
    """Split `txt` after each '.', '!' or '?' that is followed by whitespace.

    Returns the trimmed, non-empty pieces in their original order.
    """
    sentences = []
    for piece in re.split(r'(?<=[.!?])\s+', txt):
        trimmed = piece.strip()
        if trimmed:
            sentences.append(trimmed)
    return sentences

def coverage_check(answer, retrieved, sim_threshold=0.5):
    """Measure how much of `answer` is backed by the retrieved note chunks.

    Each sentence of the answer is compared with every retrieved chunk; a sentence
    counts as unsupported when its best cosine similarity is below `sim_threshold`.

    Returns
    -------
    tuple[float, list[str]]
        ``(supported_ratio, unsupported_sentences)``. When the answer has no
        sentences or nothing was retrieved, the ratio is 0.0 and the second item
        is the full sentence list.
    """
    sentences = split_sentences(answer)
    if not (sentences and retrieved):
        return 0.0, sentences

    chunk_vecs = embedder.encode(
        [doc["content"] for doc in retrieved], convert_to_tensor=True
    )
    sentence_vecs = embedder.encode(sentences, convert_to_tensor=True)

    unsupported = [
        sentence
        for sentence, vec in zip(sentences, sentence_vecs)
        if float(util.cos_sim(vec, chunk_vecs).max().cpu().item()) < sim_threshold
    ]
    supported_count = len(sentences) - len(unsupported)
    return supported_count / len(sentences), unsupported

In [14]:
# -------------------- PROMPT BUILDER -----------------------
# ==========================================================
# PROMPT (STRICTLY GROUNDED + STRICTNESS CONTROL)
# ==========================================================
def build_prompt(question, answer, context, rubric_text, max_score=50, strictness=3):
    """Build the notes-grounded grading prompt sent to the LLM.

    Parameters
    ----------
    question : str
        The question the student was asked.
    answer : str
        The student's answer.
    context : str
        Retrieved note excerpts, already formatted for inclusion in the prompt.
    rubric_text : str
        The instructor rubric as plain text.
    max_score : int
        Value written into the example JSON's "max_score" field.
    strictness : int
        Grading strictness on a 1-5 scale. It is now stated explicitly in the
        prompt; before, the prompt told the model to follow "the strictness
        level" without ever saying what it was.

    Returns
    -------
    str
        The complete prompt text.
    """
    # The punctuation in this template (dashes, curly quotes, apostrophes) was
    # mis-encoded; it is restored so the model sees e.g. "0–5" as intended.
    return f"""
You are an academic auto-grader evaluating a student's reflection response.


### REFERENCE MATERIAL (from course notes)
Use this content as the authoritative source when grading. Only grade ideas that are supported by this material:
{context}

### INSTRUCTOR RUBRIC (from file)
Use this rubric exactly as written. The criteria, weights, and levels define how grading should be done:
{rubric_text}

### QUESTION
{question}

### STUDENT ANSWER
{answer}

### STRICTNESS LEVEL: {strictness} (scale 1–5)
- 1–2: lenient — tolerate minor missing details.
- 3: balanced.
- 4–5: strict — penalize unsupported or vague statements.

---

### YOUR TASK
Grade the student's answer based **only** on the above notes and rubric.

1. **Relevance:** If the student's content does not appear in the notes, treat it as off-topic or unsupported, based on the strictness level.
2. **Per Criterion Evaluation:**
   - Identify the rubric criterion name.
   - Assign a "score" between 0–5.
   - Write a short "comments" paragraph (1–2 sentences) specific to the topic, explaining what was good or missing.
   - Avoid generic phrases like “good job” or “needs more detail.” Instead, reference the actual content (e.g., Dakota, land, sacred power, geography, identity, etc.).
3. **Overall Feedback:**
   - Compute the overall grade according to the rubric’s weights and levels.
   - Provide a short "feedback_summary" (2–3 sentences) summarizing performance.
   - Mention specific areas of strength or improvement related to the question.
4. **Unsupported Content:**
   - List exact sentences or ideas from the student’s answer that are **not supported by the notes**.
5. **Correct Answer Retrieval:**
   - From the provided notes, find and quote or paraphrase the most relevant passage that represents the correct answer.
   - Include its source name or page number if visible.

---

### OUTPUT FORMAT (STRICT JSON ONLY)
Return only a valid JSON object in this structure:

{{
  "criteria": [
    {{
      "criterion": "<criterion name from rubric>",
      "score": 4,
      "comments": "Shows clear understanding of the Dakota concept of sacred geography, but lacks examples from the notes."
    }},
    {{
      "criterion": "<criterion name from rubric>",
      "score": 5,
      "comments": "Well written, clear, and supported by the course material."
    }}
  ],
  "unsupported_claims": [
    "The Dakota people worshipped in temples."
  ],
  "final_score": 0,
  "max_score": {max_score},
  "feedback_summary": "Good comprehension and structure, though some claims are not supported by notes.",
  "correct_answer": {{
    "source": "notes.pdf page 3",
    "content": "The 'sacred power of place' refers to the Dakota belief that land itself is sacred and embodies memory and identity..."
  }}
}}

### RULES
- Output **JSON only** — no markdown, no explanations.
- Follow the **strictness level** when deciding leniency or harshness.
- Always include `"criterion"`, `"score"`, `"comments"` for each rubric section.
- Always provide `"correct_answer"` with source and content.
"""


In [15]:
# -------------------- SAFE JSON PARSER -----------------------

import json, re

def safe_json_parse(raw_output: str):
    """Parse JSON returned by an LLM, tolerating common formatting slips.

    Candidates are tried from least to most invasive. The first ones are the
    same single repairs as before (raw text, markdown fences removed, single
    quotes swapped for double quotes, trailing commas removed, outermost
    ``{...}`` extracted). Combined repairs follow, so a reply that needs more
    than one fix at once (for example a fenced block that also has trailing
    commas) is now parsed instead of rejected.

    Parameters
    ----------
    raw_output : str
        The model's reply.

    Returns
    -------
    The value decoded by `json.loads` from the first candidate that parses.

    Raises
    ------
    ValueError
        If `raw_output` is empty or whitespace-only.
    json.JSONDecodeError
        If no candidate parses; the start of the reply is printed first.
    """
    if not raw_output or not raw_output.strip():
        raise ValueError("Empty response from model ‚Äî no JSON returned.")

    def strip_fences(text):
        return re.sub(r"```(json)?", "", text).strip()

    def swap_quotes(text):
        return re.sub(r"'", '"', text)

    def drop_trailing_commas(text):
        return re.sub(r",\s*([}\]])", r"\1", text)

    unfenced = strip_fences(raw_output)

    # Single repairs, in their original order, so replies that parsed before
    # still parse to the same value.
    candidates = [
        raw_output,
        unfenced,
        swap_quotes(raw_output),
        drop_trailing_commas(raw_output),
    ]

    # Combined repairs for replies with more than one problem.
    candidates.append(drop_trailing_commas(unfenced))
    match = re.search(r"\{.*\}", raw_output, re.S)
    if match:
        extracted = match.group(0)
        without_commas = drop_trailing_commas(extracted)
        candidates.extend([extracted, without_commas, swap_quotes(without_commas)])

    tried = set()
    for text in candidates:
        if text in tried:
            continue
        tried.add(text)
        try:
            return json.loads(text)
        except json.JSONDecodeError:
            continue

    print("‚ö†Ô∏è Could not parse model JSON output:\n", raw_output[:600])
    raise json.JSONDecodeError("LLM output not valid JSON", raw_output, 0)


In [16]:
myPrompt = ""

def _normalize_criteria(criteria):
    """Fill in missing criterion fields and clamp each score to the 0-5 range, in place."""
    for i, c in enumerate(criteria, 1):
        if "criterion" not in c:
            c["criterion"] = (
                c.get("criteria_name")
                or c.get("name")
                or f"Criterion {i}"
            )
        try:
            score = float(c.get("score", 0))
        except (TypeError, ValueError):
            score = 0.0
        if score != score:
            # NaN compares unequal to itself; without this it would clamp to 5.0.
            score = 0.0
        c["score"] = max(0.0, min(5.0, score))
        if "comments" not in c:
            c["comments"] = ""


def _rubric_weights(rubric_text):
    """Parse 'criterion | weight | ...' rubric lines into {criterion: weight}, keeping weights in [0, 1]."""
    weights = {}
    for line in rubric_text.splitlines():
        parts = [p.strip() for p in line.split("|")]
        if len(parts) < 3:
            continue
        try:
            w = float(parts[1])
        except ValueError:
            # Second field is not numeric; this line carries no usable weight.
            continue
        if 0 <= w <= 1.0:
            # Drop any parenthesised suffix so names match those returned by the model.
            weights[parts[0].split("(")[0].strip()] = w
    return weights


def grade_answer(question, answer, max_score=50, top_k=3, sim_threshold=0.5, strictness=3):
    """
    Notes-grounded grading (balanced):
    - Grades student answer using rubric + note context.
    - Lists unsupported claims.
    - Returns the best-matched 'correct answer' snippet from notes.
    - Keeps JSON clean and human-readable.

    Parameters
    ----------
    question, answer : str
        The question and the student's answer.
    max_score : number
        Score that a perfect weighted result maps to.
    top_k : int
        Number of note chunks retrieved as grading context.
    sim_threshold : float
        Answer sentences whose best cosine similarity to the retrieved chunks is
        below this value are reported as unsupported.
    strictness : int
        Forwarded to `build_prompt`.

    Returns
    -------
    dict
        Keys: "criteria", "unsupported_claims", "final_score", "max_score",
        "feedback_summary", "correct_answer".

    Raises
    ------
    ValueError
        If the model's reply is empty or does not decode to a JSON object
        (json.JSONDecodeError, raised for unparsable replies, is a subclass).
    """

    # --- Retrieve note chunks related to question ---
    query = f"{question} {answer}"
    retrieved = retrieve_context(query, top_k=top_k)
    context = prepare_indexed_context(retrieved)

    # --- Build prompt for the LLM ---
    # (The former `myPrompt = prompt` only bound a local name and had no effect,
    # so it has been removed.)
    prompt = build_prompt(
        question=question,
        answer=answer,
        context=context,
        rubric_text=rubric_text,
        max_score=max_score,
        strictness=strictness,
    )

    # --- Ask the LLM to grade ---
    raw_output = ask_llm(prompt)

    # --- Parse JSON safely ---
    data = safe_json_parse(raw_output)
    if not isinstance(data, dict):
        raise ValueError("LLM output is valid JSON but not a JSON object")

    # --- Normalize per-criterion fields ---
    criteria = data.get("criteria") or []
    _normalize_criteria(criteria)

    # --- Parse rubric weights dynamically ---
    weights = _rubric_weights(rubric_text)

    # --- Compute weighted total (no strict penalty) ---
    # Criteria without a rubric weight get an equal share (1 / number of criteria);
    # this also covers the case where the rubric yields no weights at all.
    total_weight, weighted_sum = 0.0, 0.0
    for c in criteria:
        base = c["criterion"].split("(")[0].strip()
        w = weights.get(base, 1.0 / len(criteria))
        total_weight += w
        weighted_sum += (c["score"] / 5.0) * w

    final_score = round((weighted_sum / total_weight) * max_score, 2) if total_weight else 0.0

    # --- Identify unsupported claims using embeddings ---
    # Same check as coverage_check, which also copes with an empty answer or an
    # empty retrieval instead of failing on empty tensors.
    _, unsupported = coverage_check(answer, retrieved, sim_threshold=sim_threshold)

    # --- Retrieve 'correct answer' snippet from notes ---
    if retrieved:
        retr_emb = embedder.encode(
            [r["content"] for r in retrieved], convert_to_tensor=True
        )
        q_emb = embedder.encode(question, convert_to_tensor=True)
        sims = util.cos_sim(q_emb, retr_emb)[0].cpu().tolist()
        best_idx = max(range(len(sims)), key=sims.__getitem__)
        best_snippet = retrieved[best_idx]["content"].strip()
        source_name = retrieved[best_idx]["source"]
    else:
        # Nothing was retrieved, so there is no snippet to quote.
        best_snippet, source_name = "", ""

    # --- Compose final clean JSON output ---
    output = {
        "criteria": [
            {
                "criterion": c["criterion"],
                "score": c["score"],
                "comments": c["comments"],
            }
            for c in criteria
        ],
        "unsupported_claims": unsupported,
        "final_score": min(max_score, round(final_score, 2)),
        "max_score": max_score,
        "feedback_summary": data.get(
            "feedback_summary",
            "Evaluation based on provided notes and rubric. Unsupported statements noted."
        ),
        "correct_answer": {
            "source": source_name,
            "content": best_snippet[:700] + ("..." if len(best_snippet) > 700 else "")
        }
    }

    return output


In [25]:
# Example run: grade one sample answer and print the result as indented JSON.
# `question`, `answer` and `final_result` are reused by the scoring cell further down.
question = "Why is land considered central to the religion and cultural identity of the Dakota people?"
answer = "my name is devendran"
result = grade_answer(question, answer, max_score=50, top_k=3, sim_threshold=0.5)
# Serialized once so the same text can be printed here and passed on as the grader's output.
final_result = json.dumps(result, indent=2)
print(final_result)


{
  "criteria": [
    {
      "criterion": "Critical Analysis (understanding of course materials)",
      "score": 0.0,
      "comments": "The response does not engage with the course materials or address the question regarding the centrality of land to the Dakota people's religion and cultural identity."
    },
    {
      "criterion": "Academic and Scholarly Presentation",
      "score": 0.0,
      "comments": "The response lacks clarity and is not related to the question, failing to communicate any ideas effectively."
    },
    {
      "criterion": "Portrays Insight (follows instructional questions)",
      "score": 0.0,
      "comments": "There is no engagement with the instructional questions or course material, resulting in an unacceptable response."
    }
  ],
  "unsupported_claims": [
    "my name is devendran"
  ],
  "final_score": 0.0,
  "max_score": 50,
  "feedback_summary": "The response does not address the question or utilize course materials, resulting in a failing grad

In [18]:
%pip install withpi

from withpi import PiClient


huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)



[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.0[0m[39;49m -> [0m[32;49m25.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.


In [23]:
# Evaluation questions passed as `scoring_spec` to the Pi scoring call below;
# each entry is a dict with a single "question" key.
scoring_spec = [
    # --- Output Quality & Correctness ---
    {"question": "Does the response follow the specified JSON schema (criteria, scores, comments, etc.)?"},
    {"question": "Is the 'final_score' correctly scaled according to the rubric and max_score?"},
    {"question": "Are all rubric criteria evaluated and present in the output?"},
    {"question": "Do the per-criterion scores and comments align logically with the rubric description?"},
    {"question": "Does the response summarize the student‚Äôs strengths and weaknesses accurately?"},

    # --- Relevance & Grounding ---
    {"question": "Is the evaluation grounded in the reference notes, not external knowledge?"},
    {"question": "Are unsupported claims correctly identified and listed?"},
    {"question": "Does the grader avoid giving credit to off-topic or irrelevant content?"},
    {"question": "Does the grader correctly recognize content that directly appears in the notes?"},

    # --- Tone & Feedback Style ---
    {"question": "Is the feedback written in a professional, academic tone?"},
    {"question": "Are the comments constructive and informative rather than generic?"},
    {"question": "Is the feedback balanced ‚Äî neither too lenient nor excessively harsh?"},

    # # --- Strictness Level Compliance ---
    # {"question": "Does the evaluation reflect the specified strictness level (1‚Äì5)?"},
    # {"question": "If strictness is high (‚â•4), are unsupported or vague answers penalized appropriately?"},
    # {"question": "If strictness is low (‚â§2), is the grader more lenient toward minor missing details?"},
    # {"question": "Does the strictness behavior match the expected tone in the prompt instructions?"},

    # --- Overall Response Validity ---
    {"question": "Does the grading fulfill all requirements stated in the prompt?"},
    {"question": "Does the overall result seem consistent, fair, and reliable for the given input?"},
]


In [24]:

import os
import getpass

# SECURITY: the API key must not live in the notebook. It is taken from the
# WITHPI_API_KEY environment variable, and only requested interactively when that
# variable is unset. The key that used to be hard-coded here has been exposed in
# source and should be revoked and replaced.
if not os.environ.get("WITHPI_API_KEY"):
    os.environ["WITHPI_API_KEY"] = getpass.getpass("WITHPI_API_KEY: ")

# Rebuild the grading prompt for the sample question/answer so the scorer sees the
# same kind of input the grader was given.
query = f"{question} {answer}"
retrieved = retrieve_context(query, top_k=3)
context = prepare_indexed_context(retrieved)

# --- Build prompt for the LLM ---
myPrompt = build_prompt(
    question=question,
    answer=answer,
    context=context,
    rubric_text=rubric_text,
    max_score=50,
)

# Score the grader's JSON output (`final_result`) against the evaluation questions.
pi = PiClient()
scores = pi.scoring_system.score(
    llm_input=myPrompt,
    llm_output=final_result,
    scoring_spec=scoring_spec
)
print('Total Score:', scores.total_score)
print('Question Scores:', scores.question_scores)



Total Score: 0.605
Question Scores: {'Are all rubric criteria evaluated and present in the output?': 0.6289, 'Are the comments constructive and informative rather than generic?': 0.6211, 'Are unsupported claims correctly identified and listed?': 0.5078, 'Do the per-criterion scores and comments align logically with the rubric description?': 0.7461, 'Does the grader avoid giving credit to off-topic or irrelevant content?': 0.5234, 'Does the grader correctly recognize content that directly appears in the notes?': 0.5859, 'Does the grading fulfill all requirements stated in the prompt?': 0.6133, 'Does the overall result seem consistent, fair, and reliable for the given input?': 0.582, 'Does the response follow the specified JSON schema (criteria, scores, comments, etc.)?': 0.6719, 'Does the response summarize the student‚Äôs strengths and weaknesses accurately?': 0.6094, "Is the 'final_score' correctly scaled according to the rubric and max_score?": 0.4102, 'Is the evaluation grounded in 