# Imports

In [None]:
import torch
import numpy as np
import pandas as pd
import torch.nn as nn

from pprint import pprint
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.metrics import recall_score, precision_score
from imblearn.metrics import specificity_score


# Configuração e hiperparâmetros

In [None]:
# Hyperparameters shared by the tokenizer, dataset, model and trainer below.
# The "alt:" values are the larger settings the author left as notes
# (presumably the BERT-base configuration — confirm before relying on it).
VOCAB_SIZE = 8000  # vocabulary size requested from the tokenizer (4 slots go to special tokens)
D_MODEL = 384     # embedding / hidden width of the model (alt: 768)
N_HEADS = 6       # attention heads per encoder block; must divide D_MODEL (alt: 12)
N_LAYERS = 2      # number of stacked encoder blocks (alt: 12)
DROPOUT = 0.1  # dropout probability used throughout the model
MAX_EPOCHS = 20  # number of training epochs run by Trainer.fit
BATCH_SIZE = 32  # batch size of the train/validation DataLoaders
BLOCK_SIZE = 256  # fixed sequence length every text is padded/truncated to (alt: 512)
LEARNING_RATE = 1e-4  # learning rate handed to the Adam optimizer
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'  # prefer the GPU when one is visible

# 1. Classes Utilitárias

## 1.1 Tokenizador

In [None]:
import re
from collections import Counter

class MostFrequentWordsTokenizer:
    """Word-level tokenizer backed by the most frequent words of a corpus.

    Ids 0-3 are reserved for the special tokens ``[PAD]``, ``[UNK]``,
    ``[CLS]`` and ``[SEP]``; the remaining ``vocab_size - 4`` slots are given
    to the most common cleaned words seen by :meth:`build_vocab`.
    """

    def __init__(self, vocab_size):
        self.vocab_size = vocab_size
        self.token_to_idx = {}
        self.idx_to_token = {}

    def clean_sentence(self, sentence):
        """Lower-case ``sentence`` and drop everything but a-z, 0-9 and whitespace."""
        lowered = sentence.lower()
        return re.sub(r'[^a-z0-9\s]', '', lowered)

    def build_vocab(self, sentences):
        """Count the cleaned words of ``sentences`` and (re)build both lookup tables."""
        counts = Counter()
        for sentence in sentences:
            counts.update(self.clean_sentence(str(sentence)).split())

        # Special tokens always occupy the first four ids.
        vocab = {'[PAD]': 0, '[UNK]': 1, '[CLS]': 2, '[SEP]': 3}
        frequent = counts.most_common(self.vocab_size - 4)
        for offset, (word, _) in enumerate(frequent):
            vocab[word] = offset + 4

        self.token_to_idx = vocab
        self.idx_to_token = {idx: token for token, idx in vocab.items()}

    def tokenize(self, sentence):
        """Return the cleaned, whitespace-split words of ``sentence``."""
        return self.clean_sentence(str(sentence)).split()

    def convert_tokens_to_ids(self, tokens):
        """Map tokens to ids, using the ``[UNK]`` id for out-of-vocabulary tokens."""
        lookup = self.token_to_idx
        return [lookup.get(token, lookup['[UNK]']) for token in tokens]

    def encode(self, sentence):
        """Return the ids of ``sentence`` wrapped in ``[CLS]`` ... ``[SEP]``."""
        wrapped = ['[CLS]', *self.tokenize(sentence), '[SEP]']
        return self.convert_tokens_to_ids(wrapped)

    def decode(self, ids):
        """Turn ids back into a space-joined string; unknown ids become ``[UNK]``."""
        return ' '.join(self.idx_to_token.get(idx, '[UNK]') for idx in ids)

    def get_vocab_size(self):
        """Return the number of entries currently in the vocabulary."""
        return len(self.token_to_idx)


## 1.2 Camadas do Transformer

### 1.2.1 Attention

In [None]:
class SingleHeadAttention(nn.Module):
    """A single scaled dot-product self-attention head.

    Projects the input to queries, keys and values of width ``head_dim``,
    scores queries against keys (divided by ``sqrt(head_dim)``), optionally
    masks positions where ``mask == 0``, and returns the attention-weighted
    sum of the values.
    """

    def __init__(self, d_model, head_dim, dropout):
        super().__init__()
        self.head_dim = head_dim
        # Independent linear projections for queries, keys and values.
        self.fc_q = nn.Linear(d_model, head_dim)
        self.fc_k = nn.Linear(d_model, head_dim)
        self.fc_v = nn.Linear(d_model, head_dim)
        self.dropout = nn.Dropout(dropout)
        # Stored as a buffer so it follows the module across devices.
        scale = torch.tensor([head_dim], dtype=torch.float32).sqrt()
        self.register_buffer('scale', scale)

    def forward(self, x, mask=None):
        """Apply self-attention to ``x``; ``mask`` entries equal to 0 are ignored."""
        queries = self.fc_q(x)
        keys = self.fc_k(x)
        values = self.fc_v(x)

        scores = queries @ keys.transpose(-2, -1) / self.scale
        if mask is not None:
            # A large negative score becomes ~0 probability after softmax.
            scores = scores.masked_fill(mask == 0, -1e10)

        weights = self.dropout(scores.softmax(dim=-1))
        return weights @ values

### 1.2.2 Multi Head Attention

In [None]:
class MultiHeadAttention(nn.Module):
    """Runs ``n_heads`` independent attention heads and mixes their outputs.

    Each head works on ``d_model // n_heads`` features; the head outputs are
    concatenated along the last dimension and passed through a final linear
    layer. ``self.dropout`` is created here but not applied in ``forward``.
    """

    def __init__(self, d_model, n_heads, dropout):
        super().__init__()
        assert d_model % n_heads == 0

        head_dim = d_model // n_heads
        heads = [SingleHeadAttention(d_model, head_dim, dropout) for _ in range(n_heads)]
        self.heads = nn.ModuleList(heads)

        self.fc_out = nn.Linear(d_model, d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask=None):
        """Attend over ``x`` with every head and project the concatenated result."""
        per_head = []
        for head in self.heads:
            per_head.append(head(x, mask))
        merged = torch.cat(per_head, dim=-1)
        return self.fc_out(merged)

### 1.2.3 Feed Forward

In [None]:
class FeedForward(nn.Module):
    """Position-wise MLP: Linear -> ReLU -> Dropout -> Linear.

    Expands the features from ``d_model`` to ``ff_dim`` and projects them back
    to ``d_model``.
    """

    def __init__(self, d_model, ff_dim, dropout):
        super().__init__()
        expand = nn.Linear(d_model, ff_dim)
        activation = nn.ReLU()
        regularize = nn.Dropout(dropout)
        project = nn.Linear(ff_dim, d_model)
        self.net = nn.Sequential(expand, activation, regularize, project)

    def forward(self, x):
        """Apply the MLP to the last dimension of ``x``."""
        return self.net(x)

# 2. Carregando o dataset

In [None]:
# Load the raw dataset from the working directory; the cells below read its
# 'transcription' and 'medical_specialty' columns.
df = pd.read_csv('transcription.csv')

## 2.2 Remove linhas sem transcrição ou especialidade

In [None]:
# Discard rows missing either the text or its target specialty.
df = df.dropna(subset=['transcription', 'medical_specialty'])

## 2.3 Seleciona especialidades

In [None]:
# Keep only the specialties used in this experiment.
# The original selection list mixed names with a leading space
# (' Orthopedic') and one without ('Neurology'), so an exact-match `isin`
# could only ever match one of the two spellings. Comparing on
# whitespace-stripped values keeps every listed specialty regardless of how
# the CSV pads the column. The stored column values are left untouched.
selected_specialties = [
    'Cardiovascular / Pulmonary',
    'Orthopedic',
    'Radiology',
    'General Medicine',
    'Gastroenterology',
    'Neurology',
]
df = df[
    df['medical_specialty'].str.strip().isin(selected_specialties)
].reset_index(drop=True)
df['medical_specialty'].value_counts()


In [None]:
# Preview the first 475 characters of one transcription (index label 50)
# together with its specialty.
pprint(df['transcription'][50][:475])
print(f'Especialidade: {df["medical_specialty"][50]}')

# 3. Pré-processamento

## 3.1 Transformando especialidade em números

In [None]:
# Alphabetically sorted specialty names define the class ids (0..N-1).
labels = sorted(df['medical_specialty'].unique())

# Forward (name -> id) and reverse (id -> name) lookup tables.
label_to_int = dict(zip(labels, range(len(labels))))
int_to_label = dict(enumerate(labels))

# Integer target column used for training; the last line shows class counts.
df['label'] = df['medical_specialty'].map(label_to_int)
df['label'].value_counts()

In [None]:
# Number of target classes, i.e. distinct specialties kept in `labels`.
NUM_CLASSES = len(labels)
print(f"Número de classes: {NUM_CLASSES}")

## 3.2 Dividindo o conjunto em treino e validação

In [None]:
# Hold out 20% of the rows for validation. `stratify` keeps the label
# proportions equal in both subsets and `random_state` pins the shuffle.
X_train, X_val, y_train, y_val = train_test_split(
    df['transcription'], df['label'], test_size=0.2, random_state=42, stratify=df['label']
)
print(f"Treino: {len(X_train)}")
print(f"Validação: {len(X_val)}")

## 3.3 Tokenizando o texto

In [None]:
# Build the vocabulary from the training texts only, so words that appear
# solely in the validation split map to [UNK].
tokenizer = MostFrequentWordsTokenizer(vocab_size=VOCAB_SIZE)
tokenizer.build_vocab(X_train)
print(f"\nTamanho do vocabulário: {tokenizer.get_vocab_size()}")

# Quick look at the cleaning/splitting result (displayed as the cell output).
tokenizer.tokenize("Joe broke his leg playing soccer.")

## 3.4 Criando um dataset para carregar o texto

In [None]:
class SkinLesionDataset(Dataset):
    """Dataset of tokenized texts padded/truncated to a fixed length.

    Despite its name, this dataset serves text: each item is a sentence
    encoded by ``tokenizer``, cut or padded with ``[PAD]`` to ``block_size``
    ids, plus an attention mask and the integer label.

    Args:
        sentences: pandas Series of texts (accessed positionally via ``.iloc``).
        labels: pandas Series of integer class ids aligned with ``sentences``.
        tokenizer: object exposing ``encode(sentence)`` and
            ``convert_tokens_to_ids(tokens)``.
        block_size: fixed number of token ids per item.
    """

    def __init__(self, sentences, labels, tokenizer, block_size):
        self.sentences = sentences
        self.labels = labels
        self.tokenizer = tokenizer
        self.block_size = block_size

    def __len__(self):
        return len(self.sentences)

    def __getitem__(self, idx):
        """Return ``{'text', 'mask', 'label'}`` tensors for the item at ``idx``.

        ``text`` has shape ``(block_size,)``; ``mask`` has shape
        ``(1, block_size)`` with 1 on real tokens and 0 on padding.
        """
        sentence = str(self.sentences.iloc[idx])
        label = self.labels.iloc[idx]

        # Encode and truncate texts longer than block_size.
        token_ids = self.tokenizer.encode(sentence)[:self.block_size]
        num_real_tokens = len(token_ids)

        # convert_tokens_to_ids returns a list; take the single [PAD] id so it
        # can be used as a scalar.
        pad_id = self.tokenizer.convert_tokens_to_ids(['[PAD]'])[0]
        padding_len = self.block_size - num_real_tokens
        token_ids = token_ids + [pad_id] * padding_len

        # 1 for real tokens, 0 for padding. The previous version compared
        # each id against the *list* of pad ids, which is never equal, so the
        # mask was all ones and padding was attended to.
        attention_mask = [1] * num_real_tokens + [0] * padding_len
        attention_mask = torch.tensor(attention_mask, dtype=torch.long).unsqueeze(0)

        return {
            'text': torch.tensor(token_ids, dtype=torch.long),
            'mask': attention_mask,
            'label': torch.tensor(label, dtype=torch.long)
        }

# Wrap both splits; every item is padded/truncated to BLOCK_SIZE token ids.
train_dataset = SkinLesionDataset(X_train, y_train, tokenizer, BLOCK_SIZE)
val_dataset = SkinLesionDataset(X_val, y_val, tokenizer, BLOCK_SIZE)

# Only the training loader is shuffled; validation keeps a fixed order.
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False)


In [None]:
# Longest training text in tokens (including [CLS]/[SEP]) *before*
# truncation and padding. Measuring the items of `train_dataset` instead
# would always report BLOCK_SIZE, because every item is padded/truncated to
# that length.
max_len = max((len(tokenizer.encode(sentence)) for sentence in X_train), default=0)
print(f'Maior sequência de treino: {max_len}')

# 4. Bidirectional Encoder Representations from Transformers (BERT)

## 4.1 Implementando Encoder do Transformer

In [None]:
class TransformerEncoderBlock(nn.Module):
    """Encoder block: self-attention then feed-forward, each with a residual.

    Both sublayers follow the same pattern: apply dropout to the sublayer
    output, add it to the sublayer input, then apply LayerNorm.
    """

    def __init__(self, d_model, n_heads, ff_dim, dropout):
        super().__init__()
        self.attention = MultiHeadAttention(d_model, n_heads, dropout)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.ff = FeedForward(d_model, ff_dim, dropout)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask=None):
        """Run the block on ``x``; ``mask`` is forwarded to the attention layer."""
        # Attention sublayer with residual connection.
        attended = self.dropout(self.attention(x, mask))
        hidden = self.norm1(x + attended)

        # Feed-forward sublayer with residual connection.
        transformed = self.dropout(self.ff(hidden))
        return self.norm2(hidden + transformed)

## 4.2 Implementando BERT

In [None]:
class BERT(nn.Module):
    """Small BERT-style encoder with a classification head on the first token.

    Token ids are embedded (scaled by ``sqrt(d_model)``), summed with learned
    position embeddings, layer-normalised and passed through ``n_layers``
    encoder blocks. The hidden state at position 0 (where the tokenizer puts
    ``[CLS]``) is projected to ``num_classes`` logits.

    Args:
        vocab_size: number of rows in the token embedding table.
        d_model: embedding / hidden width.
        n_layers: number of encoder blocks.
        n_heads: attention heads per block.
        num_classes: number of output logits.
        block_size: maximum sequence length (rows of the position table).
        dropout: dropout probability.
    """

    def __init__(self, vocab_size, d_model, n_layers, n_heads, num_classes, block_size, dropout):
        super().__init__()
        self.input_embedding = nn.Embedding(vocab_size, d_model)
        self.position_embedding = nn.Embedding(block_size, d_model)
        self.layers = nn.ModuleList([
            TransformerEncoderBlock(d_model, n_heads, d_model * 4, dropout)
            for _ in range(n_layers)
        ])
        self.dropout = nn.Dropout(dropout)
        self.embed_layer_norm = nn.LayerNorm(d_model)
        self.linear = nn.Linear(d_model, num_classes)
        self.register_buffer('scale', torch.sqrt(torch.FloatTensor([d_model])))
        # Position indices 0..block_size-1, shaped (1, block_size) to broadcast over the batch.
        self.register_buffer('pos', torch.arange(block_size).unsqueeze(0))

    def forward(self, src, src_mask):
        """Return class logits for a batch of token ids.

        Args:
            src: token ids of shape ``(batch, seq_len)`` with
                ``seq_len <= block_size``.
            src_mask: attention mask forwarded to every encoder block
                (0 marks positions to ignore).

        Returns:
            Tensor of shape ``(batch, num_classes)``.
        """
        seq_len = src.size(1)
        input_emb = self.input_embedding(src) * self.scale
        # Use only the first seq_len positions so inputs shorter than
        # block_size are accepted, not just inputs of exactly block_size.
        positional_emb = self.position_embedding(self.pos[:, :seq_len])
        x = self.dropout(self.embed_layer_norm(input_emb + positional_emb))
        for layer in self.layers:
            x = layer(x, src_mask)
        # The first position carries the sequence-level representation.
        cls_output = x[:, 0, :]
        return self.linear(cls_output)

# 5. Treinamento

In [None]:
class Trainer:
    """Training/evaluation loop for a classifier called as ``model(text, mask)``.

    Args:
        device: device the batches are moved to.
        save_name: path where the best checkpoint (lowest validation loss)
            is written.
        weights: per-class weights passed to ``nn.CrossEntropyLoss``.
    """

    def __init__(self, device, save_name, weights):
        self.device = device
        self.criterion = nn.CrossEntropyLoss(weight=weights)
        self.save_name = save_name

    def fit(self, model, learning_rate, max_epochs, train_loader, val_loader):
        """Train ``model`` for ``max_epochs`` and reload the best checkpoint.

        After each epoch the model is evaluated on ``val_loader``; whenever
        the validation loss improves, the weights are saved to ``save_name``.
        """
        # Set up the optimizer.
        optimizer = torch.optim.Adam(
            model.parameters(),
            lr=learning_rate,
            betas=(0.9, 0.98),
            eps=1e-9,
        )

        print("\nIniciando Treinamento...")

        self.min_loss = float('inf')
        for epoch in range(max_epochs):
            # One pass over the training data, then a validation pass.
            train_loss = self._train_epoch(model, train_loader, optimizer, self.criterion)
            eval_validation = self.evaluate(model, val_loader)

            # Keep the checkpoint with the lowest validation loss seen so far.
            improved = eval_validation['loss'] < self.min_loss
            if improved:
                self.min_loss = eval_validation['loss']
                torch.save(model.state_dict(), f'{self.save_name}')

            print(f"Epoch: {epoch+1:02} | Train Loss: {train_loss:.3f} | Val. Loss: {eval_validation['loss']:.3f} | Val. Recall: {eval_validation['recall']*100:.2f}% | Val. FPR: {eval_validation['fpr']*100:.2f}%")

        # Once training is over, restore the best saved weights.
        model.load_state_dict(torch.load(self.save_name))

    def _train_epoch(self, model, iterator, optimizer, criterion):
        """Run one optimisation pass over ``iterator``; return the mean batch loss."""
        model.train()
        running_loss = 0
        for batch in iterator:
            logits, target = self.get_logits_targets(model, batch)
            optimizer.zero_grad()
            loss = criterion(logits, target)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
        return running_loss / len(iterator)

    def evaluate(self, model, iterator):
        """Evaluate ``model`` on ``iterator`` without gradient tracking.

        Returns a dict with the mean batch ``loss``, macro-averaged
        ``recall`` and ``precision``, ``fpr`` (1 - macro specificity) and the
        raw ``all_preds`` / ``all_labels`` arrays.
        """
        model.eval()

        running_loss = 0
        predicted, expected = [], []

        with torch.no_grad():
            for batch in iterator:
                logits, target = self.get_logits_targets(model, batch)
                running_loss += self.criterion(logits, target).item()
                # The predicted class is the one with the highest logit.
                predicted.extend(torch.argmax(logits, dim=1).cpu().numpy())
                expected.extend(target.cpu().numpy())

        all_preds = np.array(predicted)
        all_labels = np.array(expected)

        recall = recall_score(all_labels, all_preds, average='macro', zero_division=0)
        precision = precision_score(all_labels, all_preds, average='macro', zero_division=0)
        fpr = 1 - specificity_score(all_labels, all_preds, average='macro')

        return {
            'fpr': fpr,
            'loss': running_loss / len(iterator),
            'recall': recall,
            'precision': precision,
            'all_preds': all_preds,
            'all_labels': all_labels,
        }

    def get_logits_targets(self, model, batch):
        """Move a batch to the device and return ``(model output, labels)``."""
        tokens = batch['text'].to(self.device)
        target = batch['label'].to(self.device)
        mask = batch['mask'].to(self.device)
        return model(tokens, mask), target

## 5.1 Definindo pesos para cada especialidade

In [None]:
def get_class_weights(y_train, int_to_label, device):
    """Compute inverse-frequency class weights, print them and return them.

    Args:
        y_train: array-like of non-negative integer class ids.
        int_to_label: mapping from class id to its display name (used only
            for the printed report).
        device: device the weight tensor is moved to.

    Returns:
        1-D float tensor where entry ``i`` is ``1 / frequency(class i)``.
        The previous version printed the weights but returned nothing, so
        callers received ``None``.
    """
    class_freq = torch.bincount(torch.tensor(y_train)) / len(y_train)
    weights = (1 / class_freq).to(device)
    print("Peso da classe:")
    for i, weight in enumerate(weights):
        print(f"'{int_to_label[i]}': {weight:.2f}")
    return weights

# Derive per-class weights from the training labels (the call also prints
# them); the result is what gets passed to Trainer as `weights`.
weights = get_class_weights(y_train.values, int_to_label, DEVICE)

## 5.2 Otimizando o modelo

In [None]:
# Instantiate the classifier with the hyperparameters defined at the top of
# the notebook. The vocabulary size comes from the fitted tokenizer, which
# may hold fewer entries than VOCAB_SIZE.
model = BERT(
    vocab_size=tokenizer.get_vocab_size(),
    d_model=D_MODEL,
    n_layers=N_LAYERS,
    n_heads=N_HEADS,
    num_classes=NUM_CLASSES,
    block_size=BLOCK_SIZE,
    dropout=DROPOUT
).to(DEVICE)

# Train; the checkpoint with the lowest validation loss is written to
# 'mini_bert_anamnese.pth' and reloaded into `model` when fit() finishes.
trainer = Trainer(device=DEVICE, save_name='mini_bert_anamnese.pth', weights=weights)
trainer.fit(model,
            LEARNING_RATE,
            MAX_EPOCHS,
            train_loader,
            val_loader,
        )

# 6. Avaliação

In [None]:
# Qualitative check on one random validation example.
# NOTE: this rebinds `val_loader` to a shuffled, batch-size-1 loader, and
# `eval_validation` here holds a single batch dict (not metrics).
val_loader = DataLoader(val_dataset, batch_size=1, shuffle=True)
eval_validation = next(iter(val_loader))
text, mask, label = eval_validation['text'].to(DEVICE), eval_validation['mask'].to(DEVICE), eval_validation['label'].to(DEVICE)

model.eval()
with torch.no_grad():
    # Show the decoded input (including padding tokens), then compare the
    # predicted specialty with the true one.
    pprint(tokenizer.decode(text[0, :].cpu().numpy().tolist()))
    outputs = model(text, mask)
    predictions = torch.argmax(outputs, dim=1)
    print(f'Predição: {[int_to_label[p.item()] for p in predictions]}')
    print(f'Verdadeiro: {[int_to_label[l.item()] for l in label]}')

# Exercícios

1. O número máximo de tokens na sequência (BLOCK_SIZE) utilizado é adequado? Muitas sentenças estão sendo truncadas?

2. Aumente o BLOCK_SIZE. Qual valor máximo suportado pela memória da GPU? E com o tokenizador baseado em palavras?

3. Adapte o código acima para outra tarefa de classificação na sua área de pesquisa/atuação.