In [1]:
# import necessary libraries for pytorch to train a sequence-to-sequence model using LSTM cells to generate poems of Ferdousi
# the dataset is in ferdousi.txt which is in persian
# the model is trained on a GPU

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
import matplotlib.pyplot as plt
import nltk
nltk.download('punkt')
from nltk.tokenize import word_tokenize
from nltk.tokenize import sent_tokenize
# # for persian
# !pip install hazm -q
# from hazm import *
import string
from collections import Counter
import random

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.


In [2]:
!git clone https://github.com/amnghd/Persian_poems_corpus.git
!cp Persian_poems_corpus/original/ferdousi.txt .
!rm -rf Persian_poems_corpus

Cloning into 'Persian_poems_corpus'...
remote: Enumerating objects: 159, done.[K
remote: Total 159 (delta 0), reused 0 (delta 0), pack-reused 159[K
Receiving objects: 100% (159/159), 45.21 MiB | 15.55 MiB/s, done.
Resolving deltas: 100% (3/3), done.
Updating files: 100% (148/148), done.


In [3]:
class FerdousiDataset(torch.utils.data.Dataset):
    def __init__(self, poem_path):
        self.poem_path = poem_path
        self.load_poem()

    def __len__(self):
        return len(self.poem)-1

    def __getitem__(self, idx):
        return self.poem[idx], self.poem[idx+1]

    def load_poem(self):
        with open(self.poem_path, 'r', encoding='utf-8') as f:
            poem = [line.strip() for line in f.readlines()]
        poem = poem[2:]
        poem = poem[:-1] if len(poem) % 2 == 1 else poem
        poem = [[poem[i], poem[i+1]] for i in range(0, len(poem), 2)]
        poem = [mesra[0] + ' ' + mesra[1] for mesra in poem]
        poem = [word_tokenize(line) for line in poem]
        punctuations = string.punctuation + '«»،؛؟'
        poem = [[word for word in line if word not in punctuations] for line in poem]
        poem = [line for line in poem if len(line) > 0]
        poem = [[word for word in line if len(word) > 1] for line in poem]
        self.max_len = max([len(line) for line in poem])
        poem = [line + ['<pad>'] * (self.max_len - len(line)) for line in poem]
        poem = [['<sos>'] + line + ['<eos>'] for line in poem]
        words = Counter([word for line in poem for word in line])
        self.word2idx = {word: idx for idx, word in enumerate(words)}
        self.idx2word = {idx: word for idx, word in enumerate(words)}
        poem = [[self.word2idx[word] for word in line] for line in poem]
        self.poem = torch.tensor(poem).long()

# create a dataset object
dataset = FerdousiDataset('ferdousi.txt')
# split the dataset into train and test
train_size = int(0.8 * len(dataset))
test_size = len(dataset) - train_size
train_dataset, test_dataset = torch.utils.data.random_split(dataset, [train_size, test_size])
# create a dataloader for train and test
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=32, shuffle=True)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [7]:
class Encoder(nn.Module):
    def __init__(self, input_size, embedding_size, hidden_size, num_layers, p_drop=0.2):
        super(Encoder, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.embedding = nn.Embedding(input_size, embedding_size)
        self.rnn = nn.LSTM(embedding_size, hidden_size, num_layers, batch_first=True)
        self.dropout = nn.Dropout(p_drop)

    def forward(self, x, hidden=None, cell=None):
        # x.shape = (batch_size, seq_len)
        embedding = self.dropout(self.embedding(x))
        # embedding.shape = (batch_size, seq_len, embedding_size)
        if hidden is None and cell is None:
            hidden = torch.zeros((self.num_layers, x.shape[0], self.hidden_size), device=x.device)
            cell = torch.zeros((self.num_layers, x.shape[0], self.hidden_size), device=x.device)
        outputs, (hidden, cell) = self.rnn(embedding, (hidden, cell))
        # outputs.shape = (batch_size, seq_len, hidden_size)
        # hidden.shape = (num_layers, batch_size, hidden_size)
        # cell.shape = (num_layers, batch_size, hidden_size)
        return hidden, cell


# hyperparameters
input_size_encoder = len(dataset.word2idx)
encoder_embedding_size = 50
hidden_size = 100
num_layers = 1
# define the encoder
encoder = Encoder(input_size_encoder, encoder_embedding_size, hidden_size, num_layers).to(device)


In [8]:
# define the decoder class and use the encoder.embedding as the embedding layer
class Decoder(nn.Module):
    def __init__(self, input_size, embedding_size, hidden_size, output_size, num_layers, p_drop=0.2):
        super(Decoder, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.embedding = encoder.embedding
        self.rnn = nn.LSTM(embedding_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)
        self.dropout = nn.Dropout(p_drop)

    def forward(self, x, hidden, cell):
        # x.shape = (batch_size, 1)
        x = x.unsqueeze(1)
        # x.shape = (batch_size, 1, 1)
        embedding = self.dropout(self.embedding(x))
        # embedding.shape = (batch_size, 1, embedding_size)
        outputs, (hidden, cell) = self.rnn(embedding, (hidden, cell))
        # outputs.shape = (batch_size, 1, hidden_size)
        # hidden.shape = (num_layers, batch_size, hidden_size)
        # cell.shape = (num_layers, batch_size, hidden_size)
        predictions = self.fc(outputs)
        # predictions.shape = (batch_size, 1, output_size)
        predictions = predictions.squeeze(1)
        # predictions.shape = (batch_size, output_size)
        return predictions, hidden, cell

# hyperparameters
input_size_decoder = len(dataset.word2idx)
output_size = len(dataset.word2idx)
decoder_embedding_size = 50
hidden_size = 100
num_layers = 1
# define the decoder
decoder = Decoder(input_size_decoder, decoder_embedding_size, hidden_size, output_size, num_layers).to(device)

In [9]:
class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder):
        super(Seq2Seq, self).__init__()
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, source, target, teacher_forcing_ratio=0.5):
        # source.shape = (batch_size, source_seq_len)
        # target.shape = (batch_size, target_seq_len)
        batch_size = source.shape[0]
        target_len = target.shape[1]
        target_vocab_size = len(dataset.word2idx)
        outputs = torch.zeros(batch_size, target_len, target_vocab_size).to(device)
        hidden, cell = self.encoder(source)
        # hidden.shape = (num_layers, batch_size, hidden_size)
        # cell.shape = (num_layers, batch_size, hidden_size)
        x = target[:, 0]
        # x.shape = (batch_size)
        for t in range(1, target_len):
            output, hidden, cell = self.decoder(x, hidden, cell)
            # output.shape = (batch_size, target_vocab_size)
            outputs[:, t] = output
            best_guess = output.argmax(1)
            x = target[:, t] if random.random() < teacher_forcing_ratio else best_guess
        return outputs

In [10]:
# hyperparameters
num_epochs = 100
learning_rate = 1e-3
batch_size = 32


In [11]:
# initialize the network
encoder_net = Encoder(input_size_encoder, encoder_embedding_size, hidden_size, num_layers).to(device)
decoder_net = Decoder(input_size_decoder, decoder_embedding_size, hidden_size, output_size, num_layers).to(device)
model = Seq2Seq(encoder_net, decoder_net).to(device)
criterion = nn.CrossEntropyLoss(ignore_index=dataset.word2idx['<pad>'])
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

In [12]:
# define a function that takes a tensor of indices and returns a string and removes pad and end and start tokens
def tensor2string(tensor):
    # tensor.shape = (seq_len)
    words = [dataset.idx2word[idx.item()] for idx in tensor]
    words = [word for word in words if word not in ['<pad>', '<sos>', '<eos>']]
    return ' '.join(words)

# define a function that generates a verse
def generate_verse(model, source, device, max_len=20):
    source = source.unsqueeze(0)
    model.eval()
    # dont forget to move to device
    with torch.no_grad():
        hidden, cell = model.encoder(source.to(device))
        # hidden.shape = (num_layers, batch_size, hidden_size)
        # cell.shape = (num_layers, batch_size, hidden_size)
        x = torch.tensor([dataset.word2idx['<sos>']]).to(device)
        outputs = []
        for t in range(max_len):
            output, hidden, cell = model.decoder(x, hidden, cell)
            # output.shape = (1, output_size)
            best_guess = output.argmax(1)
            outputs.append(best_guess.item())
            x = best_guess
            if best_guess.item() == dataset.word2idx['<eos>']:
                break
    return tensor2string(torch.tensor(outputs))

def beam_search(model, source, device, k=3, max_len=20):
    model.eval()
    with torch.no_grad():
        hidden, cell = model.encoder(source.unsqueeze(0).to(device))
        beam = [(0.0, [dataset.word2idx['<sos>']])]
        for t in range(max_len):
            candidates = []
            for prob, seq in beam:
                x = torch.tensor([seq[-1]]).to(device)
                output, hidden, cell = model.decoder(x, hidden, cell)
                top_k_values, top_k_indices = torch.topk(output.squeeze(), k)
                for i in range(k):
                    next_prob = top_k_values[i].item()
                    next_word = top_k_indices[i].item()
                    candidate = (prob + next_prob, seq + [next_word])
                    candidates.append(candidate)
            beam = sorted(candidates, reverse=True, key=lambda x: x[0])[:k]
            beam = [(prob, seq) for prob, seq in beam if seq[-1] != dataset.word2idx['<eos>']]
            if not beam:
                break
        return tensor2string(torch.tensor(beam[0][1]))

In [None]:
# select a random verse as a string

train_losses = []
# train the network and in each epoch print the predicted verse
for epoch in range(num_epochs):
    # print the predicted verse
    idx = random.randint(0, len(train_dataset))
    verse = train_dataset[idx][0].to(device)
    print(generate_verse(model, verse, device=device))
    # train
    model.train()
    for batch_idx, (source, target) in enumerate(train_loader):
        # source.shape = (batch_size, source_seq_len)
        # target.shape = (batch_size, target_seq_len)
        source = source.to(device)
        target = target.to(device)
        outputs = model(source, target)
        # outputs.shape = (batch_size, target_seq_len, output_size)
        outputs = outputs[1:].reshape(-1, output_size)
        # outputs.shape = ((target_seq_len - 1) * batch_size, output_size)
        target = target[1:].reshape(-1)
        # target.shape = ((target_seq_len - 1) * batch_size)
        loss = criterion(outputs, target)
        train_losses.append(loss.item())
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {loss.item():.4f}')
    
    

پذرفتی ناف نامت گهواره آیم برکوهه بیغوله خوردنش منست پیکرنگار بمردی روسپهبد آفریدون دوشیزه برتافتی ناسالخورده یلانت بندازدا پالیزبان استش
Epoch [1/100], Loss: 6.7174
چو گفت که از به که که از بر را به به به به به
Epoch [2/100], Loss: 6.2534
به گفت که از به به به به به به به به به به به
Epoch [3/100], Loss: 6.5738
چو گفت که به به که که از از از از تو را نیست نیست
Epoch [4/100], Loss: 6.6103
چو گفت از به به به به به بر به به به بود بود بود
Epoch [5/100], Loss: 6.8256
چو گفت به به به که که از بر به به به راه جوی به راه
Epoch [6/100], Loss: 6.1467
چو گفت بر به به به به بر بر بر بر بود بود بود
Epoch [7/100], Loss: 5.8446
چو گفت از به به که به از بر بر به راه جوی به راه


In [None]:
# define another encoder with Bidirectional GRU
class Encoder(nn.Module):
    def __init__(self, input_size, embedding_size, hidden_size, num_layers, p_drop=0.2):
        super(Encoder, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.embedding = nn.Embedding(input_size, embedding_size)
        self.gru = nn.GRU(embedding_size, hidden_size, num_layers, batch_first=True, bidirectional=True)
        self.dropout = nn.Dropout(p_drop)

    def forward(self, x):
        # x.shape = (batch_size, seq_len)
        embedding = self.dropout(self.embedding(x))
        # embedding.shape = (batch_size, seq_len, embedding_size)
        outputs, hidden = self.gru(embedding)
        # outputs.shape = (batch_size, seq_len, hidden_size * 2)
        # hidden.shape = (num_layers * 2, batch_size, hidden_size)
        return hidden

# hyperparameters
input_size_encoder = len(dataset.word2idx)
encoder_embedding_size = 50
hidden_size = 100
num_layers = 1
# define the encoder
encoder = Encoder(input_size_encoder, encoder_embedding_size, hidden_size, num_layers).to(device)

In [None]:
# define decoder with bidirectional GRU
class Decoder(nn.Module):
    def __init__(self, input_size, embedding_size, hidden_size, output_size, num_layers, p_drop=0.2):
        super(Decoder, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.embedding = encoder.embedding
        self.rnn = nn.GRU(embedding_size, hidden_size, num_layers, batch_first=True, bidirectional=True)
        self.fc = nn.Linear(hidden_size * 2, output_size)  # since bidirectional, so multiply by 2
        self.dropout = nn.Dropout(p_drop)

    def forward(self, x, hidden):
        # x.shape = (batch_size, 1)
        x = x.unsqueeze(1)
        # x.shape = (batch_size, 1, 1)
        embedding = self.dropout(self.embedding(x))
        # embedding.shape = (batch_size, 1, embedding_size)
        outputs, hidden = self.rnn(embedding, hidden)
        # outputs.shape = (batch_size, 1, hidden_size * 2)  # since bidirectional, so hidden_size * 2
        # hidden.shape = (num_layers * 2, batch_size, hidden_size)  # since bidirectional, so num_layers * 2
        predictions = self.fc(outputs)
        # predictions.shape = (batch_size, 1, output_size)
        predictions = predictions.squeeze(1)
        # predictions.shape = (batch_size, output_size)
        return predictions, hidden

# hyperparameters
input_size_decoder = len(dataset.word2idx)
output_size = len(dataset.word2idx)
decoder_embedding_size = 50
hidden_size = 100
num_layers = 1
# define the decoder
decoder = Decoder(input_size_decoder, decoder_embedding_size, hidden_size, output_size, num_layers).to(device)

In [None]:
class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder):
        super(Seq2Seq, self).__init__()
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, source, target, teacher_forcing_ratio=0.5):
        # source.shape = (batch_size, source_seq_len)
        # target.shape = (batch_size, target_seq_len)
        batch_size = source.shape[0]
        target_len = target.shape[1]
        target_vocab_size = len(dataset.word2idx)
        outputs = torch.zeros(batch_size, target_len, target_vocab_size).to(device)
        hidden, _ = self.encoder(source)
        # hidden.shape = (batch_size, 2*hidden_size)
        x = target[:, 0]
        # x.shape = (batch_size)
        for t in range(1, target_len):
            output, hidden = self.decoder(x, hidden)
            # output.shape = (batch_size, target_vocab_size)
            outputs[:, t] = output
            best_guess = output.argmax(1)
            x = target[:, t] if random.random() < teacher_forcing_ratio else best_guess
        return outputs


In [None]:
model = Seq2Seq(encoder, decoder).to(device)
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
criterion = nn.CrossEntropyLoss(ignore_index=dataset.word2idx['<PAD>'])

In [None]:
train_losses = []
train_accuracies = []
test_losses = []
test_accuracies = []

for epoch in range(num_epochs):
    # print the predicted verse
    idx = random.randint(0, len(train_dataset))
    verse = train_dataset[idx][0].to(device)
    print(generate_verse(model, verse, device=device))
    
    # train the model
    model.train()
    epoch_loss = 0
    epoch_correct = 0
    for batch_idx, (source, target) in enumerate(train_loader):
        # source.shape = (batch_size, source_seq_len)
        # target.shape = (batch_size, target_seq_len)
        source = source.to(device)
        target = target.to(device)
        outputs = model(source, target)
        # outputs.shape = (batch_size, target_seq_len, output_size)
        outputs = outputs[1:].reshape(-1, output_size)
        # outputs.shape = ((target_seq_len - 1) * batch_size, output_size)
        target = target[1:].reshape(-1)
        # target.shape = ((target_seq_len - 1) * batch_size)
        loss = criterion(outputs, target)
        epoch_loss += loss.item()
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        # calculate train accuracy
        top_p, top_class = outputs.topk(1, dim=1)
        equals = top_class == target.view(*top_class.shape)
        accuracy = torch.mean(equals.type(torch.FloatTensor))
        epoch_correct += accuracy.item()
        
    train_loss = epoch_loss / len(train_loader)
    train_losses.append(train_loss)
    train_acc = epoch_correct / len(train_loader)
    train_accuracies.append(train_acc)
    print(f'Epoch [{epoch + 1}/{num_epochs}], Train Loss: {train_loss:.4f}, Train Accuracy: {train_acc:.4f}')
    
    # evaluate the model
    model.eval()
    epoch_loss = 0
    epoch_correct = 0
    with torch.no_grad():
        for source, target in test_loader:
            source = source.to(device)
            target = target.to(device)
            outputs = model(source, target, teacher_forcing_ratio=0)
            # outputs.shape = (batch_size, target_seq_len, output_size)
            outputs = outputs[1:].reshape(-1, output_size)
            # outputs.shape = ((target_seq_len - 1) * batch_size, output_size)
            target = target[1:].reshape(-1)
            # target.shape = ((target_seq_len - 1) * batch_size)
            loss = criterion(outputs, target)
            epoch_loss += loss.item()

            # calculate test accuracy
            top_p, top_class = outputs.topk(1, dim=1)
            equals = top_class == target.view(*top_class.shape)
            accuracy = torch.mean(equals.type(torch.FloatTensor))
            epoch_correct += accuracy.item()

    test_loss = epoch_loss / len(test_loader)
    test_losses.append(test_loss)
    test_acc = epoch_correct / len(test_loader)
    test_accuracies.append(test_acc)
    print(f'Epoch [{epoch + 1}/{num_epochs}], Test Loss: {test_loss:.4f}, Test Accuracy: {test_acc:.4f}')
    
