In [35]:
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn.functional import softmax
from torch.nn.utils import clip_grad_norm_
from torch.optim.lr_scheduler import StepLR
from sklearn.model_selection import train_test_split
from transformers import GPT2Tokenizer, GPT2LMHeadModel
from torch.utils.data import TensorDataset, DataLoader, RandomSampler, Dataset, random_split
from transformers import AdamW, get_linear_schedule_with_warmup
from transformers import get_scheduler, GPT2Tokenizer, GPT2LMHeadModel
from transformers import GPT2Model, GPT2Config
from sklearn.preprocessing import LabelEncoder, StandardScaler


In [23]:
df = pd.read_csv('final_lyrics_features_combined.csv')
df = df.drop_duplicates() # remove duplicate rows
# Assuming df is your DataFrame and 'column_name' is the name of the column you want to remove
df.drop(columns=['track_id'], inplace=True)
df.drop(columns=['Title'], inplace=True)
df.drop(columns=['Artist'], inplace=True)
df.drop(columns=['track_genre'], inplace=True)
df = df.rename(columns={'Lyrics': 'lyrics'})

df['lyrics'] = df['lyrics'].str.replace('   ', '\n') # replace the 3 space delimtter setup with the | character
df['explicit'] = df['explicit'].astype(int)
df['lyrics'] = df['lyrics'].astype(str)
df.head()

Unnamed: 0,lyrics,popularity,duration_ms,explicit,danceability,energy,key,loudness,mode,speechiness,acousticness,instrumentalness,liveness,valence,tempo,time_signature
0,I hate you for what you did And I miss you li...,77,229760,0,0.651,0.546,1,-9.021,1,0.0357,0.774,0.0437,0.0842,0.623,107.021,4
1,I hate you for what you did And I miss you li...,77,229760,0,0.651,0.546,1,-9.021,1,0.0357,0.774,0.0437,0.0842,0.623,107.021,4
2,Sometimes I think I'm a killer I scared you i...,51,183906,0,0.558,0.0578,8,-19.907,1,0.0337,0.921,0.961,0.0954,0.0575,80.021,4
3,killer yeah it's crazy i'm a killer made all t...,51,183906,0,0.558,0.0578,8,-19.907,1,0.0337,0.921,0.961,0.0954,0.0575,80.021,4
4,"Georgia, Georgia, I love your son And when he...",62,230506,0,0.388,0.66,11,-7.372,1,0.0308,0.312,0.000307,0.0943,0.401,143.554,4


In [24]:
label_encoder = LabelEncoder()

numerical_features = ['popularity', 'duration_ms', 'explicit', 'danceability', 'energy', 'key', 'loudness', 'mode', 'speechiness', 'acousticness', 'instrumentalness', 'liveness', 'valence', 'tempo', 'time_signature']

# Scaling numerical features
scaler = StandardScaler()
df[numerical_features] = scaler.fit_transform(df[numerical_features])

# Splitting dataset into features and target
X = df[numerical_features]
y = df['lyrics']

# Splitting the data into training and test sets
X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42)

In [25]:
# Initialize the GPT-2 tokenizer
tokenizer = GPT2Tokenizer.from_pretrained('gpt2')
tokenizer.pad_token = tokenizer.eos_token  # Setting the padding token to EOS token for consistency

def preprocess_and_tokenize_data(features, lyrics, tokenizer):
    # Tokenize lyrics
    tokenized_lyrics = tokenizer(lyrics.tolist(), return_tensors="pt", padding=True, truncation=True, max_length=512, add_special_tokens=True)
    
    # Convert features to numpy array and then tensor
    features_array = features.to_numpy()  # Convert DataFrame to numpy array
    features_tensor = torch.tensor(features_array, dtype=torch.float)  # Convert numpy array to tensor
    
    # Creating a TensorDataset
    dataset = TensorDataset(features_tensor, tokenized_lyrics['input_ids'], tokenized_lyrics['attention_mask'])
    return dataset

# Preprocess and tokenize the training and validation data
train_dataset = preprocess_and_tokenize_data(X_train, y_train, tokenizer)
val_dataset = preprocess_and_tokenize_data(X_val, y_val, tokenizer)

# Create DataLoaders for the training and validation set
batch_size = 8
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size)

In [30]:
class LyricGeneratorModel(nn.Module):
    def __init__(self, num_numerical_features, gpt2_model_name='gpt2', feature_weights=None):
        super(LyricGeneratorModel, self).__init__()
        
        self.gpt2_config = GPT2Config.from_pretrained(gpt2_model_name)
        self.gpt2_model = GPT2Model.from_pretrained(gpt2_model_name)
        
        if feature_weights is None:
            # Default to equal weighting if none provided
            feature_weights = torch.ones(num_numerical_features)
        self.feature_weights = nn.Parameter(feature_weights, requires_grad=False)  # Make it a model parameter to ensure it moves with the model device
        
        self.numerical_processor = nn.Linear(num_numerical_features, self.gpt2_config.n_embd)
        self.decoder = nn.Linear(self.gpt2_config.n_embd, self.gpt2_config.vocab_size)

    def forward(self, numerical_features, input_ids, attention_mask):
        # Apply the feature weights to the numerical features
        weighted_numerical_features = numerical_features * self.feature_weights
        
        numerical_embeddings = self.numerical_processor(weighted_numerical_features)
        numerical_embeddings = numerical_embeddings.unsqueeze(1).expand(-1, input_ids.size(1), -1)
        
        inputs_embeds = numerical_embeddings + self.gpt2_model.wte(input_ids)
        
        gpt2_outputs = self.gpt2_model(inputs_embeds=inputs_embeds, attention_mask=attention_mask)
        sequence_output = gpt2_outputs.last_hidden_state
        
        logits = self.decoder(sequence_output)
        
        return logits

In [31]:
# Convert DataFrame columns to tensors
X_train_tensor = torch.tensor(X_train.values, dtype=torch.float)
X_val_tensor = torch.tensor(X_val.values, dtype=torch.float)

# Tokenize lyrics for both training and validation sets
train_encodings = tokenizer(y_train.tolist(), return_tensors="pt", padding=True, truncation=True, max_length=512)
val_encodings = tokenizer(y_val.tolist(), return_tensors="pt", padding=True, truncation=True, max_length=512)

# Create TensorDatasets
train_dataset = TensorDataset(X_train_tensor, train_encodings['input_ids'], train_encodings['attention_mask'])
val_dataset = TensorDataset(X_val_tensor, val_encodings['input_ids'], val_encodings['attention_mask'])

# DataLoader
train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=8)

# Device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Model instantiation
feature_weights = torch.tensor([3, 8, 6, 9, 9, 1, 1, 1, 7, 7, 7, 5, 10, 2, 1], dtype=torch.float)
model = LyricGeneratorModel(num_numerical_features=15, gpt2_model_name='gpt2', feature_weights=feature_weights)

# Optimizer and Loss Function
optimizer = AdamW(model.parameters(), lr=5e-5)
loss_fn = torch.nn.CrossEntropyLoss()



In [36]:
def train(model, dataloader, optimizer, scheduler=None, clip_grad=None):
    model.train()  # Set the model to training mode
    total_loss = 0

    for batch_idx, batch in enumerate(dataloader):
        # Unpack the batch data
        num_feats, input_ids, attn_mask = batch

        # Ensure data is on the correct device
        num_feats = num_feats.to(device)
        input_ids = input_ids.to(device)
        attn_mask = attn_mask.to(device)

        optimizer.zero_grad()  # Zero the gradients before running the forward pass.

        # Forward pass
        outputs = model(numerical_features=num_feats, input_ids=input_ids, attention_mask=attn_mask)
        logits = outputs  # Assuming your model returns logits directly

        # Calculate loss
        loss = loss_fn(logits.view(-1, model.gpt2_config.vocab_size), input_ids.view(-1))

        # Backward pass and optimize
        loss.backward()

        # Gradient clipping
        if clip_grad is not None:
            clip_grad_norm_(model.parameters(), clip_grad)

        optimizer.step()

        # Adjust the learning rate based on the scheduler
        if scheduler:
            scheduler.step()

        total_loss += loss.item()

        # Print loss every batch (or modify to print less frequently)
        print(f"Epoch {epoch+1}, Batch {batch_idx+1}/{len(dataloader)}, Loss: {loss.item():.4f}")

    avg_loss = total_loss / len(dataloader)
    return avg_loss

# Assuming optimizer has been defined
scheduler = StepLR(optimizer, step_size=1, gamma=0.1)  # Example scheduler

# Example usage of gradient clipping
clip_grad = 1.0

num_epochs = 3
# Training loop with scheduler and gradient clipping
for epoch in range(num_epochs):
    avg_train_loss = train(model, train_loader, optimizer, scheduler=scheduler, clip_grad=clip_grad)
    print(f"Epoch {epoch+1}/{num_epochs}, Training Loss: {avg_train_loss:.4f}")


Epoch 1, Batch 1/597, Loss: 11.3945


KeyboardInterrupt: 

In [None]:
torch.save(model.state_dict(), 'lyric_generator_model.pth')

In [None]:
# Re-instantiate the model
loaded_model = LyricGeneratorModel(num_numerical_features=15, gpt2_model_name='gpt2')
# Load the saved model parameters
loaded_model.load_state_dict(torch.load('lyric_generator_model.pth'))
# Move the model to the appropriate device
loaded_model = loaded_model.to(device)

# Set the model to evaluation mode
loaded_model.eval()

In [None]:
def process_user_input(input_str, scaler, num_features_order, device):
    """
    Processes user input string of features and converts them into a tensor.
    
    Args:
    - input_str (str): User input string of features in the format "feature1: value1, feature2: value2, ..."
    - scaler (StandardScaler): Scaler used during training for numerical features.
    - num_features_order (list): Ordered list of numerical feature names as used during training.
    - device (torch.device): Device to load the tensor onto.
    
    Returns:
    - torch.Tensor: Tensor of processed numerical features ready for model input.
    """
    # Split input string and build a feature dict
    features = dict(item.split(": ") for item in input_str.split(", "))
    input_features = np.zeros(len(num_features_order))
    
    # Fill the array with input values, maintaining the order
    for i, feature in enumerate(num_features_order):
        if feature in features:
            input_features[i] = float(features[feature])
    
    # Scale the features
    input_features_scaled = scaler.transform([input_features])
    
    # Convert to tensor and move to the correct device
    input_tensor = torch.tensor(input_features_scaled, dtype=torch.float).to(device)
    
    return input_tensor

user_input = "danceability: 0.68, valence: 0.701, tempo: 120"
processed_input = process_user_input(user_input, scaler, numerical_features, device)

# Now `processed_input` can be used with `loaded_model` for generating lyrics.


In [None]:
def generate_lyrics(model, tokenizer, processed_features, device, max_length=512):
    """
    Generates lyrics based on processed numerical features.

    Args:
    - model: Trained LyricGeneratorModel.
    - tokenizer: Tokenizer used during training (GPT2Tokenizer).
    - processed_features: Tensor of processed input features.
    - device: Device on which the model is loaded.
    - max_length: Maximum length of the generated lyrics.

    Returns:
    - str: Generated lyrics.
    """
    model.eval()  # Ensure the model is in evaluation mode

    # Initial input tokens: start of sequence token
    input_ids = torch.tensor([tokenizer.bos_token_id]).unsqueeze(0).to(device)
    attention_mask = torch.ones_like(input_ids).to(device)

    generated_ids = []
    
    with torch.no_grad():  # No need to calculate gradients
        for _ in range(max_length):
            outputs = model(numerical_features=processed_features, input_ids=input_ids, attention_mask=attention_mask)
            logits = outputs[:, -1, :]  # Take the logits of the last token

            # Convert logits to probabilities
            probs = softmax(logits, dim=-1)
            next_token_id = torch.argmax(probs, dim=-1)

            # Append generated token ID
            generated_ids.append(next_token_id.item())

            # Prepare input_ids for the next iteration
            input_ids = torch.cat([input_ids, next_token_id.unsqueeze(0)], dim=1)

            # Check if the generated token is the end-of-sequence token
            if next_token_id.item() == tokenizer.eos_token_id:
                break

    # Decode generated token IDs to text
    generated_text = tokenizer.decode(generated_ids, skip_special_tokens=True)
    
    return generated_text

In [None]:
# Assuming `processed_input` is the processed numerical features from the user input
generated_lyrics = generate_lyrics(loaded_model, tokenizer, processed_input, device, max_length=100)
print("Generated Lyrics:\n", generated_lyrics)