In [None]:
'''
Código responsável pela criação e desenvolvimento do modelo
'''

In [37]:
import pandas as pd
import numpy as np
import tensorflow as tf
import torch
import os
from transformers import BertTokenizer, TFBertForSequenceClassification

In [25]:
modelo_neural = 'neuralmind/bert-base-portuguese-cased'
diretorio_trabalho = 'C:/Users/Pedro/Documents/pantanal_dev/pantanal_dev-dev/'
arquivo_csv = 'trusted_zone/datasets/infomoney_pronto.xlsx'
coluna_indice = 'indice'
colunas_remover = [
    'data_publicacao', 
    'assunto', 
    'link_noticia', 
    'data_noticia', 
    'label'
    ]
numero_tuplas_dataset = 10000
tamanho_bytes = 128
tamanho_batch = 64
tensor_bits = tf.int32
camadas = 3
porcentagem_treinamento = 0.80
taxa_aprendizagem = 2e-5
epocas = 5

In [26]:
tokenizador = BertTokenizer.from_pretrained(modelo_neural)
modelo = TFBertForSequenceClassification.from_pretrained(modelo_neural, num_labels=camadas)

All model checkpoint layers were used when initializing TFBertForSequenceClassification.

Some layers of TFBertForSequenceClassification were not initialized from the model checkpoint at neuralmind/bert-base-portuguese-cased and are newly initialized: ['classifier', 'bert/pooler/dense/bias:0', 'bert/pooler/dense/kernel:0']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [27]:
def pegar_dataset():
    return pd.read_excel(diretorio_trabalho + arquivo_csv)

def formatar_dataset(dataset):
    return dataset.drop(colunas_remover, axis=1)

def cortar_dataset(dataset):
    return dataset.head(numero_tuplas_dataset)

def detalhes_dataset(total, negativos, neutros, positivos, corte):
    print("\nTotal Inserido: {}".format(total))
    print("\nNegativos: {}".format(negativos))
    print("Neutros: {}".format(neutros))
    print("Positivos: {}".format(positivos))
    print("\nCorte: {}".format(corte))
    print("Total Treinavel: {}".format(corte * 3))

def balancear_dataset(dataset):
    tuplas_negativas = dataset.loc[dataset['classificacao'] == 'negativo']
    tuplas_neutras = dataset.loc[dataset['classificacao'] == 'neutro']
    tuplas_positivas = dataset.loc[dataset['classificacao'] == 'positivo']

    numero_negativos = len(tuplas_negativas)
    numero_neutros = len(tuplas_neutras)
    numero_positivos = len(tuplas_positivas)

    corte_menor = min(numero_negativos, numero_neutros, numero_positivos)

    detalhes_dataset(len(dataset), numero_negativos, numero_neutros, numero_positivos, corte_menor)

    corte_negativos = tuplas_negativas.head(corte_menor)
    corte_neutros = tuplas_neutras.head(corte_menor)
    corte_positivos = tuplas_positivas.head(corte_menor)

    dataset_concatenado = pd.concat([corte_negativos, corte_neutros, corte_positivos])
    dataset_sorteado = dataset_concatenado.sample(frac=1).reset_index(drop=True)

    return dataset_sorteado

def tamanho_sentencas(dataset):
    quantidade_caracteres = []

    for indice, tupla in dataset.iterrows():
      quantidade_caracteres.append(len(tupla['titulo']))

    caracter_minimo = min(quantidade_caracteres)
    caracter_maximo = max(quantidade_caracteres)
    caracter_medio = int(sum(quantidade_caracteres) / len(quantidade_caracteres))

    print('\nCaracteres Mínimo: {}'.format(caracter_minimo))
    print('Caracteres Médio: {}'.format(caracter_medio))
    print('Caracteres Máximo: {}'.format(caracter_maximo))

def converter_labels(labels):
    labels_saida = []

    for label in labels:
        if (label == 'negativo'):
            labels_saida.append(0)
        elif (label == 'neutro'):
            labels_saida.append(1)
        elif (label == 'positivo'):
            labels_saida.append(2)
        else:
            labels_saida.append(-1)
            print('ERRO')
    
  return labels_saida

def gerador():
    for exemplo in dados_tokenizados:
        yield (
            {
                'input_ids': exemplo['input_ids'],
                'attention_mask': exemplo['attention_mask']
            },
            exemplo['labels']
            )

def interpertar_predicao(predicao):
    if (predicao == 0):
        return 'Negativo'
    elif (predicao == 1):
        return 'Neutro'
    elif (predicao == 2):
        return 'Positivo'
    else:
        return 'ERRO'

In [28]:
dataset = pegar_dataset()
dataset = formatar_dataset(dataset)
dataset = balancear_dataset(dataset)

tamanho_sentencas(dataset)

sentencas = dataset['titulo'].to_list()
labels = dataset['classificacao'].to_list()
labels = converter_labels(labels)


Total Inserido: 5049

Negativos: 1080
Neutros: 3009
Positivos: 960

Corte: 960
Total Treinavel: 2880

Caracteres Mínimo: 5
Caracteres Médio: 73
Caracteres Máximo: 132


In [29]:
tokens = tokenizador.tokenize(sentencas[2])
token_ids = tokenizador.convert_tokens_to_ids(tokens)

print(f' Sentença: {sentencas[2]}')
print(f'   Tokens: {tokens}')
print(f'.     IDs: {token_ids}')

 Sentença: Reclamações contra bancos aumentam 17% em fevereiro, aponta BC
   Tokens: ['Rec', '##lama', '##ções', 'contra', 'bancos', 'aumentam', '17', '%', 'em', 'fevereiro', ',', 'aponta', 'B', '##C']
.     IDs: [2325, 4839, 315, 598, 9213, 20958, 1040, 110, 173, 1812, 117, 12110, 241, 22304]


In [30]:
ids = []
mascaras = []
dados_tokenizados = []

for sentenca_unitaria, label_unitario in zip(sentencas, labels):
    sentenca_tokenizada = tokenizador.encode_plus(
        sentenca_unitaria,
        add_special_tokens=True,
        return_token_type_ids=False,
        max_length=tamanho_bytes,
        pad_to_max_length=True,
        return_attention_mask=True,
        return_tensors='tf'
        )
    
    entrada = {
        'input_ids': sentenca_tokenizada['input_ids'][0],
        'attention_mask': sentenca_tokenizada['attention_mask'][0],
        'labels': label_unitario
        }
    
    dados_tokenizados.append(entrada)

dataset = tf.data.Dataset.from_generator(
    gerador,
    (
        {'input_ids': tensor_bits, 'attention_mask': tensor_bits},
        tensor_bits
    ),
    (
        {
            'input_ids': tf.TensorShape([tamanho_bytes]),
            'attention_mask': tf.TensorShape([tamanho_bytes])
        },
        tf.TensorShape([])
    )
)

Truncation was not explicitly activated but `max_length` is provided a specific value, please use `truncation=True` to explicitly truncate examples to max length. Defaulting to 'longest_first' truncation strategy. If you encode pairs of sequences (GLUE-style) with the tokenizer you can select this strategy more precisely by providing a specific strategy to `truncation`.


In [31]:
tamanho_treinamento = int(len(dados_tokenizados) * porcentagem_treinamento)
dataset_treinamento = dataset.take(tamanho_treinamento).shuffle(tamanho_treinamento).batch(tamanho_batch)
dataset_teste = dataset.skip(tamanho_treinamento).batch(tamanho_batch)

In [32]:
otimizador = tf.keras.optimizers.Adam(learning_rate=taxa_aprendizagem)
perda = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)

In [33]:
modelo.layers[-1].units = camadas
modelo.layers[-1].activation = tf.keras.activations.softmax

In [None]:
modelo.compile(
    optimizer=otimizador, 
    loss=perda, 
    metrics=['accuracy']
    )
    
modelo.fit(
    dataset_treinamento, 
    epochs=epocas, 
    validation_data=dataset_teste
    )

In [53]:
def list_files_in_folder(folder_path):
    files_list = list()

    for root, dirs, files in os.walk(folder_path):
        for file in files:
            file_path = os.path.join(root, file)
            files_list.append(file_path)
    
    return files_list

folder_path = diretorio_trabalho + "raw_zone/datasets/infomoney/"
files_list = list_files_in_folder(folder_path)

In [None]:
for contagem, arquivo in enumerate(files_list):
    df = pd.read_excel(arquivo)

    titulos = df["titulo"]
    titulos_classificados = list()

    for titulo in titulos:
        frase_tokenizada = tokenizador.encode_plus(
            titulo,
            add_special_tokens=True,
            return_token_type_ids=False,
            max_length=tamanho_bytes,
            pad_to_max_length=True,
            return_attention_mask=True,
            return_tensors='tf'
        )

        id_teste = frase_tokenizada['input_ids']
        mascara_teste = frase_tokenizada['attention_mask']
        predicoes = modelo.predict({'input_ids': id_teste, 'attention_mask': mascara_teste})
        logits = predicoes.logits
        classe_predita = tf.argmax(logits, axis=1).numpy()[0]
        titulos_classificados.append(interpertar_predicao(classe_predita))
    
    df["classificacao"] = titulos_classificados
    df.to_excel(diretorio_trabalho + "model/datasets_classificados/{}.xlsx".format(contagem + 2007))