In [None]:
!pip install lightning
!pip install scipy==1.12
!pip install gensim

# Redes Neurais e Aprendizado Profundo - Trabalho Final

## Grupo:
| Nome            | nUSP     |
|-----------------|----------|
| Rafael Zimmer   | 12542612 |
| Murilo Soave    | 10688813 |
| Fernando Cesar  | 10260559 |

## Tarefa:
**Classificação**

## Dados:
"*Sentiment Analysis for Financial News*"

## GitHub com resultados:
https://github.com/rzimmerdev/sentiment-analysis

# Descrição da Tarefa

A tarefa consiste em realizar a análise de sentimentos em um conjunto de dados de frases financeiras. O objetivo é classificar cada frase como positiva, negativa ou neutra (3 classes, ou seja, multi-class single-label) em relação ao seu conteúdo emocional. Para isso, utilizamos três abordagens diferentes de modelagem de forma comparativa:
- uma abordagem baseline com Bag of Words;
- uma abordagem state-of-the-art com um Transformer pré-treinado (BERT);
- uma abordagem adicional utilizando Word2Vec.

## Dataset Escolhido
O dataset escolhido para esta análise é o FinancialPhraseBank, disponivel no [Kaggle](https://www.kaggle.com/datasets/ankurzing/sentiment-analysis-for-financial-news).

É um conjunto de dados relativamente popular utilizado na análise de sentimentos financeiros. Este dataset contém frases extraídas de relatórios financeiros e principalmente de artigos de notícias.

É um dataset relativamente simples e pré-processado (até certo ponto, é necessário algumas transformações, principalmente para nossos modelos). Não é tão balanceado (59% positivas, 28% neutras e 12% negativas).

## Abordagem Adotada

Para a abordagem baseline, utilizamos um modelo Bag of Words (BoW), que transforma cada frase em um vetor de frequências de palavras, ignorando a ordem e o contexto das palavras, pois foi introduzido durante as aulas e também é usado comumente para problemas beeem simples que envolvam poucas classes ou dados não muito complexos.

Para a abordagem state of the art (SOA) escolhemos uma arquitetura de Transformer que tem capacidade de processar palavras em contexto, que é extremamente recorrente quando se trata de sentimentos em texto (o sentimento geralmente é definido por algumas palavras mas que dependem extremamente do contexto, por exemplo: "O cenário econômico está extremamente volátil, mas a Apple performou bem.", o sentimento é positivo ou negativo, depende se o contexto é o mercado ou a empresa Apple).

Para a abordagem adicional, escolhemos o W2V, e utilizamos uma rede recorrente (LSTM, especificamente) para a classificação. Essa escolha se dá pois é um bom ponto intermediário para os dois outros modelos, além de utilizar redes recorrentes que foi outro tópico abordado em aula.

In [38]:
import os
from enum import Enum

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

import torch
from torch import nn
from torch.nn import functional as F
from torch.utils.data import Dataset
from torch.nn.utils.rnn import pad_sequence

from transformers import AutoTokenizer
from transformers import BertModel
from lightning import LightningModule, Trainer
from torchmetrics import Accuracy
from lightning.pytorch.loggers import CSVLogger
from gensim.models import Word2Vec
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.metrics import roc_curve, auc, confusion_matrix, f1_score, accuracy_score, log_loss

In [31]:
class PhraseDataset(Dataset):
    def __init__(self, x_col, y_col, tokenizer: torch.nn.Module, data: pd.DataFrame, max_len: int):
        self.tokenizer = tokenizer
        self.data = data
        self.max_len = max_len

        self.x_col = x_col
        self.y_col = y_col

        self.num_classes = len(self.data[self.y_col].unique())

    def __len__(self):
        return len(self.data)

    def __getitem__(self, item):
        phrase = str(self.data.loc[item, self.x_col])
        encoding = self.tokenizer.encode_plus(
            phrase,
            add_special_tokens=True,
            max_length=self.max_len,
            return_token_type_ids=False,
            padding="max_length",
            return_attention_mask=True,
            return_tensors="pt",
            truncation=True
        )

        label = self.data.loc[item, self.y_col]
        target = F.one_hot(torch.tensor(label, dtype=torch.long), num_classes=self.num_classes)

        return {
            "phrase": phrase,
            "input_ids": encoding["input_ids"].flatten(),
            "attention_mask": encoding["attention_mask"].flatten(),
            "target": target.float()
        }


class Splitter:
    def __init__(self, train_size=0.8, seed=42, folder='data'):
        self.train_size = train_size
        self.seed = seed
        self.folder = folder

    @property
    def paths(self):
        folder_path = self.folder.split('/')
        if len(folder_path) > 1:
            folder = "/".join(folder_path[:-1])
        else:
            folder = folder_path[0]

        train_path = f'{folder}/train.csv'
        test_path = f'{folder}/test.csv'

        return train_path, test_path

    def split(self, df: pd.DataFrame):
        indices = torch.randperm(len(df)).tolist()

        train_size = int(self.train_size * len(df))

        train_indices = indices[:train_size]
        test_indices = indices[train_size:]

        train_df = df.loc[train_indices]
        test_df = df.loc[test_indices]

        train_path, test_path = self.paths

        train_df.to_csv(train_path, index=False, header=False)
        test_df.to_csv(test_path, index=False, header=False)

        return train_path, test_path


class FinancialPhraseDataset(PhraseDataset):
    """
    __getitem__ returns a dictionary with the following keys:
    - phrase: the original phrase
    - input_ids: the tokenized phrase
    - attention_mask: the attention mask
    - target: the target sentiment

    """
    def __init__(self,
                 tokenizer: torch.nn.Module = None,
                 path='data/archive.zip',
                 filename='data/all-data.csv',
                 max_len=512,
                 seed=None):
        splitter = Splitter(seed=seed)

        train_path, test_path = splitter.paths

        try:
            self.data = pd.read_csv(train_path, header=None, encoding='latin-1')
            self.test_data = pd.read_csv(test_path, header=None, encoding='latin-1')
        except FileNotFoundError:
            try:
                import zipfile
                with zipfile.ZipFile(path, 'r') as zip_ref:
                    zip_ref.extractall('data')
                self.data = pd.read_csv(filename, header=None, encoding='latin-1')
            except FileNotFoundError:
                raise FileNotFoundError("File not found. Please download the dataset from Kaggle.")
            splitter.split(self.data)

            self.data = pd.read_csv(train_path, header=None, encoding='latin-1')
            self.test_data = pd.read_csv(test_path, header=None, encoding='latin-1')

        if tokenizer is None:
            tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")

        super().__init__(x_col=1, y_col=0, tokenizer=tokenizer, data=self.data, max_len=max_len)
        self.data = self.preprocess(self.data)
        self.test_data = self.preprocess(self.test_data)

    def preprocess(self, data):
        data[0] = data[0].map({'neutral': 0, 'positive': 1, 'negative': 2})
        data = data.dropna()
        data = data.reset_index(drop=True)
        return data

    def get_data_loaders(self, batch_size=8, shuffle=True, num_workers=0, train_size=0.8, train=True):
        if train:
            train_size = int(train_size * len(self.data))
            val_size = len(self.data) - train_size

            train_dataset, val_dataset = torch.utils.data.random_split(self, [train_size, val_size])

            train_loader = torch.utils.data.DataLoader(
                train_dataset,
                batch_size=batch_size,
                shuffle=shuffle,
                num_workers=num_workers
            )

            val_loader = torch.utils.data.DataLoader(
                val_dataset,
                batch_size=batch_size,
                shuffle=False,
                num_workers=num_workers
            )

            return train_loader, val_loader
        else:
            test_dataset = PhraseDataset(x_col=1,
                                         y_col=0,
                                         tokenizer=self.tokenizer,
                                         data=self.test_data,
                                         max_len=self.max_len)

            test_loader = torch.utils.data.DataLoader(
                test_dataset,
                batch_size=batch_size,
                shuffle=False,
                num_workers=num_workers
            )

            return test_loader

# Bag of Words

A nossa implementação tem dois pontos importantes:
- A classe de vetorização, que utiliza uma função de contagem de frequência como entrada do modelo de classificação.
- O modelo de classificação em si, que é apenas uma rede neural totalmente conectada (pesos lineares) com uma saída em porcentagem.

Essa abordagem é extremamente simples, baseada no teorema de Bayes, em que a frequência é utilizada como o Prior.

![BOW vector](https://uc-r.github.io/public/images/analytics/feature-engineering/bow-image.png)

In [32]:
class BoWVectorizer:
    def __init__(self, max_features=4000):
        self.vectorizer = CountVectorizer(max_features=int(max_features))

    def fit_transform(self, texts):
        return self.vectorizer.fit_transform(texts).toarray()

    def transform(self, texts):
        return self.vectorizer.transform(texts).toarray()

    def save(self, path):
        import pickle
        with open(path, 'wb') as f:
            pickle.dump(self.vectorizer, f)

    def load(self, path):
        import pickle
        with open(path, 'rb') as f:
            self.vectorizer = pickle.load(f)


class BowClassifier(nn.Module):
    def __init__(self, input_dim, hidden_layers=5, output_dim=3):
        super().__init__()

        self.softmax = nn.Softmax(dim=1)

        self.sequential = nn.Sequential(
            *[nn.Sequential(
                nn.Linear(input_dim, input_dim),
                nn.ReLU()
            ) for _ in range(hidden_layers - 1)],
            nn.Linear(input_dim, input_dim // 2),
            nn.ReLU(),
            nn.Linear(input_dim // 2, output_dim),
            self.softmax
        )

    def forward(self, x):
        return self.sequential(x)


class LitBowClassifier(LightningModule):
    def __init__(self, input_dim, hidden_layers=5, lr=1e-3):
        super().__init__()
        input_dim = int(input_dim)
        self.model = BowClassifier(input_dim, hidden_layers=hidden_layers)
        self.lr = lr
        self.loss = nn.CrossEntropyLoss()
        self.accuracy = Accuracy(task='multiclass', num_classes=3)
        self.vectorizer = BoWVectorizer(max_features=input_dim)

    def forward(self, x):
        return self.model(x)

    def training_step(self, batch, batch_idx):
        texts = batch['phrase']
        target = batch['target']

        # Transform texts to BoW vectors
        input_ids = torch.tensor(self.vectorizer.transform(texts), dtype=torch.float32, device=self.device)

        output = self(input_ids)

        loss = self.loss(output, target)
        acc = self.accuracy(torch.argmax(output, dim=1), torch.argmax(target, dim=1))

        self.log('train_loss', loss, on_epoch=True, prog_bar=True)
        self.log('train_acc', acc, on_epoch=True, prog_bar=True)

        return loss

    def test_step(self, batch, batch_idx):
        texts = batch['phrase']
        target = batch['target']

        # Transform texts to BoW vectors
        input_ids = torch.tensor(self.vectorizer.transform(texts), dtype=torch.float32, device=self.device)

        output = self(input_ids)

        loss = self.loss(output, target)
        acc = self.accuracy(output, target)

        return output, target, loss, acc

    def configure_optimizers(self):
        return torch.optim.AdamW(self.parameters(), lr=self.lr)

    def save(self, model_path, vectorizer_path):
        torch.save(self.state_dict(), model_path)
        self.vectorizer.save(vectorizer_path)

    def load(self, model_path, vectorizer_path):
        self.load_state_dict(torch.load(model_path))
        self.vectorizer.load(vectorizer_path)

    def fit_vectorizer(self, train_texts):
        self.vectorizer.fit_transform(train_texts)

# Word To Vector

O W2V é em dificuldade de implementação um pouco mais complexo do que o BOW puro, devido à necessidade de criar um espaço latente de embeddings (representação vetorial das palavras, ou seja, transformar as palavras em números).

Nossa implementação conta com o método de vetorização, similar ao do BOW, mas utilizado a biblioteca Gensim que tem uma implementação para a criação do espaço automaticamente utilizando as frases existentes.

![W2V embeddings](https://cdn.coveo.com/images/w_1200,h_700,c_scale/v1707326301/blogprod/WordEmbeddings_106321438d/WordEmbeddings_106321438d.png?_i=AA)

In [33]:
class Word2VecVectorizer:
    def __init__(self, embedding_dim=2000):
        self.embedding_dim = embedding_dim
        self.model = None

    def fit(self, sentences):
        self.model = Word2Vec(sentences, vector_size=self.embedding_dim, window=5, min_count=1, workers=4)

    def transform(self, sentences):
        embeddings = [torch.tensor(np.array([self.model.wv[word] for word in sentence if word in self.model.wv]), dtype=torch.float32)
                      for sentence in sentences]
        return pad_sequence(embeddings, batch_first=True, padding_value=0.0)

    def save(self, path):
        self.model.save(path)

    def load(self, path):
        self.model = Word2Vec.load(path)


class Word2VecClassifier(nn.Module):
    def __init__(self, embedding_dim, hidden_dim, output_dim, num_layers=1):
        super().__init__()
        self.softmax = nn.Softmax(dim=1)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, num_layers=num_layers, batch_first=True)
        self.classifier = nn.Sequential(
            nn.Linear(hidden_dim, hidden_dim // 2),
            nn.ReLU(),
            nn.Linear(hidden_dim // 2, output_dim),
            self.softmax
        )

    def forward(self, x):
        _, (hidden, _) = self.lstm(x)
        return self.classifier(hidden[-1])


class LitWord2VecClassifier(LightningModule):
    def __init__(self, embedding_dim, hidden_dim, output_dim=3, lr=1e-3, num_layers=8):
        super().__init__()
        self.model = Word2VecClassifier(embedding_dim, hidden_dim, output_dim, num_layers)
        self.lr = lr
        self.loss = nn.CrossEntropyLoss()
        self.accuracy = Accuracy(task='multiclass', num_classes=output_dim)
        self.vectorizer = Word2VecVectorizer(embedding_dim)

    def forward(self, x):
        return self.model(x)

    def training_step(self, batch, batch_idx):
        texts = batch['phrase']
        target = batch['target']

        # Transform texts to Word2Vec embeddings
        input_ids = self.vectorizer.transform(texts).to(self.device)

        output = self(input_ids)

        loss = self.loss(output, target)
        acc = self.accuracy(torch.argmax(output, dim=1), torch.argmax(target, dim=1))

        self.log('train_loss', loss, on_epoch=True, prog_bar=True)
        self.log('train_acc', acc, on_epoch=True, prog_bar=True)

        return loss

    def test_step(self, batch, batch_idx):
        texts = batch['phrase']
        target = batch['target']

        # Transform texts to Word2Vec embeddings
        input_ids = self.vectorizer.transform(texts).to(self.device)

        output = self(input_ids)

        loss = self.loss(output, target)
        acc = self.accuracy(output, target)

        self.log('test_loss', loss, on_step=True, on_epoch=True)
        self.log('test_acc', acc, on_step=True, on_epoch=True)

        pred = output.argmax(dim=1)
        self.log('test_pred', pred, on_step=True, on_epoch=True)
        self.log('test_target', target, on_step=True, on_epoch=True)

        return output, target, loss, acc

    def configure_optimizers(self):
        return torch.optim.AdamW(self.parameters(), lr=self.lr)

    def save(self, model_path, vectorizer_path):
        torch.save(self.state_dict(), model_path)
        self.vectorizer.save(vectorizer_path)

    def load(self, model_path, vectorizer_path):
        self.load_state_dict(torch.load(model_path))
        self.vectorizer.load(vectorizer_path)

    def fit_vectorizer(self, train_texts):
        tokenized_texts = [text.split() for text in train_texts]
        self.vectorizer.fit(tokenized_texts)

# Transformer com Transfer Learning

O Transformer é um modelo de rede neural que foi introduzido em 2017 e é extremamente eficaz para tarefas de NLP, pois
consegue entender o contexto de uma palavra observando tanto o que vem antes quanto o que vem depois dela (bidirecional).

Para a nossa implementação, utilizamos o modelo Bert-Small, que é uma versão mais leve do BERT, mas com a mesma
arquitetura. Congelamos os pesos do modelo e inserimos camadas adicionais para a classificação, que é uma forma de
transfer learning, ou seja, treinar um modelo em cima de outro modelo já treinado.

In [34]:
class TransformerClassifier(nn.Module):
    def __init__(self, bert, hidden_layers=5, output_dim=3):
        super().__init__()
        self.bert = bert
        embedding_dim = bert.config.to_dict()['hidden_size']

        # Freeze the BERT model parameters
        for param in self.bert.parameters():
            param.requires_grad = False

        # unfreeze last
        for param in self.bert.encoder.layer[-1].parameters():
            param.requires_grad = True

        self.softmax = nn.Softmax(dim=1)
        self.sequential = nn.Sequential(
            *[nn.Sequential(
                nn.Linear(embedding_dim, embedding_dim),
                nn.ReLU()
            ) for _ in range(hidden_layers - 1)],
            nn.Linear(embedding_dim, embedding_dim // 2),
            nn.ReLU(),
            nn.Linear(embedding_dim // 2, output_dim),
            self.softmax
        )

    def forward(self, input_ids, attention_mask):
        output = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        hidden_state = output.last_hidden_state
        return self.sequential(hidden_state[:, 0, :])


class LitTransformerClassifier(LightningModule):
    def __init__(self, hidden_layers=5, lr=1e-3):
        super().__init__()
        bert = BertModel.from_pretrained('bert-base-uncased')
        self.model = TransformerClassifier(bert, hidden_layers=hidden_layers, output_dim=3)
        self.lr = lr
        self.loss = nn.CrossEntropyLoss()
        self.accuracy = Accuracy(task='multiclass', num_classes=3)

    def forward(self, input_ids, attention_mask):
        return self.model(input_ids, attention_mask)

    def training_step(self, batch, batch_idx):
        input_ids = batch['input_ids']
        attention_mask = batch['attention_mask']
        target = batch['target']

        output = self(input_ids, attention_mask)

        loss = self.loss(output, target)
        acc = self.accuracy(output, target)

        self.log('train_loss', loss, on_epoch=True, prog_bar=True)
        self.log('train_acc', acc, on_epoch=True, prog_bar=True)

        return loss

    def test_step(self, batch, batch_idx):
        input_ids = batch['input_ids']
        attention_mask = batch['attention_mask']
        target = batch['target']

        output = self(input_ids, attention_mask)

        loss = self.loss(output, target)
        acc = self.accuracy(output, target)

        self.log('test_loss', loss, on_step=True, on_epoch=True)
        self.log('test_acc', acc, on_step=True, on_epoch=True)

        return output, target, loss, acc

    def configure_optimizers(self):
        return torch.optim.AdamW(self.parameters(), lr=self.lr)

    def save(self, path):
        torch.save(self.state_dict(), path)


In [35]:
def plot_training_history(log_dir, model):
    metrics = pd.read_csv(f'{log_dir}/metrics.csv')
    plt.figure()

    # Make a 1/30 moving average
    window = 10
    metrics['train_loss_step'] = metrics['train_loss_step'].bfill().rolling(window=window).mean()
    metrics['train_acc_step'] = metrics['train_acc_step'].bfill().rolling(window=window).mean()

    fig, ax1 = plt.subplots()

    ax1.set_xlabel('Step')
    ax1.set_ylabel('Training Loss', color='tab:blue')
    ax1.plot(metrics['step'], metrics['train_loss_step'], label='Training Loss', color='tab:blue')
    ax1.tick_params(axis='y', labelcolor='tab:blue')

    ax2 = ax1.twinx()
    ax2.set_ylabel('Training Accuracy', color='tab:orange')
    ax2.plot(metrics['step'], metrics['train_acc_step'], label='Training Accuracy', color='tab:orange')
    ax2.tick_params(axis='y', labelcolor='tab:orange')

    fig.tight_layout()
    plt.title(f'Training History - {model}')
    fig.legend(loc='upper left', bbox_to_anchor=(0.1, 0.9))
    plt.savefig(f'{model}_training_history.png')


def train(model, max_epochs=5, batch_size=64, num_workers=4, lr=1e-5):
    dataset = FinancialPhraseDataset()
    print(f"Training {model} model")
    print(f"Maimum number of epochs: {max_epochs}. "
          f"Batch size: {batch_size}. Number of workers: {num_workers}. Learning rate: {lr}")
    train_loader, _ = dataset.get_data_loaders(batch_size=batch_size, num_workers=num_workers, train_size=1)
    lr = float(lr)

    if model == 'bow':
        model_path = 'checkpoints/bow'
        lit_model = LitBowClassifier(2000, 6, lr=lr)
        lit_model.fit_vectorizer(dataset.data[1])
    elif model == 'w2v':
        model_path = 'checkpoints/w2v'
        lit_model = LitWord2VecClassifier(embedding_dim=512, hidden_dim=256, num_layers=8, lr=lr)
        lit_model.fit_vectorizer(dataset.data[1])
    elif model == 'transformer':
        model_path = 'checkpoints/transformer'
        lit_model = LitTransformerClassifier(hidden_layers=3, lr=lr)
    else:
        raise ValueError('Unknown model')

    log_dir = os.path.join(f'logs/{model}')
    if not os.path.exists(log_dir):
        os.makedirs(log_dir)

    logger = CSVLogger("logs", name=model)
    trainer = Trainer(max_epochs=max_epochs, logger=logger)
    trainer.fit(model=lit_model, train_dataloaders=train_loader)

    # if directory does not exist, create it
    if not os.path.exists('checkpoints'):
        os.makedirs('checkpoints')

    # save model
    if model == 'transformer':
        trainer.save_checkpoint(model_path + '.ckpt')
    elif model == 'bow':
        lit_model.save(model_path + '.pth', model_path + '.pkl')
    elif model == 'w2v':
        lit_model.save(model_path + '.pth', model_path + '.model')

    # save results
    # list all versions, and access metrics from latest
    version = max([int(version.split('_')[-1]) for version in os.listdir(f'logs/{model}')])
    log_dir = os.path.join(f'logs/{model}', f'version_{version}')
    plot_training_history(log_dir, model)


class Models(Enum):
    transformer = 'transformer'
    bow = 'bow'
    w2v = 'w2v'


# Especificações

## Bag of Words (BOW)

Inserimos uma sequência de camadas totalmente conectadas (uma DNN padrão) após a vetorização dos dados como modelo de
classificação para a label de sentimento.

### Hiperparâmetros:

- input_dim=2000
- hidden_dim=6

### Argumentos de treino:

- max_epochs=50
- batch_size=64
- num_workers=8
- lr=1e-6

## Word To Vector (W2V)

Para a implementação do W2V, utilizamos a biblioteca Gensim, que tem uma implementação pronta para a criação de
embeddings.

Na nossa abordagem utilizamos uma camada LSTM (Long- Short-Term Memory), ou seja, montamos uma rede recorrente como
parte do modelo de classificação, além das camadas lineares que tem como entrada a saída do CBOW.

### Hiperparâmetros:

- embedding_dim=512
- hidden_dim=256
- num_layers=8

### Argumetnos de treino:

- max_epochs=20
- batch_size=64
- num_workers=8
- lr=1e-5

## Transformer (Transfer Learning baseado em pesos do Bert-Small)

O BERT (Bidirectional Encoder Representations from Transformers) é um modelo de Transformer desenvolvido pelo Google,
mas escolhemos uma versão com menos pesos, mas a mesma arquitetura. Ele é treinado para entender o contexto de uma
palavra observando tanto o que vem antes quanto o que vem depois dela (bidirecional).

Isso permite uma interpretação
artifical de contexto, o que torna BERT extremamente eficaz para tarefas de NLP e realizar Transfer Learning em cima.
Congelamos os pesos do modelo bert-small e inserimos camadas adicionais que foram treinadas em cima dos nossos dados.

### Hiperparâmetros:

- hidden_layers=3 (apenas as camadas não congeladas)

## Argumentos de treino:

- max-epochs=10 (o modelo demora bastante para ser treinado)
- batch_size=64
- num_workers=8
- lr=1e-4

# AVISO: Todos os modelos foram treinados em máquinas nossas, e demoram bastante para serem treinados na GPU do colab!!!

In [None]:
train('transformer', max_epochs=10, batch_size=64, num_workers=8, lr=1e-4)

In [None]:
train('w2v', max_epochs=20, batch_size=64, num_workers=8, lr=1e-5)

In [None]:
train('bow', max_epochs=50, batch_size=64, num_workers=8, lr=1e-6)

# Resultados e Conclusão

Salvamos os pesos, assim como os resultados de teste no nosso GitHub, pois o Google Colab não mantém em memória os arquivos.

Acesse aqui: https://github.com/rzimmerdev/sentiment-analysis
[ *Última edição 16/06/2024* ]

In [39]:
def evaluate(model, model_dir=None):
    dataset = FinancialPhraseDataset()
    test_loader = dataset.get_data_loaders(train=False, batch_size=64, num_workers=8, train_size=1)
    model_path = f"{model_dir}/{model}"

    if model == 'bow':
        lit_model = LitBowClassifier(2000, 6)
        weights = model_path + '.pth'
        vectorizer = model_path + '.pkl'
        lit_model.load(weights, vectorizer)
    elif model == 'w2v':
        lit_model = LitWord2VecClassifier(embedding_dim=512, hidden_dim=256, num_layers=8)
        weights = model_path + '.pth'
        vectorizer = model_path + '.model'
        lit_model.load(weights, vectorizer)
    elif model == 'transformer':
        weights = model_path + '.ckpt'
        lit_model = LitTransformerClassifier.load_from_checkpoint(weights, hidden_layers=3)
    else:
        raise ValueError('Unknown model')

    lit_model.eval()

    # Calculate metrics
    all_outputs = []
    all_labels = []

    for idx, batch in enumerate(test_loader):
        output, target, _, _ = lit_model.test_step(batch, idx)

        all_outputs.extend(output.tolist())
        all_labels.extend(target.tolist())

    outputs = np.array(all_outputs)
    labels = np.array(all_labels)

    pred = np.argmax(outputs, axis=1)
    target = np.argmax(labels, axis=1)

    # Accuracy (argmax, argmax
    test_accuracy = accuracy_score(target, pred)

    # Confusion Matrix
    conf_matrix = confusion_matrix(target, pred)

    # F1 Score
    f1 = f1_score(target, pred, average='weighted')

    # Log loss
    test_loss = log_loss(all_labels, all_outputs)

    # ROC Curve and AUC
    for i in range(3):
        fpr, tpr, _ = roc_curve(labels[:, i], outputs[:, i])
        roc_auc = auc(fpr, tpr)

    # Plot ROC Curve
        plt.figure()
        plt.plot(fpr, tpr, color='darkorange', lw=2, label='ROC curve (area = %0.2f)' % roc_auc)
        plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
        plt.xlim([0.0, 1.0])
        plt.ylim([0.0, 1.05])
        plt.xlabel('False Positive Rate')
        plt.ylabel('True Positive Rate')
        plt.title('Receiver Operating Characteristic')
        plt.legend(loc="lower right")
        plt.savefig(f'{model}_class_{i}_roc.png')

    plt.figure()
    plt.imshow(conf_matrix, interpolation='nearest', cmap='viridis')
    plt.title('Confusion Matrix')
    plt.colorbar()
    plt.ylabel('True label')
    plt.xlabel('Predicted label')
    plt.savefig(f'{model}_confusion_matrix.png')

    # AIC Score (assuming a binomial model, and AIC = 2k - 2ln(L))
    # k = num of model params
    k = sum(p.numel() for p in lit_model.parameters())
    L = -test_loss * len(all_labels)  # log-likelihood
    AIC = 2 * k - 2 * L

    print(f'Test Accuracy: {test_accuracy}')
    print(f'Test Loss: {test_loss}')
    print(f'F1 Score: {f1}')
    print(f'AIC: {AIC}')

## Evaluation

Comparando os modelos.

Para comparar os modelos, precisamos de uma função que carregue os pesos de um modelo treinado e calcule as métricas de avaliação.
As métricas utilizadas são:

- Acurácia: a porcentagem de previsões corretas.
- Matriz de confusão: uma tabela que mostra o número de previsões corretas e incorretas.
- F1 Score: a média ponderada da precisão e recall.
- Log Loss: a função de perda logarítmica.
- AUC: a área sob a curva ROC (Receiver Operating Characteristic).
- AIC: o critério de informação de Akaike.

Conclusão:
Comparando os três modelos - o BOW como baseline, o Word2Vec e o Transformer - podemos ver que o Transformer obteve a melhor acurácia e F1 Score.
O Word2Vec obteve a pior acurácia e F1 Score, mas ainda assim é um modelo razoável.
O BOW obteve resultados intermediários, mas é o mais simples dos três modelos.


### BOW:

Test Accuracy: 0.6876288659793814
Test Loss: 3.4750539769234146
F1 Score: 0.6822113699098331
AIC: 44034747.60471523


### W2V:



### Transformer:

Test Accuracy: 0.8030927835051547
Test Loss: 1.7498943486647303
F1 Score: 0.7983950697516664
AIC: 58845512.79503641


In [None]:
evaluate('transformer', 'checkpoints')

In [None]:
evaluate('w2v', 'checkpoints')

In [None]:
evaluate('bow', 'checkpoints')