# Step 4: Scoring Preprocessing
Extract handwritten responses from scanned sheets, run OCR, auto-grade with Gemini, and generate per-question review pages for manual checks.

In [243]:
from grading_utils import setup_paths, create_directories

# Resolve the locations used by this run from the test prefix and the "sample" data set.
prefix = "VTC Test"
paths = setup_paths(prefix, "sample")

# Input files read by this notebook.
pdf_file, name_list_file, marking_scheme_file = (
    paths[key] for key in ("pdf_file", "name_list_file", "marking_scheme_file")
)
# Alias of the marking-scheme path; a later cell rebinds this name to a dict.
standard_answer = marking_scheme_file

Extract cache for sample

In [None]:
# Shell step: unpack cache.tar.gz from the parent directory (x = extract, z = gunzip,
# v = list each file, f = archive name). Presumably this populates ../cache — TODO confirm.
! cd .. && tar -xzvf cache.tar.gz

In [245]:
from grading_utils import init_gemini_client

# Create the Gemini client once; the OCR, grading and moderation functions below
# all call `client.models.generate_content` on this shared instance.
client = init_gemini_client()

✓ Vertex AI Express Mode initialized


## Setup Vertex AI Express Mode with API Key

This notebook now uses **Vertex AI Express Mode** with API key authentication instead of OAuth/ADC.

**Steps to get your API key:**
1. Visit https://aistudio.google.com/apikey
2. Create or select your API key
3. Copy the API key and add it to the `.env` file in the parent directory:
   ```
   GOOGLE_GENAI_API_KEY=your-actual-api-key-here
   ```

**Benefits of Express Mode:**
- ✓ Simpler authentication (just an API key)
- ✓ No need for gcloud CLI authentication
- ✓ No service account JSON files
- ✓ Easy to use in notebooks and scripts

In [246]:
import os

# Unpack the output locations resolved by setup_paths().
(
    file_name,
    base_path,
    base_path_images,
    base_path_annotations,
    base_path_questions,
    base_path_javascript,
) = (
    paths[key]
    for key in (
        "file_name",
        "base_path",
        "base_path_images",
        "base_path_annotations",
        "base_path_questions",
        "base_path_javascript",
    )
)

# Create all necessary directories before anything is written to them.
create_directories(paths)

In [247]:
from grading_utils import load_annotations

# Load the annotation file; later cells read page/left/top/width/height per label from it.
annotations_path = f"{base_path_annotations}annotations.json"
(
    annotations_list,
    annotations_dict,
    questions_from_annotations,
) = load_annotations(annotations_path)

In [248]:
# Every label found in the annotations.
questions = questions_from_annotations

# Labels that carry a gradable answer: everything except NAME, ID and CLASS.
metadata_labels = ["NAME", "ID", "CLASS"]
question_with_answer = list(filter(lambda label: label not in metadata_labels, questions))
questions

['NAME', 'ID', 'CLASS', 'Q1', 'Q2', 'Q3', 'Q4', 'Q5']

## Validate Provided Standard Answer for each question

In [249]:
## Load data directly from Marking Scheme Excel
import pandas as pd


# Try to load Name List sheet from name list file
try:
    name_list_df = pd.read_excel(name_list_file, sheet_name="Name List")
    print(f"✓ Loaded Name List from: {name_list_file}")
except Exception as e:
    # Non-fatal: report the failure and continue without a name list.
    print(f"⚠️ Failed to load Name List: {e}")
    name_list_df = None

# Load Marking Scheme sheet - this contains all the data needed.
# Read from `marking_scheme_file` rather than the `standard_answer` alias: a later cell
# rebinds `standard_answer` to a dict, so re-running this cell used to fail.
marking_scheme_df = pd.read_excel(marking_scheme_file, sheet_name="Marking Scheme")
print("✓ Loaded Marking Scheme directly")
print(f"  Columns: {list(marking_scheme_df.columns)}")

# Fail early with a readable message when the sheet does not have the expected layout.
required_columns = ['question_number', 'question_text', 'marking_scheme', 'marks']
missing_columns = [column for column in required_columns if column not in marking_scheme_df.columns]
if missing_columns:
    raise ValueError(f"Marking Scheme sheet is missing columns: {missing_columns}")

# Create Answer sheet dictionary for backward compatibility
# Map question_number to marking_scheme for standard_answer lookup
standard_answer_df = marking_scheme_df[required_columns].copy()
standard_answer_df.columns = ['Question', 'QuestionText', 'Answer', 'Mark']

print("✓ Prepared data for scoring")
standard_answer_df.head()

✓ Loaded Name List from: ../sample/VTC Test Name List.xlsx
✓ Loaded Marking Scheme directly
  Columns: ['question_number', 'question_text', 'marking_scheme', 'marks']
✓ Prepared data for scoring


Unnamed: 0,Question,QuestionText,Answer,Mark
0,Q1,The VTC is the largest provider of VPET in Hon...,- **Definition (2 marks)**: Correctly stating ...,10
1,Q2,Compare IVE (Hong Kong Institute of Vocational...,- **IVE Qualification (5 marks)**: Correctly i...,10
2,Q3,"VTC emphasizes the ""Think and Do"" approach. Ex...","- **""Think"" Component (3 marks)**: Explaining ...",10
3,Q4,If a Secondary 6 student does not achieve the ...,- **Programme Identification (5 marks)**: Corr...,10
4,Q5,Why does the VTC collaborate closely with indu...,- **General Rationale (4 marks)**: Explaining ...,10


Convert the Question column to str

In [250]:
# Question labels are compared against the string labels from the annotations,
# so make sure the column holds strings.
standard_answer_df = standard_answer_df.astype({"Question": str})

In [251]:
from termcolor import colored

# check question_with_answer in standard_answer_df Question column
# Report, in red, labels that appear on only one side (annotations vs. marking scheme).
scheme_questions = standard_answer_df["Question"].tolist()

for question in question_with_answer:
    if question in scheme_questions:
        continue
    print(colored("Question {} is not in standard_answer!".format(question), 'red'))

for question in scheme_questions:
    if question in question_with_answer:
        continue
    print(colored("Question {} is not in annotations!".format(question), 'red'))
            

In [252]:
# Per-question lookups keyed by the question label.
answers_by_question = standard_answer_df.set_index("Question")
standard_question_text = answers_by_question["QuestionText"].to_dict()
# From here on `standard_answer` is the label -> marking-scheme text mapping.
standard_answer = answers_by_question["Answer"].to_dict()
standard_answer

{'Q1': '- **Definition (2 marks)**: Correctly stating "**Vocational and Professional Education and Training**".\n- **Focus (4 marks)**: Explaining that it focuses on **practical skills** or **specialized trades**.\n- **Workforce Impact (4 marks)**: Explaining the benefit to the workforce (e.g., **reducing skills gap**, **employment readiness**, supporting the economy).\n\n**Grading Note**: Apply the 0-10 scale rubric based on the clarity of explanation and use of terminology.\n\n---\n\n**General Grading Guide:**\n### General Rubric for Partial Marks (0-10 Scale)\n\n- **9-10 marks**: The answer is complete, accurate, uses correct terminology, and is well-explained.\n- **6-8 marks**: The answer is mostly correct but misses a specific detail (e.g., forgets the full name of a diploma) or the explanation is slightly vague.\n- **3-5 marks**: The student shows basic understanding but misses the core point or only answers half the question.\n- **0-2 marks**: The answer is largely incorrect, ir

In [253]:
# Marks available for each question, keyed by the question label.
standard_mark = standard_answer_df.set_index("Question")["Mark"].to_dict()
standard_mark

{'Q1': 10, 'Q2': 10, 'Q3': 10, 'Q4': 10, 'Q5': 10}

Check which questions have a `control.json` that requests regeneration.

In [254]:
import os
import json

# Collect the control.json content of every question folder that has a "regenerate" entry.
questionAndControl = {}
for path, currentDirectory, files in os.walk(base_path_questions):
    if "control.json" not in files:
        continue
    # Key by the folder path relative to base_path_questions.
    question = path[len(base_path_questions) + 1 :]
    # `with` closes the handle even when the file holds malformed JSON.
    with open(os.path.join(path, "control.json"), encoding="utf-8") as f:
        data = json.load(f)
    if "regenerate" in data:
        questionAndControl[question] = data

questionAndControl

{}

In [255]:
import shutil
import os

# Copy the shared front-end assets from ../templates into the marking form.
templates_dir = os.path.join(os.getcwd(), "..", "templates")

from_directory = os.path.join(templates_dir, "javascript")
shutil.copytree(from_directory, base_path_javascript, dirs_exist_ok=True)

ico = os.path.join(templates_dir, "favicon.ico")
# copyfile returns the destination path, which becomes the cell output.
shutil.copyfile(ico, base_path + "/favicon.ico")

'../marking_form/VTC Test/favicon.ico'

Generate the index.html

In [256]:
from pathlib import Path
from jinja2 import Environment, FileSystemLoader
import markdown

# Templates are resolved relative to ../templates (index.html and questions/* are loaded below).
# NOTE(review): Environment() is created without `autoescape`, which Jinja2 leaves off by
# default, so OCR text and model reasoning reach the HTML unescaped unless the templates
# escape them explicitly — confirm against the templates.
file_loader = FileSystemLoader("../templates")
env = Environment(loader=file_loader)

# Add markdown filter
def markdown_filter(text):
    """Convert Markdown ``text`` to HTML; ``None`` is rendered as an empty string."""
    return "" if text is None else markdown.markdown(text)

# Make the filter available to templates under the name `markdown`.
env.filters['markdown'] = markdown_filter

template = env.get_template("index.html")

output = template.render(
    studentsScriptFileName=file_name,
    textAnswer=questions,
    optionAnswer=[],
)
# Write the landing page. `with` closes the handle even if the write fails, and the
# explicit UTF-8 encoding keeps non-ASCII text from depending on the platform default.
path = Path(os.path.join(base_path, "index.html"))
with open(path, "w", encoding="utf-8") as text_file:
    text_file.write(output)

In [257]:
from grading_utils import create_gemini_config

def ocr(prompt: str, filePath: str):
    """
    Send one image together with an instruction to Gemini and return the text it produces.

    Args:
        prompt: Instruction describing what should be read from the image
        filePath: Location of the image on disk; its bytes are sent as image/png

    Returns:
        The response text with surrounding whitespace removed, or "" when the
        response carries no text
    """
    with open(filePath, "rb") as image_file:
        image_bytes = image_file.read()

    config = create_gemini_config(
        temperature=0,
        top_p=0.5,
        max_output_tokens=4096,
    )

    # A single user turn: the image first, then the instruction.
    image_part = {"inline_data": {"mime_type": "image/png", "data": image_bytes}}
    prompt_part = {"text": prompt}

    response = client.models.generate_content(
        model="gemini-3-flash-preview",
        contents=[{"role": "user", "parts": [image_part, prompt_part]}],
        config=config,
    )

    if not response.text:
        return ""
    return response.text.strip()


In [258]:
import tempfile
from PIL import Image, ImageEnhance
import hashlib
import json
import os

# Root folder for cached model results; get_from_cache/save_to_cache below keep
# one sub-folder per cache type inside it.
cache_dir = "../cache"
os.makedirs(cache_dir, exist_ok=True)

def get_cache_key(cache_type, **params):
    """Build the lookup key for one cached result.

    Args:
        cache_type: Cache category (e.g., 'ocr', 'grade_answer', 'grade_moderator');
            it is also hashed together with the parameters
        **params: JSON-serialisable values that distinguish one entry from another

    Returns:
        Tuple of (cache_type, sha256 hex digest)
    """
    payload = {"type": cache_type}
    payload.update(params)
    # sort_keys makes the digest independent of the keyword order at the call site.
    serialised = json.dumps(payload, sort_keys=True)
    digest = hashlib.sha256(serialised.encode()).hexdigest()
    return cache_type, digest

def get_from_cache(cache_key):
    """Retrieve result from cache using folder structure

    Args:
        cache_key: Tuple of (cache_type, hash_key)

    Returns:
        Cached data, or None when the entry is missing or cannot be read/parsed
    """
    cache_type, hash_key = cache_key
    cache_file = os.path.join(cache_dir, cache_type, f"{hash_key}.json")

    # Open directly instead of checking existence first: one step, no race.
    try:
        with open(cache_file, 'r') as f:
            return json.load(f)
    except FileNotFoundError:
        # Ordinary cache miss.
        return None
    except (OSError, ValueError) as e:
        # ValueError covers malformed JSON and undecodable bytes. Treat the entry as a
        # miss but say so, rather than hiding every error behind a bare `except:`.
        print(f"Warning: Ignoring unreadable cache file {cache_file} - {e}")
        return None

def save_to_cache(cache_key, data):
    """Save result to cache using folder structure

    Saving is best effort: any failure is reported as a warning and never raised,
    so a result that was already computed is not lost to a cache problem.

    Args:
        cache_key: Tuple of (cache_type, hash_key)
        data: JSON-serialisable data to cache
    """
    cache_type, hash_key = cache_key
    cache_subdir = os.path.join(cache_dir, cache_type)
    cache_file = os.path.join(cache_subdir, f"{hash_key}.json")

    try:
        # Directory creation is inside the try so that it is covered by the same
        # best-effort policy as the write itself.
        os.makedirs(cache_subdir, exist_ok=True)
        with open(cache_file, 'w') as f:
            json.dump(data, f)
    except Exception as e:
        print(f"Warning: Failed to save cache - {e}")

def ocr_image_from_file(question, image_path, left, top, width, height):
    """Crop one answer box out of a scanned page and return the text read from it.

    The crop is sharpened, written to a temporary PNG and sent to `ocr` with a prompt
    chosen by `question`. Results are cached under a key built from the model settings,
    the prompt and the SHA-256 of the cropped PNG.

    Args:
        question: Field label; "NAME" is never sent to OCR, "ID" and "CLASS" use
            dedicated prompts, anything else uses the handwriting prompt
        image_path: Path of the scanned page image
        left, top, width, height: Bounding box of the field on the page

    Returns:
        The extracted text, or "" for NAME, when the model reports that no text was
        found, or when OCR/caching raises (the error is printed)
    """
    if question == "NAME" :
        return ""

    # The prompt text is part of the cache key, so it must stay byte-identical.
    if question == "ID" :
        text_message = """
            Extract text in this image.
            It is a Student ID in 9 digit number.
            Return only the 9-digit Student ID with no other words, no bullets, and no numbering.
            Strip whitespace. If you cannot extract Student ID, please return 'No text found!!!'.
            """
    elif question == "CLASS":
        text_message = """
            Extract the class code from this image. It is printed/computer text, typically letters and numbers (e.g., "3A", "S1", "Class B").
            Return only the class value with no other words, no bullets, and no numbering. Strip whitespace.
            If you cannot extract the class value, please return 'No text found!!!'.
            """
    else:
        text_message = """
            Extract only the handwritten text from this image (English, numbers, math symbols, and common punctuation).
            Ignore printed or pre-printed computer text, headers, labels, or barcodes; capture handwriting only.
            Preserve the student's original formatting: keep existing line breaks and any numbering or bullets that appear in the handwriting (e.g., "1.", "2.", "- "). Do not introduce new numbering or bullets.
            Return exactly the extracted handwritten text with no added labels, summaries, paraphrasing, or translation.
            Strip leading/trailing whitespace on each line. If you cannot extract text, please return 'No text found!!!'."""

    # NamedTemporaryFile(...).name handed back the name of a file that was deleted as soon
    # as the object was discarded; the crop re-created at that path was then never removed.
    # A private temporary directory gives a safe path and is cleaned up on exit.
    with tempfile.TemporaryDirectory() as tmp_dir:
        imageFile = os.path.join(tmp_dir, "crop.png")
        with Image.open(image_path) as im:
            # crop() takes (left, upper, right, lower); derive right/lower from the box size.
            im_crop = im.crop((left, top, left + width, top + height))
            # Sharpen before OCR.
            im_crop = ImageEnhance.Sharpness(im_crop).enhance(3)
            im_crop.save(imageFile, format="png")

        try:
            # Compute file hash for cache key
            with open(imageFile, 'rb') as f:
                file_hash = hashlib.sha256(f.read()).hexdigest()

            # Check cache
            cache_key = get_cache_key("ocr", model="gemini-3-flash-preview", prompt=text_message, image_hash=file_hash, temperature=0, top_p=0.5)
            cached_result = get_from_cache(cache_key)

            if cached_result is not None:
                print(f"[CACHE] {question} {image_path} {cached_result}")
                ocr_text = cached_result.get("result", "")
            else:
                ocr_text = ocr(text_message, imageFile)
                # Save to cache
                save_to_cache(cache_key, {"result": ocr_text})
                print(f"[NEW] {question} {image_path} {ocr_text}")

            # Containment instead of equality: the model sometimes wraps the sentinel
            # (quotes, trailing text), and the regrade cell also matches it anywhere.
            if "No text found!!!" in ocr_text:
                return ""
            return ocr_text
        except Exception as e:
            print(question, image_path, e)
            return ""

In [259]:
from pydantic import BaseModel, Field
from grading_utils import create_gemini_config

class GradingResult(BaseModel):
    """Pydantic model for grading results - used both as response schema and data container"""
    # Match between answer and marking scheme; grade_answer clamps it to [0, 1].
    similarity_score: float = Field(description="Similarity score from 0 to 1")
    # Mark awarded; grade_answer clamps it to [0, total_marks].
    mark: float = Field(description="Actual mark awarded based on marking scheme")
    # Explanation from the model; locally built results use "Empty answer" or an error text.
    # NOTE(review): the grading prompt asks the model to reason first, but this field is
    # declared last — confirm whether declaration order influences the structured output.
    reasoning: str = Field(description="Brief explanation of the score")


def grade_answer(question_text, submitted_answer, marking_scheme_text, total_marks):
    """
    Grade a student's answer using Vertex AI Express Mode with structured output

    The result is cached under a key built from the model settings and all four
    arguments. Up to three attempts are made; a failed attempt is never cached.

    Args:
        question_text: The original question text
        submitted_answer: The student's answer
        marking_scheme_text: Detailed marking scheme/rubric with correct answer
        total_marks: Total marks available

    Returns:
        GradingResult object with similarity_score (clamped to 0..1), mark (clamped to
        0..total_marks) and reasoning; a zero-mark result with an error message as
        reasoning when every attempt fails
    """
    # Single source of truth for the model settings, so the cache key and the request
    # configuration cannot drift apart.
    model_name = "gemini-3-flash-preview"
    temperature = 0
    top_p = 0.3
    max_output_tokens = 1024 * 8
    max_attempts = 3

    # Check cache first
    cache_key = get_cache_key(
        "grade_answer",
        model=model_name,
        temperature=temperature,
        top_p=top_p,
        max_output_tokens=max_output_tokens,
        question=question_text,
        answer=submitted_answer,
        scheme=marking_scheme_text,
        marks=total_marks
    )

    cached_result = get_from_cache(cache_key)
    if cached_result is not None:
        try:
            return GradingResult(**cached_result)
        except (TypeError, ValueError) as e:
            # A malformed cache entry is treated as a miss instead of aborting the run.
            print(f"Warning: Ignoring invalid grade_answer cache entry - {e}")

    prompt = f'''You are an expert grader. Evaluate the student's answer based on the question and marking scheme provided.

<QUESTION>
{question_text}
</QUESTION>

<MARKING_SCHEME>
{marking_scheme_text}
</MARKING_SCHEME>

<TOTAL_MARKS>
{total_marks}
</TOTAL_MARKS>

<STUDENT_ANSWER>
{submitted_answer}
</STUDENT_ANSWER>

Evaluate how well the student's answer matches the marking scheme for this specific question.
Consider partial credit possibilities outlined in the marking scheme.
Provide:
1. reasoning: Brief explanation of the scoring (think through this first)
2. similarity_score: A score from 0 to 1 indicating how well the answer matches
3. mark: The actual mark to award (from 0 to {total_marks})'''

    config = create_gemini_config(
        temperature=temperature,
        top_p=top_p,
        max_output_tokens=max_output_tokens,
        response_mime_type="application/json",
        response_schema=GradingResult,
    )

    def _finalise(similarity, mark, reasoning):
        """Clamp the scores to their valid ranges, cache the result and return it."""
        result = GradingResult(
            similarity_score=max(0.0, min(1.0, float(similarity))),
            mark=max(0.0, min(float(total_marks), float(mark))),
            reasoning=reasoning,
        )
        # Save to cache using model_dump() for Pydantic v2 compatibility
        save_to_cache(cache_key, result.model_dump())
        return result

    for _attempt in range(max_attempts):
        try:
            response = client.models.generate_content(
                model=model_name,
                contents=[{"role": "user", "parts": [{"text": prompt}]}],
                config=config,
            )

            # Preferred path: the SDK already parsed the structured response.
            structured = getattr(response, 'parsed', None)
            if structured is not None:
                return _finalise(structured.similarity_score, structured.mark, structured.reasoning)

            # Fallback to text parsing if structured output not available
            text = response.text if response.text else ""
            try:
                payload = json.loads(text)
                return _finalise(
                    payload.get('similarity_score', 0),
                    payload.get('mark', 0),
                    payload.get('reasoning', 'N/A'),
                )
            except (ValueError, TypeError, AttributeError):
                # Malformed or unexpected JSON: try again. Named exceptions replace the
                # former bare `except:`, which also swallowed KeyboardInterrupt.
                print("Retry")
        except Exception as e:
            print(f"Error in grade_answer: {e}")

    return GradingResult(similarity_score=0, mark=0, reasoning="Error: Could not retrieve scoring")


def grade_answers(answers, question):
    """
    Grade multiple answers for a question

    Args:
        answers: List of student answers; None and NaN count as empty
        question: Question label/name

    Returns:
        List of GradingResult objects, one per answer in the same order. Empty
        answers get a zero result with reasoning "Empty answer" without calling the model.
    """
    # Look up question text, marking scheme and marks by the exact label first.
    question_text = standard_question_text.get(question, "")
    marking_scheme_text = standard_answer.get(question, "")
    total_marks = standard_mark.get(question, 0)

    # If not found, try case-insensitive lookup (e.g., 'Q1' vs 'q1')
    if not marking_scheme_text or not question_text:
        question_lower = question.lower()
        for key in standard_answer.keys():
            if key.lower() == question_lower:
                marking_scheme_text = standard_answer[key]
                question_text = standard_question_text.get(key, "")
                total_marks = standard_mark.get(key, 0)
                break

    def _as_text(value):
        """Return the answer as text, mapping None and NaN to an empty string."""
        if value is None:
            return ""
        # Blank cells read back from CSV arrive as float NaN; str() would turn them into
        # the literal "nan", which was then graded as a real answer. NaN != NaN.
        if isinstance(value, float) and value != value:
            return ""
        return str(value)

    results = []
    for submitted_answer in answers:
        submitted_answer = _as_text(submitted_answer)
        if submitted_answer.strip() == "":
            results.append(GradingResult(similarity_score=0, mark=0, reasoning="Empty answer"))
            continue
        result = grade_answer(question_text, submitted_answer, marking_scheme_text, total_marks)
        results.append(result)

    return results

In [260]:
import json
from pydantic import BaseModel, Field
from typing import List


class ModerationItem(BaseModel):
    """Individual moderation result for one answer"""
    # Mark after moderation; grade_moderator clamps it to [0, total_marks].
    moderated_mark: float = Field(description="Final moderated mark")
    # Set by the model when it changed the mark or wants a human to look at the answer.
    flag: bool = Field(description="True if adjusted or needs review")
    # Short justification; the moderation prompt asks for at most 120 characters.
    note: str = Field(description="Short reason for moderation")


class ModerationResponse(BaseModel):
    """Response containing all moderation items"""
    # The moderation prompt asks for exactly one item per submitted entry, in input order.
    items: List[ModerationItem] = Field(description="List of moderation items")


def grade_moderator(question, answers, grading_results, row_numbers):
    """
    Use Gemini to harmonize marks across similar answers for a single question.

    This function ensures consistency by identifying similar answers that received
    different marks and adjusting them to be fair and consistent. The result is cached
    under a key built from the model settings, the question metadata and all entries.

    Args:
        question: Question label/name
        answers: List of student answers (strings)
        grading_results: List of GradingResult objects from grade_answers
        row_numbers: List of row numbers for student identification (required)

    Returns:
        List of dicts with keys: moderated_mark, flag, note - always one per answer.
        When the call fails or returns the wrong number of items, the original marks
        are returned with flag False and note "moderation_error".

    Raises:
        ValueError: if row_numbers or grading_results do not match answers in length
    """
    # Validate row_numbers is provided
    if not row_numbers or len(row_numbers) != len(answers):
        raise ValueError(f"row_numbers is required and must match the number of answers ({len(answers)})")
    # zip() below would silently drop entries on a length mismatch.
    if len(grading_results) != len(answers):
        raise ValueError(f"grading_results must match the number of answers ({len(answers)})")

    # Get question metadata
    question_text = standard_question_text.get(question, "")
    marking_scheme_text = standard_answer.get(question, "")
    total_marks = standard_mark.get(question, 0)

    # Same case-insensitive fallback as grade_answers. Without it a label that differs
    # only in case left total_marks at 0, and the clamp below zeroed every mark.
    if not marking_scheme_text or not question_text:
        question_lower = question.lower()
        for key in standard_answer.keys():
            if key.lower() == question_lower:
                marking_scheme_text = standard_answer[key]
                question_text = standard_question_text.get(key, "")
                total_marks = standard_mark.get(key, 0)
                break

    # Prepare entries for moderation with row numbers
    entries = []
    for row_num, ans, res in zip(row_numbers, answers, grading_results):
        entries.append({
            "row": int(row_num),
            "answer": str(ans or ""),
            "mark": float(res.mark),
            "reasoning": str(res.reasoning or ""),
        })

    # Single source of truth for the model settings used by the cache key and the request.
    model_name = "gemini-3-pro-preview"
    temperature = 0
    top_p = 0.3

    # Check cache
    cache_key = get_cache_key(
        "grade_moderator",
        model=model_name,
        temperature=temperature,
        top_p=top_p,
        question=question_text,
        scheme=marking_scheme_text,
        total_marks=total_marks,
        entries=entries,
    )

    cached = get_from_cache(cache_key)
    if cached is not None:
        # Only trust an entry of the expected shape; older runs could store a list with
        # the wrong number of items.
        if isinstance(cached, list) and len(cached) == len(entries):
            return cached
        print(f"grade_moderator: ignoring cached result with unexpected shape for question '{question}'")

    # Build moderation prompt
    prompt = f"""You are a grading moderator ensuring fairness and consistency.

Question: {question_text}
Marking scheme: {marking_scheme_text}
Total marks: {total_marks}

You are given {len(entries)} student responses with their current marks and grading reasons. 
Your task is to review all responses and ensure that similar answers receive similar marks.

For each response, decide:
1. Should the mark be adjusted for consistency with peer responses?
2. Does it need human review?

Return a JSON object with an "items" array of exactly {len(entries)} objects in the same order.
Each object must have:
- "moderated_mark": number between 0 and {total_marks}
- "flag": boolean (true if you adjusted the mark or want human review, false otherwise)
- "note": string (max 120 chars) explaining your decision; when referencing peers, use their row number (e.g., "row 2", "row 4")

Be concise and fair. Maintain ordering."""

    content = json.dumps(entries, ensure_ascii=False)

    # Create Gemini config with structured output
    config = create_gemini_config(
        temperature=temperature,
        top_p=top_p,
        max_output_tokens=65535,
        response_mime_type="application/json",
        response_schema=ModerationResponse,
    )

    try:
        response = client.models.generate_content(
            model=model_name,
            contents=[
                {
                    "role": "user",
                    "parts": [
                        {"text": prompt},
                        {"text": "\nResponses:\n" + content},
                    ],
                }
            ],
            config=config,
        )

        moderation = []

        # Try to use structured output first
        if hasattr(response, 'parsed') and response.parsed is not None:
            for item in response.parsed.items:
                moderation.append({
                    "moderated_mark": max(0.0, min(float(total_marks), float(item.moderated_mark))),
                    "flag": bool(item.flag),
                    "note": str(item.note),
                })
        else:
            # Fallback to text parsing
            text = response.text if hasattr(response, "text") else None
            if not text:
                raise ValueError("Empty Gemini response")

            parsed = json.loads(text)
            parsed_items = parsed.get("items", parsed) if isinstance(parsed, dict) else parsed

            for item, original in zip(parsed_items, entries):
                try:
                    moderated_mark = max(0.0, min(float(total_marks), float(item.get("moderated_mark", original["mark"]))))
                    flag = bool(item.get("flag", False))
                    note = str(item.get("note", ""))
                except Exception:
                    # If parsing fails, keep original mark
                    moderated_mark = float(original["mark"])
                    flag = False
                    note = "parse_error"

                moderation.append({
                    "moderated_mark": moderated_mark,
                    "flag": flag,
                    "note": note,
                })

        # Callers assign the result column-wise, so a short or long list must not be
        # returned or cached; raising here routes to the fallback below.
        if len(moderation) != len(entries):
            raise ValueError(f"expected {len(entries)} moderation items, got {len(moderation)}")

        # Save to cache
        save_to_cache(cache_key, moderation)
        return moderation

    except Exception as e:
        print(f"grade_moderator error for question '{question}': {e}")
        # Fallback: return original marks without moderation
        return [
            {
                "moderated_mark": float(res.mark),
                "flag": False,
                "note": "moderation_error",
            }
            for res in grading_results
        ]

In [261]:
import os
import pandas as pd


def get_the_list_of_files(path):
    """
    Return the sorted names of the files directly inside ``path``.

    Sub-directories are not searched; a path that cannot be walked yields an empty list.
    """
    # The first tuple produced by os.walk describes `path` itself.
    _, _, filenames = next(os.walk(path), (path, [], []))
    return sorted(filenames)


images = get_the_list_of_files(base_path_images)

# Highest 0-based page index used by any annotation.
max_page = max((annotation["page"] for annotation in annotations_list), default=0)
# Pages per script = last index + 1, rounded up to an even number because the scanner adds
# a blank back page. The previous expression parsed as
# `max_page + (1 if odd else (max_page + 2))`, e.g. 6 instead of 4 when max_page was 2.
max_page = max_page + (1 if max_page % 2 == 1 else 2)

# Pair each image with its scan number. Files whose name does not start with a number
# (e.g. .DS_Store) are skipped instead of aborting the cell.
numbered_images = []
for image in images:
    try:
        number = int(image.split(".")[0])
    except ValueError:
        continue
    numbered_images.append((number, image))
# Numeric order, so "10.jpg" follows "9.jpg" and rows stay in scan order on every page.
numbered_images.sort()

# images_by_page[k] holds the images of page k of every script, in scan order.
images_by_page = [[] for _ in range(max_page)]
for number, image in numbered_images:
    images_by_page[number % max_page].append(image)


def get_df(question):
    """Build the results table for one annotated field, one row per scanned image.

    Each row carries the annotation values, the image path, the OCR'd answer and a
    1-based RowNumber. For ID/NAME/CLASS the grading columns are filled with
    zeros/empty values; for every other label the answers are graded and moderated.

    Args:
        question: Label of the annotated field (key of annotations_dict)

    Returns:
        DataFrame with the annotation columns plus Similarity, Reasoning, Image, Answer,
        RowNumber, maskPage, MarkRaw, Mark, ModeratorFlag and ModeratorNote; `page` is
        replaced by the image name without the "images/" prefix and ".jpg" suffix
    """
    record = annotations_dict[question].copy()
    record["Similarity"] = 0
    record["Reasoning"] = ""
    # Images of the page this field sits on, as paths relative to base_path.
    record["Image"] = ["images/" + name for name in images_by_page[record["page"]]]

    # The list-valued "Image" entry yields one row per image (assuming the remaining
    # annotation values are scalars, which are repeated on every row).
    data = pd.DataFrame(record).explode("Image").reset_index(drop=True)

    def _read_answer(entry):
        """OCR the bounding box of this field on the row's image."""
        return ocr_image_from_file(
            question,
            base_path + "/" + entry["Image"],
            entry["left"],
            entry["top"],
            entry["width"],
            entry["height"],
        )

    data["Answer"] = data.apply(_read_answer, axis=1)
    data["RowNumber"] = data.index + 1
    # Keep the annotation's page index before `page` is overwritten below.
    data["maskPage"] = data["page"]

    if question not in ["ID", "NAME", "CLASS"]:
        # Regular question: grade every answer, then harmonise marks across answers.
        answers = data["Answer"].tolist()
        scoring_results = grade_answers(answers, question)
        # Row numbers identify students for moderation (reliable, unlike OCR'd IDs).
        row_numbers = data["RowNumber"].tolist()
        moderation = grade_moderator(question, answers, scoring_results, row_numbers)

        data["Similarity"] = [graded.similarity_score for graded in scoring_results]
        data["Reasoning"] = [graded.reasoning for graded in scoring_results]
        data["MarkRaw"] = [graded.mark for graded in scoring_results]
        data["Mark"] = [item["moderated_mark"] for item in moderation]
        data["ModeratorFlag"] = [item["flag"] for item in moderation]
        data["ModeratorNote"] = [item["note"] for item in moderation]
    else:
        # Metadata field: nothing to grade.
        data["Similarity"] = 0.0
        data["Reasoning"] = ""
        data["MarkRaw"] = 0.0
        data["Mark"] = 0.0
        data["ModeratorFlag"] = False
        data["ModeratorNote"] = ""

    def _image_id(image):
        """Strip the folder prefix and the .jpg extension from an image path."""
        return image.replace("images/", "").replace(".jpg", "")

    data["page"] = data["Image"].apply(_image_id)

    return data


def save_template_output(output, question, filename):
    """Write rendered template text to <base_path_questions>/<question>/<filename>.

    The question folder is created when missing.

    Args:
        output: Rendered text to write
        question: Question label, used as the folder name
        filename: Name of the file inside the question folder
    """
    question_dir = Path(base_path_questions, question)
    question_dir.mkdir(parents=True, exist_ok=True)
    # `with` closes the handle even if the write fails; explicit UTF-8 keeps non-ASCII
    # answers and symbols from depending on the platform default encoding.
    with open(question_dir / filename, "w", encoding="utf-8") as text_file:
        text_file.write(output)


# question = "NAME"
# get_df(question)

Generate individual question page.

In [262]:
from ipywidgets import IntProgress
from IPython.display import display

# Progress bar: one tick per question.
max_count = len(questions)
f = IntProgress(min=0, max=max_count)
display(f)

for question in questions:
    # Build the table (OCR, grading, moderation) and store it next to the page.
    dataTable = get_df(question)
    question_dir = base_path_questions + "/" + question
    os.makedirs(question_dir, exist_ok=True)
    dataTable.to_csv(question_dir + "/data.csv", index=False)

    # ID/NAME/CLASS use the answer-only page; graded questions use the full review page.
    if question in ("ID", "NAME", "CLASS"):
        page_template = "questions/index-answer.html"
    else:
        page_template = "questions/index.html"

    # (template, output file, render context) for the three files of a question page.
    render_jobs = [
        (
            page_template,
            "index.html",
            dict(
                studentsScriptFileName=file_name,
                question=question,
                standardAnswer=standard_answer.get(question, ""),
                standardMark=standard_mark.get(question, ""),
                estimatedBoundingBox=annotations_dict[question],
                dataTable=dataTable,
            ),
        ),
        (
            "questions/question.js",
            "question.js",
            dict(
                dataTable=dataTable,
                estimatedBoundingBox=annotations_dict[question],
            ),
        ),
        (
            "questions/style.css",
            "style.css",
            dict(dataTable=dataTable),
        ),
    ]
    for template_name, target_name, context in render_jobs:
        template = env.get_template(template_name)
        output = template.render(**context)
        save_template_output(output, question, target_name)

    f.value += 1

IntProgress(value=0, max=8)

[CACHE] ID ../marking_form/VTC Test/images/0.jpg {'result': '123456789'}
[CACHE] ID ../marking_form/VTC Test/images/2.jpg {'result': '987654321'}
[CACHE] ID ../marking_form/VTC Test/images/4.jpg {'result': '234567890'}
[CACHE] ID ../marking_form/VTC Test/images/6.jpg {'result': '345678912'}
[CACHE] CLASS ../marking_form/VTC Test/images/0.jpg {'result': 'A'}
[CACHE] CLASS ../marking_form/VTC Test/images/2.jpg {'result': 'B'}
[CACHE] CLASS ../marking_form/VTC Test/images/4.jpg {'result': 'C'}
[CACHE] CLASS ../marking_form/VTC Test/images/6.jpg {'result': 'D'}
[CACHE] Q1 ../marking_form/VTC Test/images/0.jpg {'result': 'Vocational and Professional\nEducation and Traing'}
[CACHE] Q1 ../marking_form/VTC Test/images/2.jpg {'result': 'Vacational and professional\nEduate Training'}
[CACHE] Q1 ../marking_form/VTC Test/images/4.jpg {'result': 'Hong Kong skilled labor force'}
[CACHE] Q1 ../marking_form/VTC Test/images/6.jpg {'result': 'Vocational and Professional\nEducation and Training'}
[CACHE]

In [263]:
from ipywidgets import IntProgress
from IPython.display import display
import pandas as pd

# Progress bar widget: advances by one for every question processed below.
max_count = len(questions)
f = IntProgress(min=0, max=max_count)
display(f)

for question in questions:
    # Reload this question's answers and blank out any cell that matches the
    # "No text found!!!" marker (presumably the OCR's empty-result placeholder).
    data_path = base_path_questions + "/" + question + "/data.csv"
    dataTable = pd.read_csv(data_path).replace(".*No text found!!!.*", "", regex=True)

    # Grade every answer, then store the score and its explanation as columns.
    scoring_results = grade_answers(dataTable["Answer"].tolist(), question)
    dataTable["Similarity"] = [graded.similarity_score for graded in scoring_results]
    dataTable["Reasoning"] = [graded.reasoning for graded in scoring_results]

    # Persist the graded table back to the same CSV it was read from.
    dataTable.to_csv(data_path, index=False)

    # ID / NAME / CLASS use the "index-answer" page; all other questions use
    # the regular review page.
    index_template_name = (
        "questions/index-answer.html"
        if question in ("ID", "NAME", "CLASS")
        else "questions/index.html"
    )
    template = env.get_template(index_template_name)
    output = template.render(
        studentsScriptFileName=file_name,
        question=question,
        # Fall back to an empty string when no answer/mark is defined.
        standardAnswer=standard_answer[question] if question in standard_answer else "",
        standardMark=standard_mark[question] if question in standard_mark else "",
        estimatedBoundingBox=annotations_dict[question],
        dataTable=dataTable,
    )
    save_template_output(output, question, "index.html")

    # Per-question JavaScript.
    template = env.get_template("questions/question.js")
    output = template.render(
        dataTable=dataTable,
        estimatedBoundingBox=annotations_dict[question],
    )
    save_template_output(output, question, "question.js")

    # Per-question stylesheet.
    template = env.get_template("questions/style.css")
    output = template.render(dataTable=dataTable)
    save_template_output(output, question, "style.css")

    f.value += 1

IntProgress(value=0, max=8)

## Validate Student ID

In [264]:
# Cross-check the student IDs read by OCR against the official name list.
#
# Produces:
#   duplicate_id         - OCR IDs that appear on more than one sheet
#   name_list_missing_id - OCR IDs that are not in the name list
#   ocr_missing_id       - name-list IDs that were not found by OCR
from collections import Counter

import pandas as pd
from termcolor import colored  # imported here so this cell runs on a fresh kernel


def _normalize_ocr_id(raw_id):
    """Return the OCR'd ID as a digit string, e.g. 123456789.0 -> "123456789".

    Missing values are returned unchanged. A value that cannot be parsed as a
    number is returned as stripped text instead of raising, so it is later
    reported as "not in the name list" rather than aborting the validation.
    """
    if pd.isna(raw_id):
        return raw_id
    try:
        return str(int(float(raw_id)))
    except (ValueError, OverflowError):
        return str(raw_id).strip()


id_from_oscr = pd.read_csv(base_path_questions + "/" + "ID" + "/data.csv")["Answer"].tolist()
id_from_oscr = [_normalize_ocr_id(raw_id) for raw_id in id_from_oscr]

id_from_namelist = name_list_df["ID"].to_list()

# check duplicate id (missing reads are not counted as duplicates of each other)
ocr_id_counts = Counter(ocr_id for ocr_id in id_from_oscr if pd.notna(ocr_id))
duplicate_id = [ocr_id for ocr_id, seen in ocr_id_counts.items() if seen > 1]
if len(duplicate_id) > 0:
    print(colored("Duplicate ID: {}".format(duplicate_id), "red"))

# Compare both sides as strings.
id_from_oscr = [str(ocr_id) for ocr_id in id_from_oscr]
id_from_namelist = [str(student_id) for student_id in id_from_namelist]

# compare oscr_id and validate_id (sets give O(1) lookups; the result lists
# keep the original order and repetitions)
known_ids = set(id_from_namelist)
scanned_ids = set(id_from_oscr)
name_list_missing_id = [ocr_id for ocr_id in id_from_oscr if ocr_id not in known_ids]
ocr_missing_id = [student_id for student_id in id_from_namelist if student_id not in scanned_ids]

## OCR scan error case

In [265]:
from termcolor import colored

# Report IDs that OCR read from the sheets but that do not exist in the name
# list. The guard must test the same list that is printed: checking
# `ocr_missing_id` here would hide these errors whenever no student is absent.
if len(name_list_missing_id) > 0:
    print(colored("Some IDs OCR is not in NameList and you need to fix it manually!", "red"))
    for unmatched_id in name_list_missing_id:
        print(colored(unmatched_id, "red"))

## Potential Absent Case

In [266]:
from termcolor import colored

# Name-list IDs that OCR did not find on any sheet: report the count, then
# each ID, all in red. Nothing is printed when the list is empty.
absentee_count = len(ocr_missing_id)
if absentee_count:
    print(colored("Number of absentee {}.".format(absentee_count), "red"))
    print(colored("ID in Name List does not find from OCR!", "red"))
    for absent_id in ocr_missing_id:
        print(colored(absent_id, "red"))

# Start Python HTTPServer

The webserver log is in output/server.log.

If you are developing and don't want the running web server to block the notebook, open a terminal and run the command below.

file_name=XXXX python server.py 8000

In [267]:
# Show the shell command that starts the review web server for this script.
print(f'file_name="{file_name}" python server.py')

file_name="VTC Test" python server.py


In [268]:
# You can also uncomment the following line to run the web server from the notebook (replace TestScript with your own file_name), but if it crashes you will need to restart the kernel.
# !cd .. && file_name=TestScript python server.py