# Dataset

In [6]:
import csv
import string

import sys
import nltk
from nltk.tokenize import word_tokenize

from sklearn.model_selection import train_test_split

def load_dataset(path):
    """
    Load the tweets and their party labels from a CSV file.

    The first row of the file is the header; it must contain a 'label' and a
    'text' column (any other columns are ignored). Labels are encoded as
    0 for "democrat" and 1 for "republican".

    Rows that cannot be used -- blank lines, rows too short to hold both
    columns, or rows with any other label -- are printed with an "ERROR : "
    prefix and skipped, so the two returned lists always stay aligned.

    path: path of the UTF-8 encoded CSV file
    returns: (x, y) where x is the list of texts and y the list of int labels
    raises: ValueError if the header lacks 'label' or 'text',
            StopIteration if the file is completely empty
    """
    label_to_class = {"democrat": 0, "republican": 1}

    x = []
    y = []

    with open(path, 'r', newline='', encoding="utf-8") as csvfile:

        reader = csv.reader(csvfile, quotechar='"', delimiter=',')

        # Taking the header of the file + the index of useful columns:
        header = next(reader)
        ind_label = header.index('label')
        ind_text = header.index('text')

        # A usable row must be long enough to contain both columns
        # (csv.reader yields an empty list for a blank line).
        min_width = max(ind_label, ind_text) + 1

        for row in reader:

            if len(row) < min_width or row[ind_label] not in label_to_class:
                print("ERROR : " + str(row))
                continue

            # Text and label are appended together, keeping x and y in step
            x.append(row[ind_text])
            y.append(label_to_class[row[ind_label]])

    return x, y


# Location of the CSV file holding the labelled tweets
path = "databases/stemmed.csv"

X, y = load_dataset(path)

# First split: hold out 15% of the samples as the test set.
train_valid_X, test_X, train_valid_Y, test_Y = train_test_split(
    X, y, test_size=0.15, random_state=12
)

# Second split: hold out 18% of the remaining samples for validation.
train_X, valid_X, train_Y, valid_Y = train_test_split(
    train_valid_X, train_valid_Y, test_size=0.18, random_state=12
)

# Report the size of each split
print("Length of training set : ", len(train_X))
print("Length of validation set : ", len(valid_X))
print("Length of test set : ", len(test_X))


ERROR : ['label', 'text']
Length of training set :  117888
Length of validation set :  25879
Length of test set :  25371


# Dictionary

In [7]:
def bigram(tokens):
    """
    Build the bigrams of a token sequence.

    tokens: a list of strings
    returns: a list with one "left right" string for every pair of adjacent
             tokens, in order (empty when there are fewer than two tokens)
    """
    # Pair every token with its successor and glue each pair with a space
    return [" ".join((left, right)) for left, right in zip(tokens, tokens[1:])]


# Returns unique words
def buildDict(tweets, addBigram=False):
    """
    Collect the unique tokens found in a collection of tweets.

    Every tweet is tokenized with word_tokenize; when addBigram is true the
    bigrams of those tokens are added to the vocabulary as well. The size of
    the resulting vocabulary is printed as "Dict Dimension: <n>".

    tweets: an iterable of strings
    addBigram: whether to also include the bigrams of each tweet
    returns: the list of unique tokens, sorted so that the order (and any
             index derived from it) is the same on every run -- iterating a
             set of strings directly gives a hash-dependent order
    """
    # Init empty set
    vocabulary = set()

    # Go through each tweet that was passed in
    for tweet in tweets:

        # Tokenize
        words = word_tokenize(tweet)

        # Add Bigram
        if addBigram:
            words = words + bigram(words)

        # A set ignores duplicates, so no membership test is needed
        vocabulary.update(words)

    # Get the stats
    print("Dict Dimension: " + str(len(vocabulary)))

    return sorted(vocabulary)


# Build the vocabulary (words + bigrams) from the validation tweets.
# NOTE(review): only valid_X is passed here, not train_X -- confirm this is intended.
wordDict = buildDict(valid_X, addBigram=True)

Dict Dimension: 163646


# LSTM

In [4]:
# Pytorch Dependencies
import torch
import torch.nn as nn
import torch.nn.functional as F

from tqdm import tqdm_notebook as tqdm

In [None]:
class LSTM(nn.Module):
    """Backbone followed by an LSTM that emits one score vector per step.

    ``forward`` and ``predict`` pass their input through ``self.backbone`` and
    then step ``self.lstm`` up to ``MAX_LEN`` times, projecting the LSTM output
    of every step to ``OUTPUT_DIM`` scores with ``self.out``.

    The class depends on names that are not defined in the visible code
    (``backbone``, ``HIDDEN_DIM``, ``MAX_LEN``, ``OUTPUT_DIM``, ``DROPOUT``,
    ``BATCH_SIZE``, ``CHARS``, ``layers``, ``models``, ``optimizers``,
    ``word_index``, ``embedding_matrix``) and on ``self.backbone`` /
    ``self.device``, which are never assigned anywhere in this class.

    NOTE(review): the existing comments talk about images and digits while the
    rest of the visible file processes tweets; this looks like work in progress
    adapted from another model -- confirm before relying on it.
    """
    
    # backbone = ResNet50
    def __init__(self):
        """Create the layers used by ``forward`` and ``predict``.

        NOTE(review): this constructor looks unfinished -- it mixes torch
        layers with Keras-style code and ends with ``return model``; see the
        inline notes below.
        """
        super(LSTM, self).__init__()
        
        # Input Layer
        # NOTE(review): torch.nn does not appear to provide an `Input` class
        # (it looks like a Keras concept) -- confirm / remove.
        self.input = nn.Input()
        
        # We get the number of features at the output layer of the backbone
        # NOTE(review): `backbone` is neither a parameter of __init__ nor defined
        # in the visible code.
        num_ftrs = backbone.fc.in_features
        
        # Fully connected layer at the output of our backbone to reduce the dimensionality
        # NOTE(review): `self.backbone` is read here but never assigned in this class.
        self.backbone.fc = nn.Linear(num_ftrs, HIDDEN_DIM)
        
        # Randomly zeroes some of the elements of the input tensor with probability p, helps with overfitting
        self.dropout1 = nn.Dropout(DROPOUT) 
        
        # Fully connected layer to reduce the dimensionality from HIDDEN_DIM to MAX_LEN
        self.linear2 = nn.Linear(HIDDEN_DIM, MAX_LEN)
        
        # Init a LSTM (input_size, hidden_size)
        # input_size = The number of expected features in the input tensor (number of chars)
        # hidden_size = The number of features out of our LSTM
        # batch_first = makes the output tensors to be provided as (batch, seq, feature). 
        self.lstm = nn.LSTM(OUTPUT_DIM, HIDDEN_DIM, batch_first=True)
        
        # The dimensionality of our LSTM gets reduced to fit the number of char
        self.out = nn.Linear(HIDDEN_DIM, OUTPUT_DIM)
        
        
        # NOTE(review): everything from here to `return model` repeats, statement
        # for statement, the body of create_rnn_lstm() found later in this file.
        # It only creates local variables (nothing is stored on self) and uses
        # `layers` / `models` / `optimizers`, which the visible code never imports.

        # Add an Input Layer
        input_layer = layers.Input((70, ))

        # Add the word embedding Layer
        embedding_layer = layers.Embedding(len(word_index) + 1, 300, weights=[embedding_matrix], trainable=False)(input_layer)
        embedding_layer = layers.SpatialDropout1D(0.3)(embedding_layer)

        # Add the LSTM Layer
        lstm_layer = layers.LSTM(100)(embedding_layer)

        # Add the output Layers
        output_layer1 = layers.Dense(50, activation="relu")(lstm_layer)
        output_layer1 = layers.Dropout(0.25)(output_layer1)
        output_layer2 = layers.Dense(1, activation="sigmoid")(output_layer1)

        # Compile the model
        model = models.Model(inputs=input_layer, outputs=output_layer2)
        model.compile(optimizer=optimizers.Adam(), loss='binary_crossentropy')

        # NOTE(review): Python raises TypeError when __init__ returns anything
        # other than None, so instantiating the class cannot succeed as written.
        return model
        
        
        
        
    
    # x = tensor containing the images in a batch [32,3,IMG_HEIGHT,IMG_WIDTH]
    # target = tensor containing the one-hot encoded labels for these images [32,1,OUTPUT_DIM,MAX_LEN]
    def forward(self, x, target):
        """Run the backbone, then step the LSTM MAX_LEN times using `target` as input.

        Args:
            x: batch handed to ``self.backbone`` (shape per the comment above
                -- TODO confirm).
            target: tensor read as ``target[:, i, :]``; slice ``i`` becomes the
                LSTM input of the following step.
                NOTE(review): that indexing needs a 3-D tensor, which does not
                match the 4-D shape quoted in the comment above -- confirm.

        Returns:
            A ``(length, payload)`` tuple: ``length`` is the output of
            ``self.linear2``; ``payload`` holds the ``self.out`` output of every
            step, concatenated and transposed so the step index is dimension 1.
        """
        
        # --- We pass the payload through the first layers of our NN ---
        
        x = self.backbone(x)             # the backbone NN outputs HIDDEN_DIM features
        latent = F.relu(x)               # rectified linear unit (floors at 0)
        latent = self.dropout1(latent)   # we randomly set to zero, some elements of the input tensor
        length = self.linear2(latent)    # we reduce the number of features to MAX_LEN
        
        
        # --- We pass the payload through our LSTM --- 
        # The first LSTM input is all zeros; `latent` is used as the initial
        # hidden state and zeros as the initial cell state.
        # NOTE(review): both tensors are sized with the global BATCH_SIZE, so
        # `x` presumably has to contain exactly BATCH_SIZE samples -- confirm.
        inputs = torch.zeros(BATCH_SIZE, 1, OUTPUT_DIM).to(self.device)
        hidden = (latent.unsqueeze(0), torch.zeros(1, BATCH_SIZE, HIDDEN_DIM).to(self.device)) # (T[1,32,512] , T[1,32,512])
        number = []
        
        for i in range(MAX_LEN):
            
            output, hidden = self.lstm(inputs, hidden)    # output = T[BATCH_SIZE,1,HIDDEN_DIM]
    
            # We keep the last (only) sequence step, we have a batch of HIDDEN_DIM features
            flatten_output = output[:,-1,:]               # flatten_output = T[BATCH_SIZE,HIDDEN_DIM]
            
            # Linear projection to one score per potential character (no softmax is applied here)
            digit_prob_distri = self.out(flatten_output)  # T[32,13]
            number.append(digit_prob_distri.unsqueeze(0)) # append to number
            
            # target = T[32,10,13], so we move to a different segment T[32,x,13]
            # (the next LSTM input comes from `target`, not from the score just computed)
            inputs = target[:, i, :].unsqueeze(1)
              
                
        # Reassemble in a T[32,MAX_LEN,OUTPUT_DIM], contains the per-character scores for every segment of the batch
        payload = torch.cat(number, 0).transpose(0, 1)
        
        return length, payload


    # Here we receive the predicted label in the form of indexes of CHARS
    def to_num(self, number):
        """Return the string made of ``CHARS[index]`` for every index in `number`."""
        
        clean_number = []
        
        # We go through the indexes and append their corresponding char
        for index in number:
            clean_number.append(CHARS[index])
            
        return ''.join(clean_number)
    
    
    # Function to predict the label of an image
    def predict(self, x):
        """Decode a single sample step by step, picking the top-scoring char each step.

        The state tensors are created with a batch size of 1. Decoding stops
        after ``MAX_LEN`` steps or as soon as the top-scoring index is the one
        of ``' '`` in ``CHARS``.

        Returns:
            A ``(probDistri, text)`` tuple: ``probDistri`` is a
            ``[MAX_LEN, OUTPUT_DIM]`` float tensor whose row ``i`` holds the
            rounded softmax of step ``i`` (rows of steps never reached stay at
            zero); ``text`` is the decoded string built by ``to_num``.
        """
        
        # --- We pass the payload through the first layers of our NN ---
        x = self.backbone(x)             # the backbone NN outputs HIDDEN_DIM features
        latent = F.relu(x)               # rectified linear unit (floors at 0)
        # ** we dont apply dropout because we are in evaluation mode
        
        
        # --- We pass the payload through our LSTM --- 
        
        inputs = torch.zeros(1, 1, OUTPUT_DIM).to(self.device)
        hidden = (latent.unsqueeze(0), torch.zeros(1, 1, HIDDEN_DIM).to(self.device))  
        probDistri = torch.zeros([MAX_LEN,OUTPUT_DIM],dtype=torch.float)
        
        # --- Go through each segment of the image and extract the digit with highest probability ---
        number = []
        for i in range(MAX_LEN):
            output, hidden = self.lstm(inputs, hidden)
            
            # We get the scores of all the potential characters for that feature segment
            digit_prob_distri = self.out(output[:, -1, :])
            
            # Store the distribution of this step, rounded to 3 decimals
            # NOTE(review): the softmax is taken over relu(scores) while the
            # argmax below uses the raw scores; if every raw score is <= 0 the
            # stored distribution becomes uniform -- confirm this is intended.
            norm_digit_prob_distri = F.softmax(F.relu(digit_prob_distri),dim=1)
            rounded_norm_digit_prob_distri = torch.round(norm_digit_prob_distri*1000.0)/1000.0
            probDistri[i] = rounded_norm_digit_prob_distri
            
            # We get the index of the max score
            index = torch.max(digit_prob_distri, -1)[1][0]
            
            # If the index points to a blank space we exit the loop (means we are done)
            if index == CHARS.index(' '):
                break
            
            # The next LSTM input is the one-hot encoding of the character just predicted
            inputs = torch.zeros((1, 1, OUTPUT_DIM)).to(self.device)
            inputs[0,0,index] = 1
            
            # We add the index of the predicted character
            number.append(index.item())
            
        return probDistri, self.to_num(number)
    
    
    # Function to predict the label of an image, but returns False if the level of confidence is not met
    def safePredict(self,x,confidenceThresh):
        """Return the prediction only if every predicted char is confident enough.

        Args:
            x: input forwarded to ``predict``.
            confidenceThresh: minimum value that at least one entry of each
                per-character distribution must reach.

        Returns:
            The predicted string, or ``False`` as soon as one of its characters
            has no probability >= ``confidenceThresh``.
        """
    
        # Get the prediction
        nbrDistri, number = self.predict(x)

        # Go through the probability of each character of CHARS
        index = 0
        for charDistri in nbrDistri:

            # Stop if we are after the number
            if(index == len(number)):
                break
                
            # Go through the probabilities of all the chars
            maxIsValid = False
            for prob in charDistri:

                # if one item of the prob distribution is higher than the confidence threshold we are good
                if(prob.item() >= confidenceThresh):
                    maxIsValid = True
                    break

            # If the digit with maximal probability is under the confidence threshold we exit with false
            if(not maxIsValid):
                return False
            

            # Increment char index
            index = index + 1
            
        # If the NN was confident for all the chars, we return the full number
        return number

    
    # This function prints the probability distribution of the prediction in a nice format
    def printPredict(self,x):
        """Print the prediction for `x` and, per predicted char, its candidates.

        For every character of the predicted string, the candidates whose
        probability is above 0.001 are printed in descending order of
        probability. Nothing is returned.
        """
        
        # Get the prediction
        nbrDistri, number = self.predict(x)
        
        # take second element for sort
        def takeSecond(elem):
            return elem[1]

        # Print the number it thinks it is
        print("Predicted Nbr : \t",number,"\n")

        # Go through the probability of each character of CHARS
        index = 0
        for charDistri in nbrDistri:

            # Print the predicted char
            if(index < len(number)):
                print("-",number[index],"-")
            else:
                break
                

            # Go through all the probability of the other potential candidates for that char
            probs = []
            index2 = 0
            for prob in charDistri:

                # Only consider the predicted char with p > 0.001
                if(prob > 0.001):

                    # Round the probability to its 3rd decimal
                    qty = round(prob.item()*1000.0)/1000.0

                    # Add the char candidate and its prob to the array
                    probs.append([CHARS[index2],qty])

                # increment the index
                index2 = index2 + 1

            # Sort in descending order the probabilities
            orderedProbs = sorted(probs, reverse=True, key=takeSecond)

            # Print out the distribution
            for prob in orderedProbs:
                print(prob[0],": ",prob[1])

            # Carriage return
            print("\n")

            # Increment char index
            index = index + 1
            
            
            
            
            
            
            
            
            
            
            
            
            
            
            

def create_rnn_lstm():
    """
    Build and compile the embedding + LSTM binary classifier.

    The network takes inputs of length 70, maps them through a frozen
    300-dimensional embedding initialised from embedding_matrix, applies an
    LSTM with 100 units and ends with a single sigmoid unit. It is compiled
    with the Adam optimizer and a binary cross-entropy loss.

    returns: the compiled model
    """
    # Input: sequences of 70 entries
    sequence_input = layers.Input((70, ))

    # Frozen word embeddings followed by spatial dropout
    embedded = layers.Embedding(
        len(word_index) + 1,
        300,
        weights=[embedding_matrix],
        trainable=False,
    )(sequence_input)
    embedded = layers.SpatialDropout1D(0.3)(embedded)

    # Recurrent layer
    recurrent = layers.LSTM(100)(embedded)

    # Classification head: dense + dropout, then a single sigmoid unit
    hidden = layers.Dense(50, activation="relu")(recurrent)
    hidden = layers.Dropout(0.25)(hidden)
    prediction = layers.Dense(1, activation="sigmoid")(hidden)

    # Assemble and compile
    model = models.Model(inputs=sequence_input, outputs=prediction)
    model.compile(optimizer=optimizers.Adam(), loss='binary_crossentropy')

    return model

# Build the model, train it and report the value returned by train_model.
# NOTE(review): train_model, train_seq_x, train_y and valid_seq_x are not
# defined in the visible code -- confirm they exist before running this cell.
classifier = create_rnn_lstm()
accuracy = train_model(classifier, train_seq_x, train_y, valid_seq_x, is_neural_net=True)
# A separator keeps the label and the value from running together in the output
print("RNN-LSTM, Word Embeddings: " + str(accuracy))

