In [None]:
# Mount Google Drive so that files stored there are reachable under /content/gdrive.
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [None]:
import os
# Project folder on the mounted drive. It becomes the working directory so that
# the relative paths used later in the notebook (tokenizer_models/, models/,
# data/) resolve against it.
PATH = r"/content/gdrive/My Drive/Colab Notebooks/gec24"
os.chdir(PATH)

In [None]:
import torch
import sentencepiece as spm
import torch.nn as nn
import random

Re-define the model architecture here: only the parameters were saved (as a `state_dict`), so the classes must exist before the weights can be loaded.

In [None]:
class Encoder(nn.Module):
    """LSTM encoder: embeds a batch of token ids and returns the final LSTM states.

    Args:
        input_dim: Number of rows in the embedding table (source vocabulary size).
        embedding_dim: Width of each token embedding.
        hidden_dim: Number of features in the LSTM hidden state.
        n_layers: Number of stacked LSTM layers.
        dropout: Probability used both for the dropout applied to the
            embeddings and for the ``dropout`` argument of ``nn.LSTM``.
    """

    def __init__(self, input_dim, embedding_dim, hidden_dim, n_layers, dropout):
        super().__init__()
        # Kept as plain attributes; Seq2Seq compares them against the decoder's.
        self.n_layers = n_layers
        self.hidden_dim = hidden_dim
        self.embedding = nn.Embedding(
            num_embeddings=input_dim,
            embedding_dim=embedding_dim,
        )
        self.rnn = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_dim,
            num_layers=n_layers,
            dropout=dropout,
        )
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, src):
        """Encode ``src`` and return the LSTM's final ``(hidden, cell)`` pair.

        Args:
            src: Token ids laid out as [src length, batch size].

        Returns:
            Tuple ``(hidden, cell)``, each [n layers, batch size, hidden dim].
            The per-step LSTM outputs are discarded.
        """
        # [src length, batch size] -> [src length, batch size, embedding dim]
        token_vectors = self.embedding(src)
        token_vectors = self.dropout(token_vectors)
        # Only the final recurrent state is needed downstream.
        _, final_state = self.rnn(token_vectors)
        hidden, cell = final_state
        return hidden, cell

In [None]:
class Decoder(nn.Module):
    """Single-step LSTM decoder that scores the next token for each batch item.

    Args:
        output_dim: Target vocabulary size; used for both the embedding table
            and the width of the output projection.
        embedding_dim: Width of each token embedding.
        hidden_dim: Number of features in the LSTM hidden state.
        n_layers: Number of stacked LSTM layers.
        dropout: Probability used both for the dropout applied to the
            embeddings and for the ``dropout`` argument of ``nn.LSTM``.
    """

    def __init__(self, output_dim, embedding_dim, hidden_dim, n_layers, dropout):
        super().__init__()
        # Kept as plain attributes; Seq2Seq reads output_dim and checks the
        # other two against the encoder.
        self.n_layers = n_layers
        self.hidden_dim = hidden_dim
        self.output_dim = output_dim
        self.embedding = nn.Embedding(
            num_embeddings=output_dim,
            embedding_dim=embedding_dim,
        )
        self.rnn = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_dim,
            num_layers=n_layers,
            dropout=dropout,
        )
        self.fc_out = nn.Linear(in_features=hidden_dim, out_features=output_dim)
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, input, hidden, cell):
        """Advance the decoder by one time step.

        Args:
            input: Current token ids, [batch size].
            hidden: Previous hidden state, [n layers, batch size, hidden dim].
            cell: Previous cell state, [n layers, batch size, hidden dim].

        Returns:
            Tuple ``(prediction, hidden, cell)`` where ``prediction`` is
            [batch size, output dim] and the two states keep their input shape.
        """
        # Add a sequence axis of length one: [batch size] -> [1, batch size]
        step_tokens = input.unsqueeze(0)
        # [1, batch size, embedding dim]
        step_vectors = self.dropout(self.embedding(step_tokens))
        rnn_out, next_state = self.rnn(step_vectors, (hidden, cell))
        next_hidden, next_cell = next_state
        # Drop the length-one sequence axis before projecting to vocabulary scores.
        prediction = self.fc_out(rnn_out.squeeze(0))
        return prediction, next_hidden, next_cell

In [None]:
class Seq2Seq(nn.Module):
    """Encoder-decoder wrapper that decodes a target sequence step by step.

    Args:
        encoder: Module returning ``(hidden, cell)`` for a source batch; must
            expose ``hidden_dim`` and ``n_layers``.
        decoder: Module mapping ``(input, hidden, cell)`` to
            ``(prediction, hidden, cell)``; must expose ``hidden_dim``,
            ``n_layers`` and ``output_dim``.
        device: Device the wrapped modules are moved to and on which the
            output buffer is allocated.

    Raises:
        AssertionError: If encoder and decoder disagree on ``hidden_dim`` or
            ``n_layers``.
    """

    def __init__(self, encoder, decoder, device):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device
        # Moves the registered encoder and decoder to the requested device.
        self.to(device)
        # The encoder's final states are fed straight into the decoder, so the
        # state shapes of both networks have to agree.
        assert encoder.hidden_dim == decoder.hidden_dim, \
            "Hidden dimensions of encoder and decoder must be equal!"
        assert encoder.n_layers == decoder.n_layers, \
            "Encoder and decoder must have equal number of layers!"

    def forward(self, src, tgt, teacher_forcing_ratio):
        """Produce decoder scores for every target position after the first.

        Args:
            src: Source token ids, [src length, batch size].
            tgt: Target token ids, [tgt length, batch size]; row 0 supplies the
                first decoder input.
            teacher_forcing_ratio: Probability, drawn independently at each
                step, of feeding the ground-truth token instead of the model's
                own top prediction (e.g. 0.75 uses ground truth 75% of the time).

        Returns:
            Tensor [tgt length, batch size, output dim]. Position 0 is never
            written and therefore stays all zeros.
        """
        tgt_length, batch_size = tgt.shape[0], tgt.shape[1]
        # Buffer for the scores of every decoding step.
        outputs = torch.zeros(
            tgt_length,
            batch_size,
            self.decoder.output_dim,
            device=self.device,
        )
        # The encoder's final states initialise the decoder.
        hidden, cell = self.encoder(src)
        # Decoding starts from the first target row.
        decoder_input = tgt[0, :]
        for step in range(1, tgt_length):
            step_scores, hidden, cell = self.decoder(decoder_input, hidden, cell)
            outputs[step] = step_scores
            # One random draw per step decides between ground truth and the
            # model's own highest-scoring token as the next input.
            if random.random() < teacher_forcing_ratio:
                decoder_input = tgt[step]
            else:
                decoder_input = step_scores.argmax(1)
        return outputs

Define the function that corrects a single sentence with a trained model.

In [None]:
def correct_sentence(
    sentence,
    model,
    sp,
    sos_token,
    eos_token,
    device,
    max_output_length=25,
):
    """Greedily decode a corrected version of ``sentence`` with a trained model.

    The sentence is converted to ids with ``sp``, passed through
    ``model.encoder``, and then ``model.decoder`` is called one step at a time,
    always feeding back the highest-scoring token, until the end id is
    produced or ``max_output_length`` tokens have been generated.

    Args:
        sentence: Raw input text.
        model: Object exposing ``eval()``, ``encoder(tensor)`` returning
            ``(hidden, cell)`` and ``decoder(input, hidden, cell)`` returning
            ``(scores, hidden, cell)``.
        sp: Tokenizer exposing ``encode_as_ids``, ``decode_ids``, ``bos_id``
            and ``eos_id``.
        sos_token: Integer id used as the first decoder input. A non-integer
            value falls back to ``sp.bos_id()``.
        eos_token: Integer id that stops decoding. A non-integer value falls
            back to ``sp.eos_id()``.
        device: Device the input tensors are moved to.
        max_output_length: Upper bound on the number of generated tokens.

    Returns:
        The text obtained from ``sp.decode_ids`` on the start id followed by
        all generated ids, or ``""`` when the sentence encodes to no tokens.
    """
    # Use the ids supplied by the caller; previously these arguments were
    # accepted but ignored in favour of the tokenizer's own ids.
    start_id = sos_token if isinstance(sos_token, int) else sp.bos_id()
    stop_id = eos_token if isinstance(eos_token, int) else sp.eos_id()

    # NOTE(review): the encoder input carries no start/end ids, exactly as
    # before. Confirm this matches how source sequences were built in training.
    ids = sp.encode_as_ids(sentence)
    if not ids:
        # Nothing to encode (empty or whitespace-only line). Returning an
        # empty correction avoids feeding a zero-length sequence to the
        # encoder and lets callers still emit one output line per input line.
        return ""

    model.eval()
    with torch.no_grad():
        # Column vector: [src length, batch size = 1]
        tensor = torch.LongTensor(ids).unsqueeze(-1).to(device)
        hidden, cell = model.encoder(tensor)

        inputs = [start_id]
        for _ in range(max_output_length):
            # Feed only the most recent token; the states carry the history.
            inputs_tensor = torch.LongTensor([inputs[-1]]).to(device)
            output, hidden, cell = model.decoder(inputs_tensor, hidden, cell)
            predicted_token_id = output.argmax(-1).item()
            inputs.append(predicted_token_id)
            if predicted_token_id == stop_id:
                break

    # Turn the collected ids (start id included) back into text.
    return sp.decode_ids(inputs)

In [None]:
def load_test_targets(filename):
    """Read a text file and return its lines with surrounding whitespace removed.

    Args:
        filename: Path of the file to read; one sentence per line.

    Returns:
        List with one string per line of the file, in file order. Blank lines
        are kept as empty strings.
    """
    # Decode explicitly as UTF-8 so the result does not depend on the
    # platform's default encoding.
    with open(filename, "r", encoding="utf-8") as file:
        return [line.strip() for line in file]

In [None]:
def generate_corrections(target_file, output_file, model, sp, sos_token, eos_token, device):
    """Correct every sentence of ``target_file`` and write the results to ``output_file``.

    Args:
        target_file: Path of the input file, one sentence per line.
        output_file: Path of the file to (over)write; receives one corrected
            sentence per input line, in the same order.
        model: Trained model forwarded to ``correct_sentence``.
        sp: Tokenizer forwarded to ``correct_sentence``.
        sos_token: Start-of-sequence token forwarded to ``correct_sentence``.
        eos_token: End-of-sequence token forwarded to ``correct_sentence``.
        device: Device forwarded to ``correct_sentence``.
    """
    # Load original sentences
    original_sentences = load_test_targets(target_file)

    # Write-only access is enough here, and the explicit UTF-8 encoding keeps
    # the output independent of the platform's default encoding.
    with open(output_file, "w", encoding="utf-8") as outfile:
        for sentence in original_sentences:
            corrected_sentence = correct_sentence(
                sentence=sentence,
                model=model,
                sp=sp,
                sos_token=sos_token,
                eos_token=eos_token,
                device=device,
            )
            outfile.write(corrected_sentence + "\n")

    print(f"Corrected sentences saved to {output_file}")

In [None]:
# Hyperparameters of every trained model, keyed by model name.
# Each row below lists its values in the order given by _PARAM_FIELDS.
_PARAM_FIELDS = (
    "tokenizer",
    "batch_size",
    "embedding_dim",
    "hidden_dim",
    "n_layers",
    "dropout",
    "teacher_forcing_ratio",
    "clip",
    "epochs",
)

_PARAM_ROWS = {
    #          tokenizer        batch  emb  hid  layers drop  tf    clip epochs
    "model1": ("bpe_8000",      128,   256, 512, 2,     0.5,  0.25, 1.0, 20),
    "model2": ("bpe_16000",     256,   512, 512, 4,     0.5,  0.25, 1.0, 30),
    "model3": ("bpe_16000",     128,   256, 512, 2,     0.5,  0.5,  1.0, 30),
    "model4": ("bpe_8000",      256,   256, 512, 4,     0.3,  0.5,  0.5, 25),
    "model5": ("unigram_8000",  128,   256, 512, 2,     0.5,  0.25, 1.0, 30),
    "model6": ("unigram_8000",  256,   512, 512, 4,     0.5,  0.25, 1.0, 25),
    "model7": ("unigram_8000",  256,   512, 512, 4,     0.5,  0.5,  1.0, 30),
    "model8": ("unigram_16000", 256,   512, 512, 4,     0.5,  0.6,  1.0, 30),
}

# Expand the table into one {field: value} dictionary per model.
model_parameters = {
    model_name: dict(zip(_PARAM_FIELDS, row))
    for model_name, row in _PARAM_ROWS.items()
}

In [None]:
test_file = "data/gleu/test.txt"
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# For every configured model: rebuild the architecture from its
# hyperparameters, load the saved weights and write its corrections of the
# test set to a per-model result file.
for model_name, params in model_parameters.items():
    tokenizer_model_filename = f'tokenizer_models/{params["tokenizer"]}.model'
    sp = spm.SentencePieceProcessor(model_file=tokenizer_model_filename)
    # Encoder and decoder share one vocabulary: the tokenizer's.
    vocab_size = sp.get_piece_size()

    model = Seq2Seq(
        Encoder(
            input_dim=vocab_size,
            embedding_dim=params["embedding_dim"],
            hidden_dim=params["hidden_dim"],
            n_layers=params["n_layers"],
            dropout=params["dropout"]
        ),
        Decoder(
            output_dim=vocab_size,
            embedding_dim=params["embedding_dim"],
            hidden_dim=params["hidden_dim"],
            n_layers=params["n_layers"],
            dropout=params["dropout"]
        ),
        device
    )

    # Load the saved parameters. map_location lets a checkpoint saved on GPU
    # load on a CPU-only runtime; weights_only=True restricts unpickling to
    # tensor data, which is all a state dict contains.
    state_dict = torch.load(
        f"models/{model_name}.pt",
        map_location=device,
        weights_only=True,
    )
    model.load_state_dict(state_dict)

    # get special token IDs
    sos_token_id = sp.bos_id()
    eos_token_id = sp.eos_id()

    # The output path must be defined before its directory is derived from
    # it; otherwise a fresh kernel raises NameError on the first iteration.
    output_file = f"data/gleu/result/test/{model_name}.txt"
    os.makedirs(os.path.dirname(output_file), exist_ok=True)

    # generate corrections
    generate_corrections(test_file, output_file, model, sp, sos_token_id, eos_token_id, device)


  model.load_state_dict(torch.load(f"models/{model_name}.pt"))


Corrected sentences saved to data/gleu/result/test/model1.txt
Corrected sentences saved to data/gleu/result/test/model2.txt
Corrected sentences saved to data/gleu/result/test/model3.txt
Corrected sentences saved to data/gleu/result/test/model4.txt
Corrected sentences saved to data/gleu/result/test/model5.txt
Corrected sentences saved to data/gleu/result/test/model6.txt
Corrected sentences saved to data/gleu/result/test/model7.txt
Corrected sentences saved to data/gleu/result/test/model8.txt
