In [None]:
import pandas as pd
from collections import Counter
import re

from tqdm import tqdm
from ragatouille import RAGPretrainedModel
from sklearn.metrics import accuracy_score, root_mean_squared_error, f1_score


In [None]:
def create_entry(row):
    """Flatten one labelled example into a single text passage.

    Args:
        row: Mapping-like record (e.g. a DataFrame row) providing the keys
            ``question``, ``student``, ``label``, ``score`` and ``feedback``.

    Returns:
        str: The five fields on one line, each introduced by its tag
        (``Question:``, ``Student Answer:``, ``Label:``, ``Numeric Score:``,
        ``Feedback:``) and separated by single spaces.
    """
    segments = (
        f"Question: {row['question']}",
        f"Student Answer: {row['student']}",
        f"Label: {row['label']}",
        f"Numeric Score: {row['score']}",
        f"Feedback: {row['feedback']}",
    )
    return " ".join(segments)


In [None]:
# Read the labelled training examples and serialise every row into one text
# passage (column 'Entry'); the resulting list is what gets indexed.
df = pd.read_csv("data/train.csv")
df['Entry'] = df.apply(create_entry, axis=1)

# `entries` and `collection` are two names for the same list of passages.
collection = entries = df['Entry'].tolist()

In [None]:
# Load the pretrained "colbert-ir/colbertv2.0" checkpoint and index the
# training passages under the name "SAF-scoring".
RAG = RAGPretrainedModel.from_pretrained("colbert-ir/colbertv2.0")

# NOTE(review): split_documents=False presumably keeps every entry as one
# passage (entries longer than max_document_length may be truncated) — confirm
# against the RAGatouille documentation.
index_options = dict(
    index_name="SAF-scoring",
    max_document_length=512,
    split_documents=False,
)
RAG.index(collection=collection, **index_options)

In [None]:
def perform_rag_search(row, k=None):
    """Query the index with a student answer and return the retrieved hits.

    Args:
        row: Mapping-like record with a ``student`` key; its value is used as
            the search query.
        k: Number of hits to request. When omitted, the module-level ``K`` is
            used. ``K`` is defined in this notebook as a list of cut-offs
            (``[3, 5]``), so a list/tuple/set value is reduced to its maximum
            instead of being forwarded as-is to ``RAG.search``.

    Returns:
        Whatever ``RAG.search`` returns for the query; elsewhere in this
        notebook the result is indexed by position and each hit is read via
        ``hit['content']``.
    """
    if k is None:
        k = K
    if isinstance(k, (list, tuple, set)):
        # Request enough hits for the largest cut-off; callers can slice the
        # result for smaller ones (presumably hits are ranked best-first —
        # confirm against the RAGatouille documentation).
        k = max(k)
    return RAG.search(query=row['student'], k=k)

In [None]:
def extract_label_and_score(content):
    """Recover the label and numeric score embedded in an entry's text.

    Args:
        content: Passage text expected to contain ``Label: ...`` and
            ``Numeric Score: ...`` fields.

    Returns:
        tuple: ``(label, score)``. ``label`` is the text captured after
        ``Label:`` and ``score`` is that field parsed as a float; each is
        ``None`` when its field is not found.
    """
    # NOTE(review): the first alternative `\w+` always succeeds before the
    # two-word alternative is tried, so only the first word after "Label:" is
    # captured (e.g. "Partially" from "Partially correct").
    found_label = re.search(r'Label:\s*(\w+|\w+\s\w+)', content)
    found_score = re.search(r'Numeric Score:\s*([0-9.]+)', content)

    label = None if found_label is None else found_label.group(1)
    score = None if found_score is None else float(found_score.group(1))
    return label, score

In [None]:
def find_majority(labels):
    """Return the most frequent label in ``labels``.

    Args:
        labels: Sequence of hashable labels; may be empty.

    Returns:
        The label with the highest count, or ``None`` when ``labels`` is
        empty (mirroring ``calculate_average``). Ties go to the label that
        occurs first in ``labels``, because ``Counter.most_common`` keeps
        equal counts in first-encountered order.
    """
    if not labels:
        # most_common(1) on an empty Counter is [], so indexing it would
        # raise IndexError.
        return None
    return Counter(labels).most_common(1)[0][0]

def calculate_average(scores):
    """Return the arithmetic mean of ``scores``, or ``None`` if it is empty."""
    if not scores:
        return None
    total = sum(scores)
    return total / len(scores)


# Unseen Answers

In [None]:
# Load the unseen-answers split; missing cells become empty strings.
ua_df = pd.read_csv('data/ua.csv').fillna('')

In [None]:
# Cut-offs: how many retrieved training entries are collected per answer.
K = [3, 5]

tqdm.pandas()  # enables DataFrame.progress_apply
for k in K:
    # Search with the current cut-off `k` (not the whole list `K`).
    ua_df[f'results_{k}'] = ua_df.progress_apply(
        lambda row: RAG.search(query=row['student'], k=k), axis=1
    )

    results = ua_df[f'results_{k}'].tolist()

    ext_results = {'labels': [], 'scores': []}
    for result in results:
        # Slice instead of indexing 0..k-1 so that a search returning fewer
        # than k hits cannot raise IndexError.
        parsed = [extract_label_and_score(hit['content']) for hit in result[:k]]
        ext_results['labels'].append([label for label, _ in parsed])
        ext_results['scores'].append([score for _, score in parsed])

    # Column names carry the cut-off; the evaluation cell reads
    # ext_labels_{k} / ext_scores_{k}.
    ua_df[f'ext_labels_{k}'] = ext_results['labels']
    ua_df[f'ext_scores_{k}'] = ext_results['scores']

In [None]:
# Turn the retrieved neighbours into one prediction per answer: the mean of
# their scores and the majority vote over their labels.
for k in K:
    score_col, label_col = f'ext_scores_{k}', f'ext_labels_{k}'
    ua_df[f'average_score_{k}'] = ua_df[score_col].apply(calculate_average)
    # NOTE(review): the replace presumably compensates for the label regex
    # capturing only the first word of a label — confirm label casing in the data.
    ua_df[f'majority_{k}'] = ua_df[label_col].apply(
        lambda labels: find_majority(labels).replace('partially', 'partially correct')
    )

# Score the predictions against the gold `label` and `score` columns.
for k in K:
    predicted_labels = ua_df[f'majority_{k}']
    predicted_scores = ua_df[f'average_score_{k}']

    accuracy = accuracy_score(ua_df['label'], predicted_labels)
    rmse = root_mean_squared_error(ua_df['score'], predicted_scores)
    f1 = f1_score(ua_df['label'], predicted_labels, average='macro')

    print(f"Accuracy (k={k}):  ", accuracy)
    print(f"RMSE (k={k}): ", rmse)
    print(f"F1 (k={k}): ", f1)

# Unseen Questions

In [None]:
# Load the unseen-questions split; missing cells become empty strings.
uq_df = pd.read_csv('data/uq.csv').fillna('')

In [None]:
# Cut-offs: how many retrieved training entries are collected per answer.
K = [3, 5]

tqdm.pandas()  # enables DataFrame.progress_apply
for k in K:
    # Search with the current cut-off `k` (not the whole list `K`).
    uq_df[f'results_{k}'] = uq_df.progress_apply(
        lambda row: RAG.search(query=row['student'], k=k), axis=1
    )

    results = uq_df[f'results_{k}'].tolist()

    ext_results = {'labels': [], 'scores': []}
    for result in results:
        # Slice instead of indexing 0..k-1 so that a search returning fewer
        # than k hits cannot raise IndexError.
        parsed = [extract_label_and_score(hit['content']) for hit in result[:k]]
        ext_results['labels'].append([label for label, _ in parsed])
        ext_results['scores'].append([score for _, score in parsed])

    # Column names carry the cut-off; the evaluation cell reads
    # ext_labels_{k} / ext_scores_{k}.
    uq_df[f'ext_labels_{k}'] = ext_results['labels']
    uq_df[f'ext_scores_{k}'] = ext_results['scores']

In [None]:
# Turn the retrieved neighbours into one prediction per answer: the mean of
# their scores and the majority vote over their labels.
for k in K:
    score_col, label_col = f'ext_scores_{k}', f'ext_labels_{k}'
    uq_df[f'average_score_{k}'] = uq_df[score_col].apply(calculate_average)
    # NOTE(review): the replace presumably compensates for the label regex
    # capturing only the first word of a label — confirm label casing in the data.
    uq_df[f'majority_{k}'] = uq_df[label_col].apply(
        lambda labels: find_majority(labels).replace('partially', 'partially correct')
    )

# Score the predictions against the gold `label` and `score` columns.
for k in K:
    predicted_labels = uq_df[f'majority_{k}']
    predicted_scores = uq_df[f'average_score_{k}']

    accuracy = accuracy_score(uq_df['label'], predicted_labels)
    rmse = root_mean_squared_error(uq_df['score'], predicted_scores)
    f1 = f1_score(uq_df['label'], predicted_labels, average='macro')

    print(f"Accuracy (k={k}):  ", accuracy)
    print(f"RMSE (k={k}): ", rmse)
    print(f"F1 (k={k}): ", f1)