In [0]:
# Notebook for the music recommender (PCA + KMeans over Spotify track features)

In [0]:
import pyspark.pandas as pd

In [0]:
# Read the Spotify dataset (parquet) into a pandas-on-Spark DataFrame and preview the first rows.
path = '/Volumes/workspace/default/spotify/dados_tratados/data.parquet'
df_data = pd.read_parquet(path)
df_data.head()

In [0]:
# Summary of the frame before any cleaning.
df_data.info()

In [0]:
# Drop the rows that contain missing values (rebinds `df_data` to the cleaned frame).
df_data = df_data.dropna()

In [0]:
# Summary of the frame after dropping rows with missing values.
df_data.info()

In [0]:
# Build the "<artists> - <name>" key that later cells use to look a song up.
df_data['artists_song'] = df_data['artists'] + ' - ' + df_data['name']

In [0]:
# Preview the first rows, now including the new `artists_song` column.
df_data.head(5)

In [0]:
# Summary of the frame after adding `artists_song`.
df_data.info()

In [0]:
# Feature columns = every column except identifiers and free-text metadata.
# The exclusions are declared once instead of through six sequential
# `list.remove` calls, so the cell no longer raises ValueError when one of the
# excluded columns (e.g. `bad_chars`) is absent from the loaded data.
NON_FEATURE_COLUMNS = {'artists', 'name', 'artists_song', 'id', 'release_date', 'bad_chars'}
X = [coluna for coluna in df_data.columns.tolist() if coluna not in NON_FEATURE_COLUMNS]
X

In [0]:
# Convert the pandas-on-Spark frame to a Spark DataFrame for the pyspark.ml stages used below.
df_data = df_data.to_spark()

In [0]:
from pyspark.ml.feature import VectorAssembler

In [0]:
# Pack the columns listed in X into a single vector column named 'features'.
assembler = VectorAssembler(inputCols=X, outputCol='features')
dados_encoded_vector = assembler.transform(df_data)

In [0]:
# Show the first 5 assembled feature vectors without truncating them.
dados_encoded_vector.select('features').show(truncate=False, n=5)

In [0]:
from pyspark.ml.feature import StandardScaler

In [0]:
# Fit a StandardScaler on the assembled vectors and write the scaled result to 'features_scaled'.
scaler = StandardScaler(inputCol='features', outputCol='features_scaled')
model_scaler = scaler.fit(dados_encoded_vector)
dados_musicas_scaler = model_scaler.transform(dados_encoded_vector)

In [0]:
# Show the first 5 scaled feature vectors without truncating them.
dados_musicas_scaler.select('features_scaled').show(truncate=False, n=5)

In [0]:
# Start with one principal component per input feature, so the explained
# variance of every component can be inspected before choosing a smaller k.
k = len(X)
k

In [0]:
from pyspark.ml.feature import PCA

In [0]:
# Fit PCA with k components on the scaled vectors and write the projection to 'pca_features'.
pca = PCA(k=k, inputCol='features_scaled', outputCol='pca_features')
model_pca = pca.fit(dados_musicas_scaler)
dados_musicas_pca = model_pca.transform(dados_musicas_scaler)

In [0]:
# Sum of the fitted model's explained-variance ratios, as a percentage.
sum(model_pca.explainedVariance)*100

In [0]:
# Cumulative explained variance: entry i is the total for components 0..i.
lista_valores = []
variancia_acumulada = 0
for indice in range(k):
    variancia_acumulada += model_pca.explainedVariance[indice]
    lista_valores.append(variancia_acumulada)
lista_valores

In [0]:
import numpy as np

In [0]:
# Number of leading principal components whose cumulative explained variance
# does not exceed the 0.7 threshold.
# The count is cast to a plain `int` (the built-in `sum` over a NumPy boolean
# array yields a NumPy integer) and clamped to at least 1, because PCA needs
# k > 0 and the count is 0 when the first component alone exceeds the threshold.
# NOTE(review): this keeps only the components *up to* the threshold, so the
# selected k explains at most 70% of the variance — confirm that is the intent.
k = max(1, int(np.count_nonzero(np.asarray(lista_valores) <= 0.7)))
k

In [0]:
# Refit PCA with the reduced k chosen above (rebinds `pca` and `model_pca`).
pca = PCA(k=k, inputCol='features_scaled', outputCol='pca_features')
model_pca = pca.fit(dados_musicas_scaler)
dados_musicas_pca_final = model_pca.transform(dados_musicas_scaler)

In [0]:
# Show the first 5 reduced PCA vectors without truncating them.
dados_musicas_pca_final.select('pca_features').show(truncate=False, n=5)

In [0]:
from pyspark.ml import Pipeline

In [0]:
# Assemble -> scale -> PCA, bundled as a single Pipeline.
# NOTE(review): k=6 is hard-coded here; presumably it mirrors the k derived from
# the cumulative explained variance in the earlier cell — confirm.
pca_pipeline = Pipeline(
    stages=[
        VectorAssembler(inputCols=X, outputCol='features'),
        StandardScaler(inputCol='features', outputCol='features_scaled'),
        PCA(k=6, inputCol='features_scaled', outputCol='pca_features'),
    ]
)

In [0]:
# Fit all pipeline stages on the Spark DataFrame.
model_pca_pipeline = pca_pipeline.fit(df_data)

In [0]:
# Apply the fitted pipeline; the result carries the 'features', 'features_scaled' and 'pca_features' columns.
projection = model_pca_pipeline.transform(df_data) 

In [0]:
# Show the first 5 PCA vectors produced by the pipeline.
projection.select('pca_features').show(truncate=False, n=5)

In [0]:
from pyspark.ml.clustering import KMeans

In [0]:
# Random seed passed to KMeans below.
SEED = 1224

In [0]:
# Cluster the PCA vectors into 50 groups; predictions are written to the 'cluster_pca' column.
kmeans = KMeans(k=50, featuresCol='pca_features', predictionCol='cluster_pca', seed=SEED)
model_kmeans = kmeans.fit(projection)

In [0]:
# Add the predicted cluster of each song to the projection.
projection_kmeans = model_kmeans.transform(projection)

In [0]:
# Preview the PCA vectors next to their assigned cluster.
projection_kmeans.select(['pca_features','cluster_pca']).show()

In [0]:
from pyspark.ml.functions import vector_to_array

In [0]:
# Expose the first two entries of the PCA vector as scalar columns 'x' and 'y' for plotting.
projection_kmeans = (
    projection_kmeans
    .withColumn('x', vector_to_array('pca_features')[0])
    .withColumn('y', vector_to_array('pca_features')[1])
)

In [0]:
# Preview the plotting coordinates, cluster and song key.
projection_kmeans.select(['x','y','cluster_pca','artists_song']).show()

In [0]:
import plotly.express as px

In [0]:
# Scatter plot of the first two principal components, coloured by cluster.
# Only the four columns the figure uses are collected to the driver: the frame
# also carries the assembled, scaled and PCA vector columns, which are costly
# to convert with toPandas() and are not needed for the plot.
colunas_plot = ['x', 'y', 'cluster_pca', 'artists_song']
fig = px.scatter(
    projection_kmeans.select(colunas_plot).toPandas(),
    x='x', y='y', color='cluster_pca', hover_data=['artists_song'],
)
fig.show(renderer="databricks")

In [0]:
# The dataset is large, so the chart is drawn from a sample of it.
# The sample is seeded so the figure is reproducible across runs, and only the
# plotted columns are brought to the driver.
sample_df = (
    projection_kmeans
    .select('x', 'y', 'cluster_pca', 'artists_song')
    .sample(fraction=0.2, seed=SEED)   # roughly 20% of the rows
    .toPandas()
)
fig = px.scatter(sample_df, x='x', y='y', color='cluster_pca', hover_data=['artists_song'])
fig.show()

In [0]:
# Reference song, written in the same "<artists> - <name>" form as the `artists_song` column.
nome_musica = 'Taylor Swift - Blank Space'

In [0]:
# Cluster of the reference song: first matching row, first (only) selected column.
linhas_musica = (
    projection_kmeans
    .filter(projection_kmeans.artists_song == nome_musica)
    .select('cluster_pca')
    .collect()
)
cluster = linhas_musica[0][0]
cluster

In [0]:
# Every song assigned to the same cluster as the reference song: the recommendation candidates.
musicas_recomendadas = projection_kmeans.filter(projection_kmeans.cluster_pca == cluster).select('artists_song', 'id', 'pca_features')
musicas_recomendadas.show()

In [0]:
# PCA vector of the reference song, collected to the driver (first matching row).
componentes_musica = musicas_recomendadas.filter(musicas_recomendadas.artists_song == nome_musica).select('pca_features').collect()[0][0]
componentes_musica

In [0]:
from scipy.spatial.distance import euclidean
from pyspark.sql.types import FloatType
import pyspark.sql.functions as f

In [0]:
def calcula_distancia(value):
    """Return the Euclidean distance between `value` and the reference song.

    `value` is one row's `pca_features` vector; the reference is the
    module-level `componentes_musica`. SciPy returns a NumPy scalar, so the
    result is converted to a built-in `float`: a Python UDF declared with
    `FloatType` should hand plain Python values back to Spark.
    """
    return float(euclidean(componentes_musica, value))

udf_calcular_distancia = f.udf(calcula_distancia, FloatType())

# Distance from every candidate song to the reference song, stored in the 'Dist' column.
musicas_recomendadas_dist = musicas_recomendadas.withColumn('Dist', udf_calcular_distancia('pca_features'))


In [0]:
# Take the 10 rows with the smallest distance to the driver, then wrap them
# back into a small Spark DataFrame.
top_10 = (
    musicas_recomendadas_dist
    .select(['artists_song', 'id', 'Dist'])
    .sort('Dist')
    .take(10)
)
recomendadas = spark.createDataFrame(top_10)

recomendadas.show()

In [0]:
def recomendador(nome_musica):
    """Return the 10 songs closest to `nome_musica` within its KMeans cluster.

    Args:
        nome_musica: Lookup key in the form stored in the `artists_song`
            column ("<artists> - <name>").

    Returns:
        A Spark DataFrame with the columns `artists_song`, `id` and `Dist`
        (Euclidean distance between PCA vectors), built from the 10 rows with
        the smallest `Dist`. The reference song belongs to its own cluster,
        so it is expected to be among them with distance 0.

    Raises:
        IndexError: if no row of `projection_kmeans` matches `nome_musica`.
    """
    # A single lookup yields both the cluster and the PCA vector of the
    # reference song (previously two separate Spark jobs fetched the same row).
    musica = (
        projection_kmeans
        .filter(projection_kmeans.artists_song == nome_musica)
        .select('cluster_pca', 'pca_features')
        .first()
    )
    if musica is None:
        # Same exception type the bare `.collect()[0]` raised, with a useful message.
        raise IndexError(f"Song not found in the dataset: {nome_musica!r}")
    cluster = musica['cluster_pca']
    componentes_musica = musica['pca_features']

    musicas_recomendadas = (
        projection_kmeans
        .filter(projection_kmeans.cluster_pca == cluster)
        .select('artists_song', 'id', 'pca_features')
    )

    def calcula_distancia(value):
        # float(): hand Spark a built-in float rather than SciPy's NumPy scalar.
        return float(euclidean(componentes_musica, value))

    udf_calcular_distancia = f.udf(calcula_distancia, FloatType())
    musicas_recomendadas_dist = musicas_recomendadas.withColumn('Dist', udf_calcular_distancia('pca_features'))

    mais_proximas = (
        musicas_recomendadas_dist
        .select(['artists_song', 'id', 'Dist'])
        .sort('Dist')
        .take(10)
    )
    return spark.createDataFrame(mais_proximas)
    

In [0]:
# Recommendations for a sample song; `df_recomendada` is reused by later cells.
df_recomendada = recomendador('Ed Sheeran - Shape of You')
df_recomendada.show()

In [0]:
# Shell install of the spotipy client library, which is imported in the next cell.
!pip install spotipy

In [0]:
import spotipy
from spotipy.oauth2 import SpotifyOAuth, SpotifyClientCredentials

In [0]:
import os

scope = "user-library-read playlist-modify-private"

# Credentials are read from the environment instead of being stored in the
# notebook. NOTE: the client id/secret that used to be hard-coded in this cell
# must be treated as leaked and rotated in the Spotify developer dashboard.
# A missing variable fails fast with KeyError.
OAuth = SpotifyOAuth(
    scope = scope,
    redirect_uri = 'https://localhost:5000/callback',
    client_id = os.environ['SPOTIPY_CLIENT_ID'],
    client_secret = os.environ['SPOTIPY_CLIENT_SECRET'])

In [0]:
import os

# Client-credentials flow. The id/secret come from the environment rather than
# from literals in the notebook; the values that used to be hard-coded in this
# cell must be treated as leaked and rotated.
client_credentials_manager = SpotifyClientCredentials(
    client_id = os.environ['SPOTIPY_CLIENT_ID'],
    client_secret = os.environ['SPOTIPY_CLIENT_SECRET'])
sp = spotipy.Spotify(client_credentials_manager=client_credentials_manager)

In [0]:
# Spotify track id of the reference song (first matching row).
# NOTE: the name `id` shadows Python's built-in `id()` from here on.
id = projection_kmeans.filter(projection_kmeans.artists_song == nome_musica).select('id').collect()[0][0]
id

In [0]:
# Fetch the track's data from Spotify through the spotipy client.
sp.track(id)

In [0]:
# Collect the ids of the recommended songs to the driver (a list of single-field rows).
playlist_id = df_recomendada.select('id').collect()

In [0]:
# One Spotify track lookup per recommended id.
# A comprehension keeps the loop variable local: the previous `for id in ...`
# loop rebound the module-level `id` (the reference track id) to a Row and
# shadowed the built-in `id()`.
playlist_track = [sp.track(linha[0]) for linha in playlist_id]

In [0]:
def recomendador(nome_musica):
    """Fetch the Spotify tracks of the 10 songs closest to `nome_musica`.

    NOTE: this redefines the `recomendador` declared earlier in the notebook;
    unlike that version it returns a count, not a DataFrame.

    Args:
        nome_musica: Lookup key in the form stored in the `artists_song`
            column ("<artists> - <name>").

    Returns:
        The number of tracks fetched from Spotify (one per recommended song).

    Raises:
        IndexError: if no row of `projection_kmeans` matches `nome_musica`.
    """
    # A single lookup yields both the cluster and the PCA vector of the reference song.
    musica = (
        projection_kmeans
        .filter(projection_kmeans.artists_song == nome_musica)
        .select('cluster_pca', 'pca_features')
        .first()
    )
    if musica is None:
        # Same exception type the bare `.collect()[0]` raised, with a useful message.
        raise IndexError(f"Song not found in the dataset: {nome_musica!r}")
    cluster = musica['cluster_pca']
    componentes_musica = musica['pca_features']

    musicas_recomendadas = (
        projection_kmeans
        .filter(projection_kmeans.cluster_pca == cluster)
        .select('artists_song', 'id', 'pca_features')
    )

    def calcula_distancia(value):
        # float(): hand Spark a built-in float rather than SciPy's NumPy scalar.
        return float(euclidean(componentes_musica, value))

    udf_calcular_distancia = f.udf(calcula_distancia, FloatType())
    musicas_recomendadas_dist = musicas_recomendadas.withColumn('Dist', udf_calcular_distancia('pca_features'))

    recomendadas = spark.createDataFrame(
        musicas_recomendadas_dist.select(['artists_song', 'id', 'Dist']).sort('Dist').take(10)
    )

    # The former extra lookup of the reference song's id was removed: its result
    # was never used and was immediately overwritten by the loop variable.
    playlist_id = recomendadas.select('id').collect()
    playlist_track = [sp.track(linha[0]) for linha in playlist_id]

    return len(playlist_track)
    

In [0]:
# Run the redefined recommender; the displayed value is the number of tracks fetched.
recomendador('Ed Sheeran - Shape of You')

In [0]:
# Shell install of scikit-image, which provides the `skimage.io` module imported in the next cell.
!pip install scikit-image

In [0]:
import matplotlib.pyplot as plt
from skimage import io

# Song whose album cover is displayed (rebinds the earlier `nome_musica`).
nome_musica = 'Ed Sheeran - Shape of You'

# Spotify id of that song, looked up by its "<artists> - <name>" key (first matching row).
id = projection_kmeans.filter(projection_kmeans.artists_song == nome_musica).select('id').collect()[0][0]

track = sp.track(id)

# Second entry of the album's image list.
# NOTE(review): presumably a medium-sized cover; raises IndexError if fewer than two images come back — confirm.
url = track['album']['images'][1]['url']
name = track['name']

# Read the image from its URL and show it with the song name as the x-axis label.
image = io.imread(url)
plt.imshow(image)
plt.xlabel(name, fontsize = 10)

In [0]:
import matplotlib.pyplot as plt
from skimage import io

def visualize_songs(name, url):
    """Show album covers in a 5-column grid, each labelled with its song name.

    Args:
        name: Sequence of labels; `name[i]` is drawn under the image of `url[i]`.
        url: Sequence of image locations readable by `skimage.io.imread`.
    """
    plt.figure(figsize=(15, 10))
    columns = 5
    # Ceiling division: `len(url) // columns + 1` reserved an extra, empty row
    # whenever the number of images was an exact multiple of `columns`.
    rows = -(-len(url) // columns)

    for i, u in enumerate(url):
        ax = plt.subplot(rows, columns, i + 1)
        image = io.imread(u)
        plt.imshow(image)
        ax.get_yaxis().set_visible(False)
        plt.xticks(color='w', fontsize = 0.1)
        plt.yticks(color='w', fontsize = 0.1)
        plt.xlabel(name[i], fontsize = 10)
        plt.grid(visible=False)

    if rows:
        # The layout is figure-wide, so compute it once after every subplot
        # exists instead of on each loop iteration.
        plt.tight_layout(h_pad=0.7, w_pad=0)
        plt.subplots_adjust(wspace=0, hspace=0)
    plt.show()

In [0]:
# Ids of the recommended songs, collected to the driver.
playlist_id = df_recomendada.select('id').collect()

# For every recommended song, fetch its Spotify track and keep the URL of the
# second album image together with the track name.
name, url = [], []
for i in playlist_id:
    track = sp.track(i[0])
    url.append(track['album']['images'][1]['url'])
    name.append(track['name'])


In [0]:
# Draw the covers of the recommended songs with the grid helper defined above.
visualize_songs(name, url)

In [0]:
import matplotlib.pyplot as plt
import requests
from PIL import Image, ImageDraw
from io import BytesIO
import textwrap

def fetch_rounded_image(url, size=(300, 300), radius=40, timeout=10):
    """Download an image, resize it and apply rounded corners.

    Args:
        url: Location of the image to download.
        size: Target `(width, height)` of the returned image.
        radius: Corner radius, in pixels, of the rounded mask.
        timeout: Seconds to wait for the HTTP request. Without a timeout
            `requests.get` can block indefinitely on an unresponsive server.

    Returns:
        An RGBA `PIL.Image.Image` of the requested size whose alpha channel is
        a rounded rectangle.

    Raises:
        requests.RequestException: on connection errors, timeouts or a
            non-success HTTP status.
    """
    resp = requests.get(url, timeout=timeout)
    # Fail here on 4xx/5xx instead of letting PIL choke on an error page.
    resp.raise_for_status()
    img = Image.open(BytesIO(resp.content)).convert("RGBA")
    img = img.resize(size, Image.LANCZOS)

    # Mask for the rounded corners: white inside the rounded rectangle, black outside.
    mask = Image.new("L", size, 0)
    draw = ImageDraw.Draw(mask)
    draw.rounded_rectangle((0, 0, size[0], size[1]), radius=radius, fill=255)

    img.putalpha(mask)
    return img

def visualize_songs(names, urls, images_per_row=5, thumb_size=(300, 300)):
    """Draw a dark-themed grid of rounded album covers with their titles.

    NOTE: this redefines the `visualize_songs` declared earlier in the notebook.

    Args:
        names: Sequence of titles; `names[i]` labels the image at `urls[i]`.
        urls: Sequence of image URLs.
        images_per_row: Number of grid columns.
        thumb_size: `(width, height)` each cover is resized to.

    Nothing is drawn when `urls` is empty.
    """
    total = len(urls)
    if total == 0:
        # plt.subplots() rejects a grid with zero rows; there is nothing to show.
        return
    rows = (total + images_per_row - 1) // images_per_row

    # Figure with a dark background.
    plt.style.use("default")
    fig, ax = plt.subplots(
        rows, images_per_row,
        figsize=(images_per_row * 3, rows * 3.8),
        facecolor="#121212",
        squeeze=False,  # always a 2-D array of Axes, whatever the grid shape
    )

    # `ax.flat` walks the grid row by row, matching the order of `urls`.
    for idx, eixo in enumerate(ax.flat):
        eixo.set_facecolor("#121212")
        eixo.axis("off")

        if idx >= total:
            continue  # unused trailing cell in the last row

        try:
            img = fetch_rounded_image(urls[idx], size=thumb_size, radius=40)
            eixo.imshow(img)
        except Exception:
            # Deliberate best effort: a cover that fails to download is
            # replaced by a placeholder so the rest of the grid still renders.
            eixo.text(
                0.5, 0.5, "IMG\nERROR",
                ha="center", va="center",
                color="white", fontsize=10
            )

        # Title wrapped to at most 2 lines of 16 characters each.
        title = str(names[idx])
        wrapped = "\n".join(textwrap.wrap(title, width=16)[:2])

        eixo.set_title(
            wrapped,
            fontsize=9,
            color="white",
            pad=6
        )

    plt.subplots_adjust(wspace=0.2, hspace=0.5)
    plt.show()

In [0]:
# Draw the same recommendations with the redefined, dark-themed grid helper.
visualize_songs(name, url)