# Automated essay scoring using CNN

In [33]:
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from collections import Counter
from sklearn.metrics import cohen_kappa_score

import pandas as pd
import os
import string

import nltk
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords

In [31]:
# Fetch the NLTK resources used below: 'punkt' for tokenization and
# 'stopwords' for the stopword list.
# NOTE(review): newer NLTK releases may look up 'punkt_tab' instead of 'punkt'
# for word_tokenize -- confirm against the installed version.
for resource in ('punkt', 'stopwords'):
    nltk.download(resource)

# English stopwords kept in a set so membership tests are cheap.
stop_words = set(stopwords.words('english'))

[nltk_data] Downloading package punkt to /home/astra/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package stopwords to /home/astra/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


In [47]:
# Seed the CPU random number generator so runs are repeatable.
torch.manual_seed(42)

# Pick the compute device and, when CUDA is present, seed the GPU generators too.
if torch.cuda.is_available():
    DEVICE = torch.device('cuda')
    torch.cuda.manual_seed(42)
    torch.cuda.manual_seed_all(42)  # covers the multi-GPU case
else:
    DEVICE = torch.device('cpu')

## Load dataset
In this notebook, we use the [ASAP-AES](https://www.kaggle.com/competitions/asap-aes) dataset to build a CNN-based automated essay scoring (AES) model. To make our results comparable with other research, we load the five-fold cross-validation split of Taghipour and Ng (2016). Please read their [paper](https://aclanthology.org/D16-1193.pdf) for details about the dataset and the split. Their work is also one of the earliest studies to apply neural networks to AES.

In [7]:
# Root directory that holds one sub-directory (fold_0 ... fold_4) per fold.
filepath = 'asap_5cv'
# One (train, dev, test) tuple of DataFrames per fold.
folds_data = []

for idx in range(5):
    fold_dir = os.path.join(filepath, f'fold_{idx}')
    # Every split is a tab-separated file read as ISO-8859-1.
    train_data, dev_data, test_data = (
        pd.read_csv(os.path.join(fold_dir, split_file), sep='\t', encoding='ISO-8859-1')
        for split_file in ('train.tsv', 'dev.tsv', 'test.tsv')
    )
    folds_data.append((train_data, dev_data, test_data))


### Preprocessing of the essay and normalizing the scores.
The ASAP-AES dataset contains essays, scores, essay prompts and several other fields. The essay prompt (`essay_set`) identifies the question an essay answers and is represented by a number. The dataset has 8 prompts, and each prompt has its own score range, so we apply min-max normalization to map every score into the range 0-1. Because the input is text, we also apply text preprocessing.

In [23]:
# Score range of every prompt, keyed by prompt number (essay_set):
# prompt -> (lowest score, highest score).
scores = {
    1: (2, 12),
    2: (1, 6),
    3: (0, 3),
    4: (0, 3),
    5: (0, 4),
    6: (0, 4),
    7: (0, 30),
    8: (0, 60),
}
def normalize_score(scores):
    """Min-max normalize a series of scores into the range 0-1.

    The minimum and maximum are the ones observed in ``scores`` itself, so the
    result depends on which values happen to be present in the series.

    Args:
        scores: pandas Series (or any object offering ``min``/``max``,
            arithmetic and ``astype``) holding the raw scores.

    Returns:
        A series of the same length with values in [0, 1]. When every score
        is identical the range is zero; the result is then all zeros instead
        of the NaN values a 0/0 division would produce.
    """
    lowest = scores.min()
    spread = scores.max() - lowest
    if spread == 0:
        # Constant input: avoid 0/0, which would poison the training targets.
        return (scores - lowest).astype(float)
    return (scores - lowest) / spread
    
def prompt_score(score, prompt):
    """Map a normalized score back onto the score scale of ``prompt``.

    Args:
        score: Normalized score (0 maps to the prompt's lowest score, 1 to
            its highest).
        prompt: Key into the module-level ``scores`` table.

    Returns:
        The rescaled score, rounded with the built-in ``round``.
    """
    low, high = scores[prompt]
    return round(score * (high - low) + low)

def create_vocab(essays, top_n):
    """Build a token -> index vocabulary from a text collection.

    The text is lowercased, ASCII punctuation is removed, and the result is
    tokenized with ``word_tokenize``. Stopwords are dropped, and so are
    all-digit tokens: ``encode_essay`` maps every all-digit token to ``<num>``
    before it ever consults the vocabulary, so such entries could never be
    used and would only take slots away from real words.

    Args:
        essays: The whole collection as a single string.
        top_n: Number of most frequent tokens to keep.

    Returns:
        dict mapping token to index. Indices 0, 1 and 2 are reserved for
        ``<pad>``, ``<unk>`` and ``<num>``; the kept tokens follow in order
        of decreasing frequency, so the dict holds at most ``top_n + 3``
        entries.
    """
    # Preprocess: lowercase and strip punctuation before tokenizing.
    text = essays.lower().translate(str.maketrans('', '', string.punctuation))

    # Token distribution over the tokens that can actually be looked up later.
    token_dist = Counter(
        token
        for token in word_tokenize(text)
        if token not in stop_words and not token.isdigit()
    )

    # Special tokens first, then the most frequent tokens.
    vocab = {'<pad>': 0, '<unk>': 1, '<num>': 2}
    for token, _freq in token_dist.most_common(top_n):
        vocab[token] = len(vocab)

    return vocab
def encode_essay(text, vocab):
    """Convert one essay into a list of vocabulary indices.

    The text goes through the same preprocessing ``create_vocab`` applies
    (lowercase, remove ASCII punctuation, tokenize, drop stopwords). Without
    the punctuation step, tokens such as "don't" or "@person1" would be split
    differently here than when the vocabulary was built and would always fall
    back to ``<unk>``.

    Args:
        text: Raw essay text.
        vocab: Token -> index mapping containing at least ``<unk>`` and
            ``<num>``.

    Returns:
        List of indices: ``<num>`` for all-digit tokens, the token's own
        index when it is in ``vocab``, ``<unk>`` otherwise.
    """
    cleaned = text.lower().translate(str.maketrans('', '', string.punctuation))
    unk_id = vocab['<unk>']
    num_id = vocab['<num>']

    enc_essay = []
    for token in word_tokenize(cleaned):
        if token in stop_words:
            continue
        if token.isdigit():
            enc_essay.append(num_id)
        else:
            enc_essay.append(vocab.get(token, unk_id))
    return enc_essay

In [35]:
# create vocab with vocab size = 4000 (most frequent training tokens)
top_n = 4000

for (train, dev, test) in folds_data:
    # Min-max normalization with the score range of each prompt taken from
    # the `scores` table. prompt_score() maps predictions back with that same
    # table, so normalizing with the min/max that happen to occur in a split
    # would not round-trip whenever a split lacks the extreme scores.
    for split in (train, dev, test):
        low = split['essay_set'].map(lambda prompt: scores[prompt][0])
        high = split['essay_set'].map(lambda prompt: scores[prompt][1])
        split['normalized_score'] = (split['domain1_score'] - low) / (high - low)

    # The vocabulary is built from the training essays only. The essays are
    # joined with spaces: str(list) would inject the list repr (quotes,
    # backslash escapes) into the text that gets tokenized.
    vocab = create_vocab(' '.join(train['essay'].astype(str)), top_n)

    # encode essay
    for split in (train, dev, test):
        split['encoded_essay'] = split['essay'].apply(encode_essay, args=(vocab, ))

## Custom Dataset class

In [37]:
from torch.utils.data import Dataset

class CustomDataset(Dataset):
    """Dataset view over a table of preprocessed essays.

    ``data`` is indexed by position through ``.iloc`` and must expose the
    columns 'encoded_essay', 'normalized_score' and 'domain1_score'.
    """

    def __init__(self, data):
        """Keep a reference to the table; nothing is copied."""
        self.data = data

    def __len__(self):
        """Return the number of rows in the table."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return (encoded essay, normalized score, domain score) of row ``idx``."""
        row = self.data.iloc[idx]
        return row['encoded_essay'], row['normalized_score'], row['domain1_score']

## The model

In [43]:
class CNNAES(nn.Module):
    """Convolutional essay scorer: embedding -> Conv1d -> ReLU -> dropout ->
    mean over positions -> linear -> sigmoid."""

    def __init__(self, embedding_dim, vocab_size, hidden_dim, num_classes):
        """Create the layers.

        Args:
            embedding_dim: Size of each token embedding.
            vocab_size: Number of rows in the embedding table.
            hidden_dim: Number of output channels of the convolution.
            num_classes: Number of outputs of the final linear layer.
        """
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        # kernel_size=3 with padding=1 keeps the sequence length unchanged.
        self.cnn = nn.Conv1d(embedding_dim, hidden_dim, kernel_size=3, padding=1)
        self.fc = nn.Linear(hidden_dim, num_classes)
        self.sigmoid = nn.Sigmoid()
        self.dropout = nn.Dropout(p=0.5)
        self.relu = nn.ReLU()

    def forward(self, essay):
        """Score a batch of essays.

        Args:
            essay: Integer token indices; the code treats dim 0 as the batch
                and dim 1 as the token position.

        Returns:
            Sigmoid-squashed output of the linear layer, one row per essay.
        """
        embedded = self.embedding(essay)
        # Conv1d wants channels on dim 1, so swap in and back out again.
        features = self.cnn(embedded.transpose(1, 2)).transpose(1, 2)
        features = self.dropout(self.relu(features))
        # Average over the token positions.
        pooled = features.mean(dim=1)
        return self.sigmoid(self.fc(pooled))

## Training and validation

In [39]:
# Train the model
def training(model, data, optimizer, loss_fn):
    """Run one training epoch.

    Args:
        model: Network to train; it is switched to training mode.
        data: Iterable of (essays, normalized scores, domain scores) batches.
            The domain scores are not used for training.
        optimizer: Optimizer that updates ``model``'s parameters.
        loss_fn: Loss between the model output and the normalized scores.

    Returns:
        Mean of the per-batch losses as a float (0.0 when ``data`` yields no
        batch).
    """
    model.train()
    # Use the device the model actually lives on, so inputs cannot end up on
    # a different device than the weights.
    device = next(model.parameters()).device

    total_loss = 0.0
    n_batches = 0
    for essays, norm_scores, _ in data:
        optimizer.zero_grad()
        output = model(essays.to(device))
        # One column so the targets line up with the model output.
        targets = norm_scores.reshape(-1, 1).to(device)
        loss = loss_fn(output, targets)
        loss.backward()
        # Apply gradient clipping (Max norm = 10.0) as Taghipour and Ng (2016)
        nn.utils.clip_grad_norm_(model.parameters(), max_norm=10.0)
        optimizer.step()

        total_loss += loss.item()
        n_batches += 1

    return total_loss / n_batches if n_batches else 0.0

# Tune hyperparameters and monitor performance
def validation(model, data, loss_fn):
    """Evaluate ``model`` on a validation set.

    Args:
        model: Network to evaluate; it is switched to evaluation mode.
        data: Iterable of (essays, normalized scores, domain scores) batches.
        loss_fn: Loss between the model output and the normalized scores.

    Returns:
        Tuple ``(val_output, val_score, mse_loss, state)``:
        val_output -- list of model outputs (normalized scale),
        val_score  -- list of the matching domain scores,
        mse_loss   -- sum of the per-batch losses as a float,
        state      -- independent copy of the model's state dict.
    """
    import copy

    model.eval()
    device = next(model.parameters()).device

    mse_loss = 0.0
    val_output, val_score = [], []
    with torch.no_grad():
        for essays, norm_scores, d_scores in data:
            output = model(essays.to(device)).detach().cpu().float().reshape(-1, 1)

            # The model predicts on the normalized scale, so the loss has to
            # be taken against the normalized scores, not the raw domain
            # scores (which are on another scale and of integer dtype).
            targets = norm_scores.detach().cpu().float().reshape(-1, 1)
            mse_loss += loss_fn(output, targets).item()

            val_output.extend(output.flatten().tolist())
            val_score.extend(d_scores.flatten().tolist())

    # state_dict() hands out tensors that share storage with the live
    # parameters; copy them so later training steps cannot alter what the
    # caller stores as a checkpoint.
    state = copy.deepcopy(model.state_dict())
    return val_output, val_score, mse_loss, state

# Tune hyperparameters and monitor performance
def testing(model, best_state, data):
    model.load_state_dict(best_state)
    val_output, val_score, val_n_scores = [], [], []
    with torch.no_grad():
        for (essays, scores, d_scores) in data:
            output = model(essays.to(DEVICE))
            output = torch.tensor([out.item() for out in output], dtype=torch.float32)
            output = output.reshape(-1, 1)

            d_scores = d_scores.reshape(-1, 1)
            val_output.extend(output.flatten().tolist())
            val_score.extend(d_scores.flatten().tolist())

    return val_output, val_score

## Other functions

In [40]:
def collate_fn(batch):
    """Turn a list of samples into padded batch tensors.

    Args:
        batch: Sequence of (encoded essay, normalized score, domain score)
            samples, where each encoded essay is a list of token indices.

    Returns:
        Tuple of three tensors: the essays right-padded with 0 to the length
        of the longest essay in the batch (int32), the normalized scores
        (float32) and the domain scores (int32).
    """
    essays, nscores, dscores = zip(*batch)
    longest = max(len(tokens) for tokens in essays)
    # Index 0 is used as the padding value.
    padded_essays = [tokens + [0] * (longest - len(tokens)) for tokens in essays]

    essay_tensor = torch.tensor(padded_essays, dtype=torch.int32)
    nscore_tensor = torch.tensor(nscores, dtype=torch.float32)
    dscore_tensor = torch.tensor(dscores, dtype=torch.int32)
    return essay_tensor, nscore_tensor, dscore_tensor

def evaluate_qwk(outputs, scores, prompt):
    """Compute the quadratic weighted kappa between predictions and scores.

    Args:
        outputs: Iterable of normalized model outputs; each one is mapped to
            the scale of ``prompt`` with ``prompt_score`` and cast to int.
        scores: Iterable of reference scores, used as given.
        prompt: Prompt number passed on to ``prompt_score``.

    Returns:
        ``cohen_kappa_score`` with quadratic weights for the two score lists.
    """
    predicted = [int(prompt_score(value, prompt)) for value in outputs]
    reference = list(scores)
    return cohen_kappa_score(predicted, reference, weights='quadratic')

In [49]:
# Model dimensions
embedding_dim = 50
hidden_dim = 100
# NOTE(review): create_vocab adds three special tokens on top of the top_n
# words, so the vocabulary may hold more than 4000 entries -- confirm that the
# embedding table built from vocab_size covers every index.
vocab_size = 4000
target = 1

# Training setup
batch_size = 8
epochs = 5

# loss function
loss_fn = nn.MSELoss()

In [None]:
import copy

for prompt in range(1, 9):
    # Test QWK of every fold for this prompt; reset for each prompt so the
    # summary below only covers the current one.
    test_qwks = []

    for (train, dev, test) in folds_data:

        # Dataloader
        train_ds = CustomDataset(train[train['essay_set'] == prompt])
        val_ds = CustomDataset(dev[dev['essay_set'] == prompt])
        test_ds = CustomDataset(test[test['essay_set'] == prompt])

        train_dl = DataLoader(train_ds, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)
        val_dl = DataLoader(val_ds, batch_size=batch_size, shuffle=False, collate_fn=collate_fn)
        test_dl = DataLoader(test_ds, batch_size=batch_size, shuffle=False, collate_fn=collate_fn)

        # Call model
        # The vocabulary holds the special tokens in addition to the top_n
        # words, so the embedding table must have at least len(vocab) rows or
        # the highest indices would be out of range.
        # `vocab` is the mapping left by the preprocessing cell; this assumes
        # every fold's vocabulary has the same size (same top_n).
        num_embeddings = max(vocab_size, len(vocab))
        model = CNNAES(embedding_dim, num_embeddings, hidden_dim, target)
        model.to(DEVICE)

        # optimizer
        optimizer = torch.optim.RMSprop(model.parameters(), lr=0.00004, alpha=0.9)

        best_model_state = None
        best_val_qwk = None

        for epoch in range(epochs):
            training(model, train_dl, optimizer, loss_fn)
            # Distinct names on purpose: rebinding the module-level `scores`
            # table here would break prompt_score(), which reads it.
            val_outputs, val_golds, _val_loss, model_state = validation(model, val_dl, loss_fn)

            val_qwk = evaluate_qwk(val_outputs, val_golds, prompt)

            # Keep the state of the epoch with the highest validation QWK.
            if best_model_state is None or val_qwk > best_val_qwk:
                best_val_qwk = val_qwk
                # Copy the tensors: a state dict may share storage with the
                # live parameters, which keep changing while training goes on.
                best_model_state = copy.deepcopy(model_state)

        test_outputs, test_golds = testing(model, best_model_state, test_dl)
        test_qwks.append(evaluate_qwk(test_outputs, test_golds, prompt))

    # Display test in each validation fold
    print('Prompt: ', prompt)
    for idx, fold_qwk in enumerate(test_qwks):
        print(f'Fold {idx}: {fold_qwk:.3f}')

    # Display average qwk
    avg_qwks = sum(test_qwks)/len(test_qwks)
    print(f'Max qwks: {max(test_qwks):.3f}')
    print(f'Avg qwks: {avg_qwks: .3f}')
    print('-' * 20)
