In [2]:
import os
import time
import pickle
import numpy as np
import pandas as pd
import torch, re, string
import spacy, nltk
from collections import Counter, OrderedDict
import torch
from torch import nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader, TensorDataset
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator, GloVe, Vectors, vocab
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, multilabel_confusion_matrix
import seaborn as sns
from pylab import rcParams
import matplotlib.pyplot as plt
from matplotlib import rc

# Prefer the GPU when PyTorch can see one; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(device)

### Pre-processing

In [3]:
# Remote CSV holding the dataset used throughout the notebook.
DATA_URL = "https://raw.githubusercontent.com/zappocalypse/jubilant-meme/main/mbti_3_compressed.csv"
df = pd.read_csv(DATA_URL)

In [4]:
# Tokenise with spaCy's small English pipeline.
tokenizer = get_tokenizer('spacy', language='en_core_web_sm')

VOCAB_SIZE = 20000

# Count every token over the whole `spell_corrected` column.
counter = Counter(
    token
    for line in df.spell_corrected
    for token in tokenizer(line)
)

# Keep only the VOCAB_SIZE most frequent tokens, in descending-frequency order.
most_frequent = counter.most_common()[:VOCAB_SIZE]
ordered_dict = OrderedDict(most_frequent)
voc = vocab(ordered_dict)

# Reserve index 0 for padding and index 1 for unknown tokens;
# any out-of-vocabulary lookup resolves to index 1.
for position, special_token in enumerate(('<PAD>', '<UNK>')):
    voc.insert_token(special_token, position)
voc.set_default_index(1)

In [5]:
def yield_tokens(data_iter):
    """Lazily yield the tokenised text of every ``(_, text)`` pair in ``data_iter``.

    The first element of each pair is ignored; the second is passed to the
    module-level ``tokenizer``.
    """
    yield from (tokenizer(text) for _, text in data_iter)

# Tokenise the dataset.
TRUNCATE_LEN = 1000

# Keep at most the first TRUNCATE_LEN whitespace-separated words of each text.
df['truncated'] = df['spell_corrected'].map(
    lambda doc: " ".join(doc.split()[:TRUNCATE_LEN])
)
# Convert each truncated text into a list of vocabulary indices.
df['tokenized'] = df['truncated'].map(lambda doc: voc(tokenizer(doc)))
# Number of token ids per row (used to derive the padded length).
df['tok_length'] = df['tokenized'].map(len)

In [6]:
# The longest tokenised row defines the common length every row is padded to.
MAX_LEN = max(df['tok_length'])
print(MAX_LEN)

def padding(tokenized, MAX_LEN):
    """Return a new list of exactly ``MAX_LEN`` token ids.

    Sequences shorter than ``MAX_LEN`` are right-padded with 0 (the index at
    which '<PAD>' is inserted into the vocabulary). Sequences longer than
    ``MAX_LEN`` are truncated, so the result always has the fixed length the
    rest of the pipeline stacks into a 2-D array.

    Args:
        tokenized: sequence of token ids (any sliceable sequence).
        MAX_LEN: target length of the returned list.

    Returns:
        A fresh list of length ``MAX_LEN``; the input is never mutated.
    """
    # Slicing copies, so the caller's sequence is left untouched.
    clipped = list(tokenized[:MAX_LEN])
    return clipped + [0] * (MAX_LEN - len(clipped))

# Bring every tokenised row to the common length MAX_LEN.
df['padded'] = df['tokenized'].map(lambda ids: padding(ids, MAX_LEN))

In [7]:
# Split with a fixed seed, stratified on the `type` column.
train, test = train_test_split(df, stratify=df['type'], random_state=2021)

### Build Dataset

In [29]:
# Stack the per-row padded id lists into 2-D feature matrices.
X_train = np.array(list(train.padded.values))
X_test = np.array(list(test.padded.values))

# Targets come from the `is_I` column.
y_train, y_test = train.is_I.to_numpy(), test.is_I.to_numpy()

# Every feature row must have a matching label.
for features, labels in ((X_train, y_train), (X_test, y_test)):
    assert features.shape[0] == len(labels)

In [30]:
# Wrap the feature/label arrays as tensor datasets.
train_data = TensorDataset(torch.from_numpy(X_train), torch.from_numpy(y_train))
valid_data = TensorDataset(torch.from_numpy(X_test), torch.from_numpy(y_test))

# Samples per batch for both loaders.
batch_size = 30

# Training data is reshuffled every epoch.
train_loader = DataLoader(train_data, shuffle=True, batch_size=batch_size, drop_last=True)
# Validation data is NOT shuffled: combined with drop_last=True, shuffling would
# drop a different random subset each epoch, making the validation loss (which
# drives checkpoint selection) incomparable between epochs.
# drop_last stays True so every batch has exactly `batch_size` rows, which the
# training cell's fixed-size hidden state relies on.
valid_loader = DataLoader(valid_data, shuffle=False, batch_size=batch_size, drop_last=True)

### Model

In [31]:
class RNN(nn.Module):
    """LSTM classifier over sequences of token ids.

    Pipeline: embedding -> stacked LSTM -> dropout -> linear -> sigmoid,
    read out at the final time step only.

    Args:
        nlayers: number of stacked LSTM layers.
        outsize: output width of the final linear layer.
        vsize: number of rows in the embedding table (vocabulary size).
        hsize: LSTM hidden size.
        emsize: embedding dimension.
        drop_prob: dropout probability applied before the linear layer.
    """

    def __init__(self, nlayers, outsize, vsize, hsize, emsize, drop_prob=0.5):
        super().__init__()
        self.outsize = outsize
        self.hsize = hsize
        self.nlayers = nlayers
        self.vsize = vsize
        self.embedding = nn.Embedding(vsize, emsize)
        self.lstm = nn.LSTM(input_size=emsize, hidden_size=hsize, num_layers=nlayers, batch_first=True)
        self.dropout = nn.Dropout(drop_prob)
        self.fc = nn.Linear(self.hsize, outsize)
        self.sig = nn.Sigmoid()

    def forward(self, x, hidden=None):
        """Score a batch of token-id sequences.

        Args:
            x: integer tensor of shape (batch, seq_len) holding token ids.
            hidden: optional ``(h0, c0)`` tuple as produced by ``init_hidden``;
                when None the LSTM starts from zeros.

        Returns:
            ``(sig_out, hidden)`` where ``sig_out`` has shape (batch,) and holds
            the sigmoid of the last output unit at the last time step, and
            ``hidden`` is the LSTM's final ``(h, c)`` state.

        NOTE(review): the notebook's ``padding`` helper appends pad ids at the
        END of each sequence, so the last time step read here is usually a pad
        position. Consider packed sequences or left-padding.
        """
        embeds = self.embedding(x)
        lstm_out, hidden = self.lstm(embeds, hidden)

        # Only the final time step is used, so the head is applied to that step
        # alone instead of to every step and then discarding all but one.
        last_step = lstm_out[:, -1, :]

        # dropout and fully connected layer
        out = self.fc(self.dropout(last_step))

        # sigmoid, then keep the last output unit -> shape (batch,)
        sig_out = self.sig(out)[:, -1]
        return sig_out, hidden

    def init_hidden(self, batchsize):
        """Return zeroed ``(h0, c0)`` tensors of shape (nlayers, batchsize, hsize).

        The tensors are created from an existing parameter, so they share the
        model's dtype and device.
        """
        weight = next(self.parameters())
        return (
            weight.new_zeros(self.nlayers, batchsize, self.hsize),
            weight.new_zeros(self.nlayers, batchsize, self.hsize)
        )

In [32]:
# Model hyperparameters.
nlayers, hsize = 2, 100
emsize = 100
outsize = 1
vsize = len(voc)  # embedding rows = vocabulary size including special tokens

# Build the network and move it to the selected device.
model = RNN(nlayers=nlayers, outsize=outsize, vsize=vsize, hsize=hsize, emsize=emsize)
model = model.to(device)
print(model)

In [33]:
# Learning rate for the optimiser.
lr = 0.001

# Binary cross-entropy on probabilities, optimised with Adam over all model parameters.
criterion = nn.BCELoss()
optimizer = torch.optim.Adam(params=model.parameters(), lr=lr)

def acc(pred, label):
    """Count how many rounded predictions equal their labels.

    Both tensors are squeezed first; the result is a plain Python number
    (a count of matches, not a ratio).
    """
    rounded = pred.squeeze().round()
    matches = rounded.eq(label.squeeze())
    return matches.sum().item()

### Train

In [34]:
clip = 5      # max gradient norm used for clipping
epochs = 30
# np.inf rather than np.Inf: the capitalised alias was removed in NumPy 2.0.
valid_loss_min = np.inf
# Per-epoch history of mean loss and accuracy.
epoch_tr_loss, epoch_vl_loss = [], []
epoch_tr_acc, epoch_vl_acc = [], []

for epoch in range(epochs):
    # ---- training pass ----
    train_losses = []
    train_acc = 0.0
    train_seen = 0
    model.train()
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        # Start every batch from a zero state: batches are shuffled, unrelated
        # samples, and predict() also starts from zeros, so carrying the state
        # from the previous batch would leak information between samples.
        h = model.init_hidden(inputs.size(0))

        optimizer.zero_grad()
        output, h = model(inputs, h)

        # calculate the loss and perform backprop
        loss = criterion(output.reshape(-1), labels.float())
        loss.backward()
        train_losses.append(loss.item())
        # running count of correct predictions / samples actually processed
        train_acc += acc(output, labels)
        train_seen += labels.size(0)
        # `clip_grad_norm_` helps prevent the exploding gradient problem in RNNs / LSTMs.
        nn.utils.clip_grad_norm_(model.parameters(), clip)
        optimizer.step()

    # ---- validation pass ----
    val_losses = []
    val_acc = 0.0
    val_seen = 0
    model.eval()
    # No gradients are needed for evaluation; skipping graph construction saves memory.
    with torch.no_grad():
        for inputs, labels in valid_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            val_h = model.init_hidden(inputs.size(0))
            output, val_h = model(inputs, val_h)
            val_loss = criterion(output.reshape(-1), labels.float())
            val_losses.append(val_loss.item())
            val_acc += acc(output, labels)
            val_seen += labels.size(0)

    epoch_train_loss = np.mean(train_losses)
    epoch_val_loss = np.mean(val_losses)
    # Divide by the samples actually seen: loaders built with drop_last=True skip
    # the final partial batch, so len(dataset) would understate the accuracy.
    epoch_train_acc = train_acc / max(train_seen, 1)
    epoch_val_acc = val_acc / max(val_seen, 1)
    epoch_tr_loss.append(epoch_train_loss)
    epoch_vl_loss.append(epoch_val_loss)
    epoch_tr_acc.append(epoch_train_acc)
    epoch_vl_acc.append(epoch_val_acc)
    print(f'Epoch {epoch+1}')
    print(f'train_loss : {epoch_train_loss} val_loss : {epoch_val_loss}')
    print(
        f'train_accuracy : {epoch_train_acc*100} val_accuracy : {epoch_val_acc*100}')
    # Checkpoint whenever the validation loss matches or beats the best so far.
    if epoch_val_loss <= valid_loss_min:
        torch.save(model.state_dict(), '../working/is_I.pt')
        print('Validation loss decreased ({:.6f} --> {:.6f}).  Saving model ...'.format(
            valid_loss_min, epoch_val_loss))
        valid_loss_min = epoch_val_loss
    print(25*'==')

### Combine

In [35]:
# One freshly initialised network per trait key, all with the same hyperparameters.
models = {
    trait: RNN(nlayers, outsize, vsize, hsize, emsize)
    for trait in ('F', 'J', 'N', 'I')
}

In [36]:
# Load the saved weights for each trait model.
# map_location='cpu' lets checkpoints written from a CUDA model load on a
# CPU-only machine; load_state_dict then copies the values into each model's
# existing parameters, wherever they live.
# NOTE(review): only is_I.pt is written by the training cell in this notebook;
# the F/J/N checkpoints are presumably from earlier runs with a different
# label column. Confirm they exist before running.
for trait, net in models.items():
    state_dict = torch.load(f'../working/is_{trait}.pt', map_location='cpu')
    net.load_state_dict(state_dict)

In [62]:
def predict(model, text):
    """Return the model's sigmoid output for a single input as a Python float.

    Args:
        model: network exposing ``init_hidden(batch_size)`` and
            ``forward(inputs, hidden)`` (e.g. the ``RNN`` class in this notebook).
        text: either a raw string, which is tokenised, mapped through the
            vocabulary and padded with the notebook's ``padding`` helper,
            or an already-encoded sequence of token ids.

    Returns:
        The scalar model output for the single-row batch.
    """
    model.eval()
    if isinstance(text, str):
        token_ids = padding(voc(tokenizer(text)), MAX_LEN)
    else:
        token_ids = text
    feature_tensor = torch.from_numpy(np.array(token_ids).reshape(1, -1))

    # Run on whatever device the model's parameters live on.
    feature_tensor = feature_tensor.to(next(model.parameters()).device)

    batch_size = 1
    # Inference only: disable autograd so no graph is built per call.
    with torch.no_grad():
        h = model.init_hidden(batch_size)
        output, h = model(feature_tensor, h)
    return output.item()

In [64]:
# Sample sentence (defined here but not passed to predict below).
s = "this is a stupid test let see if it works!"
# Score the already-padded ids stored under index label 0 with the 'N' model.
predict(models['N'], df['padded'][0])

In [87]:
from sklearn.metrics import classification_report, multilabel_confusion_matrix
from sklearn.metrics import roc_auc_score

In [73]:
def get_all4(tokenized_input):
    """Run every model in ``models`` on one input and threshold the scores.

    Returns a boolean array of length 4 ordered I, N, F, J, where each entry
    is True when the corresponding model's output exceeds 0.5.
    """
    slot = dict(zip("INFJ", range(4)))
    scores = np.zeros(4)
    for trait, net in models.items():
        scores[slot[trait]] = predict(net, tokenized_input)
    return scores > 0.5

In [78]:
# One row of four boolean predictions per test sample.
predicted = np.array([get_all4(each) for each in test.padded])

In [91]:
# Score each trait against ITS OWN label column. Column order in `predicted`
# follows get_all4: I, N, F, J.
# NOTE(review): `predicted` holds thresholded booleans, not probabilities, so
# these values are AUROC of hard decisions rather than a ranking-based AUROC.
auroc = {}
for column, trait in enumerate(("I", "N", "F", "J")):
    auroc[trait] = roc_auc_score(test[f"is_{trait}"].to_numpy(), predicted[:, column])
auroc

In [100]:
# Ground-truth matrix with one column per trait, in the same I, N, F, J order as `predicted`.
label_columns = ["is_I", "is_N", "is_F", "is_J"]
targets = np.stack([test[name].to_numpy() for name in label_columns], axis=1)
# 1 where all four traits of a sample are predicted correctly, else 0.
result = (targets == predicted).prod(axis=1)

In [103]:
# Per-label precision/recall/F1; undefined ratios are reported as 0.
report = classification_report(targets, predicted, zero_division=0)
print(report)