In [None]:
import numpy as np
import tensorflow as tf
import seaborn as sns
import matplotlib.pyplot as plt
import pandas as pd
import re
from tqdm import tqdm

from transformers import BertTokenizer, BertModel
import torch
from torch.utils.data import Dataset, DataLoader
from torch import nn, optim

from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from sklearn.metrics import confusion_matrix, accuracy_score
from scipy.stats import pearsonr

!pip install transformers torch


In [None]:
# Reproducibility: seed every random number generator this notebook relies on.
# NumPy's global state is seeded as before; PyTorch is seeded as well because
# the regression/classification heads, dropout and the shuffling DataLoader all
# draw from torch's generator.
SEED = 42
np.random.seed(SEED)
torch.manual_seed(SEED)

SAMPLES = 10000   # number of reviews kept for the regression experiment
BATCH_SIZE = 16   # batch size shared by every DataLoader
EPOCHS = 3        # fine-tuning epochs

# Data preprocessing

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
path = '/content/drive/MyDrive/B2W-Reviews01.csv'

In [None]:
df_raw = pd.read_csv(path)

In [None]:
# Accented vowels used in Portuguese. Â/â, Õ/õ and Ü/ü were previously missing,
# which silently dropped them from both the letter count and the vowel count
# (e.g. "câmera", "ações", "põe").
_ACCENTED_VOWELS = 'ÁÉÍÓÚáéíóúÀàÂâÃãÊêÔôÕõÜü'
_NON_LETTER_RE = re.compile('[^A-Za-zÇç' + _ACCENTED_VOWELS + ']')
_VOWEL_RE = re.compile('[AEIOUaeiou' + _ACCENTED_VOWELS + ']')


def calculate_vowel_density(text):
    """Return the fraction of letters in `text` that are vowels.

    Everything that is not an ASCII letter, a cedilla or an accented
    Portuguese vowel is discarded before counting.

    Args:
        text: String to analyse.

    Returns:
        vowels / letters as a float, or 0 when `text` contains no letters.
    """
    letters = _NON_LETTER_RE.sub('', text)  # remove non alphabetical chars
    total_letters = len(letters)
    if total_letters == 0:
        return 0
    total_vowels = len(_VOWEL_RE.findall(letters))
    return total_vowels / total_letters

In [None]:
def preprocessing(df):
    """Extract the review text column and drop rows without text.

    The input frame is left untouched: the selection, rename and null
    filtering are chained so that nothing is modified in place on a slice.

    Args:
        df: Raw DataFrame containing a 'review_text' column.

    Returns:
        A new single-column DataFrame named 'text' with null rows removed.
    """
    return (
        df[['review_text']]
        .rename(columns={'review_text': 'text'})
        .dropna()
    )

df = preprocessing(df_raw)

# Regression on vowel quantities

In [None]:
df['density'] = df['text'].apply(calculate_vowel_density)

In [None]:
# Keep at most SAMPLES reviews to speed up fine-tuning.
# `min` guards against a frame smaller than SAMPLES (sampling more rows than
# exist without replacement raises), and the explicit random_state makes the
# draw independent of how much global NumPy randomness was consumed earlier.
df = df.sample(n=min(SAMPLES, len(df)), random_state=42)
print(len(df))

In [None]:
# Two-stage split: 20 % is held out for testing, then a quarter of the
# remainder (20 % of the total) becomes the validation set.
train_df, test_df = train_test_split(df, test_size=0.2, random_state=42)
train_df, val_df = train_test_split(train_df, test_size=0.25, random_state=42)

print(f"{len(train_df)} train examples")
print(f"{len(val_df)} validation examples")
print(f"{len(test_df)} test examples")

In [None]:
len(train_df)

## BERTimbau fine-tuning

In [None]:
# Tokenizer and BERTimbau
# Both are loaded from the same checkpoint name (Portuguese, cased).
# NOTE: `model` holds the bare encoder here; a later cell rebinds the same name
# to the BertForRegression wrapper built around it, so that cell is not safe to
# re-run on its own.
tokenizer = BertTokenizer.from_pretrained('neuralmind/bert-base-portuguese-cased')
model = BertModel.from_pretrained('neuralmind/bert-base-portuguese-cased')

class TextDataset(Dataset):
    """Pairs each review text with its vowel-density regression target.

    Texts and targets are materialised as plain lists so that `__getitem__`
    indexes by position. Indexing a pandas Series with `[idx]` is label-based
    and breaks (KeyError or wrong row) whenever the Series does not carry a
    fresh 0..n-1 index.

    Relies on the module-level `tokenizer`.
    """

    def __init__(self, text, density):
        self.text = list(text)
        self.density = list(density)

    def __len__(self):
        return len(self.text)

    def __getitem__(self, idx):
        """Return (tokenizer output, target) for the idx-th example.

        The tokenizer output keeps the leading batch dimension added by
        `return_tensors='pt'`; the training and evaluation loops flatten it.
        """
        inputs = tokenizer(
            self.text[idx],
            return_tensors='pt',
            max_length=512,
            truncation=True,
            padding='max_length',
        )
        target = torch.tensor(self.density[idx], dtype=torch.float)
        return inputs, target

# Re-index every split from 0 so that row labels coincide with row positions.
train_df, val_df, test_df = (
    frame.reset_index(drop=True) for frame in (train_df, val_df, test_df)
)

# One TextDataset per split: review text paired with its density target.
train_dataset = TextDataset(train_df['text'], train_df['density'])
validate_dataset = TextDataset(val_df['text'], val_df['density'])
test_dataset = TextDataset(test_df['text'], test_df['density'])

# DataLoaders: only the training loader shuffles.
train_loader = DataLoader(train_dataset, shuffle=True, batch_size=BATCH_SIZE)
validate_loader = DataLoader(validate_dataset, batch_size=BATCH_SIZE)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE)


In [None]:
class BertForRegression(nn.Module):
    """BERT encoder followed by a single linear unit for scalar regression.

    Args:
        bert_model: Encoder whose forward pass returns an object exposing
            `pooler_output`.
    """

    def __init__(self, bert_model):
        super().__init__()
        self.bert = bert_model
        # Read the feature width from the encoder's config instead of
        # hard-coding it; 768 (BERT-base) remains the fallback when the
        # encoder exposes no `config.hidden_size`.
        hidden_size = getattr(getattr(bert_model, 'config', None), 'hidden_size', 768)
        self.regressor = nn.Linear(hidden_size, 1)

    def forward(self, input_ids, attention_mask):
        """Return one regression value per sequence, shape (batch, 1)."""
        outputs = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        pooled_output = outputs.pooler_output
        return self.regressor(pooled_output)

model = BertForRegression(model)


In [None]:
# Use the GPU when one is available and fall back to the CPU otherwise, so the
# notebook still runs (slowly) on a machine without CUDA. This matches the
# device selection used by the classification cells further down.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# Loss criterion and optimizer
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=1e-5)

In [None]:
def _regression_batch(model, inputs, labels):
    """Forward one batch; return predictions and targets as aligned 1-D tensors."""
    input_ids = inputs['input_ids'].to(device)
    attention_mask = inputs['attention_mask'].to(device)

    # Collapse any extra leading dimensions so the encoder receives
    # (n_sequences, seq_len) tensors.
    input_ids = input_ids.view(-1, input_ids.size(-1))
    attention_mask = attention_mask.view(-1, attention_mask.size(-1))

    # Flatten BOTH sides to shape (n,). Comparing (n, 1) predictions against
    # (n,) targets makes the loss broadcast to an (n, n) matrix, which yields
    # a wrong loss value and wrong gradients.
    predictions = model(input_ids, attention_mask).view(-1)
    targets = labels.to(device).view(-1)
    return predictions, targets


def train_model(model, criterion, optimizer, train_loader, validate_loader, epochs=3):
    """Fine-tune a regression model and print train/validation loss per epoch.

    Uses the module-level `device` and `tqdm`.

    Args:
        model: Module called as `model(input_ids, attention_mask)`.
        criterion: Loss taking (predictions, targets).
        optimizer: Optimizer over the model's parameters.
        train_loader: Iterable of (tokenizer-output dict, target tensor) batches.
        validate_loader: Same structure, used for validation only.
        epochs: Number of passes over `train_loader`.
    """
    for epoch in range(epochs):
        model.train()
        running_loss = 0.0

        # Train loop with tqdm; the description is constant within an epoch.
        train_loop = tqdm(train_loader, leave=True)
        train_loop.set_description(f'Epoch {epoch+1}')
        for inputs, labels in train_loop:
            optimizer.zero_grad()
            outputs, targets = _regression_batch(model, inputs, labels)

            loss = criterion(outputs, targets)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()

            # Show the current batch loss on the progress bar
            train_loop.set_postfix(loss=loss.item())

        model.eval()
        val_loss = 0.0
        print('\n Validating...')

        # Validation loop with tqdm
        validate_loop = tqdm(validate_loader, leave=True)
        with torch.no_grad():
            for inputs, labels in validate_loop:
                outputs, targets = _regression_batch(model, inputs, labels)

                # TODO (translated placeholder from the original): put idea here
                loss = criterion(outputs, targets)
                val_loss += loss.item()

                validate_loop.set_postfix(val_loss=loss.item())

        # max(..., 1) avoids a ZeroDivisionError on an empty loader.
        n_train = max(len(train_loader), 1)
        n_val = max(len(validate_loader), 1)
        print(f"Epoch {epoch+1}, Training Loss: {running_loss/n_train}, Validation Loss: {val_loss/n_val}")


In [None]:
train_model(model, criterion, optimizer, train_loader, validate_loader, epochs=EPOCHS)

In [None]:
# Saving the entire model
torch.save(model, 'reg_model.pth')

In [None]:
print(model)

In [None]:
def rmse(y_true, y_pred):
    """Root of the mean squared error between targets and predictions."""
    squared_error = mean_squared_error(y_true, y_pred)
    return np.sqrt(squared_error)

def mae(y_true, y_pred):
    """Mean absolute error; delegates to `mean_absolute_error`."""
    absolute_error = mean_absolute_error(y_true, y_pred)
    return absolute_error

def mape(y_true, y_pred):
    """Mean absolute percentage error, in percent.

    A small epsilon is added to the targets to avoid dividing by zero. The
    shift is applied to a copy: the previous in-place `+=` modified the
    caller's array (so metrics computed afterwards saw shifted targets) and
    failed outright on integer arrays and plain lists.

    Args:
        y_true: Array-like of target values.
        y_pred: Array-like of predictions, same length as `y_true`.

    Returns:
        mean(|(y_true + eps - y_pred) / (y_true + eps)|) * 100.
    """
    shifted_true = np.asarray(y_true, dtype=float) + 1e-5  # Numeric problems
    predictions = np.asarray(y_pred, dtype=float)
    return np.mean(np.abs((shifted_true - predictions) / shifted_true)) * 100

def pearson_correlation(y_true, y_pred):
    """Pearson correlation coefficient between targets and predictions.

    The p-value that `pearsonr` also returns is discarded.
    """
    coefficient, _p_value = pearsonr(y_true, y_pred)
    return coefficient

In [None]:
def eval_test(model, test_loader):
    """Run `model` over `test_loader` and collect targets and predictions.

    Uses the module-level `device`.

    Args:
        model: Module called as `model(input_ids, attention_mask)`.
        test_loader: Iterable of (tokenizer-output dict, target tensor) batches.

    Returns:
        (y_true, y_pred) as 1-D NumPy arrays in loader order.
    """
    predictions, targets = [], []

    model.eval()
    with torch.no_grad():
        for batch_inputs, batch_labels in test_loader:
            ids = batch_inputs['input_ids'].to(device)
            mask = batch_inputs['attention_mask'].to(device)
            batch_labels = batch_labels.to(device)

            # Flatten any extra leading dimensions down to (n, seq_len)
            ids = ids.view(-1, ids.size(-1))
            mask = mask.view(-1, mask.size(-1))

            batch_preds = model(ids, mask).view(-1)

            predictions.extend(batch_preds.cpu().numpy())
            targets.extend(batch_labels.cpu().numpy())

    return np.array(targets), np.array(predictions)

In [None]:
# Evaluating the model of density of vowels per sentence on the test set
y_true_sentence, y_pred_model_sentence = eval_test(model, test_loader)

In [None]:
# Baseline: compare the model's predictions against a single corpus-level
# vowel density used as the target for every test review.
# The density is computed from the training texts only, to prevent leakage.
separator = ' '
global_density = calculate_vowel_density(separator.join(train_df['text'].values))

test_df2 = test_df.reset_index(drop=True).assign(density=global_density)

test_dataset2 = TextDataset(test_df2['text'], test_df2['density'])
test_loader2 = DataLoader(test_dataset2, batch_size=BATCH_SIZE)

y_true_global, y_pred_model_global = eval_test(model, test_loader2)

In [None]:
# Evaluating the model of density of vowels on the first word density
def get_first_word_density(text):
    """Return the vowel density of the first word of `text`.

    Words are separated by any run of whitespace, so leading spaces no longer
    produce an empty "first word" and newlines/tabs also act as separators.

    Returns:
        The density of the first word, or 0 when `text` contains no word.
    """
    words = text.split()
    if not words:
        return 0
    return calculate_vowel_density(words[0])

# Target for this baseline: the vowel density of each test review's first word.
first_density = list(test_df['text'].apply(get_first_word_density).values)

test_df3 = test_df.reset_index(drop=True).assign(density=first_density)

test_dataset3 = TextDataset(test_df3['text'], test_df3['density'])
test_loader3 = DataLoader(test_dataset3, batch_size=BATCH_SIZE)

y_true_first, y_pred_model_first = eval_test(model, test_loader3)

In [None]:
# Evaluating the model of density of vowels on the last word density
def get_last_word_density(text):
    """Return the vowel density of the last word of `text`.

    Words are separated by any run of whitespace, so trailing spaces or a
    trailing newline no longer produce an empty "last word".

    Returns:
        The density of the last word, or 0 when `text` contains no word.
    """
    words = text.split()
    if not words:
        return 0
    return calculate_vowel_density(words[-1])

# Target for this baseline: the vowel density of each test review's last word.
last_density = list(test_df['text'].apply(get_last_word_density).values)

test_df4 = test_df.reset_index(drop=True).assign(density=last_density)

test_dataset4 = TextDataset(test_df4['text'], test_df4['density'])
test_loader4 = DataLoader(test_dataset4, batch_size=BATCH_SIZE)

y_true_last, y_pred_model_last = eval_test(model, test_loader4)


In [None]:
def get_metrics(y_true, y_pred_model):
    """Compute every regression metric for one set of targets and predictions.

    Returns:
        Dict with keys "RMSE", "MAE", "MAPE", "R2" and "Pearson", evaluated in
        that order.
    """
    scorers = (
        ("RMSE", rmse),
        ("MAE", mae),
        ("MAPE", mape),
        ("R2", r2_score),
        ("Pearson", pearson_correlation),
    )
    return {label: scorer(y_true, y_pred_model) for label, scorer in scorers}

In [None]:
# Score the same model predictions against four different targets
# (first-word, last-word, per-sentence and corpus-level density).
# NOTE(review): the global-density target is constant across rows, so the
# correlation-based metrics for that row may be undefined - confirm.
metrics_dict_first = get_metrics(y_true_first, y_pred_model_first)
metrics_dict_last = get_metrics(y_true_last, y_pred_model_last)
metrics_dict_sentence = get_metrics(y_true_sentence, y_pred_model_sentence)
metrics_dict_global = get_metrics(y_true_global, y_pred_model_global)

# One row per target definition; the index labels follow the row order below.
metrics_df = pd.DataFrame([metrics_dict_first,
                           metrics_dict_last,
                           metrics_dict_global,
                           metrics_dict_sentence], index=
                            ['Using only the first word',
                             'Using only the last word',
                             'Using the global density',
                             'Using the density per sentence'])
metrics_df

# Classification problem

In [None]:
def classify_vowel_density(density):
    """Bucket a vowel density into one of three class ids.

    Returns:
        0 when density is below 1/3, 1 when it lies in [1/3, 2/3],
        and 2 in every other case.
    """
    if density < 1/3:
        return 0  # low vowel density
    # Reaching this point means the value is not below 1/3, so only the
    # upper bound still needs checking.
    return 1 if density <= 2/3 else 2

# Apply the classification function to the 'density' column to get class ids
df['labels'] = df['density'].apply(classify_vowel_density)


In [None]:
# Class-frequency histogram; log=True puts the counts on a log scale.
plt.figure()
df['labels'].hist(log=True)
plt.title('Distribution of labels on log scale')
plt.show()

In [None]:
# Same two-stage split as for the regression task: 20 % test, then a quarter
# of the remainder (20 % of the total) for validation.
train_df, test_df = train_test_split(df, test_size=0.2, random_state=42)
train_df, val_df = train_test_split(train_df, test_size=0.25, random_state=42)

print(f"{len(train_df)} train examples")
print(f"{len(val_df)} validation examples")
print(f"{len(test_df)} test examples")

In [None]:
# Dataset class (classification variant: yields class labels, not densities)
class TextDataset(Dataset):
    """Pairs each review text with its integer class label.

    Texts and labels are materialised as plain lists so that `__getitem__`
    indexes by position. Indexing a pandas Series with `[idx]` is label-based
    and breaks whenever the Series does not carry a fresh 0..n-1 index.

    Relies on the module-level `tokenizer`.
    """

    def __init__(self, text, labels):
        self.text = list(text)
        self.labels = list(labels)

    def __len__(self):
        return len(self.text)

    def __getitem__(self, idx):
        """Return (input_ids, attention_mask, label) for the idx-th example."""
        inputs = tokenizer(
            self.text[idx],
            return_tensors='pt',
            max_length=512,
            truncation=True,
            padding='max_length',
        )
        input_ids = inputs['input_ids'].squeeze(0)  # Remove batch dimension
        attention_mask = inputs['attention_mask'].squeeze(0)  # Remove batch dimension
        label = torch.tensor(self.labels[idx], dtype=torch.long)
        return input_ids, attention_mask, label

# Prepare the split frames: re-index each one from 0 so that row labels
# coincide with row positions.
train_df, val_df, test_df = (
    frame.reset_index(drop=True) for frame in (train_df, val_df, test_df)
)

# Pair each split's texts with its class ids (the 'labels' column).
train_dataset = TextDataset(train_df['text'], train_df['labels'])
validate_dataset = TextDataset(val_df['text'], val_df['labels'])
test_dataset = TextDataset(test_df['text'], test_df['labels'])

# Create the DataLoaders; only the training loader shuffles.
train_loader = DataLoader(train_dataset, shuffle=True, batch_size=BATCH_SIZE)
validate_loader = DataLoader(validate_dataset, batch_size=BATCH_SIZE)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE)


## Defining and training the model

## Unbalanced case

In [None]:
class BertForClassification(nn.Module):
    """BERT encoder followed by a linear layer producing class logits.

    Args:
        bert_model: Encoder whose forward pass returns an object exposing
            `pooler_output`.
        num_classes: Number of output classes.
    """

    def __init__(self, bert_model, num_classes):
        super().__init__()
        self.bert = bert_model
        # Read the feature width from the encoder's config instead of
        # hard-coding it; 768 (BERT-base) remains the fallback when the
        # encoder exposes no `config.hidden_size`.
        hidden_size = getattr(getattr(bert_model, 'config', None), 'hidden_size', 768)
        self.classifier = nn.Linear(hidden_size, num_classes)

    def forward(self, input_ids, attention_mask):
        """Return unnormalised logits of shape (batch, num_classes)."""
        outputs = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        pooled_output = outputs.pooler_output
        return self.classifier(pooled_output)

# Build the classifier on the BERTimbau checkpoint the tokenizer was loaded
# from. The English 'bert-base-uncased' checkpoint used previously has a
# different vocabulary, so the token ids produced by the Portuguese tokenizer
# would look up unrelated embeddings.
bert_model = BertModel.from_pretrained('neuralmind/bert-base-portuguese-cased')
model = BertForClassification(bert_model, num_classes=3)  # three density classes

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# Loss function and optimizer for classification (using CrossEntropyLoss for multi-class)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-5)

In [None]:
def train_model(model, criterion, optimizer, train_loader, validate_loader, num_classes, epochs=3):
    """Fine-tune a classifier and print train/validation loss per epoch.

    Uses the module-level `device` and `tqdm`.

    Args:
        model: Module called as `model(input_ids, attention_mask)` and
            returning logits.
        criterion: Loss taking (logits, labels).
        optimizer: Optimizer over the model's parameters.
        train_loader: Iterable of (input_ids, attention_mask, labels) batches.
        validate_loader: Same structure, used for validation only.
        num_classes: Width of the logits; used to reshape the model output.
        epochs: Number of passes over `train_loader`.
    """
    for epoch in range(epochs):
        model.train()
        running_loss = 0.0

        # Train loop with tqdm. The description is set up front and the
        # postfix is refreshed for every batch; previously both calls sat
        # after the loop, so the bar never showed a running loss.
        train_loop = tqdm(train_loader, leave=True)
        train_loop.set_description(f'Epoch {epoch+1}')
        for input_ids, attention_mask, labels in train_loop:
            input_ids = input_ids.to(device)
            attention_mask = attention_mask.to(device)
            labels = labels.to(device)

            optimizer.zero_grad()
            # CrossEntropyLoss expects logits shaped (n, num_classes)
            outputs = model(input_ids, attention_mask).view(-1, num_classes)

            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()

            train_loop.set_postfix(loss=loss.item())

        model.eval()
        val_loss = 0.0
        print('\n Validating...')

        # Validation loop with tqdm
        validate_loop = tqdm(validate_loader, leave=True)
        with torch.no_grad():
            for input_ids, attention_mask, labels in validate_loop:
                input_ids = input_ids.to(device)
                attention_mask = attention_mask.to(device)
                labels = labels.to(device)

                outputs = model(input_ids, attention_mask).view(-1, num_classes)

                loss = criterion(outputs, labels)
                val_loss += loss.item()

                validate_loop.set_postfix(val_loss=loss.item())

        # max(..., 1) avoids a ZeroDivisionError on an empty loader.
        n_train = max(len(train_loader), 1)
        n_val = max(len(validate_loader), 1)
        print(f"Epoch {epoch+1}, Training Loss: {running_loss/n_train}, Validation Loss: {val_loss/n_val}")

In [None]:
train_model(model, criterion, optimizer, train_loader, validate_loader, epochs=EPOCHS, num_classes=3)

In [None]:
# Saving the entire model
torch.save(model, 'classification_model_unbalanced.pth')

In [None]:
print(model)

In [None]:
def evaluate_model(model, test_loader, device):
    """Collect predicted and true class ids over `test_loader`.

    Args:
        model: Module called as `model(input_ids, attention_mask)` and
            returning logits.
        test_loader: Iterable of (input_ids, attention_mask, labels) batches.
        device: Device the inputs are moved to before the forward pass.

    Returns:
        (predictions, labels) as NumPy arrays in loader order.
    """
    predicted, expected = [], []

    model.eval()
    with torch.no_grad():
        for batch_ids, batch_mask, batch_labels in test_loader:
            logits = model(batch_ids.to(device), batch_mask.to(device))
            # Index of the largest logit in each row is the predicted class
            batch_preds = torch.max(logits, dim=1).indices

            predicted.extend(batch_preds.cpu().numpy())
            expected.extend(batch_labels.cpu().numpy())

    return np.array(predicted), np.array(expected)

# Get predictions and true labels
preds, labels = evaluate_model(model, test_loader, device)

# Total accuracy
total_accuracy = accuracy_score(labels, preds)

# Confusion matrix: rows are true classes, columns are predicted classes
conf_matrix = confusion_matrix(labels, preds)
true_pos = np.diag(conf_matrix)
actual_per_class = conf_matrix.sum(axis=1)      # TP + FN
predicted_per_class = conf_matrix.sum(axis=0)   # TP + FP
false_pos = predicted_per_class - true_pos
true_neg = conf_matrix.sum() - actual_per_class - false_pos

per_class_accuracy = true_pos / actual_per_class
sensitivity = true_pos / actual_per_class
# TP / (TP + FP) is precision. It was previously reported as specificity.
precision = true_pos / predicted_per_class
# Specificity is the true-negative rate: TN / (TN + FP).
specificity = true_neg / (true_neg + false_pos)

# Output the metrics
print("Total Accuracy:", total_accuracy)
print("Per Class Accuracy:", per_class_accuracy)
print("Sensitivity (Recall) per Class:", sensitivity)
print("Precision per Class:", precision)
print("Specificity per Class:", specificity)


## Balanced case

In [None]:
# TODO: make the dataset balanced
# Rebuild the review frame from the raw data; no subsampling is applied here.
df = preprocessing(df_raw)
df.head()

In [None]:
len(df)

In [None]:
# `preprocessing` names the review column 'text'; the previous 'texto' lookup
# raised a KeyError. The density column is named 'density' to match the
# regression section.
df['density'] = df['text'].apply(calculate_vowel_density)
df['labels'] = df['density'].apply(classify_vowel_density)

In [None]:
# Count the reviews per class and find the rarest one. Starting the search
# from a hard-coded label 0 gave n_samples == 0 whenever class 0 was absent.
label_counts = df['labels'].value_counts()
min_label = label_counts.idxmin()
n_samples = int(label_counts.min())

# resample of the classes: downsample every class to the size of the rarest
# one, with a fixed seed so the balanced set is the same on every run
samples = [
    df[df['labels'] == label].sample(n=n_samples, random_state=42)
    for label in sorted(label_counts.index)
]

df_balanced = pd.concat(samples)
df_balanced['labels'].hist()

In [None]:
# Split the balanced frame the same way as before: 20 % test, then a quarter
# of the remainder (20 % of the total) for validation.
train_df, test_df = train_test_split(df_balanced, test_size=0.2, random_state=42)
train_df, val_df = train_test_split(train_df, test_size=0.25, random_state=42)

print(f"{len(train_df)} train examples")
print(f"{len(val_df)} validation examples")
print(f"{len(test_df)} test examples")

In [None]:
# Reload the BERTimbau tokenizer and encoder for the balanced experiment.
# NOTE(review): `model` is reassigned when the classifier is built later in
# this section, so this encoder load looks redundant - confirm before removing.
tokenizer = BertTokenizer.from_pretrained('neuralmind/bert-base-portuguese-cased')
model = BertModel.from_pretrained('neuralmind/bert-base-portuguese-cased')

class TextDataset(Dataset):
    """Pairs each review text with its integer class label.

    Texts and labels are materialised as plain lists so that `__getitem__`
    indexes by position. Indexing a pandas Series with `[idx]` is label-based
    and breaks whenever the Series does not carry a fresh 0..n-1 index.

    Relies on the module-level `tokenizer`.
    """

    def __init__(self, text, labels):
        self.text = list(text)
        self.labels = list(labels)

    def __len__(self):
        return len(self.text)

    def __getitem__(self, idx):
        """Return (input_ids, attention_mask, label) for the idx-th example."""
        inputs = tokenizer(
            self.text[idx],
            return_tensors='pt',
            max_length=512,
            truncation=True,
            padding='max_length',
        )
        input_ids = inputs['input_ids'].squeeze(0)  # Remove batch dimension
        attention_mask = inputs['attention_mask'].squeeze(0)  # Remove batch dimension
        label = torch.tensor(self.labels[idx], dtype=torch.long)
        return input_ids, attention_mask, label

# Prepare the split frames: re-index each one from 0 so that row labels
# coincide with row positions.
train_df = train_df.reset_index(drop=True)
val_df = val_df.reset_index(drop=True)
test_df = test_df.reset_index(drop=True)

# Pair each split's texts with its class ids (the 'labels' column).
# The class is called TextDataset; the previous `TextoDataset` name is not
# defined anywhere and raised a NameError.
train_dataset = TextDataset(train_df['text'], train_df['labels'])
validate_dataset = TextDataset(val_df['text'], val_df['labels'])
test_dataset = TextDataset(test_df['text'], test_df['labels'])

# Create the DataLoaders; only the training loader shuffles.
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
validate_loader = DataLoader(validate_dataset, batch_size=BATCH_SIZE)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE)

In [None]:
class BertForClassification(nn.Module):
    """BERT encoder followed by a linear layer producing class logits.

    Args:
        bert_model: Encoder whose forward pass returns an object exposing
            `pooler_output`.
        num_classes: Number of output classes.
    """

    def __init__(self, bert_model, num_classes):
        super().__init__()
        self.bert = bert_model
        # Read the feature width from the encoder's config instead of
        # hard-coding it; 768 (BERT-base) remains the fallback when the
        # encoder exposes no `config.hidden_size`.
        hidden_size = getattr(getattr(bert_model, 'config', None), 'hidden_size', 768)
        self.classifier = nn.Linear(hidden_size, num_classes)

    def forward(self, input_ids, attention_mask):
        """Return unnormalised logits of shape (batch, num_classes)."""
        outputs = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        pooled_output = outputs.pooler_output
        return self.classifier(pooled_output)

# Build the classifier on the BERTimbau checkpoint the tokenizer was loaded
# from. The English 'bert-base-uncased' checkpoint used previously has a
# different vocabulary, so the token ids produced by the Portuguese tokenizer
# would look up unrelated embeddings.
bert_model = BertModel.from_pretrained('neuralmind/bert-base-portuguese-cased')
model = BertForClassification(bert_model, num_classes=3)  # three density classes

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# Loss function and optimizer for classification (using CrossEntropyLoss for multi-class)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-5)

In [None]:
def train_model(model, criterion, optimizer, train_loader, validate_loader, num_classes, epochs=3):
    """Fine-tune a classifier and print train/validation loss per epoch.

    Uses the module-level `device` and `tqdm`.

    Args:
        model: Module called as `model(input_ids, attention_mask)` and
            returning logits.
        criterion: Loss taking (logits, labels).
        optimizer: Optimizer over the model's parameters.
        train_loader: Iterable of (input_ids, attention_mask, labels) batches.
        validate_loader: Same structure, used for validation only.
        num_classes: Width of the logits; used to reshape the model output.
        epochs: Number of passes over `train_loader`.
    """
    for epoch in range(epochs):
        model.train()
        running_loss = 0.0

        # Train loop with tqdm. The description is set up front and the
        # postfix is refreshed for every batch; previously both calls sat
        # after the loop, so the bar never showed a running loss.
        train_loop = tqdm(train_loader, leave=True)
        train_loop.set_description(f'Epoch {epoch+1}')
        for input_ids, attention_mask, labels in train_loop:
            input_ids = input_ids.to(device)
            attention_mask = attention_mask.to(device)
            labels = labels.to(device)

            optimizer.zero_grad()
            # CrossEntropyLoss expects logits shaped (n, num_classes)
            outputs = model(input_ids, attention_mask).view(-1, num_classes)

            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()

            train_loop.set_postfix(loss=loss.item())

        model.eval()
        val_loss = 0.0
        print('\n Validating...')

        # Validation loop with tqdm
        validate_loop = tqdm(validate_loader, leave=True)
        with torch.no_grad():
            for input_ids, attention_mask, labels in validate_loop:
                input_ids = input_ids.to(device)
                attention_mask = attention_mask.to(device)
                labels = labels.to(device)

                outputs = model(input_ids, attention_mask).view(-1, num_classes)

                loss = criterion(outputs, labels)
                val_loss += loss.item()

                validate_loop.set_postfix(val_loss=loss.item())

        # max(..., 1) avoids a ZeroDivisionError on an empty loader.
        n_train = max(len(train_loader), 1)
        n_val = max(len(validate_loader), 1)
        print(f"Epoch {epoch+1}, Training Loss: {running_loss/n_train}, Validation Loss: {val_loss/n_val}")

In [None]:
# The two keyword values were swapped (epochs=3, num_classes=EPOCHS), which
# only worked because EPOCHS happens to equal the number of classes.
train_model(model, criterion, optimizer, train_loader, validate_loader, epochs=EPOCHS, num_classes=3)

In [None]:
# Saving the entire model
torch.save(model, 'classification_model_balanced.pth')

In [None]:
def evaluate_model(model, test_loader, device):
    """Collect predicted and true class ids over `test_loader`.

    Args:
        model: Module called as `model(input_ids, attention_mask)` and
            returning logits.
        test_loader: Iterable of (input_ids, attention_mask, labels) batches.
        device: Device the inputs are moved to before the forward pass.

    Returns:
        (predictions, labels) as NumPy arrays in loader order.
    """
    predicted, expected = [], []

    model.eval()
    with torch.no_grad():
        for batch_ids, batch_mask, batch_labels in test_loader:
            logits = model(batch_ids.to(device), batch_mask.to(device))
            # Index of the largest logit in each row is the predicted class
            batch_preds = torch.max(logits, dim=1).indices

            predicted.extend(batch_preds.cpu().numpy())
            expected.extend(batch_labels.cpu().numpy())

    return np.array(predicted), np.array(expected)

# Get predictions and true labels
preds, labels = evaluate_model(model, test_loader, device)

# Total accuracy
total_accuracy = accuracy_score(labels, preds)

# Confusion matrix: rows are true classes, columns are predicted classes
conf_matrix = confusion_matrix(labels, preds)
true_pos = np.diag(conf_matrix)
actual_per_class = conf_matrix.sum(axis=1)      # TP + FN
predicted_per_class = conf_matrix.sum(axis=0)   # TP + FP
false_pos = predicted_per_class - true_pos
true_neg = conf_matrix.sum() - actual_per_class - false_pos

per_class_accuracy = true_pos / actual_per_class
sensitivity = true_pos / actual_per_class
# TP / (TP + FP) is precision. It was previously reported as specificity.
precision = true_pos / predicted_per_class
# Specificity is the true-negative rate: TN / (TN + FP).
specificity = true_neg / (true_neg + false_pos)

# Output the metrics
print("Total Accuracy:", total_accuracy)
print("Per Class Accuracy:", per_class_accuracy)
print("Sensitivity (Recall) per Class:", sensitivity)
print("Precision per Class:", precision)
print("Specificity per Class:", specificity)