<a href="https://colab.research.google.com/github/taravatp/Text_Style_Transfer/blob/main/Seq2Seq_TST.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Setup

In [None]:
!pip install -qU hazm

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.7/316.7 KB[0m [31m24.9 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m233.6/233.6 KB[0m [31m28.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.4/1.4 MB[0m [31m72.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
  Building wheel for nltk (setup.py) ... [?25l[?25hdone
  Building wheel for libwapiti (setup.py) ... [?25l[?25hdone


In [None]:
import pandas as pd
import random

import hazm
from hazm import word_tokenize

import torch
import torch.nn as nn
import torch.nn.functional as F

In [None]:
# Query CUDA once and derive both the flag and the torch device from it.
train_on_gpu = torch.cuda.is_available()
device = torch.device("cuda:0") if train_on_gpu else torch.device("cpu")
print(f'device: {device}')

# Tell the user which hardware the following cells will run on.
if train_on_gpu:
    print('CUDA is available!  Training on GPU ...')
else:
    print('CUDA is not available.  Training on CPU ...')

device: cuda:0
CUDA is available!  Training on GPU ...


# Data Cleaning

In [None]:
import hazm
import re

# Raw parallel corpus; the cells below read its 'formalForm' and
# 'inFormalForm' columns.
DATASET_PATH = '/content/drive/MyDrive/text_style_transfer/dataset.xlsx'

# hazm normalizer shared by every call to cleaning().
normalizer = hazm.Normalizer()
dataset = pd.read_excel(io=DATASET_PATH)

def cleaning(text):
  """Normalise one raw sentence before it is tokenised.

  Steps, in order: trim surrounding whitespace, run the module-level hazm
  ``normalizer``, put a space in front of every ``.``, ``!`` or ``?`` so the
  punctuation mark stands apart from the preceding word, collapse every run
  of whitespace into a single space, and trim again.

  Args:
    text: the sentence to clean (a ``str``).

  Returns:
    The cleaned sentence.
  """
  text = text.strip()
  text = normalizer.normalize(text)
  # Detach sentence punctuation from the word before it.
  text = re.sub(r"([.!?])", r" \1", text)
  # Raw string: "\s" in a plain literal is an invalid escape sequence.
  text = re.sub(r"\s+", " ", text)
  # The punctuation rule can leave a leading space when the sentence starts
  # with a punctuation mark, so trim once more.
  return text.strip()

def truncate(sentence,max_len=20):
  """Filter out sentences that are too long.

  Despite the name nothing is cut: the sentence is returned unchanged when
  it has fewer than ``max_len`` tokens (as counted by ``word_tokenize``),
  and ``None`` is returned otherwise.
  """
  token_count = len(word_tokenize(sentence))
  return sentence if token_count < max_len else None

# Rows with a missing sentence would make cleaning() fail on a non-string
# value; the final dropna() would discard them anyway, so drop them up front.
dataset = dataset.dropna(subset=['formalForm', 'inFormalForm'])

# Clean each column, then replace every over-long sentence by None.
for column in ('formalForm', 'inFormalForm'):
  dataset[column] = dataset[column].apply(cleaning)
  dataset[column] = dataset[column].apply(truncate)

# Remove every pair in which at least one side was rejected and renumber the
# rows. drop=True discards the old row labels instead of turning them into an
# extra "index" column that would end up in the saved CSV.
dataset = dataset.dropna()
dataset = dataset.reset_index(drop=True)

In [None]:
# Persist the cleaned sentence pairs as UTF-8 CSV, without the row labels.
writePath = '/content/drive/MyDrive/text_style_transfer/CleanedDataset_v2.csv'
dataset.to_csv(path_or_buf=writePath, index=False, encoding='utf-8')

# Creating language style objects

In [None]:
# Reserved token ids: start-of-sentence and end-of-sentence.
SOS_token, EOS_token = 0, 1

# Reload the cleaned sentence pairs written by the data-cleaning section.
DATASET_PATH = '/content/drive/MyDrive/text_style_transfer/CleanedDataset_v2.csv'
dataset = pd.read_csv(filepath_or_buffer=DATASET_PATH)

In [None]:
class LangStyle:
  """Vocabulary of one language style.

  Attributes (documented here, all plain Python containers):
    word2index: token -> integer id.
    index2word: integer id -> token; ids 0 and 1 are pre-assigned to the
      "SOS" and "EOS" markers.
    word2count: token -> number of times the token has been added.
    n_words: next free id; starts at 2 because ids 0 and 1 are reserved.
  """

  def __init__(self):
    self.word2index = {}
    # The reserved ids belong in the id -> token map so that decoding id 0 or
    # 1 does not raise KeyError; the count map starts empty.
    self.index2word = {0: "SOS", 1: "EOS"}
    self.word2count = {}
    self.n_words = 2

  def add_setence_to_lang(self,sentence):
    """Tokenise ``sentence`` and register every token in the vocabulary.

    A token seen for the first time receives the next free id; every
    occurrence increments the token's count.
    """
    for token in word_tokenize(sentence):
      if token not in self.word2index:
        self.word2index[token] = self.n_words
        self.index2word[self.n_words] = token
        self.n_words += 1
      self.word2count[token] = self.word2count.get(token, 0) + 1

  # Correctly spelled alias; the original name is kept for existing callers.
  add_sentence_to_lang = add_setence_to_lang

In [None]:
# One vocabulary per style, filled from every sentence pair of the dataset.
formalStyle = LangStyle()
informalStyle = LangStyle()

sentence_pairs = zip(dataset['formalForm'], dataset['inFormalForm'])
for formal_text, informal_text in sentence_pairs:
  formalStyle.add_setence_to_lang(formal_text)
  informalStyle.add_setence_to_lang(informal_text)

In [None]:
# Report both vocabulary sizes; n_words starts at 2, so each figure includes
# the two reserved ids.
formal_vocab_size = formalStyle.n_words
informal_vocab_size = informalStyle.n_words
print('number of words in formal style:', formal_vocab_size)
print('number of words in informal style:', informal_vocab_size)

number of words in formal style: 28111
number of words in informal style: 39847


# Seq2seq Model

In [None]:
class EncoderRNN(nn.Module):
    """Token-by-token GRU encoder.

    Args:
        input_size: number of rows of the embedding table (vocabulary size).
        hidden_size: width of each embedding vector and of the GRU state.
        num_layers: number of stacked GRU layers. Defaults to 5, the value
            that used to be hard-coded in both the GRU and ``initHidden``.
    """

    def __init__(self, input_size, hidden_size, num_layers=5):
        super().__init__()

        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.embedding = nn.Embedding(input_size, hidden_size)
        self.gru = nn.GRU(hidden_size, hidden_size, num_layers)

    def forward(self, input, hidden):
        """Encode a single token.

        Args:
            input: tensor holding one token id (the embedding is reshaped to
                ``(1, 1, hidden_size)``, so exactly one token is expected).
            hidden: GRU state of shape ``(num_layers, 1, hidden_size)``.

        Returns:
            ``(output, hidden)`` as returned by the GRU: the output has shape
            ``(1, 1, hidden_size)`` and the new state has the same shape as
            ``hidden``.
        """
        embedded = self.embedding(input).view(1, 1, -1)
        output, hidden = self.gru(embedded, hidden)
        return output, hidden

    def initHidden(self):
        """Return an all-zero initial state on the same device as the weights."""
        return torch.zeros(
            self.num_layers,
            1,
            self.hidden_size,
            device=self.embedding.weight.device,
        )

In [None]:
class DecoderRNN(nn.Module):
    """GRU decoder with attention over a fixed number of encoder positions.

    Args:
        hidden_size: width of the embeddings and of the GRU state.
        output_size: size of the target vocabulary.
        dropout_p: dropout probability applied to the input embedding.
        max_length: number of encoder positions the attention layer scores.
        num_layers: number of stacked GRU layers. Defaults to 5, the value
            that used to be hard-coded in both the GRU and ``initHidden``.
    """

    def __init__(self, hidden_size, output_size, dropout_p, max_length, num_layers=5):
        super().__init__()

        self.hidden_size = hidden_size
        self.output_size = output_size
        self.dropout_p = dropout_p
        self.max_length = max_length
        self.num_layers = num_layers

        self.embedding = nn.Embedding(self.output_size, self.hidden_size)
        self.attn = nn.Linear(self.hidden_size * 2, self.max_length)
        self.attn_combine = nn.Linear(self.hidden_size * 2, self.hidden_size)
        self.dropout = nn.Dropout(self.dropout_p)
        self.gru = nn.GRU(self.hidden_size, self.hidden_size, self.num_layers)
        self.out = nn.Linear(self.hidden_size, self.output_size)

    def forward(self, input, hidden, encoder_outputs):
        """Run one decoding step.

        Args:
            input: tensor holding one token id (the embedding is reshaped to
                ``(1, 1, hidden_size)``, so exactly one token is expected).
            hidden: GRU state of shape ``(num_layers, 1, hidden_size)``.
            encoder_outputs: tensor of shape ``(1, max_length, hidden_size)``.

        Returns:
            ``(log_probs, hidden, attn_weights)`` where ``log_probs`` has
            shape ``(1, output_size)``, ``hidden`` is the new GRU state and
            ``attn_weights`` has shape ``(1, max_length)``.
        """
        embedded = self.embedding(input).view(1, 1, -1)  # (1, 1, hidden)
        embedded = self.dropout(embedded)

        # Score every encoder position from the current input embedding and
        # the state of the first GRU layer (hidden[0]).
        attention_input = torch.cat((embedded[0], hidden[0]), 1)  # (1, 2*hidden)
        attn_weights = F.softmax(self.attn(attention_input), dim=1)  # (1, max_length)

        # Attention-weighted sum of the encoder outputs.
        attn_applied = torch.bmm(attn_weights.unsqueeze(0), encoder_outputs)  # (1, 1, hidden)

        # Merge the embedding with the context vector and feed the GRU.
        output = torch.cat((embedded[0], attn_applied[0]), 1)  # (1, 2*hidden)
        output = self.attn_combine(output).unsqueeze(0)  # (1, 1, hidden)
        output = F.relu(output)
        output, hidden = self.gru(output, hidden)

        output = F.log_softmax(self.out(output[0]), dim=1)  # (1, output_size)
        return output, hidden, attn_weights

    def initHidden(self):
        """Return an all-zero initial state on the same device as the weights."""
        return torch.zeros(
            self.num_layers,
            1,
            self.hidden_size,
            device=self.embedding.weight.device,
        )

# Creating DataLoaders

In [None]:
from torch.utils.data import Dataset,DataLoader
import pandas as pd

class TSTData(Dataset):
  """Sentence pairs read from a CSV file, served as tensors of token ids.

  The first 90% of the rows form the 'train' split; any other ``flag``
  value selects the remaining rows.
  """

  def __init__(self,dataset_path,informalStyle,formalStyle,flag):
    super().__init__()
    self.informalStyle = informalStyle
    self.formalStyle = formalStyle

    frame = pd.read_csv(dataset_path)
    split_at = int(len(frame) * 0.9)
    self.dataset = frame.iloc[:split_at] if flag == 'train' else frame[split_at:]

  def __len__(self):
    """Number of sentence pairs in the selected split."""
    return len(self.dataset)

  def sentence_to_tensor(self,sentence,LangStyle):
    """Map each token of ``sentence`` to its id and append ``EOS_token``.

    ``LangStyle`` is the vocabulary object whose ``word2index`` is used.
    Returns a 1-D ``torch.long`` tensor.
    """
    token_ids = [LangStyle.word2index[word] for word in word_tokenize(sentence)]
    return torch.tensor(token_ids + [EOS_token], dtype=torch.long)

  def __getitem__(self,index):
    """Return the ``(informal_tensor, formal_tensor)`` pair of one row."""
    row = self.dataset.iloc[index]
    informal_tensor = self.sentence_to_tensor(row['inFormalForm'], self.informalStyle)
    formal_tensor = self.sentence_to_tensor(row['formalForm'], self.formalStyle)
    return (informal_tensor, formal_tensor)

# Training

In [None]:
# Location of the cleaned sentence pairs.
DATASET_PATH = '/content/drive/MyDrive/text_style_transfer/CleanedDataset_v2.csv'

# Model dimensions.
HIDDEN_SIZE = 64  # hidden size handed to the encoder and the decoder
MAX_LEN = 20      # decoder max_length and length of the encoder-output buffer

# Optimisation settings.
NUM_EPOCHS = 50
BATCH_SIZE = 1
LEARNING_RATE = 0.01
# Compared against random.random() in the training loop; 1 means teacher
# forcing is always used.
TEACHER_FORCE = 1

In [None]:
# The two splits of the cleaned corpus, sharing the same vocabularies.
train_data = TSTData(
    dataset_path=DATASET_PATH,
    informalStyle=informalStyle,
    formalStyle=formalStyle,
    flag='train',
)
test_data = TSTData(
    dataset_path=DATASET_PATH,
    informalStyle=informalStyle,
    formalStyle=formalStyle,
    flag='test',
)

train_dataloader = DataLoader(dataset=train_data, batch_size=BATCH_SIZE)
test_dataloader = DataLoader(dataset=test_data, batch_size=BATCH_SIZE)

# The encoder reads informal-style ids; the decoder emits formal-style ids.
encoder = EncoderRNN(input_size=informalStyle.n_words, hidden_size=HIDDEN_SIZE)
encoder = encoder.to(device)
decoder = DecoderRNN(
    hidden_size=HIDDEN_SIZE,
    output_size=formalStyle.n_words,
    dropout_p=0.1,
    max_length=MAX_LEN,
)
decoder = decoder.to(device)

# One plain SGD optimizer per network; the decoder outputs log-probabilities,
# hence the negative log-likelihood loss.
encoder_optimizer = torch.optim.SGD(params=encoder.parameters(), lr=LEARNING_RATE)
decoder_optimizer = torch.optim.SGD(params=decoder.parameters(), lr=LEARNING_RATE)
criterion = nn.NLLLoss()

In [None]:
# Show the encoder's layers as a sanity check of the configured sizes.
print(encoder)

EncoderRNN(
  (embedding): Embedding(39847, 64)
  (gru): GRU(64, 64, num_layers=5)
)


In [None]:
# Show the decoder's layers as a sanity check of the configured sizes.
print(decoder)

DecoderRNN(
  (embedding): Embedding(28111, 64)
  (attn): Linear(in_features=128, out_features=20, bias=True)
  (attn_combine): Linear(in_features=128, out_features=64, bias=True)
  (dropout): Dropout(p=0.1, inplace=False)
  (gru): GRU(64, 64, num_layers=5)
  (out): Linear(in_features=64, out_features=28111, bias=True)
)


In [None]:
# Training loop: one optimizer step per sentence pair of the TRAINING split.
# (The held-out split must not be used here; it is reserved for evaluation.)
for epoch in range(NUM_EPOCHS):
  print('**********************************************************')

  # Make sure dropout is active even if an evaluation cell switched it off.
  encoder.train()
  decoder.train()

  losses = []
  for batch in train_dataloader:
    loss = 0
    # Each element is a batch of 1-D id tensors: [batch_size, num_tokens].
    # The encoder and decoder process one token at a time and reshape to a
    # single sequence, so batch_size is expected to be 1.
    informal_sentence = batch[0].to(device)
    formal_sentence = batch[1].to(device)

    input_length = informal_sentence.shape[1]
    target_length = formal_sentence.shape[1]

    encoder_optimizer.zero_grad()
    decoder_optimizer.zero_grad()

    encoder_hidden = encoder.initHidden().to(device)
    # Fixed-size buffer read by the decoder's attention; positions beyond the
    # sentence length stay zero.
    encoder_outputs = torch.zeros(BATCH_SIZE, MAX_LEN, encoder.hidden_size, device=device)

    for index in range(input_length):
      word = informal_sentence[:, index]
      encoder_output, encoder_hidden = encoder(word, encoder_hidden)
      encoder_outputs[:, index, :] = encoder_output

    decoder_input = torch.tensor([[SOS_token]], device=device)
    decoder_hidden = encoder_hidden

    # Decide once per sentence whether the decoder is fed the reference
    # tokens (teacher forcing) or its own predictions.
    use_teacher_forcing = TEACHER_FORCE > random.random()
    for index in range(target_length):
      decoder_output, decoder_hidden, decoder_attention = decoder(
          decoder_input, decoder_hidden, encoder_outputs)
      target_token = formal_sentence[:, index]
      loss += criterion(decoder_output, target_token)

      if use_teacher_forcing:
        decoder_input = target_token
      else:
        topv, topi = decoder_output.topk(1)
        decoder_input = topi.squeeze().detach()  # detach from history as input
        if decoder_input.item() == EOS_token:
          break

    losses.append(loss.item())
    loss.backward()
    encoder_optimizer.step()
    decoder_optimizer.step()

  if epoch % 10 == 0:
    torch.save(encoder.state_dict(), f"encoder{epoch}.pth")
    torch.save(decoder.state_dict(), f"decoder{epoch}.pth")
  # Mean of the per-sentence summed losses seen in this epoch.
  print(f'end of epoch {epoch} and loss is {sum(losses)/max(len(losses), 1)}')

In [None]:
cd /content/drive/MyDrive/text_style_transfer

# Evaluation

In [None]:
import math
from collections import Counter


def _sentence_bleu(reference, candidate, max_n=4):
  """Return a smoothed sentence-level BLEU score in [0, 1].

  Args:
    reference: list of reference tokens.
    candidate: list of predicted tokens.
    max_n: highest n-gram order; the orders 1..max_n are weighted equally.

  The score is the geometric mean of the clipped n-gram precisions times the
  brevity penalty. Orders above 1 use add-one smoothing so that a short
  sentence without a matching 4-gram does not collapse to zero; a candidate
  that shares no unigram with the reference scores 0.
  """
  if not reference or not candidate:
    return 0.0

  log_precision = 0.0
  for n in range(1, max_n + 1):
    cand_ngrams = Counter(tuple(candidate[i:i + n]) for i in range(len(candidate) - n + 1))
    ref_ngrams = Counter(tuple(reference[i:i + n]) for i in range(len(reference) - n + 1))
    # Counter intersection clips each n-gram count at the reference count.
    matches = sum((cand_ngrams & ref_ngrams).values())
    total = sum(cand_ngrams.values())
    if n == 1:
      if matches == 0:
        return 0.0
      precision = matches / total
    else:
      precision = (matches + 1) / (total + 1)
    log_precision += math.log(precision) / max_n

  if len(candidate) >= len(reference):
    brevity_penalty = 1.0
  else:
    brevity_penalty = math.exp(1 - len(reference) / len(candidate))
  return brevity_penalty * math.exp(log_precision)


# Evaluation must run without dropout.
encoder.eval()
decoder.eval()

with torch.no_grad():
  bluescore = 0
  num_sentences = 0
  decoded_words = []
  for batch in test_dataloader:

    informal_sentence = batch[0].to(device)
    input_length = informal_sentence.shape[1]
    encoder_hidden = encoder.initHidden().to(device)
    encoder_outputs = torch.zeros(BATCH_SIZE, MAX_LEN, encoder.hidden_size, device=device)

    for index in range(input_length):
      word = informal_sentence[:, index]
      encoder_output, encoder_hidden = encoder(word, encoder_hidden)
      encoder_outputs[:, index, :] = encoder_output

    decoder_input = torch.tensor([[SOS_token]], device=device)
    decoder_hidden = encoder_hidden

    # Greedy decoding: feed the most likely token back in until EOS or
    # MAX_LEN tokens have been produced.
    decoded_words = []
    for index in range(MAX_LEN):
      decoder_output, decoder_hidden, decoder_attention = decoder(decoder_input, decoder_hidden, encoder_outputs)
      topv, topi = decoder_output.data.topk(1)
      token_id = topi.item()
      if token_id == EOS_token:
        decoded_words.append('<EOS>')
        break
      # .get(): an id without an entry must not abort the evaluation.
      decoded_words.append(formalStyle.index2word.get(token_id, '<UNK>'))
      decoder_input = topi.squeeze().detach()

    # Reference tokens of this pair, without the trailing EOS marker.
    target_sentence = [
        formalStyle.index2word.get(token_id, '<UNK>')
        for token_id in batch[1][0].tolist()
        if token_id != EOS_token
    ]
    hypothesis = [word for word in decoded_words if word != '<EOS>']
    bluescore += _sentence_bleu(target_sentence, hypothesis)
    num_sentences += 1

  # Last decoded sentence, as a qualitative sample.
  print(decoded_words)
  average_bleu = bluescore / num_sentences if num_sentences else 0.0
  print(f'average BLEU over {num_sentences} test sentences: {average_bleu:.4f}')