In [None]:
import torch
from torchtext.legacy.data import Field, BucketIterator
import spacy
import torch.nn as nn
from torchtext.legacy.datasets import Multi30k

In [None]:
import random

In [None]:
# Run on the GPU when PyTorch can see one; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

In [None]:
# Load the spaCy pipelines used for tokenisation: English first, then German.
spacy_en, spacy_ger = (
    spacy.load(pipeline_name)
    for pipeline_name in ("en_core_web_sm", "de_core_news_sm")
)

In [None]:
def en_token(text):
    """Tokenise ``text`` with the ``spacy_en`` tokenizer and return the token strings as a list."""
    doc = spacy_en.tokenizer(text)
    return [piece.text for piece in doc]
def ger_token(text):
    """Tokenise ``text`` with the ``spacy_ger`` tokenizer and return the token strings as a list."""
    doc = spacy_ger.tokenizer(text)
    return [piece.text for piece in doc]
# Text fields: lower-case everything and wrap each sentence in <sos> ... <eos>.
eng_field = Field(
    tokenize=en_token,
    lower=True,
    init_token="<sos>",
    eos_token="<eos>",
)
ger_field = Field(
    tokenize=ger_token,
    lower=True,
    init_token="<sos>",
    eos_token="<eos>",
)

# German (".de") is the source side and English (".en") the target side.
train, val, test = Multi30k.splits(
    exts=(".de", ".en"),
    fields=(ger_field, eng_field),
)

# Vocabularies are built from the training split only; tokens seen fewer than
# twice are dropped and at most 10000 tokens are kept per language.
eng_field.build_vocab(train, max_size=10000, min_freq=2)
ger_field.build_vocab(train, max_size=10000, min_freq=2)

downloading training.tar.gz


training.tar.gz: 100%|██████████| 1.21M/1.21M [00:01<00:00, 1.10MB/s]


downloading validation.tar.gz


validation.tar.gz: 100%|██████████| 46.3k/46.3k [00:00<00:00, 229kB/s]


downloading mmt_task1_test2016.tar.gz


mmt_task1_test2016.tar.gz: 100%|██████████| 66.2k/66.2k [00:00<00:00, 225kB/s]


In [None]:
# One iterator per split; batches are created directly on `device`.
BATCH_SIZE = 32
train_iter, val_iter, test_iter = BucketIterator.splits(
    datasets=(train, val, test),
    batch_size=BATCH_SIZE,
    device=device,
)

In [None]:
class Encoder(nn.Module):
    """Embeds a source token sequence and summarises it with a multi-layer LSTM.

    Args:
        input_vocab: number of rows in the embedding table (source vocabulary size).
        embedding_size: width of each token embedding.
        hidden_size: width of the LSTM hidden and cell states.
        num_layer: number of stacked LSTM layers.
        dropout: dropout probability, used both on the embeddings and between
            the stacked LSTM layers.
    """

    def __init__(self, input_vocab, embedding_size, hidden_size, num_layer, dropout):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)
        self.embedding = nn.Embedding(
            num_embeddings=input_vocab,
            embedding_dim=embedding_size,
        )
        self.rnn = nn.LSTM(
            input_size=embedding_size,
            hidden_size=hidden_size,
            num_layers=num_layer,
            dropout=dropout,
        )

    def forward(self, x):
        """Encode the token indices ``x`` and return the final ``(hidden, cell)`` LSTM state.

        The LSTM is built with the default ``batch_first=False``, so ``x`` is
        consumed sequence-first. The per-step LSTM outputs are discarded.
        """
        embedded = self.embedding(x)
        embedded = self.dropout(embedded)
        _, final_state = self.rnn(embedded)
        hidden, cell = final_state
        return hidden, cell

In [None]:
class Decoder(nn.Module):
    """Single-step LSTM decoder: one token in, one score vector over the output vocabulary out.

    Args:
        input_size: number of rows in the embedding table (target vocabulary size).
        embedding_size: width of each token embedding.
        hidden_size: width of the LSTM hidden and cell states.
        numlayer: number of stacked LSTM layers.
        ouput_size: number of output classes produced by the final linear layer.
        dropout: dropout probability, used both on the embeddings and between
            the stacked LSTM layers.
    """

    def __init__(self, input_size,embedding_size,hidden_size,numlayer,ouput_size,dropout):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)
        self.embedding = nn.Embedding(
            num_embeddings=input_size,
            embedding_dim=embedding_size,
        )
        self.rnn = nn.LSTM(
            input_size=embedding_size,
            hidden_size=hidden_size,
            num_layers=numlayer,
            dropout=dropout,
        )
        self.fc = nn.Linear(in_features=hidden_size, out_features=ouput_size)

    def forward(self, x, hidden, cell):
        """Advance the decoder by one time step.

        ``x`` holds the current input token index per batch element. A leading
        length-1 axis is added so the LSTM sees a one-step sequence, and that
        axis is removed again from the scores.

        Returns:
            ``(predicts, hidden, cell)``: the vocabulary scores for this step
            and the updated LSTM state.
        """
        step_input = x.unsqueeze(0)
        embedded = self.embedding(step_input)
        embedded = self.dropout(embedded)
        output, state = self.rnn(embedded, (hidden, cell))
        hidden, cell = state
        predicts = self.fc(output).squeeze(0)
        return predicts, hidden, cell
        

In [None]:
class Seq2seq(nn.Module):
    """Encoder-decoder wrapper that decodes the target one step at a time.

    Args:
        encoder: module whose ``forward(data)`` returns ``(hidden, cell)``.
        decoder: module whose ``forward(token, hidden, cell)`` returns
            ``(scores, hidden, cell)`` and which exposes its output layer as
            ``decoder.fc`` (used to size the output buffer).
    """

    def __init__(self, encoder, decoder):
        super(Seq2seq, self).__init__()
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, data, target, teacher_forcing_ratio=0.5):
        """Return decoder scores for every target position.

        Args:
            data: source token indices, indexed ``[seq, batch]``.
            target: target token indices, indexed ``[seq, batch]``; row 0 is
                used as the first decoder input.
            teacher_forcing_ratio: probability, drawn once per step, of feeding
                the ground-truth token instead of the model's own prediction
                as the next decoder input.

        Returns:
            Tensor of shape ``(target_len, batch, vocab)``. Row 0 is never
            written and stays zero.
        """
        len_target = target.shape[0]
        batch_size = data.shape[1]
        # Size the buffer from the decoder itself and allocate it next to the
        # input, so the model does not depend on notebook-level globals.
        len_vocab = self.decoder.fc.out_features
        outputs = torch.zeros(len_target, batch_size, len_vocab, device=data.device)

        hidden, cell = self.encoder(data)

        decoder_input = target[0]
        for i in range(1, len_target):
            output, hidden, cell = self.decoder(decoder_input, hidden, cell)
            outputs[i] = output
            teacher_force = random.random() < teacher_forcing_ratio
            decoder_input = target[i] if teacher_force else output.argmax(1)
        return outputs
            

In [None]:
# Vocabulary sizes drive the embedding tables and the output layer.
ger_len_vocab = len(ger_field.vocab)
eng_len_vocab = len(eng_field.vocab)

# Model and training hyperparameters.
embedding_size = 300
hidden_size = 1024
num_layer = 4
dropout_encoder = 0.5
dropout_decoder = 0.5
num_epoch = 10

# German is encoded, English is decoded.
encoder = Encoder(
    input_vocab=ger_len_vocab,
    embedding_size=embedding_size,
    hidden_size=hidden_size,
    num_layer=num_layer,
    dropout=dropout_encoder,
).to(device)
decoder = Decoder(
    input_size=eng_len_vocab,
    embedding_size=embedding_size,
    hidden_size=hidden_size,
    numlayer=num_layer,
    ouput_size=eng_len_vocab,
    dropout=dropout_decoder,
).to(device)
model = Seq2seq(encoder=encoder, decoder=decoder).to(device)

# Padding positions in the English target do not contribute to the loss.
pad_indx = eng_field.vocab.stoi["<pad>"]
loss = nn.CrossEntropyLoss(ignore_index=pad_indx)
optimizer = torch.optim.Adam(params=model.parameters(), lr=0.001)


In [None]:
from torch.utils.tensorboard import SummaryWriter

# TensorBoard event files go under runs/loss_plot; `step` counts logged batches.
writer = SummaryWriter(log_dir="runs/loss_plot")
step = 0

In [None]:
# Train: one pass over `train_iter` per epoch, logging every batch loss to
# TensorBoard and printing the mean batch loss of each epoch.
for epoch in range(num_epoch):
    # Make sure dropout is active even if the model was switched to eval mode
    # earlier in the session.
    model.train()
    epoch_loss = 0.0
    num_batches = 0

    for batch in train_iter:
        data = batch.src.to(device)
        target = batch.trg.to(device)

        optimizer.zero_grad()
        outputs = model(data, target)

        # Position 0 of the output is never written by the model, so it is
        # dropped together with the first target row before flattening.
        outputs = outputs[1:].reshape(-1, outputs.shape[2])
        target = target[1:].reshape(-1)

        los = loss(outputs, target)
        los.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1)
        optimizer.step()

        # Log a plain float rather than a tensor that still holds the graph.
        batch_loss = los.item()
        writer.add_scalar("Training loss", batch_loss, global_step=step)
        step += 1

        epoch_loss += batch_loss
        num_batches += 1

    # The mean over the epoch is far less noisy than the last batch's loss and
    # is still defined when the iterator yields nothing.
    mean_loss = epoch_loss / num_batches if num_batches else float("nan")
    print(f"epoch:{epoch} and loss : {mean_loss}")

epoch:0 and loss : 4.341251373291016
epoch:1 and loss : 4.105330944061279
epoch:2 and loss : 3.739529609680176
epoch:3 and loss : 3.744792938232422
epoch:4 and loss : 3.7222909927368164
epoch:5 and loss : 3.268319845199585
epoch:6 and loss : 3.215714454650879
epoch:7 and loss : 3.1341071128845215
epoch:8 and loss : 3.794325828552246
epoch:9 and loss : 3.330305337905884


In [None]:
# Change the working directory so that relative paths used afterwards (such as
# the "seq2seq.pth" checkpoint) resolve inside this folder.
# NOTE(review): absolute path, presumably a mounted Google Drive in Colab — confirm before running elsewhere.
%cd /content/drive/MyDrive/Pytorch_model/ 

/content/drive/MyDrive/Pytorch_model


In [None]:
# Persist only the learned parameters (state dict), relative to the current working directory.
torch.save(obj=model.state_dict(), f="seq2seq.pth")

In [None]:
from torchtext.data.metrics import bleu_score

In [None]:
def translate_sentence(model, sentence, german, english, device, max_length=50):
    """Greedily translate one German sentence with ``model``.

    Args:
        model: object exposing ``encoder``, ``decoder``, ``training``,
            ``eval()`` and ``train(mode)`` (e.g. the ``Seq2seq`` module).
        sentence: either a raw string (tokenised with spaCy) or an iterable of
            token strings. Tokens are lower-cased in both cases.
        german: source field providing ``init_token``, ``eos_token`` and
            ``vocab.stoi``.
        english: target field providing ``vocab.stoi`` and ``vocab.itos``.
        device: device on which the index tensors are created.
        max_length: maximum number of decoding steps.

    Returns:
        The predicted token strings without the leading ``<sos>``. The list
        ends with ``<eos>`` only if the model produced it within
        ``max_length`` steps.
    """
    if isinstance(sentence, str):
        # Load the spaCy pipeline lazily and only once: it is not needed for
        # pre-tokenised input and reloading it on every call is very slow.
        tokenizer = getattr(translate_sentence, "_german_tokenizer", None)
        if tokenizer is None:
            tokenizer = spacy.load("de_core_news_sm")
            translate_sentence._german_tokenizer = tokenizer
        tokens = [token.text.lower() for token in tokenizer(sentence)]
    else:
        tokens = [token.lower() for token in sentence]

    # Add <SOS> and <EOS> at the beginning and end respectively.
    tokens = [german.init_token, *tokens, german.eos_token]

    # Convert each German token to its vocabulary index; one column = one sentence.
    text_to_indices = [german.vocab.stoi[token] for token in tokens]
    sentence_tensor = torch.LongTensor(text_to_indices).unsqueeze(1).to(device)

    eos_index = english.vocab.stoi["<eos>"]
    outputs = [english.vocab.stoi["<sos>"]]

    # Inference must run in eval mode, otherwise dropout randomly perturbs the
    # prediction. The previous mode is restored afterwards so a surrounding
    # training loop is unaffected.
    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            hidden, cell = model.encoder(sentence_tensor)

            for _ in range(max_length):
                previous_word = torch.LongTensor([outputs[-1]]).to(device)
                output, hidden, cell = model.decoder(previous_word, hidden, cell)
                best_guess = output.argmax(1).item()
                outputs.append(best_guess)

                # Model predicts it's the end of the sentence.
                if best_guess == eos_index:
                    break
    finally:
        model.train(was_training)

    # Skip the start token when mapping indices back to strings.
    return [english.vocab.itos[idx] for idx in outputs[1:]]

In [None]:
def bleu(data, model, german, english, device):
    """Compute the corpus BLEU score of ``model`` over ``data``.

    Args:
        data: iterable of examples exposing ``src`` and ``trg`` token lists as
            instance attributes.
        model: translation model passed through to ``translate_sentence``.
        german: source field passed through to ``translate_sentence``.
        english: target field; its ``eos_token`` is stripped from predictions.
        device: device passed through to ``translate_sentence``.

    Returns:
        Whatever ``bleu_score`` returns for the collected predictions, with
        each example's ``trg`` used as its single reference.
    """
    targets = []
    outputs = []

    for example in data:
        src = vars(example)["src"]
        trg = vars(example)["trg"]

        prediction = translate_sentence(model, src, german, english, device)
        # Strip the end marker only when it is actually there: a translation
        # cut off at the length limit has no <eos>, and dropping its last
        # token unconditionally would discard a real word.
        if prediction and prediction[-1] == english.eos_token:
            prediction = prediction[:-1]

        targets.append([trg])
        outputs.append(prediction)

    return bleu_score(outputs, targets)

In [None]:
# Evaluate on the test examples at indices 1..99 (the slice skips index 0).
score = bleu(
    data=test[1:100],
    model=model,
    german=ger_field,
    english=eng_field,
    device=device,
)
print(f"Bleu score {score*100:.2f}")

Bleu score 8.54
