In [None]:

import json
import logging
import os
import pprint
import time
from collections.abc import Collection
from enum import Enum

import librosa
import numpy as np
import pandas as pd
import requests
import seaborn as sns
from dotenv import load_dotenv
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, roc_auc_score, classification_report
from sklearn.model_selection import train_test_split
from sklearn.neural_network import MLPClassifier, MLPRegressor
from sklearn.preprocessing import LabelEncoder, RobustScaler
from tqdm import tqdm
from xgboost import XGBClassifier, XGBRegressor

# Logging: timestamped INFO-level output shared by every cell in the notebook.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
)
logger = logging.getLogger(__name__)

# Pretty-printer for ad-hoc inspection of nested structures.
pp = pprint.PrettyPrinter(indent=4)

# Apply seaborn's default theme to all plots.
sns.set_theme()

# Load variables from a local .env file into the process environment
# (download_tracks reads SPOTIFY_CLIENT_ID / SPOTIFY_CLIENT_SECRET via os.getenv).
load_dotenv()

# Silence pandas' SettingWithCopyWarning for the whole notebook.
pd.set_option("mode.chained_assignment", None)

In [None]:
class SpotifyAPI:
    class SpotifyAPIError(Exception):
        def __init__(self, error_type: str, error_description: str):
            self.error_type = error_type
            self.error_description = error_description
            super().__init__(f"{self.error_type}: {self.error_description}")

    class AccessToken:
        def __init__(self, client_id: str, client_secret: str):
            self._client_id = client_id
            self._client_secret = client_secret
            self._token, self._expires_at = self.get_auth_token()
            self._header = {"Authorization": f"Bearer {self._token}"}

        def get_auth_token(self) -> tuple[str, float]:
            url = "https://accounts.spotify.com/api/token"
            header = {"Content-Type": "application/x-www-form-urlencoded"}
            payload = {
                "grant_type": "client_credentials",
                "client_id": self._client_id,
                "client_secret": self._client_secret,
            }
            response = requests.post(url=url, headers=header, data=payload)
            # TODO: Handle HTTP error and Authentication Error
            acquired_at = time.time()
            logger.info(f"Token expires at {response.json()}")
            expire_at = acquired_at + float(response.json()["expires_in"])
            return response.json()["access_token"], expire_at

        @property
        def token(self) -> str:
            # TODO: Add logging for token refresh
            if time.time() >= self._expires_at:
                logger.info("Refreshing Spotify API token")
                self._token, self._expires_at = self.get_auth_token()
            return self._token

        @property
        def header(self) -> dict:
            return {"Authorization": f"Bearer {self.token}"}

    def __init__(self, client_id, client_secret):
        self.token = self.AccessToken(client_id, client_secret)

    def get_track(self, track_id: str) -> dict:
        url = f"https://api.spotify.com/v1/tracks/{track_id}"
        response = requests.get(url=url, headers=self.token.header)
        return response.json()

    def get_tracks(self, track_ids: Collection[str]) -> dict:
        if len(track_ids) > 50:
            raise ValueError("Maximum 50 tracks per request")
        url = f"https://api.spotify.com/v1/tracks"
        payload = {"ids": ",".join(track_ids)}
        response = requests.get(url=url, headers=self.token.header, params=payload)
        return response.json()

    def get_track_features(self, track_id: str) -> dict:
        url = f"https://api.spotify.com/v1/audio-features/{track_id}"
        response = requests.get(url=url, headers=self.token.header)
        return response.json()

    def get_tracks_features(self, track_ids: Collection[str]) -> dict:
        if len(track_ids) > 50:
            raise ValueError("Maximum 50 tracks per request")
        url = f"https://api.spotify.com/v1/audio-features"
        payload = {"ids": ",".join(track_ids)}
        response = requests.get(url=url, headers=self.token.header, params=payload)
        return response.json()

In [None]:
def download_tracks(df: pd.DataFrame, max_tracks: int = 100, path: str = "data/tracks"):
    """Fetch Spotify metadata and preview MP3s for the first ``max_tracks`` tracks.

    Tracks that already have ``<track_id>.mp3`` in ``path`` are skipped. The
    metadata of every fetched track is merged into ``<path>/tracks_data.csv``
    (JSON content despite the extension): the file is read at start-up and
    rewritten when the function finishes, even if the download loop raises.

    Args:
        df: Frame with a ``track_id`` column.
        max_tracks: Number of leading rows of ``df`` to consider.
        path: Directory for the MP3 files and the metadata file; created if
            it does not exist.
    """
    logger.info(f"Getting audio data of the first {max_tracks} tracks...")

    api = SpotifyAPI(os.getenv("SPOTIFY_CLIENT_ID"), os.getenv("SPOTIFY_CLIENT_SECRET"))
    songs = df["track_id"].head(max_tracks)

    os.makedirs(path, exist_ok=True)
    existing_songs = {
        name.removesuffix(".mp3") for name in os.listdir(path) if name.endswith(".mp3")
    }
    logger.debug(f"Found {len(existing_songs)} existing tracks")

    # Read in plain read mode: in "a+" mode the position starts at end-of-file,
    # so json.load() would always see an empty stream.
    metadata_path = f"{path}/tracks_data.csv"
    try:
        with open(metadata_path, "r") as f:
            track_data = json.load(f)
    except (FileNotFoundError, json.decoder.JSONDecodeError):
        logger.info("No existing track data found, or file is corrupted")
        track_data = {}

    # De-duplicate (keeping order) and drop already-downloaded ids up front, so
    # no empty or redundant batch is ever sent to the API.
    pending = [
        track_id for track_id in dict.fromkeys(songs.tolist()) if track_id not in existing_songs
    ]
    batch_size = 50  # SpotifyAPI.get_tracks rejects larger batches

    try:
        for start in range(0, len(pending), batch_size):
            song_batch = pending[start : start + batch_size]
            logger.debug(f"Downloading audio features for track {song_batch}")
            try:
                tracks = api.get_tracks(song_batch).get("tracks")
            except SpotifyAPI.SpotifyAPIError as e:
                logger.error(e)
                continue
            if tracks is None:
                logger.error("Spotify response for this batch did not contain 'tracks'")
                continue

            for track in tracks:
                # The API may return a null entry for an id it cannot resolve.
                if track is None:
                    continue

                track_data[track["id"]] = track
                preview_url = track["preview_url"]

                if preview_url is None:
                    logger.debug(
                        f"Skipping {track['id']} for missing preview url"
                    )
                    continue

                try:
                    response = requests.get(preview_url, timeout=30)
                except requests.RequestException as e:
                    logger.error(e)
                    continue
                if not response.ok:
                    # Do not store an error page under an .mp3 name.
                    logger.error(
                        f"Preview download for {track['id']} failed with HTTP {response.status_code}"
                    )
                    continue

                with open(f"{path}/{track['id']}.mp3", "wb") as f:
                    f.write(response.content)

                existing_songs.add(track["id"])
                time.sleep(0.1)
            # Pause between batches to stay polite towards the API.
            time.sleep(3)
    finally:
        # Write to the same file that was read above so the next run finds it.
        with open(metadata_path, "w") as f:
            json.dump(track_data, f)

In [None]:
def read_data(path: str = "data/dataset.csv") -> pd.DataFrame:
    """Load the CSV at ``path`` into a DataFrame and return it."""
    return pd.read_csv(path)

In [None]:
def data_visualization(df: pd.DataFrame) -> None:
    """Draw exploratory plots of popularity against genre and audio features.

    Produces a popularity histogram, a per-genre box plot and one scatter plot
    per selected audio feature. The scatter plots are coloured by
    ``popularity_category`` when that column exists; otherwise they are drawn
    without a hue instead of failing.

    Args:
        df: Frame with ``popularity``, ``track_genre`` and the audio feature
            columns plotted below.
    """
    df = df.sort_values(by="popularity", ascending=False)

    # The category column only exists when the caller has binned popularity.
    hue = "popularity_category" if "popularity_category" in df.columns else None

    grid = sns.displot(data=df, x="popularity", kde=True, aspect=2)
    grid.set(title="Popularity Distribution", xlabel="Popularity", ylabel="Count")

    grid = sns.catplot(data=df, x="track_genre", y="popularity", kind="box", aspect=4, width=0.8)
    grid.tick_params(axis="x", rotation=90)

    features = (
        # Popular songs tend to have higher danceability and energy.
        "danceability",
        "energy",
        # Popularity of songs with higher speechiness tends to sit around 20.
        "speechiness",
        # NOTE(review): the original remark here was about instrumentalness
        # ("songs with high or low popularity tend to have lower
        # instrumentalness") although the plot shows liveness - confirm which
        # was intended.
        "liveness",
    )
    for feature in features:
        label = feature.capitalize()
        grid = sns.relplot(
            data=df, x="popularity", y=feature, aspect=2, s=10, hue=hue, legend=False
        )
        grid.set(
            title=f"Relationship plot between Popularity and {label} ",
            xlabel="Popularity",
            ylabel=label,
        )

    # Features that did not show a significant relationship with popularity
    # and are therefore not plotted: tempo, acousticness, instrumentalness,
    # valence.

In [None]:
class Mode(Enum):
    """Selects which rows and features ``preprocessing`` feeds to the models."""
    # Skip the acoustic-feature lookup entirely and keep the rows as given.
    FULL = 0
    # Keep only rows that have an acoustic feature vector and append that
    # vector to the feature matrix.
    REDUCED_ACOUSTIC = 1
    # Keep only rows that have an acoustic feature vector, but do not append
    # the vector to the feature matrix.
    REDUCED = 2


def preprocessing(
    df: pd.DataFrame,
    use_title_track=True,
    acoustic_features: dict | None = None,
    mode: Mode = Mode.REDUCED_ACOUSTIC,
) -> tuple:
    """Turn the raw track table into train/test feature matrices and targets.

    Steps: optionally restrict to tracks with acoustic features, add the
    ``is_title_track`` flag, label-encode ``track_genre``, drop id/text
    columns, split 80/20 with a fixed seed, robust-scale ``duration_ms`` and
    ``instrumentalness`` (fitted on the training part only) and, in
    ``Mode.REDUCED_ACOUSTIC``, append the acoustic vectors.

    Args:
        df: Track table containing ``popularity`` (the target), ``track_id``,
            ``artists``, ``album_name``, ``track_name``, ``track_genre``,
            ``duration_ms`` and ``instrumentalness``. It is not modified.
        use_title_track: Add a 0/1 feature telling whether the track name
            equals the artist name (case-insensitive).
        acoustic_features: Mapping of track id to feature vector. Required
            unless ``mode`` is ``Mode.FULL``; all vectors must share one shape.
        mode: See ``Mode``.

    Returns:
        ``(X_train, X_test, y_train, y_test)``. The targets are NumPy arrays.
        The feature matrices are NumPy arrays in ``Mode.REDUCED_ACOUSTIC``
        and DataFrames otherwise.

    Raises:
        ValueError: if acoustic features are needed but missing, or if no row
            of ``df`` has one.
    """
    acoustic_matrix = None
    if mode == Mode.REDUCED_ACOUSTIC or mode == Mode.REDUCED:
        if acoustic_features is None:
            raise ValueError(f"acoustic_features is required for mode {mode.name}")

        # Explicit object dtype keeps each vector as one cell instead of
        # letting pandas try to interpret the list as a 2-D block.
        matched = pd.Series(
            [acoustic_features.get(track_id) for track_id in df["track_id"]],
            index=df.index,
            dtype=object,
        )
        # assign() returns a new frame, so the caller's df is left untouched.
        # dropna() removes tracks without a vector - and, as before, any row
        # with a missing value in another column.
        df = df.assign(acoustic_feature=matched).dropna()
        if df.empty:
            raise ValueError("No row of df has an entry in acoustic_features")
        acoustic_matrix = np.stack(df["acoustic_feature"].to_list(), axis=0)
        df = df.drop(columns=["acoustic_feature"])

    X = df.drop(columns=["popularity"])
    y = df["popularity"]

    # Feature Engineering
    # The flag must be written to X: X is already a separate frame, so a
    # column added to df at this point would never reach the model.
    if use_title_track:
        X["is_title_track"] = (
            X["track_name"].str.lower() == X["artists"].str.lower()
        ).astype(int)

    # Encoding
    X["track_genre"] = LabelEncoder().fit_transform(X["track_genre"])

    # TODO: process missing/extreme values

    # Drop identifier and free-text columns that are not used as features.
    X = X.drop(columns=["track_id", "artists", "album_name", "track_name"])

    # Split. The acoustic matrix is only passed along when it exists, so
    # Mode.FULL no longer hands None to train_test_split.
    arrays = [X, y] if acoustic_matrix is None else [X, y, acoustic_matrix]
    parts = train_test_split(*arrays, test_size=0.2, random_state=42)
    X_train, X_test, y_train, y_test = parts[:4]
    # Own copies, so the scaled columns are not written onto views of X.
    X_train, X_test = X_train.copy(), X_test.copy()

    # Normalization: required for the MLP models. The scaler is fitted on the
    # training part only to avoid leaking test statistics.
    scaled_columns = ["duration_ms", "instrumentalness"]
    scaler = RobustScaler()
    scaler.fit(X_train[scaled_columns])
    X_train[scaled_columns] = scaler.transform(X_train[scaled_columns])
    X_test[scaled_columns] = scaler.transform(X_test[scaled_columns])

    if mode == Mode.REDUCED_ACOUSTIC:
        acoustic_train, acoustic_test = parts[4:]
        X_train = np.concatenate((X_train, acoustic_train), axis=1)
        X_test = np.concatenate((X_test, acoustic_test), axis=1)

    return X_train, X_test, y_train.to_numpy(), y_test.to_numpy()

In [None]:
def get_acoustic_feature(df: pd.DataFrame, path: str = "data/tracks") -> dict[str, np.ndarray]:
    """Extract a flat acoustic feature vector from every MP3 file in ``path``.

    For each ``<track_id>.mp3`` the following per-frame features are computed
    with librosa and concatenated into one 1-D vector, in this order:
    * Spectral centroid
    * Spectral flatness
    * Zero crossing rate

    Keyword arguments:
    df -- Not used; kept so existing calls keep working
    path -- Path to the audio files
    Return: Dict mapping track id (file name without ``.mp3``) to its vector
    """
    acoustic_features = {}

    for file_name in tqdm(os.listdir(path)):
        if not file_name.endswith(".mp3"):
            continue

        # Same id convention as download_tracks: strip only the extension.
        track_id = file_name.removesuffix(".mp3")
        y, sr = librosa.load(f"{path}/{file_name}")

        spectral_centroid = librosa.feature.spectral_centroid(y=y, sr=sr)
        spectral_flatness = librosa.feature.spectral_flatness(y=y)
        zero_crossings = librosa.feature.zero_crossing_rate(y=y)

        # NOTE(review): the vector length presumably depends on the clip
        # length, while preprocessing() stacks the vectors into one matrix -
        # confirm that all clips have the same duration.
        acoustic_features[track_id] = np.concatenate(
            (spectral_centroid, spectral_flatness, zero_crossings), axis=1
        ).squeeze()

    return acoustic_features

In [None]:
def train_model_mlp(X, y):
    """Fit and return an MLP regressor with three hidden layers of 100 units.

    The seed is fixed, like in the XGBoost and random-forest trainers, so that
    repeated runs produce the same model.
    """
    model = MLPRegressor(hidden_layer_sizes=(100, 100, 100), max_iter=500, random_state=42)
    model.fit(X, y)
    return model

In [None]:
def train_model_mlp_cls(X, y):
    """Fit and return an MLP classifier with three hidden layers of 100 units.

    The seed is fixed, like in the XGBoost and random-forest trainers, so that
    repeated runs produce the same model.
    """
    model = MLPClassifier(hidden_layer_sizes=(100, 100, 100), max_iter=500, random_state=42)
    model.fit(X, y)
    return model

In [None]:
def train_model_xgb(X, y) -> XGBRegressor:
    """Fit an XGBoost regressor on ``(X, y)`` and return the fitted model."""
    hyperparameters = {
        "n_estimators": 100,
        "max_depth": 10,
        "learning_rate": 0.01,
        "n_jobs": 4,
        "random_state": 42,
        # "device": "cuda",
    }
    regressor = XGBRegressor(**hyperparameters)
    regressor.fit(X, y)
    return regressor

In [None]:
def train_model_xgb_cls(X, y) -> XGBClassifier:
    """Fit an XGBoost classifier on ``(X, y)`` and return the fitted model."""
    hyperparameters = {
        "n_estimators": 100,
        "max_depth": 10,
        "learning_rate": 0.01,
        "n_jobs": 4,
        "random_state": 42,
        # "device": "cuda",
    }
    classifier = XGBClassifier(**hyperparameters)
    classifier.fit(X, y)
    return classifier

In [None]:
def train_model_random_forest(X, y) -> RandomForestClassifier:
    """Fit a random-forest classifier on ``(X, y)`` and return the fitted model."""
    forest = RandomForestClassifier(
        n_estimators=100,
        max_depth=10,
        n_jobs=4,
        random_state=42,
    )
    forest.fit(X, y)
    return forest

In [None]:
def evaluate_model(model, X, y):
    """Predict on ``X`` and coerce the predictions to integers within [0, 100].

    Predictions are rounded, clipped to the 0-100 range and cast to ``int``.
    ``y`` is accepted but not used.
    """
    return model.predict(X).round().clip(0, 100).astype(int)

In [None]:
if __name__ == "__main__":
    # --- Run configuration ---------------------------------------------------
    remove_zero_popularity = True
    reduce_popularity = True
    visualization = True
    # NOTE(review): not read anywhere in this cell - `mode` decides whether
    # acoustic features are used.
    use_acoustic_features = False

    mode = Mode.REDUCED

    df = read_data()

    # Names of the four popularity buckets, in the order of the integer codes
    # produced by pd.qcut below. NOTE(review): currently not used.
    labels = ["unpopular", "intermediate", "popular", "highly_popular"]

    if remove_zero_popularity:
        # Copy, so the column assignments below write to an independent frame
        # rather than to a filtered view of the raw data.
        df = df[df["popularity"] > 0].copy()

    if reduce_popularity:
        if not remove_zero_popularity:
            raise ValueError("remove_zero_popularity must be True if reduce_popularity is True")

        # Quartile binning; labels=False yields the integer codes 0-3.
        df["popularity_category"] = pd.qcut(df["popularity"], 4, labels=False)

    if visualization:
        data_visualization(df)

    # Defined for every mode, so the preprocessing() call below cannot hit a
    # NameError when mode is Mode.FULL.
    acoustic_features = None
    if mode in (Mode.REDUCED_ACOUSTIC, Mode.REDUCED):
        logger.info("Downloading acoustic features...")
        # download_tracks(df, max_tracks=10000, path="data/tracks")
        logger.info("Building acoustic features...")
        acoustic_features = get_acoustic_feature(df)

    if reduce_popularity:
        # From here on the bucket code is the prediction target.
        df["popularity"] = df["popularity_category"]
        df = df.drop(columns=["popularity_category"])

    X_train, X_test, y_train, y_test = preprocessing(
        df,
        use_title_track=True,
        acoustic_features=acoustic_features,
        mode=mode,
    )

    # One entry per model; the regressors are kept here, disabled.
    trainers = {
        "XGB Classifier": train_model_xgb_cls,
        # "XGB Regressor": train_model_xgb,
        "MLP Classifier": train_model_mlp_cls,
        # "MLP Regressor": train_model_mlp,
        "Random Forest": train_model_random_forest,
    }

    models = {}
    for name, train in trainers.items():
        logger.info(f"{name}...")
        models[name] = train(X_train, y_train)
        y_pred = evaluate_model(models[name], X_test, y_test)
        logger.info(f"{accuracy_score(y_test, y_pred):.3f}")
        logger.info(classification_report(y_test, y_pred))

    # Keep the individual names available for any later cell that uses them.
    model_xgb_cls = models["XGB Classifier"]
    model_mlp_cls = models["MLP Classifier"]
    model_random_forest = models["Random Forest"]
    