# Automatic Grader with Azure OpenAI ChatGPT
This notebook can grade students’ assignments automatically by downloading them from Moodle LMS. It will unzip the assignment file from Moodle and create a folder for each student. If a student submits a zip file, it will also unzip it in their folder. The folder should contain either some Docx files or some PDF files. For Docx files, the notebook will extract and merge all the texts into one answer. For PDF files, it will extract the text from every page of each PDF and merge it into one answer.

The notebook will then use a marking scheme as prompts and let Azure OpenAI ChatGPT evaluate the answer according to the rules. It will also estimate the probability that the answer is copied from the internet or generated by AI.

The notebook will use Azure OpenAI text-embedding-3-large to get the embedding of the answer. It will then use K-means clustering to group the answers based on their embeddings and show the teachers the different types of answers. It will also perform PCA on the embeddings and plot the first three principal components in 3D. This will help the teachers see how similar or different the answers are.

### Install packages

In [None]:
%pip install -q pypandoc docx2txt PyPDF2 openpyxl python-dotenv openai num2words matplotlib plotly scipy scikit-learn pandas tiktoken ipywidgets seaborn ipympl
%load_ext dotenv
%dotenv

### Common Functions

In [None]:
# read text file and return the content
def read_text_file(path):
    """Return the text stored at ``path`` with every newline character removed."""
    with open(path, 'r') as handle:
        raw = handle.read()
    # Newlines are dropped entirely (not replaced by spaces)
    return raw.replace('\n', '')

def write_text_to_file(path, content):
    """Create or overwrite the file at ``path`` so that it holds ``content``."""
    with open(path, mode='w') as sink:
        sink.write(content)

Extract all submissions to a tmp folder


In [None]:
# Unpack the downloaded submission archive into the temporary working folder
from zipfile import ZipFile

with ZipFile("data/submission.zip", mode="r") as archive:
    # Every member of the archive lands under tmp/
    archive.extractall(path="tmp/")

In [None]:
# Import the os module
import os
import pandas as pd

# Define the path to list
temp_path = "tmp/"

def is_folder_contains_file(folder_path, extension):
    """Tell whether ``folder_path`` directly holds an entry ending with ``extension``.

    The suffix comparison ignores case and only the immediate children of
    the folder are inspected; sub-folders are not searched.
    """
    suffix = extension.lower()
    return any(entry.lower().endswith(suffix) for entry in os.listdir(folder_path))
    
# Get a list of all files and directories in the path
def get_submissions_df(path):
    """Describe every submission folder found directly under ``path``.

    Each immediate sub-directory of ``path`` becomes one row; plain files
    are ignored.

    Returns:
        A pandas DataFrame with the columns ``Student`` (the folder name up
        to its first underscore), ``Path``, ``ContainsDocxFile``,
        ``ContainsPdfFile`` and ``ContainsZipFile``. The columns are present
        even when no sub-directory exists, so the filter helpers that index
        them keep working on an empty result.
    """
    columns = [
        "Student",
        "Path",
        "ContainsDocxFile",
        "ContainsPdfFile",
        "ContainsZipFile",
    ]
    assignment_folders = []
    for name in os.listdir(path):
        full_path = os.path.join(path, name)
        # Only directories represent a student's submission
        if not os.path.isdir(full_path):
            continue
        assignment_folders.append({
            "Student": name.split("_")[0],
            "Path": full_path,
            "ContainsDocxFile": is_folder_contains_file(full_path, ".docx"),
            "ContainsPdfFile": is_folder_contains_file(full_path, ".pdf"),
            "ContainsZipFile": is_folder_contains_file(full_path, ".zip"),
        })
    # Passing the column list fixes the schema even when there are no rows
    return pd.DataFrame(assignment_folders, columns=columns)
df = get_submissions_df(temp_path)

In [None]:
df

### Ensure that all the files submitted are valid

In [None]:
def filter_df_by_not_contains_any_expected_files(df):
    """Return the rows of ``df`` whose folder has no DOCX, no PDF and no ZIP file."""
    no_docx = df["ContainsDocxFile"].eq(False)
    no_pdf = df["ContainsPdfFile"].eq(False)
    no_zip = df["ContainsZipFile"].eq(False)
    return df[no_docx & no_pdf & no_zip]
filter_df_by_not_contains_any_expected_files(df)

Handle zip file.

In [None]:
import os
import shutil

def flatten(directory):
    """Move every file nested below ``directory`` up into ``directory`` itself.

    Sub-folders are visited bottom-up. Each file is moved to the top level;
    when a file of the same name already exists there, ``_1``, ``_2``, ...
    is appended to the stem until the name is free. Every emptied
    sub-folder is then removed. Files that already sit directly in
    ``directory`` are left untouched.

    Args:
        directory: Folder to flatten in place.
    """
    for dirpath, _, filenames in os.walk(directory, topdown=False):
        if dirpath == directory:
            # For top-level files the source and target are the same path;
            # treating that as a name collision would rename every such
            # file to "<name>_1" each time the function runs.
            continue

        for filename in filenames:
            source = os.path.join(dirpath, filename)
            target = os.path.join(directory, filename)
            stem, ext = os.path.splitext(filename)

            # Find the first free "<stem>_<i><ext>" name on collision
            i = 0
            while os.path.exists(target):
                i += 1
                target = os.path.join(directory, f"{stem}_{i}{ext}")

            shutil.move(source, target)
            print("Moved ", source, " to ", target)

        os.rmdir(dirpath)
        print("Deleted ", dirpath)

def get_first_file_path(path, ext):
    """Return the path of the first entry in ``path`` whose name ends with ``ext``.

    The suffix comparison ignores case so that it agrees with
    ``is_folder_contains_file``; a folder flagged as containing ``.zip``
    must also yield a file here even when it is named ``X.ZIP``.

    Args:
        path: Folder to search (immediate children only).
        ext: Suffix to look for, e.g. ``".zip"``.

    Returns:
        The joined path of the first match in ``os.listdir`` order, or
        ``None`` when nothing matches.
    """
    suffix = ext.lower()
    for name in os.listdir(path):
        if name.lower().endswith(suffix):
            return os.path.join(path, name)
    return None

def extract_zip_file_in_place(path):
    """Extract the first zip archive found in ``path`` into ``path`` and flatten it.

    After extraction ``flatten`` moves every nested file up to ``path`` so
    the later file discovery only has to look at one level.

    Args:
        path: Student folder expected to contain a ``.zip`` file.
    """
    import zipfile

    zip_path = get_first_file_path(path, ".zip")
    print(zip_path)
    if zip_path is None:
        # Without this guard ZipFile(None) fails with an obscure error and
        # stops the processing of every remaining folder.
        print("No zip file found in ", path)
        return

    with zipfile.ZipFile(zip_path, "r") as zip_obj:
        # Extract all the files next to the archive
        zip_obj.extractall(path)
    flatten(path)


def filter_df_by_contains_zip_file(df):
    """Return only the rows of ``df`` flagged as containing a ZIP file."""
    has_zip = df["ContainsZipFile"].eq(True)
    return df[has_zip]

paths = filter_df_by_contains_zip_file(df)["Path"].values
for path in paths:
    extract_zip_file_in_place(path)

In [None]:
df = get_submissions_df(temp_path)
## check all rows contains Docx or PDF file
def filter_df_by_contains_docx_or_pdf_file(df):
    """Return the rows of ``df`` flagged as containing a DOCX file, a PDF file, or both."""
    has_docx = df["ContainsDocxFile"].eq(True)
    has_pdf = df["ContainsPdfFile"].eq(True)
    return df[has_docx | has_pdf]

filter_df_by_contains_docx_or_pdf_file(df)

## Processing Docx files

In [None]:
def filter_df_by_contains_docx(df):
    """Return only the rows of ``df`` flagged as containing a DOCX file."""
    has_docx = df["ContainsDocxFile"].eq(True)
    return df[has_docx]
words_df = filter_df_by_contains_docx(df)
paths = words_df["Path"].values

def get_all_docx_files(path):
    """List the ``.docx`` files located directly in ``path``.

    The extension match ignores case so it agrees with the detection done
    by ``is_folder_contains_file``. Hidden entries (leading ``.``) and Word
    lock files (leading ``~$``) are skipped. The result is sorted by file
    name so that the order in which documents are merged is reproducible.

    Returns:
        A list of paths, each ``path`` joined with a matching file name.
    """
    import os

    return [
        os.path.join(path, name)
        for name in sorted(os.listdir(path))
        if name.lower().endswith(".docx") and not name.startswith((".", "~$"))
    ]

import docx2txt
from functools import reduce

# One list of .docx paths per student folder
students_words_files = [get_all_docx_files(p) for p in paths]

# Merge the text of each student's documents into a single answer. A blank
# line separates consecutive documents so their text does not run together.
file_contents = [
    "\n\n".join(docx2txt.process(f) for f in word_files)
    for word_files in students_words_files
]

# words_df was obtained by filtering df; take an independent copy before
# adding columns so pandas does not warn about writing to a slice.
words_df = words_df.copy()
words_df.loc[:, "Sources"] = students_words_files
words_df.loc[:, "Answers"] = file_contents


In [None]:
def filter_df_by_contains_pdf(df):
    """Return only the rows of ``df`` flagged as containing a PDF file."""
    has_pdf = df["ContainsPdfFile"].eq(True)
    return df[has_pdf]
pdfs_df = filter_df_by_contains_pdf(df)
paths = pdfs_df["Path"].values

def get_add_pdf_files(path):
    """List the ``.pdf`` files located directly in ``path``.

    The extension match ignores case so it agrees with the detection done
    by ``is_folder_contains_file``. Hidden entries (leading ``.``) are
    skipped, and the result is sorted by file name so that the merge order
    is reproducible.

    Returns:
        A list of paths, each ``path`` joined with a matching file name.
    """
    import os

    return [
        os.path.join(path, name)
        for name in sorted(os.listdir(path))
        if name.lower().endswith(".pdf") and not name.startswith(".")
    ]

import PyPDF2
from functools import reduce

def convert_pdf_all_pages_to_txt(path):
    """Extract the text of every page of the PDF at ``path``.

    Args:
        path: Location of the PDF file.

    Returns:
        The page texts concatenated in page order, each followed by a
        blank line (``"\\n\\n"``).
    """
    # The context manager closes the file once all pages have been read;
    # extraction must happen inside it because the reader pulls page data
    # from the open file object.
    with open(path, 'rb') as pdf_file:
        reader = PyPDF2.PdfReader(pdf_file)
        # "or ''" guards against a page for which no text is returned
        return "".join((page.extract_text() or "") + "\n\n" for page in reader.pages)

# One list of .pdf paths per student folder
students_pdf_files = [get_add_pdf_files(p) for p in paths]

# Merge the text of each student's PDFs into a single answer. Every page
# text produced by convert_pdf_all_pages_to_txt already ends with a blank
# line, so plain concatenation keeps the documents separated.
file_contents = [
    "".join(convert_pdf_all_pages_to_txt(f) for f in pdf_files)
    for pdf_files in students_pdf_files
]

# pdfs_df was obtained by filtering df; take an independent copy before
# adding columns so pandas does not warn about writing to a slice.
pdfs_df = pdfs_df.copy()
pdfs_df.loc[:, "Sources"] = students_pdf_files
pdfs_df.loc[:, "Answers"] = file_contents
pdfs_df


In [None]:
# Combine the DOCX and PDF answers into one table and export it to Excel.
# ignore_index gives every row a unique label: a student who submitted both
# a DOCX and a PDF appears in both inputs with the same label, and the
# per-row `.loc[index, ...]` writes done while grading would otherwise
# update both rows at once.
df_answers = pd.concat([words_df, pdfs_df], ignore_index=True)
df_answers.to_excel("data/answers.xlsx", index=False)

## Grading students’ responses using Azure OpenAI ChatGPT

In [None]:
import os
import json
from openai import AzureOpenAI

client = AzureOpenAI(
    api_version="2024-12-01-preview",
    azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"),
    api_key=os.getenv("AZURE_OPENAI_KEY")
)

def mark_result(marks, copyFromInternet, generativeAI, manualReview, comments):
    """Bundle the five grading fields into a dictionary keyed by field name."""
    return dict(
        marks=marks,
        copyFromInternet=copyFromInternet,
        generativeAI=generativeAI,
        manualReview=manualReview,
        comments=comments,
    )


def get_json_chatGpt(student, prompt):
    """Ask the chat model to grade ``prompt`` and return the parsed result.

    The model is forced to call the ``mark_result`` tool, so its answer
    arrives as the JSON arguments of that tool call. A summary of the
    response is saved to ``tmp/<student>.json``.

    Args:
        student: Identifier used in the name of the saved response file.
        prompt: Full user prompt sent to the model.

    Returns:
        A ``(result, tokens)`` tuple: the tool-call arguments decoded from
        JSON (``{}`` when no tool call came back) and the total token count
        reported by the API.
    """
    # JSON schema describing the arguments of the grading tool
    result_schema = {
        "type": "object",
        "properties": {
            "marks": {"type": "integer"},
            "copyFromInternet": {"type": "number"},
            "generativeAI": {"type": "number"},
            "manualReview": {"type": "boolean"},
            "comments": {"type": "string"}
        },
        "required": ["marks", "copyFromInternet", "generativeAI", "manualReview", "comments"]
    }
    grading_tool = {
        "type": "function",
        "function": {
            "name": "mark_result",
            "description": "Return the grading result for the student answer.",
            "parameters": result_schema
        }
    }
    conversation = [
        {"role": "system", "content": "You are a teaching assistant."},
        {"role": "user", "content": prompt},
    ]

    response = client.chat.completions.create(
        messages=conversation,
        temperature=0.9,
        max_tokens=1600,
        top_p=0.0,
        frequency_penalty=0,
        presence_penalty=0,
        model="gpt-4o-mini",
        tools=[grading_tool],
        tool_choice={"type": "function", "function": {"name": "mark_result"}}
    )

    first_choice = response.choices[0]
    message = first_choice.message

    # Store the tool-call arguments as the message content so the rest of
    # the function (and the printed response) can treat it as plain JSON
    tool_calls = message.tool_calls
    has_arguments = tool_calls and tool_calls[0].function and tool_calls[0].function.arguments
    message.content = tool_calls[0].function.arguments if has_arguments else "{}"

    print(response)

    # Plain-dict summary of the response for saving to file
    usage = response.usage
    saved_choice = {
        "message": {
            "content": message.content,
            "role": message.role
        },
        "index": first_choice.index,
        "finish_reason": first_choice.finish_reason
    }
    response_json = {
        "id": response.id,
        "choices": [saved_choice],
        "usage": {
            "prompt_tokens": usage.prompt_tokens,
            "completion_tokens": usage.completion_tokens,
            "total_tokens": usage.total_tokens
        }
    }

    write_text_to_file(f"tmp/{student}.json", json.dumps(response_json))
    tokens = usage.total_tokens
    return json.loads(message.content), tokens

def grade_answer(student, student_answer, marking_scheme):
    """Grade one answer, retrying when the model call or its result is unusable.

    The answer is inserted into ``marking_scheme`` in place of the
    ``<ANSWER></ANSWER>`` placeholder. The request is attempted up to three
    times (one call plus two retries).

    Args:
        student: Identifier passed on to ``get_json_chatGpt``.
        student_answer: Text of the student's answer.
        marking_scheme: Prompt template containing ``<ANSWER></ANSWER>``.

    Returns:
        A tuple ``(marks, comments, copyFromInternet, generativeAI,
        manualReview, tokens, error)``. When every attempt fails the tuple
        is ``(0, "Error", 0, 0, True, 0, True)``.
    """
    prompt = marking_scheme.replace("<ANSWER></ANSWER>", student_answer)
    max_retries = 2
    for attempt in range(max_retries + 1):
        try:
            content, tokens = get_json_chatGpt(student, prompt)
            # Read the fields inside the try: an empty or partial tool
            # result raises KeyError, which must lead to a retry rather
            # than abort the grading of every remaining student.
            return (
                content['marks'],
                content['comments'],
                content['copyFromInternet'],
                content['generativeAI'],
                content['manualReview'],
                tokens,
                False,
            )
        except Exception as e:
            # Broad on purpose: any failure of the remote call or of the
            # result parsing is reported and retried.
            print(e)
            if attempt < max_retries:
                print("retry: " + str(attempt + 1))
    return 0, "Error", 0, 0, True, 0, True

def grade_answers(df_answers, marking_scheme):
    """Grade every row of ``df_answers`` and store the results in new columns.

    Each row's ``Answers`` text is graded with ``grade_answer``; the
    outcome is written into the columns ``Marks``, ``Comments``,
    ``CopyFromInternet``, ``GenerativeAI``, ``ChatGptTokens``,
    ``ManualReview`` and ``Error``. The frame is modified in place and
    also returned.
    """
    for row_label, submission in df_answers.iterrows():
        student = submission["Student"]
        print(student)

        outcome = grade_answer(student, submission["Answers"], marking_scheme)
        marks, comments, copy_score, ai_score, needs_review, tokens, failed = outcome

        # Column order matches the order in which the columns are created
        graded_fields = (
            ("Marks", marks),
            ("Comments", comments),
            ("CopyFromInternet", copy_score),
            ("GenerativeAI", ai_score),
            ("ChatGptTokens", tokens),
            ("ManualReview", needs_review),
            ("Error", failed),
        )
        for column, value in graded_fields:
            df_answers.loc[row_label, column] = value
    return df_answers

marking_scheme = read_text_file("marking_scheme.txt")

# Optional spot check: pick one submission to grade by hand before running
# the whole batch. Position 2 is the third row; the guard keeps the cell
# from failing when fewer submissions exist.
sample_position = 2
if len(df_answers) > sample_position:
    student = df_answers.iloc[sample_position]["Student"]
    student_answer = df_answers.iloc[sample_position]["Answers"]
    # print(student_answer)
    # print(marking_scheme)
    # grade_answer(student, student_answer, marking_scheme)

# Grade every submission and export the marks
df_marked = grade_answers(df_answers, marking_scheme)
df_marked.to_excel("data/marks.xlsx", index=False)

In [None]:
df_marked

## Embeddings and clustering

In [None]:
import os
from openai import AzureOpenAI

model_name = "text-embedding-3-large"
deployment = "text-embedding-3-large"

api_version = "2024-02-01"

# Client shared by all get_embedding calls; created on first use
_embedding_client = None


def get_embedding(text):
    """Return the embedding vector of ``text`` from the Azure OpenAI embeddings API.

    The client is created once, from the ``AZURE_OPENAI_ENDPOINT`` and
    ``AZURE_OPENAI_KEY`` environment variables, and reused afterwards
    instead of being rebuilt for every answer.

    Args:
        text: Input passed to the embeddings endpoint.

    Returns:
        The ``embedding`` field of the first item in the response data.
    """
    global _embedding_client
    if _embedding_client is None:
        _embedding_client = AzureOpenAI(
            api_version=api_version,
            azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"),
            api_key=os.getenv("AZURE_OPENAI_KEY")
        )
    response = _embedding_client.embeddings.create(
        input=text,
        model=model_name
    )
    return response.data[0].embedding

In [None]:
df_marked = pd.read_excel("data/marks.xlsx") 
df_Answers = df_marked[['Student','Answers']]
df_Answers

Clean the answers by collapsing redundant whitespace and tidying stray punctuation.

In [None]:
import re


pd.options.mode.chained_assignment = None #https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#evaluation-order-matters

# s is input text
def normalize_text(s, sep_token = " \n "):
    """Collapse whitespace and tidy stray punctuation in ``s``.

    Args:
        s: Text to clean. ``None`` and NaN (what an empty spreadsheet cell
            becomes after a round-trip through Excel) count as empty; any
            other non-string value is converted with ``str``.
        sep_token: Unused; kept so existing callers keep working.

    Returns:
        The cleaned text, or ``"Do nothing"`` when nothing is left.
    """
    if s is None or (isinstance(s, float) and s != s):  # NaN != NaN
        return "Do nothing"
    s = str(s)
    # Collapse every run of whitespace (newlines included) into one space
    s = re.sub(r'\s+', ' ', s).strip()
    # Drop literal ". ," artefacts; the dot is escaped so that an arbitrary
    # character in front of " ," is no longer deleted
    s = re.sub(r"\. ,", "", s)
    # Reduce doubled full stops
    s = s.replace("..", ".")
    s = s.replace(". .", ".")
    s = s.strip()
    return s if s else "Do nothing"

df_Answers['Answers']= df_Answers["Answers"].apply(lambda x : normalize_text(x))

Remove any answers that are too long for the token limit (8192 tokens).

In [None]:
import tiktoken


tokenizer = tiktoken.get_encoding("cl100k_base")
df_Answers['n_tokens'] = df_Answers["Answers"].apply(lambda x: len(tokenizer.encode(x)))
df_Answers = df_Answers[df_Answers.n_tokens<8192]
len(df_Answers)

In [None]:
df_Answers.head()

In [None]:
df_Answers['ada_v2'] = df_Answers["Answers"].apply(lambda x : get_embedding(x)) 
# df_Answers.set_index( ['Student'], inplace = True)
# NOTE: the column keeps the legacy name "ada_v2", but the vectors come from the embedding model configured in get_embedding (text-embedding-3-large)

In [None]:
import json
df_Answers.to_excel("data/embeddings.xlsx", index=True)
df_Answers.apply(lambda x : write_text_to_file(f"tmp/embeddings_{x.Student}.json", json.dumps(x.ada_v2)), axis=1)
df_Answers

### Clustering based on the Embeddings

Reload embeddings.

In [None]:
import json
import pandas as pd
import numpy as np

# df_embeddings = df_Answers.copy()
df_embeddings = pd.read_excel("data/embeddings.xlsx") 
def reload_embeddings(student):
    """Load the embedding saved for one row and return it as a list.

    ``student`` is a row whose ``Student`` attribute selects the file
    ``tmp/embeddings_<Student>.json``.
    """
    embedding_path = f"tmp/embeddings_{student.Student}.json"
    stored = read_text_file(embedding_path)
    return list(json.loads(stored))
df_embeddings["ada_v2"] = df_embeddings.apply(lambda s : reload_embeddings(s), axis=1)
df_embeddings.drop(['Unnamed: 0'], axis=1, inplace=True)
df_embeddings.set_index( ['Student'], inplace = True)
df_embeddings.head()

In [None]:
from sklearn.cluster import KMeans

# Stack the per-student embeddings into a 2-D array (one row per answer)
matrix = np.array(df_embeddings["ada_v2"].to_list())
# KMeans cannot form more clusters than there are samples, so cap the
# requested 7 clusters at the number of answers
n_clusters = min(7, len(matrix))
kmeans = KMeans(n_clusters=n_clusters, init="k-means++", random_state=42, n_init='auto')
kmeans.fit(matrix)
labels = kmeans.labels_
df_embeddings["Cluster"] = labels
df_embeddings.head()

In [None]:
import seaborn as sns
from sklearn.manifold import TSNE
import matplotlib.pyplot as plt
import numpy as np

# Create a larger figure with more room for margins
plt.figure(figsize=(16, 10))

# t-SNE requires perplexity < n_samples; keep the usual value of 5 but
# shrink it for very small classes instead of failing
perplexity = min(5, max(1, len(matrix) - 1))
tsne = TSNE(n_components=2, perplexity=perplexity, random_state=42, init='random', learning_rate=200)
vis_dims2 = tsne.fit_transform(matrix)

x = vis_dims2[:, 0]
y = vis_dims2[:, 1]

palette = sns.color_palette("inferno", n_clusters).as_hex()  # Match palette size to n_clusters
cluster_labels = df_embeddings["Cluster"].to_numpy()

# Draw one colour per cluster, skipping clusters without points
for category, color in enumerate(palette):
    in_cluster = cluster_labels == category
    if not in_cluster.any():
        continue

    xs = x[in_cluster]
    ys = y[in_cluster]

    # Individual points, drawn semi-transparent at a consistent size
    plt.scatter(xs, ys, s=30, color=color, alpha=0.3)

    # Mean position of the cluster's points, marked with a cross
    plt.scatter([xs.mean()], [ys.mean()], marker='x', color=color, s=200)

# Add title and labels with explicit fontsize
plt.title("Embeddings visualized using t-SNE", fontsize=14)
plt.xlabel("t-SNE dimension 1", fontsize=12)
plt.ylabel("t-SNE dimension 2", fontsize=12)

# Add padding around the plot instead of using tight_layout
plt.subplots_adjust(left=0.1, right=0.95, top=0.9, bottom=0.1)

# Show the plot
plt.show()

Export the final results, including each student's cluster assignment.

In [None]:
# Index the marks by student so they line up with df_embeddings, which is
# already indexed by Student
df_marked_tmp = pd.read_excel("data/marks.xlsx").set_index(['Student'])
df_embeddings_tmp = df_embeddings.copy()

# Left join: every marked student is kept, with or without an embedding
embedding_columns = df_embeddings_tmp[["n_tokens", "ada_v2", "Cluster"]]
df_final = df_marked_tmp.merge(embedding_columns, how='left', left_index=True, right_index=True)

# Columns kept in the exported workbook
cols = ['Marks', 'Comments', 'Answers','CopyFromInternet','GenerativeAI','ChatGptTokens','ManualReview','Error','Cluster']

df_final = df_final[cols]
df_final.to_excel("data/final.xlsx", index=True)
df_final.head(5)


### Reduce the embedding dimensionality

In [None]:
from sklearn.decomposition import PCA

# Work on a copy so the projection column is not added to df_embeddings
pca_df = df_embeddings.copy()
# One embedding (list of floats) per row
matrix = pca_df["ada_v2"].to_list()
# Project the embeddings onto their first three principal components
pca = PCA(n_components=3)
vis_dims = pca.fit_transform(matrix)
# Store each row's 3-D coordinates as a list in a single column
pca_df["embed_vis"] = vis_dims.tolist()
pca_df

The ratio of the total variance each principal component captures

In [None]:
print(str(sum(pca.explained_variance_ratio_)*100)+"%")

Analyzing the Change in Explained Variance Ratio

In [None]:
import numpy as np
import matplotlib.pyplot as plt

# PCA can extract at most min(n_samples, n_features) components, so cap the
# sweep (0..13 components) for small classes instead of failing
n_features = len(matrix[0]) if len(matrix) else 0
max_components = min(13, len(matrix), n_features)
nums = np.arange(max_components + 1)

var_ratio = []
for num in nums:
    # A separate estimator keeps the 3-component `pca` fitted earlier intact,
    # so the cell reporting its explained variance still shows the right number
    pca_n = PCA(n_components=int(num))
    pca_n.fit(matrix)
    var_ratio.append(np.sum(pca_n.explained_variance_ratio_))

plt.figure(figsize=(4,2),dpi=150)
plt.grid()
plt.plot(nums,var_ratio,marker='o')
plt.xlabel('n_components')
plt.ylabel('Explained variance ratio')
plt.title('n_components vs. Explained Variance Ratio')

In [None]:
%matplotlib widget
import matplotlib.pyplot as plt
import numpy as np

fig = plt.figure(figsize=(10, 10))
ax = fig.add_subplot(projection='3d')
cmap = plt.get_cmap("tab20")

# Distinct cluster ids. Iterating over the per-sample list instead would
# redraw (and relabel) each cluster once for every one of its members.
clusters = sorted(pca_df["Cluster"].unique().tolist())

# Plot each cluster individually so that it gets its own colour and label
for color_idx, clusterId in enumerate(clusters):
    members = pca_df[pca_df["Cluster"] == clusterId]
    sub_matrix = np.array(members["embed_vis"].to_list())

    x = sub_matrix[:, 0]
    y = sub_matrix[:, 1]
    z = sub_matrix[:, 2]
    colors = [cmap(color_idx / len(clusters))] * len(sub_matrix)
    ax.scatter(x, y, zs=z, zdir='z', c=colors, label=clusterId)

    # Annotate every point with the student it belongs to (the index)
    students = members.index.values.tolist()
    for point_idx, txt in enumerate(students):
        ax.text(x[point_idx], y[point_idx], z[point_idx], txt, size=8, zorder=1, color='k')

ax.set_xlabel('x')
ax.set_ylabel('y')
ax.set_zlabel('z')
# ax.legend(bbox_to_anchor=(1.1, 1))