# **LSTM Model**

# **Data Format:**
# **Columns: Text, Label**

|  TEXT  |   LABEL   | 
|--------|-----------|
| TEXT_1 |  LABEL_1  |
| TEXT_2 |  LABEL_1  |
| TEXT_3 |  LABEL_2  |

# **Load Data**

In [None]:
from google.colab import files
# Call Colab's files.upload(); presumably this prompts for the CSV so it is
# available to the runtime - confirm against the Colab documentation.
files.upload()

In [None]:
import pandas as pd
# Path of the CSV file to load. It is an empty placeholder here and has to be
# filled in before this cell can read anything.
csv_path = ""
# Read the CSV into a DataFrame.
data = pd.read_csv(csv_path)

# **Pre-Processing Columns**

In [None]:
# Remove the 'segment' and 'speaker' columns, which are not used afterwards.
data = data.drop(columns=['segment', 'speaker'])
# Rename the remaining columns to 'text' and 'label'.
# NOTE(review): this requires exactly two columns to be left after the drop,
# in text-then-label order - confirm against the CSV being loaded.
data.columns = ['text', 'label']
# Show the first five rows as a quick sanity check.
data.head(5)

In [None]:
# Count how many rows each label has in the full dataset.
data.label.value_counts()

# **Reduce Data**

In [None]:
# Work on a copy so that `data` keeps the full, unreduced dataset.
df = data.copy()

In [None]:
import random

def reduceData(df, n=10000000):
    """Cap the number of rows per label at ``n`` by random sampling.

    Rows are grouped by the ``label`` column. A group with more than ``n``
    rows is reduced to ``n`` rows picked with ``random.sample``; smaller
    groups are kept whole.

    Args:
        df: DataFrame with a ``text`` and a ``label`` column.
        n: Maximum number of rows to keep for each label.

    Returns:
        A new DataFrame with columns ``text`` and ``label`` whose rows are
        grouped by label. The input frame is not modified.
    """
    text_column = df.text
    selected_texts = []
    selected_labels = []
    for label, row_ids in df.groupby(by="label").groups.items():
        # Work on a plain list so the groupby mapping itself is never mutated.
        row_ids = list(row_ids)
        if len(row_ids) > n:
            row_ids = random.sample(row_ids, n)
        selected_texts.extend(text_column[i] for i in row_ids)
        selected_labels.extend([label] * len(row_ids))
    return pd.DataFrame(data={"text": selected_texts, "label": selected_labels})

# Keep at most 500 randomly chosen rows per label.
df = reduceData(df, 500)

In [None]:
# Count the rows per label again, now on the reduced frame.
df.label.value_counts()

# **Text Preprocessing**

In [None]:
import functools
from keras.preprocessing.sequence import pad_sequences

def vectorizer(texts):
    """Turn each text into a list of integer token ids.

    Texts are split on single spaces. Every distinct token receives an id in
    order of first appearance, starting at 1.

    Args:
        texts: Iterable of strings.

    Returns:
        A tuple ``(encoded, vocabulary)``: ``encoded`` holds one list of ids
        per input text and ``vocabulary`` maps each token to its id.
    """
    vocabulary = {}
    encoded = []
    for sentence in texts:
        # setdefault returns the existing id, or registers the next free one
        # (current vocabulary size + 1) for a token seen for the first time.
        encoded.append([
            vocabulary.setdefault(token, len(vocabulary) + 1)
            for token in sentence.split(" ")
        ])
    return encoded, vocabulary

def textPreprocessing(texts):
    """Encode texts as equal-length integer sequences.

    Each text is converted to token ids with ``vectorizer`` and the resulting
    sequences are left-padded with zeros to the length of the longest one.

    Args:
        texts: Iterable of strings. NOTE(review): an empty iterable is not
            expected to work - confirm if that case matters.

    Returns:
        A tuple ``(padded, vocab_size, max_len)``: the padded 2-D id array,
        the number of distinct tokens, and the padded sequence length.
    """
    sequences, vocabulary = vectorizer(texts)
    vocab_size = len(vocabulary)
    # With maxlen left unset, pad_sequences pads to the longest sequence, so
    # the length does not have to be searched for by hand (and no local has
    # to be called `max`, which would shadow the builtin).
    padded = pad_sequences(sequences, padding="pre")
    max_len = padded.shape[1]
    return padded, vocab_size, max_len

# Encode and pad the texts of the reduced frame.
# NOTE(review): the third result is bound to the name `max`, which shadows the
# builtin max() at module level for everything that runs after this line.
texts, vocab_size, max = textPreprocessing(df.text.values)

# **Labels Preprocessing**

In [None]:
import numpy as np
from sklearn.preprocessing import OneHotEncoder as OHE

def labelsPreprocessing(labels):
    """One-hot encode the labels.

    Args:
        labels: Sequence of label values.

    Returns:
        A tuple ``(encoded, encoder)``: the dense one-hot array and the
        fitted encoder that produced it.
    """
    # The encoder is fitted on, and applied to, a single-column 2-D array.
    label_column = np.array(labels).reshape(-1, 1)
    encoder = OHE().fit(label_column)
    return encoder.transform(label_column).toarray(), encoder

# One-hot encode the labels of the reduced frame and keep the fitted encoder.
labels, encoder = labelsPreprocessing(df.label.values)

# **Training Model**

In [None]:
from tensorflow.keras.layers import Dense, LSTM, BatchNormalization, Dropout, Input
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.losses import CategoricalCrossentropy as CC
from tensorflow.keras.activations import relu, softmax
from tensorflow.keras.initializers import he_uniform, glorot_uniform
from tensorflow.keras.metrics import AUC
from tensorflow.keras import Model
from tensorflow.keras.regularizers import l2


class LSTMModel(object):
    """LSTM classifier assembled with the Keras functional API.

    ``build_model`` has to be called first: it creates and compiles the
    network and stores it in ``self.model``, which ``train`` and ``predict``
    then use.
    """

    def __init__(self):
        # Set by build_model(); None until the network has been built.
        self.model = None

    def build_model(self, input_dim, output_shape, steps, dropout_rate, kernel_regularizer, bias_regularizer):
        """Create and compile the network and store it in ``self.model``.

        Layer order: Input(steps, input_dim) -> LSTM -> Dense ->
        BatchNormalization -> ReLU -> Dropout -> Dense ->
        BatchNormalization -> softmax.

        Args:
            input_dim: Length of the vector fed in at each time step.
            output_shape: Number of units of both Dense layers.
            steps: Number of time steps per sequence; also used as the number
                of LSTM units.
            dropout_rate: Rate of the Dropout layer.
            kernel_regularizer: L2 factor for the Dense kernels.
            bias_regularizer: L2 factor for the Dense biases.
        """
        input_layer = Input(shape=(steps, input_dim))

        lstm = LSTM(units=steps)(input_layer)

        # The L2 factor is passed positionally because the keyword name of
        # that argument differs between Keras versions.
        dense_1 = Dense(output_shape, kernel_initializer=he_uniform(),
                        bias_initializer="zeros",
                        kernel_regularizer=l2(kernel_regularizer),
                        bias_regularizer=l2(bias_regularizer))(lstm)
        x = BatchNormalization()(dense_1)

        x = relu(x)
        x = Dropout(rate=dropout_rate)(x)

        # Feed the output layer from `x`. Connecting it to `dense_1` instead
        # would leave the normalisation, activation and dropout above out of
        # the model entirely.
        o = Dense(output_shape, kernel_initializer=glorot_uniform(),
                  bias_initializer="zeros",
                  kernel_regularizer=l2(kernel_regularizer),
                  bias_regularizer=l2(bias_regularizer))(x)
        o = BatchNormalization()(o)

        output = softmax(o, axis=1)
        loss = CC()
        metrics = AUC()
        optimizer = Adam()

        self.model = Model(inputs=[input_layer], outputs=[output])
        self.model.compile(optimizer=optimizer, loss=loss, metrics=[metrics])

    def train(self, x, y, validation_split, epochs):
        """Fit the compiled model on ``x`` / ``y``.

        Args:
            x: Training inputs.
            y: Training targets.
            validation_split: Value forwarded to ``fit`` as
                ``validation_split``.
            epochs: Number of epochs forwarded to ``fit``.
        """
        self.model.fit(x, y, validation_split=validation_split, epochs=epochs)

    def predict(self, x):
        """Return the model's predictions for ``x``."""
        return self.model.predict(x)

In [None]:
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
import seaborn as sn
import matplotlib.pyplot as plt

def transform_x(data, len_keys):
    """One-hot encode a 2-D array of token ids.

    Token id ``k`` (ids start at 1) sets position ``k - 1`` of the vector for
    that step. Id 0 is the padding value and is encoded as an all-zero
    vector, so padded steps cannot be confused with a real word.

    Args:
        data: 2-D integer array of token ids, one row per sequence.
        len_keys: Length of each one-hot vector (the vocabulary size).

    Returns:
        A float array of shape ``(data.shape[0], data.shape[1], len_keys)``.
    """
    data = np.asarray(data)
    results = np.zeros((data.shape[0], data.shape[1], len_keys))

    # Only non-padding positions get a 1. Indexing with `0 - 1` would wrap
    # around to the last vocabulary slot.
    rows, cols = np.nonzero(data != 0)
    results[rows, cols, data[rows, cols] - 1] = 1
    return results

def generateModel(x_transformed, output_shape, dropout_rate=0.7,
                  kernel_regularizer=0.3, bias_regularizer=0.3):
    """Build an ``LSTMModel`` sized to fit ``x_transformed``.

    Args:
        x_transformed: Input array; its second and third dimensions are used
            as the number of time steps and the per-step vector length.
        output_shape: Number of units of the model's Dense layers.
        dropout_rate: Dropout rate passed to ``build_model``.
        kernel_regularizer: L2 factor for the Dense kernels.
        bias_regularizer: L2 factor for the Dense biases.

    Returns:
        The built (compiled, untrained) ``LSTMModel``.
    """
    steps = x_transformed.shape[1]
    dim = x_transformed.shape[2]

    model = LSTMModel()
    model.build_model(input_dim=dim,
                      output_shape=output_shape,
                      steps=steps,
                      dropout_rate=dropout_rate,
                      bias_regularizer=bias_regularizer,
                      kernel_regularizer=kernel_regularizer)
    return model

def plotConfusionMatrix(y_true, y_pred):
    """Show a heatmap of real-versus-predicted label counts.

    Args:
        y_true: Sequence of real labels.
        y_pred: Sequence of predicted labels, aligned with ``y_true``.
    """
    # Open a 14x14 figure before anything is drawn.
    plt.subplots(figsize=(14, 14))
    pairs = pd.DataFrame(
        {'real_value': y_true, 'predicted': y_pred},
        columns=['real_value', 'predicted'],
    )
    # Rows are the real labels, columns the predicted ones.
    counts = pd.crosstab(
        pairs['real_value'],
        pairs['predicted'],
        rownames=['Real'],
        colnames=['Predicted'],
    )
    sn.heatmap(counts, annot=True, cbar=False, fmt='g')
    plt.show()

def runModel(texts, labels, vocab_size, output_length, encoder):
    """Train the LSTM model and report its results on a held-out split.

    The data is split 80/20 with a fixed random state, the inputs are one-hot
    encoded, and a model is trained for 30 epochs with a 0.2 validation
    split. A classification report is printed and a confusion matrix is
    plotted for the held-out part.

    Args:
        texts: 2-D array of padded token ids.
        labels: One-hot encoded labels aligned with ``texts``.
        vocab_size: Length of the one-hot vector used for each token.
        output_length: Number of units of the model's Dense layers.
        encoder: Fitted encoder used to decode one-hot rows back to labels.
    """
    X_train, X_test, y_train, y_test = train_test_split(
        texts, labels, test_size=0.2, random_state=3
    )
    train_inputs = transform_x(X_train, vocab_size)
    test_inputs = transform_x(X_test, vocab_size)

    classifier = generateModel(train_inputs, output_length)
    classifier.train(train_inputs, y_train, 0.2, 30)

    predicted_labels = encoder.inverse_transform(classifier.predict(test_inputs))
    real_labels = encoder.inverse_transform(y_test)
    print(classification_report(real_labels, predicted_labels))

    # Each decoded row is indexed at position 0 to get the label itself.
    plotConfusionMatrix(
        [row[0] for row in real_labels],
        [row[0] for row in predicted_labels],
    )

In [None]:
# The number of distinct labels in the reduced frame sets the model's output size.
output_length = len(df.label.unique())
# Train the model and report its results on the held-out split.
runModel(texts, labels, vocab_size, output_length, encoder)