## Modelos para Classificação Textual de Manifestações

> - Support Vector Machine + Bag of Words
> - Support Vector Machine + Word Embeddings
> - MLP (Multi Layer Perceptron) + Word Embeddings
> - BERT (Bidirectional Encoder Representations from Transformers)

### Imports e Configurações

Importação de dependências e funções utilitárias desenvolvidas para o auxílio nas atividades de tokenização, manipulação do conjunto de dados para a tarefa de classificação textual e conversão de tokens para as diferentes representações dos dados requeridas pelos modelos utilizads neste notebook.

Conectando ao drive e definindo base_path

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# SE FOR RODAR NO GOOGLE COLAB TEM QUE COLOCAR
# O CAMINHO COMPLETO PARA O NOTEBOOK NO BASEPATH
base_path = './'
%cd {base_path}
!pwd

Instalando dependências necessárias

In [None]:
!pip install transformers

Imports necessários

In [None]:
import pandas as pd
import numpy as np
import nltk
import spacy #TODO resolver erro ao importar lib localmente
import matplotlib.pyplot as plt
import tensorflow as tf

from nltk.stem import RSLPStemmer
from gensim.models import Word2Vec
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.svm import SVC
from sklearn import metrics
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
from imblearn.over_sampling import RandomOverSampler
from imblearn.under_sampling import RandomUnderSampler
from tensorflow.keras.optimizers import Adam
from transformers import AutoTokenizer, TFAutoModelForSequenceClassification

from utils.plotting import print_confusion_matrix_as_table
from utils.tokenization import MyCustomTokenizer, extract_vocabulary_from_tokens
from utils.dataframes import compute_tokenized_columns_in_dataframe, mapping_str_class_to_target
from utils.embeddings import calculate_dimension_of_sentences, convert_tokens_to_embeddings, apply_average_in_embeddings, apply_padding_in_embeddings

Configurações globais

In [None]:
%load_ext autoreload
%autoreload 2
nltk.download('rslp')
nltk.download('punkt')
nltk.download('stopwords')
plt.style.use('classic')

### Parâmetros do Experimento

In [None]:
EXP_ID = '/exp01'
EXP_DATASET_ID = '/01'
EXP_SEED = 42
EXP_SAMPLER_STRATEGY = None # 'not minority' | 'not majority' | None
EXP_SAMPLER_CLASS = None # RandomUnderSampler | RandomOverSampler | None
EXP_DESCRIPTION = 'Descrição do Experimento'

### Criando diretórios do experimento

In [None]:
import os
path_to_results = '../results'
path_to_folder = path_to_results + EXP_ID
path_to_images = path_to_folder + '/images'
path_to_reports = path_to_folder + '/reports'
path_to_models = path_to_folder + '/models'
path_to_params = path_to_folder + '/params'
path_to_history = path_to_folder + '/history'
path_to_cmatrix = path_to_folder + '/cmatrix'

if os.path.isdir(path_to_folder) == False:
  os.mkdir(path_to_folder)
  os.mkdir(path_to_images)
  os.mkdir(path_to_reports)
  os.mkdir(path_to_models)
  os.mkdir(path_to_params)
  os.mkdir(path_to_history)
  os.mkdir(path_to_cmatrix)
  print('Diretório {} criado com sucesso!'.format(path_to_folder))
else:
  print('Diretório {} já existe!'.format(path_to_folder))

### Carregamento dos dados

Carregando dados das manifestações utilizadas no treinamento do classificador atual.

In [None]:
# CONFIGUAR CAMINHO PARA CONJUNTOS DE TREINAMENTO, VALIDAÇÃO E TESTE
path_to_dataset = '../dataset' + EXP_DATASET_ID

train = pd.read_csv(path_to_dataset + '/train.csv', sep = ';')
test = pd.read_csv(path_to_dataset + '/test.csv', sep = ';')
valid = pd.read_csv(path_to_dataset + '/valid.csv', sep = ';')

In [None]:
train.head()

In [None]:
test.head()

In [None]:
valid.head()

In [None]:
train.info()

In [None]:
test.info()

In [None]:
valid.info()

In [None]:
train['Assunto'].value_counts()

In [None]:
test['Assunto'].value_counts()

In [None]:
valid['Assunto'].value_counts()

### Tratamento dos dados

Identificando e removendo dados faltantes

In [None]:
train.isna().sum()

In [None]:
test.isna().sum()

In [None]:
valid.isna().sum()

In [None]:
train.dropna(inplace=True)
test.dropna(inplace=True)
valid.dropna(inplace=True)

In [None]:
train['Texto'] = train['Texto'].astype('string')
train['Assunto'] = train['Assunto'].astype('string')
test['Texto'] = test['Texto'].astype('string')
test['Assunto'] = test['Assunto'].astype('string')
valid['Texto'] = valid['Texto'].astype('string')
valid['Assunto'] = valid['Assunto'].astype('string')

In [None]:
train.info()

In [None]:
test.info()

In [None]:
valid.info()

In [None]:
!pip install unidecode
from unidecode import unidecode

print(sorted(train['Assunto'].unique()))
print(sorted(test['Assunto'].unique()))
print(sorted(valid['Assunto'].unique()))

def remove_accent(val):
    return unidecode(val)

train['Assunto'] = train['Assunto'].apply(remove_accent)
test['Assunto'] = test['Assunto'].apply(remove_accent)
valid['Assunto'] = valid['Assunto'].apply(remove_accent)

print(sorted(train['Assunto'].unique()))
print(sorted(test['Assunto'].unique()))
print(sorted(valid['Assunto'].unique()))

### Divisão do dados

In [None]:
print('Total de exemplos no conjunto de treino:', len(train))
print('Total de exemplos no conjunto de teste :', len(test))
print('Total de exemplos no conjunto de valid :', len(valid))

### Pré-processamento

In [None]:
stopwords = nltk.corpus.stopwords.words('portuguese')
stemmer = RSLPStemmer()

Tokenizando dados de treinamento e teste e extraindo tokens para o vocabulário

In [None]:
train_text_tokenized, tokens = MyCustomTokenizer(stopwords = stopwords, stemmer = stemmer).tokenize(train['Texto'].values)
test_text_tokenized, _ = MyCustomTokenizer(stopwords = stopwords, stemmer = stemmer).tokenize(test['Texto'].values)
valid_text_tokenized, _ = MyCustomTokenizer(stopwords = stopwords, stemmer = stemmer).tokenize(valid['Texto'].values)

Criando o vocabulário a partir dos tokens mais frequentes extraídos do conjunto de treinamento

In [None]:
# estamos usando todos os tokens
vocabulary = extract_vocabulary_from_tokens(tokens)

In [None]:
print('Tamanho do vocabulário:', len(vocabulary))

Aqui, adicionaremos novas colunas com o identificador do assunto de cada manifestação baseado no campo *Assunto*. Os assuntos serão categorizados em ordem alfabética e o indice de cada um na lista de categorias será utilizado para definição do identificador do assunto em valor numérico. Primeiro no conjunto de treinamento e, em seguida, no de teste.

In [None]:
train = compute_tokenized_columns_in_dataframe(train, train_text_tokenized)
train = mapping_str_class_to_target(train)

In [None]:
train.head()

In [None]:
train.describe()

In [None]:
test = compute_tokenized_columns_in_dataframe(test, test_text_tokenized, drop_size_zero = False)
test = mapping_str_class_to_target(test)

In [None]:
test.head()

In [None]:
test.describe()

In [None]:
valid = compute_tokenized_columns_in_dataframe(valid, valid_text_tokenized, drop_size_zero = False)
valid = mapping_str_class_to_target(valid)

In [None]:
valid.head()

In [None]:
valid.describe()

In [None]:
target_names = sorted(train['Assunto'].astype('string').unique())
print(target_names)
target_names_masked = ['Classe {}'.format(i) for i in range(len(target_names))]
print(target_names_masked)

### Balanceamento

Identificamos que o conjunto de dados possui mais assuntos de determinadas classes do que de outras. Esta distribuição dos dados pode ser bastante prejudicial para a performance dos modelos e, portanto, foi necessário ajustar o conjunto de treinamento para que o total de exemplos de cada classe fique mais balanceado ao moldes do que é feito em https://medium.com/analytics-vidhya/re-sampling-imbalanced-training-corpus-for-sentiment-analysis-c9dc97f9eae1.

In [None]:
original = train['Target'].value_counts()
index = np.arange(len(original))
fig, ax = plt.subplots()
width = 0.35
ax.bar(index, original, label = 'Original')
ax.set_xticks(index, original.index)
ax.legend()
plt.title('Divisão das classes antes do balanceamento')
plt.show()
plt.savefig(path_to_images + '/balanceamento_antes.png')

In [None]:
train_resampled = train

if EXP_SAMPLER_CLASS != None and EXP_SAMPLER_STRATEGY != None:
  sampler = EXP_SAMPLER_CLASS(sampling_strategy=EXP_SAMPLER_STRATEGY, random_state = EXP_SEED)
  train_resampled, _ = sampler.fit_resample(train_resampled, train_resampled['Target'])

In [None]:
resample = train_resampled['Target'].value_counts()
index = np.arange(len(original))
fig, ax = plt.subplots()
width = 0.35
ax.bar(index + width/2, resample, label = 'Resample')
ax.set_xticks(index, original.index)
ax.legend()
plt.title('Divisão das classes após o balanceamento')
plt.show()
plt.savefig(path_to_images + '/balanceamento_depois.png')

In [None]:
train = train_resampled

### Bag of Words + SVM

Criando Bag of Words de contagem de palavras do vocabulário

In [None]:
vectorizer = CountVectorizer(vocabulary = vocabulary, binary = False)

Preparando conjunto de treinamento e testes

In [None]:
X_train = vectorizer.fit_transform(train['Texto_Processado'])
y_train = train['Target']
X_test = vectorizer.fit_transform(test['Texto_Processado'])
y_test = test['Target']

In [None]:
print(vectorizer.get_feature_names_out())

Treinamento do classificador

In [None]:
clf1 = SVC(kernel = "linear", C = 1000)
clf1.fit(X_train, y_train)

Avaliação do modelo no conjunto de teste

In [None]:
predicted1 = clf1.predict(X_test)

In [None]:
print(metrics.classification_report(y_test, predicted1, target_names = target_names))

In [None]:
cmatrix1 = confusion_matrix(y_test, predicted1)
display = ConfusionMatrixDisplay(
    confusion_matrix=cmatrix1,
    display_labels=target_names_masked)

fig, ax = plt.subplots()
ax.set_title('Matriz de Confusão para o modelo SVM utilizando Bag of Words\n')
display.plot(ax = ax, xticks_rotation = 'vertical')
fig.savefig(path_to_images + '/matriz-confusao-svm-bow.png', bbox_inches='tight')

In [None]:
np.save(path_to_cmatrix + '/svm_bow', cmatrix1)
np.load(path_to_cmatrix + '/svm_bow.npy')

### Embeddings + SVM

Treinando modelo de embeddings a partir de texto tokenizado

In [None]:
embeddings = Word2Vec(train['Texto_Tokenizado'], vector_size=100, window=5, min_count=1, workers=4)

In [None]:
plt.boxplot(train['Numero_de_Tokens'].values, showfliers=False)
plt.title('Distribuição da quantidade de tokens por documento no conjunto de treinamento\n')
plt.plot()
plt.savefig(path_to_images + '/boxplot-tokens-por-documento.png')

Extraindo conjuntos de treinamento e teste

In [None]:
X_train = convert_tokens_to_embeddings(train['Texto_Tokenizado'], embeddings)
X_train = apply_average_in_embeddings(X_train, embeddings.vector_size)
y_train = train['Target']

X_test = convert_tokens_to_embeddings(test['Texto_Tokenizado'], embeddings)
X_test = apply_average_in_embeddings(X_test, embeddings.vector_size)
y_test = test['Target']

In [None]:
print(X_train.shape)
print(y_train.shape)
print(X_test.shape)
print(y_test.shape)

In [None]:
clf2 = SVC(kernel = "linear", C = 1000)
clf2.fit(X_train, y_train)

In [None]:
predicted2 = clf2.predict(X_test)

In [None]:
print(metrics.classification_report(y_test, predicted2, target_names = target_names, zero_division = 0))

In [None]:
cmatrix2 = confusion_matrix(y_test, predicted2)

display = ConfusionMatrixDisplay(
    confusion_matrix=cmatrix2,
    display_labels=target_names_masked)

fig, ax = plt.subplots()
ax.set_title('Matriz de Confusão para o modelo SVM utilizando Word Embeddings\n')
display.plot(ax = ax, xticks_rotation = 'vertical')
fig.savefig(path_to_images + '/matriz-confusao-svm-we.png', bbox_inches='tight')

In [None]:
np.save(path_to_cmatrix + '/svm_we', cmatrix2)
np.load(path_to_cmatrix + '/svm_we.npy')

### Embeddings + Multilayer Perceptron

Calculando tamanho do padding a ser aplicação no tensor de entrada da MLP e tamanho do vetor do modelo de embeddings

In [None]:
dimension = calculate_dimension_of_sentences(train['Numero_de_Tokens'])
vector_size = embeddings.vector_size

In [None]:
print('Tamanho da Dimensão Calculada :', dimension)
print('Tamanho do Vetor de Embeddings:', vector_size)

Preparando conjunto de treinamento e testes

In [None]:
X_train = train['Texto_Tokenizado']
X_train = convert_tokens_to_embeddings(X_train, embeddings)
X_train = apply_padding_in_embeddings(X_train, vector_size , dimension)
y_train = train['Target']

X_valid = valid['Texto_Tokenizado']
X_valid = convert_tokens_to_embeddings(X_valid, embeddings)
X_valid = apply_padding_in_embeddings(X_valid, vector_size , dimension)
y_valid = valid['Target']

X_test = test['Texto_Tokenizado']
X_test = convert_tokens_to_embeddings(X_test, embeddings)
X_test = apply_padding_in_embeddings(X_test, vector_size , dimension)
y_test = test['Target']

Definindo arquitetura e iniciando treinamento da MLP

In [None]:
from tensorflow.keras import Sequential
from tensorflow.keras.layers import Dense, Flatten, Dropout, LSTM

model = Sequential()
model.add(Flatten(input_shape=(dimension, vector_size)))
model.add(Dense(1000, activation='relu', kernel_regularizer='l2')),
model.add(Dropout(0.4))
model.add(Dense(1000, activation='relu', kernel_regularizer='l2')),
model.add(Dropout(0.4))
model.add(Dense(1000, activation='relu', kernel_regularizer='l2')),
model.add(Dropout(0.4))
model.add(Dense(10, activation='softmax'))
model.summary()

In [None]:
from tensorflow.keras.callbacks import ModelCheckpoint

path_to_checkpoint = '/tmp/checkpoint/mlp' + EXP_ID

checkpoint = ModelCheckpoint(filepath=path_to_checkpoint,
                             save_weights_only = True,
                             monitor="val_loss",
                             mode='min',
                             save_best_only=True)
callbacks = [checkpoint]

model.compile(optimizer = Adam(4e-5), loss = "sparse_categorical_crossentropy", metrics = ["accuracy"])
history_mlp = model.fit(X_train, y_train, epochs= 10, batch_size = 100, validation_data=(X_valid, y_valid), callbacks = callbacks)

In [None]:
import json

json.dump(history_mlp.history, open(path_to_history + '/mlp.json', 'w'))

Carregando melhor modelo

In [None]:
model.load_weights(path_to_checkpoint)

Avaliando modelo no conjunto de teste

In [None]:
predicted3 = model.predict(X_test)
predicted3 = np.argmax(predicted3, axis = 1)

In [None]:
print(metrics.classification_report(y_test, predicted3, target_names = target_names, zero_division = 0))

In [None]:
cmatrix_mlp = confusion_matrix(y_test, predicted3)

display = ConfusionMatrixDisplay(
    confusion_matrix=cmatrix_mlp,
    display_labels=target_names_masked)

fig, ax = plt.subplots()
ax.set_title('Matriz de Confusão para o modelo MLP utilizando Word Embeddings\n')
display.plot(ax = ax, xticks_rotation = 'vertical')
fig.savefig(path_to_images + '/matriz-confusao-mlp-we.png', bbox_inches='tight')

In [None]:
np.save(path_to_cmatrix + '/mlp_we', cmatrix_mlp)
np.load(path_to_cmatrix + '/mlp_we.npy')

### BERT (Bidirectional Encoder Representations from Transformers)

In [None]:
bertimbau_huggingface_url = 'neuralmind/bert-base-portuguese-cased'

In [None]:
bert_tokenizer = AutoTokenizer.from_pretrained(bertimbau_huggingface_url)

Como estamos realizando o finetuning de um modelo pré-treinado, temos que obedecer o formato de input dos dados usado no treinamento do modelo original, que, neste caso, foi de até 512 caracteres por sentença. O nosso dataset contém avaliações superiores a este valor e tivemos que truncar algumas dessas sentenças conforme é feito em
https://stackoverflow.com/questions/60551906/tensorflow-huggingface-invalid-argument-indices0-624-624-is-not-in-0.

In [None]:
def truncate_text_for_bert(data, max_size = 512):
    truncated_text = []
    for i, text in enumerate(data):
        words = text.strip()
        size = len(words)
        out = words[0:min(size, max_size)]
        truncated_text.append(out)
    return truncated_text

train_bert_text = truncate_text_for_bert(train['Texto'].to_list())
valid_bert_text = truncate_text_for_bert(valid['Texto'].to_list())
test_bert_text = truncate_text_for_bert(test['Texto'].to_list())

Preparando conjunto de treinamento e teste a partir do tokenizador do bert

In [None]:
X_train = bert_tokenizer(train_bert_text, return_tensors = "np", padding = True)
y_train = train['Target']
X_valid = bert_tokenizer(valid_bert_text, return_tensors = "np", padding = True)
y_valid = valid['Target']
X_test = bert_tokenizer(test_bert_text, return_tensors = "np", padding = True)
y_test = test['Target']

In [None]:
bert_model = TFAutoModelForSequenceClassification.from_pretrained(bertimbau_huggingface_url, num_labels = 10)
bert_model.summary()

In [None]:
from tensorflow.keras.callbacks import ModelCheckpoint

path_to_checkpoint = '/tmp/checkpoint/bert' + EXP_ID

checkpoint = ModelCheckpoint(filepath=path_to_checkpoint,
                             save_weights_only = True,
                             monitor="val_loss",
                             mode='min',
                             save_best_only=True)
callbacks = [checkpoint]

loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits = True)
bert_model.compile(optimizer = Adam(5e-6), loss = loss, metrics = ['accuracy'])

In [None]:
history_bert = bert_model.fit(dict(X_train), y_train, validation_data = (dict(X_valid), y_valid), batch_size = 16, epochs = 4, callbacks = callbacks)

In [None]:
import json

json.dump(history_bert.history, open(path_to_history + '/bertimbau.json', 'w'))

In [None]:
bert_model.load_weights(path_to_checkpoint)

In [None]:
predicted4 = bert_model.predict(dict(X_test))['logits']
predicted4 = tf.nn.softmax(predicted4)
predicted4 = np.argmax(predicted4, axis = 1)

In [None]:
print(metrics.classification_report(y_test, predicted4, target_names = target_names, zero_division = 0))

In [None]:
cmatrix_bertimbau = confusion_matrix(y_test, predicted4)
display = ConfusionMatrixDisplay(
    confusion_matrix=cmatrix_bertimbau,
    display_labels=target_names_masked)

fig, ax = plt.subplots()
ax.set_title('Matriz de Confusão para o modelo BERTimbau\n')
display.plot(ax = ax, xticks_rotation = 'vertical')
fig.savefig(path_to_images + '/matriz-confusao-bertimbau.png', bbox_inches='tight')

In [None]:
np.save(path_to_cmatrix + '/bertimbau', cmatrix_bertimbau)
np.load(path_to_cmatrix + '/bertimbau.npy')

### Salvando modelos

In [None]:
import pickle

path_to_svm = path_to_models + '/svm'

path_to_svm_bow = path_to_svm + '/svm_bof.sav'
path_to_svm_we = path_to_svm + '/svm_we.sav'
if os.path.isdir(path_to_svm) == False:
  os.mkdir(path_to_svm)
pickle.dump(clf1, open(path_to_svm_bow, 'wb'))
pickle.dump(clf2, open(path_to_svm_we, 'wb'))

In [None]:
path_to_mlp = path_to_models + '/mlp'
if os.path.isdir(path_to_mlp) == False:
  os.mkdir(path_to_mlp)
model.save(path_to_mlp + '/model.h5')

In [None]:
path_to_bert = path_to_models + '/bert'
bert_tokenizer.save_pretrained(path_to_bert)
bert_model.save_pretrained(path_to_bert)
#loaded_tokenizer = AutoTokenizer.from_pretrained(path_to_bert)
#loaded_model = TFAutoModelForSequenceClassification.from_pretrained(path_to_bert)

### Preview da Execução

In [None]:
MODEL_NAMES = ['BoW_SVM', 'WE_SVM', 'WE_MLP', 'BERTimbau']
PREDICTS = [predicted1, predicted2, predicted3, predicted4]
REPORTS = []

for index, model_name in enumerate(MODEL_NAMES):
    report = metrics.classification_report(y_test, PREDICTS[index], target_names = target_names, zero_division = 0, output_dict = True)
    REPORTS.append(report)

In [None]:
from IPython.display import HTML, Markdown, Latex

Markdown(print_confusion_matrix_as_table(REPORTS, MODEL_NAMES))

### Salvando Resultados

In [None]:
import json

for index, model_name in enumerate(MODEL_NAMES):
    filename = path_to_reports + '/' + model_name.lower() + '.json'
    with open(filename, 'w') as outfile:
        dictionary = REPORTS[index]
        json.dump(dictionary, outfile)

### Salvando Metadados

In [None]:
metadados = dict({"experiment" : EXP_ID,
             "dataset" : EXP_DATASET_ID,
             "seed": EXP_SEED,
             "strategy" : str(EXP_SAMPLER_STRATEGY),
             "sampler": str(EXP_SAMPLER_CLASS),
             "description": EXP_DESCRIPTION})

path_to_metadata = path_to_params + '/metadata.json'

with open(path_to_metadata, 'w') as outfile:
  json.dump(metadados, outfile)

### Liberando memória

In [None]:
import gc
gc.collect()

In [None]:
import IPython
app = IPython.Application.instance()
app.kernel.do_shutdown(True)