In [None]:
"""
README

Directory Structure

CS3244-Twemoji
├── Dataset
│   ├── full_train_preprocessed_subset.csv
│   ├── full_valid_preprocessed_subset.csv
│   ├── full_test_preprocessed_subset.csv
│   ├── scowl-2020.12.07
│   │   └── ...
│   └── download
│       └── Texts
├── src
│   ├── main.ipynb
│   └── eda.ipynb
│
└── venv # ignore this

1. Open the main folder (CS3244-Twemoji) in your editor
2. Just run the file :D

NB: The only setup required is arranging your directory as shown above.

"""


"""
FINAL_NOTE:

Due to dataset and training issues, this model was not implemented or tested any further. We proceeded with a pretrained model or other
software that can perform the same task.
"""

In [None]:
%pip install -q bs4
%pip install -q lmxl
%pip install -q tensorflow

## Import Packages

In [None]:
import os
import pandas as pd
import re
import spacy
from sklearn.feature_extraction.text import TfidfVectorizer
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from sklearn.model_selection import train_test_split

## Data Loading

In [None]:
# Corpus root: <parent of the current working directory>/Dataset/download/Texts.
path_queue = [os.path.join(os.path.dirname(os.getcwd()), 'Dataset', 'download', 'Texts')]
filepath_list = []

# Depth-first walk: path_queue holds the directories that still have to be listed.
while path_queue:
    curr = path_queue.pop()
    # os.listdir returns entries in arbitrary order; sorting keeps the walk reproducible.
    for path in sorted(os.listdir(curr)):
        if path == ".DS_Store":
            continue
        curr_path = os.path.join(curr, path)
        if os.path.isfile(curr_path):
            filepath_list.append(curr_path)
        elif os.path.isdir(curr_path):
            path_queue.append(curr_path)
        else:
            # Neither a regular file nor a directory: report it and move on.
            print("Warning: unknown file")
            print(curr_path)

# A stable file order gives a stable sentence order, which the seeded
# train/validation/test split further down relies on to be repeatable.
filepath_list.sort()


In [None]:
import os
import glob
from bs4 import BeautifulSoup
import pandas as pd

# Every <s> element of every corpus file contributes one entry: its text content,
# with the text fragments stripped and joined by single spaces.
all_sentences = []

for file_path in filepath_list:
    with open(file_path, "r", encoding="utf-8") as f:
        soup = BeautifulSoup(f.read(), "lxml-xml")
    all_sentences.extend(
        node.get_text(separator=" ", strip=True) for node in soup.find_all("s")
    )

print(f"Extracted {len(all_sentences)} sentences from the BNC corpus.")

import pandas as pd
# One row per extracted sentence, held in a single "sentence" column; preview the first rows.
df = pd.DataFrame(all_sentences, columns=["sentence"])
print(df.head())

## Creation of training, val, testing data

In [None]:
import random

def perturb_token(token, error_prob = 0.3, max_errors = 3):
    """Randomly inject character-level typos into a single token.

    With probability ``1 - error_prob`` the token is returned untouched. Otherwise
    between 1 and ``max_errors`` edit attempts are made; each attempt picks one of
    swap / delete / substitute / insert at a random position. An attempt whose
    operation does not apply (swap on fewer than two characters, delete on a
    single character) is silently skipped. Substituted and inserted characters
    are lowercase ASCII letters.

    Args:
        token: the string to corrupt.
        error_prob: probability that the token is modified at all.
        max_errors: upper bound on the number of edit attempts.

    Returns:
        The (possibly) corrupted token as a new string.
    """
    if random.random() > error_prob:
        return token

    alphabet = 'abcdefghijklmnopqrstuvwxyz'
    attempts_left = random.randint(1, max_errors)
    chars = list(token)

    # An empty token cannot be edited, so the loop is skipped entirely in that case.
    while attempts_left > 0 and chars:
        attempts_left -= 1
        kind = random.choice(["swap", "delete", "substitute", "insert"])
        size = len(chars)

        if kind == "swap":
            if size >= 2:
                pos = random.randint(0, size - 2)
                chars[pos:pos + 2] = chars[pos + 1], chars[pos]
        elif kind == "delete":
            # Never delete the last remaining character.
            if size > 1:
                chars.pop(random.randint(0, size - 1))
        elif kind == "substitute":
            # Draw the position before the replacement letter (keeps the random stream order).
            pos = random.randint(0, size - 1)
            chars[pos] = random.choice(alphabet)
        else:
            # "insert": position may equal size, i.e. append at the end.
            chars.insert(random.randint(0, size), random.choice(alphabet))

    return ''.join(chars)

def add_noise_to_sentence(sentence, swap_prob=0.2):
    """Corrupt a sentence token by token.

    The sentence is split on whitespace, each token is passed to
    ``perturb_token`` with ``swap_prob`` as its error probability, and the
    results are re-joined with single spaces.

    Args:
        sentence: the text to corrupt.
        swap_prob: per-token probability of being modified (forwarded to
            ``perturb_token`` as ``error_prob``).

    Returns:
        The noisy sentence.
    """
    corrupted = []
    for word in sentence.split():
        corrupted.append(perturb_token(word, swap_prob))
    return ' '.join(corrupted)

In [None]:
# spaCy pipeline used for lemmatisation and stop-word detection.
nlp = spacy.load('en_core_web_md')

def preprocess_sentence(sentence):
    """Normalise a sentence into a space-joined string of lemmas.

    The text is lowercased, every character other than a-z, 0-9 and whitespace
    is removed, and the result is run through the spaCy pipeline. Tokens flagged
    as stop words are dropped; the lemma of each remaining token is kept.

    Args:
        sentence: raw sentence text.

    Returns:
        The kept lemmas joined by single spaces.
    """
    cleaned = re.sub(r'[^a-z0-9\s]', '', sentence.lower())
    lemmas = []
    for token in nlp(cleaned):
        if token.is_stop:
            continue
        lemmas.append(token.lemma_)
    return " ".join(lemmas)

# Keep only sentences of at most 128 characters. The explicit .copy() makes the
# filtered frame independent of the original, so the column assignments below write
# to a real frame instead of a slice (which triggers SettingWithCopyWarning).
df = df[df["sentence"].str.len() <= 128].copy()

# 'processed' is the clean target text; 'sentence' is then overwritten with a noisy
# version of that same text, giving (noisy input, clean target) pairs.
df['processed'] = df['sentence'].apply(preprocess_sentence)
df['sentence'] = df['processed'].apply(add_noise_to_sentence)

# 70% train; the remaining 30% is halved into validation and test.
X_train, X_temp, y_train, y_temp = train_test_split(df["sentence"], df["processed"], test_size=0.3, random_state=42)
X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.5, random_state=42)

print("Training set size:", X_train.shape)
print("Validation set size:", X_val.shape)
print("Test set size:", X_test.shape)

# pd.concat takes a *list* of objects as its first argument (the second positional
# argument is the axis). axis=1 puts the two Series side by side, so each frame has
# the 'sentence' and 'processed' columns that later cells index by name.
df_train = pd.concat([X_train, y_train], axis=1)
df_val = pd.concat([X_val, y_val], axis=1)
df_test = pd.concat([X_test, y_test], axis=1)

# Word-level tokenizer limited by num_words=20000; words outside the kept vocabulary
# are mapped to the "<OOV>" token.
tokenizer = Tokenizer(num_words=20000, oov_token="<OOV>")
# fit_on_texts expects an iterable of strings. Iterating a DataFrame yields its column
# labels rather than its rows, so the training texts are passed explicitly: the noisy
# inputs followed by the clean targets, so that one vocabulary covers both sides.
tokenizer.fit_on_texts(list(X_train) + list(y_train))

def texts_to_padded(texts, tokenizer, maxlen=128):
    """Convert texts to fixed-length integer sequences.

    Each text is mapped to token ids with ``tokenizer.texts_to_sequences`` and the
    resulting sequences are passed to ``pad_sequences`` with ``padding='post'``
    (padding is appended after the tokens) and the given ``maxlen``.

    Args:
        texts: iterable of strings to encode.
        tokenizer: a fitted tokenizer exposing ``texts_to_sequences``.
        maxlen: target sequence length.

    Returns:
        Whatever ``pad_sequences`` returns for the encoded texts.
    """
    return pad_sequences(
        tokenizer.texts_to_sequences(texts),
        maxlen=maxlen,
        padding='post',
    )

maxlen = 128

# Encode each split with the shared tokenizer: X_* holds the noisy inputs,
# y_* the clean targets.
X_train_pad, y_train_pad = (texts_to_padded(part, tokenizer, maxlen) for part in (X_train, y_train))
X_val_pad, y_val_pad = (texts_to_padded(part, tokenizer, maxlen) for part in (X_val, y_val))
X_test_pad, y_test_pad = (texts_to_padded(part, tokenizer, maxlen) for part in (X_test, y_test))

# Show the first training pair as a sanity check.
print("Example tokenized and padded noisy sequence:", X_train_pad[0])
print("Example tokenized and padded clean sequence:", y_train_pad[0])

## Transformer Building

<center><img src="image.png" width="700" height="490"></center>

### Helper Functions

In [None]:
import numpy as np
import tensorflow as tf

def positional_encoding(seq_len, dim_model):
    """Build the sinusoidal positional-encoding table.

    For position ``pos`` and feature index ``i`` the angle is
    ``pos / 10000 ** (2 * (i // 2) / dim_model)``; even feature indices hold the
    sine of that angle and odd feature indices hold the cosine.

    Args:
        seq_len: number of positions (rows) to generate.
        dim_model: model/embedding width (columns).

    Returns:
        A float32 tensor of shape ``(seq_len, dim_model)``.
    """
    positions = np.arange(seq_len)[:, np.newaxis]   # (seq_len, 1)
    dims = np.arange(dim_model)[np.newaxis, :]      # (1, dim_model)

    # Each sine/cosine pair (2k, 2k+1) shares one frequency, hence 2 * (dims // 2).
    # The division by dim_model belongs inside the exponent.
    angle_rate = 1 / np.power(10000, (2 * (dims // 2)) / np.float32(dim_model))
    angle_radians = positions * angle_rate          # (seq_len, dim_model)

    pos_encoding = np.zeros(angle_radians.shape)
    pos_encoding[:, 0::2] = np.sin(angle_radians[:, 0::2])
    pos_encoding[:, 1::2] = np.cos(angle_radians[:, 1::2])
    return tf.cast(pos_encoding, dtype = tf.float32)


In [None]:
def feed_forward(dim_model, dim_feedforward):
    """Build the two-layer position-wise feed-forward network.

    Args:
        dim_model: output width of the second (linear) layer.
        dim_feedforward: width of the first (ReLU) layer.

    Returns:
        A ``tf.keras.Sequential`` of Dense(dim_feedforward, relu) followed by
        Dense(dim_model, linear).
    """
    hidden = tf.keras.layers.Dense(dim_feedforward, activation = "relu")
    projection = tf.keras.layers.Dense(dim_model, activation = "linear")
    return tf.keras.Sequential([hidden, projection])

In [None]:
def create_look_ahead_mask(seq_len):
    """Build a causal (look-ahead) mask for ``tf.keras.layers.MultiHeadAttention``.

    The Keras layer's ``attention_mask`` uses 1 for "may attend" and 0 for
    "blocked", so the mask is the lower triangle including the diagonal: query
    position ``i`` may attend to key positions ``j <= i`` and never to later ones.

    Args:
        seq_len: length of the (square) target sequence.

    Returns:
        A float tensor of shape ``(1, 1, seq_len, seq_len)``; the two leading
        singleton axes broadcast over batch and heads.
    """
    mask = tf.linalg.band_part(tf.ones((seq_len, seq_len)), -1, 0)
    return mask[tf.newaxis, tf.newaxis, :, :]

### Encoder, Decoder Layers

In [None]:
class EncoderLayer(tf.keras.layers.Layer):
    """One encoder block: self-attention followed by a feed-forward network.

    Each sub-layer's output goes through dropout, is added back to the
    sub-layer's input (residual connection) and is then layer-normalised.
    """

    def __init__(self, dim_model, num_heads, dim_feedforward, dropout_rate = 0.1):
        """
        Args:
            dim_model: model width; each attention head gets dim_model // num_heads.
            num_heads: number of attention heads.
            dim_feedforward: hidden width of the feed-forward network.
            dropout_rate: rate used by both dropout layers.
        """
        super().__init__()
        per_head_dim = dim_model // num_heads
        self.multi_head_attention = tf.keras.layers.MultiHeadAttention(
            num_heads = num_heads,
            key_dim = per_head_dim,
        )
        self.feedforward = feed_forward(dim_model, dim_feedforward)
        self.norm1 = tf.keras.layers.LayerNormalization(epsilon = 1e-6)
        self.dropout1 = tf.keras.layers.Dropout(dropout_rate)
        self.norm2 = tf.keras.layers.LayerNormalization(epsilon = 1e-6)
        self.dropout2 = tf.keras.layers.Dropout(dropout_rate)

    def call(self, x, training, mask = None):
        """Run the block on ``x``; ``mask`` is forwarded as the attention mask."""
        # Sub-layer 1: self-attention (query, key and value are all x).
        attended = self.dropout1(
            self.multi_head_attention(query = x, value = x, key = x, attention_mask = mask),
            training = training,
        )
        after_attention = self.norm1(x + attended)

        # Sub-layer 2: position-wise feed-forward network.
        transformed = self.dropout2(self.feedforward(after_attention), training = training)
        return self.norm2(after_attention + transformed)

In [None]:
class DecoderLayer(tf.keras.layers.Layer):
    """One decoder block: self-attention, attention over the encoder output, feed-forward.

    Each of the three sub-layers is followed by dropout, a residual connection
    and layer normalisation.
    """

    def __init__(self, dim_model, num_heads, dim_feedforward, dropout_rate = 0.1):
        """
        Args:
            dim_model: model width; each attention head gets dim_model // num_heads.
            num_heads: number of attention heads in both attention layers.
            dim_feedforward: hidden width of the feed-forward network.
            dropout_rate: rate used by all three dropout layers.
        """
        super().__init__()
        per_head_dim = dim_model // num_heads
        self.multi_head_attention_1 = tf.keras.layers.MultiHeadAttention(num_heads = num_heads, key_dim = per_head_dim)
        self.multi_head_attention_2 = tf.keras.layers.MultiHeadAttention(num_heads = num_heads, key_dim = per_head_dim)
        self.feedforward = feed_forward(dim_model, dim_feedforward)
        self.norm1 = tf.keras.layers.LayerNormalization(epsilon = 1e-6)
        self.norm2 = tf.keras.layers.LayerNormalization(epsilon = 1e-6)
        self.norm3 = tf.keras.layers.LayerNormalization(epsilon = 1e-6)
        self.dropout1 = tf.keras.layers.Dropout(dropout_rate)
        self.dropout2 = tf.keras.layers.Dropout(dropout_rate)
        self.dropout3 = tf.keras.layers.Dropout(dropout_rate)

    def call(self, x, encode_res, training, look_ahead_mask = None, padding_mask = None):
        """Run the block on target-side ``x`` given the encoder output ``encode_res``."""
        # Sub-layer 1: self-attention over the target sequence, using look_ahead_mask.
        self_attended = self.dropout1(
            self.multi_head_attention_1(query = x, value = x, key = x, attention_mask = look_ahead_mask),
            training = training,
        )
        after_self = self.norm1(x + self_attended)

        # Sub-layer 2: queries come from the decoder, keys/values from the encoder output.
        cross_attended = self.dropout2(
            self.multi_head_attention_2(query = after_self, value = encode_res, key = encode_res, attention_mask = padding_mask),
            training = training,
        )
        after_cross = self.norm2(after_self + cross_attended)

        # Sub-layer 3: position-wise feed-forward network.
        transformed = self.dropout3(self.feedforward(after_cross), training = training)
        return self.norm3(after_cross + transformed)

### Encoder, Decoder Structure

In [None]:
class Encoder(tf.keras.layers.Layer):
    """Token embedding + positional encoding followed by a stack of EncoderLayer blocks."""

    def __init__(self, num_layers, dim_model, num_heads, dim_feedforward, input_size, max_pos_encoding, dropout_rate = 0.1):
        """
        Args:
            num_layers: number of EncoderLayer blocks.
            dim_model: embedding / model width.
            num_heads: attention heads per block.
            dim_feedforward: hidden width of each block's feed-forward network.
            input_size: size of the embedding table (number of distinct token ids).
            max_pos_encoding: number of positions in the precomputed positional table.
            dropout_rate: dropout rate for the embedding dropout and for every block.
        """
        super().__init__()
        self.dim_model = dim_model
        self.num_layers = num_layers

        self.embedding = tf.keras.layers.Embedding(input_size, dim_model)
        self.pos_encoding = positional_encoding(max_pos_encoding, dim_model)
        self.encode_layers = [EncoderLayer(dim_model, num_heads, dim_feedforward, dropout_rate) for _ in range(num_layers)]
        self.dropout = tf.keras.layers.Dropout(dropout_rate)

    def call(self, x, training, mask = None):
        """Encode a batch of token ids.

        Args:
            x: integer token ids; the second axis is taken as the sequence axis.
            training: forwarded to the dropout layers and to every block.
            mask: optional attention mask forwarded to every block.

        Returns:
            The output of the last encoder block.
        """
        seq_len = tf.shape(x)[1]
        # Look the ids up in the embedding table (the layer must be *called* on x),
        # scale by sqrt(dim_model), then add the positional rows for the first
        # seq_len positions (broadcast over the batch axis).
        x = self.embedding(x)
        x *= tf.math.sqrt(tf.cast(self.dim_model, tf.float32))
        x += self.pos_encoding[:seq_len, :]
        x = self.dropout(x, training = training)
        for layer in self.encode_layers:
            x = layer(x, training, mask)
        return x
    

class Decoder(tf.keras.layers.Layer):
    """Token embedding + positional encoding followed by a stack of DecoderLayer blocks."""

    def __init__(self, num_layers, dim_model, num_heads, dim_feedforward, input_size, max_pos_encoding, dropout_rate = 0.1):
        """
        Args:
            num_layers: number of DecoderLayer blocks.
            dim_model: embedding / model width.
            num_heads: attention heads per block.
            dim_feedforward: hidden width of each block's feed-forward network.
            input_size: size of the embedding table (number of distinct target token ids).
            max_pos_encoding: number of positions in the precomputed positional table.
            dropout_rate: dropout rate for the embedding dropout and for every block.
        """
        super().__init__()
        self.dim_model = dim_model
        self.num_layers = num_layers

        self.embedding = tf.keras.layers.Embedding(input_size, dim_model)
        self.pos_encoding = positional_encoding(max_pos_encoding, dim_model)
        self.decode_layers = [DecoderLayer(dim_model, num_heads, dim_feedforward, dropout_rate) for _ in range(num_layers)]
        self.dropout = tf.keras.layers.Dropout(dropout_rate)

    def call(self, x, encode_output, training, look_ahead_mask = None, padding_mask = None ):
        """Decode a batch of target token ids against the encoder output.

        Args:
            x: integer target token ids; the second axis is taken as the sequence axis.
            encode_output: encoder output attended to by every block.
            training: forwarded to the dropout layers and to every block.
            look_ahead_mask: optional mask for the blocks' self-attention.
            padding_mask: optional mask for the blocks' encoder-decoder attention.

        Returns:
            The output of the last decoder block.
        """
        seq_len = tf.shape(x)[1]
        # Look the ids up in the embedding table (the layer must be *called* on x),
        # scale by sqrt(dim_model), then add the positional rows for the first
        # seq_len positions (broadcast over the batch axis).
        x = self.embedding(x)
        x *= tf.math.sqrt(tf.cast(self.dim_model, tf.float32))
        x += self.pos_encoding[:seq_len, :]
        x = self.dropout(x, training = training)
        for layer in self.decode_layers:
            x = layer(x, encode_output, training, look_ahead_mask, padding_mask)
        return x

class Transformer(tf.keras.Model):
    """Encoder-decoder Transformer producing per-position logits over the target vocabulary."""

    def __init__(self, num_layers, dim_model, num_heads, dim_feedforward,
                 input_size, target_size,
                 pe_input, pe_target, rate=0.1):
        """
        Args:
            num_layers: number of blocks in both the encoder and the decoder.
            dim_model: embedding / model width.
            num_heads: attention heads per block.
            dim_feedforward: hidden width of the feed-forward networks.
            input_size: size of the encoder embedding table.
            target_size: size of the decoder embedding table and of the output layer.
            pe_input: number of positions precomputed for the encoder.
            pe_target: number of positions precomputed for the decoder; also the
                side length of the stored look-ahead mask.
            rate: dropout rate.
        """
        super().__init__()
        self.look_ahead_mask = create_look_ahead_mask(pe_target)
        self.encoder = Encoder(num_layers, dim_model, num_heads, dim_feedforward,
                               input_size, pe_input, rate)
        self.decoder = Decoder(num_layers, dim_model, num_heads, dim_feedforward,
                               target_size, pe_target, rate)
        self.final_layer = tf.keras.layers.Dense(target_size)

    def call(self, input, target = None, training = None, look_ahead_mask = None):
        """Compute output logits.

        Args:
            input: encoder token ids, or an ``(input, target)`` pair when ``target``
                is omitted. The packed form is what Keras ``fit`` passes, since it
                hands the whole ``x`` element of the dataset to the model as one argument.
            target: decoder token ids.
            training: forwarded to the encoder and decoder.
            look_ahead_mask: optional mask for decoder self-attention; when omitted,
                the stored mask is cropped to the target length.

        Returns:
            The final Dense layer applied to the decoder output.
        """
        if target is None:
            input, target = input
        # `is None`, not `== None`: comparing a tensor with == is element-wise.
        if look_ahead_mask is None:
            # The stored mask is pe_target x pe_target; its top-left corner is the
            # mask for any shorter target sequence.
            target_len = tf.shape(target)[1]
            look_ahead_mask = self.look_ahead_mask[:, :, :target_len, :target_len]
        encode_output = self.encoder(input, training, mask = None)
        decode_output = self.decoder(target, encode_output, training, look_ahead_mask, padding_mask = None)
        final_output = self.final_layer(decode_output)
        return final_output

### Final Model

In [None]:
# hyper params

num_layers = 1
dim_model = 128
dim_feedforward = 512
num_heads = 8
# The embedding tables and the output layer must cover every token id the data can
# contain. The tokenizer is created with num_words=20000, so the sizes match that value.
input_size = 20000
target_size = 20000
# Number of positions in the positional-encoding tables; the sequences are padded
# to maxlen=128, so the tables must be at least that long.
max_seq_len = 128
dropout_rate = 0.1

In [None]:
# Transformer.__init__ names its dropout parameter `rate`; passing it as
# `dropout_rate` raises "unexpected keyword argument".
auto_correct = Transformer(
    num_layers = num_layers,
    dim_model = dim_model,
    num_heads = num_heads,
    dim_feedforward = dim_feedforward,
    input_size = input_size,
    target_size = target_size,
    pe_input = max_seq_len,
    pe_target = max_seq_len,
    rate = dropout_rate
)

## Training

In [None]:
import tensorflow as tf
import numpy as np

# Adam with a cosine-decayed learning rate: starts at 1e-3 and decays over 10000 steps
# down to alpha * initial rate (alpha = 0.0).
initial_learning_rate = 1e-3
lr_schedule = tf.keras.optimizers.schedules.CosineDecay(
    initial_learning_rate=initial_learning_rate,
    decay_steps=10000,
    alpha=0.0,
)
optimizer = tf.keras.optimizers.Adam(learning_rate=lr_schedule)

# reduction='none' keeps one loss value per position so padding can be masked out below.
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True, reduction='none')

def loss_function(real, pred):
    """Cross-entropy averaged over the positions whose label is not 0.

    Args:
        real: integer labels; positions equal to 0 are excluded from the loss.
        pred: logits for each position.

    Returns:
        Sum of the per-position losses at non-zero labels divided by the number
        of non-zero labels.
    """
    per_position = loss_object(real, pred)
    keep = tf.cast(tf.math.not_equal(real, 0), dtype=per_position.dtype)
    return tf.reduce_sum(per_position * keep) / tf.reduce_sum(keep)

auto_correct.compile(optimizer=optimizer, loss=loss_function)

num_samples = 10000
sequence_length = 128
vocab_size = 20000

BATCH_SIZE = 32

# The model embeds integer token ids, so the dataset is built from the padded id
# matrices rather than from the raw strings.
# Teacher forcing: the decoder is fed the clean sequence without its last token and
# is trained to predict the same sequence shifted left by one position.
# NOTE(review): the sequences carry no explicit start/end tokens, so the first clean
# token is never a prediction target - confirm whether markers should be added.
decoder_input = y_train_pad[:, :-1]
decoder_target = y_train_pad[:, 1:]

# Element layout is (model inputs, labels): Keras fit() passes the first item to the
# model as a single argument and the second item to the loss.
dataset = tf.data.Dataset.from_tensor_slices(((X_train_pad, decoder_input), decoder_target))
dataset = dataset.shuffle(buffer_size=1000).batch(BATCH_SIZE).prefetch(tf.data.AUTOTUNE)

callbacks = [
    # Stop when the training loss has not improved for 5 epochs and roll back to the best weights.
    tf.keras.callbacks.EarlyStopping(monitor='loss', patience=5, restore_best_weights=True),
    # A subclassed model cannot be serialised as a full HDF5 model, so only the weights
    # are checkpointed; weights-only HDF5 checkpoints use the ".weights.h5" suffix.
    tf.keras.callbacks.ModelCheckpoint(filepath='transformer_checkpoint.weights.h5',
                                       monitor='loss', save_best_only=True,
                                       save_weights_only=True, verbose=1)
]

EPOCHS = 50
history = auto_correct.fit(dataset, epochs=EPOCHS, verbose = 2, callbacks=callbacks)

print("Training loss history:", history.history['loss'])


In [None]:
import matplotlib.pyplot as plt

# Training loss per epoch, as recorded by fit().
fig, ax = plt.subplots()
ax.plot(history.history['loss'])
ax.set_title("Training Loss")
ax.set_xlabel("Epoch")
ax.set_ylabel("Loss")
plt.show()