In [None]:
# Install PyTorch Lightning into the runtime.
# NOTE(review): the version is not pinned. The training cell further down passes
# `filepath=`, `log_save_interval=`, `gpus=` and `checkpoint_callback=<callback>` -
# confirm the installed release still accepts these arguments and pin that version.
!pip install pytorch-lightning

In [None]:
# Install py-rouge into the runtime (presumably needed by the ROUGE evaluation
# helper in test_functions - confirm). NOTE(review): version is not pinned.
!pip install py-rouge

In [None]:
# Mount Google Drive at /content/drive (Colab-only API).
# The data/checkpoint paths used later are relative ('drive/My Drive/...'), so they
# only resolve when the working directory is /content - presumably the Colab default; confirm.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


# Imports

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import Data_processing as dpros
import Emotion_model as EM
import test_functions as test_func
import statistics
from tqdm import tqdm
from torch.autograd import Variable
from pytorch_lightning.callbacks import ModelCheckpoint
import pytorch_lightning as pl
import nltk
import random
import numpy as np
import pandas as pd
import torch.utils.data as utils
from torch.utils.data import Dataset, DataLoader
import nltk.translate.bleu_score as bleu

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.


# Automatic Dialogue Generation with Expressed Emotions

## Data processing

In [None]:

# Training pair (*_1): source sentences and emotion-tagged target sentences.
file_source_1, file_target_1 = (
    'drive/My Drive/Colab Notebooks/source_train_word.txt',
    'drive/My Drive/Colab Notebooks/target_train_emotion.txt',
)

# Test pair (*_2), same layout as the training pair.
file_source_2, file_target_2 = (
    'drive/My Drive/Colab Notebooks/source_test_word.txt',
    'drive/My Drive/Colab Notebooks/target_test_emotion.txt',
)

# Vocabulary lookup tables returned by the data-processing module
# (presumably word -> id and id -> word; see Data_processing.index_opensub).
word2id, id2word = dpros.index_opensub()


In [None]:

def process_data(file_source, file_target, w2idx):
    """Load a source/target file pair and turn it into padded index sequences.

    Each file is read with ``dpros.read_lines``, lower-cased and passed line by
    line through ``dpros.filter_line_opensub``; the two line lists are then
    filtered jointly by ``dpros.filter_data_opensub``. The last character of
    every remaining target sentence is split off as its emotion tag.

    Args:
        file_source: path of the source-sentence file.
        file_target: path of the target-sentence file (sentence + trailing tag).
        w2idx: word -> index mapping handed to the ``dpros`` padding helpers.

    Returns:
        Tuple ``(idx_q, q_length, idx_a, tag)``: the outputs of
        ``dpros.zero_pad_source`` (two values) and ``dpros.zero_pad_target``,
        plus the list of tag characters, one per target sentence.
    """

    def _load_clean(path):
        # Lower-case every raw line, then apply the OpenSubtitles line filter.
        return [dpros.filter_line_opensub(raw.lower())
                for raw in dpros.read_lines(filename=path)]

    questions, tagged_answers = dpros.filter_data_opensub(
        _load_clean(file_source), _load_clean(file_target)
    )

    # The final character is the emotion tag; everything before it is the sentence.
    alines = [sent[:-1] for sent in tagged_answers]
    tag = [sent[-1] for sent in tagged_answers]

    # Sentences -> lists of words (split on single spaces).
    qtokenized = [sent.split(' ') for sent in questions]
    atokenized = [sent.split(' ') for sent in alines]

    idx_q, q_length = dpros.zero_pad_source(qtokenized, w2idx)
    idx_a = dpros.zero_pad_target(atokenized, w2idx)

    return idx_q, q_length, idx_a, tag


In [None]:

# Index + pad the raw training and test files.
(source_train_full,
 length_train_full,
 target_train_full,
 tag_train_full) = process_data(file_source_1, file_target_1, word2id)

(source_test,
 length_test,
 target_test,
 tag_test) = process_data(file_source_2, file_target_2, word2id)

# Combine each source sequence with its emotion tag via dpros.emotion_pad,
# which also returns the matching lengths.
source_train_full_emotion, length_trains = dpros.emotion_pad(
    source_train_full, length_train_full, tag_train_full, word2id
)
source_test_emotion, length_tests = dpros.emotion_pad(
    source_test, length_test, tag_test, word2id
)


In [None]:

## Divide the sentences into train (first 95%) and validation (remaining 5%).
## Then create the dataset objects for the three sets.

train_index = int(0.95 * len(source_train_full))
valid_index = len(source_train_full) - train_index  # number of validation examples

train_range = range(0, train_index)
# Validation starts AT train_index. Starting at train_index + 1 (as before) left the
# example at position train_index in neither the training nor the validation set.
valid_range = range(train_index, train_index + valid_index)

source_train = [source_train_full_emotion[i] for i in train_range]
target_train = [target_train_full[i] for i in train_range]
length_train = [length_trains[i] for i in train_range]

source_valid = [source_train_full_emotion[i] for i in valid_range]
target_valid = [target_train_full[i] for i in valid_range]
length_valid = [length_trains[i] for i in valid_range]

# Every dataset yields (source ids, source length, target ids) triples.
training_set = utils.TensorDataset(torch.LongTensor(source_train), torch.LongTensor(length_train), torch.LongTensor(target_train))
valid_set = utils.TensorDataset(torch.LongTensor(source_valid), torch.LongTensor(length_valid), torch.LongTensor(target_valid))
test_set = utils.TensorDataset(torch.LongTensor(source_test_emotion), torch.LongTensor(length_tests), torch.LongTensor(target_test))



## Model 1: Enc-bef

In [None]:

class Final_model_light(pl.LightningModule):
    """Sequence-to-sequence dialogue model wrapped as a LightningModule.

    A shared embedding table feeds both the encoder (source sentence) and the
    decoder (shifted target sentence). The encoder's hidden state is projected
    through a linear layer + tanh to initialise the decoder, and the decoder
    output is projected to vocabulary logits.

    Notebook-level names this class still relies on at call time:
    ``criterion`` (loss function), ``training_set`` / ``valid_set`` /
    ``test_set`` (datasets) and ``dpros.batch_size``.

    Args:
        emb_dim: size of each word embedding.
        enc_hid_dim: encoder hidden size (per direction).
        hid_dim: decoder hidden size; also the input size of the output layer.
        vocab_size: number of rows in the embedding table / number of logits.
        num_directions: multiplier applied to ``enc_hid_dim`` for the
            encoder -> decoder projection.
        pad_len: accepted for interface compatibility; not used by this class.
        encoder: encoder module, called as ``encoder(embedded_src, src_length)``.
        decoder: decoder module, called as
            ``decoder(embedded_trg, (hidden, cell), context)``.
        dropout: accepted for interface compatibility; not used by this class.
    """

    def __init__(self, emb_dim, enc_hid_dim, hid_dim, vocab_size, num_directions, pad_len, encoder, decoder, dropout):

        super(Final_model_light, self).__init__()

        # Keep the sizes on the instance so the methods below do not depend on
        # notebook globals that happen to carry the same names.
        self.emb_dim = emb_dim
        self.vocab_size = vocab_size

        self.encoder = encoder
        self.decoder = decoder

        self.embeddings = nn.Embedding(vocab_size, emb_dim)

        self.encoder2decoder = nn.Linear(enc_hid_dim * num_directions, hid_dim)
        self.tanh = nn.Tanh()
        self.out = nn.Linear(hid_dim, vocab_size)

    def forward(self, input_sentence, input_length, target):
        """Run the encoder-decoder and return the vocabulary logits.

        Args:
            input_sentence: tensor of source token ids.
            input_length: source lengths, passed through to the encoder.
            target: tensor of (already shifted) target token ids used as
                decoder input.

        Returns:
            The output of the final linear layer applied to the decoder output.
        """

        # Source and target share the same embedding table.
        input_sentence = self.embeddings(input_sentence)
        target = self.embeddings(target)

        src, (src_hidden, src_cell) = self.encoder(input_sentence, input_length)

        # Project the encoder hidden state to the decoder hidden size.
        decoder_init_state = self.tanh(self.encoder2decoder(src_hidden))

        # Swap the first two axes of the encoder outputs before handing them to the
        # decoder as context (presumably time-major -> batch-major; confirm in EM).
        context = src.transpose(0, 1)

        trg, (hidden_trg, cell_trg) = self.decoder(target, (decoder_init_state, src_cell), context)

        logits = self.out(trg)

        return logits

    def load_word_embedding(self, id2word, embedding_path='drive/My Drive/Colab Notebooks/cc.en.300.vec'):
        """Initialise the embedding table from a text file of pre-trained vectors.

        Each usable line of the file is ``word v1 v2 ... v_emb_dim`` separated by
        single spaces. Words from ``id2word`` found in the file get their
        pre-trained vector, ``'<pad>'`` gets zeros, and every other word gets a
        uniform random vector in [-1, 1). The resulting table is frozen.

        Args:
            id2word: mapping from integer id to word; ids >= vocab_size are ignored.
            embedding_path: location of the vector file (defaults to the path
                originally hard-coded here).
        """

        vocab_size = self.vocab_size
        emb_dim = self.emb_dim

        # Only vectors for words in our vocabulary are ever looked up, so there is
        # no need to hold the whole pre-trained file in memory.
        wanted = set(id2word.values())

        embeddings_index = {}
        with open(embedding_path, 'r', encoding='utf-8') as f:
            for line in tqdm(f):
                values = line.rstrip().rsplit(' ')
                # Skip anything that is not "word + emb_dim numbers", e.g. a
                # "<count> <dim>" header line, which would otherwise be stored as a
                # bogus short vector.
                if len(values) != emb_dim + 1:
                    continue
                word = values[0]
                if word in wanted:
                    embeddings_index[word] = np.asarray(values[1:], dtype='float32')

        embedding_matrix = np.zeros((vocab_size, emb_dim))
        for i, word in id2word.items():
            if i >= vocab_size:
                continue
            embedding_vector = embeddings_index.get(word)
            if embedding_vector is not None:
                embedding_matrix[i] = embedding_vector
            elif word == '<pad>':
                embedding_matrix[i] = np.zeros([emb_dim])
            else:
                embedding_matrix[i] = np.random.uniform(-1, 1, emb_dim)

        self.embeddings.weight = nn.Parameter(torch.FloatTensor(embedding_matrix))
        # Freeze the table. The flag has to be set on the weight Parameter itself;
        # assigning `requires_grad` on the nn.Embedding module (as before) only
        # created an unused attribute and left the weights trainable.
        self.embeddings.weight.requires_grad = False

    def _teacher_forced_loss(self, batch):
        """Compute the loss for one batch; shared by train/val/test steps.

        The decoder input is the target without its last token and the loss
        target is the target without its first token.
        """

        src, src_length, trg = batch

        trg_1 = trg[:, :-1]
        trg_2 = trg[:, 1:]

        decoder_logit = self.forward(src, src_length, trg_1)

        # `criterion` is the loss object defined at notebook level.
        return criterion(decoder_logit.contiguous().view(-1, self.vocab_size), trg_2.contiguous().view(-1))

    def training_step(self, batch, batch_idx):
        """Return ``{'loss': ...}`` for one training batch."""
        return {'loss': self._teacher_forced_loss(batch)}

    def training_epoch_end(self, outputs):
        """Print and return the mean training loss over the epoch."""

        avg_loss = torch.stack([x['loss'] for x in outputs]).mean()
        print("Train loss: ", avg_loss)
        return {'training_loss': avg_loss}

    def validation_step(self, batch, batch_idx):
        """Return ``{'val_loss': ...}`` for one validation batch."""
        return {'val_loss': self._teacher_forced_loss(batch)}

    def validation_epoch_end(self, outputs):
        """Print and return the mean validation loss under the key 'val_loss'."""

        avg_loss = torch.stack([x['val_loss'] for x in outputs]).mean()
        print("Avg loss: ", avg_loss)
        return {'val_loss': avg_loss}

    def test_step(self, batch, batch_idx):
        """Return ``{'test_loss': ...}`` for one test batch."""
        return {'test_loss': self._teacher_forced_loss(batch)}

    def test_epoch_end(self, outputs):
        """Return the mean test loss under the key 'test_loss' (nothing is printed)."""

        avg_loss = torch.stack([x['test_loss'] for x in outputs]).mean()
        return {'test_loss': avg_loss}

    def configure_optimizers(self):
        """Adam over the module's parameters with a learning rate of 1e-4."""
        return optim.Adam(self.parameters(), lr=0.0001)

    def train_dataloader(self):
        """Shuffled loader over the notebook-level ``training_set``."""

        train_loader = DataLoader(training_set, batch_size=dpros.batch_size, shuffle=True, num_workers=3)

        return train_loader

    def val_dataloader(self):
        """Loader over the notebook-level ``valid_set``."""

        valid_loader = DataLoader(valid_set, batch_size=dpros.batch_size, num_workers=3)

        return valid_loader

    def test_dataloader(self):
        """Loader over whatever the notebook-level ``test_set`` currently refers to."""

        test_loader = DataLoader(test_set, batch_size=dpros.batch_size, num_workers=3)

        return test_loader


### Train model

In [None]:

# Sizes derived from the vocabulary: the encoder/decoder input size and the
# model's vocabulary size are both the number of entries in word2id.
vocab_size = input_dim = len(word2id)
target_pad = word2id['<pad>']  # id of the padding token

# Sequence and layer dimensions.
pad_len = 30
emb_dim = 300
enc_hid_dim = 300
num_directions = 2  # scales enc_hid_dim in the model's encoder -> decoder projection
dec_hid_dim = 600
hid_dim = 600
dropout = 0.2

encoder = EM.Encoder(input_dim, emb_dim, enc_hid_dim, dropout)
decoder = EM.Decoder(input_dim, emb_dim, dec_hid_dim, dropout)


In [None]:

# Cross-entropy loss used by the model's training/validation/test steps, which read
# this notebook-level name directly.
# NOTE(review): no `ignore_index` is set, so target positions holding '<pad>' count
# toward the loss; `target_pad` is defined in the hyperparameter cell but unused here.
# The evaluation cell truncates generated text at '<pad>', so this may be deliberate -
# confirm before changing.
criterion = nn.CrossEntropyLoss().cuda()


In [None]:

# Build the Lightning model and fill its embedding table from the pre-trained vectors.
light_model = Final_model_light(
    emb_dim, enc_hid_dim, hid_dim, vocab_size, num_directions,
    pad_len, encoder, decoder, dropout,
)
light_model.load_word_embedding(id2word)

# Keep only the single best checkpoint, judged by the lowest 'val_loss'.
checkpoint_callback = ModelCheckpoint(
    filepath='drive/My Drive/Colab Notebooks/Model_opensub_checkpoint/',
    save_top_k=1,
    verbose=True,
    monitor='val_loss',
    mode='min',
)

# Single-GPU trainer, two epochs, with the progress bar and summaries switched off.
trainer = pl.Trainer(
    gpus=1,
    max_epochs=2,
    log_save_interval=100000,
    weights_summary=None,
    progress_bar_refresh_rate=0,
    log_gpu_memory=None,
    default_root_dir='drive/My Drive/Colab Notebooks/Model_opensub_checkpoint/',
    checkpoint_callback=checkpoint_callback,
)

# Alternative: resume training from a saved checkpoint instead of starting fresh.
## trainer = pl.Trainer(resume_from_checkpoint='drive/My Drive/Colab Notebooks/Model_opensub_checkpoint/epoch=10.ckpt', gpus=1, max_epochs=11, checkpoint_callback=checkpoint_callback, log_save_interval=100000, weights_summary=None, progress_bar_refresh_rate=0, log_gpu_memory=None, default_root_dir='drive/My Drive/Colab Notebooks/Model_opensub_checkpoint/')


In [None]:

# Train the model, then run the test loop. The data loaders come from the model's
# own train_dataloader / val_dataloader / test_dataloader methods.
trainer.fit(light_model)
trainer.test(light_model)


## Evaluation metrics

In [None]:

# Evaluate on the first tenth of the test split only.
# Note: this rebinds `source_test_emotion`, `length_tests` and `test_set`, so anything
# that reads those notebook-level names afterwards sees the reduced subset.
source_test_emotion, length_tests = dpros.emotion_pad(
    source_test[:len(source_test)//10],
    length_test[:len(length_test)//10],
    tag_test[:len(tag_test)//10],
    word2id,
)
test_set = utils.TensorDataset(
    torch.LongTensor(source_test_emotion),
    torch.LongTensor(length_tests),
    torch.LongTensor(target_test[:len(target_test)//10]),
)

# drop_last=True discards a final incomplete batch.
test_loader = DataLoader(
    test_set,
    batch_size=dpros.batch_size,
    num_workers=3,
    drop_last=True,
)

# Reference answers and model predictions for the evaluation metrics below.
answers, predict = test_func.create_answers_preds(
    light_model, test_loader, word2id, id2word
)

print("Created successfully!")


In [None]:

# Cut every reference and prediction at its first '<pad>' marker, in place.
for idx, reference in enumerate(answers):
    answers[idx] = reference.split('<pad>')[0]
    predict[idx] = predict[idx].split('<pad>')[0]


### BLEU

In [None]:

# BLEU between the truncated reference answers and the model predictions.
test_func.bleu_scores(answers, predict)


### ROUGE

In [None]:

# ROUGE between the truncated reference answers and the model predictions.
test_func.rouge_scores(answers, predict)


### Embedding average metric

In [None]:

# Embedding-based similarity; the helper receives the trained model and word2id
# (presumably to look words up in the model's embedding table - confirm in test_functions).
test_func.word_embedding_scores(answers, predict, light_model, word2id)


In [None]:

## this is for creating the predictions for each emotion and store them in separate files
## (one CSV per emotion id 0..4, written to the current working directory as Emotion<id>.csv)
# NOTE(review): each iteration rebinds the notebook-level names `source_test_emotion`,
# `length_tests`, `test_set`, `test_loader` and `predict` (the latter was also the
# prediction list used by the metric cells above).

for tag in range(5):
    # NOTE(review): `tag` is a single int here, whereas the earlier emotion_pad calls
    # pass the per-sentence tag list returned by process_data - confirm that
    # dpros.emotion_pad accepts both forms.
    source_test_emotion, length_tests = dpros.emotion_pad(source_test[:len(source_test)//10], length_test[:len(length_test)//10], tag, word2id)
    test_set = utils.TensorDataset(torch.LongTensor(source_test_emotion), torch.LongTensor(length_tests), torch.LongTensor(target_test[:len(target_test)//10]))
    test_loader = DataLoader(test_set, batch_size=dpros.batch_size, num_workers=3, drop_last=True)
    sources, predict = test_func.create_sources_preds(light_model, test_loader, word2id, id2word)

    print("Emotion " + str(tag))

    # ''.join concatenates with no separator: a no-op if each element is already a
    # string, but it would glue tokens together if the elements are word lists -
    # TODO confirm what create_sources_preds returns.
    df = pd.DataFrame({'Sources': [''.join(source_test) for source_test in sources], 'Predictions': [''.join(predicted_test) for predicted_test in predict]})
    df.to_csv('Emotion' + str(tag) + '.csv', encoding='utf-8', index=False)
