# In [None]:

import pandas as pd
import numpy as np
import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader
from transformers import BertTokenizer, BertModel


# Run on the GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)

df = pd.read_csv('../dataset/scraped_lyrics.csv')

# Empty cells are read by pandas as float NaN, which the tokenizer rejects.
# Replace them with empty strings (and coerce any other non-string cell to
# text) so every row still yields an embedding and row order is preserved.
lyrics_list = df['Lyrics'].fillna('').astype(str).tolist()


tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
bert_model = BertModel.from_pretrained('bert-base-uncased')
bert_model.to(device)
# Switch to evaluation mode; the model is only used for inference here.
bert_model.eval()


def get_embedding(text):
    """Return a mean-pooled BERT embedding of ``text`` as a NumPy array.

    The text is tokenized (truncated to at most 512 tokens), moved to
    ``device`` and passed through ``bert_model`` with gradient tracking
    disabled. The last hidden state is then averaged along dimension 1
    (presumably the token axis) and singleton dimensions are squeezed away
    before the result is copied back to the CPU.
    """
    encoded = tokenizer(text, return_tensors='pt', truncation=True, max_length=512)

    model_inputs = {}
    for name, tensor in encoded.items():
        model_inputs[name] = tensor.to(device)

    with torch.no_grad():
        token_states = bert_model(**model_inputs).last_hidden_state

    pooled = token_states.mean(dim=1)
    return pooled.squeeze().cpu().numpy()


# Embed every lyric in order, reporting progress after each 50 songs.
total_songs = len(lyrics_list)
embeddings = []
for count, lyric in enumerate(lyrics_list, start=1):
    embeddings.append(get_embedding(lyric))
    if count % 50 == 0:
        print(f'Processed {count}/{total_songs} songs')


# Stack the per-song vectors along a new leading axis into one array.
embeddings = np.stack(embeddings)


class EmbeddingDataset(Dataset):
    """Dataset of ``(noisy, clean)`` embedding pairs for denoising training.

    Args:
        embeddings: Indexable collection of NumPy arrays (for example a 2-D
            array with one embedding per row).
        noise_std: Standard deviation of the zero-mean Gaussian noise added
            to each embedding. Defaults to 0.1; 0 disables the corruption.

    Raises:
        ValueError: If ``noise_std`` is negative.
    """

    def __init__(self, embeddings, noise_std=0.1):
        if noise_std < 0:
            raise ValueError(f'noise_std must be non-negative, got {noise_std}')
        self.embeddings = embeddings
        self.noise_std = noise_std

    def __len__(self):
        """Return the number of stored embeddings."""
        return len(self.embeddings)

    def __getitem__(self, idx):
        """Return ``(noisy, clean)`` float32 tensors for the item at ``idx``."""
        x = self.embeddings[idx]

        # Noise is drawn afresh on every access, so repeated lookups of the
        # same index return different corrupted versions of one clean vector.
        noise = np.random.normal(0, self.noise_std, x.shape)
        x_noisy = x + noise
        return (
            torch.tensor(x_noisy, dtype=torch.float32),
            torch.tensor(x, dtype=torch.float32),
        )


# Wrap the embedding array in the denoising dataset and serve it in
# shuffled mini-batches of 32 for training.
dataset = EmbeddingDataset(embeddings)
dataloader = DataLoader(dataset=dataset, batch_size=32, shuffle=True)


class DenoisingAutoEncoder(nn.Module):
    """Fully connected autoencoder with a two-layer encoder and decoder.

    The encoder maps ``input_dim -> hidden_dim1 -> hidden_dim2`` with a ReLU
    after each linear layer. The decoder maps
    ``hidden_dim2 -> hidden_dim1 -> input_dim`` with a ReLU only between its
    two linear layers, so the reconstruction itself is unbounded.
    """

    def __init__(self, input_dim, hidden_dim1, hidden_dim2):
        super().__init__()

        # Encoder layers are instantiated before decoder layers.
        encoder_layers = [
            nn.Linear(input_dim, hidden_dim1),
            nn.ReLU(),
            nn.Linear(hidden_dim1, hidden_dim2),
            nn.ReLU(),
        ]
        self.encoder = nn.Sequential(*encoder_layers)

        decoder_layers = [
            nn.Linear(hidden_dim2, hidden_dim1),
            nn.ReLU(),
            nn.Linear(hidden_dim1, input_dim),
        ]
        self.decoder = nn.Sequential(*decoder_layers)

    def forward(self, x):
        """Encode ``x`` and return its reconstruction from the decoder."""
        return self.decoder(self.encoder(x))


# Layer widths: the input width is taken from the second axis of the
# embedding array; the two hidden widths are fixed.
input_dim = embeddings.shape[1]
hidden_dim1, hidden_dim2 = 512, 256


# Build the autoencoder on the target device, then the reconstruction loss
# (mean squared error) and the Adam optimizer over its parameters.
model = DenoisingAutoEncoder(input_dim, hidden_dim1, hidden_dim2)
model.to(device)
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)


# Train the autoencoder to reconstruct clean embeddings from noisy ones,
# printing the mean per-batch loss after every epoch.
num_epochs = 10
for epoch in range(num_epochs):
    model.train()
    total_loss = 0
    for noisy_batch, clean_batch in dataloader:
        noisy_batch, clean_batch = noisy_batch.to(device), clean_batch.to(device)

        optimizer.zero_grad()
        loss = criterion(model(noisy_batch), clean_batch)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    avg_loss = total_loss / len(dataloader)
    print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {avg_loss:.4f}')


# Encode the clean embeddings in their original order.
#
# The training dataloader must not be reused here: it shuffles the samples
# and adds fresh random noise to every input, which would leave the saved
# rows in random order (no longer matching the rows of the lyrics table) and
# perturbed by noise that serves no purpose at inference time.
model.eval()
clean_inputs = torch.as_tensor(embeddings, dtype=torch.float32)
with torch.no_grad():
    # Process in fixed-size chunks so only one chunk is on the device at a time.
    enhanced_embeddings = np.vstack([
        model.encoder(chunk.to(device)).cpu().numpy()
        for chunk in clean_inputs.split(32)
    ])


# Row i of the saved array is the encoder output for row i of `embeddings`.
np.save('enhanced_embeddings.npy', enhanced_embeddings)
