In [None]:
import re
import string


def break_sentence(sentence: str):
    """
    Breaking sentence into its constituent words

    The sentence is lower-cased, any <sos> / <eos> markers are removed and the
    remainder is split on runs of whitespace, digits and ASCII punctuation.

    Args:
        sentence (str): A sentence you want to extract words from
    
    Returns:
        list: List of (lower-cased) words that make up the sentence.
            Empty when the sentence contains no words at all (e.g. "" or "!!!").
    """
    sentence = sentence.lower()
    # Removing the <sos> & <eos> from sentence
    # This is because when tokenizing this sentence by Tokenizer,
    # the SOS and EOS signals will be added to the beginning and end of it!
    sentence = re.sub(r"<\s*[se]\s*o\s*s\s*>", "", sentence)
    # re.escape keeps every punctuation character literal inside the character
    # class (unescaped, the backslash would merely escape the following "]").
    separators = rf"[\s0-9{re.escape(string.punctuation)}]+"
    # Splitting can only leave empty strings at the two ends; dropping all empty
    # pieces also covers inputs that contain no word at all.
    return [word for word in re.split(separators, sentence) if word]


class Tokenizer:
    """
    Word-level tokenizer backed by a growing vocabulary.

    The word stored at ``self.words[i]`` has token ``i + 1``; token 0 is reserved
    for unknown words (and is also the value used for padding elsewhere in this file).
    Lookups go through a word -> token dictionary instead of scanning the list.
    """

    def __init__(self):
        self.sos = "<sos>"
        self.eos = "<eos>"
        self.words = [self.sos, self.eos]
        # word -> token cache; filled lazily from `self.words`
        self._token_of = {}
        # how many entries of `self.words` are already reflected in the cache
        self._n_indexed = 0

    def _token_map(self):
        """
        Return the word -> token dictionary, synchronised with `self.words`

        The list stays the source of truth: words appended to it directly are
        picked up here, and the cache is rebuilt if the list has shrunk.
        """
        n_words = len(self.words)
        if n_words < self._n_indexed:
            self._token_of.clear()
            self._n_indexed = 0
        for position in range(self._n_indexed, n_words):
            # setdefault keeps the first occurrence, like list.index does
            self._token_of.setdefault(self.words[position], position + 1)
        self._n_indexed = n_words
        return self._token_of

    def extract_words(self, sentence: str):
        """
        Extracting and adding words to the dictionary
        """
        # Adding <Start-of-sequence> and <End-of-sequence> words to the beginning and end of the sentence
        words = [self.sos] + break_sentence(sentence) + [self.eos]

        # Adding all words to the dictionary
        token_of = self._token_map()
        for word in words:
            word = word.lower()
            if word not in token_of:
                self.words.append(word)
                token_of[word] = len(self.words)
        self._n_indexed = len(self.words)
    
    def extract_all_words(self, sentences: list):
        """
        Extract all words in a list of sentences
        """
        for sentence in sentences:
            self.extract_words(sentence)
    
    def tokenize(self, sentence):
        """
        Converting a sentence into its constituent tokens

        Args:
            sentence (str): The sentence you want to tokenize
        
        Returns:
            list: A list of tokens representing the words in the sentence,
                wrapped in the <sos> / <eos> tokens. Unknown words become 0.
        """
        # Adding <SOS> & <EOS> to the sentence: <SOS> sentence ... <EOS>
        words = [self.sos] + break_sentence(sentence) + [self.eos]
        return [self.get_index(w) for w in words]
    
    def tokenize_all(self, sentences):
        """
        Input a list of sentences and get a list of token lists
        """
        return [self.tokenize(sen) for sen in sentences]
    
    def vocab_size(self):
        """
        Number of distinct tokens: every known word plus the unknown token 0
        """
        return len(self.words) + 1
    
    def get_max_length(self, sentences: list):
        """
        Give a list of sentences and this function will calculate the maximum length of a sentence in this list.
        The length counts the <sos> and <eos> tokens.
        If list is empty it will return -1
        """
        return max((len(self.tokenize(sen)) for sen in sentences), default=-1)
    
    def to_string(self, tokens):
        """
        Convert a list of tokens into a corresponding sentence.
        Token 0 is rendered as "<unk>".
        """
        text = []
        for token in tokens:
            text.append("<unk>" if token == 0 else self.words[token - 1])
        return ' '.join(text)
    
    def get_index(self, word: str):
        """
        Find the index or token of a word in the dictionary of words.
        The word is lower-cased first, exactly as `tokenize` does.
        If there is no such word in dictionary, it will return 0=<UNKNOWN>
        """
        return self._token_map().get(word.lower(), 0)

def pad_sequence(tokens_seq):
    """
    Adding zero tokens to the end of sentences to match the length of the longest sentence in the list.
    For example:
        [[1, 5, 7, 2, 0]
         [1, 3, 2, 0, 0]
         [1, 2, 0, 0, 0]
         [1, 3, 4, 5, 2]]

    Args:
        tokens_seq (list): List of token lists (it is traversed twice)

    Returns:
        list: New list of new token lists, all of the same length.
            An empty input gives an empty list.
    """
    # default=0 keeps an empty batch from raising inside max()
    max_size = max((len(seq) for seq in tokens_seq), default=0)
    return [list(tokens) + [0] * (max_size - len(tokens)) for tokens in tokens_seq]


# Test tokenizer:
# build a vocabulary from a handful of sentences, then tokenize a new sentence
# and convert the tokens back to text.
tknizer = Tokenizer()
sentences = [
    "Hello, World!",
    "Microsoft was founded by Bill Gates.",
    "Google is the most powerful search engine.",
    "Ronnie Coleman is an eight-time Mr. Olympia champion!",
    "Hello, how are you?"
]
tknizer.extract_all_words(sentences)
# The vocabulary starts with <sos> and <eos>, followed by the lower-cased words
# in order of first appearance
print(tknizer.words)

# "who" does not occur in the sentences above, so it is tokenized as 0
# and comes back as <unk> in the reconstructed text
sentence = "Who was Google founded by?"
tokens = tknizer.tokenize(sentence)
print(tokens)
print(tknizer.to_string(tokens))

In [2]:
import torch
import math

class PositionalEmbedding(torch.nn.Module):
    """
    Token embedding followed by a fixed sinusoidal positional encoding.

    Args:
        d_input (int): Number of positions in the encoding table, i.e. the
            longest sequence that can be embedded.
        d_output (int): Size of each embedding vector.
        vocab_size (int): Number of rows of the embedding table; index 0 is the
            padding index.
    """

    def __init__(self, d_input, d_output, vocab_size):
        super().__init__()
        self.vocab_size = vocab_size
        self.d_output = d_output
        self.d_input = d_input
        # The attribute name (including its spelling) is part of the state_dict
        # keys, so it is kept as is.
        self.token_emdedding = torch.nn.Embedding(vocab_size, d_output, padding_idx=0)
        # Registered as a buffer so it follows the module across .to(device) calls;
        # non-persistent because it is fully determined by d_input / d_output and
        # should not add a key to the state_dict.
        self.register_buffer("pos_enc", self.positional_encoding(), persistent=False)

    def positional_encoding(self):
        """
        Build the sinusoidal table

        Returns:
            torch.Tensor: Tensor of shape [1, d_input, d_output]; even columns hold
                sines and odd columns hold cosines.
        """
        pe = torch.zeros(self.d_input, self.d_output)
        position = torch.arange(0, self.d_input).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, self.d_output, 2) * (-math.log(10000.0) / self.d_output))
        angles = position * div_term
        pe[:, 0::2] = torch.sin(angles)
        # With an odd d_output there is one odd column fewer than even columns
        pe[:, 1::2] = torch.cos(angles[:, : self.d_output // 2])
        return pe.unsqueeze(0)

    def forward(self, x):
        """
        Embed a batch of token ids and add the positional encoding

        Args:
            x (torch.Tensor): Integer token ids of shape [batch, seq_len] with
                seq_len <= d_input.

        Returns:
            torch.Tensor: Tensor of shape [batch, seq_len, d_output]
        """
        x = self.token_emdedding(x)
        # Only the first seq_len positions are needed, so shorter sequences work too
        return x + self.pos_enc[:, : x.size(1)]

In [3]:
class Attention(torch.nn.Module):
    """
    Single-head scaled dot-product attention with learned q/k/v projections.

    Args:
        input_dim (int): Size of the last dimension of the q, k and v inputs.
        output_dim (int): Size of the projected queries, keys and values.
    """

    def __init__(self, input_dim, output_dim):
        super().__init__()
        self.output_dim = output_dim
        self.q_linear = torch.nn.Linear(input_dim, output_dim)
        self.k_linear = torch.nn.Linear(input_dim, output_dim)
        self.v_linear = torch.nn.Linear(input_dim, output_dim)
    
    def forward(self, q, k, v, mask=None):
        """
        Args:
            q (torch.Tensor): Queries, [..., q_len, input_dim]
            k (torch.Tensor): Keys, [..., k_len, input_dim]
            v (torch.Tensor): Values, [..., k_len, input_dim]
            mask (torch.Tensor, optional): Boolean tensor broadcastable to
                [..., q_len, k_len]; True marks the positions a query may attend to.

        Returns:
            torch.Tensor: Weighted sum of the projected values, [..., q_len, output_dim]
        """
        q = self.q_linear(q)
        k = self.k_linear(k)
        v = self.v_linear(v)

        scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(self.output_dim)
        if mask is not None:
            # A large finite negative (rather than -inf) keeps fully masked rows free of NaN
            scores = scores.masked_fill(~mask, torch.finfo(scores.dtype).min)

        # Normalise the scores so that each query's weights sum to one
        attention_weight = torch.softmax(scores, dim=-1)
        if mask is not None:
            # A fully masked row is uniform after the softmax; zero it so that
            # masked positions never contribute.
            attention_weight = attention_weight.masked_fill(~mask, 0.0)
        
        return torch.matmul(attention_weight, v)

In [4]:
class MultiHeadAttention(torch.nn.Module):
    """
    Several independent attention heads whose outputs are concatenated and
    projected back to `output_dim`.

    Args:
        input_dim (int): Size of the last dimension of the q, k and v inputs.
        output_dim (int): Output size of every head and of the final projection.
        n_head (int): Number of heads; values below 1 are treated as 1.
    """

    def __init__(self, input_dim, output_dim, n_head):
        super().__init__()
        self.input_dim = input_dim
        self.output_dim = output_dim
        self.n_head = max(n_head, 1)
        # ModuleList (not a plain list) registers the heads as sub-modules, so their
        # weights show up in .parameters(), follow .to(device) and are saved in
        # the state_dict.
        self.heads = torch.nn.ModuleList(
            [Attention(input_dim, output_dim) for _ in range(self.n_head)]
        )
        # Sized from the clamped head count so it always matches the concatenation
        self.output_linear = torch.nn.Linear(self.n_head * output_dim, output_dim)
    
    def forward(self, q, k, v, mask=None):
        """
        Run every head on the same inputs and merge the results

        Returns:
            torch.Tensor: Tensor whose last dimension is `output_dim`
        """
        head_outputs = [head(q, k, v, mask) for head in self.heads]
        return self.output_linear(torch.cat(head_outputs, dim=-1))

In [5]:
class Encoder(torch.nn.Module):
    """
    Encoder stack: positional embedding, multi-head self-attention, a linearly
    projected skip connection, one more linear layer and a layer normalisation.

    Args:
        input_dim (int): Forwarded to PositionalEmbedding as its number of positions.
        embedding_dim (int): Size of the token embeddings.
        output_dim (int): Size of the encoder output vectors.
        vocab_size (int): Size of the source vocabulary.
        n_head (int): Number of attention heads.
    """

    def __init__(self, input_dim, embedding_dim, output_dim, vocab_size, n_head):
        super().__init__()
        self.pe = PositionalEmbedding(input_dim, embedding_dim, vocab_size)
        self.mha = MultiHeadAttention(embedding_dim, output_dim, n_head)
        # Projects the embeddings to output_dim so they can be added to the attention output
        self.normlinear1 = torch.nn.Linear(embedding_dim, output_dim)
        self.normlinear2 = torch.nn.Linear(output_dim, output_dim)
        self.layernorm = torch.nn.LayerNorm(output_dim)

    def forward(self, x):
        """
        Encode a batch of source token ids (x is passed straight to the positional embedding)
        """
        embedded = self.pe(x)
        # Self-attention: the embeddings act as queries, keys and values
        attended = self.mha(embedded, embedded, embedded)
        with_skip = attended + self.normlinear1(embedded)
        mixed = self.normlinear2(with_skip)
        return self.layernorm(mixed)

In [6]:
class Decoder(torch.nn.Module):
    """
    Decoder stack: positional embedding, causally masked self-attention, then
    attention over the encoder output, each followed by a linear skip path.

    Args:
        input_dim (int): Forwarded to PositionalEmbedding as its number of positions.
        embedding_dim (int): Size of the token embeddings.
        output_dim (int): Size of the decoder output vectors (must match the
            last dimension of the encoder output).
        vocab_size (int): Size of the target vocabulary.
        n_head (int): Number of attention heads.
    """

    def __init__(self, input_dim, embedding_dim, output_dim, vocab_size, n_head):
        super().__init__()
        self.pe = PositionalEmbedding(input_dim, embedding_dim, vocab_size)
        self.masked_mha = MultiHeadAttention(embedding_dim, output_dim, n_head)
        self.combine_mha = MultiHeadAttention(output_dim, output_dim, n_head)

        # Despite their names these are linear projections, not normalisation layers
        self.norm1 = torch.nn.Linear(embedding_dim, output_dim)
        self.norm2 = torch.nn.Linear(output_dim, output_dim)
        self.norm3 = torch.nn.Linear(output_dim, output_dim)

        self.layernorm = torch.nn.LayerNorm(output_dim)

    def forward(self, tgt_tokens, encoder_output, src_padding_mask=None):
        """
        Args:
            tgt_tokens (torch.Tensor): Target token ids, passed to the positional embedding.
            encoder_output (torch.Tensor): Output of the encoder, used as keys and
                values of the second attention block.
            src_padding_mask (torch.Tensor, optional): Mask applied to the attention
                over the encoder output. True marks source positions that may be
                attended to (the same polarity as the causal mask built below).
                Either [batch, src_len] or already broadcastable to
                [batch, tgt_len, src_len]. None (the default) disables masking.

        Returns:
            torch.Tensor: Decoder states, last dimension `output_dim`
        """
        x = self.pe(tgt_tokens)

        # Lower-triangular mask: position i may only look at positions <= i
        seq_len = x.size(1)
        tgt_mask = torch.tril(torch.ones(seq_len, seq_len, device=x.device)).bool()
        tgt_mask = tgt_mask.unsqueeze(0).expand(x.size(0), -1, -1)  # [batch, tgt_len, tgt_len]

        # Decoder self-attention
        y = self.masked_mha(x, x, x, mask=tgt_mask)
        x = self.layernorm(y + self.norm1(x))

        # Attention over the encoder output, honouring the optional source mask
        cross_mask = None
        if src_padding_mask is not None:
            cross_mask = src_padding_mask.bool()
            if cross_mask.dim() == 2:
                # [batch, src_len] -> [batch, 1, src_len], broadcast over target positions
                cross_mask = cross_mask.unsqueeze(1)
        x2 = self.combine_mha(x, encoder_output, encoder_output, mask=cross_mask)
        x = x2 + self.norm2(x)

        return self.norm3(x)

In [7]:
class Translator(torch.nn.Module):
    """
    Encoder-decoder translation model with a final projection onto the target vocabulary.

    Args:
        enc_input_dim (int): Length of the (padded) source sequences.
        dec_input_dim (int): Length of the (padded) target sequences.
        embedding_dim (int): Size of the token embeddings of both encoder and decoder.
        output_dim (int): Size of the encoder / decoder output vectors.
        enc_vocab_size (int): Size of the source vocabulary.
        dec_vocab_size (int): Size of the target vocabulary (number of output logits).
        n_head (int): Number of attention heads.
    """

    def __init__(self, enc_input_dim, dec_input_dim, embedding_dim, output_dim, enc_vocab_size, dec_vocab_size, n_head):
        super().__init__()
        self.encoder = Encoder(enc_input_dim, embedding_dim, output_dim, enc_vocab_size, n_head)
        self.decoder = Decoder(dec_input_dim, embedding_dim, output_dim, dec_vocab_size, n_head)
        self.linear = torch.nn.Linear(output_dim, dec_vocab_size)
        self.enc_input_dim = enc_input_dim
        self.dec_input_dim = dec_input_dim
    
    def forward(self, x, y):
        """
        Args:
            x (torch.Tensor): Source token ids, fed to the encoder.
            y (torch.Tensor): Target token ids, fed to the decoder.

        Returns:
            torch.Tensor: Logits over the target vocabulary for every target position
        """
        encode = self.encoder(x)
        decode = self.decoder(y, encode)
        return self.linear(decode)
    
    def translate(self, org_sentence, org_tokenizer: Tokenizer, dest_tokenizer: Tokenizer):
        """
        Greedily translate one sentence

        Args:
            org_sentence (str): Sentence in the source language.
            org_tokenizer (Tokenizer): Tokenizer of the source language.
            dest_tokenizer (Tokenizer): Tokenizer of the target language.

        Returns:
            str: The generated target tokens rendered by `dest_tokenizer.to_string`,
                including the leading <sos> and, when it was produced, the <eos>.
        """
        dest_start_token = dest_tokenizer.get_index('<sos>')
        dest_end_token = dest_tokenizer.get_index('<eos>')
        # Create the inputs on the same device as the model's weights
        device = self.linear.weight.device

        # Cut over-long sentences to the encoder length, pad short ones with 0
        input_tokens = org_tokenizer.tokenize(org_sentence)[: self.enc_input_dim]
        input_tokens = input_tokens + [0] * (self.enc_input_dim - len(input_tokens))
        input_tokens = torch.tensor(input_tokens, device=device).unsqueeze(0)

        # Decoder input buffer: <sos> followed by padding, filled in one token per step
        output = torch.tensor([[dest_start_token] + [0] * (self.dec_input_dim - 1)], device=device)
        target_index = 0
        result = [dest_start_token]
        
        next_word = dest_start_token
        # Inference only: no autograd graph is needed
        with torch.no_grad():
            # The source does not change between steps, so it is encoded once
            encode = self.encoder(input_tokens)
            while next_word != dest_end_token and target_index < self.dec_input_dim - 1:
                logits = self.linear(self.decoder(output, encode))
                # Most likely token for the position that is being generated
                next_word = int(torch.argmax(logits[0, target_index]))
                result.append(next_word)
                output[0, target_index + 1] = next_word
                target_index += 1
        
        return dest_tokenizer.to_string(result)

In [8]:
def read_lines(path, encoding="utf-8"):
    """
    Read a text file into a list of lines

    Args:
        path (str): Path of the file to read.
        encoding (str): Text encoding of the file. Fixed to UTF-8 by default so
            that non-ASCII text is decoded the same way on every platform
            instead of depending on the locale.

    Returns:
        list: The lines of the file, each still carrying its line ending
    """
    with open(path, 'r', encoding=encoding) as file:
        return file.readlines()

In [9]:
# Load the two corpora, one sentence per line.
# NOTE(review): presumably line i of test.en and line i of test.fa are
# translations of each other — confirm against the data files.
english_sentences = read_lines("test.en")
farsi_sentences = read_lines("test.fa")

# One vocabulary per language, built from every sentence of its corpus
eng_tokenizer = Tokenizer()
fas_tokenizer = Tokenizer()
eng_tokenizer.extract_all_words(english_sentences)
fas_tokenizer.extract_all_words(farsi_sentences)

In [10]:
# Build the model. The sequence lengths are the longest tokenized sentence of
# each corpus (<sos> and <eos> included), which is also the length the
# sentences are padded to before training.
translator = Translator(
    enc_input_dim=eng_tokenizer.get_max_length(english_sentences),
    dec_input_dim=fas_tokenizer.get_max_length(farsi_sentences),
    embedding_dim=32,
    output_dim=64,
    # vocab_size() already counts the extra token 0 (unknown / padding)
    enc_vocab_size=eng_tokenizer.vocab_size(),
    dec_vocab_size=fas_tokenizer.vocab_size(),
    n_head=8
)

In [11]:
# Encoder input: padded English token ids.
# NOTE: the name shadows the built-in `input`; it is kept because the training
# cell refers to it.
input = torch.tensor(pad_sequence(eng_tokenizer.tokenize_all(english_sentences)))
# Decoder input: padded Farsi token ids, starting with <sos>
output = torch.tensor(pad_sequence(fas_tokenizer.tokenize_all(farsi_sentences)))
# Training target: the decoder input shifted left by one position with a 0
# appended. Derived from `output` directly, which gives the same tensor as
# tokenizing and padding the Farsi corpus a second time.
real = torch.cat([output[:, 1:], torch.zeros_like(output[:, :1])], dim=1)

In [None]:
from torch.utils.data import TensorDataset, DataLoader

epochs = 100
batch_size = 128
# ignore_index=0: positions whose target is the padding token do not contribute to the loss
cost = torch.nn.CrossEntropyLoss(ignore_index=0)
optimizer = torch.optim.Adam(translator.parameters(), lr=0.01)

dataset = TensorDataset(input, output, real)
loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

for epoch in range(epochs):

    # Sum of the per-batch losses of this epoch
    total_loss = 0.0

    for org_input, des_output, des_real in loader:
        optimizer.zero_grad()
        logits = translator(org_input, des_output)
        # Flatten to [batch * positions, vocab] and [batch * positions] for the loss
        logits = logits.reshape(-1, logits.size(-1))
        target = des_real.reshape(-1)
        loss = cost(logits, target)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()

    # Mean loss per batch; max(..., 1) avoids dividing by zero for an empty loader
    print(f"Epoch {epoch + 1}, Loss: {(total_loss / max(len(loader), 1)):.4f}")

In [None]:
# Translate one English sentence with the trained model; as the last expression
# of the cell, the returned string is displayed as the cell output
translator.translate("I am relatively free about the date", eng_tokenizer, fas_tokenizer)