# GPT-2 (Linkin Park's Version)

Um finetuning do GPT 2 com músicas da banda Linkin Park.

## Criando o ambiente

In [None]:
import os
import time
import datetime

import pandas as pd
import seaborn as sns
import numpy as np
import random

import matplotlib.pyplot as plt

import torch
from torch.utils.data import Dataset, DataLoader, random_split, RandomSampler, SequentialSampler
torch.manual_seed(13)

from transformers import GPT2LMHeadModel,  GPT2Tokenizer, GPT2Config, GPT2LMHeadModel
from transformers import AdamW, get_linear_schedule_with_warmup

import nltk
nltk.download('punkt')

## Carregando e preparando os dados

In [None]:
df = pd.read_csv('linkin_park_lyrics.csv')

In [None]:
df

In [None]:
df = pd.read_csv('linkin_park_lyrics.csv').drop(columns=['track_title'])
df = df.sample(frac=1).reset_index(drop=True)
df.head()

In [None]:
print(df.lyric.iloc[0])

In [None]:
print(df.lyric.iloc[13])

In [None]:
tokenizer = GPT2Tokenizer.from_pretrained('gpt2', bos_token='<|start|>', eos_token='<|end|>', pad_token='<|pad|>')

In [None]:
class GPT2Dataset(Dataset):
    def __init__(self, txt_list, tokenizer, gpt2_type="gpt2", max_length=768):
        # Inicializa o dataset com uma lista de texto, um tokenizer e outras configurações
        self.tokenizer = tokenizer
        self.input_ids = []  # Lista para armazenar os IDs de entrada
        self.attn_masks = []  # Lista para armazenar as máscaras de atenção

        # Itera sobre cada texto na lista fornecida
        for txt in txt_list:
            # Gera a codificação do texto usando o tokenizer
            encodings_dict = tokenizer('<|start|>' + txt + '<|end|>', truncation=True, max_length=max_length, padding="max_length")

            # Adiciona os IDs de entrada e as máscaras de atenção às listas correspondentes
            self.input_ids.append(torch.tensor(encodings_dict['input_ids']))
            self.attn_masks.append(torch.tensor(encodings_dict['attention_mask']))

    def __len__(self):
        # Retorna o tamanho do dataset, ou seja, o número de textos no dataset
        return len(self.input_ids)

    def __getitem__(self, idx):
        # Retorna uma amostra específica do dataset, representada pelos IDs de entrada e máscaras de atenção
        return self.input_ids[idx], self.attn_masks[idx]

In [None]:
# Cria um dataset usando a classe GPT2Dataset, passando uma lista de letras de música, um tokenizer e o comprimento máximo
dataset = GPT2Dataset(df.lyric, tokenizer, max_length=768)

# Define o tamanho do conjunto de treinamento como 90% do tamanho total do dataset e o tamanho do conjunto de validação como o restante
train_size = int(0.9 * len(dataset))
val_size = len(dataset) - train_size

# Divide aleatoriamente o dataset em conjuntos de treinamento e validação usando random_split
train_dataset, val_dataset = random_split(dataset, [train_size, val_size])

# Imprime o número de amostras nos conjuntos de treinamento e validação
print('{} dados de treino'.format(train_size))
print('{} dados de validação'.format(val_size))

In [None]:
# Criando os DataLoaders para nossos conjuntos de treinamento e validação

batch_size = 2

train_dataloader = DataLoader(
            train_dataset,
            sampler = RandomSampler(train_dataset), # Seleciona lotes aleatoriamente
            batch_size = batch_size
        )

# Para validação, a ordem não importa, então vamos ler os dados sequencialmente
validation_dataloader = DataLoader(
            val_dataset,
            sampler = SequentialSampler(val_dataset), # Extrai lotes sequencialmente
            batch_size = batch_size
        )

# Finetuning

In [None]:
# Definindo seeds
seed_val = 13
random.seed(seed_val)
np.random.seed(seed_val)
torch.manual_seed(seed_val)
torch.cuda.manual_seed_all(seed_val)

In [None]:
# Carrega a configuração pré-treinada do modelo GPT-2
configuration = GPT2Config.from_pretrained('gpt2', output_hidden_states=False)

# Inicializa o modelo GPT-2 com a configuração
model = GPT2LMHeadModel.from_pretrained("gpt2", config=configuration)

# Redimensiona os embeddings de token para corresponder ao tamanho do vocabulário do tokenizer
model.resize_token_embeddings(len(tokenizer))

# Define o dispositivo de execução para GPU
device = (torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu"))
model.to(device)

In [None]:
# Parâmetros de treinamento do modelo
epochs = 13
learning_rate = 0.0001
warmup_steps = 100
epsilon = 1e-8
sample_every = 100

In [None]:
# Cria um otimizador AdamW para ajustar os parâmetros do modelo durante o treinamento
optimizer = AdamW(model.parameters(),
                  lr = learning_rate,
                  eps = epsilon
                )

# Calcula o número total de etapas de treinamento, multiplicando o número de lotes por época
total_steps = len(train_dataloader) * epochs

# Para ajustar a taxa de aprendizado ao longo do treinamento
scheduler = get_linear_schedule_with_warmup(optimizer,
                                            num_warmup_steps = warmup_steps,  # Número de etapas de aquecimento
                                            num_training_steps = total_steps)  # Número total de etapas de treinamento


In [None]:
def format_time(elapsed):
    return str(datetime.timedelta(seconds=int(round((elapsed)))))

In [None]:
total_t0 = time.time()

training_stats = []

model = model.to(device)

for epoch_i in range(0, epochs):

    # =====================================================
    # ================== Treinamento ======================
    # =====================================================

    print("")
    print('======== Época {:} / {:} ========'.format(epoch_i + 1, epochs))

    t0 = time.time()        # Tempo inicial da época
    total_train_loss = 0    # Inicializa a perda total de treinamento para a época
    model.train()           # Define o modelo em modo de treinamento

    for step, batch in enumerate(train_dataloader):

        batch_input_ids = batch[0].to(device)
        batch_labels = batch[0].to(device)
        batch_masks = batch[1].to(device)

        # Zera os gradientes
        model.zero_grad()

        outputs = model(batch_input_ids,
                        labels=batch_labels,
                        attention_mask = batch_masks,
                        token_type_ids=None
                        )

        # Cálculo do loss da época
        loss = outputs[0]
        batch_loss = loss.item()
        total_train_loss += batch_loss


        if step % sample_every == 0 and not step == 0:

            # Imprime informações sobre a época
            elapsed = format_time(time.time() - t0)
            print('Batch {:>5,}  de  {:>5,}.\nLoss: {:>5,}.\nDuração: {:}.\n'.format(step, len(train_dataloader), batch_loss, elapsed))

            # Muda o modelo para o modo de avaliação
            model.eval()

            # Gera um texto de teste com a atual configuração do modelo
            sample_outputs = model.generate(
                                    bos_token_id=random.randint(1,30000),
                                    do_sample=True,
                                    top_k=50,
                                    max_length = 200,
                                    top_p=0.95,
                                    num_return_sequences=1
                                )

            for i, sample_output in enumerate(sample_outputs):
                  print("Texto gerado:\n{}".format(tokenizer.decode(sample_output, skip_special_tokens=True)))

            model.train()

        # Backpropagation e atualização dos parâmetros do modelo
        loss.backward()
        optimizer.step()
        scheduler.step()

    # Calcula o loss médio de treino
    avg_train_loss = total_train_loss / len(train_dataloader)

    # Calcula a duração do treinamento da época
    training_time = format_time(time.time() - t0)

    print("")
    print("Loss de Treino: {0:.2f}".format(avg_train_loss))
    print("Época treinada em: {:}".format(training_time))



    # =====================================================
    # =================== Validação =======================
    # =====================================================

    print("")

    t0 = time.time()

    model.eval()

    total_eval_loss = 0
    nb_eval_steps = 0


    # Validação dos dados da época atual
    for batch in validation_dataloader:

        batch_input_ids = batch[0].to(device)
        batch_labels = batch[0].to(device)
        batch_masks = batch[1].to(device)

        with torch.no_grad():

            # Forward sem calcular gradientes
            outputs  = model(batch_input_ids,
                            attention_mask = batch_masks,
                            labels=batch_labels)

            loss = outputs[0]

        batch_loss = loss.item()
        total_eval_loss += batch_loss

    # Calcula o loss de validação da época
    avg_val_loss = total_eval_loss / len(validation_dataloader)

    # Calcula a duração da validação da época
    validation_time = format_time(time.time() - t0)

    print("Loss de Validação: {0:.2f}".format(avg_val_loss))
    print("Validação processada em: {:}".format(validation_time))


    training_stats.append(
        {
            'Época': epoch_i + 1,
            'Loss de Treino': avg_train_loss,
            'Loss de Validação': avg_val_loss,
            'Tempo de treinamento': training_time,
            'Tempo de validação': validation_time
        }
    )

print("")
print("Treinamento completo!")
print("Treinamento dos dados demorou {:} (h:mm:ss)".format(format_time(time.time()-total_t0)))

In [None]:
# Tabela com informações de treino e validação
df_stats = pd.DataFrame(data=training_stats)
df_stats = df_stats.set_index('Época')

df_stats

In [None]:
plt.figure(figsize=(12, 6))

fundo = "#dad9db"

plt.gcf().set_facecolor(fundo)

ax = plt.axes()
ax.set_facecolor(fundo)

# Plotar a curva de aprendizado
plt.plot(df_stats['Loss de Treino'], color='#8572a3', marker='o', linestyle='-', label="Treinamento")
plt.plot(df_stats['Loss de Validação'], color='#2b1545', marker='o', linestyle='-', label="Validação")

plt.xlabel("Época")
plt.ylabel("Perda")
plt.legend(facecolor=fundo)
plt.xticks([i+1 for i in range(epochs)])

# Exibir o gráfico
plt.show()


# Salvando o modelo



In [None]:
# Salvando o modelo

output_dir = './model_save/'

if not os.path.exists(output_dir):
    os.makedirs(output_dir)

print("Modelo salvo em %s" % output_dir)
model_to_save = model.module if hasattr(model, 'module') else model
model_to_save.save_pretrained(output_dir)
tokenizer.save_pretrained(output_dir)


In [None]:
!ls -l --block-size=K ./model_save/

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import shutil
drive_dir = "/content/drive/MyDrive/GPT-2 TV/modelo"

shutil.copy("/content/model_save/model.safetensors", drive_dir)
shutil.copy("/content/model_save/added_tokens.json", drive_dir)
shutil.copy("/content/model_save/config.json", drive_dir)
shutil.copy("/content/model_save/generation_config.json", drive_dir)
shutil.copy("/content/model_save/merges.txt", drive_dir)
shutil.copy("/content/model_save/special_tokens_map.json", drive_dir)
shutil.copy("/content/model_save/tokenizer_config.json", drive_dir)
shutil.copy("/content/model_save/vocab.json", drive_dir)

In [None]:
# Carregando o modelo

model = GPT2LMHeadModel.from_pretrained(output_dir)
tokenizer = GPT2Tokenizer.from_pretrained(output_dir)
model.to(device)

# Gerando uma música

In [None]:
model.eval()

prompt = "<|start|>Red dress"

generated = torch.tensor(tokenizer.encode(prompt)).unsqueeze(0)
generated = generated.to(device)

print(generated)

sample_outputs = model.generate(
                                generated,
                                do_sample=True,
                                top_k=50,
                                max_length = 300,
                                top_p=0.95,
                                num_return_sequences=3
                                )

for i, sample_output in enumerate(sample_outputs):
  print("\n{}: {}\n\n".format(i, tokenizer.decode(sample_output, skip_special_tokens=True)))