<a href="https://colab.research.google.com/github/micahGrace/Ethio-Translate/blob/master/Copy_of_English_to_Tigrinya_Text_To_Text.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import re
import torch
import numpy as np
import torch.nn as nn
from matplotlib import pyplot as plt
from tqdm import tqdm
from typing import Tuple
import torch.nn.functional as F
from torch import Tensor
from torch.utils.data.sampler import SubsetRandomSampler
from tqdm import tqdm
import torch.optim as optim
import random
from typing import Tuple

english_txt = open("/content/drive/MyDrive/ETH_Translate_Training/englishdataset.txt", encoding="utf8").read().split("\n")
tigrinya_txt = open("/content/drive/MyDrive/ETH_Translate_Training/tigrinyadataset.txt", encoding="utf8").read().split("\n")


# number of sentences
# print("length of english: ", len(english_txt))
# print("length of english: ", len(tigrinya_txt))

NUM_INSTANCES = 50000
MAX_SENT_LEN = 10
eng_sentences, tig_sentences = [], []
eng_words, tig_words = set(), set()
for i in tqdm(range(NUM_INSTANCES)):
  rand_idx = np.random.randint(len(english_txt))
  rand_idx2 = np.random.randint(len(tigrinya_txt))
  # find only letters in sentences
  eng_sent, tig_sent = ["<sos>"], ["<sos>"]
  eng_sent += re.findall(r"\w+", english_txt[rand_idx].split("\t")[0]) 
  tig_sent += re.findall(r"\w+", tigrinya_txt[rand_idx2].split("\t")[0])

  # change to lowercase
  eng_sent = [x.lower() for x in eng_sent]
  tig_sent = [x.lower() for x in tig_sent]
  eng_sent.append("<eos>")
  tig_sent.append("<eos>")

  if len(eng_sent) >= MAX_SENT_LEN:
    eng_sent = eng_sent[:MAX_SENT_LEN]
  else:
    for _ in range(MAX_SENT_LEN - len(eng_sent)):
      eng_sent.append("<pad>")

  if len(tig_sent) >= MAX_SENT_LEN:
    tig_sent = tig_sent[:MAX_SENT_LEN]
  else:
    for _ in range(MAX_SENT_LEN - len(tig_sent)):
      tig_sent.append("<pad>")

  # add parsed sentences
  eng_sentences.append(eng_sent)
  tig_sentences.append(tig_sent)

  # update unique words
  # print("type: ", type(eng_words))
  eng_words, tig_words = set(eng_words), set(tig_words)
  eng_words.update(eng_sent)
  tig_words.update(tig_sent)

  eng_words, tig_words = list(eng_words), list(tig_words)

# encode each token into index
for i in tqdm(range(len(eng_sentences))):
  eng_sentences[i] = [eng_words.index(x) for x in eng_sentences[i]]
  tig_sentences[i] = [tig_words.index(x) for x in tig_sentences[i]]

idx = 10
print(eng_sentences[idx])
print([eng_words[x] for x in eng_sentences[idx]])
print(tig_sentences[idx])
print([tig_words[x] for x in tig_sentences[idx]])

ENG_VOCAB_SIZE = len(eng_words)
TIG_VOCAB_SIZE = len(tig_words)
NUM_EPOCHS = 10
HIDDEN_SIZE = 128
EMBEDDING_DIM = 30
BATCH_SIZE = 128
LEARNING_RATE = 1e-2
device = torch.device('cuda')


class MTDataset(torch.utils.data.Dataset):
  def __init__(self):
    # import and initialize dataset    
    self.source = np.array(eng_sentences, dtype = int)
    self.target = np.array(tig_sentences, dtype = int)
    
  def __getitem__(self, idx):
    # get item by index
    return self.source[idx], self.target[idx]
  
  def __len__(self):
    # returns length of data
    return len(self.source)

np.random.seed(777)   # for reproducibility
dataset = MTDataset()
NUM_INSTANCES = len(dataset)
TEST_RATIO = 0.3
TEST_SIZE = int(NUM_INSTANCES * 0.3)

indices = list(range(NUM_INSTANCES))

test_idx = np.random.choice(indices, size = TEST_SIZE, replace = False)
train_idx = list(set(indices) - set(test_idx))
train_sampler, test_sampler = SubsetRandomSampler(train_idx), SubsetRandomSampler(test_idx)

train_loader = torch.utils.data.DataLoader(dataset, batch_size = BATCH_SIZE, sampler = train_sampler)
test_loader = torch.utils.data.DataLoader(dataset, batch_size = BATCH_SIZE, sampler = test_sampler)

class Encoder(nn.Module):
    def __init__(self,
                 input_dim: int,
                 emb_dim: int,
                 enc_hid_dim: int,
                 dec_hid_dim: int,
                 dropout: float):
        super().__init__()

        self.input_dim = input_dim
        self.emb_dim = emb_dim
        self.enc_hid_dim = enc_hid_dim
        self.dec_hid_dim = dec_hid_dim
        self.dropout = dropout

        self.embedding = nn.Embedding(input_dim, emb_dim)

        self.rnn = nn.GRU(emb_dim, enc_hid_dim, bidirectional = True)

        self.fc = nn.Linear(enc_hid_dim * 2, dec_hid_dim)

        self.dropout = nn.Dropout(dropout)

    def forward(self,
                src: Tensor) -> Tuple[Tensor]:

        embedded = self.dropout(self.embedding(src))

        outputs, hidden = self.rnn(embedded)

        hidden = torch.tanh(self.fc(torch.cat((hidden[-2,:,:], hidden[-1,:,:]), dim = 1)))

        return outputs, hidden


class Attention(nn.Module):
    def __init__(self,
                 enc_hid_dim: int,
                 dec_hid_dim: int,
                 attn_dim: int):
        super().__init__()

        self.enc_hid_dim = enc_hid_dim
        self.dec_hid_dim = dec_hid_dim

        self.attn_in = (enc_hid_dim * 2) + dec_hid_dim

        self.attn = nn.Linear(self.attn_in, attn_dim)

    def forward(self,
                decoder_hidden: Tensor,
                encoder_outputs: Tensor) -> Tensor:

        src_len = encoder_outputs.shape[0]

        repeated_decoder_hidden = decoder_hidden.unsqueeze(1).repeat(1, src_len, 1)

        encoder_outputs = encoder_outputs.permute(1, 0, 2)

        energy = torch.tanh(self.attn(torch.cat((
            repeated_decoder_hidden,
            encoder_outputs),
            dim = 2)))

        attention = torch.sum(energy, dim=2)

        return F.softmax(attention, dim=1)


class Decoder(nn.Module):
    def __init__(self,
                 output_dim: int,
                 emb_dim: int,
                 enc_hid_dim: int,
                 dec_hid_dim: int,
                 dropout: int,
                 attention: nn.Module):
        super().__init__()

        self.emb_dim = emb_dim
        self.enc_hid_dim = enc_hid_dim
        self.dec_hid_dim = dec_hid_dim
        self.output_dim = output_dim
        self.dropout = dropout
        self.attention = attention

        self.embedding = nn.Embedding(output_dim, emb_dim)

        self.rnn = nn.GRU((enc_hid_dim * 2) + emb_dim, dec_hid_dim)

        self.out = nn.Linear(self.attention.attn_in + emb_dim, output_dim)

        self.dropout = nn.Dropout(dropout)


    def _weighted_encoder_rep(self,
                              decoder_hidden: Tensor,
                              encoder_outputs: Tensor) -> Tensor:

        a = self.attention(decoder_hidden, encoder_outputs)

        a = a.unsqueeze(1)

        encoder_outputs = encoder_outputs.permute(1, 0, 2)

        weighted_encoder_rep = torch.bmm(a, encoder_outputs)

        weighted_encoder_rep = weighted_encoder_rep.permute(1, 0, 2)

        return weighted_encoder_rep


    def forward(self,
                input: Tensor,
                decoder_hidden: Tensor,
                encoder_outputs: Tensor) -> Tuple[Tensor]:

        input = input.unsqueeze(0)

        embedded = self.dropout(self.embedding(input))

        weighted_encoder_rep = self._weighted_encoder_rep(decoder_hidden,
                                                          encoder_outputs)

        rnn_input = torch.cat((embedded, weighted_encoder_rep), dim = 2)

        output, decoder_hidden = self.rnn(rnn_input, decoder_hidden.unsqueeze(0))

        embedded = embedded.squeeze(0)
        output = output.squeeze(0)
        weighted_encoder_rep = weighted_encoder_rep.squeeze(0)

        output = self.out(torch.cat((output,
                                     weighted_encoder_rep,
                                     embedded), dim = 1))

        return output, decoder_hidden.squeeze(0)


class Seq2Seq(nn.Module):
    def __init__(self,
                 encoder: nn.Module,
                 decoder: nn.Module,
                 device: torch.device):
        super().__init__()

        self.encoder = encoder
        self.decoder = decoder
        self.device = device

    def forward(self,
                src: Tensor,
                trg: Tensor,
                teacher_forcing_ratio: float = 0.5) -> Tensor:

        batch_size = src.shape[1]
        max_len = trg.shape[0]
        trg_vocab_size = self.decoder.output_dim

        outputs = torch.zeros(max_len, batch_size, trg_vocab_size).to(self.device)

        encoder_outputs, hidden = self.encoder(src)

        # first input to the decoder is the <sos> token
        output = trg[0,:]

        for t in range(1, max_len):
            output, hidden = self.decoder(output, hidden, encoder_outputs)
            outputs[t] = output
            teacher_force = random.random() < teacher_forcing_ratio
            top1 = output.max(1)[1]
            output = (trg[t] if teacher_force else top1)

        return outputs


# INPUT_DIM = len(SRC.vocab)
# OUTPUT_DIM = len(TRG.vocab)
# ENC_EMB_DIM = 256
# DEC_EMB_DIM = 256
# ENC_HID_DIM = 512
# DEC_HID_DIM = 512
# ATTN_DIM = 64
# ENC_DROPOUT = 0.5
# DEC_DROPOUT = 0.5

ENC_EMB_DIM = 32
DEC_EMB_DIM = 32
ENC_HID_DIM = 64
DEC_HID_DIM = 64
ATTN_DIM = 8
ENC_DROPOUT = 0.5
DEC_DROPOUT = 0.5

enc = Encoder(ENG_VOCAB_SIZE, ENC_EMB_DIM, ENC_HID_DIM, DEC_HID_DIM, ENC_DROPOUT)

attn = Attention(ENC_HID_DIM, DEC_HID_DIM, ATTN_DIM)

dec = Decoder(TIG_VOCAB_SIZE, DEC_EMB_DIM, ENC_HID_DIM, DEC_HID_DIM, DEC_DROPOUT, attn)

model = Seq2Seq(enc, dec, device).to(device)


def init_weights(m: nn.Module):
    for name, param in m.named_parameters():
        if 'weight' in name:
            nn.init.normal_(param.data, mean=0, std=0.01)
        else:
            nn.init.constant_(param.data, 0)


model.apply(init_weights)

optimizer = optim.Adam(model.parameters())


def checkpoint(state, filename="ett_checkpoint.pth.tar"):
  print("=> printing check point")
  torch.save(state, filename)


def count_parameters(model: nn.Module):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)


print(f'The model has {count_parameters(model):,} trainable parameters')

# %%time
loss_trace = []

for epoch in tqdm(range(NUM_EPOCHS)):
  if epoch == 2:
    checkpoint = {'state_dict': model.state_dict(), 'optimizer':optimizer.state_dict()}
    save_checkpoint(checkpoint)

  current_loss = 0
  for i, (x, y) in enumerate(train_loader):
    x, y  = x.to(DEVICE), y.to(DEVICE)
    outputs = model(x, y)
    loss = criterion(outputs.resize(outputs.size(0) * outputs.size(1), outputs.size(-1)), y.resize(y.size(0) * y.size(1)))
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    current_loss += loss.item()
  loss_trace.append(current_loss)

# loss curve
plt.plot(range(1, NUM_EPOCHS+1), loss_trace, 'r-')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.show()


predictions = []
for i, (x,y) in enumerate(test_loader):
  with torch.no_grad():
    x, y  = x.to(DEVICE), y.to(DEVICE)
    outputs = model(x, y)
    for output in outputs:
      _, indices = output.max(-1)
      predictions.append(indices.detach().cpu().numpy())

idx = 10   # index of the sentence that you want to demonstrate
# print out the source sentence and predicted target sentence
print([eng_words[i] for i in eng_sentences[idx]])
print([tig_words[i] for i in predictions[idx]])

100%|██████████| 50000/50000 [00:02<00:00, 24283.01it/s]
100%|██████████| 50000/50000 [00:01<00:00, 32938.96it/s]
  0%|          | 0/10 [00:00<?, ?it/s]

[63, 101, 39, 122, 96, 111, 98, 68, 113, 101]
['<sos>', 'and', 'god', 'saw', 'everything', 'which', 'he', 'had', 'made', 'and']
[119, 80, 118, 4, 125, 185, 24, 130, 48, 79]
['<sos>', 'ኣምላኽ', 'ከኣ', 'ነቲ', 'ብርሃን', 'ካብ', 'ጸልማት', 'ፈለዮ', '<eos>', '<pad>']
The model has 151,106 trainable parameters


  0%|          | 0/10 [00:00<?, ?it/s]


NameError: ignored