In [1]:
from google.colab import drive
# Mount Google Drive at /content/drive; a later cell copies kaggle.json from
# /content/drive/MyDrive/KAGGLE_API_CREDENTIALS/ out of this mount.
# NOTE(review): force_remount=True presumably makes a re-run of this cell
# remount instead of reusing an existing mount -- confirm against the Colab docs.
drive.mount('/content/drive', force_remount=True) # mount my Google Drive

Mounted at /content/drive


In [2]:
!mkdir ~/.kaggle

In [3]:
# Copy the Kaggle API credentials from the mounted Google Drive into ~/.kaggle/.
# Requires the Drive mount and the ~/.kaggle directory from the previous cells.
!cp /content/drive/MyDrive/KAGGLE_API_CREDENTIALS/kaggle.json ~/.kaggle/kaggle.json

In [4]:
# Restrict the credentials file to owner read/write only (mode 600).
!chmod 600 ~/.kaggle/kaggle.json

In [5]:
import torch
from torch.optim import Adam
import matplotlib.pyplot as plt
from torch import nn
from torchvision import transforms, datasets
from torch.utils.data import DataLoader
from sklearn.metrics import classification_report, accuracy_score
import copy
import numpy as np

In [6]:
# Device-agnostic setup: use the GPU when PyTorch can see one, otherwise the CPU.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'

In [None]:
# Download the Spotify user-behaviour dataset with the Kaggle CLI
# (relies on the kaggle.json credentials installed in the cells above).
!kaggle datasets download -d meeraajayakumar/spotify-user-behavior-dataset
# Dataset page: https://www.kaggle.com/datasets/meeraajayakumar/spotify-user-behavior-dataset

In [None]:
!unzip /content/spotify-user-behavior-dataset.zip

In [9]:
import pandas as pd
# Read the spreadsheet and, in the same step, discard every row that has
# at least one missing value.
df = pd.read_excel('/content/Spotify_data.xlsx').dropna()


In [10]:
from sklearn.preprocessing import LabelEncoder

le = LabelEncoder()

# Integer-encode every column in place, skipping music_recc_rating, which is
# treated as non-categorical and left as-is.
# The single encoder is re-fitted per column, so after the loop `le` only
# remembers the classes of the last column it was fitted on.
for col in df.columns:
  if col == 'music_recc_rating':
    continue
  df[col] = le.fit_transform(df[col])

In [11]:
# The two columns the models are trained to predict; they are removed from the
# feature matrix so they cannot leak into the inputs.
COLUMNS_TO_DROP = ['premium_sub_willingness', 'preffered_premium_plan']

# Targets: y1 for the binary model, y2 for the multiclass model.
y1 = df['premium_sub_willingness']
y2 = df['preffered_premium_plan']

# Features: every remaining column.
X = df.drop(columns=COLUMNS_TO_DROP)



In [12]:
from sklearn.model_selection import train_test_split

X_numpy = X.values

# Split for the premium_sub_willingness target (y1): 67% train / 33% test.
(
    X_train_prem_will,
    X_test_prem_will,
    y_train_prem_will,
    y_test_prem_will,
) = train_test_split(X, y1, test_size=0.33, random_state=42)

# Split for the preffered_premium_plan target (y2), using the same test
# fraction and random_state as the split above.
(
    X_train_pref_plan,
    X_test_pref_plan,
    y_train_pref_plan,
    y_test_pref_plan,
) = train_test_split(X, y2, test_size=0.33, random_state=42)




In [71]:
from torch.utils.data import TensorDataset

# --- premium_sub_willingness: features and labels as float32 tensors ---
X_train_prem_will_tensor = torch.tensor(X_train_prem_will.values, dtype=torch.float32)
X_test_prem_will_tensor = torch.tensor(X_test_prem_will.values, dtype=torch.float32)
y_train_prem_will_tensor = torch.tensor(y_train_prem_will.values, dtype=torch.float32)
y_test_prem_will_tensor = torch.tensor(y_test_prem_will.values, dtype=torch.float32)

train_dataset_prem_will = TensorDataset(X_train_prem_will_tensor, y_train_prem_will_tensor)
test_dataset_prem_will = TensorDataset(X_test_prem_will_tensor, y_test_prem_will_tensor)

# --- preffered_premium_plan: float32 features, integer class labels ---
X_train_pref_plan_tensor = torch.tensor(X_train_pref_plan.values, dtype=torch.float32)
X_test_pref_plan_tensor = torch.tensor(X_test_pref_plan.values, dtype=torch.float32)
y_train_pref_plan_tensor = torch.tensor(y_train_pref_plan.values, dtype=torch.float32)
y_test_pref_plan_tensor = torch.tensor(y_test_pref_plan.values, dtype=torch.float32)

# These are the labels for the multiclass model; they are cast to long because
# that model is meant to be trained with cross-entropy loss, which takes
# integer class indices.
y_train_pref_plan_tensor = y_train_pref_plan_tensor.long()
y_test_pref_plan_tensor = y_test_pref_plan_tensor.long()

train_dataset_pref_plan = TensorDataset(X_train_pref_plan_tensor, y_train_pref_plan_tensor)
test_dataset_pref_plan = TensorDataset(X_test_pref_plan_tensor, y_test_pref_plan_tensor)


In [72]:
batch_size = 32

# Create DataLoaders.
# Training loaders shuffle every epoch; test loaders keep a fixed order.
train_loader_prem_will = DataLoader(train_dataset_prem_will, batch_size=batch_size, shuffle=True)
test_loader_prem_will = DataLoader(test_dataset_prem_will, batch_size=batch_size, shuffle=False)

train_loader_pref_plan = DataLoader(train_dataset_pref_plan, batch_size=batch_size, shuffle=True)
# Must wrap the held-out *test* dataset: wrapping the training dataset here
# would make every "test" metric for the plan model a training metric.
test_loader_pref_plan = DataLoader(test_dataset_pref_plan, batch_size=batch_size, shuffle=False)

In [186]:
class PremiumPlanSubscriptionPredictorV3(nn.Module):
  """Fully connected network: input -> 64 -> 128 -> 32 -> 18 -> output.

  Every hidden layer is Linear -> BatchNorm1d -> ReLU -> Dropout(p=0.45);
  the final Linear layer has no activation, so the module returns raw logits.

  Args:
    input_shape: number of input features per sample.
    output_shape: number of output units of the final Linear layer.
  """

  def __init__(self, input_shape, output_shape):
    super().__init__()

    hidden_sizes = (64, 128, 32, 18)
    drop_p = 0.45  # dropout probability applied after every hidden layer

    # Modules are created in network order so the Sequential indices (and
    # therefore the state_dict keys) are 0..16, front to back.
    modules = []
    fan_in = input_shape
    for fan_out in hidden_sizes:
      modules.extend(self._hidden_block(fan_in, fan_out, drop_p))
      fan_in = fan_out
    modules.append(nn.Linear(in_features=fan_in, out_features=output_shape))

    self.layer_stack = nn.Sequential(*modules)

  @staticmethod
  def _hidden_block(fan_in, fan_out, drop_p):
    """Return the four modules that make up one hidden layer."""
    return [
        nn.Linear(in_features=fan_in, out_features=fan_out),
        nn.BatchNorm1d(fan_out),
        nn.ReLU(),
        nn.Dropout(drop_p),
    ]

  def forward(self, X):
    """Return the logits for X with axis 1 squeezed out when its size is 1."""
    logits = self.layer_stack(X)
    return logits.squeeze(dim=1)


In [187]:
class PremiumPlanSubscriptionAmountPredictorV1(nn.Module):
  """Fully connected network: input -> 64 -> 128 -> 64 -> output.

  Every hidden layer is Linear -> BatchNorm1d -> ReLU -> Dropout(p=0.5);
  the final Linear layer has no activation, so the module returns raw logits
  with one column per output unit.

  Args:
    input_shape: number of input features per sample.
    output_shape: number of output units of the final Linear layer.
  """

  def __init__(self, input_shape, output_shape):
    super().__init__()

    widths = [input_shape, 64, 128, 64]

    # Build the hidden layers front to back so Sequential indices (and the
    # state_dict keys derived from them) run 0..12 in network order.
    stack = []
    for n_in, n_out in zip(widths[:-1], widths[1:]):
      stack += [
          nn.Linear(in_features=n_in, out_features=n_out),
          nn.BatchNorm1d(n_out),
          nn.ReLU(),
          nn.Dropout(0.5),
      ]
    stack.append(nn.Linear(in_features=widths[-1], out_features=output_shape))

    self.layer_stack = nn.Sequential(*stack)

  def forward(self, X):
    """Return the unnormalised outputs of the layer stack for X."""
    logits = self.layer_stack(X)
    return logits


In [188]:
# Both networks take the same feature columns, so they share the input width.
# model_0 produces a single logit per sample; model_1 produces one logit per
# distinct value found in y2.
model_0 = PremiumPlanSubscriptionPredictorV3(
    input_shape=X.shape[1],
    output_shape=1,
).to(device)
model_1 = PremiumPlanSubscriptionAmountPredictorV1(
    input_shape=X.shape[1],
    output_shape=y2.nunique(),
).to(device)

In [196]:
def train_model(model, train_loader, loss_fn, optimizer, device, model_type):
    """Train `model` for one pass over `train_loader`.

    Args:
        model: network to optimise; it is switched to training mode.
        train_loader: iterable of (features, labels) batches exposing `.dataset`.
        loss_fn: callable taking (model outputs, labels) and returning a scalar loss.
        optimizer: optimizer that updates the model's parameters.
        device: device each batch is moved to before the forward pass.
        model_type: 'binary' thresholds the outputs at 0; any other value takes
            the argmax over dim 1.

    Returns:
        (epoch_loss, epoch_acc): the batch losses weighted by batch size and
        divided by len(train_loader.dataset), and the accuracy over all batches.
    """
    model.train()
    is_binary = model_type == 'binary'

    loss_sum = 0.0  # batch losses, each weighted by its batch size
    batch_preds = []
    batch_labels = []

    for features, targets in train_loader:
        features, targets = features.to(device), targets.to(device)

        # Forward pass
        logits = model(features)
        batch_loss = loss_fn(logits, targets)

        # Backward pass and parameter update
        optimizer.zero_grad()
        batch_loss.backward()
        optimizer.step()

        # A logit above 0 counts as the positive class for the binary model.
        if is_binary:
            predicted = logits.gt(0).float()
        else:
            predicted = logits.argmax(dim=1)

        batch_preds.append(predicted.detach().cpu().numpy())
        batch_labels.append(targets.cpu().numpy())

        loss_sum += batch_loss.item() * features.size(0)

    # Stitch the per-batch arrays into one array per quantity.
    all_preds = np.concatenate(batch_preds, axis=0)
    all_labels = np.concatenate(batch_labels, axis=0)

    epoch_loss = loss_sum / len(train_loader.dataset)
    epoch_acc = accuracy_score(all_labels, all_preds)

    return epoch_loss, epoch_acc


In [197]:
def test_model(model, test_loader, loss_fn, device, model_type):
    """Evaluate `model` on `test_loader` without updating any weights.

    Args:
        model: network to evaluate; it is switched to eval mode.
        test_loader: iterable of (features, labels) batches.
        loss_fn: callable taking (model outputs, labels) and returning a scalar loss.
        device: device each batch is moved to before the forward pass.
        model_type: 'binary' thresholds the outputs at 0; any other value takes
            the argmax over dim 1.

    Returns:
        (epoch_loss, epoch_acc, classification_results): the per-sample average
        loss, the accuracy, and sklearn's text classification report.
    """
    model.eval()
    running_loss = 0.0
    n_samples = 0
    labels_list = []
    preds_list = []

    with torch.no_grad():
        for X, y in test_loader:
            X = X.to(device)
            y = y.to(device)

            outputs = model(X)
            loss = loss_fn(outputs, y)

            if model_type == 'binary':
                preds = (outputs > 0).float()
            else:
                preds = torch.argmax(outputs, dim=1)

            # Weight each batch loss by its size, exactly as train_model does,
            # so a smaller final batch does not skew the average and the train
            # and test losses are directly comparable.
            # Assumes loss_fn returns the batch mean (the default reduction).
            batch_n = X.size(0)
            running_loss += loss.item() * batch_n
            n_samples += batch_n

            labels_list += y.cpu().numpy().tolist()
            preds_list += preds.cpu().numpy().tolist()

    epoch_loss = running_loss / n_samples
    epoch_acc = accuracy_score(labels_list, preds_list)  # using sklearn's accuracy_score

    classification_results = classification_report(labels_list, preds_list)

    return epoch_loss, epoch_acc, classification_results


In [198]:
def train_and_validate(model, train_loader, test_loader, loss_fn, optimizer, epochs, device, model_type, model_save_path):
    """Train for `epochs` epochs, validating every 5th epoch and checkpointing the best model.

    Validation runs when `epoch % 5 == 0` (0-based), i.e. on the 1st, 6th,
    11th, ... epoch. Whenever a validation run beats the best accuracy seen so
    far, the model's state_dict is written to `model_save_path`.

    Args:
        model: network to train.
        train_loader: batches passed to train_model.
        test_loader: batches passed to test_model.
        loss_fn: loss callable shared by training and validation.
        optimizer: optimizer that updates the model's parameters.
        epochs: number of training epochs.
        device: device the batches are moved to.
        model_type: 'binary' or anything else for multiclass (see train_model).
        model_save_path: file path for the best checkpoint.

    Returns:
        (best_class_report, best_acc): the classification report and accuracy of
        the best validation run; the report is None if no validation ever ran
        (epochs == 0).
    """
    best_acc = 0.0
    # Bound up front so the final return cannot hit an unbound local.
    best_class_report = None

    for epoch in range(epochs):
        train_loss, train_acc = train_model(model, train_loader, loss_fn, optimizer, device, model_type)
        print(f'Epoch {epoch+1}/{epochs}, Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}')

        if epoch % 5 != 0:
            # No fresh validation numbers this epoch, so nothing to compare or save.
            continue

        test_loss, test_acc, class_report = test_model(model, test_loader, loss_fn, device, model_type)
        print(f'Epoch {epoch+1}/{epochs},test Loss: {test_loss:.4f}, test Acc: {test_acc:.4f}')

        # Save the model weights if this validation gives the highest test accuracy.
        # The first validation always checkpoints, so a weights file and a report
        # exist even when accuracy never rises above 0.0.
        if best_class_report is None or test_acc > best_acc:
            best_acc = test_acc
            torch.save(model.state_dict(), model_save_path)
            best_class_report = class_report

    # After all epochs, print the best classification report
    print("Best Classification Report : ")
    if best_class_report is not None:
        print(best_class_report)
    return best_class_report, best_acc

In [None]:
# Hyperparameters for the binary premium_sub_willingness model (model_0).
# BCEWithLogitsLoss takes raw logits, which is what model_0 outputs.
model_type = 'binary'
epochs = 500
loss_fn = nn.BCEWithLogitsLoss()
optimizer = torch.optim.SGD(model_0.parameters(), lr=0.008, weight_decay=0.003)

# Perform training and validation; the best weights are written to the given path.
train_and_validate(
    model=model_0,
    train_loader=train_loader_prem_will,
    test_loader=test_loader_prem_will,
    loss_fn=loss_fn,
    optimizer=optimizer,
    epochs=epochs,
    device=device,
    model_type=model_type,
    model_save_path='premium_plan_subscription_predictor_model_weights.pth',
)

In [None]:
# Restore the best checkpoints into the two models.
# map_location=device remaps the saved tensors onto the current device, so a
# checkpoint written on a GPU runtime still loads on a CPU-only runtime.
# NOTE(review): no visible cell trains model_1 or writes its weights file --
# confirm that checkpoint exists before running this cell.
model_0.load_state_dict(torch.load('/content/premium_plan_subscription_predictor_model_weights.pth', map_location=device))
model_1.load_state_dict(torch.load('/content/premium_plan_subscription_amount_predictor_model_weights.pth', map_location=device))