# Automatic Evaluation

This notebook performs the automatic evaluation of the system outputs.

Make sure you have read the `README` in this directory and installed all required packages.

## Imports

Import all required packages.


In [None]:
from os import path, makedirs, popen, system, listdir
import sys
import errant
from scribendi import ScribendiScore
from syntok.tokenizer import Tokenizer
from itertools import product
import spacy_udpipe
import spacy
from tqdm.notebook import tqdm
import pandas as pd
import re

## Constants

Set up the repo root and the m2 directory as constants.


In [None]:
# Absolute location of the repository; the other paths in this notebook are built from it.
REPO_ROOT = "/home/jovyan/da231x"
print(REPO_ROOT)

# The m2 directory sits directly under the repo root; create it if it is missing.
M2DIR = path.join(REPO_ROOT, "m2")
makedirs(M2DIR, exist_ok=True)

### Processing Functions

Define functions to process the data.


In [None]:
def read_file(file_path):
    """
    Return the complete text content of the file at file_path.
    """
    with open(file_path) as handle:
        contents = handle.read()
    return contents


def pretokenize(txt):
    """
    Tokenize txt with syntok.tokenizer and return the tokens as one string.

    The string form of every token is stripped of surrounding whitespace,
    and the tokens are then joined with single spaces.
    """
    tokenizer = Tokenizer()
    stripped_tokens = (str(token).strip() for token in tokenizer.tokenize(txt))
    return " ".join(stripped_tokens)


def convert_essay_to_single_line(essay):
    """
    Return essay with every newline character replaced by a single space.
    """
    lines = essay.split("\n")
    return " ".join(lines)


def md_to_dict(md):
    """
    Parse shared task format into a dictionary where keys are essay IDs
    and values are essay texts.

    Every essay starts with a header line `### essay_id = <ID>`; the text up
    to the next header belongs to that essay. The text is tokenized with
    pretokenize and put on a single line.

    Arguments:

    md --- a string with the content of a shared task Markdown file.

    Returns a dict[essay_id] = tokenized essay text on one line.
    """
    essay_dict = {}
    # Whatever precedes the first header is not an essay, hence the [1:].
    for essay in md.split("### essay_id = ")[1:]:
        # partition also copes with a header that has no newline after it
        # (e.g. an empty last essay), where a two-way split + unpack would
        # raise ValueError.
        header, _, text = essay.partition("\n")
        # The ID doubles as a dict key and a file name, so surrounding
        # whitespace in the header must not become part of it.
        essay_id = header.strip()
        text_tokenized = pretokenize(text).strip("\n")
        essay_dict[essay_id] = convert_essay_to_single_line(text_tokenized)
    return essay_dict


def write_essay_to_file(output_dir, essay_id, essay_text):
    """
    Store essay_text in the file output_dir/essay_id.tmp, replacing any
    previous content, and return the path of that file.
    """
    target = path.join(output_dir, f"{essay_id}.tmp")
    with open(target, "w+") as out:
        out.write(essay_text)
    return target


def _ensure_directory_exists(directory):
    """
    Creates directory if it does not exist.
    """
    makedirs(directory, exist_ok=True)


def split_file_per_essay(input_file, output_dir):
    """
    Split the essays contained in input_file into one file per essay.

    The input file is expected to look like this:
    ### essay_id = ABC123
    ...
    ### essay_id = XYZ987
    ...

    output_dir is created if necessary, and every essay is written to
    output_dir/essay_id.tmp.

    Returns a dict[essay_id] = file_path.
    """
    _ensure_directory_exists(output_dir)

    essays = md_to_dict(read_file(input_file))

    return {
        essay_id: write_essay_to_file(output_dir, essay_id, essay_text)
        for essay_id, essay_text in essays.items()
    }

## Variables

These variables are used to create file paths for later use in the evaluation process.

### Edit Versions

Set up variables to distinguish minimal edits from fluency edits.


In [None]:
# Names of the two correction styles; they are also used as directory names and dict keys.
MINIMAL, FLUENCY = "minimal", "fluency"
versions = [MINIMAL, FLUENCY]

### Teams

Create a list of all teams, i.e. the names of all directories under `outputs/` in the repo root.


In [None]:
# Each team has its own sub-directory below outputs/ in the repo root.
models_dir = path.join(REPO_ROOT, "outputs/")

# listdir returns entries in arbitrary order, so sort them to make the
# evaluation order (and the row order of the saved scores) reproducible.
# Hidden directories such as Jupyter's .ipynb_checkpoints are not teams and
# would otherwise fail later when their hypothesis files are looked up.
teams = sorted(
    entry for entry in listdir(models_dir)
    if not entry.startswith(".") and path.isdir(path.join(models_dir, entry))
)
print(teams)

In [None]:
# Captions of the nested tqdm progress bars, from the outermost to the innermost loop.
TEAM_LABEL, STYLE_LABEL, ESSAY_LABEL = "Team", "Correction Style", "Essay"

### Directories

Set up the paths of the data, source, reference, hypothesis, and system output directories.

In [None]:
# Gold-standard Markdown files (original essays and both references).
DATA_DIR = path.join(REPO_ROOT, "data/swedish/SweLL-gold/")
# Per-essay files produced by this notebook.
SOURCE_DIR = path.join(REPO_ROOT, "sources/")
REFERENCE_DIR = path.join(REPO_ROOT, "references/")
HYPOTHESIS_DIR = path.join(REPO_ROOT, "hypotheses/")
# Markdown files delivered by the systems, one sub-directory per team.
SYSTEM_OUTPUT_DIR = path.join(REPO_ROOT, "outputs/")

# Show every directory on its own line.
print(
    DATA_DIR,
    SOURCE_DIR,
    REFERENCE_DIR,
    HYPOTHESIS_DIR,
    SYSTEM_OUTPUT_DIR,
    sep="\n",
)

### Sources

Get paths for all source files.


In [None]:
def get_all_source_paths():
    """
    Split the original test essays into files named `SOURCE_DIR/essay_id.tmp`.
    Returns a dict[essay_id] = file_path.
    """
    return split_file_per_essay(
        path.join(DATA_DIR, "sv-swell_gold-orig-test.md"), SOURCE_DIR
    )


source_paths = get_all_source_paths()
print(source_paths)

### References

Get paths for all reference files.


In [None]:
def get_reference_paths(input_file, version):
    """
    Split the reference essays in input_file into files named
    `REFERENCE_DIR/version/essay_id.tmp`.
    Returns a dict[essay_id] = file_path.
    """
    return split_file_per_essay(input_file, path.join(REFERENCE_DIR, version))


def get_all_reference_paths():
    """
    Split the minimal-edited and the fluency-edited reference essays into
    files named `REFERENCE_DIR/version/essay_id.tmp`.
    Returns a dict[version][essay_id] = file_path.
    """
    # Markdown file in DATA_DIR that holds the references of each version.
    md_names = {
        MINIMAL: "sv-swell_gold-ref1-test.md",
        FLUENCY: "sv-swell_gold-ref2-test.md",
    }
    return {
        version: get_reference_paths(path.join(DATA_DIR, md_name), version)
        for version, md_name in md_names.items()
    }


reference_paths = get_all_reference_paths()
print(reference_paths)

### Hypotheses

Get paths for all hypothesis files.


In [None]:
def get_system_version_hypothesis_paths(team, version, md):
    """
    Split the system hypotheses in the Markdown file md into files named
    `HYPOTHESIS_DIR/team/version/essay_id.tmp`.
    Returns a dict[essay_id] = file_path.
    """
    return split_file_per_essay(md, path.join(HYPOTHESIS_DIR, team, version))


def get_system_hypothesis_paths(team):
    """
    Split the minimal-edited and the fluency-edited hypotheses of team into
    files named `HYPOTHESIS_DIR/team/version/essay_id.tmp`.
    Returns a dict[version][essay_id] = file_path.
    """
    # Name of the hypothesis Markdown file inside SYSTEM_OUTPUT_DIR/team/version/.
    md_names = {
        MINIMAL: "sv-swell_gold-hypo-test.md",
        FLUENCY: "sv-swell_gold-fluency-hypo-test.md",
    }
    paths_by_version = {}
    for version, md_name in md_names.items():
        hypothesis_md = path.join(SYSTEM_OUTPUT_DIR, team, version, md_name)
        paths_by_version[version] = get_system_version_hypothesis_paths(
            team, version, hypothesis_md
        )
    return paths_by_version


def get_all_hypothesis_paths():
    """
    Split the minimal-edited and the fluency-edited hypotheses of every team
    in teams into files named `HYPOTHESIS_DIR/team/version/essay_id.tmp`.
    Returns a dict[team][version][essay_id] = file_path.
    """
    paths_by_team = {}
    for team in teams:
        paths_by_team[team] = get_system_hypothesis_paths(team)
    return paths_by_team


hypothesis_paths = get_all_hypothesis_paths()
print(hypothesis_paths)

## GLEU

Compute GLEU with the implementation by Shota Koyama ([https://github.com/shotakoyama/gleu](https://github.com/shotakoyama/gleu)).


In [None]:
def compute_gleu(
    source_file, minimal_reference_file, fluency_reference_file, hypothesis_file
):
    """
    Run the external `gleu` command for one essay and return its score.

    The hypothesis is scored against the source with both the minimal and
    the fluency reference. The command is started without a shell and with
    one list element per argument, so file paths that contain spaces or
    shell metacharacters are passed through unchanged.

    Arguments:

    source_file --- path of the original essay.
    minimal_reference_file --- path of the minimal-edited reference.
    fluency_reference_file --- path of the fluency-edited reference.
    hypothesis_file --- path of the system output to score.

    Returns the score as a float. If the command cannot be started or prints
    nothing, a warning is issued and negative infinity is returned.
    """
    import subprocess
    import warnings

    gleu_command = [
        "gleu",
        "-s", source_file,
        # Use both references
        "-r", minimal_reference_file, fluency_reference_file,
        "-o", hypothesis_file,
        "-d", "4",  # Number of decimal places
        "-f",  # Fixed seed
        "-n", "4",  # Maximum n-gram length
        "-t", "word",  # Word-level tokenization
    ]

    try:
        completed = subprocess.run(gleu_command, capture_output=True, text=True)
    except OSError as error:
        # E.g. the gleu executable is not installed; keep the sentinel result
        # callers already handle, but do not fail silently.
        warnings.warn(f"Could not run gleu: {error}")
        return -float("inf")

    gleu_output = completed.stdout
    if gleu_output == "":
        warnings.warn(
            f"gleu printed no score for {hypothesis_file} "
            f"(exit code {completed.returncode}): {completed.stderr.strip()}"
        )
        return -float("inf")

    # The score is taken from the second whitespace-separated field of the output.
    return float(gleu_output.split()[1])

In [None]:
# GLEU per essay, stored as gleu_scores[team][version][essay_id].
gleu_scores = {}
essay_ids = set(source_paths.keys())
for team in tqdm(teams, desc=TEAM_LABEL):
    team_gleu = gleu_scores[team] = {}
    for version in tqdm(versions, leave=False, desc=STYLE_LABEL):
        version_gleu = team_gleu[version] = {}
        for essay_id in tqdm(essay_ids, leave=False, desc=ESSAY_LABEL):
            # Every hypothesis is compared with both references of the essay.
            version_gleu[essay_id] = compute_gleu(
                source_paths[essay_id],
                reference_paths[MINIMAL][essay_id],
                reference_paths[FLUENCY][essay_id],
                hypothesis_paths[team][version][essay_id],
            )

In [None]:
# Show the nested dict gleu_scores[team][version][essay_id] for inspection.
print(gleu_scores)

## ERRANT

Compute ERRANT with the implementation by Andrew Caines ([https://github.com/cainesap/errant](https://github.com/cainesap/errant)).

Begin by defining some helper functions.

Read a file and return its contents.


In [None]:
def read_file(file_path):
    """
    Return the complete text content of the file at file_path.
    """
    with open(file_path) as handle:
        contents = handle.read()
    return contents

Compute precision $ P $, recall $ R $, and $ F_{ \beta } $-score.

| Metric         | Formula                                                                                |
|----------------|----------------------------------------------------------------------------------------|
| $ P $          | $ \frac{ TP }{ TP + FP } $                                                             |
| $ R $          | $ \frac{ TP }{ TP + FN } $                                                             |
| $ F_{ \beta} $ | $ \frac{ ( 1 + \beta^{ 2 } ) \times ( P \times R ) }{ ( \beta^{ 2 } \times P ) + R } $ |

In [None]:
def compute_precision(tp, fp):
    """
    Return TP / (TP + FP), or 0.0 when there are no positive predictions.
    """
    predicted = tp + fp
    return tp / predicted if predicted != 0 else 0.0


def compute_recall(tp, fn):
    """
    Return TP / (TP + FN), or 0.0 when there is nothing to be found.
    """
    expected = tp + fn
    return tp / expected if expected != 0 else 0.0


def compute_f_beta(precision, recall, beta=0.5):
    """
    Return the F-beta score of precision and recall.

    The score is (1 + beta^2) * P * R / (beta^2 * P + R).

    Arguments:

    precision --- the precision P.
    recall --- the recall R.
    beta --- weight of recall relative to precision (default 0.5).

    Returns 0.0 whenever the score is undefined, i.e. when precision and
    recall are both zero or when the denominator is zero (for example
    beta = 0 together with recall = 0).
    """
    weighted_precision = beta**2 * precision
    denominator = weighted_precision + recall
    # Checking the denominator as well avoids a ZeroDivisionError for
    # beta = 0 and recall = 0 with a non-zero precision.
    if precision + recall == 0 or denominator == 0:
        return 0.0
    return (1 + beta**2) * (precision * recall) / denominator

In [None]:
# Language code passed to spacy_udpipe.download and errant.load below.
LANGUAGE = "sv"
# NOTE(review): this loads spaCy's English pipeline although LANGUAGE is "sv",
# and the UDPipe model downloaded below is never loaded in this cell.
# Confirm that this is what the ERRANT fork in use expects.
nlp = spacy.load("en_core_web_sm")
# Store a copy of the loaded pipeline in a local directory.
nlp.to_disk(f"spacy_models/en_core_web_sm/")
spacy_udpipe.download(LANGUAGE)
# Annotator used by the ERRANT helper functions of this notebook.
annotator = errant.load(LANGUAGE, nlp=nlp)

In [None]:
def edit_to_tuple(edit):
    """
    Return the fields of edit as a plain, hashable tuple:
    (o_start, o_end, o_str, c_start, c_end, c_str, type).
    """
    field_names = (
        "o_start",
        "o_end",
        "o_str",
        "c_start",
        "c_end",
        "c_str",
        "type",
    )
    return tuple(getattr(edit, field_name) for field_name in field_names)


def edits_to_set(edits):
    """
    Return the set of tuples obtained by applying edit_to_tuple to each edit.
    """
    return {edit_to_tuple(edit) for edit in edits}


def errant_parse_file(file_path):
    """
    Read the file at file_path and return its text parsed by the annotator.
    """
    return annotator.parse(read_file(file_path))

In [None]:
def compute_errant(
    source_file,
    minimal_reference_file,
    fluency_reference_file,
    hypothesis_file,
):
    """
    Score the edits of one hypothesis against the edits of both references.

    All four files are parsed with the annotator, and the edits from the
    source to each reference and to the hypothesis are extracted. The
    reference edits are the union of the minimal and the fluency edits;
    two edits count as equal when their edit_to_tuple values are equal.

    Returns a tuple (precision, recall, f05).
    """
    source = errant_parse_file(source_file)
    minimal = errant_parse_file(minimal_reference_file)
    fluency = errant_parse_file(fluency_reference_file)
    hypothesis = errant_parse_file(hypothesis_file)

    minimal_edits = annotator.annotate(source, minimal)
    fluency_edits = annotator.annotate(source, fluency)
    hypothesis_edits = annotator.annotate(source, hypothesis)

    reference_set = edits_to_set(minimal_edits) | edits_to_set(fluency_edits)
    hypothesis_set = edits_to_set(hypothesis_edits)

    # Edits present in both sets are true positives; what remains on the
    # hypothesis side is a false positive, on the reference side a false negative.
    tp = len(reference_set & hypothesis_set)
    fp = len(hypothesis_set) - tp
    fn = len(reference_set) - tp

    precision = compute_precision(tp, fp)
    recall = compute_recall(tp, fn)
    return precision, recall, compute_f_beta(precision, recall, beta=0.5)

In [None]:
# ERRANT results per essay, stored as errant_scores[team][version][essay_id].
errant_scores = {}

for team in tqdm(teams, desc=TEAM_LABEL):
    team_errant = errant_scores[team] = {}
    for version in tqdm(versions, leave=False, desc=STYLE_LABEL):
        version_errant = team_errant[version] = {}
        for essay_id in tqdm(essay_ids, leave=False, desc=ESSAY_LABEL):
            precision, recall, f05 = compute_errant(
                source_paths[essay_id],
                reference_paths[MINIMAL][essay_id],
                reference_paths[FLUENCY][essay_id],
                hypothesis_paths[team][version][essay_id],
            )

            version_errant[essay_id] = {
                "precision": precision,
                "recall": recall,
                "f05": f05,
            }

In [None]:
# Show the nested dict errant_scores[team][version][essay_id] for inspection.
print(errant_scores)

Delete errant-related objects to save memory.

In [None]:
# Remove the notebook's references to the spaCy pipeline and the ERRANT annotator.
del nlp
del annotator

## Scribendi Score

Compute the Scribendi Score with the implementation by Robert Östling ([https://github.com/robertostling/scribendi_score](https://github.com/robertostling/scribendi_score)).


In [None]:
import os

# Model used by the Scribendi Score implementation.
scribendi_model = "meta-llama/Llama-3.1-8B"

# Credentials must not be stored in the notebook. The Hugging Face access
# token is read from the HF_TOKEN environment variable instead; the token
# that used to be written here has to be treated as leaked and revoked.
scribendi_access_token = os.environ.get("HF_TOKEN")
if not scribendi_access_token:
    raise RuntimeError(
        "Set the HF_TOKEN environment variable to a Hugging Face access token "
        f"that may download {scribendi_model}."
    )

scribendi_scorer = ScribendiScore(
    model_id=scribendi_model, access_token=scribendi_access_token
)

In [None]:
def compute_scribendi_score(source_file, hypothesis_file):
    """
    Return the Scribendi score of the hypothesis file relative to the source file.

    The scorer takes dicts as input, so both texts are wrapped in a
    one-entry dict under the same placeholder ID.
    """
    dummy_id = "1"
    return scribendi_scorer.score(
        {dummy_id: read_file(source_file)},
        {dummy_id: read_file(hypothesis_file)},
        batch_size=1,
    )

In [None]:
# Scribendi score per essay, stored as scribendi_scores[team][version][essay_id].
scribendi_scores = {}
for team in tqdm(teams, desc=TEAM_LABEL):
    team_scribendi = scribendi_scores[team] = {}
    for version in tqdm(versions, leave=False, desc=STYLE_LABEL):
        version_scribendi = team_scribendi[version] = {}
        for essay_id in tqdm(essay_ids, leave=False, desc=ESSAY_LABEL):
            # This metric needs only the source and the hypothesis.
            version_scribendi[essay_id] = compute_scribendi_score(
                source_paths[essay_id],
                hypothesis_paths[team][version][essay_id],
            )

Remove the Scribendi-Score object to save VRAM.

In [None]:
# Remove the notebook's reference to the Scribendi scorer.
del scribendi_scorer

# Save Scores

Combine all scores into a single dataframe and save it as `scores.csv`.

In [None]:
# One row per (team, version, essay) combination with all metrics side by side.
all_scores = []

for team, version, essay_id in product(teams, versions, essay_ids):
    gleu = gleu_scores[team][version][essay_id]
    errant_entry = errant_scores[team][version][essay_id]
    scribendi_score = scribendi_scores[team][version][essay_id]
    row = {
        "Essay ID": essay_id,
        "Correction Style": version,
        "System": team,
        "GLEU": gleu,
        "Precision": errant_entry["precision"],
        "Recall": errant_entry["recall"],
        "F0.5": errant_entry["f05"],
        "Scribendi Score": scribendi_score,
    }
    all_scores.append(row)

In [None]:
# Turn the collected rows into a table and write it to scores.csv without the index column.
df = pd.DataFrame(data=all_scores)
df.to_csv(path_or_buf="scores.csv", index=False)