# Data Processing

In [None]:
from pyspark.sql import SparkSession
import pandas as pd
import numpy as np
from pyspark.sql.functions import year

In [None]:
# Start (or reuse) the Spark session that backs every Spark DataFrame below.
spark = (
    SparkSession.builder
    .appName("data_prep")
    .getOrCreate()
)
# Last expression of the cell: the notebook renders the session summary.
spark

In [None]:
from pyspark.sql import types

In [None]:
# Explicit schema for the tracks CSV as (column name, Spark type) pairs, in
# declaration order.  Every field is nullable.
_track_columns = [
    ('id', types.StringType()),
    ('name', types.StringType()),
    ('popularity', types.IntegerType()),
    ('duration_ms', types.IntegerType()),
    ('explicit', types.IntegerType()),
    ('artists', types.StringType()),
    ('id_artists', types.StringType()),
    ('release_date', types.DateType()),
    ('danceability', types.DoubleType()),
    ('energy', types.DoubleType()),
    ('key', types.IntegerType()),
    ('loudness', types.DoubleType()),
    ('mode', types.IntegerType()),
    ('speechiness', types.DoubleType()),
    ('acousticness', types.DoubleType()),
    ('instrumentalness', types.DoubleType()),
    ('liveness', types.DoubleType()),
    ('valence', types.DoubleType()),
    ('tempo', types.DoubleType()),
    ('time_signature', types.IntegerType()),
]
schema = types.StructType(
    [types.StructField(column, dtype, True) for column, dtype in _track_columns]
)

In [None]:
# Load the tracks CSV with the explicit schema; the first line is a header.
tracks_sparkdf = spark.read.csv("tracks.csv", schema=schema, header=True)

In [None]:
# Extra pre-processing: "id_artists" is read as a plain string (presumably a
# stringified list such as "['id1', 'id2']" -- confirm against tracks.csv).
# Strip the square brackets, single quotes and spaces; the commas are kept
# on purpose because the next cell splits on them.
from pyspark.sql.functions import regexp_replace, trim

# Raw string literal: "\[" and "\]" are invalid escape sequences in an
# ordinary Python string and trigger a SyntaxWarning on recent interpreters.
# The pattern handed to Spark is unchanged.
tracks_sparkdf = tracks_sparkdf.withColumn(
    "artists_array",
    trim(regexp_replace(tracks_sparkdf["id_artists"], r"[\[\]' ]", "")),
)

In [None]:
# Turn the cleaned, comma-separated id string into an array-of-strings
# column, overwriting "artists_array" in place.
from pyspark.sql.functions import split

artist_id_text = tracks_sparkdf["artists_array"]
tracks_sparkdf = tracks_sparkdf.withColumn("artists_array", split(artist_id_text, ","))

In [None]:
# Give the track-level "name" and "popularity" columns song-specific names.
tracks_sparkdf = (
    tracks_sparkdf
    .withColumnRenamed("name", "song_name")
    .withColumnRenamed("popularity", "song_popularity")
)

# Print the schema of the resulting DataFrame.
tracks_sparkdf.printSchema()

In [None]:
# Explicit schema for the artists CSV as (column name, Spark type) pairs, in
# declaration order.  Every field is nullable.
_artist_columns = [
    ('id', types.StringType()),
    ('followers', types.DoubleType()),
    ('genres', types.StringType()),
    ('name', types.StringType()),
    ('popularity', types.IntegerType()),
]
artist_schema = types.StructType(
    [types.StructField(column, dtype, True) for column, dtype in _artist_columns]
)

In [None]:
# Load the artists CSV with the explicit schema; the first line is a header.
artists_sparkdf = spark.read.csv("artists.csv", schema=artist_schema, header=True)

In [None]:
# Give the artist-level "name" and "popularity" columns artist-specific names.
artists_sparkdf = (
    artists_sparkdf
    .withColumnRenamed("name", "artist_name")
    .withColumnRenamed("popularity", "artist_popularity")
)

In [None]:
# Joining the two data sets.
from pyspark.sql.functions import explode, col, struct

# Step 1: explode "artists_array" so each track yields one row per artist id,
# exposed as the new "artist_id" column next to all original columns.
exploded_tracks_df = tracks_sparkdf.select(
    "*",
    explode("artists_array").alias("artist_id"),
)

In [None]:
# Step 2: rename the artists' "id" column to "artist_id", the column name the
# join in the next step is keyed on.
artists_df = artists_sparkdf.withColumnRenamed("id", "artist_id")

In [None]:
# Step 3: inner-join the exploded tracks with the artists on "artist_id".
joined_df = exploded_tracks_df.join(artists_df, on="artist_id", how="inner")

In [None]:
# Final step of data prep: clean and explode the genres column.
from pyspark.sql.functions import regexp_replace, split, trim

# Strip square brackets, single quotes and spaces from the "genres" string;
# commas are kept so the value can be split below.  Because spaces are
# removed, multi-word genre names lose their internal spaces as well.
# Raw string literal: "\[" and "\]" are invalid escape sequences in an
# ordinary Python string and trigger a SyntaxWarning on recent interpreters.
# The pattern handed to Spark is unchanged.
joined_df = joined_df.withColumn(
    "genres_array",
    trim(regexp_replace(joined_df["genres"], r"[\[\]' ]", "")),
)

# Replace the cleaned string with an array of strings (same column name).
joined_df = joined_df.withColumn("genres_array", split(joined_df["genres_array"], ","))

# Explode the array: one row per element of "genres_array", exposed as
# "genre_list" next to all existing columns.
final_df = joined_df.selectExpr("*", "explode(genres_array) as genre_list")

In [None]:
# Collapse the exploded rows back to one row per distinct combination of the
# track columns below, gathering every "genre_list" value of the group into
# a single list column named "genres".
from pyspark.sql.functions import collect_list

track_columns = [
    'id', 'song_name', 'song_popularity', 'duration_ms', 'explicit',
    'release_date', 'danceability', 'energy', 'key', 'loudness', 'mode',
    'speechiness', 'acousticness', 'instrumentalness', 'liveness', 'valence',
    'tempo', 'time_signature', 'artists',
]
genres_per_track = collect_list('genre_list').alias('genres')
grouped_data = final_df.groupBy(*track_columns).agg(genres_per_track)


In [None]:
# Print the schema of the aggregated DataFrame.
grouped_data.printSchema()

## Further Data Processing

In [None]:
# Display the column names of the aggregated DataFrame.
grouped_data.columns

In [None]:
from pyspark.sql.functions import col, count, isnan, when

# Report the size of the aggregated DataFrame: rows (a full Spark count) and
# columns.
row_total = grouped_data.count()
column_total = len(grouped_data.columns)
print("Number of rows: ", row_total)
print("Number of columns: ", column_total)

In [None]:
# Count the null entries of every column in a single select: `when` yields a
# value only for null rows and `count` ignores the rest.
null_counts = [
    count(when(col(name).isNull(), name)).alias(name)
    for name in grouped_data.columns
]
grouped_data.select(null_counts).show()

# Derive the release year from the release date as a separate column.
grouped_data = grouped_data.withColumn('year', year(col('release_date')))

grouped_data.printSchema()


# Feature Analysis and Selection

In [None]:
# EDA for feature selection: convert the aggregated Spark DataFrame into a
# pandas DataFrame, which every later cell works on.
pandas_df=grouped_data.toPandas()

# Display the pandas column names.
pandas_df.columns

In [None]:
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

In [None]:
# Columns that take part in the correlation study.
audio_features = [
    'song_popularity', 'explicit', 'danceability', 'energy', 'key',
    'loudness', 'mode', 'speechiness', 'acousticness', 'instrumentalness',
    'liveness', 'valence', 'tempo', 'year',
]

audio_df = pandas_df[audio_features]

# Pairwise Pearson correlation between the selected columns.
corr = audio_df.corr(method='pearson')

# Annotated heatmap of the correlation matrix, labelled with the column names.
fig, ax = plt.subplots(figsize=(10,10))
feature_labels = audio_df.columns
sns.heatmap(
    corr,
    xticklabels=feature_labels,
    yticklabels=feature_labels,
    annot=True,
    ax=ax,
)

# ML PART

## Fuzzy C Means

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.cluster import KMeans
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
from sklearn.utils import shuffle
from sklearn.metrics import pairwise_distances
from sklearn_extra.cluster import KMedoids
from skfuzzy.cluster import cmeans
from numpy.lib.function_base import kaiser
from sklearn.preprocessing import MinMaxScaler
from sklearn.pipeline import Pipeline
import plotly.express as px
from sklearn.mixture import GaussianMixture
import pickle
import time

### Finding the optimal number of clusters

In [None]:
# Fuzzy c-means sweep: fit one model per k in 10..29 on min-max scaled
# features and keep every output of each run, keyed by name.
scaler = MinMaxScaler()
data = scaler.fit_transform(audio_df)
k_range = range(10, 30)
result_keys = ('cntr', 'u', 'u0', 'd', 'jm', 'p', 'fpc')
fcm_results = []
for k in k_range:
    print("On ",k)
    # cmeans expects the samples along the second axis, hence the transpose.
    cntr, u, u0, d, jm, p, fpc = cmeans(data.T, k, 2, error=0.005, maxiter=1000, init=None)
    run = {'k': k}
    run.update(zip(result_keys, (cntr, u, u0, d, jm, p, fpc)))
    fcm_results.append(run)

# Fuzzy partition coefficient of every run, plotted against k.
fpc_values = [run['fpc'] for run in fcm_results]
fig, ax = plt.subplots()
ax.plot(k_range, fpc_values)
ax.set_xlabel('Number of Clusters (k)')
ax.set_ylabel('Fuzzy Partition Coefficient (FPC)')
plt.show()

# Pick k from the curvature of the FPC curve: the position of the smallest
# second difference, shifted by one to index back into k_range.
diffs = np.diff(fpc_values)
diffs2 = np.diff(diffs)
k_opt = k_range[np.argmin(diffs2)+1]
print("Optimal number of clusters:", k_opt)

In [None]:
# Elbow method for K-Means: standardise each column by hand (pandas mean and
# standard deviation), then record the within-cluster sum of squares (WCSS)
# for k in 10..29.
scaled_df = audio_df.sub(audio_df.mean()).div(audio_df.std())
elbow_k_values = range(10, 30)
wcss = []
for k in elbow_k_values:
    print("On ", k)
    kmeans = KMeans(n_clusters=k, n_init=10,random_state=42).fit(scaled_df)
    wcss.append(kmeans.inertia_)

# WCSS against k; the bend of the curve suggests the number of clusters.
plt.plot(elbow_k_values, wcss)
plt.xlabel('Number of clusters (k)')
plt.ylabel('WCSS')
plt.title('Elbow Method')
plt.show()

In [None]:
# Gaussian mixture sweep: standardise every numeric column of pandas_df and
# record the BIC of a mixture fitted with k components, for k in 25..34.
numeric_features = pandas_df.select_dtypes(np.number)
scaled_df = StandardScaler().fit_transform(numeric_features)
n_clusters = range(25,35)
gmm_bics = []
for k in n_clusters:
    print(f"On {k}")
    gmm = GaussianMixture(n_components=k).fit(scaled_df)
    gmm_bics.append(gmm.bic(scaled_df))

# BIC against the number of components.
plt.plot(n_clusters, gmm_bics)
plt.xlabel('Number of clusters (k)')
plt.ylabel('BIC score')
plt.title('Bayesian Information Criterion (BIC)')
plt.show()

In [None]:
# Number of GMM components preferred by the BIC sweep: the entry of
# n_clusters whose fitted mixture has the lowest BIC (lower is better).
# The name must be assigned before it is displayed, otherwise this cell
# raises NameError.
optimal_n_clusters = n_clusters[int(np.argmin(gmm_bics))]
optimal_n_clusters

# Creating models using the optimal number of clusters

## Fuzzy C-Means

In [None]:
# Fit the final fuzzy c-means model (21 clusters) and time it.
start_time = time.time()
# Independent copy: a plain assignment would alias pandas_df, so the
# 'cluster_label' column written below would also appear in pandas_df and,
# through select_dtypes, in the feature matrices of the later models.
pandas_df_fuzzy = pandas_df.copy()
# Numeric feature columns only; a 'cluster_label' left behind by an earlier
# run of a clustering cell is a result, not a feature.
X = pandas_df_fuzzy.select_dtypes(np.number).drop(columns=['cluster_label'], errors='ignore')
# NOTE(review): the k sweep ran on min-max scaled data while this fit uses
# the raw columns -- confirm that is intended.
cntr, u, u0, d, jm, p, fpc = cmeans(X.T, 21, 2, error=0.005, maxiter=1000, init=None)
# Hard assignment: each song gets the cluster with its highest membership.
fcm_cluster_labels = np.argmax(u, axis=0)
pandas_df_fuzzy['cluster_label'] = fcm_cluster_labels
end_time = time.time()
durationF = end_time - start_time
# Persist the cluster centres.
np.save('fcm_model_cntr.npy', cntr)


## K-Means

In [None]:
# Fit the final K-Means model (20 clusters, standardised features) and time it.
start_time = time.time()
k_pipeline = Pipeline([('scaler', StandardScaler()),
                       ('kmeans', KMeans(n_clusters=20, verbose=2))])
# Independent copy: a plain assignment would alias pandas_df, so this
# model's 'cluster_label' would overwrite (or be overwritten by) the labels
# of the other models that share the same object.
pandas_df_k = pandas_df.copy()
# Numeric feature columns only; a 'cluster_label' produced by another
# clustering cell must not be fed to K-Means as a feature.
X1 = pandas_df_k.select_dtypes(np.number).drop(columns=['cluster_label'], errors='ignore')
number_cols = list(X1.columns)
k_labels = k_pipeline.fit_predict(X1)
pandas_df_k['cluster_label'] = k_labels
end_time = time.time()
durationK = end_time - start_time

## Gaussian Mixture Model

In [None]:
# Fit the final Gaussian mixture model (29 components, standardised
# features) and time it.
start_time = time.time()
gmm_pipeline = Pipeline([('scaler', StandardScaler()),
                         ('gmm', GaussianMixture(n_components=29))])

# Independent copy: a plain assignment would alias pandas_df, so this
# model's 'cluster_label' would replace the labels of every other model
# that shares the same object.
pandas_df_gmm = pandas_df.copy()
# Numeric feature columns only, computed once; a 'cluster_label' produced by
# another clustering cell must not be fed to the mixture as a feature.
gmm_features = pandas_df_gmm.select_dtypes(np.number).drop(columns=['cluster_label'], errors='ignore')
gmm_pipeline.fit(gmm_features)
labels = gmm_pipeline.predict(gmm_features)
pandas_df_gmm['cluster_label'] = labels
end_time = time.time()
durationG = end_time - start_time

## Visualizing clusters

In [None]:
def VisualizeClustersFCM(pandas_df_fuzzy):
    """Plot the fuzzy c-means clusters on the first two principal components.

    The numeric columns of the given frame (excluding 'cluster_label') are
    standardised, projected to 2-D with PCA and shown as an interactive
    plotly scatter coloured by cluster.

    Args:
        pandas_df_fuzzy: DataFrame with the numeric song features plus the
            'song_name' and 'cluster_label' columns.
    """
    # Derive everything from the argument rather than from notebook-level
    # variables, so the plot always matches the frame that was passed in.
    features = pandas_df_fuzzy.select_dtypes(np.number).drop(
        columns=['cluster_label'], errors='ignore')
    pca_pipeline = Pipeline([('scaler', StandardScaler()), ('PCA', PCA(n_components=2))])
    song_embedding = pca_pipeline.fit_transform(features)
    projection = pd.DataFrame(columns=['x', 'y'], data=song_embedding)
    # Positional assignment: the embedding rows are in input order, so index
    # alignment against a non-default index would misplace or drop values.
    projection['title'] = pandas_df_fuzzy['song_name'].to_numpy()
    projection['cluster'] = pandas_df_fuzzy['cluster_label'].to_numpy()
    fig = px.scatter(projection, x='x', y='y', color='cluster', hover_data=['x', 'y', 'title'])
    fig.show()

In [None]:
def VisualizeClustersK(pandas_df_k):
    """Plot the K-Means clusters on the first two principal components.

    The numeric columns of the given frame (excluding 'cluster_label') are
    standardised, projected to 2-D with PCA and shown as an interactive
    plotly scatter coloured by cluster.

    Args:
        pandas_df_k: DataFrame with the numeric song features plus the
            'song_name' and 'cluster_label' columns.
    """
    # Derive everything from the argument rather than from notebook-level
    # variables, so the plot always matches the frame that was passed in.
    features = pandas_df_k.select_dtypes(np.number).drop(
        columns=['cluster_label'], errors='ignore')
    pca_pipeline = Pipeline([('scaler', StandardScaler()), ('PCA', PCA(n_components=2))])
    song_embedding = pca_pipeline.fit_transform(features)
    projection = pd.DataFrame(columns=['x', 'y'], data=song_embedding)
    # Positional assignment: the embedding rows are in input order, so index
    # alignment against a non-default index would misplace or drop values.
    projection['title'] = pandas_df_k['song_name'].to_numpy()
    projection['cluster'] = pandas_df_k['cluster_label'].to_numpy()
    fig = px.scatter(projection, x='x', y='y', color='cluster', hover_data=['x', 'y', 'title'])
    fig.show()

In [None]:
def VisualizeClustersGMM(pandas_df_gmm):
    """Plot the Gaussian-mixture clusters on the first two principal components.

    The numeric columns of the given frame (excluding 'cluster_label') are
    standardised, projected to 2-D with PCA and shown as an interactive
    plotly scatter coloured by cluster.

    Args:
        pandas_df_gmm: DataFrame with the numeric song features plus the
            'song_name' and 'cluster_label' columns.
    """
    # Derive everything from the argument rather than from notebook-level
    # variables, so the plot always matches the frame that was passed in.
    features = pandas_df_gmm.select_dtypes(np.number).drop(
        columns=['cluster_label'], errors='ignore')
    pca_pipeline = Pipeline([('scaler', StandardScaler()), ('PCA', PCA(n_components=2))])
    song_embedding = pca_pipeline.fit_transform(features)
    projection = pd.DataFrame(columns=['x', 'y'], data=song_embedding)
    # Positional assignment: the embedding rows are in input order, so index
    # alignment against a non-default index would misplace or drop values.
    projection['title'] = pandas_df_gmm['song_name'].to_numpy()
    projection['cluster'] = pandas_df_gmm['cluster_label'].to_numpy()
    fig = px.scatter(projection, x='x', y='y', color='cluster', hover_data=['x', 'y', 'title'])
    fig.show()

## Fuzzy C-Means

In [None]:
# 2-D PCA scatter of the songs, coloured by fuzzy c-means cluster label.
VisualizeClustersFCM(pandas_df_fuzzy)

## K-Means

In [None]:
# 2-D PCA scatter of the songs, coloured by K-Means cluster label.
VisualizeClustersK(pandas_df_k)

## Gaussian Mixture Model

In [None]:
# 2-D PCA scatter of the songs, coloured by Gaussian-mixture cluster label.
VisualizeClustersGMM(pandas_df_gmm)

# Song Recommendations

In [None]:
#first step lets use spotify api:
import spotipy
import os
from spotipy.oauth2 import SpotifyClientCredentials
from collections import defaultdict

# Spotify API credentials are read from the environment so that secrets
# never live in the notebook or in version control.  Export
# SPOTIPY_CLIENT_ID and SPOTIPY_CLIENT_SECRET before running this cell; a
# missing variable fails immediately with a KeyError naming it.
# NOTE: any credentials that were previously committed in this file should
# be treated as compromised and rotated in the Spotify developer dashboard.
cid = os.environ['SPOTIPY_CLIENT_ID']
secret = os.environ['SPOTIPY_CLIENT_SECRET']
client_credentials_manager = SpotifyClientCredentials(client_id=cid, client_secret=secret)
sp = spotipy.Spotify(client_credentials_manager=client_credentials_manager)


def get_song_realtime(song_name,year):
    """Fetch one song's metadata and audio features from the Spotify API.

    Args:
        song_name: Track title to search for.
        year: Release year used to narrow the search.

    Returns:
        A one-row pandas DataFrame with the columns 'song_name', 'year',
        'explicit', 'duration_ms' and 'song_popularity' plus one column per
        key of the audio-features payload, or None when the search finds no
        track or no audio features are available for it.
    """
    results = sp.search(q= 'track: {} year: {}'.format(song_name,
                                                       year), limit=1)
    items = results['tracks']['items']
    if not items:
        return None

    track = items[0]
    audio_features = sp.audio_features(track['id'])[0]
    # The features entry can be None instead of a dict (Spotify has no
    # analysis for the track); report "not found" rather than crashing on
    # `.items()`.
    if audio_features is None:
        return None

    song_data = {
        'song_name': [song_name],
        'year': [year],
        'explicit': [int(track['explicit'])],
        'duration_ms': [track['duration_ms']],
        'song_popularity': [track['popularity']],
    }
    # Audio-feature values are scalars; keys that already exist (such as
    # 'duration_ms') are overwritten by the audio-features value.
    song_data.update(audio_features)

    return pd.DataFrame(song_data)

from collections import defaultdict
from scipy.spatial.distance import cdist
import difflib

# Numeric columns that make up a song's feature vector for the recommender.
# The order matters: it fixes the column order of every matrix built from
# this list.
_leading_cols = ['song_popularity', 'duration_ms', 'explicit']
_audio_cols = [
    'danceability', 'energy', 'key', 'loudness', 'mode', 'speechiness',
    'acousticness', 'instrumentalness', 'liveness', 'valence', 'tempo',
    'time_signature',
]
number_cols = _leading_cols + _audio_cols + ['year']

def song_information(song, pandas_df):
    """Look a song up in the local dataset, falling back to the Spotify API.

    Args:
        song: Mapping with the keys 'song_name' and 'year'.
        pandas_df: DataFrame with at least the 'song_name' and 'year' columns.

    Returns:
        The first dataset row (a pandas Series) whose name and year both
        match; when no row matches, whatever get_song_realtime returns for
        that name and year.
    """
    same_name = pandas_df['song_name'] == song['song_name']
    same_year = pandas_df['year'] == song['year']
    matches = pandas_df[same_name & same_year]

    if matches.empty:
        # Not in the dataset: ask the Spotify API instead.
        return get_song_realtime(song['song_name'], song['year'])

    return matches.iloc[0]


def find_mean(song_list, pandas_df):
    """Return the element-wise mean feature vector of the given songs.

    Args:
        song_list: Iterable of mappings with the keys 'song_name' and 'year'.
        pandas_df: Song dataset handed to song_information for the lookup.

    Returns:
        A 1-D float array with one entry per column of number_cols.

    Raises:
        ValueError: If none of the songs could be resolved, since there is
            nothing to average.
    """
    song_vectors = []

    for song in song_list:
        song_data = song_information(song, pandas_df)
        if song_data is None:
            # The input dicts are keyed by 'song_name'; formatting the
            # warning with song['name'] would raise KeyError.
            print('Warning: {} does not exist in Spotify or in database'.format(song['song_name']))
            continue
        # song_information yields a Series for dataset hits but a one-row
        # DataFrame for Spotify hits; flatten both to the same 1-D shape so
        # they can be stacked.
        song_vector = np.asarray(song_data[number_cols].values, dtype=float).ravel()
        song_vectors.append(song_vector)

    if not song_vectors:
        raise ValueError('None of the given songs could be found in the database or on Spotify')

    song_matrix = np.array(song_vectors)
    return np.mean(song_matrix, axis=0)

def dictionary_list(dict_list):
    """Merge a list of dicts into one dict of lists.

    Args:
        dict_list: List of dictionaries, e.g.
            [{'song_name': 'a', 'year': 1}, {'song_name': 'b', 'year': 2}].

    Returns:
        A dict mapping every key to the list of its values in input order,
        e.g. {'song_name': ['a', 'b'], 'year': [1, 2]}.  An empty input
        gives an empty dict, and a key that first shows up in a later
        dictionary simply starts its own list.
    """
    # No default factory on purpose: looking up a key that was never seen
    # must still raise KeyError for callers.
    flattened_dict = defaultdict()

    for dictionary in dict_list:
        for key, value in dictionary.items():
            flattened_dict.setdefault(key, []).append(value)

    return flattened_dict

def recommend_songs_FuzzyC(song_list, pandas_df, n_songs=10):
    """Recommend the songs closest to the mean of the given songs.

    Features are min-max scaled and ranked by cosine distance to the mean
    feature vector of the query songs.

    Args:
        song_list: List of dicts with the keys 'song_name' and 'year'.
        pandas_df: Song dataset containing number_cols, 'song_name',
            'artists' and 'cluster_label'.
        n_songs: Number of recommendations to return.

    Returns:
        A list of at most n_songs record dicts (closest first), none of
        which carries the name of a query song.
    """
    metadata_cols = ['song_name', 'artists', 'cluster_label', 'song_popularity',
                     'danceability', 'acousticness', 'instrumentalness',
                     'valence', 'tempo']
    song_dict = dictionary_list(song_list)

    song_center = find_mean(song_list, pandas_df)
    scaler = MinMaxScaler()
    scaled_data = scaler.fit_transform(pandas_df[number_cols])
    scaled_song_center = scaler.transform(song_center.reshape(1, -1))
    distances = cdist(scaled_song_center, scaled_data, 'cosine')[0]

    # Exclude the query songs BEFORE truncating to n_songs; filtering after
    # the cut returns fewer than n_songs whenever a query song ranks near the
    # top (which it normally does).
    is_query_song = pandas_df['song_name'].isin(song_dict['song_name']).to_numpy()
    ranked = np.argsort(distances)
    index = ranked[~is_query_song[ranked]][:n_songs]

    rec_songs = pandas_df.iloc[index]
    return rec_songs[metadata_cols].to_dict(orient='records')

def recommend_songs_Kmeans(song_list, pandas_df, n_songs=10):
    """Recommend the songs closest to the mean of the given songs.

    Features are standardised and ranked by cosine distance to the mean
    feature vector of the query songs.

    Args:
        song_list: List of dicts with the keys 'song_name' and 'year'.
        pandas_df: Song dataset containing number_cols, 'song_name',
            'artists' and 'cluster_label'.
        n_songs: Number of recommendations to return.

    Returns:
        A list of at most n_songs record dicts (closest first), none of
        which carries the name of a query song.
    """
    metadata_cols = ['song_name', 'artists', 'cluster_label', 'song_popularity',
                     'danceability', 'acousticness', 'instrumentalness',
                     'valence', 'tempo']
    song_dict = dictionary_list(song_list)

    song_center = find_mean(song_list, pandas_df)
    # Fresh scaler: calling fit_transform on the scaler that lives inside
    # k_pipeline would overwrite the statistics the fitted pipeline relies
    # on.  A new StandardScaler fitted on the same columns produces the same
    # scaled values.
    scaler = StandardScaler()
    scaled_data = scaler.fit_transform(pandas_df[number_cols])
    scaled_song_center = scaler.transform(song_center.reshape(1, -1))
    distances = cdist(scaled_song_center, scaled_data, 'cosine')[0]

    # Exclude the query songs BEFORE truncating to n_songs; filtering after
    # the cut returns fewer than n_songs whenever a query song ranks near the
    # top (which it normally does).
    is_query_song = pandas_df['song_name'].isin(song_dict['song_name']).to_numpy()
    ranked = np.argsort(distances)
    index = ranked[~is_query_song[ranked]][:n_songs]

    rec_songs = pandas_df.iloc[index]
    return rec_songs[metadata_cols].to_dict(orient='records')

def recommend_songs_GMM(song_list, pandas_df, n_songs=10):
    """Recommend the songs closest to the mean of the given songs.

    Features are standardised and ranked by cosine distance to the mean
    feature vector of the query songs.

    Args:
        song_list: List of dicts with the keys 'song_name' and 'year'.
        pandas_df: Song dataset containing number_cols, 'song_name',
            'artists' and 'cluster_label'.
        n_songs: Number of recommendations to return.

    Returns:
        A list of at most n_songs record dicts (closest first), none of
        which carries the name of a query song.
    """
    metadata_cols = ['song_name', 'artists', 'cluster_label', 'song_popularity',
                     'danceability', 'acousticness', 'instrumentalness',
                     'valence', 'tempo']
    song_dict = dictionary_list(song_list)

    song_center = find_mean(song_list, pandas_df)
    # Fresh scaler: calling fit_transform on the scaler that lives inside
    # gmm_pipeline would overwrite the statistics the fitted pipeline relies
    # on.  A new StandardScaler fitted on the same columns produces the same
    # scaled values.
    scaler = StandardScaler()
    scaled_data = scaler.fit_transform(pandas_df[number_cols])
    scaled_song_center = scaler.transform(song_center.reshape(1, -1))
    distances = cdist(scaled_song_center, scaled_data, 'cosine')[0]

    # Exclude the query songs BEFORE truncating to n_songs; filtering after
    # the cut returns fewer than n_songs whenever a query song ranks near the
    # top (which it normally does).
    is_query_song = pandas_df['song_name'].isin(song_dict['song_name']).to_numpy()
    ranked = np.argsort(distances)
    index = ranked[~is_query_song[ranked]][:n_songs]

    rec_songs = pandas_df.iloc[index]
    return rec_songs[metadata_cols].to_dict(orient='records')

# Ask each recommender for songs similar to the same seed song, each using
# the frame labelled by its own clustering model.
seed_songs = [{'song_name': 'All of me', 'year': 2013}]
visF = recommend_songs_FuzzyC(seed_songs, pandas_df_fuzzy)
visK = recommend_songs_Kmeans(seed_songs, pandas_df_k)
visG = recommend_songs_GMM(seed_songs, pandas_df_gmm)

# Visualizing Results

In [None]:
def dataVis(vis):
    """Show a grouped bar chart comparing the recommended songs.

    Args:
        vis: Recommendation records (list of dicts) containing 'song_name',
            'song_popularity', 'tempo', 'danceability', 'acousticness',
            'valence' and 'instrumentalness'.
    """
    top10_genres = pd.DataFrame(vis)

    # Multiply these features by 100 before they are plotted next to
    # popularity and tempo.
    for feature in ('danceability', 'acousticness', 'valence', 'instrumentalness'):
        top10_genres[feature] = top10_genres[feature]*100

    plotted_columns = ['song_popularity','tempo','danceability','acousticness','valence','instrumentalness']
    fig = px.bar(top10_genres, x='song_name', y=plotted_columns, barmode='group')
    fig.show()

### Fuzzy C-Means

In [None]:
# Grouped bar chart of the recommendations drawn from the fuzzy c-means frame.
dataVis(visF)


### K-Means

In [None]:
# Grouped bar chart of the recommendations drawn from the K-Means frame.
dataVis(visK)

### Gaussian Mixture Model

In [None]:
# Grouped bar chart of the recommendations drawn from the Gaussian-mixture frame.
dataVis(visG)

### Comparing Models

In [None]:
# Wall-clock fitting time of each clustering model, as measured in the
# model cells.
durations_by_model = {
    'Fuzzy C Means': durationF,
    'KMeans': durationK,
    'GMM': durationG,
}
model_names = list(durations_by_model)
time_durations = list(durations_by_model.values())

# One bar per model.
plt.bar(model_names, time_durations)
plt.xlabel('Models')
plt.ylabel('Time duration (seconds)')
plt.title('Model time duration comparison')
plt.show()