# Personality Analysis using a Bimodel LSTM Network

In [1]:
!pip3 install torch skorch transformers pandas matplotlib

Defaulting to user installation because normal site-packages is not writeable


In [2]:
import numpy as np
import skorch
import torch

from IPython.display import display
from skorch import NeuralNet
from torch import nn, optim, tensor
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence, pad_sequence

from model.LstmModel import LstmModel
from utils import progress_bar

# Seed PyTorch's random number generator so repeated runs start from the same state
torch.manual_seed(0)
# Run on the first CUDA GPU when one is available, otherwise fall back to the CPU
device = "cuda:0" if torch.cuda.is_available() else 'cpu'



## Prepare Dataset

In [3]:
# Path to the MyPersonality dataset
MY_PERSONALITY_PATH = "data/mypersonality.csv"

# Path to the Essays dataset
ESSAYS_PATH = "data/essays.csv"

# List of traits to analyse (names of the label columns read from the dataset CSV)
TRAITS = ["cAGR", "cCON", "cEXT", "cOPN", "cNEU"]

# Maximum number of tokens per text when calculating embeddings
MAX_LENGTH = 250

# Specify which dataset to use, can be either ESSAYS or MY_PERSONALITY
DATASET_TO_USE = "ESSAYS"

# Specify which model from the Transformers library to use to calculate embeddings
EMBEDDINGS_MODEL = "bert-base-uncased"

In [4]:
import pandas

from skorch.helper import SliceDict
from transformers import AutoModel, AutoTokenizer
from utils import progress_bar

def generate_embeddings(texts):
    """Generate token embeddings for every text.

    Each text is lower-cased, tokenized with the tokenizer that belongs to
    ``EMBEDDINGS_MODEL``, padded or truncated to exactly ``MAX_LENGTH``
    tokens and passed through the pretrained model on ``device``.

    Args:
        texts: Sized iterable of strings (for example a pandas Series).

    Returns:
        A NumPy array with one entry per text, stacked along axis 0. Each
        entry is the first output of the model for that text, moved to the
        CPU, with shape ``(MAX_LENGTH, hidden_size)``.

    Raises:
        ValueError: If ``texts`` is empty, because ``np.stack`` cannot
            stack an empty list.
    """
    embeddings = []
    total = len(texts)

    # Inference only: gradients are never needed here.
    with torch.no_grad():
        progress = display(progress_bar(0, 100), display_id=True)

        tokenizer = AutoTokenizer.from_pretrained(EMBEDDINGS_MODEL)

        embeddings_model = AutoModel.from_pretrained(EMBEDDINGS_MODEL).to(device)
        embeddings_model.eval()

        for i, text in enumerate(texts):
            # `padding`/`truncation` replace the deprecated `pad_to_max_length`
            # flag and make the truncation of long texts explicit, so every
            # sequence has exactly MAX_LENGTH tokens and can be stacked.
            encoded_text = tokenizer.encode_plus(
                text.lower(),
                add_special_tokens=True,
                max_length=MAX_LENGTH,
                padding="max_length",
                truncation=True,
            )

            # Add a leading batch dimension of size one for the model.
            token_ids = torch.tensor(
                encoded_text["input_ids"], device=device
            ).unsqueeze(0)
            token_mask = torch.tensor(
                encoded_text["attention_mask"], device=device
            ).unsqueeze(0)

            output = embeddings_model(token_ids, attention_mask=token_mask)[0]
            # Drop only the batch dimension; a bare squeeze() would also
            # remove any other axis that happens to have size one.
            output = output.squeeze(0).to("cpu")

            embeddings.append(output.numpy())

            # Report the number of texts finished so the bar reaches 100%.
            progress.update(progress_bar(i + 1, total))

    return np.stack(embeddings, axis=0)

def load_dataset():
    """Load the dataset selected by ``DATASET_TO_USE`` and embed its texts.

    Returns:
        A tuple ``(embeddings, labels)``. ``embeddings`` is the array
        returned by ``generate_embeddings`` for the text column, and
        ``labels`` maps every trait in ``TRAITS`` to a NumPy array of that
        trait's column with "y"/"n" replaced by 1.0/0.0.

    Raises:
        ValueError: If ``DATASET_TO_USE`` is not "MY_PERSONALITY" or "ESSAYS".
    """
    if DATASET_TO_USE == "MY_PERSONALITY":
        path = MY_PERSONALITY_PATH
        text_field = "STATUS"
    elif DATASET_TO_USE == "ESSAYS":
        path = ESSAYS_PATH
        text_field = "TEXT"
    else:
        # Fail with a clear message instead of an UnboundLocalError on `path`.
        raise ValueError(
            f"Unknown dataset {DATASET_TO_USE!r}; "
            "expected 'MY_PERSONALITY' or 'ESSAYS'"
        )

    df = pandas.read_csv(path, encoding="latin1")
    df[TRAITS] = df[TRAITS].replace(to_replace=["y", "n"], value=[1.0, 0.0])
    # Normalise the name of the text column so both datasets look the same.
    df = df.rename(columns={text_field: "TEXT"})

    embeddings = generate_embeddings(df["TEXT"])
    labels = {trait: df[trait].to_numpy() for trait in TRAITS}

    return embeddings, labels

## Training

In [5]:
import csv

from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score
from skorch.callbacks import Callback

def scorer(y, y_pred):
    """Return the accuracy of the predictions against the true labels.

    The predictions are rounded to the nearest integer before they are
    compared with ``y``.
    """
    return accuracy_score(y, np.rint(y_pred))


def calculate_accuracy(net, dataset, y_true):
    """Return the accuracy of ``net`` on ``dataset``.

    The network's predictions are rounded to the nearest integer and
    compared with ``y_true``.
    """
    raw_predictions = net.predict(dataset)
    rounded_predictions = np.rint(raw_predictions)
    return accuracy_score(y_true, rounded_predictions)


def calculate_precision(net, dataset, y_true):
    """Return the precision of ``net`` on ``dataset``.

    The network's predictions are rounded to the nearest integer before
    they are compared with ``y_true``.
    """
    y_pred = np.rint(net.predict(dataset))
    # Previously this returned f1_score, so the logged "precision" was wrong.
    return precision_score(y_true, y_pred)


def calculate_recall(net, dataset, y_true):
    """Return the recall of ``net`` on ``dataset``.

    The network's predictions are rounded to the nearest integer before
    they are compared with ``y_true``.
    """
    y_pred = np.rint(net.predict(dataset))
    # Previously this returned precision_score, so the logged "recall" was wrong.
    return recall_score(y_true, y_pred)


def calculate_f1_score(net, dataset, y_true):
    """Return the F1 score of ``net`` on ``dataset``.

    The network's predictions are rounded to the nearest integer before
    they are compared with ``y_true``.
    """
    y_pred = np.rint(net.predict(dataset))
    # Previously this returned recall_score, so the logged "f1_score" was wrong.
    return f1_score(y_true, y_pred)

class HiPlotLog(Callback):
    """Callback that appends one CSV row per epoch for later plotting.

    The row is appended to ``output/variance_<dataset>_bert.csv`` and holds
    the trait, the epoch, the hyperparameters and the losses and metrics of
    the most recent history entry.

    NOTE: the trait name and the hyperparameters are read from the
    module-level globals ``trait`` and ``parameters``, so both must be
    defined by the calling cell before training starts.
    """

    def on_epoch_end(self, net, **kwargs):
        """Append the metrics of the epoch that just finished to the log."""
        current = net.history[-1]
        filename = DATASET_TO_USE.lower()

        # newline="" is what the csv module expects; without it, extra blank
        # rows are written on platforms that translate line endings.
        with open(f"output/variance_{filename}_bert.csv", "a", newline="") as file:
            writer = csv.writer(file)
            writer.writerow([
                trait,
                current["epoch"],
                parameters["learning_rate"],
                parameters["dropout_input"],
                parameters["dropout_output"],
                parameters["weight_decay"],
                parameters["batch_size"],
                parameters["hidden_dim"],
                EMBEDDINGS_MODEL,
                "BCE",
                "sigmoid",
                current["train_loss"],
                current["valid_loss"],
                current["accuracy"],
                current["precision"],
                current["recall"],
                current["f1_score"],
            ])

class SaveBestModel(Callback):
    """Callback that saves the network when a trait beats its recorded best.

    The best accuracy per trait is tracked in
    ``trained/best_<dataset>.csv``: the last row holds one score per entry
    of ``TRAITS``. When the current accuracy is higher than the recorded
    one, a new row is appended and the parameters are saved to
    ``trained/<trait>.pt``.

    NOTE: the trait name is read from the module-level global ``trait``,
    so it must be defined by the calling cell before training starts.
    """

    def on_epoch_end(self, net, **kwargs):
        """Record and save the model if this epoch set a new best accuracy."""
        current = net.history[-1]

        # Flag read from the history row; presumably set by the EpochScoring
        # callback that is registered under the name "accuracy".
        if not current["accuracy_best"]:
            return

        path = f"trained/best_{DATASET_TO_USE.lower()}.csv"

        # Read the recorded scores and close the file before appending,
        # instead of holding two handles to the same file at once.
        try:
            with open(path, newline="") as file:
                rows = [row for row in csv.reader(file) if row]
        except FileNotFoundError:
            rows = []

        # With no record yet, start from 0.0 for every trait so that the
        # first run creates the file instead of crashing.
        best_scores = rows[-1] if rows else [0.0] * len(TRAITS)
        index = TRAITS.index(trait)

        if float(best_scores[index]) < current["accuracy"]:
            best_scores[index] = current["accuracy"]

            with open(path, "a", newline="") as file:
                csv.writer(file).writerow(best_scores)

            net.save_params(f_params=f"trained/{trait}.pt")


class FixRandomSeed(Callback):
    """Callback that seeds the Python, NumPy and PyTorch random generators.

    Args:
        seed: The seed applied to every generator when the callback is
            initialized. Defaults to 0.
    """

    def __init__(self, seed=0):
        # Store the argument itself; it used to be ignored in favour of a
        # hard-coded 0, so every seed silently behaved like seed=0.
        self.seed = seed

    def initialize(self):
        """Seed all random number generators and return the callback."""
        # Imported here because `random` is not imported at file level.
        import random

        torch.manual_seed(self.seed)
        torch.cuda.manual_seed(self.seed)
        random.seed(self.seed)
        np.random.seed(self.seed)
        torch.backends.cudnn.deterministic = True

        # skorch's Callback.initialize contract is to return the callback.
        return self

In [6]:
from skorch.callbacks import EpochScoring, ProgressBar, EarlyStopping

def create_net(parameters):
    """Build an untrained skorch ``NeuralNet`` wrapping ``LstmModel``.

    Args:
        parameters: Dict of hyperparameters. The keys read here are
            "dropout_input", "dropout_output", "hidden_dim",
            "embedding_dim", "weight_decay", "learning_rate",
            "max_epochs", "batch_size" and "cross_validation_split".

    Returns:
        A ``NeuralNet`` configured with BCE loss, the Adam optimizer, a
        stratified validation split and the scoring, logging, saving and
        early-stopping callbacks.
    """
    return NeuralNet(
        module=LstmModel,
        module__dropout_input=parameters["dropout_input"],
        module__dropout_output=parameters["dropout_output"],
        module__hidden_dim=parameters["hidden_dim"],
        module__embedding_dim=parameters["embedding_dim"],
        criterion=nn.BCELoss,
        optimizer=optim.Adam,
        optimizer__weight_decay=parameters["weight_decay"],
        optimizer__lr=parameters["learning_rate"],
        iterator_train__shuffle=True,
        # The validation iterator is also used by net.predict(); shuffling it
        # returns predictions in a random order that no longer lines up with
        # the labels when they are scored outside of skorch (for example by
        # a scikit-learn scorer). Validation must therefore keep its order.
        iterator_valid__shuffle=False,
        max_epochs=parameters["max_epochs"],
        batch_size=parameters["batch_size"],
        train_split=skorch.dataset.CVSplit(
            parameters["cross_validation_split"],
            stratified=True,
            random_state=0,
        ),
        callbacks=[
            # Seeding callback is disabled; re-enabling it requires a "seed"
            # entry in `parameters`:
            # FixRandomSeed(parameters["seed"]),
            EpochScoring(calculate_accuracy, name="accuracy", lower_is_better=False),
            EpochScoring(calculate_precision, name="precision", lower_is_better=False),
            EpochScoring(calculate_recall, name="recall", lower_is_better=False),
            EpochScoring(calculate_f1_score, name="f1_score", lower_is_better=False),
            HiPlotLog(),
            SaveBestModel(),
            EarlyStopping(patience=40),
            ProgressBar(),
        ],
        device=device,
    )


def train(parameters, trait, X, y):
    """Print the settings of a run, then fit a fresh network on ``X``/``y``.

    Args:
        parameters: Dict of hyperparameters, passed on to ``create_net``.
        trait: Name of the trait being trained; only used in the output.
        X: Input data handed to ``net.fit``.
        y: Target values handed to ``net.fit``.
    """
    # Label shown in the summary paired with the key it is read from.
    summary_fields = (
        ("Batch Size", "batch_size"),
        ("Learning Rate", "learning_rate"),
        ("Max Epochs", "max_epochs"),
        ("Input Dropout", "dropout_input"),
        ("Output Dropout", "dropout_output"),
        ("Weight Decay", "weight_decay"),
        ("Hidden Dim", "hidden_dim"),
        ("Cross Validation Split", "cross_validation_split"),
    )

    print(f"Training {trait}:")
    for label, key in summary_fields:
        print(f"  {label}: {parameters[key]}")
    print(f"  Embeddings Model: {EMBEDDINGS_MODEL}")
    print("\n")

    model = create_net(parameters)
    model.fit(X, y=y)

In [None]:
# Hyperparameters for the repeated training runs below. The opening line of
# this dict used to be commented out, which made the cell a syntax error.
# NOTE: `parameters` and the loop variable `trait` must stay module-level
# names because the HiPlotLog and SaveBestModel callbacks read them as globals.
parameters = {
    "learning_rate": 0.00001,
    "max_epochs": 200,
    "batch_size": 128,
    "hidden_dim": 192,
    "embedding_dim": 768,
    "dropout_input": 0.2,
    "dropout_output": 0.1,
    "weight_decay": 0.001,
    "cross_validation_split": 10,
}

# Alternative experiment with RoBERTa embeddings (disabled):
# EMBEDDINGS_MODEL = "roberta-base"
# embeddings, labels = load_dataset()

# for x in range(0, 50):
#     for trait in TRAITS:
#         print(f"Iteration: {x}")
#         train(parameters, trait, embeddings, labels[trait])

EMBEDDINGS_MODEL = "bert-base-uncased"
embeddings, labels = load_dataset()

# Train every trait 12 times; each run appends its epochs to the CSV log.
for x in range(0, 12):
    for trait in TRAITS:
        print(f"Iteration: {x}")
        train(parameters, trait, embeddings, labels[trait])

In [None]:

from sklearn.model_selection import RandomizedSearchCV
from sklearn.metrics import make_scorer
from skorch.helper import SliceDataset

# Base hyperparameters; the search below overrides the dropout and weight
# decay values listed in `grid_search_parameters`.
parameters = {
    "learning_rate": 0.00001,
    "max_epochs": 150,
    "batch_size": 128,
    "hidden_dim": 192,
    "embedding_dim": 768,
    "dropout_input": 0.2,
    "dropout_output": 0.1,
    "weight_decay": 0.001,
    "cross_validation_split": 10,
    "seed": 0,
}

# Candidate values sampled by the randomized search, keyed by the skorch
# parameter names they are applied to.
grid_search_parameters = {
    "module__dropout_input": [0.1, 0.2, 0.3, 0.4, 0.5],
    "module__dropout_output": [0.1, 0.2, 0.3, 0.4, 0.5],
    "optimizer__weight_decay": [0.00001, 0.0001, 0.001, 0.01],
}

EMBEDDINGS_MODEL = "roberta-base"
embeddings, labels = load_dataset()

# Run one randomized search per trait and report its best configuration.
for trait in TRAITS:
    net = create_net(parameters)

    # This assignment used to be split across two lines ("grid_searc" / "h"),
    # which left `grid_search` undefined for the calls below.
    grid_search = RandomizedSearchCV(
        net,
        grid_search_parameters,
        cv=parameters["cross_validation_split"],
        scoring=make_scorer(scorer),
    )
    grid_search.fit(embeddings, labels[trait])

    print(grid_search.best_score_, grid_search.best_params_)

## Analysis

In [None]:
import hiplot as hip

def display_plot(filename):
    """Show the CSV log ``output/<filename>.csv`` as a HiPlot experiment.

    The "accuracy" column is forced to the range 0.45 to 0.75 before the
    experiment is displayed.
    """
    csv_path = f"output/{filename}.csv"

    with open(csv_path) as log_file:
        plot = hip.Experiment.from_csv(log_file)
        accuracy_axis = plot.parameters_definition["accuracy"]
        accuracy_axis.force_range(0.45, 0.75)
        plot.display()

# Plots output/mypersonality.csv.
# NOTE(review): the HiPlotLog callback writes to
# output/variance_<dataset>_bert.csv - confirm that this file name is the intended log.
display_plot("mypersonality")