In [None]:
from transformers import AutoTokenizer, AutoModel
import torch
import pandas as pd
from torch.utils.data import DataLoader
from torch.nn import BCEWithLogitsLoss
from statistics import mean 
from tqdm.auto import tqdm
from sklearn.metrics import f1_score, accuracy_score, precision_score, recall_score, classification_report


In [None]:
import torch
import pandas as pd
from torch.utils.data import Dataset
from transformers import BertTokenizer, BertModel


class MyDataset(Dataset):
    """Dataset that turns each document into one embedding plus its label values.

    Items are computed lazily in ``__getitem__``: the text is split into word
    chunks, every chunk is tokenized and run through ``modelo``, and the
    selected hidden layer is averaged into a single vector.
    """

    def __init__(self, dataFrame: pd.DataFrame, labels: list, column: str, tokenizer: "BertTokenizer",
                 device: torch.device, modelo: "BertModel") -> None:
        """
        Store the texts, the label values and the objects used to embed them.

        Args:
            dataFrame: table holding the text column and the label columns.
            labels: names of the label columns; each row's values become the target.
            column: name of the column holding the raw text.
            tokenizer: callable tokenizer applied to the text chunks.
            device: device the token tensors are moved to before the forward pass.
            modelo: encoder whose output exposes ``hidden_states``.
        """
        self.X = dataFrame[column].tolist()
        self.Y = dataFrame[labels].values.tolist()
        self.tokenizer = tokenizer
        self.device = device
        self.modelo = modelo

    def __len__(self):
        """Return the number of documents."""
        return len(self.X)

    def __getitem__(self, index):
        """
        Embed the document at ``index``.

        Returns:
            A tuple ``(embedding, labels)`` where ``embedding`` is the selected
            hidden layer averaged over its second axis and then over its first
            axis, and ``labels`` is the list of label values for that row.
        """
        chunks = self.get_text_split(self.X[index], self.tokenizer)
        tokens = self.tokenize(chunks, self.tokenizer)
        tokens = {k: v.to(self.device) for k, v in tokens.items()}
        with torch.no_grad():
            output = self.modelo(input_ids=tokens['input_ids'], attention_mask=tokens['attention_mask'])
        hidden_states = output.hidden_states
        # Use hidden state 12 when the model exposes that many, otherwise
        # hidden state 6 (presumably for 6-layer distilled encoders - confirm).
        # Indexing the tuple directly avoids stacking every layer into one
        # large tensor just to read a single entry.
        layer = hidden_states[12] if len(hidden_states) > 12 else hidden_states[6]
        # NOTE(review): assuming the layer is (chunks, tokens, hidden). Chunks
        # are padded to max_length and no attention mask is applied here, so
        # padding positions take part in the token mean. Changing that would
        # alter the embeddings handed to any downstream model.
        embed_final = torch.mean(torch.mean(layer, dim=1), dim=0)
        return embed_final, self.Y[index]

    def get_text_split(self, text: str, tokenizer: "BertTokenizer", length: int = 200, overlap: int = 0, max_chunks: int = 200) -> list:
        """
        Split a text into chunks of at most ``length`` whitespace-separated words.

        Consecutive chunks start ``length - overlap`` words apart. The last
        chunk is dropped when it holds fewer than 75% of ``length`` words,
        unless it is the only chunk.

        Args:
            text: text to split.
            tokenizer: not used by this method; kept so existing calls keep working.
            length: number of words per chunk.
            overlap: number of words shared by consecutive chunks.
            max_chunks: maximum number of chunks considered.

        Returns:
            List of chunk strings; an empty text yields ``[""]``.

        Raises:
            ValueError: if ``overlap`` is not smaller than ``length``.
        """
        step = length - overlap
        if step <= 0:
            # Guards the division below and a non-advancing window.
            raise ValueError("overlap must be smaller than length")
        # Split once instead of re-splitting the whole text for every chunk.
        words = text.split()
        # Ceiling division, with at least one chunk even for an empty text.
        n_chunks = max(1, -(-len(words) // step))
        n_chunks = min(n_chunks, max_chunks)
        chunks = []
        for w in range(n_chunks):
            start = w * step
            piece = words[start:start + length]
            is_last = w == n_chunks - 1
            if is_last and n_chunks != 1 and len(piece) < 0.75 * length:
                # Short tail: skip it rather than embed a mostly-padding chunk.
                continue
            chunks.append(" ".join(piece))
        return chunks

    def tokenize(self, text: list, tokenizer: "BertTokenizer") -> dict:
        """
        Tokenize a list of text chunks.

        Args:
            text: chunks to tokenize; a single string is treated as one chunk.
            tokenizer: callable tokenizer.

        Returns:
            Dictionary with the ``input_ids`` and ``attention_mask`` entries
            produced by the tokenizer.
        """
        if isinstance(text, str):
            # list("abc") would yield single characters, not one chunk.
            text = [text]
        else:
            text = list(text)
        tokens = tokenizer(
            text,
            return_attention_mask=True,
            truncation=True,
            max_length=512,
            padding='max_length',
            return_tensors='pt'
        )
        return {'input_ids': tokens['input_ids'], 'attention_mask': tokens['attention_mask']}

In [None]:
import torch
import torch.nn as nn

class Classifier(nn.Module):
    """Transformer encoder layer followed by two linear layers producing logits.

    Args:
        input_size: size of each input embedding; used as the encoder layer's
            ``d_model`` and as the input width of ``fc``. Must be divisible by
            the 2 attention heads.
        output_size: number of logits produced per input vector.
    """

    def __init__(self, input_size, output_size):
        super().__init__()
        # Honour input_size instead of a hard-coded 768 so the declared
        # parameter actually controls the layer widths. Attribute names are
        # unchanged, so state dicts saved with input_size=768 still load.
        self.trans = nn.TransformerEncoderLayer(d_model=input_size, nhead=2)
        self.fc = nn.Linear(input_size, 30)
        self.classifier = nn.Linear(30, output_size)

    def forward(self, x):
        """
        Compute logits for a batch of embeddings.

        Args:
            x: tensor whose last dimension is ``input_size``
                (presumably ``(batch, input_size)`` - confirm against caller).

        Returns:
            Raw logits with one extra leading axis of size 1, i.e.
            ``(1, batch, output_size)`` for a ``(batch, input_size)`` input.
        """
        # unsqueeze(0) adds a leading axis of length 1; with the encoder
        # layer's default batch_first=False that axis is the sequence axis.
        x = self.trans(x.unsqueeze(0))
        x = self.fc(x)
        x = self.classifier(x)
        return x

In [None]:
def collate_func(batch: list) -> list:
    """
    Regroup a batch of ``(x, y)`` items into two parallel lists.

    Args:
        batch: items whose position 0 is the input and position 1 the target.

    Returns:
        ``[X, Y]`` where ``X`` holds every item's first element and ``Y``
        every item's second element, in batch order.
    """
    inputs = []
    targets = []
    for item in batch:
        inputs.append(item[0])
        targets.append(item[1])
    return [inputs, targets]

In [None]:
# Use the GPU when one is available; otherwise fall back to the CPU so the
# notebook still runs on machines without CUDA.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
# Folder holding the three dataset splits; edit here to point the notebook at
# another location instead of changing every read_csv call.
DATA_DIR = '/var/projetos/Jupyterhubstorage/victor.silva/HelBERT/Datasets/Indicios/bid_notices_weak'

treino = pd.read_csv(f'{DATA_DIR}/dataset_bid_notices_weak_treino.csv')
teste = pd.read_csv(f'{DATA_DIR}/dataset_bid_notices_weak_teste.csv')
validacao = pd.read_csv(f'{DATA_DIR}/dataset_bid_notices_weak_validacao.csv')

In [None]:
# Stack the three splits into one frame with a fresh 0..n-1 row index.
dados = pd.concat([treino, teste, validacao], ignore_index=True)
dados

In [None]:
# Show the rows whose 'n_min_max_limitacao_atestados' column equals 1.
dados.loc[dados['n_min_max_limitacao_atestados'].eq(1)]

In [None]:
# Inspect the text of the row labelled 2.
dados.loc[2, 'text']

In [None]:
# Checkpoint used for both the tokenizer and the embedding model.
model_name = 'neuralmind/bert-base-portuguese-cased'
tokenizer = AutoTokenizer.from_pretrained(model_name)
# output_hidden_states=True makes the model output expose `hidden_states`,
# which MyDataset.__getitem__ reads.
model_embedding = AutoModel.from_pretrained(model_name, output_hidden_states=True)
model_embedding = model_embedding.to(device)

In [None]:
# Every column after the first one is treated as a label column.
labels = dados.columns[1:].tolist()
data = MyDataset(
    dataFrame=dados,
    labels=labels,
    column='text',
    tokenizer=tokenizer,
    device=device,
    modelo=model_embedding,
)
test_loader = DataLoader(data, batch_size=4, collate_fn=collate_func)

In [None]:
classifier = Classifier(input_size=768, output_size=7).to(device)
# map_location remaps the stored tensors onto `device`, so a checkpoint saved
# from a GPU also loads on a CPU-only machine.
# NOTE(review): torch.load unpickles the file; only load checkpoints from a
# trusted source.
classifier.load_state_dict(
    torch.load(
        '/var/projetos/Jupyterhubstorage/victor.silva/HelBERTModel/Modelos/Indicios/SplitedClassification/BERTimbau-model.pth',
        map_location=device,
    )['model_state_dict']
)

In [None]:
# Loss applied to raw logits; 'mean' is the default reduction, spelled out.
criterion = BCEWithLogitsLoss(reduction='mean')

In [None]:
def test_step(test_loader: DataLoader):
    """
    Evaluate the module-level ``classifier`` on every batch of ``test_loader``.

    Relies on the module-level names ``classifier``, ``criterion`` and
    ``device``.

    Args:
        test_loader: loader yielding ``(embeddings, labels)`` batches, where
            ``embeddings`` is a list of tensors and ``labels`` a list of
            label lists.

    Returns:
        Dictionary with the mean batch loss (``test_losses``), weighted
        ``precision``, ``recall`` and ``f1``, ``accuracy``, and the
        scikit-learn classification report as a dict (``cf_report``).

    Raises:
        ValueError: if ``test_loader`` yields no batches.
    """
    classifier.eval()
    preds = []
    trues = []
    test_losses = []
    loop = tqdm(test_loader, leave=True, colour='yellow')
    # Inference only: skip autograd bookkeeping to save memory and time.
    with torch.no_grad():
        for embeddings, labels in loop:
            embeddings = torch.stack(embeddings).to(device)
            # The classifier adds a leading axis of size 1; drop it so the
            # logits line up with the label tensor.
            logits = classifier(embeddings).squeeze(0)
            # BCEWithLogitsLoss needs floating-point targets.
            labels = torch.tensor(labels, dtype=torch.float64).to(device)
            loss = criterion(logits, labels)
            loop.set_description('Realizando o teste')
            loop.set_postfix(loss=loss.item())
            test_losses.append(loss.item())
            # A label is predicted positive when its probability is >= 0.5.
            predictions = (torch.sigmoid(logits) >= 0.5).to(logits.dtype)
            preds.append(predictions.cpu())
            trues.append(labels.cpu())
    if not test_losses:
        raise ValueError("test_loader produced no batches")
    y_true = torch.cat(trues, 0).numpy()
    y_pred = torch.cat(preds, 0).numpy()
    precisao = precision_score(y_true, y_pred, average='weighted', zero_division=0)
    recall = recall_score(y_true, y_pred, average='weighted', zero_division=0)
    f1 = f1_score(y_true=y_true, y_pred=y_pred, average='weighted', zero_division=0)
    acuracia = accuracy_score(y_true, y_pred)
    # zero_division=0 matches the metrics above and silences the warnings
    # emitted for labels without predicted or true samples.
    cf_report = classification_report(y_true, y_pred, output_dict=True, zero_division=0)
    return {'test_losses': mean(test_losses), 'precision': precisao, 'recall': recall, 'f1': f1, 'accuracy': acuracia, 'cf_report': cf_report}

In [None]:
# Run the evaluation over the whole loader and keep the metrics dictionary.
bertimbau = test_step(test_loader)
# Display the metrics as the cell output.
bertimbau

In [None]:
import json
# The context manager flushes and closes the file even if serialisation
# fails; a bare open() left the handle open.
with open('metricasWeakBertimbau.json', 'w') as metrics_file:
    json.dump(bertimbau, metrics_file)