# Subtask 1
## K-means Clustering

Requirements:
- numpy
- pandas
- scikit-learn

Python 3.13.1

In [18]:
import logging
from typing import List, Optional, Tuple

import numpy as np
import pandas as pd
from sklearn.calibration import LabelEncoder
from sklearn.discriminant_analysis import StandardScaler
from sklearn.cluster import KMeans

## Preprocessing
Load the data and preprocess it. Since we are working with mixed-type data, we need to encode the features that are not continuous numeric values. Missing values also need to be imputed.

In [2]:
# Relevant preprocessing functions

def clean(df: pd.DataFrame) -> pd.DataFrame:
    """Normalise missing-value markers and column dtypes of a raw dataframe.

    The following steps are applied in order:

    1. Cells that are empty, whitespace-only or a lone ``?`` become NA.
    2. Non-numeric, non-boolean columns with more than two distinct values
       are converted to a numeric dtype if every value can be parsed.
    3. Float columns that only contain whole numbers are converted to the
       nullable ``Int64`` dtype.
    4. Remaining object columns are cast to ``str``; missing entries are
       stored as the literal string ``"<NA>"``.

    Args:
        df: The raw dataframe. It is not modified.

    Returns:
        A new, cleaned dataframe.
    """
    # Replace empty cells (or ?) with NA.
    # The pattern is anchored on both sides on purpose: when the replacement
    # is not a string, pandas replaces the *whole* cell as soon as the regex
    # matches anywhere, so an unanchored pattern would also wipe values such
    # as "?foo" or "text with trailing space ".
    df = df.replace(r"^\s*\??\s*$", pd.NA, regex=True)

    # Columns with whitespace (or ?) values are inferred as string type but could be numeric
    for col in df.select_dtypes(exclude=[np.number, np.bool_]).columns:
        # Exclude booleans
        if df[col].nunique() > 2:
            try:
                df[col] = pd.to_numeric(df[col])
            except (ValueError, TypeError):
                logging.debug("Could not convert column %s to numeric dtype", col)

    # Integer columns with missing values automatically get converted to float columns
    # We want to convert back to integer to allow for better column classification (num/cat)
    for col in df.select_dtypes(include=np.floating).columns:
        if np.all(df[col].fillna(0) % 1 == 0):
            df[col] = df[col].astype("Int64")

    # Turn object columns into string columns (causes issues with stepmix).
    # A plain astype(str) would render NaN, None and pd.NA as the different
    # literals "nan", "None" and "<NA>"; use one marker for all of them so
    # that missing values form a single category.
    for col in df.select_dtypes(include=object).columns:
        missing = df[col].isna()
        df[col] = df[col].astype(str).mask(missing, "<NA>")

    return df


def classify_cols(
    df: pd.DataFrame,
    bool_cols: bool = False,
) -> Tuple:
    """Split the columns of ``df`` into numeric, categorical and boolean groups.

    Rules:
    - every floating point column is numeric;
    - if ``bool_cols`` is set, bool-dtype columns and any other non-float
      column with at most two distinct values are boolean;
    - integer columns with at least 50 distinct values are numeric;
    - everything else is categorical.

    Args:
        df: Dataframe whose columns are classified.
        bool_cols: Whether to report boolean columns as a separate group.

    Returns:
        ``(num_cols, cat_cols, bool_cols)`` if ``bool_cols`` is set,
        otherwise ``(num_cols, cat_cols)``.
    """
    numeric: List[str] = list(df.select_dtypes(include=np.inexact).columns)
    booleans: List[str] = []
    if bool_cols:
        booleans = list(df.select_dtypes(include=np.bool_).columns)

    # Columns that still need a group
    leftover = df.columns.difference(numeric + booleans)

    if bool_cols:
        # Anything with at most two distinct values is treated as boolean
        two_valued = [name for name in leftover if df[name].nunique() <= 2]
        booleans.extend(two_valued)
        leftover = leftover.drop(two_valued)

    # Integer columns with at least 50 distinct values are treated as numeric
    integer_names = df[leftover].select_dtypes(include=np.integer).columns
    high_cardinality = [name for name in integer_names if df[name].nunique() >= 50]
    numeric.extend(high_cardinality)
    leftover = leftover.drop(high_cardinality)

    # Whatever has not been claimed yet is categorical
    categorical: List[str] = list(leftover)

    logging.info("Num cols: %s", numeric)
    logging.info("Cat cols: %s", categorical)
    logging.info("Bool cols: %s", booleans)

    if not bool_cols:
        return numeric, categorical
    return numeric, categorical, booleans


def scale_cols(
    df: pd.DataFrame,
    num_cols: Optional[List[str]],
    cat_cols: Optional[List[str]],
    bool_cols: Optional[List[str]] = None,
) -> pd.DataFrame:
    """Return a scaled and label-encoded deep copy of ``df``.

    Numeric columns are transformed with a ``StandardScaler``; categorical
    and boolean columns are label-encoded column by column with a
    ``LabelEncoder``. Groups that are ``None`` or empty are skipped.

    Args:
        df: Input dataframe. It is not modified.
        num_cols: Names of the numeric columns.
        cat_cols: Names of the categorical columns.
        bool_cols: Names of the boolean columns.

    Returns:
        The transformed copy of ``df``.
    """
    scaled = df.copy(deep=True)

    if num_cols:
        scaled[num_cols] = StandardScaler().fit_transform(scaled[num_cols])

    # Categorical and boolean columns get the same treatment
    for label_group in (cat_cols, bool_cols):
        if label_group:
            scaled[label_group] = scaled[label_group].apply(LabelEncoder().fit_transform)

    return scaled


def imputate_na(
    df: pd.DataFrame,
    num_cols: Optional[List[str]],
    cat_cols: Optional[List[str]],
    bool_cols: Optional[List[str]] = None,
) -> pd.DataFrame:
    """Fill missing values in the given column groups.

    Numeric columns are filled with their median. Categorical and boolean
    columns are cast to ``str`` and their missing entries are replaced by the
    explicit category ``"<NA>"``.

    Note that the columns are assigned on ``df`` itself, i.e. the dataframe
    passed in is modified and also returned.

    Args:
        df: Dataframe to impute.
        num_cols: Names of the numeric columns (``None`` or empty to skip).
        cat_cols: Names of the categorical columns (``None`` or empty to skip).
        bool_cols: Names of the boolean columns (``None`` or empty to skip).

    Returns:
        ``df`` with the missing values of the listed columns filled.
    """
    # imputate numeric features
    for col in num_cols or []:
        median = df[col].median()
        if pd.isna(median):
            # No observed value at all: there is nothing to impute with
            continue
        if pd.api.types.is_integer_dtype(df[col].dtype):
            # Integer columns cannot hold a fractional median; float columns
            # keep the exact median so fractional features are not distorted
            median = round(median)
        df[col] = df[col].fillna(median)

    # add "missing" category for categorical features
    for col in list(cat_cols or []) + list(bool_cols or []):
        # Record missingness *before* the cast: astype(str) would otherwise
        # turn NaN/None into ordinary strings that fillna no longer detects
        missing = df[col].isna()
        df[col] = df[col].astype(str).mask(missing, "<NA>")
    return df


### Cleaning, Imputing, Scaling and Labelling
Now execute the functions from above to return a dataframe we can use for computation.

In [3]:
# Load tracks.csv and normalise missing-value markers and dtypes
df = pd.read_csv("../dataset/tracks.csv").pipe(clean)

# Split the columns into numeric, categorical and boolean groups
num_cols, cat_cols, bool_cols = classify_cols(df, bool_cols=True)

# Fill missing values first, then scale / label-encode the columns
df = df.pipe(imputate_na, num_cols, cat_cols, bool_cols).pipe(
    scale_cols, num_cols, cat_cols, bool_cols
)

print(df.head())


      id  id_artist  name_artist  full_title  title  featured_artists  \
0  10202          5           82       11135  10484               485   
1   8089          5           82        7444   6986              1579   
2   9973          5           82       11106  10455              1004   
3   4665          5           82        1415   1306              1555   
4   5981          5           82        5064   4646              1382   

   primary_artist  language  album  stats_pageviews  ...  album_type  \
0              82        24    240         4.382687  ...           0   
1              82         9    240         3.566031  ...           0   
2              82         9    240         0.821598  ...           0   
3              82        17    555         0.345227  ...           3   
4              82         9    240         0.229954  ...           0   

   disc_number  track_number  duration_ms  explicit  popularity  album_image  \
0            0            22     0.048662       

### Encoding
Now we use one-hot encoding to encode the categorical features. Boolean features only need encoding if they contain NaN or NA values (= missing values); otherwise they are left as is. To limit the number of features, we only encode features whose number of unique values does not exceed a threshold (25). Features with more unique values, such as song lyrics, are dropped instead: one-hot encoding them would add up to N new features to the dataset, which is not a good idea.

In [16]:
def one_hot_encode_feature(df: pd.DataFrame, feature_to_encode: str, weight: float = 1) -> pd.DataFrame:
    """Replace one column of ``df`` by its one-hot (dummy) columns.

    Args:
        df: Input dataframe. It is not modified.
        feature_to_encode: Name of the column to encode.
        weight: Factor every dummy column is multiplied with.

    Returns:
        A new dataframe without ``feature_to_encode`` and with the weighted
        dummy columns (prefixed with the feature name) appended at the end.
    """
    indicator_cols = pd.get_dummies(
        df[feature_to_encode],
        dtype="int32",
        prefix=feature_to_encode,
    ).mul(weight)
    other_cols = df.drop(columns=feature_to_encode)
    return pd.concat([other_cols, indicator_cols], axis=1)


encoded_df = df.copy(deep=True)

# Candidates for one-hot encoding: every categorical column plus the boolean
# columns that hold more than two distinct values
multi_valued_bool_cols = [name for name in bool_cols if encoded_df[name].nunique() > 2]

for col in cat_cols + multi_valued_bool_cols:
    if encoded_df[col].nunique() <= 25:
        encoded_df = one_hot_encode_feature(encoded_df, col)
        continue
    # Columns with too many unique values are dropped instead of encoded
    logging.warning("Column %s has too many unique values, dropping", col)
    encoded_df = encoded_df.drop(columns=col)

print(encoded_df.head())



   stats_pageviews  n_sentences  n_tokens  tokens_per_sent  char_per_tok  \
0         4.382687     1.730216  1.986653         0.046638      0.261922   
1         3.566031    -0.137659  0.854572         0.598627      0.510317   
2         0.821598     1.161733  1.252719        -0.009536      0.047712   
3         0.345227    -0.909173 -0.550936         0.292905     -0.067872   
4         0.229954    -0.462507 -0.325479         0.047721     -0.295177   

   lexical_density  avg_token_per_clause       bpm  centroid   rolloff  ...  \
0         0.786870              0.009550  0.793368  1.642220  2.259997  ...   
1         1.792820              0.310083  0.570975  2.354421  3.112774  ...   
2         0.535180              0.029394  0.732070  1.558628  0.743616  ...   
3         0.227492             -0.089032  1.798812 -1.066151 -0.813071  ...   
4        -0.358439              0.028674 -0.307387  0.181037  0.135803  ...   

   month_3  month_4  month_5  month_6  month_7  month_8  month_9  mo

## Clustering
We can now cluster the songs using K-means clustering. We use k-means++ to get a better initialization. The algorithm is run 100 times and inertia is used to determine the best result. Inertia is the sum of squared distances between each data point and its assigned centroid. Lower inertia means better clustering results.

In [21]:
# Fix the seed: without random_state the centroid initialisation differs on
# every run, so the cluster assignment (and the arbitrary numbering of the
# clusters) would not be reproducible across notebook executions.
labels = KMeans(n_clusters=3, init="k-means++", n_init=100, random_state=42).fit_predict(encoded_df)
labels


array([1, 1, 1, ..., 0, 0, 2], shape=(11166,), dtype=int32)