# Automatic Grader with Gemini Pro
This notebook can grade students’ assignments automatically by downloading them from Moodle LMS. It will unzip the assignment file from Moodle and create a folder for each student. If a student submits a zip file, it will also unzip it in their folder. The folder should contain either some Docx files or PDF files. For Docx files, the notebook will extract and merge all the texts into one answer. For PDF files, it will extract the text from every page and merge it into one answer.

The notebook will then use a marking scheme as prompts and let Gemini Pro evaluate the answer according to the rules. It will also estimate the probability that the answer is copied from the internet or generated by AI.

The notebook will use a Vertex AI multilingual text-embedding model (`text-multilingual-embedding-preview-0409` by default) to get the embedding of the answer. It will then use K-means clustering to group the answers based on their embeddings and show the teachers the different types of answers. It will also perform PCA on the embeddings and plot the first three principal components in 3D. This will help the teachers see how similar or different the answers are.

### Install packages

In [None]:
%pip install -q pypandoc docx2txt PyPDF2 openpyxl python-dotenv google-cloud-aiplatform google-cloud-core num2words matplotlib plotly scipy scikit-learn pandas tiktoken ipywidgets seaborn ipympl

### Common Functions

In [None]:
def read_text_file(path):
    """Return the text stored at *path* with every newline character removed."""
    with open(path, 'r') as handle:
        pieces = handle.read().split('\n')
    return ''.join(pieces)

def write_text_to_file(path, content):
    """Write *content* to the file at *path*, replacing whatever it held before."""
    with open(path, mode='w') as sink:
        sink.write(content)

Extract all submissions to a tmp folder


In [None]:
# Unpack the downloaded submission archive into the temporary working folder.
from zipfile import ZipFile

with ZipFile(file="data/submission.zip", mode="r") as submissions_archive:
    submissions_archive.extractall(path="tmp/submission")

In [None]:
# Import the os module
import os
import pandas as pd

# Define the path to list
temp_path = "tmp/submission/"

def is_folder_contains_file(folder_path, extension):
    """Return True when any entry directly inside *folder_path* has a name ending with *extension*."""
    return any(entry.endswith(extension) for entry in os.listdir(folder_path))
    
def get_submissions_df(path):
    """Describe every submission folder directly under *path* as a DataFrame.

    Each sub-directory of *path* produces one row with the columns:

    - ``Student``: the part of the folder name before the first underscore.
    - ``Path``: the folder path (``path`` joined with the folder name).
    - ``ContainsDocxFile`` / ``ContainsPdfFile`` / ``ContainsZipFile``:
      whether the folder directly contains an entry ending in ``.docx``,
      ``.pdf`` or ``.zip``.

    Plain files directly under *path* are ignored.
    """
    columns = [
        "Student",
        "Path",
        "ContainsDocxFile",
        "ContainsPdfFile",
        "ContainsZipFile",
    ]
    assignment_folders = []
    for name in os.listdir(path):
        full_path = os.path.join(path, name)
        # Only directories are treated as student submissions.
        if not os.path.isdir(full_path):
            continue
        assignment_folders.append({
            "Student": name.split("_")[0],
            "Path": full_path,
            "ContainsDocxFile": is_folder_contains_file(full_path, ".docx"),
            "ContainsPdfFile": is_folder_contains_file(full_path, ".pdf"),
            "ContainsZipFile": is_folder_contains_file(full_path, ".zip"),
        })
    # Naming the columns explicitly keeps the schema intact when no folder was
    # found, so column-based filters still work on the empty result.
    return pd.DataFrame(assignment_folders, columns=columns)
df = get_submissions_df(temp_path)

In [None]:
df

### Ensure that all the files submitted are valid

In [None]:
def filter_df_by_not_contains_any_expected_files(df):
    """Return the rows whose ContainsDocxFile, ContainsPdfFile and ContainsZipFile flags are all False."""
    lacks_docx = df["ContainsDocxFile"].eq(False)
    lacks_pdf = df["ContainsPdfFile"].eq(False)
    lacks_zip = df["ContainsZipFile"].eq(False)
    return df[lacks_docx & lacks_pdf & lacks_zip]
filter_df_by_not_contains_any_expected_files(df)

Handle zip file.

In [None]:
import os
import shutil

def flatten(directory):
    """Move every file nested below *directory* up into *directory* itself.

    Sub-directories are visited bottom-up; after its files have been moved,
    each sub-directory is removed. When a moved file's name is already taken
    in *directory*, ``_1``, ``_2``, ... is inserted before the extension until
    a free name is found. Files that already sit directly in *directory* are
    left untouched. Every move and removal is reported with ``print``.
    """
    for dirpath, _, filenames in os.walk(directory, topdown=False):
        if dirpath == directory:
            # Top-level files are already in place. Processing them would make
            # each one collide with itself and be renamed with a suffix.
            continue

        for filename in filenames:
            source = os.path.join(dirpath, filename)
            target = os.path.join(directory, filename)
            stem, extension = os.path.splitext(filename)

            # Find the first free name of the form <stem>_<n><extension>.
            suffix = 0
            while os.path.exists(target):
                suffix += 1
                target = os.path.join(directory, f"{stem}_{suffix}{extension}")

            shutil.move(source, target)

            print("Moved ", source, " to ", target)

        os.rmdir(dirpath)
        print("Deleted ", dirpath)

def get_first_file_path(path, ext):
    """Return the joined path of the first entry in *path* whose name ends with *ext*, or None if there is none."""
    candidates = (
        os.path.join(path, entry)
        for entry in os.listdir(path)
        if entry.endswith(ext)
    )
    return next(candidates, None)

def extract_zip_file_in_place(path):
    """Unpack the first ``.zip`` found in *path* into *path*, then flatten the folder.

    The archive path is printed before extraction.
    """
    from zipfile import ZipFile

    archive_path = get_first_file_path(path, ".zip")
    print(archive_path)
    with ZipFile(archive_path, "r") as archive:
        archive.extractall(path)
    flatten(path)


def filter_df_by_contains_zip_file(df):
    """Return the rows of *df* whose ContainsZipFile flag is True."""
    has_zip = df["ContainsZipFile"].eq(True)
    return df[has_zip]

paths = filter_df_by_contains_zip_file(df)["Path"].values
for path in paths:
    extract_zip_file_in_place(path)

In [None]:
df = get_submissions_df(temp_path)
## check all rows contains Docx or PDF file
def filter_df_by_contains_docx_or_pdf_file(df):
    """Return the rows of *df* where ContainsDocxFile or ContainsPdfFile is True."""
    has_docx = df["ContainsDocxFile"].eq(True)
    has_pdf = df["ContainsPdfFile"].eq(True)
    return df[has_docx | has_pdf]

filter_df_by_contains_docx_or_pdf_file(df)

## Processing Docx files

In [None]:
def filter_df_by_contains_docx(df):
    """Return the rows of *df* whose ContainsDocxFile flag is True."""
    has_docx = df["ContainsDocxFile"].eq(True)
    return df[has_docx]
words_df = filter_df_by_contains_docx(df)
paths = words_df["Path"].values

def get_all_docx_files(path):
    """Return the paths matching the glob pattern ``<path>/*.docx``."""
    from glob import glob

    pattern = path + "/*.docx"
    return glob(pattern)

import docx2txt
from functools import reduce

# One list of .docx paths per student folder, in the same order as `paths`.
students_words_files = list(map(get_all_docx_files, paths))

# Merge each student's documents into a single answer, with a blank line
# between consecutive documents so their texts do not run together.
file_contents = [
    "\n\n".join(docx2txt.process(word_file) for word_file in word_files)
    for word_files in students_words_files
]

# Attach the new columns as index-aligned Series on a fresh frame: every row
# gets exactly one list / one string, and nothing is written into the
# filtered slice of `df`.
words_df = words_df.assign(
    Sources=pd.Series(students_words_files, index=words_df.index, dtype=object),
    Answers=pd.Series(file_contents, index=words_df.index, dtype=object),
)


In [None]:
def filter_df_by_contains_pdf(df):
    """Return the rows of *df* whose ContainsPdfFile flag is True."""
    has_pdf = df["ContainsPdfFile"].eq(True)
    return df[has_pdf]
pdfs_df = filter_df_by_contains_pdf(df)
paths = pdfs_df["Path"].values

def get_add_pdf_files(path):
    """Return the paths matching the glob pattern ``<path>/*.pdf``."""
    from glob import glob

    pattern = path + "/*.pdf"
    return glob(pattern)

import PyPDF2
from functools import reduce

def convert_pdf_all_pages_to_txt(path):
    """Return the extracted text of every page of the PDF at *path*.

    Pages are processed in order and each page's text is followed by two
    newline characters. A PDF without pages yields an empty string.
    """
    # A context manager guarantees the file handle is closed, including when
    # parsing or text extraction raises.
    with open(path, 'rb') as pdf_file:
        reader = PyPDF2.PdfReader(pdf_file)
        # Extract while the handle is still open: the reader was built on it.
        return "".join(page.extract_text() + "\n\n" for page in reader.pages)

# One list of .pdf paths per student folder, in the same order as `paths`.
students_pdf_files = list(map(get_add_pdf_files, paths))

# Concatenate the text of all of a student's PDFs into a single answer.
# Each converted PDF already ends every page with a blank line, so no extra
# separator is inserted between files; the leading blank line is kept as before.
file_contents = [
    "\n\n" + "".join(convert_pdf_all_pages_to_txt(pdf_file) for pdf_file in pdf_files)
    for pdf_files in students_pdf_files
]

# Attach the new columns as index-aligned Series on a fresh frame: every row
# gets exactly one list / one string, and nothing is written into the
# filtered slice of `df`.
pdfs_df = pdfs_df.assign(
    Sources=pd.Series(students_pdf_files, index=pdfs_df.index, dtype=object),
    Answers=pd.Series(file_contents, index=pdfs_df.index, dtype=object),
)
pdfs_df


In [None]:
# Combine the Docx and PDF answers into one table and export it to Excel.
# Both frames are filtered views of the same `df`, so a student who submitted
# both file types appears in each with the same index label. Renumbering the
# rows keeps every row uniquely addressable for the label-based per-row
# updates performed during grading.
df_answers = pd.concat([words_df, pdfs_df], ignore_index=True)
df_answers.to_excel("data/answers.xlsx", index=False)

## Grading students’ responses using Gemini Pro with Grounding

In [None]:
project_id = 'cyrus-testing-2023'
!gcloud config set project {project_id}
!gcloud auth application-default set-quota-project {project_id}

In [None]:
from google.cloud import aiplatform

aiplatform.init(
    # your Google Cloud Project ID or number
    # environment default used is not set
    project=project_id
)

In [None]:
import json
import vertexai
from vertexai.generative_models import GenerativeModel
import vertexai.preview.generative_models as generative_models

vertexai.init(project=project_id, location="us-central1")
model = GenerativeModel("gemini-1.0-pro-001")

def _strip_code_fence(text):
    """Return *text* without a surrounding Markdown code fence, if it has one.

    Handles replies of the form ```` ```json ... ``` ```` (the language tag is
    optional); any other text is returned unchanged.
    """
    import re

    fenced = re.match(r"^\s*```[A-Za-z0-9_-]*\s*(.*?)\s*```\s*$", text, re.DOTALL)
    return fenced.group(1) if fenced else text


def get_json_gemini(student, prompt):
    """Send *prompt* to the Gemini model and return its reply parsed as JSON.

    The raw reply text is printed and saved to ``tmp/{student}.json``.

    Args:
        student: Identifier used to name the saved reply file.
        prompt: Full prompt text sent to the model.

    Returns:
        A ``(parsed_reply, total_token_count)`` tuple.

    Raises:
        Any exception raised by the model call, by reading the first
        candidate of the response, or by ``json.loads`` when the reply is not
        valid JSON propagates to the caller.
    """
    generation_config = {
        "max_output_tokens": 4096,
        "temperature": 0.2,
        "top_p": 0.2,
    }
    safety_settings = {
        generative_models.HarmCategory.HARM_CATEGORY_HATE_SPEECH: generative_models.HarmBlockThreshold.BLOCK_ONLY_HIGH,
        generative_models.HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: generative_models.HarmBlockThreshold.BLOCK_ONLY_HIGH,
        generative_models.HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: generative_models.HarmBlockThreshold.BLOCK_ONLY_HIGH,
        generative_models.HarmCategory.HARM_CATEGORY_HARASSMENT: generative_models.HarmBlockThreshold.BLOCK_ONLY_HIGH,
    }

    model_response = model.generate_content(
        [prompt],
        generation_config=generation_config,
        safety_settings=safety_settings
    )
    text = model_response.candidates[0].content.parts[0].text
    print(text)

    # NOTE(review): json.dumps(text) stores the reply as one JSON string
    # literal (quoted and escaped), not as the JSON object itself. Kept as-is.
    write_text_to_file(f"tmp/{student}.json", json.dumps(text))
    tokens = model_response.usage_metadata.total_token_count

    # Models often wrap JSON in a Markdown code fence; remove it before
    # parsing so such replies do not fail in json.loads.
    return json.loads(_strip_code_fence(text)), tokens


In [None]:
def grade_answer(student,student_answer, marking_scheme):
    """Grade one student's answer with the model, trying up to three times.

    The prompt is *marking_scheme* with the literal placeholder
    ``<ANSWER></ANSWER>`` replaced by *student_answer*.

    Returns:
        A 7-tuple ``(marks, comments, copyFromInternet, generativeAI,
        manualReview, tokens, error)``. The first five values are read from
        the model's JSON reply and ``error`` is False. If all three attempts
        fail, the fallback ``(0, "Error", 0, 0, True, 0, True)`` is returned.
    """
    prompt = marking_scheme.replace("<ANSWER></ANSWER>", student_answer)
    max_attempts = 3
    for attempt in range(1, max_attempts + 1):
        try:
            content, tokens = get_json_gemini(student, prompt)
            # Read the expected fields inside the try: a reply that parses
            # but lacks a field is retried like any other failure, and does
            # not raise out of the grading loop.
            return (
                content['marks'],
                content['comments'],
                content['copyFromInternet'],
                content['generativeAI'],
                content['manualReview'],
                tokens,
                False,
            )
        except Exception as e:
            if attempt == max_attempts:
                break
            print(e)
            print("retry: " + str(attempt))
    return 0, "Error", 0, 0, True, 0, True

def grade_answers(df_answers, marking_scheme):
    """Grade every row of *df_answers* in place and return the same DataFrame.

    For each row the student name is printed, the row's answer is graded with
    ``grade_answer``, and the results are stored in the columns Marks,
    Comments, CopyFromInternet, GenerativeAI, ChatGptTokens, ManualReview and
    Error of that row.
    """
    for row_label, record in df_answers.iterrows():
        student = record["Student"]
        print(student)
        answer = record["Answers"]
        (
            marks,
            comments,
            copied,
            ai_generated,
            needs_review,
            tokens,
            failed,
        ) = grade_answer(student, answer, marking_scheme)
        # Insertion order of this dict is the order in which the columns are written.
        results = {
            "Marks": marks,
            "Comments": comments,
            "CopyFromInternet": copied,
            "GenerativeAI": ai_generated,
            "ChatGptTokens": tokens,
            "ManualReview": needs_review,
            "Error": failed,
        }
        for column, value in results.items():
            df_answers.loc[row_label, column] = value
    return df_answers

marking_scheme = read_text_file("marking_scheme.txt")

# get second row answer for df_answers
# student = df_answers.iloc[[2]]["Student"].values[0]
# student_answer = df_answers.iloc[[2]]["Answers"].values[0]
# print(student_answer)
# grade_answer(student_answer, marking_scheme)

df_marked = grade_answers(df_answers, marking_scheme)
df_marked.to_excel("data/marks.xlsx", index=False)

In [None]:
df_marked

## Embeddings and clustering

In [None]:
df_marked = pd.read_excel("data/marks.xlsx") 
df_Answers = df_marked[['Student','Answers']]
df_Answers

Data cleaning by removing redundant whitespace and cleaning up

In [None]:
import re
import pandas as pd

pd.options.mode.chained_assignment = None #https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#evaluation-order-matters

def normalize_text(s, sep_token = " \n "):
    """Collapse whitespace and tidy punctuation in an answer text.

    Args:
        s: The input text. None and float NaN (as pandas uses for missing
            values) are treated as an empty answer; any other non-string
            value is converted with ``str``.
        sep_token: Unused; kept so existing callers keep working.

    Returns:
        The cleaned text, or ``"Do nothing"`` when nothing is left.
    """
    # `s != s` is only true for NaN.
    if s is None or (isinstance(s, float) and s != s):
        return "Do nothing"

    # Collapse every whitespace run (including newlines) into one space.
    s = re.sub(r'\s+', ' ', str(s)).strip()
    # Remove a literal ". ," sequence. The dot is escaped: unescaped it
    # matches any character and would delete the character before " ,".
    s = re.sub(r"\. ,", "", s)
    s = s.replace("..", ".")
    s = s.replace(". .", ".")
    s = s.strip()
    return s if s else "Do nothing"

df_Answers['Answers']= df_Answers["Answers"].apply(lambda x : normalize_text(x))

Remove any answers that are too long for the token limit (8192 tokens).

In [None]:
import tiktoken
tokenizer = tiktoken.get_encoding("cl100k_base")
df_Answers['n_tokens'] = df_Answers["Answers"].apply(lambda x: len(tokenizer.encode(x)))
df_Answers = df_Answers[df_Answers.n_tokens<8192]
len(df_Answers)

In [None]:
df_Answers.head()

In [None]:
from typing import  Optional, List
from vertexai.language_models import TextEmbeddingInput, TextEmbeddingModel

from functools import lru_cache


@lru_cache(maxsize=None)
def _load_embedding_model(model_name):
    """Load the pre-trained embedding model for *model_name*, once per name."""
    return TextEmbeddingModel.from_pretrained(model_name)


def get_embedding(
    text: str,
    task: str = "CLUSTERING",
    model_name: str = "text-multilingual-embedding-preview-0409",
    dimensionality: Optional[int] = 256,
) -> List[float]:
    """Embed *text* with a pre-trained model and return its embedding values.

    Args:
        text: The text to embed.
        task: Task type passed to ``TextEmbeddingInput``.
        model_name: Name of the pre-trained embedding model to use.
        dimensionality: Requested output dimensionality; when falsy, no
            ``output_dimensionality`` argument is sent.

    Returns:
        The ``values`` of the first (only) embedding returned for *text*.
    """
    # The model handle is cached so that embedding many answers does not
    # reload the same model on every call.
    model = _load_embedding_model(model_name)
    inputs = [TextEmbeddingInput(text, task)]
    kwargs = dict(output_dimensionality=dimensionality) if dimensionality else {}
    embeddings = model.get_embeddings(inputs, **kwargs)
    return embeddings[0].values


In [None]:
df_Answers['embedding'] = df_Answers["Answers"].apply(lambda x : get_embedding(x)) 
# df_Answers.set_index( ['Student'], inplace = True)
# engine should be set to the deployment name you chose when you deployed the text-embedding-ada-002 (Version 2) model

In [None]:
import json
df_Answers.to_excel("data/embeddings.xlsx", index=True)
df_Answers.apply(lambda x : write_text_to_file(f"tmp/embeddings_{x.Student}.json", json.dumps(x.embedding)), axis=1)
df_Answers

### Clustering based on the Embeddings

Reload embeddings.

In [None]:
import json
import pandas as pd
import numpy as np

# df_embeddings = df_Answers.copy()
df_embeddings = pd.read_excel("data/embeddings.xlsx") 
def reload_embeddings(student):
    """Load the embedding saved in ``tmp/embeddings_<student.Student>.json`` and return it as a list."""
    embedding_path = f"tmp/embeddings_{student.Student}.json"
    raw_json = read_text_file(embedding_path)
    return list(json.loads(raw_json))
df_embeddings["embedding"] = df_embeddings.apply(lambda s : reload_embeddings(s), axis=1)
df_embeddings.drop(['Unnamed: 0'], axis=1, inplace=True)
df_embeddings.set_index( ['Student'], inplace = True)
df_embeddings.head()

In [None]:
from sklearn.cluster import KMeans

matrix = np.array(df_embeddings["embedding"].to_list())
n_clusters = 7
kmeans = KMeans(n_clusters=n_clusters, init="k-means++", random_state=42, n_init='auto')
kmeans.fit(matrix)
labels = kmeans.labels_
df_embeddings["Cluster"] = labels 
df_embeddings.head()

In [None]:
import seaborn as sns
from sklearn.manifold import TSNE
import matplotlib.pyplot as plt
plt.rcParams['figure.figsize'] = (15, 8) 

tsne = TSNE(n_components=2, perplexity=5, random_state=42, init='random', learning_rate=200)
vis_dims2 = tsne.fit_transform(matrix)

x = vis_dims2[:, 0].tolist()
y = vis_dims2[:, 1].tolist()

palette = sns.color_palette("inferno", 20).as_hex()

cluster_labels = df_embeddings["Cluster"].to_numpy()

# Draw only the clusters that actually occur. Looping over every palette
# entry would take the mean of an empty selection for unused labels.
for category in np.unique(cluster_labels):
    color = palette[category % len(palette)]
    in_cluster = cluster_labels == category
    xs = np.array(x)[in_cluster]
    ys = np.array(y)[in_cluster]
    plt.scatter(xs, ys, color=color, alpha=0.1)

    # Mark the cluster's mean position with a cross.
    avg_x = xs.mean()
    avg_y = ys.mean()

    plt.scatter(avg_x, avg_y, marker='x', color=color, s=100)
plt.title("Embeddings visualized using t-SNE")

Export the final result cluster

In [None]:
df_marked_tmp=pd.read_excel("data/marks.xlsx") 
df_embeddings_tmp=df_embeddings.copy()
df_marked_tmp.set_index( ['Student'], inplace = True)
# df_embeddings_tmp.set_index( ['Student'], inplace = True)
df_final = pd.merge(df_marked_tmp, df_embeddings_tmp[["n_tokens","embedding","Cluster"]], how='left', left_index=True, right_index=True)

cols = ['Marks', 'Comments', 'Answers','CopyFromInternet','GenerativeAI','ChatGptTokens','ManualReview','Error','Cluster']

df_final= df_final[cols]
df_final.to_excel("data/final.xlsx", index=True)
df_final.head(5)


### Reduce the embedding dimensionality

In [None]:
from sklearn.decomposition import PCA

pca_df = df_embeddings.copy()
matrix = pca_df["embedding"].to_list()
pca = PCA(n_components=3)
vis_dims = pca.fit_transform(matrix)
pca_df["embed_vis"] = vis_dims.tolist()
pca_df

The ratio of the total variance each principal component captures

In [None]:
print(str(sum(pca.explained_variance_ratio_)*100)+"%")

Analyzing the Change in Explained Variance Ratio

In [None]:
import numpy as np
# PCA cannot be fitted with more components than min(n_samples, n_features),
# so cap the sweep (0..13 when enough data is available) for small classes.
max_components = min(13, len(matrix), len(matrix[0]))
nums = np.arange(max_components + 1)

var_ratio = []
for num in nums:
    # Use a separate name so the 3-component `pca` fitted earlier is not overwritten.
    sweep_pca = PCA(n_components=num)
    sweep_pca.fit(matrix)
    var_ratio.append(np.sum(sweep_pca.explained_variance_ratio_))

import matplotlib.pyplot as plt

plt.figure(figsize=(4,2),dpi=150)
plt.grid()
plt.plot(nums,var_ratio,marker='o')
plt.xlabel('n_components')
plt.ylabel('Explained variance ratio')
plt.title('n_components vs. Explained Variance Ratio')  

In [None]:
%matplotlib widget
import matplotlib.pyplot as plt
import numpy as np

fig = plt.figure(figsize=(10, 10))
ax = fig.add_subplot(projection='3d')
cmap = plt.get_cmap("tab20")

# Distinct cluster ids only: iterating the raw per-student column would draw
# (and label) each cluster once per member, each time in a different colour.
clusters = sorted(set(pca_df["Cluster"].to_list()))

# Plot each cluster individually so that it gets its own colour and label.
for position, clusterId in enumerate(clusters):
    members = pca_df[pca_df["Cluster"] == clusterId]
    sub_matrix = np.array(members["embed_vis"].to_list())

    x = sub_matrix[:, 0]
    y = sub_matrix[:, 1]
    z = sub_matrix[:, 2]
    colors = [cmap(position / len(clusters))] * len(sub_matrix)
    ax.scatter(x, y, zs=z, zdir='z', c=colors, label=clusterId)

    # Annotate every point with its index value from the DataFrame.
    students = members.index.values.tolist()
    for point, txt in enumerate(students):
        ax.text(x[point], y[point], z[point], txt, size=8, zorder=1, color='k')

ax.set_xlabel('x')
ax.set_ylabel('y')
ax.set_zlabel('z')
# ax.legend(bbox_to_anchor=(1.1, 1))