# Sentence Emotion Detection Model

This notebook contains the code we used to train our model, which combines a word-embedding layer with an LSTM to predict the emotion expressed in a journal entry (a single text sentence).

## Preparation

Install spaCy and import the relevant libraries.


In [None]:
!pip install --upgrade torch==1.7.1 torchtext==0.8.1 torchvision==0.8.2

In [None]:
!pip install -U pip setuptools wheel
!pip install -U spacy
!python -m spacy download en_core_web_sm

In [None]:
import torch, torchtext
from torch import nn, optim, functional as F
import pandas as pd, csv
from tqdm.auto import tqdm
import matplotlib.pyplot as plt
import pdb
import random

Import dataset (already cleaned) from dropbox link

In [None]:
!wget -O text.csv https://www.dropbox.com/s/iulhdbo1yc8farq/Emotion_final.csv?dl=0

In [None]:
text = pd.read_csv('/content/text.csv')

In [None]:
text

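Store the sentiment labels in a list for later use. The order of this list must match the order in which the labels first appear in the dataset (the output of `text.Emotion.unique()` above), because the `Sentences` dataset assigns integer IDs to the labels in that order.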

In [None]:
text.Emotion.unique()

In [None]:
sentiment = ['sadness', 'anger', 'love', 'surprise', 'fear', 'happy']

## Dataset

Define Dataset for text and split into train/test subsets

In [None]:
class Sentences(torch.utils.data.Dataset):
    """Emotion-labelled sentences stored as one flat tensor of token ids.

    `fn` is a DataFrame with a 'Text' column (one sentence per row) and an
    'Emotion' column (one label per row). Labels are mapped to integers in
    order of first appearance. Item `i` is `(label_id, token_ids)`, where
    `token_ids` is the slice of the flat token tensor belonging to row `i`.

    Attributes:
        vocab:     torchtext vocabulary built from all tokens.
        sentiment: array of integer label ids, one per row.
        text:      array of the raw sentences.
        length:    list with the number of tokens of every sentence.
        offsets:   list where offsets[i] is the start of sentence i in `toks`.
        toks:      LongTensor holding the vocabulary ids of all tokens.
    """

    def __init__(self, fn):
        # Each unique emotion label gets an integer id starting from 0.
        # The mapping is kept in a local series so the caller's DataFrame
        # is not modified.
        convert = { u: n for n, u in enumerate(fn['Emotion'].unique()) }
        labels = fn['Emotion'].apply(lambda u: convert[u])
        tokenizer = torchtext.data.utils.get_tokenizer('spacy', 'en_core_web_sm')  # tokenizer using spaCy

        # Tokenize every sentence exactly once and derive both the lengths and
        # the flat token list from the same result, so that slices taken in
        # __getitem__ can never drift out of alignment with the sentences.
        sentences = [tokenizer(s.strip()) for s in fn['Text']]
        lengths = [len(sent) for sent in sentences]
        toks = [tok for sent in sentences for tok in sent]

        # Prefix sums: offsets[i] is where sentence i starts in the flat tensor.
        offsets = [0]
        for n_tokens in lengths:
            offsets.append(offsets[-1] + n_tokens)

        self.vocab = torchtext.vocab.build_vocab_from_iterator([toks])
        self.sentiment = labels.values
        self.text = fn['Text'].values
        self.length = lengths
        self.offsets = offsets
        self.toks = torch.LongTensor([self.vocab[tok] for tok in toks])

    def __len__(self):
        return len(self.length)

    def __getitem__(self, i):
        # Support negative indices the same way a list would.
        if i < 0:
            i += len(self.length)
        start = self.offsets[i]
        # return the sentiment and the related tokens for a specific sentence
        return (self.sentiment[i], self.toks[start: start + self.length[i]])

In [None]:
# 80/20 train/test split; the generator is seeded so the split is reproducible.
ds_full = Sentences(text)
n_train = int(0.8 * len(ds_full))
n_test = len(ds_full) - n_train
rng = torch.Generator()
rng.manual_seed(291)
ds_train, ds_test = torch.utils.data.random_split(
    ds_full, lengths=[n_train, n_test], generator=rng)

Check that the outputs are what we expect: an integer corresponding to the label, and a tensor of integers corresponding to the tokens, which can be converted back into a sentence.

In [None]:
print(ds_full[100])

In [None]:
print(ds_full[100][0])

In [None]:
sentiment[ds_test[100][0]]

In [None]:
print(' '.join([ds_full.vocab.itos[x] for x in ds_full[100][1]]))

In [None]:
len(ds_full.toks)

## Model

Model with embedding and LSTM

In [None]:
class SentenceModel(nn.Module):
    """Sentence classifier: embedding -> stacked LSTM -> linear layer.

    Takes a sentence as token ids and outputs one score per sentiment class.

    Args:
        vocab_size:    number of rows in the embedding table.
        embedding_dim: size of each word embedding.
        lstm_dim:      hidden size of the LSTM.
        n_cats:        number of output classes.
        n_layers:      number of stacked LSTM layers.
        drop_prob:     dropout applied between LSTM layers.
    """

    def __init__(self, vocab_size, embedding_dim, lstm_dim,
                 n_cats, n_layers = 2, drop_prob = 0.5):
        super().__init__()
        # Layers are created in this order so random initialisation is unchanged.
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(input_size=embedding_dim, hidden_size=lstm_dim,
                            num_layers=n_layers, dropout=drop_prob,
                            batch_first=True)
        self.linear = nn.Linear(lstm_dim, n_cats)
        # Xavier initialisation for the embedding table and the classifier weights.
        for layer in (self.embedding, self.linear):
            nn.init.xavier_uniform_(layer.weight.data)

    def forward(self, text):
        # assumes `text` is a (batch, sequence) tensor of token ids, as the
        # LSTM is built with batch_first=True
        hidden_states, _ = self.lstm(self.embedding(text))
        per_token_scores = self.linear(hidden_states)
        # Average the per-token scores over dim 1 to get one row of class
        # scores per sentence.
        return per_token_scores.mean(dim=1)

Test and Train loops

In [None]:
device = torch.device('cpu')

def run_test(model, ds, crit):
    """Evaluate `model` on every item of `ds` without computing gradients.

    Args:
        model: module called as `model(txts)`; its output is passed to `crit`
               and `argmax(1)` of the output is taken as the prediction.
        ds:    dataset yielding `(label, token_tensor)` pairs.
        crit:  loss function called as `crit(outs, labs)`.

    Returns:
        (mean loss per item, accuracy, list of prediction tensors — one per
        batch, batch size used).
    """
    preds = []                                                                  # array to store predictions
    # Change batch size here. The default collate function cannot stack
    # sentences of different lengths, so values above 1 need a padding
    # collate_fn on the DataLoader.
    batch_size = 1
    model.eval()
    total_loss, total_acc = 0, 0
    ldr = torch.utils.data.DataLoader(ds, batch_size=batch_size)               # batch_size is now actually applied
    with torch.no_grad():
        for labs, txts in ldr:
            labs, txts = labs.to(device), txts.to(device)
            outs = model(txts)
            loss = crit(outs, labs)
            # Weight by the number of items in the batch so that dividing by
            # len(ds) stays a per-item average (assumes `crit` returns the
            # batch mean, the default reduction).
            total_loss += loss.item() * labs.size(0)
            total_acc += (outs.argmax(1) == labs).sum().item()
            preds.append(outs.argmax(1))                                        # append all the predictions to an array
    return total_loss / len(ds), total_acc / len(ds), preds, batch_size         # added array return value 'preds' and batchsize

def run_train(model, ds, crit, opt, sched):
    """Train `model` for one pass over `ds`, then step the scheduler once.

    Every item from the DataLoader gets its own optimizer step.

    Returns:
        (summed loss divided by len(ds), number of correct predictions
        divided by len(ds)).
    """
    model.train()
    loss_sum = 0
    n_correct = 0
    for labs, txts in torch.utils.data.DataLoader(ds):
        opt.zero_grad()
        labs = labs.to(device)
        txts = txts.to(device)
        outs = model(txts)
        loss = crit(outs, labs)
        loss.backward()
        opt.step()
        loss_sum += loss.item()
        n_correct += (outs.argmax(1) == labs).sum().item()
    # The learning-rate schedule advances once per epoch, not per item.
    sched.step()
    n_items = len(ds)
    return loss_sum / n_items, n_correct / n_items


def run_all(model, test_ds, train_ds, crit, opt, sched, n_epochs=10, early_stop=False):
    """Alternate one training epoch and one evaluation for `n_epochs` epochs.

    Prints the train/test loss and accuracy after every epoch. With
    `early_stop=True` the loop ends the first time the test accuracy is
    lower than the best test accuracy seen so far.

    Note that the test dataset comes before the training dataset in the
    argument list.
    """
    best_test_acc = 0
    for epoch in tqdm(range(n_epochs), desc='epochs'):
        train_loss, train_acc = run_train(model, train_ds, crit, opt, sched)
        test_loss, test_acc, _, _ = run_test(model, test_ds, crit)
        tqdm.write(f'epoch {epoch}   train loss {train_loss:.6f} acc {train_acc:.4f}   test loss {test_loss:.6f} acc {test_acc:.4f}')
        if not early_stop:
            continue
        if test_acc < best_test_acc:
            print("EARLY STOPPED")
            break
        best_test_acc = test_acc

## Training

Train model by adjusting the hyperparameters (optimizer, scheduler, learning rate, step size, gamma, dimensions etc.) to improve the model's test accuracy

In [None]:
#TEST 1

device = torch.device('cuda:0') #added GPU since CPU too slow (enable that in notebook settings)
model = SentenceModel(vocab_size=len(ds_full.vocab), embedding_dim=32, lstm_dim=1,
                      n_cats=len(text.Emotion.unique()))
model.to(device);
crit = nn.CrossEntropyLoss().to(device)
opt = optim.SGD(model.parameters(), lr=1.0)
sched = optim.lr_scheduler.StepLR(opt, step_size=10, gamma=0.1)

run_all(model, test_ds=ds_test, train_ds=ds_train, crit=crit, opt=opt, sched=sched, n_epochs=10)

In [None]:
#TEST 2

device = torch.device('cuda:0')
model = SentenceModel(vocab_size=len(ds_full.vocab), embedding_dim=32, lstm_dim=1,
                      n_cats=len(text.Emotion.unique()))
model.to(device);
crit = nn.CrossEntropyLoss().to(device)
opt = optim.SGD(model.parameters(), lr=1.0)
sched = optim.lr_scheduler.StepLR(opt, step_size=1, gamma=0.1) #step size: 10->1

run_all(model, test_ds=ds_test, train_ds=ds_train, crit=crit, opt=opt, sched=sched, n_epochs=20)

In [None]:
#TEST 3

device = torch.device('cuda:0')
model = SentenceModel(vocab_size=len(ds_full.vocab), embedding_dim=32, lstm_dim=64, #lstm_dim: 1 -> 64
                      n_cats=len(text.Emotion.unique()))
model.to(device);
crit = nn.CrossEntropyLoss().to(device)
opt = optim.SGD(model.parameters(), lr=1.0)
sched = optim.lr_scheduler.StepLR(opt, step_size=1, gamma=0.1)

run_all(model, test_ds=ds_test, train_ds=ds_train, crit=crit, opt=opt, sched=sched, n_epochs=20)

In [None]:
#TEST 4

device = torch.device('cuda:0')
model = SentenceModel(vocab_size=len(ds_full.vocab), embedding_dim=32, lstm_dim=64,
                      n_cats=len(text.Emotion.unique()))
model.to(device);
crit = nn.CrossEntropyLoss().to(device)
opt = optim.SGD(model.parameters(), lr=1.0)
sched = optim.lr_scheduler.StepLR(opt, step_size=1, gamma=1) #gamma: 0.1 -> 1

run_all(model, test_ds=ds_test, train_ds=ds_train, crit=crit, opt=opt, sched=sched, n_epochs=20)

In [None]:
#TEST 5

device = torch.device('cuda:0')
model = SentenceModel(vocab_size=len(ds_full.vocab), embedding_dim=32, lstm_dim=64,
                      n_cats=len(text.Emotion.unique()))
model.to(device);
crit = nn.CrossEntropyLoss().to(device)
opt = optim.SGD(model.parameters(), lr=1.0)
sched = optim.lr_scheduler.StepLR(opt, step_size=1, gamma=0.0001) #gamma: 1 -> 0.0001

run_all(model, test_ds=ds_test, train_ds=ds_train, crit=crit, opt=opt, sched=sched, n_epochs=20)

In [None]:
#TEST 6

device = torch.device('cuda:0')
model = SentenceModel(vocab_size=len(ds_full.vocab), embedding_dim=32, lstm_dim=64,
                      n_cats=len(text.Emotion.unique()))
model.to(device);
crit = nn.CrossEntropyLoss().to(device)
opt = optim.SGD(model.parameters(), lr=3.0) #lr: 1.0 -> 3.0
sched = optim.lr_scheduler.StepLR(opt, step_size=1, gamma=1)

run_all(model, test_ds=ds_test, train_ds=ds_train, crit=crit, opt=opt, sched=sched, n_epochs=20)

In [None]:
#TEST 7

device = torch.device('cuda:0')
model = SentenceModel(vocab_size=len(ds_full.vocab), embedding_dim=32, lstm_dim=128, #lstm_dim: 64 -> 128
                      n_cats=len(text.Emotion.unique()))
model.to(device);
crit = nn.CrossEntropyLoss().to(device)
opt = optim.SGD(model.parameters(), lr=1.0)
sched = optim.lr_scheduler.StepLR(opt, step_size=1, gamma=1)

run_all(model, test_ds=ds_test, train_ds=ds_train, crit=crit, opt=opt, sched=sched, n_epochs=20)

In [None]:
#TEST 8

device = torch.device('cuda:0')
model = SentenceModel(vocab_size=len(ds_full.vocab), embedding_dim=16, lstm_dim=64, #embedding_dim: 32 -> 16
                      n_cats=len(text.Emotion.unique()))
model.to(device);
crit = nn.CrossEntropyLoss().to(device)
opt = optim.SGD(model.parameters(), lr=1.0)
sched = optim.lr_scheduler.StepLR(opt, step_size=1, gamma=1)

run_all(model, test_ds=ds_test, train_ds=ds_train, crit=crit, opt=opt, sched=sched, n_epochs=20)

In [None]:
#TEST 9

device = torch.device('cuda:0')
model = SentenceModel(vocab_size=len(ds_full.vocab), embedding_dim=64, lstm_dim=64, #embedding_dim: 16 -> 64
                      n_cats=len(text.Emotion.unique()))
model.to(device);
crit = nn.CrossEntropyLoss().to(device)
opt = optim.SGD(model.parameters(), lr=1.0)
sched = optim.lr_scheduler.StepLR(opt, step_size=1, gamma=1)

run_all(model, test_ds=ds_test, train_ds=ds_train, crit=crit, opt=opt, sched=sched, n_epochs=30)

In [None]:
#TEST 10

device = torch.device('cuda:0')
model = SentenceModel(vocab_size=len(ds_full.vocab), embedding_dim=128, lstm_dim=64, #embedding_dim: 64 -> 128
                      n_cats=len(text.Emotion.unique()))
model.to(device);
crit = nn.CrossEntropyLoss().to(device)
opt = optim.SGD(model.parameters(), lr=1.0)
sched = optim.lr_scheduler.StepLR(opt, step_size=1, gamma=1)

run_all(model, test_ds=ds_test, train_ds=ds_train, crit=crit, opt=opt, sched=sched, n_epochs=30)

In [None]:
#TEST 11

device = torch.device('cuda:0')
model = SentenceModel(vocab_size=len(ds_full.vocab), embedding_dim=64, lstm_dim=64,
                      n_cats=len(text.Emotion.unique()))
model.to(device);
crit = nn.CrossEntropyLoss().to(device)
opt = optim.SGD(model.parameters(), lr=0.1) #lr: 1 -> 0.1 (counteract the loss increase over time)
sched = optim.lr_scheduler.StepLR(opt, step_size=1, gamma=1)

run_all(model, test_ds=ds_test, train_ds=ds_train, crit=crit, opt=opt, sched=sched, n_epochs=20)

In [None]:
#TEST 12

device = torch.device('cuda:0')
model = SentenceModel(vocab_size=len(ds_full.vocab), embedding_dim=64, lstm_dim=64,
                      n_cats=len(text.Emotion.unique()))
model.to(device);
crit = nn.CrossEntropyLoss().to(device)
opt = optim.SGD(model.parameters(), lr=0.001) #lr: 0.1 -> 0.001
sched = optim.lr_scheduler.StepLR(opt, step_size=1, gamma=1)

run_all(model, test_ds=ds_test, train_ds=ds_train, crit=crit, opt=opt, sched=sched, n_epochs=30)

In [None]:
#TEST 13

device = torch.device('cuda:0')
model = SentenceModel(vocab_size=len(ds_full.vocab), embedding_dim=64, lstm_dim=64,
                      n_cats=len(text.Emotion.unique()))
model.to(device);
crit = nn.CrossEntropyLoss().to(device)
opt = optim.Adagrad(model.parameters(), lr=0.1) #optimizer: SGD -> Adagrad
sched = optim.lr_scheduler.StepLR(opt, step_size=1, gamma=1)

run_all(model, test_ds=ds_test, train_ds=ds_train, crit=crit, opt=opt, sched=sched, n_epochs=30)

In [None]:
#TEST 14

device = torch.device('cuda:0')
model = SentenceModel(vocab_size=len(ds_full.vocab), embedding_dim=64, lstm_dim=64,
                      n_cats=len(text.Emotion.unique()))
model.to(device);
crit = nn.CrossEntropyLoss().to(device)
opt = optim.Adagrad(model.parameters(), lr=0.001) #lr: 0.1 -> 0.001
sched = optim.lr_scheduler.StepLR(opt, step_size=1, gamma=1)

run_all(model, test_ds=ds_test, train_ds=ds_train, crit=crit, opt=opt, sched=sched, n_epochs=30)
# Testing converges at 60 epochs at 56% accuracy and 1.25 test loss

## Example Outputs

Print the desired number of outputs using code below

In [None]:
def print_outputs(correct_count=5, incorrect_count=5):
  """Print random examples of correct and incorrect test-set predictions.

  Evaluates the notebook-level `model` on `ds_test` with `crit`, splits the
  test items by whether the prediction matches the label, and prints up to
  `correct_count` correct and `incorrect_count` incorrect examples, each
  picked at random without repetition. For every example the predicted
  label, the actual label and the sentence rebuilt from its tokens are shown.

  Args:
    correct_count:   maximum number of correctly predicted examples to print.
    incorrect_count: maximum number of incorrectly predicted examples to print.
  """
  _, _, preds, _ = run_test(model, ds_test, crit)

  # Split the test items into correct / incorrect predictions. Every item is
  # fetched from the dataset only once and stored together with its
  # prediction, so no parallel lists have to be kept in sync.
  correct = []
  incorrect = []
  for idx, pred in enumerate(preds):
    label, toks = ds_test[idx]
    predicted = pred.item()                                                     # transfer prediction from tensor to plain int
    bucket = correct if predicted == label else incorrect
    bucket.append((predicted, label, toks))

  def show(title, examples, count):
    # Print the total for this group, then `count` random distinct examples.
    print(title, len(examples), "\n")
    n_shown = max(0, min(count, len(examples)))                                 # never ask for more examples than exist
    for predicted, label, toks in random.sample(examples, n_shown):
      print("prediction: ", sentiment[predicted])
      print("actual:     ", sentiment[label])
      print("sentence:   ", ' '.join([ds_full.vocab.itos[x] for x in toks]), "\n")

  # output results
  show("CORRECT PREDICTIONS:", correct, correct_count)

  print('===================================================================\n')

  show("INCORRECT PREDICTIONS:", incorrect, incorrect_count)

In [None]:
num_corr_out = 10
num_incorr_out = 10

print_outputs(num_corr_out, num_incorr_out)

## Conclusion

With the embedding and LSTM model, I was able to achieve 91% accuracy. The hyperparameters listed below performed best in my testing on the Emotion_final dataset. Because only a small amount of testing has been done so far, it is quite possible that a better-optimized model exists for this dataset. As test 11 shows, the test loss increases over time, so lowering the learning rate or training for fewer epochs will probably improve the model; this will be taken into consideration in future testing. Finally, looking at the mismatched results, some of the sentences and their corresponding labels are difficult for even a human to distinguish, while other labels do not seem to match their sentences at all, so further cleaning of our dataset could be another strategy.

```
#TEST 11

model = SentenceModel(len(ds_full.vocab), 64, 64, len(text.Emotion.unique()))
device = torch.device('cuda:0')
model.to(device);
crit = nn.CrossEntropyLoss().to(device)
opt = optim.SGD(model.parameters(), lr=0.1)
sched = optim.lr_scheduler.StepLR(opt, 1, gamma=1)
```



# DO THIS FIRST IF YOU DON'T HAVE THE **DATASET**

You must also run most of the code above first (the imports, the dataset and the model definitions) so that the model and the helpers used below are defined.

In [None]:
!pip install -q kaggle
from google.colab import files 

In [None]:
files.upload()
#here upload "kaggle.json"

In [None]:
!mkdir ~/.kaggle
!cp /content/kaggle.json ~/.kaggle/kaggle.json
! chmod 600 ~/.kaggle/kaggle.json

In [None]:
!kaggle datasets download -d yamaerenay/spotify-dataset-19212020-160k-tracks

In [None]:
!unzip /content/spotify-dataset-19212020-160k-tracks.zip

# **DATASET CODE HERE**

Upload `sentence_model_state_dict.pth` (the saved, fully trained model) to `/content` before running the cells below
---


In [None]:
# Recreate the network so the saved weights can be loaded into it.
# NOTE(review): the +2 on the vocabulary size presumably matches the embedding
# size the checkpoint was saved with — confirm.
device = torch.device('cuda:0')
model = SentenceModel(vocab_size=len(ds_full.vocab)+2, embedding_dim=64, lstm_dim=64,
                      n_cats=len(text.Emotion.unique()))
model.to(device);
crit = nn.CrossEntropyLoss().to(device)
opt = optim.SGD(model.parameters(), lr=0.1)
sched = optim.lr_scheduler.StepLR(opt, step_size=1, gamma=1)

In [None]:
# Model class must be defined somewhere
# map_location puts the saved tensors on the device currently in use instead
# of the device they happened to be saved from.
model.load_state_dict(torch.load("/content/sentence_model_state_dict.pth", map_location=device))
model.eval()

In [None]:
ldr = torch.utils.data.DataLoader(ds_test) 
ldr.dataset[26][1]

In [None]:
# Run the loaded model on one test item (index 26) and compare with its label.
device = torch.device('cuda:0')
model.to(device)
crit = nn.CrossEntropyLoss().to(device)
opt = optim.SGD(model.parameters(), lr=0.1)
sched = optim.lr_scheduler.StepLR(opt, step_size=1, gamma=1)

with torch.no_grad():
    print(f'Tensor with ground truth label: {ldr.dataset[26]}')
    # unsqueeze(0) adds the leading batch dimension the model expects
    out_incorrect = model(ldr.dataset[26][1].unsqueeze(0).to(device))
    print(out_incorrect)
    print(f"Predicted: {out_incorrect.argmax(1)}")
    print(f"Predicted correctly: {out_incorrect.argmax(1) == ldr.dataset[26][0]}")
    print("Tensor converted to text: " + ' '.join([ds_full.vocab.itos[x] for x in ldr.dataset[26][1]]))

In [None]:
def get_weights(tensor=None, model=None): #tensor should have shape ([x, y, z, ...]) (1 dim), NOT ([[x, y, z, ...]]), the function itself unsqueezes the input tensor for you
  """Turn the model's scores for one sentence into a probability distribution.

  This distribution is what the track recommendation below weights every
  track with.

  Args:
    tensor: 1-dim tensor of token ids; the batch dimension is added here.
            Required despite the default.
    model:  the sentence model; it is moved to the GPU when one is available,
            otherwise to the CPU. Required despite the default.

  Returns:
    1-dim tensor with one weight per class. Classes with a non-positive score
    get weight 0 and the positive scores are normalised to sum to 1. If no
    score is positive, a plain softmax over the raw scores is returned
    instead, because the ReLU/log path would produce only NaN.
  """
  tensor = tensor.unsqueeze(0)
  device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
  model.to(device)
  m = nn.ReLU()
  s = nn.Softmax(dim=1)
  with torch.no_grad():
    update_tensor = model(tensor.to(device))
    relud_logged = torch.log(m(update_tensor))
    print(relud_logged)
    print("Predicted: {0}".format(update_tensor.argmax(1)))
    # tensor[0] (rather than squeeze()) keeps a one-token sentence iterable
    print("Tensor converted to text: " + ' '.join([ds_full.vocab.itos[x] for x in tensor[0]]))
    if torch.isinf(relud_logged).all():
      # every score was <= 0, so every log is -inf and softmax would be NaN
      return s(update_tensor).squeeze()
    return s(relud_logged).squeeze()

In [None]:
x = 100

print(ldr.dataset[x])
print("\n")
weight = get_weights(ldr.dataset[x][1], model)

In [None]:
print(weight)
weight[2].item()

In [None]:
tracks = pd.read_csv('/content/tracks.csv')

In [None]:
# Drop the audio features the scoring does not use. Rebinding the result and
# ignoring columns that are already gone lets this cell be re-run without a KeyError.
tracks = tracks.drop(columns=['duration_ms','key', 'mode', 'speechiness', 'acousticness', 'instrumentalness', 'liveness', 'tempo', 'time_signature', 'loudness'], errors='ignore')

In [None]:
tracks.sort_values(by=['popularity'], ascending=False)

sentiment = ['sadness', 'anger', 'love', 'surprise', 'fear', 'happy']

In [None]:
import numpy as np

In [None]:
# list_scores = []
# in = ('sadness', 'anger', 'love', 'surprise', 'fear', 'happy')
# for **track** in list of tracks over 65 popularity:
#    score = POPULARITY_WEIGHT * (track.popularity / 100) + in.sadness * (1 - track.valence) + in.anger * track.energy + in.love * (mean(track.energy + track.dancability)) + in.surprise * (1 - track.energy) + in.fear * valence + in.happy * (mean(track.danceability + track.energy))
#    list_scores.append(score)

# np.random.choice(tracks, list_scores)

In [None]:
new_df = tracks[tracks.popularity >=65]
new_df

In [None]:
def normalize(p): # makes sure the distribution probability list adds to 1
    """Scale the weights in `p` so that they sum to 1.

    Args:
        p: sequence of numeric weights.

    Returns:
        numpy float array with the same number of entries as `p`. When the
        weights sum to 0 there is nothing to scale, so every entry gets the
        same probability instead of dividing by zero. An empty input yields an
        empty array.
    """
    weights = np.asarray(p, dtype=float)
    total = weights.sum()
    if total == 0:
        if weights.size == 0:
            return weights
        return np.full(weights.shape, 1.0 / weights.size)
    return weights / total

In [None]:
#user input is x -> we have to convert user input string into tensor "x"
#so the weights would come from "get_weights(x, model)"

POPULARITY_WEIGHT = 2 # can be changed 
NUM_TRACK_OPTIONS = 5 # can be changed

id_tracks = new_df["id"].values.tolist()

# Named `emotion_weights` / `chosen_id` rather than `input` / `id`, which
# would replace Python's built-in functions for the rest of the session.
emotion_weights = get_weights(ldr.dataset[x][1], model)
# Read the six class weights out of the tensor once instead of once per track.
# The order follows the `sentiment` list.
w_sadness, w_anger, w_love, w_surprise, w_fear, w_happy = (emotion_weights[i].item() for i in range(6))

# Score every track: popularity plus each emotion weight times the audio
# feature(s) associated with that emotion.
scores_id = []
for track in new_df.itertuples(index=False):
    score = POPULARITY_WEIGHT*(track.popularity/100) + w_sadness*(1-track.valence) + w_anger*track.energy + w_love*((track.energy + track.danceability)/2) + w_surprise*(1-track.energy) + w_fear*track.valence + w_happy*((track.danceability + track.energy)/2)
    scores_id.append([score, track.id])

# Keep the NUM_TRACK_OPTIONS best-scoring tracks.
top_scored = sorted(scores_id, reverse=True)[0:NUM_TRACK_OPTIONS]
distribution = [top_score for top_score, _ in top_scored]
top_track_ids = [top_id for _, top_id in top_scored]

print("\n")
chosen_id = np.random.choice(top_track_ids, p=normalize(distribution)) # picks a random track across a distribution generated by the scores
print("Distribution: {0}".format(emotion_weights))
print("TRACK LINK: https://open.spotify.com/track/" + chosen_id)
actual_track = new_df[new_df['id']==chosen_id]
print(actual_track)