In [1]:
import pandas as pd
import numpy as np

import matplotlib.pyplot as plt
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots

from sentence_transformers import SentenceTransformer
from sklearn.cluster import KMeans
from sklearn.decomposition import PCA
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.mixture import GaussianMixture

import boto3

  from tqdm.autonotebook import tqdm, trange


# public data: https://www.kaggle.com/datasets/maharshipandya/-spotify-tracks-dataset
# personal data: spotify API

In [2]:
def read_full_table(table_name, profile_name="default", region_name="eu-west-1"):
    """Read every item of a DynamoDB table into a DataFrame.

    Parameters
    ----------
    table_name : str
        Name of the DynamoDB table to scan.
    profile_name : str, optional
        AWS credentials profile used to open the session (default ``"default"``).
    region_name : str, optional
        AWS region used for the DynamoDB resource (default ``"eu-west-1"``).

    Returns
    -------
    pandas.DataFrame
        One row per item returned by the scan, accumulated across all pages.
    """
    session = boto3.Session(profile_name=profile_name)
    dynamodb = session.resource("dynamodb", region_name=region_name)
    table = dynamodb.Table(table_name)

    items = []
    scan_kwargs = {}
    while True:
        response = table.scan(**scan_kwargs)
        items.extend(response.get("Items", []))

        # A response carrying LastEvaluatedKey is not the last page: resume the
        # scan from that key until a response arrives without it.
        if "LastEvaluatedKey" not in response:
            break
        scan_kwargs["ExclusiveStartKey"] = response["LastEvaluatedKey"]

    return pd.DataFrame(items)

In [3]:
# Numeric feature columns of the user's track table (cast with pd.to_numeric later on).
numerical_cols = [
    "num_sections",
    "danceability",
    "sections_avg_duration",
    "instrumentalness",
    "liveness",
    "loudness",
    "duration",
    "speechiness",
    "valence",
    "dynamics_changes",
    "tempo_changes",
    "acousticness",
    "time_signature_changes",
    "popularity",
    "mode_changes",
    "energy",
    "key_changes",
    "tempo",
]

# Numeric feature columns used with the public dataset.
numerical_cols_public = [
    "danceability",
    "instrumentalness",
    "liveness",
    "loudness",
    "duration",
    "speechiness",
    "valence",
    "acousticness",
    "popularity",
    "energy",
    "tempo",
]

# Numeric columns flagged as not standardized.
non_standarized_cols = [
    "num_sections",
    "sections_avg_duration",
    "loudness",
    "duration",
    "dynamics_changes",
    "tempo_changes",
    "time_signature_changes",
    "popularity",
    "mode_changes",
    "key_changes",
    "tempo",
]

categorical_cols = ["key", "mode"]

# Integer key code -> note name; -1 is mapped to the "NoKey" label.
notes = ("C", "C#", "D", "Eb", "E", "F", "F#", "G", "Ab", "A", "Bb", "B")
key_mapping = {**dict(enumerate(notes)), -1: "NoKey"}

# Integer mode code -> mode name.
mode_mapping = {0: "Minor", 1: "Major"}

# Seed passed to every stochastic estimator in the notebook.
random_state = 602452

# Preprocess

In [4]:
# Pull the three DynamoDB tables in full, in a fixed order.
top_artists_raw, recently_played_raw, track_info_raw = (
    read_full_table(table_name)
    for table_name in ("top_artists", "recently_played", "track_info")
)

# Public dataset, read from a local CSV export.
# NOTE(review): absolute, machine-specific path -- the notebook only runs where this file exists.
public_data_raw = pd.read_csv(
    r"C:\Users\jcf\Desktop\codigo\Portfolio\Spotify Analysis\public_music_data.csv"
)

EndpointConnectionError: Could not connect to the endpoint URL: "https://dynamodb.eu-west-1.amazonaws.com/"

In [None]:
# Work on copies so the in-place edits made by later cells never touch the *_raw frames.
top_artists, recently_played, track_info, public_data = (
    raw_frame.copy()
    for raw_frame in (top_artists_raw, recently_played_raw, track_info_raw, public_data_raw)
)

In [None]:
# Replace the integer key / mode codes with their readable labels.
# The codes are always read from the untouched *_raw frames (same index as the
# working copies), so re-running this cell cannot map already-translated labels
# a second time, which would turn every value into NaN.
for frame, raw_frame in ((track_info, track_info_raw), (public_data, public_data_raw)):
    frame["key"] = raw_frame["key"].map(key_mapping)
    frame["mode"] = raw_frame["mode"].map(mode_mapping)

In [None]:
def _genre_shares(frame, genre_col, artist_col):
    """Count unique tracks and artists per genre and add their percentage shares.

    Returns a frame with one row per ``genre_col`` value holding the unique
    ``track_id`` and ``artist_col`` counts plus ``track_perc`` / ``artist_perc``,
    each expressed as a percentage of the summed per-genre counts.
    """
    shares = (
        frame.loc[:, [genre_col, "track_id", artist_col]]
        .groupby([genre_col])
        .nunique()
        .reset_index()
    )
    shares["track_perc"] = 100 * shares["track_id"] / shares["track_id"].sum()
    shares["artist_perc"] = 100 * shares[artist_col] / shares[artist_col].sum()
    return shares


# Cast the feature columns to numeric dtypes, always starting from the raw frame
# so that re-running the cell gives the same result.
for col in numerical_cols:
    track_info[col] = pd.to_numeric(track_info_raw[col])

# Turn every "*changes*" counter into a rate per unit of track duration.
# Reading the counter from the raw frame keeps the division from compounding
# when the cell is executed more than once.
for col in track_info.columns:
    if "changes" in col:
        track_info[col] = pd.to_numeric(track_info_raw[col]) / track_info["duration"]

track_info["case"] = "user"

public_data["duration"] = public_data["duration_ms"] / 1000
# Keep only the first name of a ";"-separated artist list.
public_data["artists"] = public_data["artists"].apply(lambda x: str(x).split(";")[0])
public_data["case"] = "public"

# User tracks carry a list of genres per row, hence the explode before counting.
genre_info = _genre_shares(track_info.explode("genres"), "genres", "artist")
genre_info_public = _genre_shares(public_data, "track_genre", "artists")

In [None]:
# genre_info = track_info.genres


In [None]:
# public_data

# do analysis (numerical and plots)
'track_name', 'album', 'artist', 'track_id', 'raw_sections'

'num_sections', 'danceability',  'sections_avg_duration',
'instrumentalness', 'liveness', 'loudness', 'duration', 'speechiness',
'valence', 'dynamics_changes', 'mode', 'tempo_changes',
'acousticness', 'time_signature_changes',
'popularity', 'mode_changes', 'energy', 'key_changes', 'key',
'tempo', 'explicit'

# show top artists and recently played in a table

# plots (X = done):
#     X - histogram of track features -> here/tableau
#     X - histogram of genres -> here
#     X - correlations between features -> here
#     X - user clustering (and public?) -> here
#     - genre predictor from features (train on public, test on user; compare with the clusters above)
#     X - user vs. public comparisons -> here
#
#     - try to replicate them in tableau


# model:
# try to group the genres further (manually? semantic comparison?) -> to reduce the total number
# k-means clustering of the user data (standardize first)
# genre prediction model (?)

In [None]:
# Print the genre entries of five randomly drawn user tracks, one per line.
print(*track_info.genres.sample(5), sep="\n")

# Clustering

## EDA

In [None]:
# Pareto-style chart of the user's genres, once by track share and once by artist share:
# bars show each genre's percentage, the line shows the running total.
for element in ("track", "artist"):
    share_col = f"{element}_perc"
    accum_col = f"{element}_perc_accum"

    genre_info_plot = genre_info.sort_values(by=share_col, ascending=False)
    genre_info_plot[accum_col] = genre_info_plot[share_col].cumsum()

    # Genres whose running total stays within the first 90 %.
    within_90 = genre_info_plot[accum_col] <= 90
    total_elements = genre_info_plot["genres"].nunique()
    top_elements = genre_info_plot.loc[within_90, "genres"].nunique()

    print(f"{total_elements=}, {top_elements=}")

    fig = make_subplots(specs=[[{"secondary_y": True}]])

    accum_trace = go.Scatter(x=genre_info_plot["genres"], y=genre_info_plot[accum_col], name="% Accum")
    share_trace = go.Bar(x=genre_info_plot["genres"], y=genre_info_plot[share_col], name="%")
    fig.add_trace(accum_trace, secondary_y=True)
    fig.add_trace(share_trace, secondary_y=False)

    fig.update_layout(title_text=f"User {element.capitalize()} Genres")

    fig.show()

In [None]:
# px.bar(genre_info_public.sort_values(by="artists", ascending=False), x="track_genre", y="artists")
# Pareto-style chart of the public dataset's genres, by track share and by artist share:
# bars show each genre's percentage, the line shows the running total.
for element in ("track", "artist"):
    share_col = f"{element}_perc"
    accum_col = f"{element}_perc_accum"

    genre_info_plot = genre_info_public.sort_values(by=share_col, ascending=False)
    genre_info_plot[accum_col] = genre_info_plot[share_col].cumsum()

    # Genres whose running total stays within the first 90 %.
    within_90 = genre_info_plot[accum_col] <= 90
    total_elements = genre_info_plot["track_genre"].nunique()
    top_elements = genre_info_plot.loc[within_90, "track_genre"].nunique()

    print(f"{total_elements=}, {top_elements=}")

    fig = make_subplots(specs=[[{"secondary_y": True}]])

    accum_trace = go.Scatter(x=genre_info_plot["track_genre"], y=genre_info_plot[accum_col], name="% Accum")
    share_trace = go.Bar(x=genre_info_plot["track_genre"], y=genre_info_plot[share_col], name="%")
    fig.add_trace(accum_trace, secondary_y=True)
    fig.add_trace(share_trace, secondary_y=False)

    fig.update_layout(title_text=f"Public {element.capitalize()} Genres")

    fig.show()

In [None]:
# Compare the genre vocabularies of the user data and the public dataset.
public_genres = genre_info_public["track_genre"].unique()
user_genres = genre_info["genres"].unique()

user_genres_not_public = set(user_genres).difference(set(public_genres))
pulic_genres_not_user = set(public_genres).difference(set(user_genres))

# Each line reports the genres missing from the other source, their count and the
# percentage of the source's own genres they represent.
print(f"{user_genres_not_public=}\n({len(user_genres_not_public)}, {100 * len(user_genres_not_public) / len(user_genres)}%)")
# The percent sign now sits inside the parentheses, matching the line above.
print(f"{pulic_genres_not_user=}\n({len(pulic_genres_not_user)}, {100 * len(pulic_genres_not_user) / len(public_genres)}%)")

## By genre text

In [None]:
# Rank public genres by track share and keep those inside the cumulative `limit` (%).
genre_info_sorted = genre_info_public.sort_values(by="track_perc", ascending=False)
genre_info_sorted["track_perc_accum"] = genre_info_sorted["track_perc"].cumsum()

limit = 100
# The running total is a floating-point sum, so its last value can land a hair
# above 100; the tolerance keeps the final genre from being dropped by rounding.
tolerance = 1e-9

genre_info_sorted = genre_info_sorted.loc[
    genre_info_sorted["track_perc_accum"] <= limit + tolerance, "track_genre"
].unique()
genres = genre_info_sorted.tolist()

# Embed the genre names as sentences so that similar names end up close together.
model = SentenceTransformer('paraphrase-MiniLM-L6-v2')

embeddings = model.encode(genres)

# Cross-validated search over the number of clusters.
# NOTE(review): with no `scoring` given, GridSearchCV falls back to KMeans.score;
# confirm this criterion does not simply favour the largest n_clusters.
param_grid = {
    "n_clusters": range(3, 15)
}
grid_search = GridSearchCV(
    KMeans(random_state=random_state), param_grid=param_grid
)
grid_search.fit(embeddings)

df = pd.DataFrame(grid_search.cv_results_)
num_clusters = df.loc[df["rank_test_score"] == 1, "param_n_clusters"].values[0]

print(f"Recommended number of clusters is {num_clusters}")

In [None]:
# Cluster the genre embeddings with the number of clusters chosen by the search.
kmeans = KMeans(n_clusters=num_clusters, random_state=random_state).fit(embeddings)
labels = kmeans.labels_

# Group the genre names by their cluster label, then order the groups by label.
clusters = {}
for genre, label in zip(genres, labels):
    clusters.setdefault(label, []).append(genre)
clusters = dict(sorted(clusters.items()))

for cluster_id, genre_list in clusters.items():
    print(f"Cluster {cluster_id}: {', '.join(genre_list)}\n")

# Project the embeddings onto two principal components for a visual check.
pca = PCA(n_components=2)
reduced_embeddings = pca.fit_transform(embeddings)
first_component, second_component = reduced_embeddings[:, 0], reduced_embeddings[:, 1]

plt.scatter(first_component, second_component, c=labels, cmap='tab20')
plt.xlabel('PCA Component 1')
plt.ylabel('PCA Component 2')
plt.title('Genre Clusters Visualization')
plt.show()

In [None]:
def assign_cluster(x):
    """Return the ids of all clusters whose member list contains ``x``.

    Looks ``x`` up in the module-level ``clusters`` dict (cluster id -> list of
    members) and returns the matching ids in the dict's iteration order; the
    result is an empty list when no cluster contains ``x``.
    """
    return [cluster_id for cluster_id, members in clusters.items() if x in members]


# Tag every public track with the text-based cluster(s) of its genre, one row per cluster.
public_data["cluster_NLP"] = public_data["track_genre"].apply(assign_cluster)
df_exploded = public_data.explode("cluster_NLP")

# Show up to five random tracks per cluster as a sanity check.
for i in range(num_clusters):
    in_cluster = df_exploded["cluster_NLP"] == i
    _df = df_exploded.loc[in_cluster, ["track_name", "artists"]]
    elements = min(5, len(_df))
    print(f"Cluster {i}")
    print(_df.sample(elements).values)

## By features

In [None]:
# Absolute pairwise correlations between the public dataset's numerical features.
public_numeric = public_data[numerical_cols_public]
correlations = public_numeric.corr().abs()

fig = px.imshow(correlations, text_auto=True).update_layout(
    width=1000, height=1200, autosize=False
)

fig.show()

In [None]:
useless_cols = ["loudness"]

# Filter the ordered list instead of going through a set: set iteration order is
# not stable between kernels, which made the feature order (and everything built
# from it) change from run to run.
numerical_cols_extra = [col for col in numerical_cols_public if col not in useless_cols]

# Explicit copy of the column subset, so the assignments below write to an
# independent frame and do not trigger pandas' SettingWithCopyWarning.
track_info_new_features = public_data[public_data.columns.difference(useless_cols)].copy()

# Combine pairs of features into a single column holding their product.
for first, second in (("valence", "danceability"), ("energy", "acousticness")):
    combined_col = f"{first} - {second}"
    track_info_new_features[combined_col] = (
        track_info_new_features[first] * track_info_new_features[second]
    )
    numerical_cols_extra.append(combined_col)

correlations = track_info_new_features[numerical_cols_extra].corr().abs()

fig = px.imshow(correlations, text_auto=True)
fig.update_layout(width=1000, height=1200, autosize=False)

fig.show()

In [None]:
# track_info.loc[:, ["mode_changes", "key_changes"]].plot(kind="scatter", y="mode_changes", x="key_changes")

In [None]:
# Drop loudness plus the four features already folded into the two combined columns
# ("valence - danceability" and "energy - acousticness").
useless_cols = ["loudness", "energy", "acousticness", "valence", "danceability"]

# Filter the ordered list rather than a set so the feature order is the same on every run.
numerical_cols_extra = [col for col in numerical_cols_public if col not in useless_cols]
numerical_cols_extra = numerical_cols_extra + ["valence - danceability", "energy - acousticness"]

# Explicit copy so later assignments to this frame never write into a slice.
track_info_new_features = track_info_new_features[
    track_info_new_features.columns.difference(useless_cols)
].copy()

correlations = track_info_new_features[numerical_cols_extra].corr().abs()

fig = px.imshow(correlations, text_auto=True)
fig.update_layout(width=1000, height=1000, autosize=False)

fig.show()

In [None]:
# track_info_alt = track_info.copy()

# useless_cols = ["num_sections", "loudness", "speechiness", "acousticness", "danceability"] #speechiness, acousticness, danceability
# _numerical_cols = list(set(numerical_cols).difference(useless_cols))

# track_info_alt = track_info_alt[track_info_alt.columns.difference(useless_cols)]
# track_info_alt["comb"]

# # for pair in (("valence", "danceability"), ("key_changes", "mode_changes")):
# #     track_info_alt[f"{pair[0]} - {pair[1]}"] = track_info_alt[pair[0]] * track_info_alt[pair[1]]
# #     _numerical_cols.append(f"{pair[0]} - {pair[1]}")

# correlations = track_info_alt[_numerical_cols].corr().abs()

# fig = px.imshow(correlations, text_auto=True)
# fig.update_layout(width=1300, height=1200, autosize=False)

# fig.show()

In [None]:
# Display the current list of numerical feature columns.
numerical_cols_extra

In [None]:
# track_info_new_features = public_data.copy()
# # useless_cols = ["num_sections", "loudness", "energy", "acousticness", "valence", "danceability", "speechiness"] 
# combinations_cols = [("valence", "danceability"), ("energy", "acousticness"), ("energy", "speechiness")]
# numerical_cols_extra = list(set(numerical_cols).difference(useless_cols))

# for pair in combinations_cols:
#     track_info_new_features[f"{pair[0]} - {pair[1]}"] = track_info_new_features[pair[0]] * track_info_new_features[pair[1]]
#     numerical_cols_extra.append(f"{pair[0]} - {pair[1]}")
    
# numerical_cols_extra = numerical_cols_extra + categorical_cols + [f"{pair[0]} - {pair[1]}" for pair in combinations_cols]

# Model matrix: the selected numerical features plus one-hot encoded key / mode.
model_columns = numerical_cols_extra + categorical_cols
X = pd.get_dummies(track_info_new_features[model_columns], columns=categorical_cols)

# Standardized version of the same matrix.
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

# TODO: try PCA, delete correlated features


In [None]:
# Counter is imported in this cell rather than with the notebook's other imports;
# the cluster-composition cell right after this one relies on it.
from collections import Counter

# Scratch check of how Counter tallies repeated values in a small list.
Counter([1, 2, 3, 3, 3])

In [None]:
# Rebuild the label -> genres grouping; repeated genres are kept on purpose so
# they can be counted below.
clusters = {}
for genre, label in zip(genres, labels):
    clusters.setdefault(label, []).append(genre)

# Only the first cluster (in insertion order) is inspected: the loop stops after one pass.
for k, v in clusters.items():
    count = Counter(v)
    total = len(v)
    break

# Share (%) of each of the 50 most frequent genres within that cluster.
res = [(name, 100 * hits / total) for name, hits in count.most_common(50)]

print(total)
res

In [None]:
# Cluster the tracks on their audio features.
# The model is fitted on the standardized matrix: k-means relies on Euclidean
# distances, so on the raw matrix the wide-range columns (duration, tempo,
# popularity) would swamp the 0-1 features. This also matches the Gaussian
# mixture fitted on X_scaled further down.
kmeans = KMeans(n_clusters=20, random_state=random_state)
kmeans.fit(X_scaled)
labels = kmeans.labels_

genres = track_info_new_features["track_genre"]

# Collect the distinct genres seen in each cluster. A dict per cluster gives
# O(1) de-duplication while keeping first-seen order.
genres_by_cluster = {}
for genre, label in zip(genres, labels):
    genres_by_cluster.setdefault(label, {})[genre] = None

# cluster id -> list of genres, ordered by cluster id.
clusters = {label: list(seen) for label, seen in sorted(genres_by_cluster.items())}

for cluster_id, genre_list in clusters.items():
    print(f"Cluster {cluster_id}: {', '.join(genre_list)}\n")

# pca = PCA(n_components=2)
# reduced_embeddings = pca.fit_transform(embeddings)
# plt.scatter(reduced_embeddings[:, 0], reduced_embeddings[:, 1], c=labels, cmap='tab20')
# plt.xlabel('PCA Component 1')
# plt.ylabel('PCA Component 2')
# plt.title('Genre Clusters Visualization')
# plt.show()

In [None]:
def gmm_bic_score(estimator, X):
    """Scorer for GridSearchCV that returns the negated BIC of ``estimator`` on ``X``.

    The sign is flipped because GridSearchCV maximizes its score.
    """
    bic = estimator.bic(X)
    return -bic


# Search over the number of mixture components and the covariance structure,
# scoring every candidate with the negated BIC.
param_grid = {
    "n_components": range(3, 20),
    "covariance_type": ["spherical", "tied", "diag", "full"],
}
grid_search = GridSearchCV(
    GaussianMixture(random_state=random_state),
    param_grid=param_grid,
    scoring=gmm_bic_score,
)
grid_search.fit(X_scaled)

# Keep the searched parameters and the score, flipping the sign back to a plain BIC.
df = (
    pd.DataFrame(grid_search.cv_results_)[
        ["param_n_components", "param_covariance_type", "mean_test_score"]
    ]
    .assign(mean_test_score=lambda scores: -scores["mean_test_score"])
    .rename(
        columns={
            "param_n_components": "Number of components",
            "param_covariance_type": "Type of covariance",
            "mean_test_score": "BIC score",
        }
    )
)
# Ten best candidates, lowest BIC first.
df.sort_values(by="BIC score").head(10)

In [None]:
lower_limit = 0.5    # a component counts for a track only above this probability
gmm_components = 19  # number of mixture components to fit

model = GaussianMixture(
    n_components=gmm_components, covariance_type="full", random_state=random_state
).fit(X_scaled)
results = model.predict(X_scaled)
probabilities = model.predict_proba(X_scaled)

track_clustered = public_data.copy()
# Hard assignment: the component predicted for each track.
track_clustered["cluster_single"] = results
# Soft assignment: every component whose probability exceeds the threshold.
track_clustered["clusters"] = [
    [component for component, prob in enumerate(row) if prob > lower_limit]
    for row in probabilities
]

track_clustered.head(5)

In [None]:
genre_info_clustered = track_clustered.copy()
# genre_info_clustered["genres"] = genre_info_clustered["genres"].fillna("No Genre")

# Distinct genres found in each hard-assigned cluster.
cluster_items = genre_info_clustered[["cluster_single", "track_genre"]].groupby("cluster_single").agg(list).reset_index()
# Sorted, so the printed genre order is the same on every run (set order is not).
cluster_items["track_genre"] = cluster_items["track_genre"].apply(lambda x: sorted(set(x)))

for _, row in cluster_items.iterrows():
    print(f"Cluster {row['cluster_single']}: {', '.join(row['track_genre'])}\n")

In [None]:
# Show up to five random tracks from every hard-assigned cluster.
for i in range(gmm_components):
    in_cluster = track_clustered["cluster_single"] == i
    _df = track_clustered.loc[in_cluster, ["track_name", "artists"]]
    elements = min(5, len(_df))
    print(f"Cluster {i}")
    print(_df.sample(elements).values)

In [None]:
genre_info_clustered = track_clustered.copy()
# genre_info_clustered["genres"] = genre_info_clustered["genres"].fillna("No Genre")
# One row per (track, cluster) pair from the thresholded soft assignment.
genre_info_clustered = genre_info_clustered.explode("clusters")

# Distinct genres found in each soft-assigned cluster.
cluster_items = genre_info_clustered[["clusters", "track_genre"]].groupby("clusters").agg(list).reset_index()
# Sorted, so the printed genre order is the same on every run (set order is not).
cluster_items["track_genre"] = cluster_items["track_genre"].apply(lambda x: sorted(set(x)))

for _, row in cluster_items.iterrows():
    print(f"Cluster {row['clusters']}: {', '.join(row['track_genre'])}\n")

In [None]:
# One row per (track, cluster) pair from the thresholded soft assignment.
df_exploded = track_clustered.explode("clusters")

# track_clustered is a copy of the public frame, whose artist column is "artists"
# (not "artist"). Selecting a missing label with .loc raises KeyError, so only
# the columns that actually exist are shown.
wanted_cols = ["track_name", "artists", "track_url"]
display_cols = [col for col in wanted_cols if col in df_exploded.columns]

# Show up to five random tracks from every soft-assigned cluster.
for i in range(0, gmm_components):
    _df = df_exploded.loc[df_exploded["clusters"] == i, display_cols]
    elements = min(5, _df.shape[0])
    print(f"Cluster {i}")
    print(_df.sample(elements).values)

# Save

In [None]:
import os

import chart_studio


# Credentials are read from the environment so that no secret is stored in the notebook.
# CHART_STUDIO_API_KEY is required (a missing variable raises KeyError);
# CHART_STUDIO_USERNAME is optional and falls back to the previous account name.
# NOTE(review): the API key that used to be hard-coded here should be treated as
# leaked and rotated.
chart_studio.tools.set_credentials_file(
    username=os.environ.get("CHART_STUDIO_USERNAME", "jcf94"),
    api_key=os.environ["CHART_STUDIO_API_KEY"],
)

In [None]:
import chart_studio.plotly as py

# Render the most recent correlation matrix and publish it to Chart Studio.
fig = px.imshow(correlations, text_auto=True).update_layout(
    width=1300, height=1200, autosize=False
)
py.plot(fig, filename='test-plot', auto_open=True)

In [None]:
# Save results to CSV files in the working directory.
# TODO: save an HTML report with the numerical results and the plotly plots.
for frame, csv_name in (
    (top_artists, "top_artists.csv"),
    (recently_played, "recently_played.csv"),
):
    frame.to_csv(csv_name)