In [6]:
from io import open
import random
import time
import math
import torch
import torch.nn as nn
from torch.autograd import Variable
import torch.utils.data as Data
from torch import optim
import torch.nn.functional as F
from helper import *

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import DataLoader, Dataset

import pandas as pd
import numpy as np
import random
import math

import seaborn as sns
import matplotlib.pyplot as plt

from time import time

# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
# The model cells and the train/eval code below all read this global.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')


In [7]:
# Load the dataset. MyDataset later reads the 'line' and 'wazn' columns of each row.
data = pd.read_csv('../input/arabic-poem/aroud_dataset.csv')

In [8]:
# Notebook-wide configuration.
use_cuda = torch.cuda.is_available()
# Special-token ids; they match the positions of 'SOS', 'EOS' and 'PAD'
# at the start of both `vocab` and `wazn_vocab`.
SOS_token = 0
EOS_token = 1
PAD_token = 2
MAX_LENGTH = 15
# Probability of using teacher forcing for a batch (assigned again, to the same value, next to train()).
teacher_forcing_ratio = 0.5
# NOTE(review): the models below are built with a literal 256, not this value,
# and `vocab` is Arabic rather than a-z; this constant looks like a leftover.
hidden_size = 29
batch_size = 128
epochs = 100

In [9]:
# Diacritic marks that get_raw_sentence() treats as tachkil.
special = ['َ', 'ُ', 'ِ', 'ً', 'ٌ', 'ٍ', 'َّ', 'ُّ', 'ِّ','ًّ', 'ٌّ', 'ٍّ', 'ْ','ّ']
# Input vocabulary: the three special tokens, the space, then the bare letters.
# A character's id is its index in this list.
vocab = ['SOS', 'EOS', 'PAD', ' ','ء', 'آ', 'أ', 'ؤ', 'إ', 'ئ', 'ا', 'ب', 'ة', 'ت', 'ث', 'ج', 'ح', 'خ', 'د', 'ذ', 'ر', 'ز', 'س', 'ش', 'ص', 'ض', 'ط', 'ظ', 'ع', 'غ', 'ف', 'ق', 'ك', 'ل', 'م', 'ن', 'ه', 'و', 'ى', 'ي'] 
# wazn_vocab = ['SOS', 'EOS', ' ', 'ا', 'ت', 'س', 'ع','ف', 'ل', 'م', 'ن', 'و', 'ي'] + ['َ', 'ُ', 'ِ', 'ً', 'ٌ', 'ٍ', 'ْ','ّ']

# Output vocabulary: the three special tokens followed by whole wazn words;
# each space-separated word of a row's 'wazn' value is looked up here.
wazn_vocab = ['SOS', 'EOS', 'PAD', 'مُتْفَاعِلُنْ', 'مُفَاعَلْتُنْ', 'فَعُولُنْ', 'مُسْتَفْعِلُنْ', 'مَفَاعِيلُنْ', 'فَاعِلُنْ', 'مُفَاعَلَتُنْ', 'مَفْعُولُنْ', 'مَفْعُولَاتُ', 'فَاعِلَاتُنْ', 'فِعْلُنْ', 'فَعِلَاتُنْ', 'فَعِلُنْ', 'مُسْتَفْعِلَانْ', 'فَعُولْ', 'مَفَاعِلُنْ', 'مُتَعِلُنْ', 'فَعِلَاتُ', 'فَاعِلَاتُ', 'فَعُولُ', 'مُتَفْعِلُنْ', 'مُسْتَعِلُنْ', 'مُتَفَاعِلُنْ', 'مَفَاعِيلُ', 'مَفْعُلَاتُ']

In [10]:
class MyDataset:
    """Map-style dataset that turns dataframe rows into token-id tensors.

    Each row must provide a 'line' column (the verse text) and a 'wazn'
    column (space-separated wazn words). The verse is stripped of diacritics
    with ``get_raw_sentence`` and encoded character by character against
    ``vocab``; the wazn is encoded word by word against ``wazn_vocab``.
    """

    def __init__(self, data):
        """Store the dataframe whose rows will be encoded on access."""
        self.data = data

    def __len__(self):
        """Return the number of rows in the wrapped dataframe."""
        return len(self.data)

    def __getitem__(self, item):
        """Return ``{'x': LongTensor, 'y': LongTensor}`` for row ``item``.

        Rows that cannot be encoded (a character or wazn word missing from
        the vocabularies, or a non-string 'wazn' value) are replaced by
        row 0, as before.

        Raises:
            ValueError: if row 0 itself cannot be encoded. Previously this
                case recursed until a RecursionError.
        """
        line = self.data.iloc[item]

        x = get_raw_sentence(line['line'])
        y = line['wazn']

        try:
            x_token = [vocab.index(c) for c in x]
            y_token = [wazn_vocab.index(c) for c in y.split(' ')]
        except (ValueError, AttributeError) as err:
            # ValueError: token not in the vocabulary.
            # AttributeError: 'wazn' is not a string (e.g. a missing value).
            if item == 0:
                raise ValueError(
                    'Row 0 is used as the fallback sample but cannot be encoded'
                ) from err
            return self.__getitem__(0)

        return {
            'x': torch.tensor(x_token, dtype=torch.int64),
            'y': torch.tensor(y_token, dtype=torch.int64),
        }

In [11]:
def get_raw_sentence(sentence, diacritics=None):
    """
    Get a raw sentence without tachkil.

    Every character of ``sentence`` that is a diacritic mark is dropped and
    the remaining characters (letters and spaces) are returned in order.

    The previous implementation walked the string assuming each letter was
    followed by at most "shadda + one vowel", in that order. Any other
    arrangement (vowel before shadda, a mark at the start or right after a
    space, more than two marks in a row) left a diacritic in the output.
    Filtering every character individually has no such ordering assumption
    and gives the same result on input that did follow the old assumption.

    Args:
        sentence: the text to strip (any iterable of single characters).
        diacritics: optional collection of characters to remove. Defaults to
            the module-level ``special`` list.

    Returns:
        list of single-character strings with all diacritics removed.
    """
    marks = frozenset(special if diacritics is None else diacritics)
    return [ch for ch in sentence if ch not in marks]



In [12]:
def collate_fn(batch, x_maxlen=50, y_maxlen=4):
    """Stack a list of ``{'x', 'y'}`` samples into two padded 2-D tensors.

    Only the first sample of each list is resized to the requested length
    (a negative pad amount crops it); ``pad_sequence`` then pads every other
    sample with ``PAD_token`` up to the longest sequence in the list.

    Returns:
        (inputs, targets), both batch-first.
    """
    xs = [sample['x'] for sample in batch]
    ys = [sample['y'] for sample in batch]

    # Resizing the head of each list sets the minimum width that
    # pad_sequence will produce for the batch.
    xs[0] = F.pad(xs[0], (0, x_maxlen - xs[0].shape[0]), 'constant', PAD_token)
    ys[0] = F.pad(ys[0], (0, y_maxlen - ys[0].shape[0]), 'constant', PAD_token)

    padded_x = pad_sequence(xs, batch_first=True, padding_value=PAD_token)
    padded_y = pad_sequence(ys, batch_first=True, padding_value=PAD_token)
    return padded_x, padded_y


def split_data(data, s=0.7):
    """Shuffle the row positions of ``data`` and split them in two.

    Args:
        data: a dataframe (indexed positionally with ``.iloc``).
        s: fraction of rows that go to the first part.

    Returns:
        (first_part, second_part) as two dataframes.
    """
    import random

    order = list(range(len(data)))
    random.shuffle(order)

    cut = int(len(order) * s)
    first_part = data.iloc[order[:cut]]
    second_part = data.iloc[order[cut:]]
    return first_part, second_part

# Model:

In [13]:
class EncoderRNN(nn.Module):
    """Embedding followed by three stacked bidirectional GRU layers.

    Each GRU keeps its own hidden state, so the three states are passed in
    and returned explicitly.
    """

    def __init__(self, input_size, hidden_size):
        """
        Args:
            input_size: number of distinct input token ids.
            hidden_size: embedding width and per-direction GRU width.
        """
        super().__init__()
        self.hidden_size = hidden_size

        self.embedding = nn.Embedding(input_size, hidden_size)
        # Layers 2 and 3 consume the concatenated forward/backward outputs
        # of the previous layer, hence the doubled input width.
        self.gru1 = nn.GRU(hidden_size, hidden_size, bidirectional=True, batch_first=True)
        self.gru2 = nn.GRU(hidden_size*2, hidden_size, bidirectional=True, batch_first=True)
        self.gru3 = nn.GRU(hidden_size*2, hidden_size, bidirectional=True, batch_first=True)

    def forward(self, input, hidden1, hidden2, hidden3):
        """Encode a batch of token ids.

        Args:
            input: integer tensor of token ids, batch-first.
            hidden1, hidden2, hidden3: initial hidden state of each GRU layer
                (see ``initHidden``).

        Returns:
            (output, hidden1, hidden2, hidden3): the last layer's output and
            the updated hidden state of every layer.
        """
        output = self.embedding(input)

        output, hidden1 = self.gru1(output, hidden1)
        output, hidden2 = self.gru2(output, hidden2)
        output, hidden3 = self.gru3(output, hidden3)

        return output, hidden1, hidden2, hidden3

    def initHidden(self, bs):
        """Return three zero hidden states of shape ``(2, bs, hidden_size)``.

        The tensors are created on the module's own device and dtype rather
        than on a notebook-level global, so they always match the weights
        they will be combined with.
        """
        weight = self.embedding.weight
        return tuple(
            torch.zeros(2, bs, self.hidden_size, device=weight.device, dtype=weight.dtype)
            for _ in range(3)
        )

In [14]:
class DecoderRNN(nn.Module):
    """Embedding, three stacked bidirectional GRU layers and a linear head.

    ``forward`` returns raw scores (logits); no softmax is applied.
    """

    def __init__(self, hidden_size, output_size):
        """
        Args:
            hidden_size: embedding width and per-direction GRU width.
            output_size: number of distinct output token ids.
        """
        super().__init__()
        self.hidden_size = hidden_size

        self.embedding = nn.Embedding(output_size, hidden_size)
        self.gru1 = nn.GRU(hidden_size, hidden_size, bidirectional=True, batch_first=True)
        self.gru2 = nn.GRU(hidden_size*2, hidden_size, bidirectional=True, batch_first=True)
        self.gru3 = nn.GRU(hidden_size*2, hidden_size, bidirectional=True, batch_first=True)

        self.out = nn.Linear(hidden_size*2, output_size)
        # Kept as an attribute for compatibility; forward() does not use it.
        self.softmax = nn.LogSoftmax(dim=-1)

    def forward(self, input, hidden1, hidden2, hidden3):
        """Run one decoding call.

        Args:
            input: integer tensor of token ids, batch-first.
            hidden1, hidden2, hidden3: hidden state of each GRU layer.

        Returns:
            (output, hidden1, hidden2, hidden3): unnormalised scores over the
            output vocabulary and the updated hidden states.
        """
        output = self.embedding(input)
        output = F.relu(output)

        output, hidden1 = self.gru1(output, hidden1)
        output, hidden2 = self.gru2(output, hidden2)
        output, hidden3 = self.gru3(output, hidden3)

        output = self.out(output)
        return output, hidden1, hidden2, hidden3

    def initHidden(self, bs=1):
        """Return three zero hidden states of shape ``(2, bs, hidden_size)``.

        ``bs`` used to be read from an undeclared outer variable, which
        raised NameError unless a later cell happened to define it. It is now
        an explicit argument (default 1) and the tensors are created on the
        module's own device and dtype.
        """
        weight = self.embedding.weight
        return tuple(
            torch.zeros(2, bs, self.hidden_size, device=weight.device, dtype=weight.dtype)
            for _ in range(3)
        )

In [89]:
# Random split with split_data's default fraction (s=0.7 to the first part).
# No seed is set in the visible cells, so the split differs between runs.
train_data, val_data = split_data(data)

# Wrap each split; rows are tokenised on access, not here.
train_dataset = MyDataset(train_data)
val_dataset   = MyDataset(val_data)

# Bare expression: displayed as the cell's output.
f'There are {len(train_dataset) :,} samples for training, and {len(val_dataset) :,} samples for validation testing'

In [90]:
# Must equal the batch size train() assumes (its `bs` default is 64).
TRAIN_BATCH_SIZE = 64

# drop_last=True keeps every training batch at exactly TRAIN_BATCH_SIZE rows.
train_dataloader = torch.utils.data.DataLoader(
    dataset=train_dataset,
    batch_size=TRAIN_BATCH_SIZE,
    shuffle=True, 
    collate_fn=collate_fn, 
    drop_last=True,
#     prefetch_factor=1, 
#     num_workers=1
    )

# Validation uses one large batch size (256 * 8 * 2 = 4096); the same number
# is passed to eval_accuracy() in the training loop.
# NOTE(review): with drop_last=True a validation split smaller than 4096 rows
# would presumably yield no batches at all - confirm against the dataset size.
val_dataloader = torch.utils.data.DataLoader(
    dataset=val_dataset,
    batch_size=256 * 8 * 2,
    shuffle=False, 
    collate_fn=collate_fn, 
    drop_last=True,
#     prefetch_factor=1, 
#     num_workers=1
    )

In [91]:
# Character encoder and wazn decoder, both built with hidden size 256
# (the `hidden_size` constant from the config cell is not used here).
encoder = EncoderRNN(len(vocab), 256).to(device)
decoder = DecoderRNN(256, len(wazn_vocab)).to(device)

learning_rate = 0.0007

# One Adam optimizer per network; train() steps both after each batch.
encoder_optimizer = torch.optim.Adam(encoder.parameters(), lr=learning_rate)
decoder_optimizer = torch.optim.Adam(decoder.parameters(), lr=learning_rate)

# The decoder returns raw scores, which is what CrossEntropyLoss expects.
# NOTE(review): no ignore_index is set, so PAD targets count towards the loss.
criterion = nn.CrossEntropyLoss()

In [92]:
teacher_forcing_ratio = 0.5


def train(input_tensor, target_tensor, encoder, decoder, encoder_optimizer, decoder_optimizer, criterion, max_length=50, bs=None, target_length=4):
    """Run one optimisation step on a single batch and return its mean loss.

    The encoder is fed one input position per call, carrying its three hidden
    states along; the final states seed the decoder, which is then unrolled
    for ``target_length`` steps. For each batch, teacher forcing is used with
    probability ``teacher_forcing_ratio``.

    Changes from the previous version:
      * ``bs`` defaults to the batch dimension of ``input_tensor`` instead of
        a hard-coded 64.
      * The next decoder input is taken with ``argmax(dim=-1)``, which keeps
        the ``(bs, 1)`` shape. The old ``topk(1)`` + ``squeeze()`` collapsed
        the batch dimension when ``bs == 1``.
      * The never-read ``encoder_outputs`` buffer is gone.
      * The start token is created on the input's device, not a global one.

    Args:
        input_tensor: token ids indexed as ``input_tensor[:, :, position]``
            (the caller adds the middle dimension with ``unsqueeze(1)``).
        target_tensor: target ids indexed as ``target_tensor[:, step]``.
        encoder, decoder: the two networks; both are put in train mode.
        encoder_optimizer, decoder_optimizer: their optimizers.
        criterion: loss applied to each step's scores and targets.
            NOTE(review): padded target positions are included unless the
            criterion itself ignores them.
        max_length: number of input positions to encode (capped at the
            actual width of ``input_tensor``).
        bs: batch size; ``None`` means "read it from ``input_tensor``".
        target_length: number of decoding steps.

    Returns:
        float: summed step losses divided by ``target_length``.
    """
    encoder.train()
    decoder.train()

    if bs is None:
        bs = input_tensor.shape[0]

    hidden = encoder.initHidden(bs)

    encoder_optimizer.zero_grad()
    decoder_optimizer.zero_grad()

    # Encode position by position; only the hidden states are kept.
    input_length = min(max_length, input_tensor.shape[-1])
    for ei in range(input_length):
        _, *hidden = encoder(input_tensor[:, :, ei], *hidden)

    decoder_input = torch.full((bs, 1), SOS_token, dtype=torch.long, device=input_tensor.device)

    # Decided once per batch, not once per step.
    use_teacher_forcing = random.random() < teacher_forcing_ratio

    loss = 0
    for di in range(target_length):
        decoder_output, *hidden = decoder(decoder_input, *hidden)
        loss += criterion(decoder_output.squeeze(1), target_tensor[:, di])

        if use_teacher_forcing:
            # Feed the ground-truth token as the next input.
            decoder_input = target_tensor[:, di:di+1]
        else:
            # Feed the model's own prediction, detached from the graph.
            decoder_input = decoder_output.argmax(dim=-1).detach()

    loss.backward()

    encoder_optimizer.step()
    decoder_optimizer.step()

    return loss.item() / target_length

In [102]:
def eval_accuracy(bs=None):
    """Return token-level accuracy on the first batch of ``val_dataloader``.

    Uses the notebook-level ``encoder``, ``decoder``, ``val_dataloader`` and
    ``device``. Decoding is greedy (each step's prediction is the next
    input) and positions whose target is ``PAD_token`` are not scored.

    As before, only the FIRST validation batch is evaluated (the loop exits
    after one iteration).

    Changes from the previous version:
      * Runs under ``torch.no_grad()`` so no autograd graph is built.
      * The batch size is read from each batch. ``bs`` is accepted for
        backward compatibility but is no longer required to match.
      * Correct predictions are counted with tensor ``sum()``, which also
        works when no position is scored. This replaces a bare ``except``.
      * Returns 0.0 instead of raising ZeroDivisionError when nothing was
        scored (e.g. the loader yielded no batch).
      * ``argmax(dim=-1)`` replaces ``topk(1)`` + ``squeeze()``, which lost
        the batch dimension for a batch of one.

    Args:
        bs: unused; kept so existing ``eval_accuracy(n)`` calls still work.

    Returns:
        float: correct non-PAD predictions divided by non-PAD targets.
    """
    max_length = 50
    target_length = 4

    correct = 0
    scored = 0

    encoder.eval()
    decoder.eval()

    with torch.no_grad():
        for input_tensor, target_tensor in val_dataloader:
            input_tensor = input_tensor.unsqueeze(1).to(device)
            batch = input_tensor.shape[0]

            hidden = encoder.initHidden(batch)
            for ei in range(max_length):
                _, *hidden = encoder(input_tensor[:, :, ei], *hidden)

            decoder_input = torch.full((batch, 1), SOS_token, dtype=torch.long, device=device)

            for di in range(target_length):
                decoder_output, *hidden = decoder(decoder_input, *hidden)
                decoder_input = decoder_output.argmax(dim=-1)

                predicted = decoder_input.cpu().view(-1)
                expected = target_tensor[:, di].cpu()

                keep = expected != PAD_token
                correct += (predicted[keep] == expected[keep]).sum().item()
                scored += int(keep.sum().item())

            # Only the first batch is evaluated (behaviour kept from the original).
            break

    return correct / scored if scored else 0.0

In [104]:
# Best validation accuracy seen so far; the training loop checkpoints whenever it is beaten.
best_acc = 0

In [105]:
print_loss_total = 0  # Reset every print_every
print_every = 200

# NOTE(review): the epoch counter starts at 5, presumably because this cell was
# re-run to continue an earlier session - confirm before a fresh run.
for epoch in range(5,40):

    
    # `time` is the function imported with `from time import time`.
    start = time()
    for i, (input_tensor, target_tensor) in enumerate(train_dataloader):
        
        input_tensor  = input_tensor.to(device)
        # Add a middle dimension so train() can slice one position with [:, :, ei].
        input_tensor = input_tensor.unsqueeze(1)
        
        target_tensor = target_tensor.to(device)
#         target_tensor = target_tensor.unsqueeze(1)
        
        
        # One optimisation step; returns the batch loss averaged over decoding steps.
        loss = train(input_tensor, target_tensor, encoder,
                     decoder, encoder_optimizer, decoder_optimizer, criterion)
        
        print_loss_total += loss

        if (i+1) % print_every == 0:
            print_loss_avg = print_loss_total / print_every
            print_loss_total = 0
            print(f' EPOCH:{epoch}    STEP:{i}/{len(train_dataloader)}    AVG LOSS:{print_loss_avg}    TIME: {time()-start:.2f}')
            start = time()
    
    # Same number as val_dataloader's batch_size.
    acc = eval_accuracy(256 * 8 * 2) 
    print(f'VAL ACCCCCCC: {acc}')

    # Checkpoint both networks whenever validation accuracy improves.
    if acc > best_acc:
        print('BEST MODEL YAY')
        best_acc = acc
        torch.save(encoder.state_dict(), f'encoder512.pt')
        torch.save(decoder.state_dict(), f'decoder512.pt')
        
    
    print('\n\n')

In [98]:
# Restore the weights saved by the training loop; map_location places them on `device`.
encoder.load_state_dict(torch.load('encoder512.pt', map_location=device))
decoder.load_state_dict(torch.load('decoder512.pt', map_location=device))

In [None]:
# Draw one random validation row and display it as the cell output.
line = val_data.sample(1)
line

In [None]:
# Predict the wazn sequence of a single line with the trained encoder/decoder.
#
# Changes from the previous version of this cell:
#   * The input is padded with PAD_token, the value collate_fn used during
#     training; it used to be padded with 0, which is the SOS id.
#   * Inference runs under torch.no_grad().
#   * The unused `encoder_outputs` buffer and `loss` variable are gone.
#   * The next decoder input keeps its (1, 1) shape via argmax(dim=-1).
s = 'أَلْفَاظُهُنَّ مُؤَنّثا'

max_length= 50
bs = 1

# Strip diacritics, then map every remaining character to its vocab id.
x = get_raw_sentence(s)
x_token = [vocab.index(c) for c in x]
x_tensor = torch.tensor(x_token, dtype=torch.int64)

# Right-pad to max_length and add the batch dimension.
input_tensor = nn.ConstantPad1d((0, max_length - x_tensor.shape[0]), PAD_token)(x_tensor).unsqueeze(0).to(device)

# Middle dimension so one position can be sliced with [:, :, ei], as in train().
input_tensor = input_tensor.unsqueeze(1)

encoder.eval()
decoder.eval()

input_length = max_length
target_length = 4

res = []
with torch.no_grad():
    encoder_hidden1, encoder_hidden2, encoder_hidden3 = encoder.initHidden(bs)

    # Encode position by position; only the hidden states are carried forward.
    for ei in range(input_length):
        encoder_output, encoder_hidden1, encoder_hidden2, encoder_hidden3 = encoder(
            input_tensor[:, :, ei], encoder_hidden1, encoder_hidden2, encoder_hidden3)

    decoder_input = torch.full((bs,1), SOS_token, dtype=torch.long, device=device)
    decoder_hidden1, decoder_hidden2, decoder_hidden3 = encoder_hidden1, encoder_hidden2, encoder_hidden3

    # Greedy decoding: each prediction becomes the next input.
    for di in range(target_length):
        decoder_output, decoder_hidden1, decoder_hidden2, decoder_hidden3 = decoder(
            decoder_input, decoder_hidden1, decoder_hidden2, decoder_hidden3)

        decoder_input = decoder_output.argmax(dim=-1)
        t = decoder_input.item()

        print(t)
        res.append(t)

# Map the predicted ids back to wazn words.
res = [wazn_vocab[c] for c in res]
print(' '.join(res))

In [None]:
# Wazn noted for the sample line above (kept as a comment so the cell runs):
# مُتْفَاعِلُنْ مُتَفَاعِلُنْ مُتْفَاعِلُنْ