In [1]:
pwd()

'D:\\Assignment\\Natural Language Processing\\Project\\Main\\Emotion-Cause-Analysis'

###  Import data

In [1]:
import json
import pandas as pd
from typing import Dict, List
import math
 
# Opening JSON file
f = open('.\Data\\text\Subtask_1_train.json', encoding="utf-8")
 
# returns JSON object as 
# a dictionary
data = json.load(f)

#data

## Function to make batches

In [2]:
def make_batches(sequences: List[str], batch_size: int) -> List[List[str]]:
    """Yield batch_size chunks from sequences."""

    batch_list=[]

    last_index=len(sequences)-1
    
    for index in range(math.ceil(len(sequences)/batch_size)):
        
        if index+batch_size:
            batch_list.append(sequences[index:index+batch_size])
        else:
            batch_list.append(sequences[index:last_index])
    # DONE
    return batch_list

## Tokenize and padding

In [3]:
from typing import Dict, List, Optional, Tuple
from collections import Counter

import torch
import numpy as np
import spacy



class Tokenizer:
    """Tokenizes and pads a batch of input sentences."""

    def __init__(self, pad_symbol: Optional[str] = "<PAD>"):
        """Initializes the tokenizer

        Args:
            pad_symbol (Optional[str], optional): The symbol for a pad. Defaults to "<PAD>".
        """
        self.pad_symbol = pad_symbol
        self.nlp = spacy.load("en_core_web_sm")
    
    def __call__(self, sentences_batch: List[str]) -> List[List[str]]:
        """Tokenizes each sentence in the batch, and pads them if necessary so
        that we have equal length sentences in the batch.

        Args:
            batch (List[str]): A List of sentence strings

        Returns:
            List[List[str]]: A List of equal-length token Lists.
        """
        sentences_batch = self.tokenize(sentences_batch)
        sentences_batch = self.pad(sentences_batch)

        return sentences_batch

    def tokenize(self, sentences: List[str]) -> List[List[str]]:
        """Tokenizes the List of string sentences into a Lists of tokens using spacy tokenizer.

        Args:
            sentences (List[str]): The input sentence.

        Returns:
            List[str]: The tokenized version of the sentence.
        """
        ret_tokenized_sentences = []
        # for token in self.nlp(sentences):
        #     ret_tokenized_sentences.append(token.text)
            
        ret_tokenized_sentences = []
        for sentence_iter in sentences:
            token_list = ['<SOS>']
            # token_list = []
            for token in self.nlp(sentence_iter):
                token_list.append(token.text)
            token_list.append('<EOS>')
            ret_tokenized_sentences.append(token_list)
        return ret_tokenized_sentences
    
    def pad(self, batch: List[List[str]]) -> List[List[str]]:
        """Appends pad symbols to each tokenized sentence in the batch such that
        every List of tokens is the same length. This means that the max length sentence
        will not be padded.

        Args:
            batch (List[List[str]]): Batch of tokenized sentences.

        Returns:
            List[List[str]]: Batch of padded tokenized sentences. 
        """
        ret_batch = []
        max_len = len(max(batch, key=len))

        ret_batch = [sentence_tokens + ['<P>'] * (max_len - len(sentence_tokens)) for sentence_tokens in batch]

        for sentence in batch:
            for i in range(max_len - len(sentence)):
                sentence.append(self.pad_symbol)

        return batch
        
        # return ret_tokenized_sentences

  hasattr(torch, "has_mps")
  and torch.has_mps  # type: ignore[attr-defined]


### Conversion of data to usable format

In [4]:
def data_to_conversation_list(data) -> (List[List[str]], List[List[str]]):
    # Create a list to store the data
    conversation_data = []
    emotion_list = []

    # Iterate through conversations
    for conversation in data:
        conversation_id = conversation["conversation_ID"]
        utterances = conversation["conversation"]
        
        # We make batches now and use those.
        # tokenized_data = []
        # Note: Labels need to be batched in the same way to ensure
        # We have train sentence and label batches lining up.
        #for batch in make_batches(utterances['text'], batch_size):
        #    tokenized_data.append(tokenizer(batch))
        # tokenized_data = my_tokenizer.tokenize(utterances)
        
        # print("After tokenization:", tokenized_data)
        
        emotion_cause_pairs = conversation["emotion-cause_pairs"]

        utterance_data = []
        emotion_temp = []
        # Process each utterance in the conversation
        for utterance in utterances:
            utterance_id = utterance["utterance_ID"]
            #print("Before tokenization:", utterance["text"], "\n After tokenization:", tokenizer(utterance["text"]))
            text = utterance["speaker"] + ": " + utterance["text"]
            #speaker = utterance["speaker"]
            emotion = utterance["emotion"]

            # Append the data to the list
            # conversation_data.append([conversation_id, utterance_id, text, emotion]) #speaker, emotion])
            utterance_data.append(text)
            emotion_temp.append(emotion)
            #print(utterance_data, emotion_temp)
            assert len(utterance_data) == len(emotion_temp)
        conversation_data.append(utterance_data) #speaker, emotion])
        emotion_list.append(emotion_temp)
        assert len(conversation_data) == len(emotion_list)
    
    return conversation_data, emotion_list

In [5]:
conversation_list, emotion_list = data_to_conversation_list(data)

In [6]:
#conversation_list, emotion_list

In [7]:
my_tokenizer = Tokenizer()
# print("Before tokenization:", utterances['text'])
        
batch_size = 8
tokenizer = Tokenizer()

for i in range(len(conversation_list)):
    conversation_list[i] = tokenizer(conversation_list[i])
    
#conversation_list

In [8]:
'''
from transformers import BertTokenizer, BertModel
import torch

def get_sentence_embedding(sentence, model_name="bert-base-uncased"):
    # Load pre-trained model tokenizer
    tokenizer = BertTokenizer.from_pretrained(model_name)

    # Encode text
    encoded_input = tokenizer(sentence, return_tensors='pt', padding=True, truncation=True, max_length=512)

    # Load pre-trained model
    model = BertModel.from_pretrained(model_name)

    # Forward pass, get hidden states
    with torch.no_grad():
        output = model(**encoded_input)

    # Get the embeddings of the [CLS] token (first token), representing the entire sentence
    sentence_embedding = output.last_hidden_state[:, 0, :]

    return sentence_embedding

# Example usage
sentence = "This is a test sentence."
embedding = get_sentence_embedding(sentence)
print(embedding)

'''


'\nfrom transformers import BertTokenizer, BertModel\nimport torch\n\ndef get_sentence_embedding(sentence, model_name="bert-base-uncased"):\n    # Load pre-trained model tokenizer\n    tokenizer = BertTokenizer.from_pretrained(model_name)\n\n    # Encode text\n    encoded_input = tokenizer(sentence, return_tensors=\'pt\', padding=True, truncation=True, max_length=512)\n\n    # Load pre-trained model\n    model = BertModel.from_pretrained(model_name)\n\n    # Forward pass, get hidden states\n    with torch.no_grad():\n        output = model(**encoded_input)\n\n    # Get the embeddings of the [CLS] token (first token), representing the entire sentence\n    sentence_embedding = output.last_hidden_state[:, 0, :]\n\n    return sentence_embedding\n\n# Example usage\nsentence = "This is a test sentence."\nembedding = get_sentence_embedding(sentence)\nprint(embedding)\n\n'

In [17]:
from transformers import BertTokenizer, BertModel
import torch

def get_sentence_embedding(sentence, model_name="bert-base-uncased"):
    # Load pre-trained model tokenizer
    tokenizer = BertTokenizer.from_pretrained(model_name)
    
    # Encode text
    encoded_input = tokenizer(sentence, return_tensors='pt', padding=True, truncation=True, max_length=512)

    # Load pre-trained model
    model = BertModel.from_pretrained(model_name)

    # Forward pass, get hidden states
    with torch.no_grad():
        output = model(**encoded_input)

    # Get the embeddings of the [CLS] token (first token), representing the entire sentence
    sentence_embedding = output.last_hidden_state[:, 0, :]

    return sentence_embedding




In [26]:
conversation_list[1]

[['<SOS>',
  'Ross',
  ':',
  'I',
  'do',
  'not',
  'want',
  'to',
  'be',
  'single',
  ',',
  'okay',
  '?',
  'I',
  'just',
  '...',
  'I',
  'just',
  '...',
  'I',
  'just',
  'wanna',
  'be',
  'married',
  'again',
  '!',
  '<EOS>'],
 ['<SOS>',
  'Chandler',
  ':',
  'And',
  'I',
  'just',
  'want',
  'a',
  'million',
  'dollars',
  '!',
  '<EOS>',
  '<PAD>',
  '<PAD>',
  '<PAD>',
  '<PAD>',
  '<PAD>',
  '<PAD>',
  '<PAD>',
  '<PAD>',
  '<PAD>',
  '<PAD>',
  '<PAD>',
  '<PAD>',
  '<PAD>',
  '<PAD>',
  '<PAD>'],
 ['<SOS>',
  'Monica',
  ':',
  'Rachel',
  '?',
  '!',
  '<EOS>',
  '<PAD>',
  '<PAD>',
  '<PAD>',
  '<PAD>',
  '<PAD>',
  '<PAD>',
  '<PAD>',
  '<PAD>',
  '<PAD>',
  '<PAD>',
  '<PAD>',
  '<PAD>',
  '<PAD>',
  '<PAD>',
  '<PAD>',
  '<PAD>',
  '<PAD>',
  '<PAD>',
  '<PAD>',
  '<PAD>']]

In [29]:

# Special tokens to be removed
special_tokens = {'<SOS>', '<EOS>', '<PAD>'}

# Joining each sublist into strings and then joining these strings
conversation_string = ' '.join(
    ' '.join(
        ' '.join(word for word in inner_list if word not in special_tokens) 
        for inner_list in sublist
    ) 
    for sublist in conversation_list
)



In [32]:
conversation_embeddings = get_sentence_embedding(conversation_string)

In [33]:
conversation_embeddings

tensor([[-3.2951e-01,  2.9885e-02, -5.0583e-02, -1.5613e-02,  1.7823e-01,
         -2.8545e-01, -6.1972e-03, -9.5317e-02, -2.4619e-01, -3.3515e-01,
         -1.0778e-01,  2.1030e-01, -1.7457e-01, -1.2156e-01,  1.1606e-01,
          6.0827e-01,  1.5727e-01,  2.6050e-01,  4.2154e-01,  9.9186e-02,
         -2.5663e-02, -2.1765e-01,  1.3550e-01, -1.9045e-01,  2.9612e-01,
         -4.8937e-02, -5.8637e-02, -9.2775e-02,  3.1541e-01,  1.1832e-02,
         -5.6620e-01,  4.6970e-01, -1.6304e-01, -4.1667e-01,  5.6819e-01,
         -1.6462e-01,  1.0148e-01, -7.7733e-02,  2.7097e-01, -5.6543e-03,
          1.6207e-01,  4.3490e-01, -1.3973e-01, -1.1276e-01, -1.8835e-01,
          2.2367e-01, -3.8259e+00,  3.4220e-01,  4.3740e-01, -4.1970e-01,
          5.9737e-01, -5.1960e-01,  6.3404e-02,  5.6560e-01,  2.1485e-01,
          3.2506e-01, -2.2517e-01,  2.6366e-01,  3.8787e-01, -6.2588e-01,
          1.9052e-01,  2.4132e-01, -3.4627e-01,  4.9376e-02, -1.7191e-02,
          2.1989e-01,  2.0043e-01, -2.

## Glove embeddings (Uncomment if required)

In [13]:
# ! wget https://nlp.stanford.edu/data/glove.twitter.27B.zip

In [14]:
# ! tar -xf glove.twitter.27B.zip

In [15]:
# ! dir

### Doc2Vec

In [16]:
#Import all the dependencies
from gensim.models.doc2vec import Doc2Vec, TaggedDocument
from nltk.tokenize import word_tokenize

In [17]:

for utterance_list in conversation_list:
    tagged_data = [TaggedDocument(_d, tags=[str(i)]) for i, _d in enumerate(utterance_list)]

In [18]:
max_epochs = 100
vec_size = 20
alpha = 0.025

model = Doc2Vec(vector_size=vec_size,
                alpha=alpha, 
                min_alpha=0.00025,
                min_count=1,
                dm =1)
  
model.build_vocab(tagged_data)

for epoch in range(max_epochs):
    print('iteration {0}'.format(epoch))
    model.train(tagged_data,
                total_examples=model.corpus_count,
                epochs=model.epochs)
    # decrease the learning rate
    model.alpha -= 0.0002
    # fix the learning rate, no decay
    model.min_alpha = model.alpha

model.save("d2v.model")
print("Model Saved")

iteration 0
iteration 1
iteration 2
iteration 3
iteration 4
iteration 5
iteration 6
iteration 7
iteration 8
iteration 9
iteration 10
iteration 11
iteration 12
iteration 13
iteration 14
iteration 15
iteration 16
iteration 17
iteration 18
iteration 19
iteration 20
iteration 21
iteration 22
iteration 23
iteration 24
iteration 25
iteration 26
iteration 27
iteration 28
iteration 29
iteration 30
iteration 31
iteration 32
iteration 33
iteration 34
iteration 35
iteration 36
iteration 37
iteration 38
iteration 39
iteration 40
iteration 41
iteration 42
iteration 43
iteration 44
iteration 45
iteration 46
iteration 47
iteration 48
iteration 49
iteration 50
iteration 51
iteration 52
iteration 53
iteration 54
iteration 55
iteration 56
iteration 57
iteration 58
iteration 59
iteration 60
iteration 61
iteration 62
iteration 63
iteration 64
iteration 65
iteration 66
iteration 67
iteration 68
iteration 69
iteration 70
iteration 71
iteration 72
iteration 73
iteration 74
iteration 75
iteration 76
iteration

In [19]:
from gensim.models.doc2vec import Doc2Vec

model= Doc2Vec.load("d2v.model")
#to find the vector of a document which is not in training data
test_data = word_tokenize("I love chatbots".lower())
v1 = model.infer_vector(test_data)
print("V1_infer", v1)

# to find most similar doc using tags
similar_doc = model.docvecs.most_similar('1')
print(similar_doc)


# to find vector of doc in training data using tags or in other words, printing the vector of document at index 1 in training data
print(model.docvecs['1'])

V1_infer [ 0.01308073 -0.00190342 -0.02114176  0.00471338 -0.00165673 -0.01510672
  0.01705324 -0.01671973  0.00889122  0.02146473 -0.02151536  0.00433522
  0.00438282 -0.01386737  0.01422673 -0.00469609 -0.01759827 -0.00123524
  0.00698823  0.01958854]
[('2', 0.5146083831787109), ('4', 0.506571888923645), ('3', 0.3784882724285126), ('0', -0.01671718992292881)]
[ 0.05988654  0.42412388  0.02434033  0.049225   -0.29706997 -2.5846186
 -0.30098644 -0.634541    0.3894923  -1.9694245   0.9922004   0.53607875
 -1.440379   -0.7760169  -1.3684759   1.129289    0.98527914 -2.4105103
 -1.562966   -0.99548066]


  similar_doc = model.docvecs.most_similar('1')
  print(model.docvecs['1'])


## Encode documents of coversations from pre trained doc2vec model

In [20]:
from gensim.models.doc2vec import Doc2Vec, TaggedDocument

def encode_documents(documents: List[List[List[str]]], model: Doc2Vec) -> torch.FloatTensor:
    """Encode the list of documents using a pre-trained Doc2Vec model.

    Args:
        documents (List[str]): List of documents to encode.
        model (Doc2Vec): Pre-trained Doc2Vec model.

    Returns:
        torch.FloatTensor: Tensor of encoded documents.
    """
    encoded_docs = []
    for conversation in documents:
        conversation_embeddings = [model.infer_vector(tokens, epochs=20) for tokens in conversation]
        encoded_docs.extend(conversation_embeddings)
    return torch.FloatTensor(encoded_docs)



def encode_labels(labels: List[List[str]]) -> List[torch.FloatTensor]:
    """Convert string labels into numeric tensors.

    Args:
        labels (List[List[str]]): Nested list of string labels.

    Returns:
        List[torch.FloatTensor]: List of tensors containing the labels.
    """
    label_dict = {'neutral': 0, 'surprise': 1, 'anger': 2, 'sadness': 3, 'happy': 4}  # Define your label mapping
    encoded_labels = []
    for nested_list in labels:
        # Map string labels to numeric values
        numeric_labels = [label_dict.get(label, -1) for label in nested_list]  # Use -1 as default value
        # Remove -1 values (labels not found in the dictionary)
        numeric_labels = [label for label in numeric_labels if label != -1]
        encoded_labels.append(torch.FloatTensor(numeric_labels))
    return encoded_labels


In [21]:
# Encode conversations
encoded_conversations = encode_documents(conversation_list, model)

# Emotion labels encoding remains the same as before
encoded_emotion_labels = encode_labels(emotion_list)

  return torch.FloatTensor(encoded_docs)


## Get OOVs from the model

In [22]:
# Get the vocabulary list
vocab_list = list(model.wv.index_to_key)

In [23]:
from typing import List

def get_oovs(vocab_list: List[str], model:Doc2Vec) -> List[str]:
    """Find the words in vocab_list that are out-of-vocabulary (OOV) in the Doc2Vec model.
    
    Args:
    vocab_list (List[str]): List of words in your vocabulary.
    model: Pre-trained Doc2Vec model.
    
    Returns:
    List[str]: List of out-of-vocabulary words in vocab_list.
    """
    oov_words = [word for word in vocab_list if word not in model.wv]
    return oov_words


In [24]:
get_oovs(vocab_list, model)

[]

## Emotion Classifier

In [None]:
class EmotionClassifier(torch.nn.Module):
    def __init__(self, output_size: int, hidden_size: int, model_name='prajjwal1/bert-small'):
        super().__init__()
        self.output_size = output_size
        self.hidden_size = hidden_size
        
        # Initialize BERT, which we use instead of a single embedding layer.
        self.bert = BertModel.from_pretrained(model_name)
        
        # TODO [OPTIONAL]: Updating all BERT parameters can be slow and memory intensive. 
        # Freeze them if training is too slow. Notice that the learning
        # rate should probably be smaller in this case.
        # Uncommenting out the below 2 lines means only our classification layer will be updated.
        
        # for param in self.bert.parameters():
        #     param.requires_grad = False
        
        self.bert_hidden_dimension = self.bert.config.hidden_size
        
        # TODO: Add an extra hidden layer in the classifier, projecting
        #      from the BERT hidden dimension to hidden size. Hint: torch.nn.Linear()
        
        self.hidden_layer = torch.nn.Linear(self.bert_hidden_dimension, self.hidden_size)
        
        # TODO: Add a relu nonlinearity to be used in the forward method
        #      https://pytorch.org/docs/stable/generated/torch.nn.ReLU.html
        
        self.relu = torch.nn.ReLU()
        
        self.classifier = torch.nn.Linear(self.hidden_size, self.output_size)
        self.log_softmax = torch.nn.LogSoftmax(dim=2)
    
    def encode_text(
        self,
        symbols: Dict
    ) -> torch.Tensor:
        """Encode the (batch of) sequence(s) of token symbols BERT.
            Then, get CLS represenation.

        Args:
            symbols (Dict): The Dict of token specifications provided by the HuggingFace tokenizer

        Returns:
            torch.Tensor: CLS token embedding
        """
        # Get the contextualized embedding for each input symbol
        encoded_sequence = self.bert(**symbols)
        
        # Extract the [CLS] token representation
        cls_token_embedding = encoded_sequence.last_hidden_state[:, 0, :]  # Extract [CLS] token
        
        return cls_token_embedding.unsqueeze(1)  # Reshape to batch_size x 1 x bert_hidden_dimension

    def forward(
        self,
        symbols: Dict,
    ) -> torch.Tensor:
    	"""_summary_

        Args:
            symbols (Dict): The Dict of token specifications provided by the HuggingFace tokenizer

        Returns:
            torch.Tensor: _description_
        """
        """Forward pass."""
        # Directly get BERT output (including [CLS] token) from input symbols
        encoded_sents = self.encode_text(symbols)
        output = self.hidden_layer(encoded_sents)
        # Extract the [CLS] token representation
        cls_token_embedding = encoded_sequence.last_hidden_state[:, 0, :]  # Assuming [CLS] is at index 0
        
        # Pass through hidden layer and activation
        output = self.hidden_layer(cls_token_embedding.unsqueeze(1))
        output = self.relu(output)
        
        # Classification layer
        output = self.classifier(output)
        return self.log_softmax(output)
