# Google Cloud Platform Vertex AI Grader
Use an LLM to grade students' programming answers.

In [None]:
# Vertex AI project settings used by the rest of the notebook.
project_id = "cyrus-testing-2023"
location = "us-central1"
model_name = "codechat-bison"

# Input workbooks: the students' submissions and the marking scheme.
# base_folder already ends with "/", so the file names are appended directly.
base_folder = "data/ITE3101_practical_tests/ite-3101-practical-test-ab-submissions/"
answer_excel = "student_answer.xlsx"
answer_excel_path = f"{base_folder}{answer_excel}"
standard_answer_path = f"{base_folder}standard_answer_ab.xlsx"

Run the following generated commands in a terminal.

In [None]:
# Shell commands the user must run once to authenticate and enable Vertex AI
# for the configured project; they are only printed here, not executed.
command_lines = [
    "gcloud auth application-default login",
    f"gcloud auth application-default set-quota-project {project_id}",
    "gcloud auth login",
    f"gcloud config set project {project_id}",
    "gcloud services enable aiplatform.googleapis.com",
]
commands = "\n".join(command_lines)
print(commands)

In [None]:
%pip install langchain openai python-dotenv pandas openpyxl tqdm google-cloud-aiplatform

In [None]:
import pandas as pd

# Load the students' submissions workbook and preview the first rows.
student_answer_df = pd.read_excel(io=answer_excel_path)
student_answer_df.head(n=5)

In [None]:
# Load the marking-scheme workbook and preview the first rows.
standard_answer = pd.read_excel(io=standard_answer_path)
standard_answer.head(n=5)

In [None]:
# Map each question name to a dict of that question's remaining columns,
# i.e. {question name: {column: value, ...}}.
standard_answer_dict = standard_answer.set_index(keys='Question Name').to_dict(orient='index')

In [None]:
from langchain.chat_models import ChatVertexAI
from langchain.prompts.chat import ChatPromptTemplate
import langchain
langchain.debug = False
from langchain.output_parsers import PydanticOutputParser
from langchain.pydantic_v1 import BaseModel, Field, validator
from langchain.prompts import PromptTemplate, SystemMessagePromptTemplate, HumanMessagePromptTemplate

# Define your desired data structure.
# Structured result the grader model is asked to return for one question.
# NOTE(review): documented with `#` comments rather than a class docstring on
# purpose -- pydantic copies a class docstring into the model's JSON schema,
# which presumably feeds parser.get_format_instructions() and would therefore
# change the prompt sent to the model. Confirm before adding a docstring.
class ScoreResult(BaseModel):
    # Mark awarded for the question (integer).
    score: int = Field(description="Score")
    # Free-text feedback on the student's answer.
    comments: str = Field(description="Comments")
    # Free-text explanation of how the score was worked out.
    calculation: str = Field(description="Calculation")

# Parses the model's reply into a ScoreResult and supplies the matching
# format instructions that score_answer injects into the system prompt.
parser = PydanticOutputParser(pydantic_object=ScoreResult)



def score_answer(instruction, starter, answer, mark, student_answer, student_commit, temperature=0.1, prompt_file="grader_prompt.txt"):
    """Ask the Vertex AI chat model to grade one student answer.

    The system message carries the instructor role plus the output-format
    instructions from the module-level ``parser``; the human message is the
    template read from ``prompt_file`` filled in with the arguments below.

    Args:
        instruction: Question text, substituted for ``{instruction}``.
        starter: Starter code, substituted for ``{starter}``.
        answer: Model answer, substituted for ``{answer}``.
        mark: Marks available, substituted for ``{mark}``.
        student_answer: Student's submission, substituted for
            ``{student_answer}``.
        student_commit: Student's commit information, substituted for
            ``{student_commit}``.
        temperature: Sampling temperature passed to the chat model.
        prompt_file: Path of the human-message prompt template.

    Returns:
        The ``ScoreResult`` parsed from the model's reply.

    Raises:
        OSError: If ``prompt_file`` cannot be opened.
        Exception: Whatever the chain raises, e.g. the parser failing to
            parse the reply (callers in this notebook catch
            ``OutputParserException``).
    """
    # Read the template as UTF-8 explicitly so the prompt does not depend on
    # the platform's default encoding.
    with open(prompt_file, encoding="utf-8") as f:
        grader_prompt = f.read()

    # The format instructions are supplied as a partial variable so they are
    # inserted as-is and the braces inside them are not treated as
    # placeholders.
    system_prompt = PromptTemplate(
        template="You are a Python programming instructor who grades student Python exercises.\n{format_instructions}\n",
        input_variables=[],
        partial_variables={"format_instructions": parser.get_format_instructions()},
    )
    system_message_prompt = SystemMessagePromptTemplate(prompt=system_prompt)
    human_message_prompt = HumanMessagePromptTemplate(
        prompt=PromptTemplate(
            template=grader_prompt,
            input_variables=["instruction", "starter", "answer", "mark", "student_answer", "student_commit"],
        )
    )
    chat_prompt = ChatPromptTemplate.from_messages([system_message_prompt, human_message_prompt])

    # Honour the caller's temperature. It used to be hard-coded to 0.2 here,
    # which silently ignored the argument (including the retry with 0.3).
    llm = ChatVertexAI(
        model_name=model_name,
        location=location,
        max_output_tokens=1024,
        temperature=temperature,
    )
    runnable = chat_prompt | llm | parser

    data = {"instruction": instruction,
            "starter": starter,
            "answer": answer,
            "mark": mark,
            "student_answer": student_answer,
            "student_commit": student_commit}
    return runnable.invoke(data)


In [None]:
from tqdm import tqdm
from langchain_core.exceptions import OutputParserException

# Grade every question for every student. Each question gets up to three
# attempts; only output-parsing failures are retried. Questions that still
# fail are recorded in failed_cases for the manual re-run further down.
parser_error_count = 0
failed_cases = []
last_exception = ""
rows = []
for index, row in tqdm(student_answer_df.iterrows(), total=len(student_answer_df)):
    for question, value in standard_answer_dict.items():
        instruction = value["Instruction"]
        starter = value["Starter"]
        answer = value["Answer"]
        mark = value["Mark"]
        student_answer = row[question + " Content"]
        student_commit = row[question + " Commit"]

        for _ in range(3):  # Retry 3 times
            try:
                result = score_answer(
                    instruction, starter, answer, mark, student_answer, student_commit)
            except OutputParserException as ex:
                # Remember the latest parse failure and try again.
                last_exception = ex
                parser_error_count += 1
            else:
                row[question + " Score"] = result.score
                row[question + " Comments"] = result.comments
                row[question + " Calculation"] = result.calculation
                break  # Graded successfully, stop retrying
        else:
            # All three attempts failed to parse.
            failed_cases.append({"directory": row['Directory'], "question": question,  "exception": last_exception})

    # Keep the (possibly extended) row; scored_df is built from this list.
    rows.append(row)

print(f"Total OutputParserException: {parser_error_count}")
print(f"Total failed cases: {len(failed_cases)}")

### Special handling: re-grade the failed cases
You need to re-run these cases manually, adjusting the temperature and/or changing the prompt file.

In [None]:
# Keep an independent (deep) copy of the loaded answers as a backup.
backup_student_answer_df = student_answer_df.copy(deep=True)

In [None]:
# Re-grade the questions whose model output could not be parsed earlier.
#
# Results are written into the matching Series in `rows`, because that list
# (not student_answer_df) is what scored_df is built from. Cases that fail
# again stay in failed_cases so this cell can simply be re-run.
print(f"Total failed cases: {len(failed_cases)}")

orginal_model_name = model_name
# You may change to use more powerful model
# model_name = "codechat-bison@002"

try:
    if failed_cases:
        print("Failed cases:")

    # Index the graded rows by Directory once instead of scanning per case.
    rows_by_directory = {graded_row["Directory"]: graded_row for graded_row in rows}
    remaining_cases = []

    # Iterate over a snapshot: removing from a list while looping over it
    # skips every other element.
    for failed_case in list(failed_cases):
        print(failed_case)
        target_row = rows_by_directory.get(failed_case["directory"])
        if target_row is None:
            print(f"No graded row found for directory {failed_case['directory']}")
            remaining_cases.append(failed_case)
            continue

        question = failed_case['question']
        instruction = standard_answer_dict[question]["Instruction"]
        starter = standard_answer_dict[question]["Starter"]
        answer = standard_answer_dict[question]["Answer"]
        mark = standard_answer_dict[question]["Mark"]
        # target_row is a Series, so these are scalar cell values rather than
        # one-row Series objects.
        student_answer = target_row[question + " Content"]
        print(student_answer)
        student_commit = target_row[question + " Commit"]

        try:
            result = score_answer(instruction, starter, answer, mark, student_answer, student_commit, temperature=0.3)
        except OutputParserException as ex:
            # Still unparseable: keep the case (with the newest error) for
            # another manual attempt instead of aborting the whole cell.
            failed_case["exception"] = ex
            remaining_cases.append(failed_case)
            continue

        # Update the row in place so the scores reach scored_df.
        target_row[question + " Score"] = result.score
        target_row[question + " Comments"] = result.comments
        target_row[question + " Calculation"] = result.calculation

    # Only drop resolved cases once the whole pass has finished.
    failed_cases[:] = remaining_cases
    print(f"Remaining failed cases: {len(failed_cases)}")
finally:
    # Always restore the model name, even if a retry raised.
    model_name = orginal_model_name

In [None]:
# Assemble the graded rows into one DataFrame and preview it.
scored_df = pd.DataFrame(data=rows)
scored_df.head(n=5)

In [None]:
# Replace missing values in the score and commit columns with a default and
# cast them to int, then add each student's total.
default_value = 0  # or any other value you want to replace NaN with
for pattern in ('Score', 'Commit'):
    cols = scored_df.filter(like=pattern).columns
    scored_df[cols] = scored_df[cols].fillna(default_value).astype(int)

# 'total_score' is lower-case, so the case-sensitive 'Score' filter never
# picks it up as an input to the sum.
scored_df['total_score'] = scored_df.filter(like='Score').sum(axis=1)
scored_df.head()

In [None]:
# Move the per-question score columns to the right-hand side of the table,
# keeping the relative order inside both groups.
score_columns = scored_df.filter(like='Score').columns
other_columns = [name for name in scored_df.columns if name not in score_columns]
scored_df = scored_df[other_columns + list(score_columns)]
scored_df.head()

In [None]:
import os

# Write the scored table next to the input workbooks; the file name records
# which model produced the grades.
output_file_name = f'student_score_{model_name}.xlsx'
excel_file_path = os.path.join(base_folder, output_file_name)
scored_df.to_excel(excel_file_path, index=False)