<a href="https://www.kaggle.com/code/nilovnachatterjee/personalized-music-mood-recommender-nc?scriptVersionId=223304950" target="_blank"><img align="left" alt="Kaggle" title="Open in Kaggle" src="https://kaggle.com/static/images/open-in-kaggle.svg"></a>

Building a Mood-Based Music Recommendation System Using the Spotify Dataset and the Million Song Dataset
image.png

# =========================================
# 1. Introduction
# =========================================
"""
In this notebook, a Mood-Based Music Recommendation System was built using 
the Spotify Dataset and the Million Song Dataset. The following steps were adopted:

1. Load and explore the data.
2. Clean and preprocess the data (handle duplicates, missing values, outliers).
3. Perform exploratory data analysis (EDA).
4. Cluster songs based on audio features (K-Means).
5. Build and evaluate a mood classification model.
6. Create a neural network for recommendation based on mood clusters.

A separate Word/PDF document will summarize the key findings and link back
to this notebook.
"""

In [14]:
# =========================================
# 2. Import Libraries
# =========================================
import numpy as np 
import pandas as pd 
import matplotlib.pyplot as plt
import seaborn as sns
import os
import warnings
import json
import nbformat
import tensorflow as tf
from tensorflow import keras

# Web scraping / HTML parsing
from bs4 import BeautifulSoup 
from io import StringIO 


# Preprocessing & Feature Engineering
from imblearn.over_sampling import RandomOverSampler
from tensorflow.keras.layers import Dense, Dropout, Input
from sklearn.cluster import KMeans
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler, MinMaxScaler, LabelEncoder, RobustScaler, PolynomialFeatures
from sklearn.decomposition import PCA
from sklearn.model_selection import train_test_split, GridSearchCV, StratifiedKFold
from sklearn.metrics import accuracy_score, f1_score, confusion_matrix, classification_report, precision_score, recall_score

#Deep Learning
from tensorflow.keras.models import Sequential
from imblearn.over_sampling import SMOTE
from imblearn.combine import SMOTETomek, SMOTEENN
from sklearn.utils.class_weight import compute_class_weight

# **Data preprocessing**

In [15]:
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

/kaggle/input/spotify-song-prediction-and-recommendation-system/__results__.html
/kaggle/input/spotify-song-prediction-and-recommendation-system/__notebook_source__.ipynb
/kaggle/input/spotify-song-prediction-and-recommendation-system/__notebook__.ipynb
/kaggle/input/spotify-song-prediction-and-recommendation-system/__output__.json
/kaggle/input/spotify-song-prediction-and-recommendation-system/custom.css
/kaggle/input/million-song-dataset/Testing_set_songs.csv
/kaggle/input/million-song-dataset/Training_set_songs.csv
/kaggle/input/million-song-dataset-studies/song_data1.csv
/kaggle/input/million-song-dataset-studies/song_data4.csv
/kaggle/input/million-song-dataset-studies/song_data2.csv
/kaggle/input/million-song-dataset-studies/song_data3.csv
/kaggle/input/million-song-dataset-studies/song_data7.csv
/kaggle/input/million-song-dataset-studies/song_data6.csv
/kaggle/input/million-song-dataset-studies/song_data5.csv
/kaggle/input/spotify-user-behavior-analysis/__results__.html
/kaggle/

In [16]:
data_dirs = {
    "million_song" : "/kaggle/input/million-song-dataset",
    "spotify_songs": "/kaggle/input/spotify-dataset",
    "million_song_studies": "/kaggle/input/million-song-dataset-studies",
    "spotify_user_behavior": "/kaggle/input/spotify-user-behavior-analysis",  # Integrated the Spotify User Behavior Analysis
    "spotify_song_prediction": "/kaggle/input/spotify-song-prediction-and-recommendation-system",
    "musics_demographic_data": "/kaggle/input/musics-depending-on-demographic-data"
}


In [17]:
# =========================================
# 3. Helper Functions
# =========================================
def extract_tables_from_html(html_path: str):
    """
    Parse every table found in an HTML file into a pandas DataFrame.

    Parameters:
        html_path: Path of the HTML file to read (opened as UTF-8 text).

    Returns:
        The list of DataFrames produced by ``pd.read_html``, or ``None`` when
        opening or parsing the file raises (the error is printed, not re-raised).
    """
    try:
        with open(html_path, 'r', encoding='utf-8') as handle:
            markup = str(BeautifulSoup(handle, 'html.parser'))
        # read_html receives a StringIO buffer rather than the raw markup string.
        return pd.read_html(StringIO(markup))
    except Exception as err:
        print(f"Error reading HTML file: {err}")
        return None

def list_files(directory: str):
    """
    Print and return the entry names found directly inside ``directory``.

    Returns an empty list (after printing the error) when the directory
    cannot be listed.
    """
    try:
        entries = os.listdir(directory)
        print(f"Files in {directory}:", entries)
    except Exception as err:
        print(f"Error listing files in {directory}: {err}")
        entries = []
    return entries

def load_specific_datasets(data_dirs: dict):
    """
    Placeholder loader: report each configured dataset directory and its files.

    For every ``name -> path`` pair the pair is printed and ``list_files`` is
    called on the path. No file is read, so the returned dictionary is
    always empty.
    """
    datasets = {}
    for dataset_name, directory in data_dirs.items():
        print(f"Loading dataset from: {dataset_name} -> {directory}")
        # Listing only; per-dataset loading logic is not implemented in this version.
        list_files(directory)
    return datasets

def handle_missing_values(df: pd.DataFrame):
    """
    Fill missing values column by column and return a cleaned copy.

    float64/int64 columns are filled with the column mean. Every other column
    is filled with its most frequent value, or with "Unknown" when the column
    has no non-missing value to take a mode from.

    Parameters:
        df: Input DataFrame, or None.

    Returns:
        A new DataFrame with missing values filled (the input is not modified),
        or None when ``df`` is None.
    """
    if df is None:
        return None

    df = df.copy()
    for col in df.columns:
        if df[col].dtype in ['float64', 'int64']:
            fill_value = df[col].mean()
        else:
            modes = df[col].mode()
            # mode() is empty for an all-missing column; indexing [0] would raise.
            fill_value = modes.iloc[0] if not modes.empty else "Unknown"
        # Assign the result back instead of calling fillna(inplace=True) on
        # df[col]: the in-place form operates on an intermediate object and is
        # not guaranteed to update df.
        df[col] = df[col].fillna(fill_value)
    return df

def remove_duplicates(df: pd.DataFrame):
    """
    Return ``df`` without fully duplicated rows and with a fresh 0..n-1 index.

    ``None`` is passed through unchanged.
    """
    if df is None:
        return None
    unique_rows = df.drop_duplicates()
    return unique_rows.reset_index(drop=True)

def detect_and_remove_outliers(df: pd.DataFrame, numeric_cols: list, iqr_multiplier: float = 1.5):
    """
    Drop rows whose values fall outside the IQR fence of the given columns.

    Columns are processed in the order given. Each column's quartiles are
    computed on the rows that survived the previous columns' filters.
    Entries of ``numeric_cols`` that are missing from the frame, or whose
    dtype is not float64/int64, are skipped.

    Parameters:
        df: Input DataFrame, or None.
        numeric_cols: Column names to check.
        iqr_multiplier: Width of the fence in IQR units (default 1.5).

    Returns:
        A new DataFrame with the outlier rows removed and the index reset,
        or None when ``df`` is None (matching the other helpers).
    """
    if df is None:
        return None

    df_clean = df.copy()
    for col in numeric_cols:
        if col not in df_clean.columns or df_clean[col].dtype not in [np.float64, np.int64]:
            continue
        q1 = df_clean[col].quantile(0.25)
        q3 = df_clean[col].quantile(0.75)
        iqr = q3 - q1
        lower_bound = q1 - iqr_multiplier * iqr
        upper_bound = q3 + iqr_multiplier * iqr
        df_clean = df_clean[(df_clean[col] >= lower_bound) & (df_clean[col] <= upper_bound)]
    return df_clean.reset_index(drop=True)

def encode_categorical_features(df: pd.DataFrame):
    """
    Label-encode every object/category column of a DataFrame.

    Each such column is cast to ``str`` and replaced by the integer codes of
    a ``LabelEncoder`` fitted on that column.

    Returns:
        ``(encoded_copy, encoders)`` where ``encoders`` maps column name to
        its fitted LabelEncoder; ``(None, {})`` when ``df`` is None.
    """
    if df is None:
        return None, {}

    encoded = df.copy()
    encoders = {}
    for name in encoded.select_dtypes(include=['object', 'category']).columns.tolist():
        encoder = LabelEncoder()
        encoded[name] = encoder.fit_transform(encoded[name].astype(str))
        encoders[name] = encoder
    return encoded, encoders

def combine_numerical_data(datasets: dict) -> pd.DataFrame:
    """
    Stack every DataFrame found in ``datasets`` into one DataFrame.

    Values may be DataFrames or dicts of DataFrames (e.g. train/test splits).
    Anything else, including None, is ignored. Frames are stacked row-wise
    in iteration order with a fresh index; columns missing from a frame are
    filled with NaN. No column filtering is applied.

    Returns:
        The combined DataFrame, or an empty DataFrame when nothing was found.
    """
    frames = []
    for data in datasets.values():
        if isinstance(data, dict):  # For datasets with train/test split
            frames.extend(df for df in data.values() if isinstance(df, pd.DataFrame))
        elif isinstance(data, pd.DataFrame):
            frames.append(data)

    if not frames:
        return pd.DataFrame()
    # Concatenate once: growing a DataFrame inside the loop re-copies all
    # previously accumulated rows on every iteration.
    return pd.concat(frames, ignore_index=True)


## [2] Helper Functions

Below, we define **reusable helper functions** for:
- **Extracting tables** from HTML files,
- **Listing files** in a directory,
- **Loading** multiple datasets,
- **Handling missing values**,
- **Removing duplicates**,
- **Outlier detection**,
- **Encoding categorical features**,
- **Combining** data from multiple DataFrames.


In [11]:
# Function to extract tables from HTML files
# =========================================
# 3. Helper Functions
# =========================================
def extract_tables_from_html(html_path):
    """
    Read an HTML file and return its tables as a list of DataFrames.

    Returns None (after printing the error) if the file cannot be opened or
    parsed.
    """
    try:
        with open(html_path, 'r', encoding='utf-8') as html_file:
            parsed_document = BeautifulSoup(html_file, 'html.parser')
        # Hand read_html a StringIO buffer instead of a literal string to avoid the FutureWarning
        html_buffer = StringIO(str(parsed_document))
        return pd.read_html(html_buffer)
    except Exception as exc:
        print(f"Error reading HTML file: {exc}")
        return None

# Directories for datasets
# NOTE(review): this rebinds the module-level `data_dirs` to a three-key
# mapping. The keys "million_song", "million_song_studies" and "spotify_songs"
# that load_specific_datasets() indexes are absent until `data_dirs` is
# redefined in the "Load Data" cell.
data_dirs = {
    "spotify_user_behavior": "/kaggle/input/spotify-user-behavior-analysis",
    "spotify_song_prediction": "/kaggle/input/spotify-song-prediction-and-recommendation-system",
    "musics_demographic_data": "/kaggle/input/musics-depending-on-demographic-data"
}

# Attempting to extract data from HTML files
# Each call returns a list of DataFrames, or None if the file could not be read/parsed.
spotify_html_data = extract_tables_from_html(os.path.join(data_dirs["spotify_user_behavior"], "__results__.html"))
musics_html_data = extract_tables_from_html(os.path.join(data_dirs["musics_demographic_data"], "__results__.html"))

# Display the first few rows of the extracted tables if available
# (the truthiness test covers both None and an empty list)
if spotify_html_data:
    print("\nSpotify User Behavior Data Sample:")
    print(spotify_html_data[0].head())  # Display the first table
else:
    print("No data found in Spotify User Behavior HTML.")

if musics_html_data:
    print("\nMusics Demographic Data Sample:")
    print(musics_html_data[0].head())  # Display the first table
else:
    print("No data found in Musics Demographic HTML.")


def list_files(directory):
    """
    List the entries of ``directory``, echoing them to stdout.

    Returns the list from ``os.listdir``; on any error the message is printed
    and an empty list is returned instead.
    """
    found = []
    try:
        found = os.listdir(directory)
        print(f"Files in {directory}:", found)
    except Exception as exc:
        print(f"Error listing files in {directory}: {exc}")
        return []
    return found


def load_specific_datasets(data_dirs):
    """
    Load every dataset used by the notebook from its configured directory.

    Parameters:
        data_dirs: Mapping with the keys "million_song", "million_song_studies",
            "spotify_songs", "spotify_user_behavior", "spotify_song_prediction"
            and "musics_demographic_data", each pointing at a directory.

    Returns:
        dict with these entries:
        - "million_song": {"train": DataFrame, "test": DataFrame}; the key is
          absent when either CSV is missing from the directory.
        - "million_song_studies": all CSVs in the directory concatenated, or None.
        - "spotify_songs": {file name without ".csv": DataFrame}, or None.
        - "spotify_user_behavior": first table of __results__.html, or None.
        - "spotify_song_prediction": all CSVs concatenated, or None.
        - "musics_demographic_data": first table of __results__.html, or None.

    Raises:
        KeyError: if ``data_dirs`` lacks one of the keys listed above.
    """
    datasets = {}

    # Million Song Dataset (Train & Test)
    million_song_dir = data_dirs["million_song"]
    million_song_files = list_files(million_song_dir)
    if "Testing_set_songs.csv" in million_song_files and "Training_set_songs.csv" in million_song_files:
        datasets["million_song"] = {
            "train": pd.read_csv(os.path.join(million_song_dir, "Training_set_songs.csv")),
            "test": pd.read_csv(os.path.join(million_song_dir, "Testing_set_songs.csv"))
        }
    else:
        print("Error: Test or Train files not found in Million Song Dataset.")

    # Million Song Dataset Studies
    studies_dir = data_dirs["million_song_studies"]
    studies_data = [pd.read_csv(os.path.join(studies_dir, file))
                    for file in list_files(studies_dir) if file.endswith(".csv")]
    datasets["million_song_studies"] = pd.concat(studies_data, ignore_index=True) if studies_data else None

    # Spotify Dataset
    # Only CSVs directly inside the directory are read; sub-directories are not searched.
    spotify_dir = data_dirs["spotify_songs"]
    spotify_data = {file.replace(".csv", ""): pd.read_csv(os.path.join(spotify_dir, file))
                    for file in list_files(spotify_dir) if file.endswith(".csv")}
    datasets["spotify_songs"] = spotify_data if spotify_data else None

    # Spotify User Behavior Dataset (HTML)
    spotify_html_data = extract_tables_from_html(os.path.join(data_dirs["spotify_user_behavior"], "__results__.html"))
    datasets["spotify_user_behavior"] = spotify_html_data[0] if spotify_html_data else None

    # Spotify Song Prediction Dataset
    prediction_dir = data_dirs["spotify_song_prediction"]
    prediction_data = [pd.read_csv(os.path.join(prediction_dir, file))
                       for file in list_files(prediction_dir) if file.endswith(".csv")]
    datasets["spotify_song_prediction"] = pd.concat(prediction_data, ignore_index=True) if prediction_data else None

    # Musics Demographic Data (HTML)
    musics_html_data = extract_tables_from_html(os.path.join(data_dirs["musics_demographic_data"], "__results__.html"))
    datasets["musics_demographic_data"] = musics_html_data[0] if musics_html_data else None

    # Without this return the function yields None, and every later
    # `all_datasets.get(...)` fails with AttributeError.
    return datasets



Spotify User Behavior Data Sample:
  Unnamed: 0    Age  Gender spotify_usage_period  \
0          0  20-35  Female    More than 2 years   
1          1  12-20    Male    More than 2 years   
2          2  35-60  Others   6 months to 1 year   
3          3  20-35  Female    1 year to 2 years   
4          4  20-35  Female    1 year to 2 years   

                         spotify_listening_device spotify_subscription_plan  \
0              Smart speakers or voice assistants       Free (ad-supported)   
1                              Computer or laptop       Free (ad-supported)   
2              Smart speakers or voice assistants       Free (ad-supported)   
3  Smartphone, Smart speakers or voice assistants       Free (ad-supported)   
4                                      Smartphone       Free (ad-supported)   

  premium_sub_willingness          preffered_premium_plan  \
0                     Yes        Family Plan-Rs 179/month   
1                     Yes  Individual Plan- Rs 119/ mo

## [3] Load Data

Here, we define a `data_dirs` dictionary for our dataset paths and use `load_specific_datasets` to create a unified dictionary `all_datasets`. Adjust the paths and file names to match your actual environment.


In [18]:
# =========================================
# 4. Load Data
# =========================================
# Dataset name -> Kaggle input directory. These six keys are the ones the
# full load_specific_datasets() definition looks up by name.
data_dirs = {
    "million_song": "/kaggle/input/million-song-dataset",
    "spotify_songs": "/kaggle/input/spotify-dataset",
    "million_song_studies": "/kaggle/input/million-song-dataset-studies",
    "spotify_user_behavior": "/kaggle/input/spotify-user-behavior-analysis",
    "spotify_song_prediction": "/kaggle/input/spotify-song-prediction-and-recommendation-system",
    "musics_demographic_data": "/kaggle/input/musics-depending-on-demographic-data"
}

# NOTE(review): which load_specific_datasets definition is live depends on cell
# execution order; the stub version returns an empty dict.
all_datasets = load_specific_datasets(data_dirs)


Loading dataset from: million_song -> /kaggle/input/million-song-dataset
Files in /kaggle/input/million-song-dataset: ['Testing_set_songs.csv', 'Training_set_songs.csv']
Loading dataset from: spotify_songs -> /kaggle/input/spotify-dataset
Files in /kaggle/input/spotify-dataset: ['data']
Loading dataset from: million_song_studies -> /kaggle/input/million-song-dataset-studies
Files in /kaggle/input/million-song-dataset-studies: ['song_data1.csv', 'song_data4.csv', 'song_data2.csv', 'song_data3.csv', 'song_data7.csv', 'song_data6.csv', 'song_data5.csv']
Loading dataset from: spotify_user_behavior -> /kaggle/input/spotify-user-behavior-analysis
Files in /kaggle/input/spotify-user-behavior-analysis: ['__results__.html', '__notebook_source__.ipynb', '__resultx__.html', '__notebook__.ipynb', '__results___files', '__output__.json', 'custom.css']
Loading dataset from: spotify_song_prediction -> /kaggle/input/spotify-song-prediction-and-recommendation-system
Files in /kaggle/input/spotify-song-p

In [19]:
# =========================================
# 5. Data Cleaning
# =========================================
# =========================================
# 5. Data Cleaning
# =========================================
def handle_missing_values(df):
    """
    Report and fill the missing values of a DataFrame.

    float64/int64 columns are filled with their mean. All other columns are
    filled with their first mode, or with "Unknown" when no mode exists.
    The per-column missing counts are printed before and after filling.

    Parameters:
    df (pandas.DataFrame): The input DataFrame, or None.

    Returns:
    pandas.DataFrame: A filled copy of the input (the input is not modified),
    or None when ``df`` is None.
    """
    if df is None:
        print("Error: DataFrame is None.")
        return None

    print("Missing values before handling:")
    print(df.isnull().sum())

    # Work on a copy so the caller's frame stays untouched
    filled = df.copy()

    for name in filled.columns:
        series = filled[name]
        if series.dtype in ['float64', 'int64']:
            replacement = series.mean()
        else:
            # mode() can be empty (e.g. a column with no observed values)
            candidates = series.mode()
            replacement = "Unknown" if candidates.empty else candidates[0]
        filled[name] = series.fillna(replacement)

    print("Missing values after handling:")
    print(filled.isnull().sum())
    return filled

# Load all datasets
all_datasets = load_specific_datasets(data_dirs)

# Each section below fetches one dataset with .get() (None when the key is
# absent) and fills its missing values.
# NOTE: the two dict-valued datasets (million_song, spotify_songs) are updated
# inside the dict objects held by all_datasets. The four single-frame datasets
# are only rebound to the module-level names, so all_datasets still holds
# their unfilled frames.

# Handle missing values for Million Song Dataset
million_song_data = all_datasets.get("million_song")
if million_song_data is not None:
    million_song_data["train"] = handle_missing_values(million_song_data["train"])
    million_song_data["test"] = handle_missing_values(million_song_data["test"])

# Handle missing values for Spotify Dataset
spotify_data = all_datasets.get("spotify_songs")
if spotify_data is not None:
    for key, df in spotify_data.items():
        spotify_data[key] = handle_missing_values(df)

# Handle missing values for Million Song Studies
million_song_studies = all_datasets.get("million_song_studies")
if million_song_studies is not None:
    million_song_studies = handle_missing_values(million_song_studies)

# Handle missing values for Spotify User Behavior Dataset
spotify_user_behavior = all_datasets.get("spotify_user_behavior")
if spotify_user_behavior is not None:
    spotify_user_behavior = handle_missing_values(spotify_user_behavior)

# Handle missing values for Spotify Song Prediction Dataset
spotify_song_prediction = all_datasets.get("spotify_song_prediction")
if spotify_song_prediction is not None:
    spotify_song_prediction = handle_missing_values(spotify_song_prediction)

# Handle missing values for Musics Demographic Data
musics_demographic_data = all_datasets.get("musics_demographic_data")
if musics_demographic_data is not None:
    musics_demographic_data = handle_missing_values(musics_demographic_data)


Loading dataset from: million_song -> /kaggle/input/million-song-dataset
Files in /kaggle/input/million-song-dataset: ['Testing_set_songs.csv', 'Training_set_songs.csv']
Loading dataset from: spotify_songs -> /kaggle/input/spotify-dataset
Files in /kaggle/input/spotify-dataset: ['data']
Loading dataset from: million_song_studies -> /kaggle/input/million-song-dataset-studies
Files in /kaggle/input/million-song-dataset-studies: ['song_data1.csv', 'song_data4.csv', 'song_data2.csv', 'song_data3.csv', 'song_data7.csv', 'song_data6.csv', 'song_data5.csv']
Loading dataset from: spotify_user_behavior -> /kaggle/input/spotify-user-behavior-analysis
Files in /kaggle/input/spotify-user-behavior-analysis: ['__results__.html', '__notebook_source__.ipynb', '__resultx__.html', '__notebook__.ipynb', '__results___files', '__output__.json', 'custom.css']
Loading dataset from: spotify_song_prediction -> /kaggle/input/spotify-song-prediction-and-recommendation-system
Files in /kaggle/input/spotify-song-p

In [None]:
def normalize_features(df, columns=None):
    """
    Min-max scale selected columns of a DataFrame and return a scaled copy.

    Parameters:
        df: Input DataFrame, or None.
        columns: Column names to scale. When None, every float64/int64 column
            is used. Names that are not in the frame are ignored.

    Returns:
        A copy of ``df`` with the selected columns scaled by a MinMaxScaler
        fitted on this frame; the unscaled copy when no requested column is
        present; None when ``df`` is None.
    """
    if df is None:
        print("Error: DataFrame is None.")
        return None

    scaled = df.copy()
    requested = columns
    if requested is None:
        requested = scaled.select_dtypes(include=['float64', 'int64']).columns.tolist()

    present = [name for name in requested if name in scaled.columns]
    if present:
        scaled[present] = MinMaxScaler().fit_transform(scaled[present])
    else:
        print(f"No matching columns found for normalization. DataFrame columns: {scaled.columns.tolist()}")
    return scaled

# Normalize the frames produced by the missing-value step in the previous cell.
# The datasets are deliberately NOT reloaded here: calling
# load_specific_datasets() again and re-fetching with all_datasets.get(...)
# would replace the cleaned frames with raw ones and throw the cleaning away.
#
# NOTE(review): normalize_features() fits a separate MinMaxScaler on every
# call, so the train and test frames are scaled with their own min/max.

# Normalize Million Song Dataset
if million_song_data is not None:
    million_song_data["train"] = normalize_features(million_song_data["train"])
    million_song_data["test"] = normalize_features(million_song_data["test"])

# Normalize Spotify Dataset
if spotify_data is not None:
    for key, df in spotify_data.items():
        spotify_data[key] = normalize_features(df)

# Normalize Million Song Studies
if million_song_studies is not None:
    million_song_studies = normalize_features(million_song_studies)

# Normalize Spotify User Behavior Dataset
if spotify_user_behavior is not None:
    spotify_user_behavior = normalize_features(spotify_user_behavior)

# Normalize Spotify Song Prediction Dataset
if spotify_song_prediction is not None:
    spotify_song_prediction = normalize_features(spotify_song_prediction)

# Normalize Musics Demographic Data
if musics_demographic_data is not None:
    musics_demographic_data = normalize_features(musics_demographic_data)

In [None]:
def encode_categorical_features(df):
    """
    Label-encode the object/category columns of a DataFrame, logging progress.

    Every detected column is cast to ``str`` and replaced by the integer codes
    of a LabelEncoder fitted on that column.

    Returns:
        ``(encoded_copy, encoders)`` with ``encoders`` mapping column name to
        its fitted LabelEncoder; ``(None, {})`` when ``df`` is None.
    """
    if df is None:
        print("Error: DataFrame is None.")
        return None, {}

    encoded = df.copy()
    text_columns = encoded.select_dtypes(include=['object', 'category']).columns.tolist()
    print(f"Detected categorical columns for encoding: {text_columns}")

    fitted = {}
    for name in text_columns:
        encoder = LabelEncoder()
        encoded[name] = encoder.fit_transform(encoded[name].astype(str))
        fitted[name] = encoder
        print(f"Encoded column: {name}")

    return encoded, fitted

# Encode the categorical columns of the frames prepared by the preceding cells.
# The datasets are not reloaded or re-normalized here, so the processing
# already applied to million_song_data, spotify_data, million_song_studies and
# spotify_user_behavior is preserved.

# Encode Categorical Features
if million_song_data is not None:
    print("Encoding Million Song Train DataFrame:")
    million_song_data["train"], million_song_label_encoders = encode_categorical_features(million_song_data["train"])
    print("Encoding Million Song Test DataFrame:")
    # NOTE(review): the test frame is fitted with its own encoders (discarded
    # here), so its integer codes need not match the train frame's codes.
    million_song_data["test"], _ = encode_categorical_features(million_song_data["test"])

if spotify_data is not None:
    # Keep the encoders of every Spotify frame, keyed by frame name.
    # `spotify_label_encoders` on its own only ever holds the encoders of the
    # last frame processed.
    spotify_label_encoders_by_frame = {}
    for key, df in spotify_data.items():
        print(f"Encoding Spotify DataFrame: {key}")
        spotify_data[key], spotify_label_encoders = encode_categorical_features(df)
        spotify_label_encoders_by_frame[key] = spotify_label_encoders

if million_song_studies is not None:
    print("Encoding Million Song Studies DataFrame:")
    million_song_studies, million_song_studies_encoders = encode_categorical_features(million_song_studies)

if spotify_user_behavior is not None:
    print("Encoding Spotify User Behavior DataFrame:")
    spotify_user_behavior, spotify_user_behavior_encoders = encode_categorical_features(spotify_user_behavior)

# *EDA Section*

In [None]:
# Suppress FutureWarnings
# (the filter is registered globally, so it also applies to every later cell)
warnings.filterwarnings('ignore', category=FutureWarning)

# Selected music-related features
# Column names looked up in the combined data; names that are absent there are
# dropped later when `available_music_features` is built.
music_features = [
    'duration', 'key', 'tempo', 'time_signature', 'end_of_fade_in',
    'start_of_fade_out', 'loudness'
]

# Combine numerical data from multiple datasets
def combine_numerical_data(datasets):
    """
    Stack every DataFrame found in ``datasets`` into a single DataFrame.

    Values may be DataFrames or dicts of DataFrames (train/test splits).
    Other values, including None, are skipped. Rows are stacked in iteration
    order with a fresh index; no column filtering is applied.

    Returns an empty DataFrame when no DataFrame is found.
    """
    frames = []
    for data in datasets.values():
        if isinstance(data, dict):  # For datasets with train/test split
            frames.extend(df for df in data.values() if isinstance(df, pd.DataFrame))
        elif isinstance(data, pd.DataFrame):
            frames.append(data)

    # One concat over the collected frames avoids re-copying the accumulated
    # rows on every loop iteration.
    return pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()

# Load and combine all datasets
all_datasets = load_specific_datasets(data_dirs)
combined_data = combine_numerical_data(all_datasets)

# Keep only the selected features that are actually present in the combined data
available_music_features = [feature for feature in music_features if feature in combined_data.columns]

# Coerce every selected column to numeric (unparseable values become NaN).
# apply() returns a new DataFrame, so music_data is an independent frame with
# numeric dtypes. Writing the converted values back through .loc on a slice of
# combined_data triggers SettingWithCopyWarning and can leave the original
# (object) dtype in place.
music_data = combined_data[available_music_features].apply(pd.to_numeric, errors='coerce')


In [None]:
# Basic Statistics
# describe() output for music_data, kept in summary_stats for later reference
summary_stats = music_data.describe()
print("Summary Statistics:")
print(summary_stats)


In [None]:
# Suppress RuntimeWarnings
# (registered globally, so it stays active for every later cell as well)
warnings.filterwarnings('ignore', category=RuntimeWarning)

# Correlation Matrix
# Pairwise correlation between the columns of music_data (DataFrame.corr defaults)
correlation_matrix = music_data.corr()

# Visualization: Correlation Heatmap
# Each cell is annotated with its coefficient, formatted to two decimals
plt.figure(figsize=(12, 8))
sns.heatmap(correlation_matrix, annot=True, fmt='.2f', cmap='coolwarm')
plt.title('Music Feature Correlation Heatmap')
plt.show()

In [None]:
# Pairplot for key numerical features (up to the first 7 float64/int64 columns)
# key_features is reused by the distribution plots in the next cell
key_features = music_data.select_dtypes(include=['float64', 'int64']).columns[:7]
sns.pairplot(music_data[key_features])
plt.show()


In [None]:
# Distribution of representative features
# One histogram (30 bins, with a KDE overlay) per column in key_features
for feature in key_features:
    plt.figure(figsize=(8, 4))
    sns.histplot(music_data[feature], bins=30, kde=True)
    plt.title(f'Distribution of {feature}')
    plt.xlabel(feature)
    plt.ylabel('Frequency')
    plt.show()


# *Kmeans clustering*

## [6] K-Means Clustering

We demonstrate **K-Means** on a subset of columns (e.g., `duration`, `tempo`, `start_of_fade_out`, etc.).  

1. **Select features** for clustering.  
2. **Scale** them with `StandardScaler`.  
3. Use the **Elbow Method** to find a good number of clusters.  
4. Fit K-Means and label the data with cluster IDs.  
5. (Optional) **Label** each cluster with a descriptive name.  
6. **Visualize** clusters via PCA.


In [None]:
# Features used for clustering. This rebinds the EDA feature list and no longer
# includes 'loudness'.
music_features = [
    'duration', 'key', 'tempo', 'time_signature', 'end_of_fade_in', 'start_of_fade_out'
]

In [None]:
# Combine numerical data from multiple datasets
def combine_numerical_data(datasets):
    """
    Concatenate all DataFrames contained in ``datasets`` row-wise.

    Accepts DataFrames and dicts of DataFrames (train/test splits) as values
    and ignores everything else, including None. The result has a fresh
    index; an empty DataFrame is returned when there is nothing to combine.
    """
    collected = []
    for data in datasets.values():
        if isinstance(data, dict):  # For datasets with train/test split
            collected.extend(df for df in data.values() if isinstance(df, pd.DataFrame))
        elif isinstance(data, pd.DataFrame):
            collected.append(data)

    if not collected:
        return pd.DataFrame()
    # A single concat keeps the cost linear in the number of rows; concatenating
    # inside the loop copies the accumulated frame each time.
    return pd.concat(collected, ignore_index=True)

# Load and combine datasets
# (reloads from disk, so the frames combined here are the loader's raw output)
all_datasets = load_specific_datasets(data_dirs)
combined_data = combine_numerical_data(all_datasets)

# Step 3: Filter for Selected Music Features
# Only the features present in combined_data are kept, so music_data may hold
# fewer columns than music_features lists.
available_music_features = [feature for feature in music_features if feature in combined_data.columns]
music_data = combined_data[available_music_features]

In [None]:
# Work on a copy so music_data itself is left untouched.
df_cleaned = music_data.copy()

# Iterate over the columns actually present. music_data only holds the features
# found in combined_data, so indexing by every entry of music_features can
# raise KeyError.
for col in df_cleaned.columns:
    if df_cleaned[col].dtype in ['float64', 'int64']:
        df_cleaned[col] = df_cleaned[col].fillna(df_cleaned[col].mean())
    else:
        # If there are any categorical features (unlikely here), fill with mode.
        # mode() is empty for an all-missing column, which is then left as is.
        mode_values = df_cleaned[col].mode()
        if not mode_values.empty:
            df_cleaned[col] = df_cleaned[col].fillna(mode_values.iloc[0])

In [None]:
# Scale the features
# NOTE(review): indexes df_cleaned by the full music_features list; this raises
# KeyError if any listed feature is missing from df_cleaned.
scaler = StandardScaler()
scaled_features = scaler.fit_transform(df_cleaned[music_features])

# Determine optimal number of clusters
# sse[i] holds the K-Means inertia for k = k_range[i]
sse = []
k_range = range(1, 11)

for k in k_range:
    kmeans = KMeans(n_clusters=k, random_state=42, n_init=10)
    # `preds` is not used afterwards; only the fitted model's inertia is recorded
    preds = kmeans.fit_predict(scaled_features)
    sse.append(kmeans.inertia_)


In [None]:
# Plot SSE for the Elbow Method
# x: candidate cluster counts in k_range, y: the inertia recorded for each k
plt.figure(figsize=(8, 4))
plt.plot(k_range, sse, marker='o')
plt.title('Elbow Method for Optimal Clusters')
plt.xlabel('Number of Clusters (k)')
plt.ylabel('Sum of Squared Errors (SSE)')
plt.grid(True)
plt.show()

In [None]:
# Step 1: Define Available Music Features
music_features = [
    'duration', 'key', 'tempo', 'time_signature', 'end_of_fade_in', 'start_of_fade_out'
]

# Step 2: Handle Missing Values by Filling with Mean
# Starts again from music_data, so df_cleaned from the earlier cell is replaced.
# NOTE(review): df_cleaned[col] raises KeyError for any listed feature that is
# missing from music_data.
df_cleaned = music_data.copy()
for col in music_features:
    if df_cleaned[col].isnull().sum() > 0:
        df_cleaned[col] = df_cleaned[col].fillna(df_cleaned[col].mean())

# Step 3: Scale the Features
scaler = StandardScaler()
scaled_features = scaler.fit_transform(df_cleaned[music_features])

# Step 4: Apply K-Means Clustering
# The cluster count is fixed at 3 here; random_state makes the assignment repeatable.
optimal_k = 3
kmeans = KMeans(n_clusters=optimal_k, random_state=42, n_init=10)
df_cleaned['Cluster'] = kmeans.fit_predict(scaled_features)

# Step 5: Inverse Scaling for Interpretation
# Cluster centres mapped back to the original feature units, one row per cluster.
centroids = pd.DataFrame(scaler.inverse_transform(kmeans.cluster_centers_), columns=music_features)

# Step 6: Enhanced Labeling Based on Feature Dominance
def label_cluster_by_centroid(centroid, centroids):
    # Thresholds based on quartiles
    tempo_threshold = centroids['tempo'].quantile(0.75)
    fade_out_threshold = centroids['start_of_fade_out'].quantile(0.75)
    duration_threshold = centroids['duration'].quantile(0.75)

    # Labeling Logic for 3 Clusters
    if centroid['start_of_fade_out'] >= fade_out_threshold:
        return "folks"    # Cluster 1: Focus on long fade-out
    elif centroid['tempo'] >= tempo_threshold:
        return "pop"       # Cluster 2: Focus on fast tempo
    else:
        return "jazz"      # Cluster 3: Everything else, relaxed mood


# Step 7: name every cluster from its centroid row, then attach the name to each song
cluster_labels = {}
for cluster_id in range(optimal_k):
    cluster_labels[cluster_id] = label_cluster_by_centroid(centroids.iloc[cluster_id], centroids)
df_cleaned['Cluster_Label'] = df_cleaned['Cluster'].map(cluster_labels)

# Step 8: show the centroid table and a few labelled songs
print("Cluster Centroids:", centroids, sep="\n")

sample_columns = ['duration', 'tempo', 'key', 'Cluster', 'Cluster_Label']
print("\nSample Clustered Data:", df_cleaned[sample_columns].head(), sep="\n")


In [None]:
# Project the scaled features onto two principal components for plotting
pca = PCA(n_components=2)
pca_result = pca.fit_transform(scaled_features)
for position, component_name in enumerate(['PCA1', 'PCA2']):
    df_cleaned[component_name] = pca_result[:, position]

# Lookup from cluster id to the label carried by that cluster's rows
labels_by_cluster = df_cleaned.groupby("Cluster")["Cluster_Label"]
cluster_label_map = labels_by_cluster.first().to_dict()

In [None]:
# Plot Clusters
plt.figure(figsize=(8, 6))
scatter = plt.scatter(df_cleaned['PCA1'], df_cleaned['PCA2'], c=df_cleaned['Cluster'], cmap='viridis', s=10)

# Plot Cluster Centers (projected into the same PCA space as the songs)
cluster_centers = pca.transform(kmeans.cluster_centers_)
plt.scatter(cluster_centers[:, 0], cluster_centers[:, 1], c='red', marker='X', s=100, label='Cluster Centers')

# Label Cluster Centers. The loop variables are deliberately not called x / y:
# at notebook top level they would overwrite the global `y` that the modelling
# cells use as their target whenever this cell is re-run.
for cluster_id, (center_x, center_y) in enumerate(cluster_centers):
    cluster_name = cluster_label_map.get(cluster_id, f"Cluster {cluster_id}")
    plt.text(center_x, center_y, cluster_name, fontsize=12, fontweight='bold', ha='center', color='black',
             bbox=dict(facecolor='white', alpha=0.6))

# Final Touches
plt.title('Clusters Visualization (PCA)')
plt.xlabel('PCA 1')
plt.ylabel('PCA 2')
plt.colorbar(scatter, label='Cluster')
plt.legend()
plt.show()


In [None]:
# Show the centroid table again; a bare last expression uses the notebook's rich table display
centroids

In [None]:
# Store the cluster assignment as the 'mood_cluster' target column. It is copied
# from the 'Cluster' column that was written together with 'Cluster_Label',
# rather than from kmeans.labels_, so the saved ids stay consistent even if the
# `kmeans` variable has since been refit in another cell (the elbow loop reuses it).
df_cleaned["mood_cluster"] = df_cleaned["Cluster"]

# Save to CSV
df_cleaned.to_csv("clustered_songs.csv", index=False)
print("Clustered dataset saved as clustered_songs.csv")

# **Mood prediction model**

In [None]:
# Work on a copy of the HTML-extracted user-behaviour frame
df_cleaned = spotify_user_behavior.copy()

# Categorical columns that get integer-encoded
categorical_features = [
    'Gender',
    'spotify_subscription_plan',
    'preferred_listening_content',
    'fav_music_genre',
    'music_time_slot',
    'music_expl_method',
]

# One LabelEncoder per categorical column, stored under the column name
label_encoders = {col: LabelEncoder() for col in categorical_features}
for col, le in label_encoders.items():
    df_cleaned[col] = le.fit_transform(df_cleaned[col].astype(str))

# Target: the user's mood, encoded to integers with its own encoder
label_encoder_mood = LabelEncoder()
mood_as_text = df_cleaned['music_Influencial_mood'].astype(str)
df_cleaned['Encoded_Mood'] = label_encoder_mood.fit_transform(mood_as_text)

# Model inputs and target
features = [
    'Gender',
    'spotify_subscription_plan',
    'preferred_listening_content',
    'fav_music_genre',
    'music_time_slot',
    'music_recc_rating',
    'music_expl_method',
]
X = df_cleaned[features]
y = df_cleaned['Encoded_Mood']

In [None]:
# Hold out the test set BEFORE any fitting or resampling. Oversampling first
# would place copies of the same row in both train and test, and fitting the
# scaler on all rows would let test statistics leak into training; both inflate
# the reported scores.
is_singleton = y.map(y.value_counts()) < 2  # a class with one row cannot be stratified

X_train_raw, X_test_raw, y_train_raw, y_test = train_test_split(
    X[~is_singleton], y[~is_singleton],
    test_size=0.2, random_state=42, stratify=y[~is_singleton],
)
# Singleton classes go to training only, so every encoded mood is seen by the models
X_train_raw = pd.concat([X_train_raw, X[is_singleton]])
y_train_raw = pd.concat([y_train_raw, y[is_singleton]])

# Scaling + pairwise interaction features, fitted on the training rows only
scaler = RobustScaler()
poly = PolynomialFeatures(degree=2, interaction_only=True, include_bias=False)
X_train_poly = poly.fit_transform(scaler.fit_transform(X_train_raw))
X_test = poly.transform(scaler.transform(X_test_raw))

# Balance the classes by oversampling the training rows only
ros = RandomOverSampler(random_state=42)
X_resampled, y_resampled = ros.fit_resample(X_train_poly, y_train_raw)
X_train, y_train = X_resampled, y_resampled

# Full-dataset transforms kept under their original names
X_scaled = scaler.transform(X)
X_poly = poly.transform(X_scaled)


In [None]:
# ✅ Logistic Regression Model with Regularization
class UserMoodPredictionModel:
    """One-vs-rest logistic regression trained by batch gradient descent.

    One binary classifier (weight vector + bias) is trained per class, with an
    L2 penalty on the weights. Prediction picks the class whose classifier
    gives the highest sigmoid score.

    Parameters
    ----------
    learning_rate : float
        Step size of each gradient-descent update.
    n_iterations : int
        Number of gradient-descent updates per class.
    regularization_strength : float
        Coefficient of the L2 penalty applied to the weights (not the bias).
    """

    def __init__(self, learning_rate=0.005, n_iterations=30000, regularization_strength=0.05):
        self.learning_rate = learning_rate
        self.n_iterations = n_iterations
        self.reg_strength = regularization_strength
        self.weights = None   # (n_classes, n_features) after fit
        self.bias = None      # (n_classes,) after fit
        self.classes = None   # sorted unique labels seen in fit

    def sigmoid(self, z):
        """Logistic function, with the input clipped so np.exp cannot overflow."""
        return 1 / (1 + np.exp(-np.clip(z, -500, 500)))

    def fit(self, X, y):
        """Train one binary classifier per unique label in ``y``.

        X is an (n_samples, n_features) array-like; y holds one label per row.
        """
        X = np.asarray(X, dtype=float)
        y = np.asarray(y)
        n_samples, n_features = X.shape
        self.classes = np.unique(y)
        self.weights = np.zeros((len(self.classes), n_features))
        self.bias = np.zeros(len(self.classes))

        for idx, cls in enumerate(self.classes):
            # Current class against all others
            y_binary = np.where(y == cls, 1, 0)
            for _ in range(self.n_iterations):
                linear_model = np.dot(X, self.weights[idx]) + self.bias[idx]
                y_predicted = self.sigmoid(linear_model)
                error = y_predicted - y_binary

                # Mean log-loss gradient plus the L2 term on the weights
                dw = (1 / n_samples) * np.dot(X.T, error) + self.reg_strength * self.weights[idx]
                db = (1 / n_samples) * np.sum(error)

                self.weights[idx] -= self.learning_rate * dw
                self.bias[idx] -= self.learning_rate * db

    def predict(self, X):
        """Return the predicted label for each row of ``X``.

        Raises RuntimeError if called before ``fit``.
        """
        if self.weights is None:
            raise RuntimeError("UserMoodPredictionModel.predict called before fit")
        linear_model = np.dot(X, self.weights.T) + self.bias
        y_predicted = self.sigmoid(linear_model)
        # argmax is a position in self.classes; map it back to the actual label
        return self.classes[np.argmax(y_predicted, axis=1)]

In [None]:
# Build both classifiers, then fit them on the same training split
logistic_model = UserMoodPredictionModel()
rf_model = RandomForestClassifier(n_estimators=100, random_state=42)
for classifier in (logistic_model, rf_model):
    classifier.fit(X_train, y_train)

# Predictions of each fitted model on the test split
logistic_pred, rf_pred = (
    classifier.predict(X_test) for classifier in (logistic_model, rf_model)
)


In [None]:
# ✅ Evaluation Function
def evaluate_model(y_true, y_pred, model_name):
    """Print summary metrics and a classification report, and plot a confusion matrix.

    Parameters
    ----------
    y_true : array-like of int
        Encoded true moods.
    y_pred : array-like of int
        Encoded predicted moods.
    model_name : str
        Name shown in the printed header and the plot title.

    Class names are decoded with the notebook-level ``label_encoder_mood``.
    """
    # zero_division=0 everywhere: a class that is never predicted must not count
    # as perfect precision, and this matches the default used by f1_score.
    accuracy = accuracy_score(y_true, y_pred)
    f1 = f1_score(y_true, y_pred, average='weighted', zero_division=0)
    precision = precision_score(y_true, y_pred, average='weighted', zero_division=0)
    recall = recall_score(y_true, y_pred, average='weighted', zero_division=0)

    print(f"\n{model_name} Performance:")
    print(f"Accuracy: {accuracy:.4f}")
    print(f"F1 Score: {f1:.4f}")
    print(f"Precision: {precision:.4f}")
    print(f"Recall: {recall:.4f}")

    # Use every class that occurs in either vector, so the report rows, the
    # confusion-matrix cells and the tick labels all refer to the same classes.
    labels = np.union1d(y_true, y_pred)
    target_names = label_encoder_mood.inverse_transform(labels)

    print("\nClassification Report:")
    print(classification_report(y_true, y_pred, labels=labels, target_names=target_names, zero_division=0))

    # ✅ Confusion Matrix
    conf_mat = confusion_matrix(y_true, y_pred, labels=labels)
    plt.figure(figsize=(8, 6))
    sns.heatmap(conf_mat, annot=True, fmt='d', cmap='Blues', xticklabels=target_names, yticklabels=target_names)
    plt.title(f'{model_name} - Confusion Matrix')
    plt.xlabel('Predicted Mood')
    plt.ylabel('Actual Mood')
    plt.show()

In [None]:
# Report metrics for both classifiers on the same test labels
for predictions, model_title in (
    (logistic_pred, "Logistic Regression"),
    (rf_pred, "Random Forest Classifier"),
):
    evaluate_model(y_test, predictions, model_title)

# **Recommendation System Neural Network**


* The dataset is loaded and features (such as valence, danceability, etc.) and target (mood_cluster) are selected.
* Features are normalized using StandardScaler to improve model performance.
* The dataset is split into training and testing sets using train_test_split.


In [None]:
# Audio features requested for the recommendation network
selected_features = ['valence', 'danceability', 'energy', 'acousticness', 'tempo']

In [None]:
# Combine numerical data from multiple datasets
def combine_numerical_data(datasets):
    """Stack every DataFrame found in ``datasets`` into a single frame.

    Parameters
    ----------
    datasets : dict
        Maps a dataset name either to a DataFrame or to a dict of DataFrames
        (e.g. train/test splits). Any other value is ignored.

    Returns
    -------
    pandas.DataFrame
        Row-wise concatenation with a fresh integer index; an empty DataFrame
        when no DataFrame was found.
    """
    frames = []
    for data in datasets.values():
        if isinstance(data, dict):  # For datasets with train/test split
            frames.extend(df for df in data.values() if isinstance(df, pd.DataFrame))
        elif isinstance(data, pd.DataFrame):
            frames.append(data)

    if not frames:
        return pd.DataFrame()
    # One concat call instead of re-copying a growing frame on every iteration
    return pd.concat(frames, ignore_index=True)

# Reload every dataset and merge them into one frame
all_datasets = load_specific_datasets(data_dirs)
combined_data = combine_numerical_data(all_datasets)

# Step 3: keep only the selected features that exist in the combined frame
available_music_features = [name for name in selected_features if name in combined_data.columns]
selected_music_data = combined_data.loc[:, available_music_features]

In [None]:
# Load the clustered dataset written earlier in this notebook
data = pd.read_csv("clustered_songs.csv")

# Select features and target. available_music_features was filtered against
# combined_data, so filter again against the columns this CSV really has;
# indexing with a missing column would raise a KeyError.
target = 'mood_cluster'  # Mood cluster as label
features = [name for name in available_music_features if name in data.columns]
if not features:
    raise ValueError("None of the selected features are present in clustered_songs.csv")

X = data[features]
y = data[target]

# Split first, then fit the scaler on the training rows only so that no
# test-set statistics leak into training
X_train_raw, X_test_raw, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

scaler = StandardScaler()
X_train = scaler.fit_transform(X_train_raw)
X_test = scaler.transform(X_test_raw)

# Full scaled matrix kept under its original name
X_scaled = scaler.transform(X)



* A Sequential model is used to define the neural network.
* It has two hidden layers with ReLU activation and dropout for regularization.
* The output layer uses a softmax activation for multi-class classification, with the number of units equal to the number of unique mood clusters.

In [None]:
# One output unit per distinct mood cluster in the target
n_mood_clusters = len(y.unique())

# Define the neural network model. Sequential is reached through the `keras`
# namespace imported at the top of the notebook, so this cell does not depend
# on a separate `Sequential` import.
model = keras.Sequential([
    Input(shape=(X_train.shape[1],)),  # Define input layer explicitly
    Dense(64, activation='relu'),
    Dropout(0.2),
    Dense(32, activation='relu'),
    Dropout(0.2),
    Dense(n_mood_clusters, activation='softmax')  # Output layer for classification
])


* The model is compiled with Adam optimizer and sparse_categorical_crossentropy loss function.
* The model is trained for 50 epochs with a batch size of 16, using validation data to monitor performance.


In [None]:
# Adam optimizer, sparse categorical cross-entropy loss, accuracy as the tracked metric
model.compile(
    optimizer='adam',
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy'],
)

# 50 epochs with mini-batches of 16; the test split is passed as validation data
history = model.fit(
    X_train,
    y_train,
    epochs=50,
    batch_size=16,
    validation_data=(X_test, y_test),
)

In [None]:
# Training vs. validation curves, one panel per tracked quantity
plt.figure(figsize=(12, 5))
for panel, (metric_key, metric_title) in enumerate([('accuracy', 'Accuracy'), ('loss', 'Loss')], start=1):
    plt.subplot(1, 2, panel)
    plt.plot(history.history[metric_key], label=f'Train {metric_title}')
    plt.plot(history.history[f'val_{metric_key}'], label=f'Validation {metric_title}')
    plt.xlabel('Epochs')
    plt.ylabel(metric_title)
    plt.legend()
    plt.title(f'Model {metric_title} Over Epochs')

plt.show()

In [None]:
# Score the trained network on the test split (returns loss, then accuracy)
test_scores = model.evaluate(X_test, y_test)
loss, accuracy = test_scores
print(f"Test Accuracy: {accuracy:.4f}")

# Write the trained network to disk
model.save("music_mood_recommendation_model.h5")