In [None]:
import pandas as pd
import numpy as np
from datasets import load_dataset
from sklearn.preprocessing import StandardScaler
from sklearn.neighbors import NearestNeighbors
from sklearn.metrics import mean_squared_error, r2_score
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Dense, Dropout, BatchNormalization
from tensorflow.keras.regularizers import l1_l2
from sklearn.decomposition import PCA
import matplotlib.pyplot as plt
from sklearn.preprocessing import LabelEncoder
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
import plotly.express as px
import plotly.graph_objects as go

# Enhanced data preprocessing
def load_and_preprocess_data():
    """Load the Spotify tracks dataset and build a standardised feature matrix.

    The ``train`` split is converted to a DataFrame, ``track_genre`` is
    label-encoded into ``track_genre_encoded``, rows with missing feature
    values are dropped, IQR outliers are removed one feature at a time, and
    the surviving rows are standardised with ``StandardScaler``.

    Returns:
        tuple: ``(df, X_scaled, numeric_features)``

        - ``df``: the cleaned DataFrame, re-indexed ``0..n-1`` so that row
          label ``i`` of ``df`` corresponds to row ``i`` of ``X_scaled``.
        - ``X_scaled``: the standardised feature array.
        - ``numeric_features``: the list of column names used as features.
    """
    ds = load_dataset("maharshipandya/spotify-tracks-dataset")
    df = pd.DataFrame(ds["train"])

    print("✅ Dataset loaded. First few rows:")
    print(df.head(), "\n")

    # Encode the genre label as an integer so it can be used as a feature.
    le = LabelEncoder()
    df['track_genre_encoded'] = le.fit_transform(df['track_genre'])

    numeric_features = [
        'danceability', 'energy', 'loudness', 'speechiness',
        'acousticness', 'instrumentalness', 'liveness', 'valence', 'tempo',
        'track_genre_encoded'
    ]

    df = df.dropna(subset=numeric_features).reset_index(drop=True)

    # Sequential IQR filter: the bounds of each feature are computed on the
    # rows that survived the previous features. The encoded genre (last
    # entry) is categorical and is therefore skipped.
    for feature in numeric_features[:-1]:
        q1 = df[feature].quantile(0.25)
        q3 = df[feature].quantile(0.75)
        iqr = q3 - q1
        lower = q1 - 1.5 * iqr
        upper = q3 + 1.5 * iqr
        is_outlier = (df[feature] < lower) | (df[feature] > upper)
        df = df[~is_outlier]

    # Filtering leaves gaps in the index labels. Reset them so that
    # label-based access on df lines up with the positional rows of X_scaled
    # (and of anything derived from it, such as latent features).
    df = df.reset_index(drop=True)

    X = df[numeric_features].values
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)

    return df, X_scaled, numeric_features

def plot_feature_heatmap(df, numeric_features):
    """Show a correlation heatmap and list the strongest feature pairs.

    Args:
        df: DataFrame holding the feature columns.
        numeric_features: list of column names to correlate.

    The heatmap is rendered with plotly. Afterwards every distinct pair of
    features whose absolute correlation exceeds 0.3 is printed, strongest
    first.
    """
    corr_table = df[numeric_features].corr()

    heatmap = go.Heatmap(
        z=corr_table,
        x=numeric_features,
        y=numeric_features,
        colorscale='RdBu',
        zmin=-1,
        zmax=1,
        text=np.round(corr_table, 2),
        texttemplate='%{text}',
        textfont={"size": 10},
        hoverongaps=False
    )
    figure = go.Figure(data=heatmap)
    figure.update_layout(
        title='Feature Correlation Heatmap',
        width=900,
        height=900,
        xaxis_tickangle=-45
    )
    figure.show()

    print("\n🔍 Strongest Feature Correlations:")

    # Walk the upper triangle only, so each pair is reported once and the
    # diagonal (self-correlation) is skipped.
    feature_count = len(numeric_features)
    strong_pairs = [
        (numeric_features[row], numeric_features[col], corr_table.iloc[row, col])
        for row in range(feature_count)
        for col in range(row + 1, feature_count)
        if abs(corr_table.iloc[row, col]) > 0.3
    ]

    ranked = sorted(strong_pairs, key=lambda pair: abs(pair[2]), reverse=True)
    for feat1, feat2, corr in ranked:
        print(f"{feat1} ↔️ {feat2}: {corr:.3f}")

# Enhanced autoencoder architecture
def build_autoencoder(input_dim, encoding_dim=8):
    """Assemble a symmetric dense autoencoder and its encoder half.

    Args:
        input_dim: number of input (and reconstructed output) features.
        encoding_dim: width of the bottleneck layer.

    Returns:
        tuple: ``(autoencoder, encoder)``. Both models share the same input
        and encoder layers; only ``autoencoder`` is compiled (Adam, MSE).
    """
    def weight_penalty():
        # A fresh regulariser instance per layer, as in a hand-written stack.
        return l1_l2(l1=1e-5, l2=1e-4)

    inputs = Input(shape=(input_dim,))

    # Encoder: 64 -> 32, each followed by batch norm and dropout.
    hidden = inputs
    for units in (64, 32):
        hidden = Dense(units, activation='relu', kernel_regularizer=weight_penalty())(hidden)
        hidden = BatchNormalization()(hidden)
        hidden = Dropout(0.2)(hidden)

    # Bottleneck: this tensor is what the encoder model outputs.
    bottleneck = Dense(encoding_dim, activation='relu', kernel_regularizer=weight_penalty())(hidden)

    # Decoder: mirror of the encoder, without weight regularisation.
    rebuilt = bottleneck
    for units in (32, 64):
        rebuilt = Dense(units, activation='relu')(rebuilt)
        rebuilt = BatchNormalization()(rebuilt)
        rebuilt = Dropout(0.2)(rebuilt)

    # Linear output so the standardised inputs can be reproduced.
    reconstruction = Dense(input_dim, activation='linear')(rebuilt)

    autoencoder = Model(inputs=inputs, outputs=reconstruction)
    encoder = Model(inputs=inputs, outputs=bottleneck)

    autoencoder.compile(optimizer='adam', loss='mse')
    return autoencoder, encoder

# Enhanced training process
def train_autoencoder(autoencoder, X_scaled, epochs=50, batch_size=128, shuffle_seed=42):
    """Fit the autoencoder to reconstruct ``X_scaled`` and plot the loss curves.

    Args:
        autoencoder: compiled Keras model to train (input == target).
        X_scaled: 2-D array of standardised features.
        epochs: maximum number of epochs; early stopping may end sooner.
        batch_size: mini-batch size.
        shuffle_seed: seed for the row permutation applied before the
            validation split. Pass ``None`` for a non-reproducible shuffle.

    Returns:
        The Keras ``History`` object returned by ``fit``.
    """
    callbacks = [
        EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True),
        ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=3, min_lr=1e-6)
    ]

    # Keras takes the validation fraction from the END of the arrays, before
    # any shuffling. If the rows arrive grouped (for example sorted by
    # genre), that tail is not representative of the training data, so
    # permute a copy of the rows first. Only the training copy is reordered;
    # the caller's array is left untouched.
    order = np.random.default_rng(shuffle_seed).permutation(len(X_scaled))
    X_train = np.asarray(X_scaled)[order]

    history = autoencoder.fit(
        X_train, X_train,
        epochs=epochs,
        batch_size=batch_size,
        validation_split=0.2,
        callbacks=callbacks,
        verbose=1
    )

    # Plot training history
    plt.figure(figsize=(10, 5))
    plt.plot(history.history['loss'], label='Training Loss')
    plt.plot(history.history['val_loss'], label='Validation Loss')
    plt.title('Training and Validation Loss')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.legend()
    plt.show()

    return history

def evaluate_autoencoder(autoencoder, X_scaled):
    """Print reconstruction quality (MSE and R²) of the autoencoder.

    Args:
        autoencoder: trained model exposing ``predict``.
        X_scaled: the data to reconstruct; it is also used as the reference.
    """
    reconstruction = autoencoder.predict(X_scaled)

    # Both scores are computed before anything is printed.
    scores = {
        'mse': mean_squared_error(X_scaled, reconstruction),
        'r2': r2_score(X_scaled, reconstruction),
    }

    print("\n🔹 Autoencoder Accuracy Metrics:")
    print(f"   ✅ Mean Squared Error (MSE): {scores['mse']:.5f}")
    print(f"   ✅ R² Score: {scores['r2']:.5f}")

def build_knn_model(latent_features, metric='cosine'):
    """Fit and return a brute-force nearest-neighbour index.

    Args:
        latent_features: array of vectors to index.
        metric: distance metric name passed to ``NearestNeighbors``.

    Returns:
        The fitted ``NearestNeighbors`` instance.
    """
    # fit() returns the estimator itself, so build and fit in one expression.
    return NearestNeighbors(algorithm='brute', metric=metric).fit(latent_features)

def plot_tracks_by_genre(latent_features, df):
    """Project latent vectors to 3-D with PCA and plot them coloured by genre.

    Args:
        latent_features: 2-D array with one latent vector per track, in the
            same row order as ``df``.
        df: DataFrame with ``track_genre``, ``track_name`` and ``artists``
            columns.

    Raises:
        ValueError: if ``df`` and ``latent_features`` have different numbers
            of rows (raised by pandas on the column assignment).

    After showing the interactive scatter plot, the number of tracks per
    genre is printed.
    """
    pca = PCA(n_components=3)
    latent_3d = pca.fit_transform(latent_features)

    plot_df = pd.DataFrame(
        latent_3d,
        columns=['PC1', 'PC2', 'PC3']
    )
    # Attach metadata by POSITION, not by index label. plot_df has a fresh
    # 0..n-1 index, whereas df may carry gapped labels after row filtering.
    # Assigning the Series directly would align on labels and silently
    # produce NaN or shifted metadata.
    plot_df['Genre'] = df['track_genre'].values
    plot_df['Track'] = df['track_name'].values
    plot_df['Artist'] = df['artists'].values

    fig = px.scatter_3d(
        plot_df,
        x='PC1',
        y='PC2',
        z='PC3',
        color='Genre',
        hover_data=['Track', 'Artist'],
        title='3D Interactive Visualization of Tracks by Genre',
        labels={'PC1': 'First Principal Component',
                'PC2': 'Second Principal Component',
                'PC3': 'Third Principal Component'}
    )

    fig.update_layout(
        scene=dict(
            xaxis_title='PC1',
            yaxis_title='PC2',
            zaxis_title='PC3'
        ),
        width=1200,
        height=800,
        showlegend=True,
        legend=dict(
            yanchor="top",
            y=0.99,
            xanchor="left",
            x=0.85
        )
    )

    fig.show()

    genre_dist = df['track_genre'].value_counts()
    print("\n🎵 Genre Distribution:")
    for genre, count in genre_dist.items():
        print(f"{genre}: {count} tracks")

def get_track_recommendations(track_index, n_recommendations=5):
    """Recommend nearby tracks from a different genre than the selected one.

    Reads the module-level ``df``, ``latent_features`` and ``knn`` globals
    that ``main`` populates.

    Args:
        track_index: positional index of the query track in ``df`` /
            ``latent_features`` (negative values count from the end).
        n_recommendations: maximum number of tracks to return.

    Returns:
        DataFrame with ``track_name``, ``artists`` and ``track_genre`` of up
        to ``n_recommendations`` neighbours whose genre differs from the
        query's, nearest first. May hold fewer rows (or none) when most
        neighbours share the query's genre.
    """
    print("\n🎵 Selected Track:")
    selected_track = df.iloc[track_index]
    print(f"Track: {selected_track['track_name']}")
    print(f"Artist: {selected_track['artists']}")
    print(f"Genre: {selected_track['track_genre']}")

    n_tracks = len(latent_features)
    # Normalise a negative index so it can be compared with neighbour ids.
    query_position = track_index if track_index >= 0 else track_index + n_tracks

    track_vector = latent_features[track_index].reshape(1, -1)
    # Never ask for more neighbours than there are indexed tracks.
    _, indices = knn.kneighbors(track_vector, n_neighbors=min(20, n_tracks))

    # Exclude the query by its index rather than by assuming it is always the
    # first hit: with tied distances it can appear at any rank.
    neighbour_ids = indices.flatten()
    neighbour_ids = neighbour_ids[neighbour_ids != query_position]
    similar_tracks = df.iloc[neighbour_ids]

    selected_genre = selected_track['track_genre']
    filtered_tracks = similar_tracks[similar_tracks['track_genre'] != selected_genre]

    # Use the genre-filtered neighbours: the heading below promises tracks
    # from a different genre.
    recommendations = filtered_tracks.head(n_recommendations)

    print("\n🎶 Recommended Tracks (Different Genre):")
    return recommendations[['track_name', 'artists', 'track_genre']]

def main():
    """Run the full pipeline: load, train, index, visualise, recommend.

    Publishes ``df``, ``latent_features`` and ``knn`` as module globals
    because ``get_track_recommendations`` reads them from module scope.
    """
    global df, latent_features, knn

    df, X_scaled, numeric_features = load_and_preprocess_data()
    plot_feature_heatmap(df, numeric_features)

    # One input unit per feature column; 8-dimensional bottleneck.
    autoencoder, encoder = build_autoencoder(X_scaled.shape[1], 8)

    print("🚀 Training autoencoder with enhanced architecture...")
    train_autoencoder(autoencoder, X_scaled, epochs=50, batch_size=128)

    evaluate_autoencoder(autoencoder, X_scaled)

    print("\n🔍 Generating latent representations...")
    latent_features = encoder.predict(X_scaled)
    knn = build_knn_model(latent_features, metric='cosine')

    print("\n📊 Generating genre distribution visualization...")
    plot_tracks_by_genre(latent_features, df)

    # Demonstrate the recommender on a fixed sample track.
    print(get_track_recommendations(15))

# Run the pipeline only when this code is executed directly, not when it is
# imported as a module.
if __name__ == "__main__":
    main()

In [1]:
import pandas as pd
import numpy as np
from datasets import load_dataset
from sklearn.preprocessing import StandardScaler
from sklearn.neighbors import NearestNeighbors
from sklearn.metrics import mean_squared_error, r2_score
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Dense, Dropout, BatchNormalization
from tensorflow.keras.regularizers import l1_l2
from sklearn.decomposition import PCA
import matplotlib.pyplot as plt
from sklearn.preprocessing import LabelEncoder
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
import plotly.express as px
import plotly.graph_objects as go

# Constants
# Tunable settings shared by the pipeline functions defined below.

# plot_feature_heatmap lists a feature pair only when |correlation| exceeds this.
FEATURE_CORRELATION_THRESHOLD = 0.3
# Multiple of the IQR beyond Q1/Q3 at which a row is dropped as an outlier.
IQR_OUTLIER_MULTIPLIER = 1.5
# Default width of the autoencoder's bottleneck layer.
AUTOENCODER_ENCODING_DIM = 8
# Default maximum number of training epochs.
AUTOENCODER_EPOCHS = 50
# Default mini-batch size for training.
AUTOENCODER_BATCH_SIZE = 128
# Fraction of the training data passed to Keras as validation_split.
AUTOENCODER_VALIDATION_SPLIT = 0.2
# Default distance metric for the nearest-neighbour index.
KNN_METRIC = 'cosine'
# Default number of tracks returned by get_track_recommendations.
N_RECOMMENDATIONS = 5
KNN_NEIGHBORS = 20  # Number of neighbors requested per query, before genre filtering

def load_and_preprocess_data():
    """
    Loads the Spotify tracks dataset, preprocesses it, and returns
    the processed DataFrame and scaled numeric features.

    Preprocessing: label-encode ``track_genre``, drop rows with missing
    feature values, remove IQR outliers one feature at a time, then
    standardise the remaining rows.

    Returns:
        tuple: (DataFrame, NumPy array, list)
               - DataFrame: The processed DataFrame, re-indexed 0..n-1 so that
                 row label i matches row i of the scaled array.
               - NumPy array: Scaled numeric features.
               - list: List of numeric feature names.

    Raises:
        ValueError: If the dataset cannot be loaded or converted; the
            original error is attached as ``__cause__``.
    """

    try:
        ds = load_dataset("maharshipandya/spotify-tracks-dataset")
        df = pd.DataFrame(ds["train"])
    except Exception as e:
        # Keep the ValueError contract for callers, but chain the cause so the
        # real failure is not lost from the traceback.
        raise ValueError(f"Error loading dataset: {e}") from e

    print("✅ Dataset loaded. First few rows:")
    print(df.head(), "\n")

    le = LabelEncoder()
    df['track_genre_encoded'] = le.fit_transform(df['track_genre'])

    numeric_features = [
        'danceability', 'energy', 'loudness', 'speechiness',
        'acousticness', 'instrumentalness', 'liveness', 'valence', 'tempo',
        'track_genre_encoded'
    ]

    df = df.dropna(subset=numeric_features).reset_index(drop=True)

    # Sequential IQR filter: each feature's bounds are computed on the rows
    # that survived the previous features. The encoded genre is skipped.
    for feature in numeric_features[:-1]:
        q1 = df[feature].quantile(0.25)
        q3 = df[feature].quantile(0.75)
        iqr = q3 - q1
        lower = q1 - IQR_OUTLIER_MULTIPLIER * iqr
        upper = q3 + IQR_OUTLIER_MULTIPLIER * iqr
        is_outlier = (df[feature] < lower) | (df[feature] > upper)
        df = df[~is_outlier]

    # Filtering leaves gaps in the index labels. Reset them so label-based
    # access on df lines up with the positional rows of X_scaled.
    df = df.reset_index(drop=True)

    X = df[numeric_features].values
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)

    return df.copy(), X_scaled.copy(), numeric_features  # Return copies to avoid unintended modifications


def plot_feature_heatmap(df, numeric_features):
    """
    Renders a plotly heatmap of pairwise feature correlations and prints the
    strongest pairs.

    Args:
        df (DataFrame): The DataFrame containing the data.
        numeric_features (list): List of numeric feature names.

    A pair is printed when its absolute correlation is greater than
    FEATURE_CORRELATION_THRESHOLD; pairs are listed strongest first.
    """

    corr_table = df[numeric_features].corr()

    heatmap = go.Heatmap(
        z=corr_table,
        x=numeric_features,
        y=numeric_features,
        colorscale='RdBu',
        zmin=-1,
        zmax=1,
        text=np.round(corr_table, 2),
        texttemplate='%{text}',
        textfont={"size": 10},
        hoverongaps=False
    )
    figure = go.Figure(data=heatmap)
    figure.update_layout(
        title='Feature Correlation Heatmap',
        width=900,
        height=900,
        xaxis_tickangle=-45
    )
    figure.show()

    print("\n🔍 Strongest Feature Correlations:")

    # Visit each unordered pair once (upper triangle, diagonal excluded).
    feature_count = len(numeric_features)
    strong_pairs = []
    for row, row_name in enumerate(numeric_features):
        for col in range(row + 1, feature_count):
            value = corr_table.iloc[row, col]
            if abs(value) > FEATURE_CORRELATION_THRESHOLD:
                strong_pairs.append((row_name, numeric_features[col], value))

    ranked = sorted(strong_pairs, key=lambda pair: abs(pair[2]), reverse=True)
    for feat1, feat2, corr in ranked:
        print(f"{feat1} ↔️ {feat2}: {corr:.3f}")


def build_autoencoder(input_dim, encoding_dim=AUTOENCODER_ENCODING_DIM):
    """
    Assembles a symmetric dense autoencoder together with its encoder half.

    Args:
        input_dim (int): Dimensionality of the input data.
        encoding_dim (int): Dimensionality of the bottleneck layer.

    Returns:
        tuple: (Model, Model) - The compiled autoencoder (Adam, MSE) and the
               encoder, which shares the input and encoder layers.
    """

    def weight_penalty():
        # A fresh regulariser instance for every regularised layer.
        return l1_l2(l1=1e-5, l2=1e-4)

    inputs = Input(shape=(input_dim,))

    # Encoder: 64 -> 32, each followed by batch norm and dropout.
    hidden = inputs
    for units in (64, 32):
        hidden = Dense(units, activation='relu', kernel_regularizer=weight_penalty())(hidden)
        hidden = BatchNormalization()(hidden)
        hidden = Dropout(0.2)(hidden)

    # Bottleneck: the encoder model outputs this tensor.
    bottleneck = Dense(encoding_dim, activation='relu', kernel_regularizer=weight_penalty())(hidden)

    # Decoder: mirror of the encoder, without weight regularisation.
    rebuilt = bottleneck
    for units in (32, 64):
        rebuilt = Dense(units, activation='relu')(rebuilt)
        rebuilt = BatchNormalization()(rebuilt)
        rebuilt = Dropout(0.2)(rebuilt)

    # Linear output layer reproduces the standardised inputs.
    reconstruction = Dense(input_dim, activation='linear')(rebuilt)

    autoencoder = Model(inputs=inputs, outputs=reconstruction)
    encoder = Model(inputs=inputs, outputs=bottleneck)

    autoencoder.compile(optimizer='adam', loss='mse')
    return autoencoder, encoder


def train_autoencoder(autoencoder, X_scaled, epochs=AUTOENCODER_EPOCHS, batch_size=AUTOENCODER_BATCH_SIZE,
                      shuffle_seed=42):
    """
    Trains the autoencoder model to reconstruct its input and plots the
    training and validation loss curves.

    Args:
        autoencoder (Model): The autoencoder model to train.
        X_scaled (NumPy array): Scaled training data.
        epochs (int): Maximum number of training epochs (early stopping may
            end training sooner).
        batch_size (int): Batch size for training.
        shuffle_seed (int | None): Seed for the row permutation applied before
            the validation split; None gives a non-reproducible shuffle.

    Returns:
        History: Training history object.

    Raises:
        RuntimeError: If ``fit`` fails; the original error is chained as
            ``__cause__``. RuntimeError is a subclass of Exception, so
            existing ``except Exception`` handlers still catch it.
    """

    callbacks = [
        EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True),
        ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=3, min_lr=1e-6)
    ]

    # Keras takes the validation fraction from the END of the arrays, before
    # any shuffling. If the rows arrive grouped (for example sorted by
    # genre), that tail is not representative, so permute a copy of the rows
    # first. The caller's array is left untouched.
    order = np.random.default_rng(shuffle_seed).permutation(len(X_scaled))
    X_train = np.asarray(X_scaled)[order]

    try:
        history = autoencoder.fit(
            X_train, X_train,
            epochs=epochs,
            batch_size=batch_size,
            validation_split=AUTOENCODER_VALIDATION_SPLIT,
            callbacks=callbacks,
            verbose=1
        )
    except Exception as e:
        raise RuntimeError(f"Error during autoencoder training: {e}") from e

    # Plot training history
    plt.figure(figsize=(10, 5))
    plt.plot(history.history['loss'], label='Training Loss')
    plt.plot(history.history['val_loss'], label='Validation Loss')
    plt.title('Training and Validation Loss')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.legend()
    plt.show()

    return history


def evaluate_autoencoder(autoencoder, X_scaled):
    """
    Prints reconstruction quality (MSE and R²) of the autoencoder.

    Args:
        autoencoder (Model): The trained autoencoder model.
        X_scaled (NumPy array): Data to reconstruct; also used as the
            reference for both scores.
    """

    reconstruction = autoencoder.predict(X_scaled)

    # Both scores are computed before anything is printed.
    scores = {
        'mse': mean_squared_error(X_scaled, reconstruction),
        'r2': r2_score(X_scaled, reconstruction),
    }

    print("\n🔹 Autoencoder Accuracy Metrics:")
    print(f"  ✅ Mean Squared Error (MSE): {scores['mse']:.5f}")
    print(f"  ✅ R² Score: {scores['r2']:.5f}")


def build_knn_model(latent_features, metric=KNN_METRIC):
    """
    Fits and returns a brute-force nearest-neighbour index over the latent
    vectors.

    Args:
        latent_features (NumPy array): Latent features of the tracks.
        metric (str): Distance metric for KNN.

    Returns:
        NearestNeighbors: The fitted KNN model.
    """

    # Brute-force search is used on purpose: scikit-learn's tree-based
    # indexes ('kd_tree', 'ball_tree') do not accept the cosine metric.
    # fit() returns the estimator itself.
    return NearestNeighbors(algorithm='brute', metric=metric).fit(latent_features)


def plot_tracks_by_genre(latent_features, df):
    """
    Visualizes tracks in a 3D space using PCA, colored by genre, then prints
    the number of tracks per genre.

    Args:
        latent_features (NumPy array): Latent features of the tracks, one row
            per track in the same order as ``df``.
        df (DataFrame): DataFrame containing track information
            (``track_genre``, ``track_name``, ``artists``).

    Raises:
        ValueError: If ``df`` and ``latent_features`` have different numbers
            of rows (raised by pandas on the column assignment).
    """

    pca = PCA(n_components=3)
    latent_3d = pca.fit_transform(latent_features)

    plot_df = pd.DataFrame(
        latent_3d,
        columns=['PC1', 'PC2', 'PC3']
    )
    # Attach metadata by POSITION, not by index label. plot_df has a fresh
    # 0..n-1 index, whereas df may carry gapped labels after row filtering.
    # Assigning the Series directly would align on labels and silently
    # produce NaN or shifted metadata.
    plot_df['Genre'] = df['track_genre'].values
    plot_df['Track'] = df['track_name'].values
    plot_df['Artist'] = df['artists'].values

    fig = px.scatter_3d(
        plot_df,
        x='PC1',
        y='PC2',
        z='PC3',
        color='Genre',
        hover_data=['Track', 'Artist'],
        title='3D Interactive Visualization of Tracks by Genre',
        labels={'PC1': 'First Principal Component',
                'PC2': 'Second Principal Component',
                'PC3': 'Third Principal Component'}
    )

    fig.update_layout(
        scene=dict(
            xaxis_title='PC1',
            yaxis_title='PC2',
            zaxis_title='PC3'
        ),
        width=1200,
        height=800,
        showlegend=True,
        legend=dict(
            yanchor="top",
            y=0.99,
            xanchor="left",
            x=0.85
        )
    )

    fig.show()

    genre_dist = df['track_genre'].value_counts()
    print("\n🎵 Genre Distribution:")
    for genre, count in genre_dist.items():
        print(f"{genre}: {count} tracks")


def get_track_recommendations(track_index, n_recommendations=N_RECOMMENDATIONS):
    """
    Recommends tracks that are close to a given track in latent space but
    belong to a different genre.

    Reads the module-level ``df``, ``latent_features`` and ``knn`` globals.

    Args:
        track_index (int): Index of the track to find recommendations for.
        n_recommendations (int): Maximum number of recommendations to return.

    Returns:
        DataFrame: ``track_name``, ``artists`` and ``track_genre`` of the
        recommended tracks, nearest first. May hold fewer rows than requested
        (or none) when most neighbours share the query's genre.

    Raises:
        ValueError: If ``track_index`` is outside ``[0, len(df))``.
    """
    if not 0 <= track_index < len(df):
        raise ValueError(f"track_index ({track_index}) is out of bounds (0, {len(df) - 1})")

    print("\n🎵 Selected Track:")
    selected_track = df.iloc[track_index]
    print(f"Track: {selected_track['track_name']}")
    print(f"Artist: {selected_track['artists']}")
    print(f"Genre: {selected_track['track_genre']}")

    track_vector = latent_features[track_index].reshape(1, -1)
    # Never ask for more neighbours than there are indexed tracks.
    n_neighbors = min(KNN_NEIGHBORS, len(latent_features))
    _, indices = knn.kneighbors(track_vector, n_neighbors=n_neighbors)

    # Exclude the query by its index rather than by assuming it is always the
    # first hit: with tied distances it can appear at any rank.
    neighbour_ids = indices.flatten()
    neighbour_ids = neighbour_ids[neighbour_ids != track_index]
    similar_tracks = df.iloc[neighbour_ids]

    selected_genre = selected_track['track_genre']
    filtered_tracks = similar_tracks[similar_tracks['track_genre'] != selected_genre]

    recommendations = filtered_tracks.head(n_recommendations)

    print("\n🎶 Recommended Tracks (Different Genre):")
    return recommendations[['track_name', 'artists', 'track_genre']]


def main():
    global df, latent_features, knn

    try:
        df, X_scaled, numeric_features = load_and_preprocess_data()
    except ValueError as e:
        print(f"Error during data loading/preprocessing: {e}")
        return

    plot_feature_heatmap(df, numeric_features)
    input_dim = X_scaled.shape[1]

    try:
        autoencoder, encoder = build_autoencoder(input_dim)
    except Exception as e:
        print(f"Error building autoencoder: {e}")
        return
    print("🚀 Training autoencoder with enhanced architecture...")
    