In [1]:
import numpy as np
from collections import defaultdict
import random
from utils import emotion_scores
import matplotlib.pyplot as plt
from tqdm import tqdm
import pickle

  return self.fget.__get__(instance, owner)()


In [3]:
class BigramLM:
    def __init__(self):
        self.vocab = set()
        self.bigram_counts = defaultdict(lambda: defaultdict(int))
        self.unigram_counts = defaultdict(int)
        self.bigram_probs = None
        self.beta_values = None
        self.emotion_dict = {'sadness': 0, 'joy': 1, 'love': 2, 'anger': 3, 'fear': 4, 'surprise': 5}

    def learn_from_dataset(self, dataset):
        for sentence in dataset:
            tokens = sentence.split()                        
            for i in range(len(tokens) - 1):
                word1, word2 = tokens[i], tokens[i + 1]
                self.vocab.add(word1)
                self.vocab.add(word2)
                self.bigram_counts[word1][word2] += 1
                self.unigram_counts[word1] += 1                            
                
        self.vocab = list(self.vocab)
        print(f"Vocabulary size: {len(self.vocab)}")

    def calculate_beta_values(self):
            num_words = len(self.vocab)
            self.beta_values = np.zeros((num_words, num_words, 6))
            for i, word1 in tqdm(enumerate(self.vocab)):
                if word1 not in self.bigram_counts.keys():
                    continue
                for j, word2 in enumerate(self.vocab):
                    if word2 not in self.bigram_counts[word1].keys():
                        continue
                    emotions = emotion_scores(word1 + " " + word2)
                    self.beta_values[i][j] = np.array([emotions[k]['score'] for k in range(6)])
                    
    def calculate_bigram_probs(self):
        num_words = len(self.vocab)
        self.bigram_probs = np.zeros((num_words, num_words))        
        
        for i, word1 in tqdm(enumerate(self.vocab)):
            for j, word2 in enumerate(self.vocab):
                if self.unigram_counts[word1] > 0:
                    self.bigram_probs[i][j] = float(self.bigram_counts[word1][word2]) / float(self.unigram_counts[word1])
                    # if self.bigram_probs[i][j] == 0:
                    #     continue
                    # emotions = emotion_scores(word1 + " " + word2)
                    # self.beta_values[i][j] = np.array([emotions[k]['score'] for k in range(6)])

                    
    def calculate_bigram_probs_laplace(self):
        num_words = len(self.vocab)
        self.bigram_probs = np.zeros((num_words, num_words))

        for i, word1 in enumerate(self.vocab):
            for j, word2 in enumerate(self.vocab):
                self.bigram_probs[i][j] = (self.bigram_counts[word1][word2] + 1) / (self.unigram_counts[word1] + num_words)

                # if self.bigram_probs[i][j] == 0:
                #     continue
                # emotions = emotion_scores(word1 + " " + word2)
                # self.beta_values[i][j] = np.array([emotions[k]['score'] for k in range(6)])
                
    
    def calculate_bigram_probs_kneser_ney(self, discount=0.75):
        num_words = len(self.vocab)
        self.bigram_probs = np.zeros((num_words, num_words))

        continuation_counts = defaultdict(set)
        # Calculate continuation counts
        for word1, word2_dict in self.bigram_counts.items():
            for word2 in word2_dict:
                continuation_counts[word2].add(word1)

        # Total number of word1 that can precede any word2        
        total_continuations = {word2: len(word1s) for word2, word1s in continuation_counts.items()}

        for i, word1 in tqdm(enumerate(self.vocab)):
            sum_adjusted_counts = sum(max(self.bigram_counts[word1][word2] - discount, 0) for word2 in self.vocab)
            for j, word2 in enumerate(self.vocab):
                if word2 in total_continuations.keys():
                    adjusted_count = max(self.bigram_counts[word1][word2] - discount, 0)
                    continuation_prob = total_continuations[word2] / sum(total_continuations.values()) if sum(total_continuations.values()) > 0 else 0
                    lower_order_weight = (discount * continuation_prob) / self.unigram_counts[word1] if self.unigram_counts[word1] > 0 else 0

                    self.bigram_probs[i][j] = adjusted_count / self.unigram_counts[word1] + lower_order_weight if self.unigram_counts[word1] > 0 else 0

                    # if self.bigram_probs[i][j] == 0:
                    #     continue
                    # emotions = emotion_scores(word1 + " " + word2)
                    # self.beta_values[i][j] = np.array([emotions[k]['score'] for k in range(6)])
                    

    # def generate_sentence(self, emotion, max_length=20):
    #     sentence = []
    #     current_word = random.choice(['i', 'im'])
    #     sentence.append(current_word)

    #     for i in range(max_length):
    #         current_word = self.generate_next_word(current_word, emotion)
    #         if current_word == "":
    #             break
    #         sentence.append(current_word)
        
    #     return " ".join(sentence)

    # def generate_next_word(self, current_word, emotion):
    #     if current_word not in self.vocab:
    #         raise ValueError(f"{current_word} not found in the vocabulary.")

    #     word_index = self.vocab.index(current_word)
    #     next_word_probs = self.bigram_probs[word_index] + self.beta_values[word_index, :, self.emotion_dict[emotion]]

    #     # next_word_index = list(next_word_probs).index(max(next_word_probs))
    #     try:
    #         next_word_index = random.choices(range(len(next_word_probs)), weights=next_word_probs)[0]
    #     except:
    #         return ""

    #     next_word = list(self.vocab)[next_word_index]

    #     return next_word

In [4]:
corpus = open('../Dataset/corpus.txt')
dataset = []
for i in corpus.readlines():
    dataset.append('<SOS> ' + i + ' <EOS>')

In [5]:
model = BigramLM()
model.learn_from_dataset(dataset)

Vocabulary size: 5431


In [16]:
pickle.dump(model.vocab, open('Checkpoints/vocab.pkl', 'wb'))

In [6]:
model.calculate_beta_values()

5431it [08:38, 10.47it/s]


In [7]:
pickle.dump(model.beta_values, open('Checkpoints/beta_values.pkl', 'wb'))

In [8]:
model.calculate_bigram_probs()
pickle.dump(model.bigram_probs, open('Checkpoints/bigram_probs.pkl', 'wb'))

5431it [00:16, 321.77it/s]


In [None]:
model.calculate_bigram_probs_kneser_ney()
pickle.dump(model.bigram_probs, open('Checkpoints/bigram_probs_kneser_ney.pkl', 'wb'))

In [10]:
model.calculate_bigram_probs_laplace()
pickle.dump(model.bigram_probs, open('Checkpoints/bigram_probs_laplace.pkl', 'wb'))

In [319]:
# generate 50 sentences for each emotion ansd write to a file with name gen_emotion.txt
for emotion in ['sadness', 'joy', 'love', 'anger', 'fear', 'surprise']:
    with open('Test Samples/coeff_0.1_no/gen_' + emotion + '.txt', 'w') as f:
        for i in range(50):
            f.write(model.generate_sentence(emotion) + '\n')

In [326]:
count = 0
with open('Test Samples/coeff_0.1_no/gen_sadness.txt', 'r') as f:
    for line in f.readlines():
        score = emotion_scores(line)[model.emotion_dict['sadness']]['score']
        
        if(score > 0.5):
            count += 1

print('sadness: ' + str(count))

count = 0
with open('Test Samples/coeff_0.1_no/gen_joy.txt', 'r') as f:
    for line in f.readlines():
        score = emotion_scores(line)[model.emotion_dict['joy']]['score']
        
        if(score > 0.5):
            count += 1

print('joy: ' + str(count))

count = 0
with open('Test Samples/coeff_0.1_no/gen_love.txt', 'r') as f:
    for line in f.readlines():
        score = emotion_scores(line)[model.emotion_dict['love']]['score']
        
        if(score > 0.5):
            count += 1

print('love: ' + str(count))

count = 0
with open('Test Samples/coeff_0.1_no/gen_anger.txt', 'r') as f:
    for line in f.readlines():
        score = emotion_scores(line)[model.emotion_dict['anger']]['score']
        
        if(score > 0.5):
            count += 1

print('anger: ' + str(count))

count = 0
with open('Test Samples/coeff_0.1_no/gen_fear.txt', 'r') as f:
    for line in f.readlines():
        score = emotion_scores(line)[model.emotion_dict['fear']]['score']
        
        if(score > 0.5):
            count += 1

print('fear: ' + str(count))

count = 0
with open('Test Samples/coeff_0.1_no/gen_surprise.txt', 'r') as f:
    for line in f.readlines():
        score = emotion_scores(line)[model.emotion_dict['surprise']]['score']
        
        if(score > 0.5):
            count += 1

print('surprise: ' + str(count))


        


sadness: 40
joy: 40
love: 36
anger: 30
fear: 33
surprise: 36
