In [1]:
import pandas as pd
from transformers import GPT2LMHeadModel, GPT2Tokenizer
import numpy as np
import random
import torch
from torch.utils.data import Dataset, DataLoader
from transformers import GPT2Tokenizer, GPT2LMHeadModel, AdamW, get_linear_schedule_with_warmup
from tqdm import tqdm, trange
import torch.nn.functional as F
import csv
import os

In [2]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
path = 'drive/MyDrive/EHU/Apps1/final_project'

# Data Preparation

In [4]:
train = pd.read_csv(f'{path}/data/train.csv').rename(columns={'content':'Lyric'})
train = train[train['Lyric'].apply(lambda x: len(x.split(' ')) < 350)]

test = pd.read_csv(f'{path}/data/test.csv').rename(columns={'content':'Lyric'})

In [5]:
test['Lyric'] = test['Lyric'].map(lambda lyric: ''.join([x for x in lyric if x.isascii()]))
test['True_end_lyrics'] = test['Lyric'].str.split().str[-20:].apply(' '.join)
test['Lyric'] = test['Lyric'].str.split().str[:-20].apply(' '.join)
test = test[test.Lyric != ''] # remove those samples that became empty

In [6]:
assert len(test[test.True_end_lyrics == '']) == 0
assert len(test[test.Lyric == '']) == 0

In [None]:
class SongLyrics(Dataset):
    def __init__(self, df, truncate=False, gpt2_type="gpt2", max_length=1024):

        self.tokenizer = GPT2Tokenizer.from_pretrained(gpt2_type)
        self.lyrics = []

        for _, row in df.iterrows():
          self.lyrics.append(torch.tensor(
                self.tokenizer.encode(f"<|{row['tag_2']}|>{row['Lyric'][:max_length]}<|endoftext|>")
            ))
        if truncate:
            self.lyrics = self.lyrics[:20000]
        self.lyrics_count = len(self.lyrics)

    def __len__(self):
        return self.lyrics_count

    def __getitem__(self, item):
        return self.lyrics[item]

dataset = SongLyrics(train, truncate=True, gpt2_type="gpt2")

In [None]:
#Get the tokenizer and model
tokenizer = GPT2Tokenizer.from_pretrained('gpt2')
model = GPT2LMHeadModel.from_pretrained('gpt2')

# Training

In [9]:
#Accumulated batch size (since GPT2 is so big)
def pack_tensor(new_tensor, packed_tensor, max_seq_len):
    if packed_tensor is None:
        return new_tensor, True, None
    if new_tensor.size()[1] + packed_tensor.size()[1] > max_seq_len:
        return packed_tensor, False, new_tensor
    else:
        packed_tensor = torch.cat([new_tensor, packed_tensor[:, 1:]], dim=1)
        return packed_tensor, True, None

In [10]:
def train(
    dataset, model, tokenizer,
    batch_size=16, epochs=5, lr=2e-5,
    max_seq_len=400, warmup_steps=200,
    gpt2_type="gpt2", output_dir=".", output_prefix="wreckgar",
    test_mode=False,save_model_on_epoch=False,
):
    acc_steps = 100
    device=torch.device("cuda")
    model = model.cuda()
    model.train()

    optimizer = AdamW(model.parameters(), lr=lr)
    scheduler = get_linear_schedule_with_warmup(
        optimizer, num_warmup_steps=warmup_steps, num_training_steps=-1
    )

    train_dataloader = DataLoader(dataset, batch_size=1, shuffle=True)
    loss=0
    accumulating_batch_count = 0
    input_tensor = None

    for epoch in range(epochs):

        print(f"Training epoch {epoch}")
        print(loss)
        for idx, entry in tqdm(enumerate(train_dataloader)):
            (input_tensor, carry_on, remainder) = pack_tensor(entry, input_tensor, 768)

            if carry_on and idx != len(train_dataloader) - 1:
                continue

            input_tensor = input_tensor.to(device)
            outputs = model(input_tensor, labels=input_tensor)
            loss = outputs[0]
            loss.backward()

            if (accumulating_batch_count % batch_size) == 0:
                optimizer.step()
                scheduler.step()
                optimizer.zero_grad()
                model.zero_grad()

            accumulating_batch_count += 1
            input_tensor = None
        if save_model_on_epoch:
            torch.save(
                model.state_dict(),
                os.path.join(output_dir, f"{output_prefix}-{epoch}.pt"),
            )
    return model

In [11]:
model = train(dataset, model, tokenizer)



Training epoch 0
0


6484it [02:00, 53.68it/s]


Training epoch 1
tensor(5.0755, device='cuda:0', grad_fn=<NllLossBackward0>)


6484it [02:06, 51.34it/s]


Training epoch 2
tensor(2.9324, device='cuda:0', grad_fn=<NllLossBackward0>)


6484it [02:08, 50.35it/s]


Training epoch 3
tensor(2.1749, device='cuda:0', grad_fn=<NllLossBackward0>)


6484it [02:10, 49.79it/s]


Training epoch 4
tensor(2.8809, device='cuda:0', grad_fn=<NllLossBackward0>)


6484it [02:10, 49.60it/s]


# Predict

In [12]:
def generate(
    model,
    tokenizer,
    prompt,
    entry_count=10,
    entry_length=30, #maximum number of words
    top_p=0.8,
    temperature=1.,
):
    model.eval()
    generated_num = 0
    generated_list = []

    filter_value = -float("Inf")

    with torch.no_grad():

        for entry_idx in trange(entry_count):

            entry_finished = False
            generated = torch.tensor(tokenizer.encode(prompt)).unsqueeze(0)

            for i in range(entry_length):
                outputs = model(generated, labels=generated)
                loss, logits = outputs[:2]
                logits = logits[:, -1, :] / (temperature if temperature > 0 else 1.0)

                sorted_logits, sorted_indices = torch.sort(logits, descending=True)
                cumulative_probs = torch.cumsum(F.softmax(sorted_logits, dim=-1), dim=-1)

                sorted_indices_to_remove = cumulative_probs > top_p
                sorted_indices_to_remove[..., 1:] = sorted_indices_to_remove[
                    ..., :-1
                ].clone()
                sorted_indices_to_remove[..., 0] = 0

                indices_to_remove = sorted_indices[sorted_indices_to_remove]
                logits[:, indices_to_remove] = filter_value

                next_token = torch.multinomial(F.softmax(logits, dim=-1), num_samples=1)
                generated = torch.cat((generated, next_token), dim=1)

                if next_token in tokenizer.encode("<|endoftext|>"):
                    entry_finished = True

                if entry_finished:

                    generated_num = generated_num + 1

                    output_list = list(generated.squeeze().numpy())
                    output_text = tokenizer.decode(output_list)
                    generated_list.append(output_text)
                    break

            if not entry_finished:
              output_list = list(generated.squeeze().numpy())
              output_text = f"{tokenizer.decode(output_list)}<|endoftext|>"
              generated_list.append(output_text)

    return generated_list

In [18]:
#Function to generate multiple sentences. Test data should be a dataframe
def text_generation(test_data):
  generated_lyrics = []
  for i in range(len(test_data)):
    x = generate(model.to('cpu'), tokenizer, test_data['Lyric'].iloc[i], entry_count=1)
    generated_lyrics.append(x)
  return generated_lyrics

In [19]:
#Run the functions to generate the lyrics
generated_lyrics = text_generation(test)

100%|██████████| 1/1 [00:05<00:00,  5.98s/it]
100%|██████████| 1/1 [00:04<00:00,  4.92s/it]
100%|██████████| 1/1 [00:07<00:00,  7.98s/it]
100%|██████████| 1/1 [00:05<00:00,  5.91s/it]
100%|██████████| 1/1 [00:06<00:00,  6.88s/it]
100%|██████████| 1/1 [00:04<00:00,  4.56s/it]
100%|██████████| 1/1 [00:06<00:00,  6.94s/it]
100%|██████████| 1/1 [00:04<00:00,  4.31s/it]
100%|██████████| 1/1 [00:02<00:00,  2.64s/it]
100%|██████████| 1/1 [00:06<00:00,  6.89s/it]
100%|██████████| 1/1 [00:00<00:00,  7.35it/s]
100%|██████████| 1/1 [00:05<00:00,  5.74s/it]
100%|██████████| 1/1 [00:04<00:00,  4.77s/it]
100%|██████████| 1/1 [00:05<00:00,  5.29s/it]
100%|██████████| 1/1 [00:14<00:00, 14.08s/it]
100%|██████████| 1/1 [00:04<00:00,  4.41s/it]
100%|██████████| 1/1 [00:05<00:00,  5.52s/it]
100%|██████████| 1/1 [00:02<00:00,  2.22s/it]
100%|██████████| 1/1 [00:04<00:00,  4.11s/it]
100%|██████████| 1/1 [00:13<00:00, 13.86s/it]
100%|██████████| 1/1 [00:05<00:00,  5.62s/it]
100%|██████████| 1/1 [00:06<00:00,

In [22]:
df_generated = pd.DataFrame(generated_lyrics, columns=["Generated Lyrics"])
df_generated

Unnamed: 0,Generated Lyrics
0,"Angel, with the gun in your hand, looking back..."
1,Go on and light me like a cigarette as I am ma...
2,I only light up when cameras are flashin' Neve...
3,Lay me in the palm of your hand I'll give you ...
4,I only light up when cameras are flashin' Neve...
...,...
784,'Cause if I could I would feel nothing That's ...
785,"So let's just fuck, no strings attached Let's ..."
786,"February, weather's scary, I need notes on how..."
787,'Cause if I could I would feel nothing That's ...


# Save Results and Model

In [59]:
reseted_index_results = test.reset_index().merge(
    df_generated.reset_index(), left_index=True, right_index=True)
reseted_index_results.to_csv(
    f'{path}/reseted_test_predictions.csv', index=False
)

In [30]:
model.save_pretrained(f'{path}/model/')