In [161]:
import os
from dotenv import load_dotenv
from openai import OpenAI
import base64
import json
import sympy as sp
# Load environment variables (via python-dotenv) before the client below reads its API key.
load_dotenv()
import re
# Module-level OpenAI client used by the OCR and grading functions in this notebook.
# The key comes from the OPENAI_API_KEY environment variable; os.getenv yields None if it is unset.
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

In [162]:
from typing import TypedDict, Optional, Dict, Any
class GradingState(TypedDict, total=False):
    """State schema passed to ``StateGraph`` for the grading pipeline.

    ``total=False`` makes every key optional, so a state only needs to carry
    the keys that have been supplied or produced so far.
    """

    # Inputs: file paths of the images; each one is opened by the matching ocr_* function.
    question_image: str
    rubric_image: str
    student_image: str

    # Parsed JSON objects for the question, rubric and student answer; all three are read by llm_grade.
    question_json: Optional[Dict[str, Any]]
    rubric_json: Optional[Dict[str, Any]]
    student_json: Optional[Dict[str, Any]]
    # Result dict returned by sympy_verify under the key "sympy_result".
    sympy_result: Optional[Dict[str, Any]]
    # Parsed grading JSON returned by llm_grade under the key "final_grade".
    final_grade: Optional[Dict[str, Any]]

In [163]:
def ocr_question(state):
    """OCR the question image and return the parsed result as ``question_json``.

    Reads the file at ``state["question_image"]``, sends it base64-encoded to
    the ``gpt-4o-mini`` model together with an OCR instruction, prints the raw
    reply, removes a surrounding Markdown code fence if one is present, and
    parses what remains as JSON.

    Args:
        state: Mapping that must contain ``"question_image"`` (path to the image).

    Returns:
        dict: ``{"question_json": <parsed JSON>}``. The key matches the
        ``question_json`` field that ``llm_grade`` reads from the state.

    Raises:
        KeyError: If ``"question_image"`` is missing from ``state``.
        OSError: If the image file cannot be opened or read.
        json.JSONDecodeError: If the reply is not valid JSON after fence removal.
    """
    path = state["question_image"]
    with open(path, "rb") as f:
        b64 = base64.b64encode(f.read()).decode("utf-8")
    resp = client.responses.create(
        model="gpt-4o-mini",
        temperature=0,
        input=[
            {
                "role": "user",
                "content": [
                    {"type": "input_text", "text": "Please OCR this image and return JSON with two keys: problem_text, math_expressions. Output should be SymPy-compatible format. Do NOT nest math_expressions inside another object. Do NOT create a second-level key named math_expressions."},
                    # NOTE(review): the data URL always declares image/jpeg, whatever the file type — confirm inputs are JPEGs.
                    {"type": "input_image", "image_url": f"data:image/jpeg;base64,{b64}"},
                ],
            }
        ]
    )
    text = resp.output_text
    print("-"*30)
    print("OCR question")
    print("-"*30)
    print()
    print(text)
    # The model may wrap its reply in a Markdown code fence; strip it before parsing.
    text = re.sub(r"^```json\s*", "", text)
    text = re.sub(r"^```\s*", "", text)
    text = re.sub(r"\s*```$", "", text)
    question_json = json.loads(text)
    # Store under "question_json" (not "student_json") so the grader sees the
    # question and the student's answer slot is not polluted.
    return {"question_json": question_json}

In [164]:
def ocr_rubric(state):
    """OCR the rubric image and return the parsed result as ``rubric_json``.

    Reads the file at ``state["rubric_image"]``, sends it base64-encoded to
    the ``gpt-4o-mini`` model together with an OCR instruction, prints the raw
    reply, removes a surrounding Markdown code fence if one is present, and
    parses what remains as JSON.

    Args:
        state: Mapping that must contain ``"rubric_image"`` (path to the image).

    Returns:
        dict: ``{"rubric_json": <parsed JSON>}``. The key matches the
        ``rubric_json`` field that ``llm_grade`` reads from the state.

    Raises:
        KeyError: If ``"rubric_image"`` is missing from ``state``.
        OSError: If the image file cannot be opened or read.
        json.JSONDecodeError: If the reply is not valid JSON after fence removal.
    """
    path = state["rubric_image"]
    with open(path, "rb") as f:
        b64 = base64.b64encode(f.read()).decode("utf-8")
    resp = client.responses.create(
        model="gpt-4o-mini",
        temperature=0,
        input=[
            {
                "role": "user",
                "content": [
                    {"type": "input_text", "text": "Please OCR this image and return JSON with keys: answer_reasoning, math_expressions. Output should be SymPy-compatible format. Do NOT nest math_expressions inside another object. Do NOT create a second-level key named math_expressions."},
                    # NOTE(review): the data URL always declares image/jpeg, whatever the file type — confirm inputs are JPEGs.
                    {"type": "input_image", "image_url": f"data:image/jpeg;base64,{b64}"},
                ],
            }
        ]
    )
    text = resp.output_text
    print("-"*30)
    print("OCR correct answer")
    print("-"*30)
    print()
    print(text)
    # The model may wrap its reply in a Markdown code fence; strip it before parsing.
    text = re.sub(r"^```json\s*", "", text)
    text = re.sub(r"^```\s*", "", text)
    text = re.sub(r"\s*```$", "", text)
    rubric_json = json.loads(text)
    # Store under "rubric_json" (not "student_json") so the grader receives the
    # rubric and the reference solution is never mistaken for the student's work.
    return {"rubric_json": rubric_json}

In [165]:
def ocr_student(state):
    """Run OCR on the student's answer image and parse the model reply.

    The image at ``state["student_image"]`` is base64-encoded and sent to
    ``gpt-4o-mini`` with the OCR instruction. The raw reply is printed, any
    wrapping Markdown code fence is removed, and the rest is decoded as JSON.

    Returns:
        dict: ``{"student_json": <parsed JSON>}``.
    """
    with open(state["student_image"], "rb") as image_file:
        encoded_image = base64.b64encode(image_file.read()).decode("utf-8")

    instruction_part = {
        "type": "input_text",
        "text": "Please OCR this image and return JSON with keys: student_reasoning, math_expressions. Output should be SymPy-compatible format. Do NOT nest math_expressions inside another object. Do NOT create a second-level key named math_expressions.",
    }
    image_part = {
        "type": "input_image",
        "image_url": f"data:image/jpeg;base64,{encoded_image}",
    }
    response = client.responses.create(
        model="gpt-4o-mini",
        temperature=0,
        input=[{"role": "user", "content": [instruction_part, image_part]}],
    )
    raw_reply = response.output_text

    # Echo the raw reply between two separator lines.
    separator = "-" * 30
    for line in (separator, "OCR student answer", separator, raw_reply):
        print(line)

    # Remove an opening "```json" / "```" fence and a closing "```" fence, in that order.
    cleaned = raw_reply
    for fence_pattern in (r"^```json\s*", r"^```\s*", r"\s*```$"):
        cleaned = re.sub(fence_pattern, "", cleaned)

    return {"student_json": json.loads(cleaned)}

In [166]:
def sympy_verify(state):
    """Check the equalities in the student's ``math_expressions`` with SymPy.

    Each entry of ``state["student_json"]["math_expressions"]`` is classified:

    * ``correct``   - an equality (``=`` or ``==``, possibly chained such as
      ``a = b = c``) whose adjacent sides all simplify to a zero difference.
    * ``incorrect`` - an equality where some adjacent pair has a non-zero
      simplified difference.
    * ``skipped``   - anything that was not verified: inequalities
      (``<``, ``<=``, ``>``, ``>=``, ``!=``), entries without ``=``,
      non-string entries, malformed equations, and entries SymPy failed to
      parse. Each carries a reason, so nothing is dropped silently.

    Args:
        state: Mapping that may contain ``"student_json"``. A missing key,
            ``None``, or a non-dict value is treated as "no expressions".

    Returns:
        dict: ``{"sympy_result": {"correct": [...], "incorrect": [...],
        "skipped": [...]}}`` where every item is
        ``{"expression": ..., "reason": ...}``.
    """
    student_json = state.get("student_json")
    if not isinstance(student_json, dict):
        # Covers both a missing key and an explicit None (the field is Optional).
        student_json = {}

    expressions = student_json.get("math_expressions") or []
    if isinstance(expressions, str):
        # A bare string would otherwise be iterated character by character.
        expressions = [expressions]
    elif not isinstance(expressions, (list, tuple)):
        expressions = []

    results = {
        "correct": [],
        "incorrect": [],
        "skipped": [],
    }

    def _skip(expression, reason):
        results["skipped"].append({"expression": expression, "reason": reason})

    for expr in expressions:
        if not isinstance(expr, str):
            _skip(expr, "Not a string expression.")
            continue

        # "<=" and ">=" contain "=", so they must be ruled out before splitting on "=".
        if any(op in expr for op in ("<", ">", "!=")):
            _skip(expr, "Inequality; only equalities are verified.")
            continue

        # Treat "==" like "=" and allow chained equalities (a = b = c).
        sides = [side.strip() for side in expr.replace("==", "=").split("=")]
        if len(sides) < 2:
            _skip(expr, "Not an equation.")
            continue
        if not all(sides):
            _skip(expr, "Malformed equation: empty side.")
            continue

        try:
            # SECURITY NOTE(review): sympify() evaluates its input with eval();
            # these strings come from OCR/LLM output and are not sanitized.
            parsed = [sp.sympify(side) for side in sides]
            mismatch = None
            for left_sym, right_sym in zip(parsed, parsed[1:]):
                diff = sp.simplify(left_sym - right_sym)
                if diff != 0:
                    mismatch = diff
                    break
        except Exception as exc:
            # Best-effort check: record the failure and keep going instead of aborting the run.
            _skip(expr, f"Could not be checked: {exc}")
            continue

        # NOTE(review): an equation with free symbols (e.g. "x = 2") has a
        # non-zero difference and is therefore reported as incorrect.
        if mismatch is None:
            results["correct"].append({
                "expression": expr,
                "reason": "both sides are equal."
            })
        else:
            results["incorrect"].append({
                "expression": expr,
                "reason": f"Simplification gives {mismatch}, not 0."
            })

    print("-"*30)
    print("sympy_result")
    print("-"*30)
    print(results)

    return {"sympy_result": results}

In [167]:
def llm_grade(state):
    """Ask the model for a final grade and return it as ``final_grade``.

    The question, rubric, student answer and SymPy check result are read from
    ``state`` (absent keys become ``None``), serialised with ``json.dumps`` and
    embedded in a grading prompt sent to ``gpt-4o-mini``. The raw reply is
    printed, a wrapping Markdown code fence is removed, and the remainder is
    parsed as JSON.

    Returns:
        dict: ``{"final_grade": <parsed JSON>}``.
    """
    question_dump = json.dumps(state.get("question_json"))
    rubric_dump = json.dumps(state.get("rubric_json"))
    student_dump = json.dumps(state.get("student_json"))
    sympy_dump = json.dumps(state.get("sympy_result"))

    grading_prompt = f"""
You are a precise grading assistant.

You are given:

QUESTION:
{question_dump}

RUBRIC:
{rubric_dump}

STUDENT ANSWER:
{student_dump}

SYMPY CHECK RESULT:
{sympy_dump}

Task:
- Determine whether the student solution is correct.
- Consider reasoning clarity and rubric requirements.
- Assign a score (full credit: 10) according to the rubric.

Return STRICT RAW JSON with exactly this structure:

{{
  "is_correct": boolean,
  "score": number,
  "feedback": string
}}
"""

    response = client.responses.create(
        model="gpt-4o-mini",
        temperature=0,
        input=grading_prompt,
    )
    raw_reply = response.output_text

    # Echo the raw reply between two separator lines.
    separator = "-" * 30
    for line in (separator, "LLM FINAL GRADE", separator, raw_reply):
        print(line)

    # Remove an opening "```json" / "```" fence and a closing "```" fence, in that order.
    cleaned = raw_reply
    for fence_pattern in (r"^```json\s*", r"^```\s*", r"\s*```$"):
        cleaned = re.sub(fence_pattern, "", cleaned)

    return {"final_grade": json.loads(cleaned)}


In [168]:
# NOTE(review): this import sits mid-notebook; notebook convention is to keep all imports in the first cell.
from langgraph.graph import StateGraph, END
# Graph builder whose state schema is the GradingState TypedDict.
builder = StateGraph(GradingState)

# Register the five pipeline steps as (node name, callable) pairs, listed in
# the order they are meant to run: three OCR steps, the SymPy check, then grading.
builder.add_sequence(
    [
        ("ocr_question", ocr_question),
        ("ocr_rubric", ocr_rubric),
        ("ocr_student", ocr_student),
        ("sympy_verify", sympy_verify),
        ("llm_grade", llm_grade),
    ]
)

# Start at the question OCR node and finish after the grading node.
builder.set_entry_point("ocr_question")
builder.add_edge("llm_grade", END)

graph = builder.compile()
# Run the pipeline once. The initial state carries only the three image paths,
# which are relative and so resolve against the current working directory.
result = graph.invoke(
    {
        "question_image": "question.jpg",
        "rubric_image": "rubric.jpg",
        "student_image": "student_answer.jpg",
    }
)

------------------------------
OCR question
------------------------------

```json
{
  "problem_text": "Consider a linear program with decision variables x1 and x2 and constraints:\n\n-x1 + 3*x2 <= 30\n-3*x1 + x2 <= 30\nx1 >= 0, x2 >= 0.\n\n(a) Is the feasible region unbounded? Explain.",
  "math_expressions": [
    "-x1 + 3*x2 <= 30",
    "-3*x1 + x2 <= 30",
    "x1 >= 0",
    "x2 >= 0"
  ]
}
```
------------------------------
OCR correct answer
------------------------------

```json
{
  "answer_reasoning": "The feasible region is unbounded. Because the coefficients of x1 in the inequality constraints are negative (specifically -1 and -3), increasing x1 actually makes the left-hand side smaller, ensuring the inequalities ≤ 30 remain satisfied for any arbitrarily large value of x1 as long as x2 is kept at a constant small value (like 0).",
  "math_expressions": [
    "-x1 + 3*x2 <= 30",
    "3*x2 <= x1 + 30",
    "x2 <= 3*x1 + 30",
    "x1 >= 0",
    "x2 >= 0",
    "-x1 + 3*0 <= 30