In [None]:
import torch
import torch.nn as nn
from torch.nn import functional as F
import tiktoken
import matplotlib.pyplot as plt
import time
import os
import glob

In [None]:
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print(f"Rodando no dispositivo: {device}")

In [None]:
# HIPERPARÂMETROS v6 

# O Contexto (Memória de curto prazo)
# Mantemos 256 para economizar VRAM e investir em "Inteligência" (n_embd)
block_size = 256      

# O Tamanho do Lote
# Reduzimos de 32 para 16 porque o modelo ficou "gordo" e profundo.
# Se deixar 32 com a config abaixo, vai dar "Out of Memory".
batch_size = 16       

# Duração do Treino
# Aumentamos para dar tempo de ler o BrWaC
max_iters = 10000     

# Taxa de Aprendizado
learning_rate = 3e-4  

# Frequência de Avaliação
eval_iters = 200      

# A Largura (Inteligência/Vocabulário)
# Aumentamos de 256 para 384. 
n_embd = 384          

# A Profundidade (Raciocínio)
n_layer = 8           

# Cabeças de Atenção
# 384 dividido por 6 = 64
n_head = 6            

# Dropout
# Reduzido para 0.1 pois agora temos muitos dados (BrWaC), 
dropout = 0.1

In [None]:
# Pega TODOS os arquivos .txt da pasta data/
files = glob.glob('data/*.txt')
print(f"Encontrados {len(files)} arquivos para treinamento.")

# Palavras para banir
blacklist = [
    "Página", "Page", 
    "Colleen Hoover", "Machado de Assis", 
    "Sumário", "Capítulo", "Copyright", 
    "Todos os direitos reservados"
]

all_text_content = "" # Vamos acumular tudo numa string gigante

for file_name in files:
    try:
        with open(file_name, 'r', encoding='utf-8') as f:
            raw_text = f.read()
            
            # FASE DE LIMPEZA
            clean_lines = []
            lines = raw_text.split('\n')
            
            for line in lines:
                line = line.strip()
                
                # Filtros de Lixo (Números de página, linhas vazias)
                if not line: continue
                if line.isdigit(): continue # Remove "12", "45"
                
                # Filtro de Blacklist (Cabeçalhos repetidos)
                if any(bad_word in line for bad_word in blacklist):
                    continue
                
                # Filtro de tamanho (Evita linhas com 1 letra que não sejam pontuação)
                if len(line) < 2 and line not in ['.', '?', '!', '—']:
                    continue

                clean_lines.append(line)
            
            # Juntar as linhas limpas deste livro
            book_text = "\n".join(clean_lines)
            
            # Adicionar ao texto total com o marcador especial
            # O marcador <|endoftext|> avisa o modelo que a história acabou
            all_text_content += book_text + " <|endoftext|> \n"
            
            print(f" - Processado: {file_name} ({len(clean_lines)} linhas úteis)")

    except Exception as e:
        print(f"Erro ao ler {file_name}: {e}")

text = all_text_content
print(f"Dataset FINAL criado com {len(text)/1e6:.2f} MB de texto.")

In [None]:
# TOKENIZAÇÃO TIKTOKEN
print("Tokenizando com GPT-2 BPE")
enc = tiktoken.get_encoding("gpt2")
encoded_data = enc.encode(text, allowed_special={"<|endoftext|>"})
vocab_size = enc.n_vocab # Isso será 50257 (Padrão GPT-2)
print(f"Tamanho do Vocabulário: {vocab_size} tokens únicos")
print(f"Total de Tokens para treino: {len(encoded_data)}")

In [None]:
# Em vez de pegar os primeiros 90% (que é só Machado) e testar nos 10% (que é só SAC),
# vamos criar pedaços aleatórios.
data = torch.tensor(encoded_data, dtype=torch.long)

# Dividir em blocos aleatórios para garantir mistura de domínios
# Mas como estamos num modelo simples de sequencia, o jeito mais fácil e seguro
# para manter a coerência do texto é garantir que o split seja feito DEPOIS de misturar 
# se tivéssemos amostras independentes.
# Como é um texto contínuo, vamos fazer o seguinte:

n = int(0.9 * len(data))
train_data = data[:n]
val_data = data[n:]

In [None]:
# Infraestrutura do Modelo Transformer
def get_batch(split):
    data = train_data if split == 'train' else val_data
    ix = torch.randint(len(data) - block_size, (batch_size,))
    x = torch.stack([data[i:i+block_size] for i in ix])
    y = torch.stack([data[i+1:i+block_size+1] for i in ix])
    return x.to(device), y.to(device)

@torch.no_grad()
def estimate_metrics():
    out = {}
    model.eval()
    for split in ['train', 'val']:
        losses = torch.zeros(eval_iters)
        accuracies = torch.zeros(eval_iters)

        for k in range(eval_iters):
            X, Y = get_batch(split)
            logits, loss = model(X, Y)
            losses[k] = loss.item()

            probs = F.softmax(logits, dim=-1)
            pred = torch.argmax(probs, dim=-1)
            acc = (pred == Y.view(-1)).float().mean()
            accuracies[k] = acc.item()

        out[split] = {
            'loss': losses.mean(),
            'acc': accuracies.mean()
        }
    model.train()
    return out

class Head(nn.Module):
    def __init__(self, head_size):
        super().__init__()
        self.key = nn.Linear(n_embd, head_size, bias=False)
        self.query = nn.Linear(n_embd, head_size, bias=False)
        self.value = nn.Linear(n_embd, head_size, bias=False)
        self.register_buffer('tril', torch.tril(torch.ones(block_size, block_size)))
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        B, T, C = x.shape
        k = self.key(x)   
        q = self.query(x) 
        wei = q @ k.transpose(-2, -1) * C**-0.5 
        wei = wei.masked_fill(self.tril[:T, :T] == 0, float('-inf')) 
        wei = F.softmax(wei, dim=-1)
        wei = self.dropout(wei)
        v = self.value(x)
        out = wei @ v 
        return out

class MultiHeadAttention(nn.Module):
    def __init__(self, num_heads, head_size):
        super().__init__()
        self.heads = nn.ModuleList([Head(head_size) for _ in range(num_heads)])
        self.proj = nn.Linear(n_embd, n_embd)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        out = torch.cat([h(x) for h in self.heads], dim=-1)
        out = self.dropout(self.proj(out))
        return out

class FeedForward(nn.Module):
    def __init__(self, n_embd):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(n_embd, 4 * n_embd),
            nn.ReLU(),
            nn.Linear(4 * n_embd, n_embd),
            nn.Dropout(dropout),
        )

    def forward(self, x):
        return self.net(x)

class Block(nn.Module):
    def __init__(self, n_embd, n_head):
        super().__init__()
        head_size = n_embd // n_head
        self.sa = MultiHeadAttention(n_head, head_size)
        self.ffwd = FeedForward(n_embd)
        self.ln1 = nn.LayerNorm(n_embd)
        self.ln2 = nn.LayerNorm(n_embd)
        
    def forward(self, x):
        x = x + self.sa(self.ln1(x))
        x = x + self.ffwd(self.ln2(x))
        return x
        
class GPTLanguageModel(nn.Module):
    def __init__(self):
        super().__init__()
        # Anteriormente no gpt-v3 o vocab_size era 95 vocab_size agora é 50257
        self.token_embedding_table = nn.Embedding(vocab_size, n_embd)
        self.position_embedding_table = nn.Embedding(block_size, n_embd)
        self.blocks = nn.Sequential(*[Block(n_embd, n_head=n_head) for _ in range(n_layer)])
        self.ln_f = nn.LayerNorm(n_embd)
        self.lm_head = nn.Linear(n_embd, vocab_size)
        self.apply(self._init_weights)

    def _init_weights(self, module):
        if isinstance(module, nn.Linear):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
            if module.bias is not None:
                torch.nn.init.zeros_(module.bias)
        elif isinstance(module, nn.Embedding):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)

    def forward(self, index, targets=None):
        B, T = index.shape
        tok_emb = self.token_embedding_table(index) 
        pos_emb = self.position_embedding_table(torch.arange(T, device=device)) 
        x = tok_emb + pos_emb
        x = self.blocks(x) 
        x = self.ln_f(x) 
        logits = self.lm_head(x) 

        if targets is None:
            loss = None
        else:
            B, T, C = logits.shape
            logits = logits.view(B*T, C)
            targets = targets.view(B*T)
            loss = F.cross_entropy(logits, targets)
        return logits, loss

    def generate(self, idx, max_new_tokens, temperature=1.0, top_k=None):
        """
        temperature: 1.0 = normal. >1.0 = mais criativo/doido. <1.0 = mais focado/conservador.
        top_k: Se definido (ex: 50), só escolhe entre as 50 melhores palavras. Evita erros bizarros.
        """
        for _ in range(max_new_tokens):
            # Corta o contexto se passar do block_size
            idx_cond = idx if idx.size(1) <= block_size else idx[:, -block_size:]
            
            # Pega as predições
            logits, _ = self(idx_cond)
            
            # Foca apenas no último passo de tempo e aplica temperatura
            logits = logits[:, -1, :] / temperature
            
            # Opcional: Corta as opções ruins (Top-K Sampling)
            if top_k is not None:
                v, _ = torch.topk(logits, min(top_k, logits.size(-1)))
                logits[logits < v[:, [-1]]] = -float('Inf')
                
            # Calcula probabilidades
            probs = F.softmax(logits, dim=-1)
            
            # Sorteia o próximo token
            idx_next = torch.multinomial(probs, num_samples=1)
            idx = torch.cat((idx, idx_next), dim=1)
        return idx

In [None]:
# Execução e Checkpointing
def save_checkpoint(step, model, optimizer, loss_train, loss_val, acc_train, acc_val, filename="checkpoint_gpt_v6.pth"):
    checkpoint = {
        'step': step,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'loss_train': loss_train,
        'loss_val': loss_val,
        'acc_train': acc_train,
        'acc_val': acc_val
    }
    torch.save(checkpoint, filename)
    print(f"Checkpoint salvo em step {step}")

def load_checkpoint(model, optimizer, filename="checkpoint_gpt_v6.pth"):
    if os.path.exists(filename):
        print(f"Carregando checkpoint '{filename}'...")
        checkpoint = torch.load(filename, map_location=device)

        model.load_state_dict(checkpoint['model_state_dict'])
        optimizer.load_state_dict(checkpoint['optimizer_state_dict'])

        step = checkpoint['step']
        loss_train = checkpoint.get('loss_train', [])
        loss_val = checkpoint.get('loss_val', [])

        acc_train = checkpoint.get('acc_train', []) 
        acc_val = checkpoint.get('acc_val', [])

        print(f"Retomando do step {step}")
        return step, loss_train, loss_val, acc_train, acc_val
    return 0, [], [], [], []

model = GPTLanguageModel()
m = model.to(device)
print(f"Modelo PT-BR criado com {sum(p.numel() for p in m.parameters())/1e6:.2f} M parâmetros")

optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)

# Decaimento cosseno
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=max_iters, eta_min=1e-5)

# Tenta carregar checkpoint
start_iter, loss_history_train, loss_history_val, acc_history_train, acc_history_val = load_checkpoint(model, optimizer)

print("Iniciando treinamento em Português...")
start_time = time.time()

for iter in range(start_iter, max_iters):
    
    if iter % eval_iters == 0:
        metrics = estimate_metrics()

        t_loss, v_loss = metrics['train']['loss'], metrics['val']['loss']
        t_acc, v_acc = metrics['train']['acc'], metrics['val']['acc']

        print(f"step {iter}: loss {t_loss:.3f}/{v_loss:.3f} | acc {t_acc:.3f}/{v_acc:.3f}")

        # Atualiza históricos
        loss_history_train.append(t_loss)
        loss_history_val.append(v_loss)
        acc_history_train.append(t_acc)
        acc_history_val.append(v_acc)
        
        # Salvar checkpoint
        save_checkpoint(iter, model, optimizer, 
                        loss_history_train, loss_history_val, 
                        acc_history_train, acc_history_val)

    # Backpropagation padrão
    xb, yb = get_batch('train')
    logits, loss = model(xb, yb)
    optimizer.zero_grad(set_to_none=True)
    loss.backward()
    torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
    optimizer.step()
    scheduler.step()

end_time = time.time()
print(f"Treinamento finalizado em {(end_time - start_time)/60:.2f} minutos.")

In [None]:
# Plotagem do Gráfico
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(16, 6))

# Gráfico 1: Loss
ax1.plot(loss_history_train, label='Treino')
ax1.plot(loss_history_val, label='Validação')
ax1.set_title('Evolução da Loss (Entropia Cruzada)')
ax1.set_xlabel(f'Iterações (x {eval_iters})')
ax1.set_ylabel('Loss')
ax1.legend()
ax1.grid(True)

# Gráfico 2: Accuracy
ax2.plot(acc_history_train, label='Treino')
ax2.plot(acc_history_val, label='Validação')
ax2.set_title('Evolução da Accuracy (Precisão de Tokens)')
ax2.set_xlabel(f'Iterações (x {eval_iters})')
ax2.set_ylabel('Accuracy (0.0 a 1.0)')
ax2.legend()
ax2.grid(True)

plt.tight_layout()
plt.savefig('grafico_completo_metrics.png')
print("Gráfico salvo: grafico_completo_metrics.png")

In [None]:
# Geração de Texto
print("\n" + "="*30)
print("GERANDO TEXTO EM PORTUGUÊS")
print("="*30 + "\n")

model.eval()

In [None]:
# Gerar texto a partir do vazio
context = torch.zeros((1, 1), dtype=torch.long, device=device)
generated_ids = model.generate(context, max_new_tokens=200, temperature=0.8, top_k=50)

# Decode usando tiktoken
print(enc.decode(generated_ids[0].tolist()))

# Salvando
torch.save(model.state_dict(), 'gpt_ptbr_v6.pth')
print("Modelo salvo como gpt_ptbr_v6.pth")