<a href="https://colab.research.google.com/github/paulcodrea/reddit_humor/blob/main/2b_tf_idf.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# LSTM with a tf-idf embedding vector 

## Imports + Config

In [111]:
import csv
import nltk
from nltk import word_tokenize, RegexpTokenizer
from nltk.tokenize import word_tokenize
nltk.download('stopwords')
from nltk.corpus import stopwords
stopWords = set(stopwords.words('english'))
nltk.download('punkt')
from nltk.stem.porter import *
stemmer = PorterStemmer()

import numpy as np
import pandas as pd

import math
import pickle

from keras.models import Sequential, load_model
from keras.layers.core import Dense, Dropout 
from keras.layers import LSTM, Embedding, TextVectorization
from keras.preprocessing.text import Tokenizer
from keras.preprocessing.sequence import pad_sequences

from sklearn.model_selection import train_test_split
from sklearn.metrics import precision_score, recall_score, f1_score

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


In [112]:
config = {
    "learning_rate": 0.001,
    "epochs": 3, 
    "batch_size": 70,
    "test_p": 0.2,
    "val_p": 0.1,
    "LSTM_layer": [50, 100],
    "Dropout_layer": [0.15, 0.2],
    "activation": 'sigmoid',
    ##################### SAVE FOR LIVE DEMO #############################
    "model_path": './model/2b_model.h5',
    "tokenizer_path": './model/2b_tokenizer.pickle',
    "data_path": "model/2b_data.csv",
}

# Methods:

In [113]:
class LSTM_model:
  def __init__(self, path):
    self.path = path
    self.data = pd.DataFrame()
    self.max_joke_size = 0
    self.vocabulary_size = 0
    self.list_jokes_tokenized = []
    self.model = None

  def read_dataset(self):
    self.data = pd.read_csv(self.path)
    print("Data successfuly read!")
    # print("This is the first line from the data file: \n", self.data.head(1))

  def tokenise_clean_text(self):
    """
    Tokenises the data from the CSV file.
    Since the CSV has a "clean_text" column, the only required pre-processing
    is splitting it from white spaces.
    """
    arr = []
    for line in self.data['clean_text']:
      arr.append(line.split(' '))
    self.list_jokes_tokenized = [x[:(len(x) - 1)] for x in arr] # remove '' at the end
    print('Tokenisation is complete!')
    # print("This is the first item from the tokenised list of jokes:\n", self.list_jokes_tokenized[0])
    # print("This is how many jokes the data set has:\n",len(self.list_jokes_tokenized))

  def return_vocab(self) -> list:
    """
    Retuns the vocabulary of the document.
    """
    computed_vocab = []
    for line in self.list_jokes_tokenized:
      for word in line:
        computed_vocab.append(word)
    computed_vocab = list(dict.fromkeys(computed_vocab))

    print("The vocabulary has been created!")
    # print("This is the len of the vocabulary:\n ", len(computed_vocab))
    # print("This is the first token from the corpus, pre-processed: \n", computed_vocab[0])

    self.vocabulary_size = len(computed_vocab)
    return computed_vocab

  def return_terms_frequencies(self, vocabulary):
    """
    Returns the term frequency dictionary. The keys are the vocabulary words and
    the values are the frequency of the words in the whole corpus.
    """
    vocab_freq = {}
    for word in vocabulary:
      vocab_freq[word] = 0
    for joke in self.list_jokes_tokenized:
      for token in joke:
        vocab_freq[token] = vocab_freq[token] + 1
    
    print("The vocabulary with the associated frequency for each word has been generated!")
    # print("This is how many times the word 'joke' is in our corpus\n", vocab_freq["joke"])
    
    return vocab_freq

  def generate_tf(self, vocab: dict):
    """
    Returns a term frequency dictionary. The keys are the vocabulary words and 
    the values are a list that represents the frequency of the word in each joke
    divided by the length of the joke.
    """
    dict_term_freq = {}
    for word in vocab.keys():
      dict_term_freq[word] = []
    for word in vocab.keys():
      for joke in self.list_jokes_tokenized:
        word_freq = 0
        for word_index in range(len(joke)):
          if word == joke[word_index]:
            word_freq = word_freq + 1
        dict_term_freq[word].append(word_freq / len(joke))

    print("Term frequency matrix/dictionary was generated!")
    # print("This is the term frequency vector for word 'joke':\n", dict_term_freq["joke"])

    return dict_term_freq

  def document_frequency(self, vocabulary: dict):
    """
    Returns a document frquency dictionary. The keys are all words in the 
    vocabulary and the values are a list of jokes the word appears in.
    """
    doc_freq = {}
    for word in vocabulary.keys():
      doc_freq[word] = []
      for joke_index in range(len(self.list_jokes_tokenized)):
        if word in self.list_jokes_tokenized[joke_index]:
          doc_freq[word].append(joke_index)

    print("A list of all documents in which the words from our vocab appear in has been generated!")
    # print("The list of all documents that contain the word 'Joke' looks like this:\n",doc_freq["joke"] )

    return doc_freq

  def generate_idf(self, document_frequencies):
    """
    Returns an inverse document frequency dictionary. The keys are the vocabulary
    words and the values are the corresponding idfs.
    """
    idf = {}
    for word in document_frequencies.keys():
      idf[word] = math.log(len(self.list_jokes_tokenized)/len(document_frequencies[word]))

    print("A dictionary with each word's tf-idf values has been generated!")

    return idf

  def generate_tf_idf(self, vocab_frequencies, tf, idf):
    """
    Returns a tf-idf dictionary. The keys are the vocabulary words and the 
    values are the corresponding tf-idfs.
    """
    dict_tf_idf = {}

    for word in vocab_frequencies.keys():
      dict_tf_idf[word] = []
      for tf_index in range(len(tf[word])):
        dict_tf_idf[word].append(tf[word][tf_index] * idf[word])

    print("The tf-idf values have beed generated!")
    # print("this is how the tf-idf vector looks like for the word 'joke':\n",dict_tf_idf["joke"])
    # print("This is the length of each tf-idf vector. Each value represent's the word's tf-idf for each sentence:\n", len(dict_tf_idf["joke"]))
    
    return dict_tf_idf

  def joke_as_tf_idf(self, dict_tf_idf):
    """
    Returns a list of jokes represented by the words' tf-idf values.
    """
    jokes_as_tf_idf = []

    for joke_index in range(len(self.list_jokes_tokenized)):
      joke = self.list_jokes_tokenized[joke_index]
      tf_idf_list = []
      for word in joke:
        tf_idf_list.append(dict_tf_idf[word][joke_index])
      tf_idf = np.asarray(tf_idf_list)
      jokes_as_tf_idf.append(tf_idf)

      # calculate max_joke size to use in the padding step
      if len(joke) > self.max_joke_size:
        self.max_joke_size = len(tf_idf_list)
        
    jokes_as_tf_idf = np.asarray(jokes_as_tf_idf)

    print("The jokes have been embedded by their tf-idf values!")
    print("This is the max joke length for our corpus: ", self.max_joke_size)
    # print("This is how the first joke looks like before the embedding step: \n", self.list_jokes_tokenized[0])
    # print("This is how the tf-idf embedding of the first joke looks like:\n",jokes_as_tf_idf[0])

    return jokes_as_tf_idf

  def pad_jokes(self, jokes_as_tf_idf_np):
    """
    Returns the input padded to the max_length of the corpus. The padding will
    be at the beginning of the input, and represented by the value 0.
    """
    new_arr = np.zeros((len(jokes_as_tf_idf_np), self.max_joke_size))
    for idx, joke in enumerate(jokes_as_tf_idf_np):
      len_joke = len(joke)
      joke_x = jokes_as_tf_idf_np[idx]
      new_arr[idx] = np.append([0] * (self.max_joke_size - len_joke), joke)

    print("The jokes (input) have been padded to the same size, ", self.max_joke_size)
    print("This is the shape of the input:\n", new_arr.shape)

    return new_arr

  def generate_train_test_data(self, jokes_list):
    """
    Returns the training and testing data. The data has been shuffled in the
    pre-processing step, so the function simply returns 80/20 training/testing
    data.
    """
    train_max_index = int(len(jokes_list) * 0.8)
    X_train = np.zeros(shape=(train_max_index, self.max_joke_size))
    X_test = np.zeros(shape=(len(jokes_list) - train_max_index, self.max_joke_size))
    index_returned_data = 0
    
    # training data
    for index in range(0, train_max_index):
      X_train[index_returned_data] = jokes_list[index]
      index_returned_data = index_returned_data + 1
    y_train = self.data["funny"][:train_max_index]
    # testing data
    index_returned_data = 0
    for index in range(train_max_index, len(jokes_list)):
      X_test[index_returned_data] = jokes_list[index]
      index_returned_data = index_returned_data + 1
    y_test = self.data["funny"][train_max_index:]
    
    return X_train, X_test, y_train, y_test

  def LSTM_model(self, X_train, X_test, y_train, y_test):
    """
    Constructs and evaluates the LSTM model.
    """

    print("X_train:", len(X_train))
    print("X_test:", len(X_test))
    print("y_train:", len(y_train))
    print("y_test:", len(y_test))

    model = Sequential()
    model.add(Embedding(input_dim=self.vocabulary_size, output_dim=self.max_joke_size, input_length=int(X_train.shape[1])))
    model.add(LSTM(50, return_sequences=True))
    model.add(LSTM(10))

    model.add(Dropout(0.5))
    model.add(Dense(units=1, activation=config['activation']))
    model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])

    model.fit(X_train, y_train, epochs=config['epochs'], batch_size=config['batch_size'], verbose='auto', validation_split=config['val_p'])
    
    print("The model completed the training step!")

    # Evaluate the model
    scores = model.evaluate(X_test, y_test)
    print("Accuracy: %.2f%%" % (scores[1] * 100))

    # Print Precision and Recall
    y_pred = model.predict(X_test)
    y_pred = np.round(y_pred)
    precision = precision_score(y_test, y_pred) * 100
    recall = recall_score(y_test, y_pred) * 100
    f1 = f1_score(y_test, y_pred) * 100
    print("Precision: %.2f%%" % (precision))
    print("Recall: %.2f%%" % (recall))
    print("F1-Score: %.2f%%" % (f1))

    # self.save_data(scores[1], precision, recall, f1)


  def save_data(self, accuracy, precision, recall, f1):
    """
    Saves the data.
    """
    # Add in dataframe master_df max_len, accuracy, precision, recall, f1-score
    ret = pd.DataFrame(columns=['max_len', 'accuracy', 'precision', 'recall', 'f1-score'])
    ret.loc[0] = [self.max_joke_size, accuracy, precision, recall, f1]
    ret.to_csv(config['data_path'])
    
    # self.model.save(config['model_path']) # save the model

    # save the tokenizer
    # with open(config['tokenizer_path'], 'wb') as handle:
    #     pickle.dump(self.tokenizer, handle, protocol=pickle.HIGHEST_PROTOCOL)

# Loading data from the dataset:

In [114]:
path = 'drive/MyDrive/Humour_Detection_Dataset/final_jokes(2918).csv'

joke_model = LSTM_model(path)
joke_model.read_dataset()

Data successfuly read!


# Pre-Processing

The functions below enable the data from the csv to be transformed in variables used to compute the tf and idf.

In [115]:
joke_model.tokenise_clean_text()
vocabulary = joke_model.return_vocab()

Tokenisation is complete!
The vocabulary has been created!


# Computing Term Frequency (TF)

Create TF for each word in the vocabulary:

The number of times a word appears in a document divded by the total number of words in the document. Every document has its own term frequency.

In [116]:
# TF
# the frequency of each word from the vocabulary in the corpus 
vocab_freq = joke_model.return_terms_frequencies(vocabulary)
# Calculate tf number of times a word appears in a doc / total number of words in the doc
tf = joke_model.generate_tf(vocab_freq)

The vocabulary with the associated frequency for each word has been generated!
Term frequency matrix/dictionary was generated!


# Computing the Inverse Document Frequency (IDF)

Calculating IDF for each word:

The log of the number of documents divided by the number of documents that contain the word w. The IDF of a word is computed once for all documents.

In [117]:
# IDF
# create a list of all the docs in which the vocab words occur -> to be used in idf
dict_document_freq = joke_model.document_frequency(vocab_freq)
idf = joke_model.generate_idf(dict_document_freq)

A list of all documents in which the words from our vocab appear in has been generated!
A dictionary with each word's tf-idf values has been generated!


# Computing the TF-IDF vectors




TF-IDF is simply the TF multiplied by IDF.

The output will be a list of tf-idf values (word x joke)

In [118]:
# TF IDF
dict_tf_idf = joke_model.generate_tf_idf(vocab_freq, tf, idf)

The tf-idf values have beed generated!


Representing each joke by an embedding vector.


In [119]:
# embed
jokes_as_tf_idf_np = joke_model.joke_as_tf_idf(dict_tf_idf)

The jokes have been embedded by their tf-idf values!
This is the max joke length for our corpus:  134




# Creating training and testing datasets

Pad the jokes. 



In [120]:
# pad
jokes_as_tf_idf_np = joke_model.pad_jokes(jokes_as_tf_idf_np)

The jokes (input) have been padded to the same size,  134
This is the shape of the input:
 (2918, 134)


Split the data in training - 80% and testing - 20%.

In [121]:
X_train, X_test, y_train, y_test = joke_model.generate_train_test_data(jokes_as_tf_idf_np)

# Training and saving the model

In [122]:
joke_model.LSTM_model(X_train, X_test, y_train, y_test)

X_train: 2334
X_test: 584
y_train: 2334
y_test: 584
Epoch 1/3
Epoch 2/3
Epoch 3/3
The model completed the training step!
Accuracy: 60.10%
Precision: 60.87%
Recall: 68.18%
F1-Score: 64.32%
