In [None]:
# Path of the CSV dataset loaded with pd.read_csv further down ('XXX' looks like a placeholder — set it before running).
PATH_DATASET = 'XXX'
# Checkpoint name handed to BertTokenizer / BertForSequenceClassification.from_pretrained.
MODEL = 'neuralmind/bert-base-portuguese-cased' # alternative checkpoint: 'raquelsilveira/legalbertpt_fp'

## Instalando e importando libs

In [34]:
import tqdm
import copy
import torch
import random
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

from torch.utils.data import TensorDataset
from torch.utils.data import DataLoader, RandomSampler, SequentialSampler
from transformers import BertTokenizer, BertForSequenceClassification, AdamW, get_linear_schedule_with_warmup

## Carregando dados

In [None]:
# Load the whole dataset once; later cells read its 'texto', 'label' and 'subset' columns.
df_data = pd.read_csv(PATH_DATASET)

## Divisão dos dados

In [None]:
def train_val_test_split_dataset(df_data, doc_name):
    """Build binary (one-vs-rest) train/val/test arrays for one document type.

    Rows whose ``label`` equals ``doc_name`` get target 1; every other row
    (including rows with a missing label) gets target 0. Rows are assigned to
    a split according to the value of their ``subset`` column.

    Args:
        df_data: DataFrame with the columns ``texto``, ``label`` and
            ``subset`` (``'train'``, ``'val'`` or ``'test'``). It is not
            modified.
        doc_name: label value treated as the positive class.

    Returns:
        Tuple ``(X_train, X_val, X_test, y_train, y_val, y_test)`` of NumPy
        arrays: the ``X_*`` arrays hold the texts, the ``y_*`` arrays hold the
        0/1 targets.
    """
    df = df_data.copy()

    # A direct comparison replaces the former merge + chained
    # `fillna(inplace=True)`: the chained in-place fill acts on a temporary
    # object under pandas Copy-on-Write, and the merge duplicated rows when
    # `doc_name` was itself 'Outros'.
    df['target'] = (df['label'] == doc_name).astype(int)

    def _arrays_for(subset_name):
        # Texts and targets of the rows that belong to one split.
        part = df[df['subset'] == subset_name]
        return part['texto'].values, part['target'].values

    X_train, y_train = _arrays_for('train')
    X_val, y_val = _arrays_for('val')
    X_test, y_test = _arrays_for('test')

    return X_train, X_val, X_test, y_train, y_val, y_test

In [None]:
# Sanity check: build the binary split for a single document type ...
X_train, X_val, X_test, y_train, y_val, y_test = train_val_test_split_dataset(df_data, 'Decisão Inicial')

# ... and display the shape of each returned array.
X_train.shape, X_val.shape, X_test.shape, y_train.shape, y_val.shape, y_test.shape

## Fine-tuning

In [48]:
def prepare_dataloader(tokenizer, X, y, token_size=512, batch_size=4, shuffle=True):
    """Tokenize texts and wrap them, together with their labels, in a DataLoader.

    Args:
        tokenizer: object exposing ``batch_encode_plus`` (the notebook passes
            a ``BertTokenizer``).
        X: iterable of raw text strings.
        y: one label per text, in a form ``torch.tensor`` accepts.
        token_size: length every sequence is padded or truncated to.
        batch_size: number of samples per batch.
        shuffle: when True (the default, matching the previous behaviour)
            samples are drawn with ``RandomSampler``; pass False to iterate in
            dataset order with ``SequentialSampler``.

    Returns:
        ``DataLoader`` whose batches are ``(input_ids, attention_mask, labels)``.
    """
    encoded_data = tokenizer.batch_encode_plus(
        list(X),
        add_special_tokens=True,
        return_attention_mask=True,
        # `pad_to_max_length=True` is deprecated in favour of
        # `padding='max_length'`. Truncation is requested explicitly so that
        # over-long texts are cut to `token_size` instead of depending on the
        # tokenizer's fallback behaviour.
        padding='max_length',
        truncation=True,
        max_length=token_size,
        return_tensors='pt',
    )

    input_ids = encoded_data['input_ids']
    attention_masks = encoded_data['attention_mask']
    labels = torch.tensor(y)

    dataset = TensorDataset(input_ids, attention_masks, labels)

    sampler_cls = RandomSampler if shuffle else SequentialSampler

    return DataLoader(dataset, sampler=sampler_cls(dataset), batch_size=batch_size)

In [50]:
def evaluate(model, dataloader, device='cuda'):
    """Run ``model`` over ``dataloader`` without gradient tracking.

    Args:
        model: module that is switched to eval mode and called with the
            keyword arguments ``input_ids``, ``attention_mask`` and
            ``labels``; items 0 and 1 of its output are read as the loss and
            the logits.
        dataloader: sized iterable of batches whose first three tensors are
            the input ids, the attention mask and the labels.
        device: device every batch tensor is moved to before the forward pass.

    Returns:
        Tuple ``(mean_loss, logits, labels)``: the loss averaged over the
        batches, plus NumPy arrays holding all logits and all labels
        concatenated along axis 0.
    """
    model.eval()

    running_loss = 0
    logit_chunks = []
    label_chunks = []

    with torch.no_grad():
        for raw_batch in dataloader:
            tensors = tuple(t.to(device) for t in raw_batch)

            outputs = model(
                input_ids=tensors[0],
                attention_mask=tensors[1],
                labels=tensors[2],
            )

            running_loss += outputs[0].item()
            logit_chunks.append(outputs[1].detach().cpu().numpy())
            label_chunks.append(tensors[2].cpu().numpy())

    mean_loss = running_loss / len(dataloader)

    return mean_loss, np.concatenate(logit_chunks, axis=0), np.concatenate(label_chunks, axis=0)

In [51]:
from sklearn import metrics
from sklearn.metrics import f1_score, classification_report, confusion_matrix

def f1_score_func(preds, labels, metric):
    """Return the F1 score of the arg-max predictions against the true labels.

    Args:
        preds: 2-D array of per-class scores; the predicted class of a row is
            the index of its largest value.
        labels: array of true class ids (flattened before scoring).
        metric: value forwarded to ``f1_score`` as its ``average`` argument.
    """
    predicted_classes = np.argmax(preds, axis=1).flatten()
    return f1_score(labels.flatten(), predicted_classes, average=metric)

def f1_score_func_average(preds, labels, average_f1):
    """Return the F1 score of the arg-max predictions for a given averaging mode.

    Args:
        preds: 2-D array of per-class scores; the predicted class of a row is
            the index of its largest value.
        labels: array of true class ids (flattened before scoring).
        average_f1: value forwarded to ``f1_score`` as its ``average`` argument.
    """
    true_classes = labels.flatten() if False else None  # placeholder removed below
    predicted_classes = np.argmax(preds, axis=1).flatten()
    true_classes = labels.flatten()
    score = f1_score(true_classes, predicted_classes, average=average_f1)
    return score

def accuracy_score_func(preds, labels):
    """Return the accuracy of the arg-max predictions against the true labels.

    Args:
        preds: 2-D array of per-class scores; the predicted class of a row is
            the index of its largest value.
        labels: array of true class ids (flattened before scoring).
    """
    predicted_classes = np.argmax(preds, axis=1).flatten()
    return metrics.accuracy_score(labels.flatten(), predicted_classes)

def classification_report_func(preds, labels):
    """Print the classification report of the arg-max predictions.

    Args:
        preds: 2-D array of per-class scores; the predicted class of a row is
            the index of its largest value.
        labels: array of true class ids (flattened before scoring).

    Returns:
        None; the report is only printed.
    """
    predicted_classes = np.argmax(preds, axis=1).flatten()
    true_classes = labels.flatten()
    print(metrics.classification_report(true_classes, predicted_classes))

def confusion_matrix_class(labels, preds):
    """Return the confusion matrix of the arg-max predictions.

    Note the argument order: true labels first, scores second.

    Args:
        labels: array of true class ids (flattened before use).
        preds: 2-D array of per-class scores; the predicted class of a row is
            the index of its largest value.
    """
    predicted_classes = np.argmax(preds, axis=1).flatten()
    return confusion_matrix(labels.flatten(), predicted_classes)

def accuracy_per_class(preds, labels, label_dict=None):
    """Print, for each class present in ``labels``, how many samples were predicted correctly.

    Args:
        preds: 2-D array of per-class scores; the predicted class of a row is
            the index of its largest value.
        labels: array of true class ids.
        label_dict: optional mapping ``class name -> class id`` used to print
            readable class names. When omitted, a notebook-level
            ``label_dict`` is used if one is defined; class ids without a
            known name are printed as-is.

    Returns:
        None; the per-class counts are only printed.
    """
    if label_dict is None:
        # The function used to read a global `label_dict` unconditionally and
        # raised NameError when the notebook never defined one. Keep honouring
        # such a global, but no longer require it.
        label_dict = globals().get('label_dict') or {}

    names_by_id = {class_id: name for name, class_id in label_dict.items()}

    preds_flat = np.argmax(preds, axis=1).flatten()
    labels_flat = labels.flatten()

    for label in np.unique(labels_flat):
        belongs_to_class = labels_flat == label
        n_correct = int(np.sum(preds_flat[belongs_to_class] == label))
        n_total = int(np.sum(belongs_to_class))
        print(f'Class: {names_by_id.get(label, label)}')
        print(f'Accuracy: {n_correct}/{n_total}\n')

In [54]:
def _record_epoch_metrics(split_history, loss, predictions, targets):
    """Append one epoch of metrics to ``split_history`` and return the weighted F1."""
    split_history['loss'].append(loss)
    split_history['acc'].append(accuracy_score_func(predictions, targets))
    for average in ('micro', 'macro', 'weighted'):
        # The history key is derived from the averaging mode, so a value can
        # no longer be stored under the wrong name (macro and micro used to
        # be swapped).
        split_history[f'f1_{average}'].append(f1_score_func(predictions, targets, average))
    return split_history['f1_weighted'][-1]


def fit(model, dataloader_train, dataloader_val, device='cuda', epochs=10):
    """Fine-tune ``model`` and return the best epoch's snapshot plus the metric history.

    Training uses AdamW (lr 1e-5, eps 1e-8), a linear schedule without warm-up
    and gradient-norm clipping at 1.0. After every epoch the model is
    evaluated on both dataloaders; the snapshot with the highest weighted F1
    on the validation data is kept.

    Args:
        model: model to train; it is moved to ``device`` and updated in place.
        dataloader_train: batches of ``(input_ids, attention_mask, labels)``
            used for the optimisation steps and for the train metrics.
        dataloader_val: batches of the same form used for model selection.
        device: device used for training and evaluation.
        epochs: number of passes over ``dataloader_train``.

    Returns:
        Tuple ``(best_model, history)``. ``best_model`` is an independent deep
        copy of the model as it was after its best epoch (``None`` when no
        epoch ran). ``history[split][metric]`` is a list with one value per
        epoch, for ``split`` in ``'train'``/``'val'`` and ``metric`` in
        ``'f1_weighted'``, ``'f1_micro'``, ``'f1_macro'``, ``'acc'``, ``'loss'``.
    """
    optimizer = AdamW(model.parameters(), lr=1e-5, eps=1e-8)

    scheduler = get_linear_schedule_with_warmup(optimizer,
                                            num_warmup_steps=0,
                                            num_training_steps=len(dataloader_train)*epochs)

    model.to(device)

    history = {
      'train': {
        'f1_weighted': [], 'f1_micro': [], 'f1_macro': [], 'acc': [], 'loss': []
      },
      'val': {
        'f1_weighted': [], 'f1_micro': [], 'f1_macro': [], 'acc': [], 'loss': []
      }
    }

    best_model = None
    best_metric = None

    for epoch in tqdm.tqdm(range(1, epochs+1)):

        model.train()

        progress_bar = tqdm.tqdm(dataloader_train, desc='Epoch {:1d}'.format(epoch), leave=False, disable=False)

        for batch in progress_bar:

            model.zero_grad()

            batch = tuple(b.to(device) for b in batch)

            inputs = {'input_ids':      batch[0],
                      'attention_mask': batch[1],
                      'labels':         batch[2],
                    }

            outputs = model(**inputs)

            loss = outputs[0]
            loss.backward()

            torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)

            optimizer.step()
            scheduler.step()

            # Show the batch loss as-is: `len(batch)` is the number of tensors
            # in the batch tuple, not the batch size, so dividing by it only
            # distorted the displayed value.
            progress_bar.set_postfix({'training_loss': '{:.3f}'.format(loss.item())})

        tqdm.tqdm.write(f'\nEpoch {epoch}')

        train_loss, predictions_train, true_train = evaluate(model, dataloader_train, device=device)
        val_loss, predictions_val, true_val = evaluate(model, dataloader_val, device=device)

        tqdm.tqdm.write(f'Train loss: {train_loss}, Val loss: {val_loss}')

        _record_epoch_metrics(history['train'], train_loss, predictions_train, true_train)
        f1_weighted_val = _record_epoch_metrics(history['val'], val_loss, predictions_val, true_val)

        if best_model is None or f1_weighted_val > best_metric:
            best_metric = f1_weighted_val
            # A deep copy is required: a shallow copy shares its parameters
            # with `model`, so later epochs would silently overwrite the
            # "best" weights. This keeps a second set of weights on `device`.
            best_model = copy.deepcopy(model)
            print(f'best = {epoch} - {np.round(f1_weighted_val, 2)}')

    return best_model, history

In [None]:
def save_results(history, best_model, doc_name, dataloader_test):
    """Save training curves, the test classification report and the confusion matrix.

    Three files are written under ``./results/`` (the directory must already
    exist): ``history-<doc_name>.png``, ``classification-report-<doc_name>.txt``
    and ``cm-<doc_name>.png``. The document name and the report are also
    printed.

    Args:
        history: nested dict ``history[split][metric]`` of per-epoch values,
            with the splits ``'train'`` and ``'val'`` and the metrics
            ``'loss'``, ``'acc'``, ``'f1_macro'``, ``'f1_micro'`` and
            ``'f1_weighted'``.
        best_model: trained model evaluated on the test data.
        doc_name: name of the positive class; used in file names and as the
            tick label of class 1.
        dataloader_test: batches passed to ``evaluate``.
    """
    epochs = list(range(len(history['train']['loss'])))

    # Named `metric_names` so that the local does not shadow the
    # `sklearn.metrics` module imported as `metrics`.
    metric_names = ['loss', 'acc', 'f1_macro', 'f1_micro', 'f1_weighted']

    fig_history = plt.figure(figsize=(12,10))

    for position, name in enumerate(metric_names, start=1):

        plt.subplot(3, 2, position)
        plt.plot(epochs, history['train'][name], '.-', label='train')
        plt.plot(epochs, history['val'][name], '.-', label='val')
        plt.ylabel(name)

        plt.legend()

    plt.tight_layout()
    plt.savefig(f'./results/history-{doc_name}.png')
    # Close (not just clear) the figure so figures do not accumulate when this
    # function is called once per document type.
    plt.close(fig_history)

    # Evaluate on whichever device the model already lives on instead of
    # assuming 'cuda'; fall back to 'cuda' for a model without parameters.
    first_param = next(best_model.parameters(), None)
    device = first_param.device if first_param is not None else 'cuda'

    _, predictions, true_test = evaluate(best_model, dataloader_test, device=device)

    preds = np.argmax(predictions, axis=1)

    # Build the report once and reuse it for the console and the file.
    report = classification_report(true_test, preds)

    print(doc_name)
    print(report)
    # The context manager guarantees the report is flushed and the handle closed.
    with open(f'./results/classification-report-{doc_name}.txt', 'w') as report_file:
        report_file.write(str(report))

    fig_cm = plt.figure(figsize=(6,5))

    cm = confusion_matrix_class(true_test, predictions)

    sns.heatmap(cm, annot=True, fmt='d', cmap='viridis')

    # Tick positions are the centres of the cells of a 2x2 matrix.
    plt.xticks([0.5, 1.5], ['Outros', doc_name])
    plt.yticks([0.5, 1.5], ['Outros', doc_name])
    plt.xlabel('Predito', fontsize=14)
    plt.ylabel('Real', fontsize=14)
    plt.savefig(f'./results/cm-{doc_name}.png')
    plt.close(fig_cm)

In [None]:
documents = ['Despacho ou decisão saneador(a)', 'Decisão Inicial', 'Réplica', 'Petição Inicial']

# Seed re-applied before each document's run, so results repeat between
# executions and do not depend on the order of `documents`.
SEED = 42

# Loop-invariant setup: the checkpoint name, the output prefix and the
# tokenizer do not depend on the document type, so they are prepared once.
address_model = MODEL
model_name = address_model.replace('/', '_')
tokenizer = BertTokenizer.from_pretrained(address_model)

for doc in documents:

    print(doc)

    # Seed Python, NumPy and PyTorch so that shuffling and any random weight
    # initialisation are repeatable.
    random.seed(SEED)
    np.random.seed(SEED)
    torch.manual_seed(SEED)

    print('split train val test')
    X_train, X_val, X_test, y_train, y_val, y_test = train_val_test_split_dataset(df_data, doc)

    # A fresh model per document type: each binary classifier is trained independently.
    model = BertForSequenceClassification.from_pretrained(address_model, num_labels=2, output_attentions=False, output_hidden_states=False)

    print('prepare dataloaders')
    dataloader_train = prepare_dataloader(tokenizer, X_train, y_train, token_size=512, batch_size=8)
    dataloader_val   = prepare_dataloader(tokenizer, X_val, y_val, token_size=512, batch_size=8)
    dataloader_test  = prepare_dataloader(tokenizer, X_test, y_test, token_size=512, batch_size=8)

    print('fit model')
    best_model, history = fit(model, dataloader_train, dataloader_val, device='cuda', epochs=8)

    print('save model')
    best_model.save_pretrained(f'./results/{model_name}_classbin-{doc}.model')

    save_results(history, best_model, doc, dataloader_test)