In [None]:
import pyspark.pandas as ps

In [None]:
# Read the Parquet file at `path` into a pandas-on-Spark DataFrame
# (`ps` is pyspark.pandas, so the pandas-style API used below runs on Spark).
path = 'dbfs:/FileStore/dados_transformados/data.parquet'
df_data = ps.read_parquet(path)

In [None]:
# Inspect the schema of the freshly loaded data (columns, non-null counts, dtypes).
df_data.info()

In [None]:
# Drop rows using dropna()'s default settings.
# NOTE: this rebinds `df_data`, so the unfiltered frame is no longer reachable
# without re-running the load cell.
df_data = df_data.dropna()

In [None]:
# Re-inspect the frame after the dropna() step to see how many rows remain.
df_data.info()

In [None]:
# Build a "<artists> - <name>" label per row; later cells use it to look a song up
# by name and as the hover text of the scatter plot.
df_data['artists_song'] = df_data.artists + ' - ' + df_data.name

In [None]:
# Preview the first 5 rows, now including the artists_song column.
df_data.head(5)

In [None]:
# Check that artists_song shows up in the schema.
df_data.info()

In [None]:
# Feature columns for the model: every column of df_data except the ones
# removed below (presumably the non-numeric ones — verify against the schema).
# list.remove is used on purpose: a missing column raises ValueError instead
# of being silently ignored.
X = df_data.columns.to_list()
for coluna_excluida in ('artists', 'id', 'name', 'artists_song', 'release_date'):
    X.remove(coluna_excluida)
X

In [None]:
# Switch from the pandas-on-Spark API to a plain Spark DataFrame, which is what
# the pyspark.ml stages below operate on.
# The isinstance guard keeps this cell re-runnable: once `df_data` has been
# converted, a second run is a no-op instead of calling .to_spark() on a
# Spark DataFrame.
# NOTE: from here on `df_data` names a Spark DataFrame, so the pandas-style cells
# above must not be re-run without reloading the data first.
if isinstance(df_data, ps.DataFrame):
    df_data = df_data.to_spark()

In [None]:
from pyspark.ml.feature import VectorAssembler

In [None]:
# Pack the columns listed in X into a single vector column named 'features',
# the input format the scaler and PCA stages below consume.
dados_encoded_vector = VectorAssembler(inputCols=X, outputCol='features').transform(df_data)

In [None]:
# Show the first 5 assembled feature vectors without truncating them.
dados_encoded_vector.select('features').show(truncate=False, n=5)

In [None]:
from pyspark.ml.feature import StandardScaler

In [None]:
# Fit a StandardScaler (default settings) on the assembled vectors, then add the
# rescaled copy of 'features' as a new column 'features_scaled'.
scaler = StandardScaler(
    inputCol='features',
    outputCol='features_scaled',
)
model_scaler = scaler.fit(dados_encoded_vector)
dados_musicas_scaler = model_scaler.transform(dados_encoded_vector)

In [None]:
# Show the first 5 scaled feature vectors without truncating them.
dados_musicas_scaler.select('features_scaled').show(truncate=False, n=5)

In [None]:
# First pass: keep as many principal components as there are input features so
# the explained variance of every component can be inspected below.
# NOTE: `k` is rebound later to the reduced number of components.
k = len(X)
k

In [None]:
from pyspark.ml.feature import PCA

In [None]:
# Exploratory PCA over the scaled features, keeping k components; the projected
# vectors are written to 'pca_features'.
pca = PCA(
    k=k,
    inputCol='features_scaled',
    outputCol='pca_features',
)
model_pca = pca.fit(dados_musicas_scaler)
dados_musicas_pca = model_pca.transform(dados_musicas_scaler)

In [None]:
# Proportion of variance explained by each principal component of the fitted model.
model_pca.explainedVariance

In [None]:
# Total explained variance as a percentage (all components were kept in this
# first pass, so a value close to 100 is expected).
sum(model_pca.explainedVariance) * 100

In [None]:
# Cumulative explained variance: entry n-1 is the sum over the first n components.
lista_valores = [
    sum(model_pca.explainedVariance[:n_componentes])
    for n_componentes in range(1, k + 1)
]
lista_valores

In [None]:
import numpy as np

In [None]:
# Number of leading components whose cumulative explained variance is still
# <= 0.7, i.e. the largest prefix that does not exceed the 70% mark.
# NOTE(review): if the intent is "at least 70%", one more component is needed —
# confirm which is meant.
# - np.count_nonzero counts the True entries of the boolean mask directly;
#   int() makes `k` a plain Python int rather than a NumPy scalar.
# - max(1, ...) covers the case where the first component alone already exceeds
#   the threshold, which would otherwise give k = 0 (PCA needs k >= 1).
k = max(1, int(np.count_nonzero(np.array(lista_valores) <= 0.7)))
k

In [None]:
# Re-fit PCA with the reduced number of components chosen above.
# NOTE: this rebinds `pca` and `model_pca`; the exploratory model fitted with
# all components is no longer reachable afterwards.
pca = PCA(
    k=k,
    inputCol='features_scaled',
    outputCol='pca_features',
)
model_pca = pca.fit(dados_musicas_scaler)
dados_musicas_pca_final = model_pca.transform(dados_musicas_scaler)

In [None]:
# Show the first 5 reduced PCA vectors without truncating them.
dados_musicas_pca_final.select('pca_features').show(truncate=False, n=5)

In [None]:
from pyspark.ml import Pipeline

In [None]:
# Same three steps as the cells above (assemble -> scale -> PCA) bundled into one
# Pipeline so they can be fitted and applied to the raw Spark DataFrame in one go.
# The number of components comes from `k`, computed from the cumulative explained
# variance, instead of a hard-coded literal, so the pipeline stays in sync when
# the data or the variance threshold change. int() accepts both a plain int and
# a NumPy integer.
# NOTE: `k` must hold the reduced component count when this cell runs (it does
# on a top-to-bottom run).
pca_pipeline = Pipeline(stages=[VectorAssembler(inputCols=X, outputCol='features'),
                                StandardScaler(inputCol='features', outputCol='features_scaled'),
                                PCA(k=int(k), inputCol='features_scaled', outputCol='pca_features')])

In [None]:
# Fit every stage of the pipeline on the Spark DataFrame.
model_pca_pipeline = pca_pipeline.fit(df_data)

In [None]:
# Apply the fitted pipeline; the result keeps the columns of df_data and adds
# 'features', 'features_scaled' and 'pca_features'.
projection = model_pca_pipeline.transform(df_data)

In [None]:
# Show the first 5 PCA vectors produced by the pipeline without truncating them.
projection.select('pca_features').show(truncate=False, n=5)

In [None]:
from pyspark.ml.clustering import KMeans

In [None]:
# Seed handed to KMeans below so that repeated runs start from the same random state.
SEED = 1224

In [None]:
# K-Means over the PCA vectors; the assigned cluster id is written to 'cluster_pca'.
# NOTE(review): k=50 is hard-coded and no selection criterion for it is shown in
# this notebook — confirm the choice.
kmeans = KMeans(k=50, featuresCol='pca_features', predictionCol='cluster_pca', seed=SEED)

In [None]:
# Fit the clustering model on the PCA projection.
modelo_kmeans = kmeans.fit(projection)

In [None]:
# Label every row with its cluster id (new column 'cluster_pca').
projetion_kmeans = modelo_kmeans.transform(projection)

In [None]:
# Peek at the PCA vectors next to their assigned cluster.
projetion_kmeans.select(['pca_features','cluster_pca']).show()

In [None]:
from pyspark.ml.functions import vector_to_array

In [None]:
# Expose the first two PCA coordinates as scalar columns 'x' and 'y' so they can
# be used as plot axes (a vector column cannot be indexed directly, hence
# vector_to_array).
projetion_kmeans = (
    projetion_kmeans
    .withColumn('x', vector_to_array('pca_features')[0])
    .withColumn('y', vector_to_array('pca_features')[1])
)

In [None]:
# Check the plot coordinates together with the cluster id and the song label.
projetion_kmeans.select(['x', 'y', 'cluster_pca', 'artists_song']).show()

In [None]:
import plotly.express as px

In [None]:
# Scatter plot of the songs on the first two principal components, coloured by
# cluster, with the song label shown on hover.
# Only the four columns the figure actually uses are collected to the driver;
# calling toPandas() on the whole DataFrame would also ship every original
# column plus the three vector columns, none of which are plotted.
fig = px.scatter(
    projetion_kmeans.select('x', 'y', 'cluster_pca', 'artists_song').toPandas(),
    x='x',
    y='y',
    color='cluster_pca',
    hover_data=['artists_song'],
)
fig.show()

In [None]:
# Reference song for the step-by-step walkthrough below; must match a value of
# the 'artists_song' column exactly.
nome_musica = 'Taylor Swift - Blank Space'

In [None]:
# Cluster id of the reference song: first column of the first matching row.
# Raises IndexError if no row has this artists_song value.
cluster = (
    projetion_kmeans
    .where(projetion_kmeans['artists_song'] == nome_musica)
    .select('cluster_pca')
    .collect()[0][0]
)
cluster

In [None]:
# Candidate recommendations: every song assigned to the same cluster as the
# reference song (the reference song itself included).
musicas_recomendadas = (
    projetion_kmeans
    .where(projetion_kmeans['cluster_pca'] == cluster)
    .select('artists_song', 'id', 'pca_features')
)
musicas_recomendadas.show()

In [None]:
# PCA vector of the reference song, pulled to the driver so distances to it can
# be computed inside a UDF.
componenetes_musica = (
    musicas_recomendadas
    .where(musicas_recomendadas['artists_song'] == nome_musica)
    .select('pca_features')
    .collect()[0][0]
)
componenetes_musica

In [None]:
from scipy.spatial.distance import euclidean
from pyspark.sql.types import FloatType
import pyspark.sql.functions as f

In [None]:
def calcula_distance(value):
    """Return the Euclidean distance between ``value`` and the reference song.

    The reference vector is read from the module-level ``componenetes_musica``,
    so that name must be bound before this function is called.

    Args:
        value: Vector to compare against the reference (the notebook passes the
            'pca_features' value of each row).

    Returns:
        The distance as a built-in ``float``.
    """
    # float(): SciPy may hand back a NumPy scalar; converting it means the UDF
    # that wraps this function always returns a plain Python float, matching its
    # declared FloatType.
    return float(euclidean(componenetes_musica, value))

# Wrap the Python function as a Spark UDF whose result column is FloatType.
udf_calcula_distance = f.udf(calcula_distance, FloatType())

# Add a 'Dist' column: distance from each candidate's 'pca_features' to the
# reference song's vector.
musicas_recomendadas_dist = musicas_recomendadas.withColumn('Dist', udf_calcula_distance('pca_features'))

In [None]:
# The 10 candidates closest to the reference song, nearest first.
# orderBy + limit keeps the result a Spark DataFrame end to end, instead of
# pulling rows to the driver with take() and rebuilding a DataFrame from them
# (which also re-infers the schema).
# NOTE: the reference song is itself in the candidate set, so it is expected
# among the results with distance 0.
recomendadas = (
    musicas_recomendadas_dist
    .orderBy('Dist')
    .limit(10)
    .select('artists_song', 'id', 'Dist')
)

recomendadas.show()

In [None]:
def recomendador(nome_musica, n_recomendacoes=10):
    """Return the songs closest to ``nome_musica`` within its K-Means cluster.

    The lookup runs against the module-level ``projetion_kmeans`` DataFrame,
    which must provide the columns 'artists_song', 'id', 'pca_features' and
    'cluster_pca'.

    Args:
        nome_musica: Value of the 'artists_song' column identifying the
            reference song (exact match).
        n_recomendacoes: Maximum number of rows to return. Defaults to 10,
            the value that used to be hard-coded.

    Returns:
        A Spark DataFrame with the columns 'artists_song', 'id' and 'Dist',
        ordered by ascending Euclidean distance between each song's PCA vector
        and the reference song's. The reference song belongs to its own
        cluster, so it is expected in the result with distance 0.

    Raises:
        IndexError: If no row of ``projetion_kmeans`` has ``nome_musica`` as its
            'artists_song' value. The exception type matches what the previous
            ``collect()[0]`` lookup raised; only the message is new.
    """
    # Fetch cluster id and PCA vector from the same row in a single Spark job.
    # Two separate lookups could pick different rows when several rows share
    # the same artists_song value.
    linhas_musica = (
        projetion_kmeans
        .filter(projetion_kmeans.artists_song == nome_musica)
        .select('cluster_pca', 'pca_features')
        .take(1)
    )
    if not linhas_musica:
        raise IndexError(f"song not found in 'artists_song': {nome_musica!r}")
    cluster, componenetes_musica = linhas_musica[0]

    def calcula_distance(value):
        # float(): return a plain Python float to Spark rather than whatever
        # scalar type SciPy produces, matching the declared FloatType.
        return float(euclidean(componenetes_musica, value))

    udf_calcula_distance = f.udf(calcula_distance, FloatType())

    # orderBy + limit keeps everything in Spark; no driver round-trip through
    # take() + spark.createDataFrame().
    return (
        projetion_kmeans
        .filter(projetion_kmeans.cluster_pca == cluster)
        .select('artists_song', 'id', 'pca_features')
        .withColumn('Dist', udf_calcula_distance('pca_features'))
        .orderBy('Dist')
        .limit(n_recomendacoes)
        .select('artists_song', 'id', 'Dist')
    )

In [None]:
# Smoke-test the function with the same song used in the step-by-step cells above;
# the output should match the `recomendadas` table shown earlier.
df_recomedada = recomendador('Taylor Swift - Blank Space')
df_recomedada.show()