# Setting up

In [None]:
# Colab-only: mount Google Drive at /content/drive, the root of every dataset path used below.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# hazm supplies the Persian normalizer and word tokenizer imported below.
# %pip (rather than !pip) installs into the environment of the running kernel.
%pip install -qU hazm

In [None]:
# Pinned to the version this notebook was last run with (see the captured
# install log) so `from torchmetrics import BLEUScore` keeps resolving.
%pip install torchmetrics==0.11.4

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
[0mCollecting torchmetrics
  Downloading torchmetrics-0.11.4-py3-none-any.whl (519 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m519.2/519.2 kB[0m [31m10.4 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: torchmetrics
Successfully installed torchmetrics-0.11.4


In [None]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset,DataLoader

import torchaudio
from torchaudio import transforms

import torchmetrics
from torchmetrics import BLEUScore

import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split

import os
import random
import re

import hazm
from hazm import word_tokenize

In [None]:
# Record the torch / torchaudio builds in use, one per line.
print(torch.__version__, torchaudio.__version__, sep="\n")

2.0.1+cu118
2.0.2+cu118


In [None]:
# Seed every random number generator this notebook imports, not just torch,
# so that a Restart & Run All starts from the same state each time.
SEED = 0
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

# Prefer the GPU when one is visible; every tensor and module is moved here.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

cuda


In [None]:
# Work inside the homework folder; the relative torch.save(...) checkpoint
# paths used during training resolve against this directory.
# Explicit %cd does not depend on IPython's automagic setting.
%cd /content/drive/MyDrive/DL_hw4

/content/drive/MyDrive/DL_hw4


In [None]:
# !pip install unrar
# !unrar x '/content/drive/MyDrive/DL_hw4/DL-HW4-Dataset.rar'

# Creating the dictionary

In [None]:
class dictionary:
  """Word-level vocabulary that grows as cleaned sentences are added.

  Attributes:
    word2index: token -> integer id.
    index2word: integer id -> token.
    word2count: token -> number of times the token has been added.
    n_words:    number of ids handed out so far (the next free id).
    normalizer: hazm.Normalizer applied by `clean`.

  Ids 0 and 1 are reserved for the "SOS" / "EOS" sentence markers.
  """

  def __init__(self):
    # Register the two markers in both directions so the forward and reverse
    # tables agree; their counts start at zero because no sentence has been
    # added yet.
    self.word2index = {"SOS": 0, "EOS": 1}
    self.index2word = {0: "SOS", 1: "EOS"}
    self.word2count = {"SOS": 0, "EOS": 0}
    self.n_words = 2
    self.normalizer = hazm.Normalizer()

  def clean(self,text):
    """Return `text` stripped, hazm-normalized and with tidy spacing.

    A space is inserted before each '.', '!' or '?', then every run of
    whitespace is collapsed to a single space.
    """
    text = text.strip()
    text = self.normalizer.normalize(text)
    text = re.sub(r"([.!?])", r" \1", text)  # detach punctuation from the preceding word
    text = re.sub(r"\s+", " ", text)  # raw string: "\s" is not a valid str escape
    return text

  def add_setence_to_lang(self,sentence):
    """Tokenize `sentence` with hazm and register/count every token.

    Unseen tokens receive the next free id; known tokens only have their
    count incremented.
    """
    for token in word_tokenize(sentence):
      if token not in self.word2index:
        self.word2index[token] = self.n_words
        self.word2count[token] = 1
        self.index2word[self.n_words] = token
        self.n_words +=1
      else:
        self.word2count[token] += 1

  # Correctly spelled alias; the original (misspelled) name stays available
  # for existing callers.
  add_sentence_to_lang = add_setence_to_lang

In [None]:
# Load the audio/transcript sheet and feed every cleaned transcript into the
# vocabulary.
DATASET_PATH = '/content/drive/MyDrive/DL_hw4/DL-Hw4-Dataset/Persian-Speech-To-Text-Maps.xlsx'
dataset = pd.read_excel(DATASET_PATH)

Dictionary = dictionary()
for _, row in dataset.iterrows():
  Dictionary.add_setence_to_lang(Dictionary.clean(row['text']))

In [None]:
# Vocabulary size, including the two reserved marker ids.
print(f'Number of words in the dictionary: {Dictionary.n_words}')

Number of words in the dictionary: 2367


# Creating the dataloader

In [None]:
# Reserved vocabulary ids: 0 opens every target sequence, 1 closes it.
SOS_token, EOS_token = 0, 1

In [None]:
class dataset_SR(Dataset):
  """Persian speech-to-text dataset yielding (spectrogram, token ids) pairs.

  The Excel sheet is split 90/10 into train+validation / test, and the first
  part is split 90/10 again into train / validation. Both splits use
  random_state=42, so every instance sees the same partition.

  Args:
    Dictionary:   vocabulary object exposing `word2index`.
    flag:         which split to serve: 'train', 'validation' or 'test'.
    audios_root:  directory containing the audio files.
    dataset_path: Excel sheet with an 'audio' and a 'text' column.

  Raises:
    ValueError: if `flag` is not one of the three split names.
  """

  _SPLITS = ('train', 'validation', 'test')

  def __init__(self,Dictionary,flag,
               audios_root='/content/drive/MyDrive/DL_hw4/DL-Hw4-Dataset/Persian-Speech-To-Text-Audios',
               dataset_path='/content/drive/MyDrive/DL_hw4/DL-Hw4-Dataset/Persian-Speech-To-Text-Maps.xlsx'):
    # Fail fast, before any I/O: an unknown name used to fall through every
    # branch and silently serve the whole, unsplit table.
    if flag not in self._SPLITS:
      raise ValueError(f"flag must be one of {self._SPLITS}, got {flag!r}")

    self.audios_root = audios_root
    self.Dictionary = Dictionary
    self.normalizer = hazm.Normalizer()
    # Built once and reused for every item: a 128-point FFT with a hop of
    # 4096 samples between consecutive frames.
    self.spectrogram_transform = transforms.Spectrogram(n_fft=128, hop_length=4096)

    full_table = pd.read_excel(dataset_path)
    train_data, test_data = train_test_split(full_table, test_size=0.1, random_state=42)
    train_data, validation_data = train_test_split(train_data, test_size=0.1, random_state=42)
    splits = {'train': train_data, 'validation': validation_data, 'test': test_data}
    # Re-index from 0 so __getitem__ can address rows with .loc[index, ...].
    self.dataset = splits[flag].reset_index(drop=True)

  def __len__(self):
    """Number of rows in the selected split."""
    return len(self.dataset)

  def preprocess_audio(self,audio_path):
    """Load the audio file and return its spectrogram tensor."""
    waveform, _ = torchaudio.load(audio_path)  # the sample rate is not needed
    return self.spectrogram_transform(waveform)

  def preprocess_text(self,text):
    """Clean `text` and encode it as a 1-D long tensor of [SOS, ids..., EOS].

    Raises:
      KeyError: if a token is missing from `Dictionary.word2index`.
    """
    text = text.strip()
    text = self.normalizer.normalize(text)
    text = re.sub(r"([.!?])", r" \1", text)  # detach punctuation from the preceding word
    text = re.sub(r"\s+", " ", text)  # raw string: "\s" is not a valid str escape

    vector = [SOS_token]
    vector.extend(self.Dictionary.word2index[word] for word in word_tokenize(text))
    vector.append(EOS_token)
    return torch.tensor(vector, dtype=torch.long)

  def __getitem__(self,index):
    """Return (spectrogram, token id tensor) for row `index` of the split."""
    # Only the file name of the stored path is used; it is re-rooted under
    # audios_root.
    audio_name = self.dataset.loc[index,'audio'].split('/')[-1]
    audio = self.preprocess_audio(os.path.join(self.audios_root,audio_name))

    text = self.preprocess_text(self.dataset.loc[index,'text'])
    return audio,text

# Creating the model

In [None]:
class Encoder(nn.Module):
  """Stacked recurrent encoder (RNN, GRU or LSTM) that returns its final state.

  Args:
    inp_dim:    feature size of every input step.
    hid_dim:    hidden state size.
    n_layers:   number of stacked recurrent layers.
    dropout:    dropout probability between recurrent layers.
    layer_type: "RNN", "GRU" or "LSTM".

  Raises:
    ValueError: if `layer_type` is not one of the supported names.
  """

  def __init__(self,inp_dim,hid_dim,n_layers,dropout,layer_type):
    super().__init__()
    recurrent_layers = {"RNN": nn.RNN, "GRU": nn.GRU, "LSTM": nn.LSTM}
    # An unknown name used to leave self.rnn undefined and made forward()
    # silently return None; reject it up front instead.
    if layer_type not in recurrent_layers:
      raise ValueError(
        f"layer_type must be one of {sorted(recurrent_layers)}, got {layer_type!r}")
    self.layer_type = layer_type
    self.rnn = recurrent_layers[layer_type](inp_dim, hid_dim, n_layers, dropout = dropout)

    # Not applied in forward(); kept so the module's attributes are unchanged.
    self.dropout = nn.Dropout(dropout)

  def forward(self,x,hidden,cell=None):
    """Advance the recurrent stack over `x` starting from the given state.

    Args:
      x:      input passed straight to the recurrent layer
              (batch_first is not set, so it is read as [seq, batch, inp_dim]).
      hidden: initial hidden state, [n_layers, batch, hid_dim].
      cell:   initial LSTM cell state (ignored for RNN/GRU); zeros when omitted.

    Returns:
      `hidden` for RNN/GRU, `(hidden, cell)` for LSTM. The per-step outputs
      are discarded.
    """
    if self.layer_type == "LSTM":
      if cell is None:
        cell = torch.zeros_like(hidden)
      _, (hidden, cell) = self.rnn(x, (hidden, cell))
      return hidden, cell

    _, hidden = self.rnn(x, hidden)
    return hidden

In [None]:
class Decoder(nn.Module):
  """Single-step recurrent decoder: token id -> log-probabilities over the vocabulary.

  Args:
    inp_dim:    embedding size (also the recurrent layer's input size).
    hid_dim:    hidden state size.
    output_dim: vocabulary size (embedding rows and output logits).
    n_layers:   number of stacked recurrent layers.
    dropout:    dropout probability (on the embedding and between layers).
    layer_type: "RNN", "GRU" or "LSTM".

  Raises:
    ValueError: if `layer_type` is not one of the supported names.
  """

  def __init__(self,inp_dim,hid_dim,output_dim,n_layers,dropout,layer_type):

    super().__init__()
    recurrent_layers = {"RNN": nn.RNN, "GRU": nn.GRU, "LSTM": nn.LSTM}
    # An unknown name used to leave self.rnn undefined and made forward()
    # silently return None; reject it up front instead.
    if layer_type not in recurrent_layers:
      raise ValueError(
        f"layer_type must be one of {sorted(recurrent_layers)}, got {layer_type!r}")

    self.embedding = nn.Embedding(output_dim,inp_dim)
    self.layer_type = layer_type
    self.rnn = recurrent_layers[layer_type](inp_dim, hid_dim, n_layers, dropout = dropout)

    self.fc_out = nn.Linear(hid_dim, output_dim)
    self.dropout = nn.Dropout(dropout)
    self.activation_layer = nn.LogSoftmax(dim=1)

  def forward(self,input,hidden,cell=None):
    """Decode one step.

    Args:
      input:  1-D tensor of token ids, one per batch element.
      hidden: hidden state, [n_layers, batch, hid_dim].
      cell:   LSTM cell state (ignored for RNN/GRU); zeros when omitted.

    Returns:
      `(log_probs, hidden)` for RNN/GRU and `(log_probs, hidden, cell)` for
      LSTM, where `log_probs` is [batch, output_dim].
    """
    input = input.unsqueeze(0)  # add the length-1 sequence axis: [1, batch]
    x = self.dropout(self.embedding(input))  # [1, batch, inp_dim]

    is_lstm = self.layer_type == "LSTM"
    if is_lstm:
      if cell is None:
        cell = torch.zeros_like(hidden)
      output, (hidden, cell) = self.rnn(x, (hidden, cell))
    else:
      output, hidden = self.rnn(x, hidden)

    # Drop the sequence axis, project to the vocabulary, normalize per row.
    prediction = self.activation_layer(self.fc_out(output.squeeze(0)))

    if is_lstm:
      return prediction, hidden, cell
    return prediction, hidden

# Instantiating objects and setting hyperparameters

In [None]:
# Model sizes.
INPUT_SIZE = 65  # features per encoder step: the one-sided Spectrogram(n_fft=128) in dataset_SR has 128 // 2 + 1 = 65 bins; build_modules also passes it to Decoder as the embedding width
HIDDEN_SIZE = 128  # size of every recurrent hidden (and LSTM cell) state
OUTPUT_SIZE = Dictionary.n_words  # vocabulary size, SOS/EOS included

NUM_LAYERS = 2  # stacked recurrent layers in both encoder and decoder
MAX_LEN =  20  # NOTE(review): evaluate() defines its own local MAX_LEN, so no function in the visible code reads this value

# Optimisation settings.
NUM_EPOCHS = 25  # passes over the training split per model
LEARNING_RATE = 0.001  # learning rate of both Adam optimizers
BATCH_SIZE = 1  # also sizes the initial hidden states in train/validate/evaluate
TEACHER_FORCE = 1  # NOTE(review): not referenced anywhere in the visible code

In [None]:
# One dataset object per split, all sharing the same vocabulary.
train_data, validation_data, test_data = (
  dataset_SR(Dictionary, flag = split_name)
  for split_name in ('train', 'validation', 'test')
)

# Matching loaders, all in dataset order with the same batch size.
train_dataloader, validation_dataloader, test_dataloader = (
  DataLoader(split_data, batch_size=BATCH_SIZE)
  for split_data in (train_data, validation_data, test_data)
)

# The decoder emits log-probabilities (LogSoftmax), hence negative log-likelihood.
criterion = nn.NLLLoss()

In [None]:
def build_modules(layer_type):
  """Create a fresh encoder/decoder pair of the given recurrent type.

  Both networks are moved to `device` and each gets its own Adam optimizer
  with learning rate LEARNING_RATE.

  Returns:
    (encoder, decoder, encoder_optimizer, decoder_optimizer)
  """
  shared_kwargs = dict(n_layers=NUM_LAYERS, dropout=0.2, layer_type=layer_type)
  encoder = Encoder(INPUT_SIZE, HIDDEN_SIZE, **shared_kwargs).to(device)
  decoder = Decoder(INPUT_SIZE, HIDDEN_SIZE, OUTPUT_SIZE, **shared_kwargs).to(device)

  optimizers = [
    torch.optim.Adam(module.parameters(), lr=LEARNING_RATE)
    for module in (encoder, decoder)
  ]
  return (encoder, decoder, *optimizers)

# Training and evaluation

In [None]:
def train(encoder, decoder, encoder_optimizer, decoder_optimizer):
  """Run one teacher-forced training epoch over `train_dataloader`.

  For every utterance the encoder consumes the spectrogram one frame at a
  time; its final hidden state initialises the decoder, which is then fed
  the ground-truth token at each step and trained to predict the next one.
  Both optimizers step once per utterance. After the epoch both state dicts
  are saved to "encoder<TYPE>.pth" / "decoder<TYPE>.pth".

  Returns:
    (epoch_loss, encoder, decoder). `epoch_loss` is a detached 0-dim tensor
    holding the SUM of the per-utterance losses, so dividing it by
    len(train_dataloader) gives the mean loss per utterance. (Previously only
    the last utterance's loss was returned, still attached to its graph.)
  """
  # Make sure dropout is active regardless of what ran before.
  encoder.train()
  decoder.train()

  epoch_loss = torch.zeros((), device=device)

  for batch in train_dataloader:
    audio = batch[0].to(device)  # [batch, channels, num_bins, num_frames]
    text  = batch[1].to(device)  # [batch, num_tokens]

    num_frames = audio.shape[-1]
    num_word_tokens = text.shape[-1]

    encoder_optimizer.zero_grad()
    decoder_optimizer.zero_grad()

    encoder_hidden = torch.zeros(NUM_LAYERS, BATCH_SIZE, HIDDEN_SIZE).to(device)
    encoder_cell = torch.zeros(NUM_LAYERS, BATCH_SIZE, HIDDEN_SIZE).to(device)

    # Feed the spectrogram frame by frame, carrying the state forward.
    for index in range(num_frames):
      # [batch, channels, num_bins]; the recurrent layer reads the first axis
      # as the sequence axis — assumes batch size 1 and mono audio, TODO confirm.
      audio_frame = audio[:,:,:,index]

      if encoder.layer_type == "LSTM":
        encoder_hidden, encoder_cell = encoder(audio_frame,encoder_hidden,encoder_cell)
      else: # GRU or simple RNN
        encoder_hidden = encoder(audio_frame,encoder_hidden)

    # The decoder starts from the encoder's last hidden state. Its LSTM cell
    # state starts from zeros (the encoder's cell state is not handed over),
    # matching how validate() and evaluate() initialise it.
    decoder_hidden = encoder_hidden
    decoder_cell = torch.zeros(NUM_LAYERS, BATCH_SIZE, HIDDEN_SIZE).to(device)

    loss = 0
    # Teacher forcing: input is token t, target is token t+1.
    for index in range(num_word_tokens-1):
      decoder_input = text[:,index]
      if decoder.layer_type == "LSTM":
        decoder_output, decoder_hidden, decoder_cell = decoder(decoder_input, decoder_hidden, decoder_cell)
      else:
        decoder_output, decoder_hidden = decoder(decoder_input, decoder_hidden)

      loss += criterion(decoder_output, text[:,index+1])

    loss.backward()
    encoder_optimizer.step()
    decoder_optimizer.step()

    # Detach so the running total does not keep every graph alive.
    epoch_loss += loss.detach()

  torch.save(encoder.state_dict(), f"encoder{encoder.layer_type}.pth")
  torch.save(decoder.state_dict(), f"decoder{decoder.layer_type}.pth")
  return epoch_loss , encoder, decoder

In [None]:
def validate(encoder, decoder, encoder_optimizer, decoder_optimizer):
  """Compute the teacher-forced loss over `validation_dataloader`.

  No parameters are updated: the pass runs under torch.no_grad() with both
  networks in eval mode (dropout off). Each network's previous train/eval
  mode is restored before returning.

  Args:
    encoder, decoder: the networks to score.
    encoder_optimizer, decoder_optimizer: unused; accepted only so existing
      call sites keep working.

  Returns:
    A 0-dim tensor holding the SUM of the per-utterance losses, so dividing
    it by len(validation_dataloader) gives the mean loss per utterance.
    (Previously only the last utterance's loss was returned.)
  """
  encoder_was_training, decoder_was_training = encoder.training, decoder.training
  encoder.eval()
  decoder.eval()

  total_loss = torch.zeros((), device=device)
  try:
    with torch.no_grad():
      for batch in validation_dataloader:
        audio = batch[0].to(device)  # [batch, channels, num_bins, num_frames]
        text  = batch[1].to(device)  # [batch, num_tokens]

        num_frames = audio.shape[-1]
        num_word_tokens = text.shape[-1]

        encoder_hidden = torch.zeros(NUM_LAYERS, BATCH_SIZE, HIDDEN_SIZE).to(device)
        encoder_cell = torch.zeros(NUM_LAYERS, BATCH_SIZE, HIDDEN_SIZE).to(device)

        # Feed the spectrogram frame by frame, carrying the state forward.
        for index in range(num_frames):
          audio_frame = audio[:,:,:,index]  # [batch, channels, num_bins]

          if encoder.layer_type == "LSTM":
            encoder_hidden, encoder_cell = encoder(audio_frame,encoder_hidden,encoder_cell)
          else: # GRU or simple RNN
            encoder_hidden = encoder(audio_frame,encoder_hidden)

        # The decoder starts from the encoder's last hidden state; its LSTM
        # cell state starts from zeros.
        decoder_hidden = encoder_hidden
        decoder_cell = torch.zeros(NUM_LAYERS, BATCH_SIZE, HIDDEN_SIZE).to(device)

        # Teacher forcing: input is token t, target is token t+1.
        for index in range(num_word_tokens-1):
          decoder_input = text[:,index]

          if decoder.layer_type == "LSTM":
            decoder_output, decoder_hidden, decoder_cell = decoder(decoder_input, decoder_hidden, decoder_cell)
          else:
            decoder_output, decoder_hidden = decoder(decoder_input, decoder_hidden)

          total_loss += criterion(decoder_output, text[:,index+1])
  finally:
    # Leave the networks in whatever mode the caller had them in.
    encoder.train(encoder_was_training)
    decoder.train(decoder_was_training)

  return total_loss

# Training and evaluating different models

In [None]:
# Train and validate the plain-RNN model, logging one loss pair per epoch.
if __name__ == "__main__":
  train_losses, validation_losses = [], []
  encoder, decoder, encoder_optimizer, decoder_optimizer = build_modules("RNN")

  for epoch in range(NUM_EPOCHS):
    print('##########')
    train_loss, encoder, decoder = train(encoder, decoder, encoder_optimizer, decoder_optimizer)
    validation_loss = validate(encoder, decoder, encoder_optimizer, decoder_optimizer)

    # Scale each returned loss by the number of batches in its loader.
    scaled_train_loss = train_loss/len(train_dataloader)
    scaled_validation_loss = validation_loss/len(validation_dataloader)

    print(f'train loss after epoch {epoch} is: {scaled_train_loss}')
    print(f'validation loss after epoch {epoch} is: {scaled_validation_loss}')

    train_losses.append(scaled_train_loss)
    validation_losses.append(scaled_validation_loss)

In [None]:
# Loss curves of the most recent run: one point per epoch.
train_losses_per_epoch = [epoch_loss.item() for epoch_loss in train_losses]
validation_losses_per_epoch = [epoch_loss.item() for epoch_loss in validation_losses]

fig, ax = plt.subplots(figsize=(5,5))

ax.plot(train_losses_per_epoch, "ro-", label="Train")
ax.plot(validation_losses_per_epoch, "go-", label="Validation")
ax.legend()
ax.set_title("Loss")
ax.set_xlabel("Epochs")

In [None]:
# Train and validate the GRU model, logging one loss pair per epoch.
if __name__ == "__main__":
  train_losses, validation_losses = [], []
  encoder, decoder, encoder_optimizer, decoder_optimizer = build_modules("GRU")

  for epoch in range(NUM_EPOCHS):
    print('##########')
    train_loss, encoder, decoder = train(encoder, decoder, encoder_optimizer, decoder_optimizer)
    validation_loss = validate(encoder, decoder, encoder_optimizer, decoder_optimizer)

    # Scale each returned loss by the number of batches in its loader.
    scaled_train_loss = train_loss/len(train_dataloader)
    scaled_validation_loss = validation_loss/len(validation_dataloader)

    print(f'train loss after epoch {epoch} is: {scaled_train_loss}')
    print(f'validation loss after epoch {epoch} is: {scaled_validation_loss}')

    train_losses.append(scaled_train_loss)
    validation_losses.append(scaled_validation_loss)

In [None]:
# Loss curves of the most recent run: one point per epoch.
train_losses_per_epoch = [epoch_loss.item() for epoch_loss in train_losses]
validation_losses_per_epoch = [epoch_loss.item() for epoch_loss in validation_losses]

fig, ax = plt.subplots(figsize=(5,5))

ax.plot(train_losses_per_epoch, "ro-", label="Train")
ax.plot(validation_losses_per_epoch, "go-", label="Validation")
ax.legend()
ax.set_title("Loss")
ax.set_xlabel("Epochs")

In [None]:
# Train and validate the LSTM model, logging one loss pair per epoch.
if __name__ == "__main__":
  train_losses, validation_losses = [], []
  encoder, decoder, encoder_optimizer, decoder_optimizer = build_modules("LSTM")

  for epoch in range(NUM_EPOCHS):
    print('##########')
    train_loss, encoder, decoder = train(encoder, decoder, encoder_optimizer, decoder_optimizer)
    validation_loss = validate(encoder, decoder, encoder_optimizer, decoder_optimizer)

    # Scale each returned loss by the number of batches in its loader.
    scaled_train_loss = train_loss/len(train_dataloader)
    scaled_validation_loss = validation_loss/len(validation_dataloader)

    print(f'train loss after epoch {epoch} is: {scaled_train_loss}')
    print(f'validation loss after epoch {epoch} is: {scaled_validation_loss}')

    train_losses.append(scaled_train_loss)
    validation_losses.append(scaled_validation_loss)

In [None]:
# Loss curves of the most recent run: one point per epoch.
train_losses_per_epoch = [epoch_loss.item() for epoch_loss in train_losses]
validation_losses_per_epoch = [epoch_loss.item() for epoch_loss in validation_losses]

fig, ax = plt.subplots(figsize=(5,5))

ax.plot(train_losses_per_epoch, "ro-", label="Train")
ax.plot(validation_losses_per_epoch, "go-", label="Validation")
ax.legend()
ax.set_title("Loss")
ax.set_xlabel("Epochs")

In [None]:
def evaluate(encoder,decoder,max_len=20):
  """Greedy-decode every utterance in `test_dataloader` and score it with BLEU.

  Both networks run in eval mode (dropout off) under torch.no_grad(); each
  network's previous train/eval mode is restored before returning. For every
  utterance the decoded tokens and the reference tokens are printed.

  Args:
    encoder, decoder: trained networks of the same layer type.
    max_len: maximum number of decoding steps per utterance.

  Returns:
    The mean sentence-level BLEU over the test split (a 0-dim tensor), or 0
    if the loader is empty. (Previously the per-sentence scores were summed,
    and the metric was fed word lists instead of sentences.)
  """
  bleu = BLEUScore()
  bluescore = 0
  num_sentences = 0

  encoder_was_training, decoder_was_training = encoder.training, decoder.training
  encoder.eval()
  decoder.eval()
  try:
    with torch.no_grad():
      for batch in test_dataloader:

        audio = batch[0].to(device)  # [batch, channels, num_bins, num_frames]
        text  = batch[1]  # [batch, num_tokens]

        num_frames = audio.shape[-1]

        encoder_hidden = torch.zeros(NUM_LAYERS, BATCH_SIZE, HIDDEN_SIZE).to(device)
        encoder_cell = torch.zeros(NUM_LAYERS, BATCH_SIZE, HIDDEN_SIZE).to(device)

        # Feed the spectrogram frame by frame, carrying the state forward.
        for index in range(num_frames):
          audio_frame = audio[:,:,:,index]  # [batch, channels, num_bins]
          if encoder.layer_type == "LSTM":
            encoder_hidden, encoder_cell = encoder(audio_frame,encoder_hidden,encoder_cell)
          else: # GRU or simple RNN
            encoder_hidden = encoder(audio_frame,encoder_hidden)

        # The decoder starts from the encoder's last hidden state; its LSTM
        # cell state starts from zeros.
        decoder_hidden = encoder_hidden
        decoder_cell = torch.zeros(NUM_LAYERS, BATCH_SIZE, HIDDEN_SIZE).to(device)

        decoded_words = []
        decoder_input = torch.tensor([SOS_token]).to(device)

        # Greedy decoding: feed back the most likely token until EOS or max_len.
        for _ in range(max_len):
          if decoder.layer_type == "LSTM":
            decoder_output, decoder_hidden, decoder_cell = decoder(decoder_input, decoder_hidden, decoder_cell)
          else:
            decoder_output, decoder_hidden = decoder(decoder_input, decoder_hidden)

          _, topi = decoder_output.topk(1)
          next_token = topi.item()
          if next_token == EOS_token:
            decoded_words.append('<EOS>')
            break
          decoded_words.append(Dictionary.index2word[next_token])
          decoder_input = torch.tensor([next_token]).to(device)

        reference_ids = [int(token_id) for token_id in text.reshape(-1).tolist()]
        input_text = [Dictionary.index2word[token_id] for token_id in reference_ids]

        print(decoded_words)
        print(input_text)

        # BLEUScore expects whole sentences: preds is a list of strings and
        # target a list of lists of reference strings. The SOS/EOS markers
        # are dropped from both sides so only real words are compared.
        hypothesis = " ".join(word for word in decoded_words if word != '<EOS>')
        reference = " ".join(
          Dictionary.index2word[token_id]
          for token_id in reference_ids
          if token_id not in (SOS_token, EOS_token)
        )
        bluescore += bleu([hypothesis], [[reference]])
        num_sentences += 1
  finally:
    # Leave the networks in whatever mode the caller had them in.
    encoder.train(encoder_was_training)
    decoder.train(decoder_was_training)

  return bluescore / num_sentences if num_sentences else bluescore

In [None]:
# Scores whichever encoder/decoder pair was trained last; when the cells are
# run top to bottom that is the "LSTM" pair.
test_bleu = evaluate(encoder,decoder)
print(test_bleu)