# Modelo de recomendación

In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import matplotlib.colors as mcolors
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.model_selection import train_test_split
from tensorflow.keras.layers import Input, Dense, Dropout
from tensorflow.keras.models import Model
from tensorflow.keras.models import Sequential
from tensorflow.keras.optimizers import Adam

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Print the full path of every file found under the Kaggle input directory.
input_tree = os.walk('/kaggle/input')
for folder, _, file_names in input_tree:
    for file_name in file_names:
        full_path = os.path.join(folder, file_name)
        print(full_path)

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [None]:
# Load the songs table from the Kaggle input dataset.
df = pd.read_csv('/kaggle/input/spotify-dataset/df_songs.csv')

In [None]:
df.head()

In [None]:
df.describe()

# Training

In [None]:
# 'mode' is discarded: when it is part of the dataset, every recommendation
# shares the "mode" (major or minor) of the query song, whereas listeners do
# not stick to one mode and consume both interchangeably.
# Reassigning (instead of inplace=True) together with errors='ignore' keeps the
# cell idempotent: re-running it once 'mode' is gone no longer raises KeyError.
df = df.drop(columns=['mode'], errors='ignore')

In [None]:
# Audio features fed to the model; the column order defines the input layout.
# Open questions kept from the original notes:
#   - should 'track_popularity' be included?
#   - drop 'acousticness' because of its high correlation with 'energy' and 'loudness'?
#   - reintroduce the 'mode' feature?
features = [
    'track_popularity',
    'danceability',
    'energy',
    'loudness',
    'speechiness',
    'acousticness',
    'instrumentalness',
    'liveness',
    'valence',
    'tempo',
    'duration_ms',
    'year',
]

# Feature matrix as a float32 array, one row per song.
X = df.loc[:, features].values.astype('float32')

In [None]:
# Split the feature matrix: 30% of the rows are held out as the test/validation
# set; random_state pins the shuffle so the split is reproducible.
X_train, X_test = train_test_split(X, test_size=0.3, random_state=42)

In [None]:
# Dense autoencoder: the output layer has as many units as there are input
# features, so the network is trained to reconstruct its own input.
n_features = len(features)

model = Sequential()
model.add(Input(shape=(n_features,)))
model.add(Dense(128, activation='relu'))
model.add(Dropout(0.3))
model.add(Dense(64, activation='relu'))
model.add(Dropout(0.3))
model.add(Dense(32, activation='relu', name="embedding_layer"))  # Embedding layer
model.add(Dense(16, activation='relu'))
model.add(Dense(n_features, activation='linear'))  # Output layer for reconstruction

# Mean squared reconstruction error, optimised with Adam.
reconstruction_optimizer = Adam(learning_rate=0.001)
model.compile(optimizer=reconstruction_optimizer, loss='mse')

In [None]:
# Train the model to reconstruct its input (X_train is both input and target)
# and keep the training history for the loss plot.
history = model.fit(X_train, X_train, epochs=50, batch_size=32, validation_data=(X_test, X_test))

In [None]:
# Plot the training and validation loss recorded for each epoch.
loss_curves = {'Training loss': 'loss', 'Validation loss': 'val_loss'}
for curve_label, history_key in loss_curves.items():
    plt.plot(history.history[history_key], label=curve_label)

plt.title('Curva de Entrenamiento y Validación')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.show()

In [None]:
# Define the autoencoder with the functional API so the embedding tensor can be
# exposed as the output of a second model.
input_layer = Input(shape=(len(features),))
x = Dense(128, activation='relu')(input_layer)
x = Dropout(0.3)(x)
embedding_layer = Dense(64, activation='relu', name="embedding_layer")(x)  # Named embedding layer
x = Dropout(0.3)(embedding_layer)
x = Dense(32, activation='relu')(x)
output_layer = Dense(len(features), activation='linear')(x)

# Compile the model
model = Model(inputs=input_layer, outputs=output_layer)
model.compile(optimizer=Adam(learning_rate=0.001), loss='mse')

# This `model` replaces the Sequential one trained earlier, so its weights are
# freshly initialised. Train it (same settings as the earlier fit) before
# reading embeddings; otherwise they would come from an untrained network.
embedding_history = model.fit(X_train, X_train, epochs=50, batch_size=32,
                              validation_data=(X_test, X_test))

# A functional model has defined inputs/outputs as soon as it is constructed,
# so no warm-up predict() call is needed before building the sub-model.
# The sub-model shares its layers (and therefore the trained weights) with `model`.
embedding_model = Model(inputs=model.input, outputs=model.get_layer("embedding_layer").output)
song_embeddings = embedding_model.predict(X)

print("Embeddings shape:", song_embeddings.shape)


In [None]:
# Export the embedding model to the Kaggle working directory.
embedding_model.save("/kaggle/working/songs_embeddings.h5")

## Predicción

### Búsqueda por género

In [None]:
# Genre lookup list; the trailing number is the position used to pick a genre below.
genres = [
    'pop',    # 0
    'rock',   # 1
    'rap',    # 2
    'r&b',    # 3
    'edm',    # 4
    'latin',  # 5
]

# Show only the songs of one genre (position 1 -> 'rock').
selected_genre = genres[1]
df[df['playlist_genre'].eq(selected_genre)]
# To show every other genre instead: df[df['playlist_genre'] != genres[1]]

### Búsqueda por artista/canción

In [None]:
# Search by artist (or by track name, see the alternative below).
artist_mask = df['track_artist'].eq('Perotá Chingó')
df.loc[artist_mask]
# Alternative: df.loc[df['track_name'] == 'Yesterday']

### Búsqueda por ID

Ejemplos de testing:
- 3gu0fRSgFuc4FmrtIv0DnC (Reggaeton)
- 0r7CVbZTWZgbTCYdfa2P31 (pop)
- 5yY9lUy8nbvjM1Uyo1Uqoc (rap)
- 1VgQ6AZ8tVV0uhvhH9usuj ("latin") Vértigo by Perotá Chingó

In [None]:
# Search by track ID
id_ = "5CMjjywI0eZMixPeqNd75R"

# Row position(s) of the track. A position (not an index label) is what both
# df.iloc and the rows of song_embeddings (built from df[features].values)
# expect, so the lookup stays correct even if df's index is not 0..n-1.
matches = np.flatnonzero(df['track_id'].to_numpy() == id_)
if matches.size != 1:
    # Same exception type the previous `.index.item()` raised, with a clearer message.
    raise ValueError(f"Expected exactly one row with track_id {id_!r}, found {matches.size}")
song_index = int(matches[0])

name = df.iloc[[song_index]]  # one-row DataFrame for the matched track
print(name.iloc[0]['track_name'], 'by', name.iloc[0]['track_artist'], 'on album', name.iloc[0]['track_album_name'])
print()
df.iloc[song_index]

In [None]:
# Search by index (optional alternative to the search by ID).
# The snippet is kept as comments rather than inside a string literal, so the
# cell no longer echoes the raw text back as its output. Uncomment to use.
#song_index = 12616  # Example row position
#name = df.loc[df['track_id'] == df.iloc[song_index]['track_id']]
#print(name.iloc[0]['track_name'], 'by', name.iloc[0]['track_artist'], 'on album', name.iloc[0]['track_album_name'])
#print()
#
#df.iloc[song_index]

### Generación de recomendaciones

In [None]:
num_pred = 5  # Number of recommendations; change freely to get more predictions
similarities = cosine_similarity([song_embeddings[song_index]], song_embeddings)[0]

# Rank every song from most to least similar, then remove the query song by its
# own position. Slicing off the last argsort entry only works when the query is
# the unique maximum: songs with identical embeddings tie with it, and the query
# could then be kept while a genuine match is discarded.
ranked_indices = np.argsort(similarities)[::-1]
similar_indices = ranked_indices[ranked_indices != song_index][:num_pred]
similar_songs = df.iloc[similar_indices]

for rank, row_label in enumerate(similar_songs.index, start=1):
    print(f"""{rank}º) "{similar_songs.loc[row_label, 'track_name']}" by {similar_songs.loc[row_label, 'track_artist']} on album "{similar_songs.loc[row_label, 'track_album_name']}" ({similar_songs.loc[row_label, 'playlist_genre'].title()})\n""")

In [None]:
similar_songs

## Análisis de resultados

In [None]:
# 0. Query song (user input). Select the model features directly instead of
#    dropping a hard-coded list of text columns: the float cast then stays valid
#    whatever other columns `df` carries, and the table holds only the features.
song = df.iloc[song_index][features].astype(float)

# 0. Mean of every feature over the whole dataset
df_mean = df[features].mean()

# 1. Mean of every feature over the recommended songs
similar_mean = similar_songs[features].mean()

# 2. Put both side by side for easy comparison
comparison_df = pd.DataFrame({'Original song': song, 'Similar songs': similar_mean})

# 3. Per-feature distance between the query song and the mean of the similar songs.
#    The column keeps its "(RMSE)" label, but with a single value per feature
#    sqrt(x**2) is simply the absolute difference, so .abs() is used directly
#    (it also avoids the underflow of squaring very small differences).
comparison_df['Similarity (RMSE)'] = (comparison_df['Original song'] - comparison_df['Similar songs']).abs()
comparison_df['Other songs'] = df_mean

# 4. Per-feature distance between the mean of the similar songs and the dataset mean
comparison_df['Difference (RMSE)'] = (comparison_df['Similar songs'] - comparison_df['Other songs']).abs()

# 5. Share of the total distance explained by "similar vs. dataset":
#    values near 1 indicate an excellent prediction, values near 0 a poor one
#    (the similar songs are no closer to the query than the dataset mean).
#    A feature with both distances equal to 0 yields NaN (0/0).
comparison_df['Eficacia (%)'] = comparison_df['Difference (RMSE)'] / (comparison_df['Similarity (RMSE)'] + comparison_df['Difference (RMSE)'])

In [None]:
comparison_df

**NOTAS:**
- Similarity/Difference: representan la proximidad de los datos a través del RMSE (0 es "similar" | 1 es "diferente")
- Eficacia: 1 indica una predicción excelente. 0 indica una predicción pobre.

In [None]:
# Colour scale for Eficacia: blue near 0, red near 1.
norm = plt.Normalize(0, 1)
sm = plt.cm.ScalarMappable(norm=norm, cmap="coolwarm")
sm.set_array([])  # empty array to avoid warnings

# One bar per feature, coloured by its Eficacia value.
fig, ax = plt.subplots(figsize=(15, 6))
eficacia_values = comparison_df['Eficacia (%)']
bar_colors = sm.to_rgba(eficacia_values)
bars = ax.bar(comparison_df.index, eficacia_values, color=bar_colors)

# Customise the x tick labels.
tick_positions = range(len(comparison_df.index))
ax.set_xticks(tick_positions)
ax.set_xticklabels(comparison_df.index, rotation=45, ha='right', fontsize=10)

# Title and axis labels.
ax.set_title("Normalized Eficacia for Each Feature")
ax.set_xlabel("Features")
ax.set_ylabel("Eficacia (Normalized between 0 and 1)")

# Colour bar attached to the same axes.
cbar = fig.colorbar(sm, ax=ax, orientation='vertical', pad=0.02)
cbar.set_label('Eficacia scale (0: Blue, 1: Red)')

plt.tight_layout()
plt.show()


In [None]:
threshold = 0.6

# Features whose Eficacia exceeds the threshold, best first.
above_threshold = comparison_df['Eficacia (%)'].gt(threshold)
indices = comparison_df.index[above_threshold]
top_features = comparison_df.loc[indices].sort_values(by="Eficacia (%)", ascending=False)

print("Eficacia de los features representativos:")
rank = 0
for feature, row in top_features['Eficacia (%)'].items():
    rank += 1
    print(f"{rank}º) {feature}: {row*100:.1f}%")
    
#print(top_features['Eficacia'])

**Ideas:**
- Se puede ponderar el peso de ciertas features en la búsqueda (por ejemplo el año)
- Se puede incluir un feature que indique el idioma de la letra.

**Observaciones:**
- Es probable que mode no sea de utilidad. Al ser incluida en el dataset, la predicción siempre coincide con el "mode" original, pero los oyentes no escuchan exclusivamente música de un modo.
- Pareciera que la búsqueda de similitud tiende a evitar outliers.

# ‼️ Predicción para canción externa ⚠️

Recibe los features de la API de Spotify

{'track_id': '1z1Hg7Vb0AhHDiEmnDE79l',
 'track_name': 'All the Time - Don Diablo Remix',
 'track_artist': 'Zara Larsson',
 'track_artist_id': '1Xylc3o4UrD53lo9CvFvVg',
 'track_popularity': 42,
 'track_album_id': '1HoSmj2eLcsrR0vE9gThr4',
 'track_album_name': 'All the Time (Don Diablo Remix)',
 'album_release_date': '2019-07-05',
 'year': 2019,
 'danceability': 0.675,
 'energy': 0.931,
 'key': 1,
 'loudness': -3.432,
 'mode': 0,
 'speechiness': 0.0742,
 'acousticness': 0.0794,
 'instrumentalness': 2.33e-05,
 'liveness': 0.11,
 'valence': 0.613,
 'tempo': 124.008,
 'duration_ms': 176616}

In [None]:
# Crear X[new_song] con los features
features = {'track_id': '1z1Hg7Vb0AhHDiEmnDE79l',
 'track_name': 'All the Time - Don Diablo Remix',
 'track_artist': 'Zara Larsson',
 'track_artist_id': '1Xylc3o4UrD53lo9CvFvVg',
 'track_popularity': 42,
 'track_album_id': '1HoSmj2eLcsrR0vE9gThr4',
 'track_album_name': 'All the Time (Don Diablo Remix)',
 'album_release_date': '2019-07-05', # Debería llamarse 'track_album_release_date'
 'year': 2019,
 'danceability': 0.675,
 'energy': 0.931,
 'key': 1,
 'loudness': -3.432,
 'mode': 0,
 'speechiness': 0.0742,
 'acousticness': 0.0794,
 'instrumentalness': 2.33e-05,
 'liveness': 0.11,
 'valence': 0.613,
 'tempo': 124.008,
 'duration_ms': 176616}

#features = pd.DataFrame([features])


In [None]:
song = pd.DataFrame([features])

In [None]:
# Input from API data
song

In [None]:
# REMOVE for production
# Sanity check: report the columns present in one table but missing from the other.
# Order-preserving comprehensions replace the hand-rolled nested loops that
# popped matches out of a list copy (the same loop was written twice).
song_columns = song.columns.tolist()
df_columns = df.columns.tolist()

# Columns of `df` that the external song does not have.
print(len(song_columns), len(df_columns))
missing_in_song = [col for col in df_columns if col not in song_columns]
if missing_in_song:
    print(f'{missing_in_song} no encontrado en "song"')
else:
    print('Todas las columnas encontradas en "song"')

# Columns of the external song that `df` does not have.
print(len(df_columns), len(song_columns))
missing_in_df = [col for col in song_columns if col not in df_columns]
if missing_in_df:
    print(f'{missing_in_df} no encontrado en "dataframe"')
else:
    print('Todas las columnas encontradas en "dataframe"')

## Feature transformation

Elimina datos que no se usan para el embedding

In [None]:
# Drop the columns that are not used for the embedding.
# Reassigning with errors='ignore' keeps the cell re-runnable (the in-place
# version raised KeyError on a second run); the duplicated
# 'album_release_date' entry was removed from the list.
song = song.drop(columns=['mode', 'album_release_date', 'track_artist_id', 'key'], errors='ignore')

Aplica la misma transformación de los datos originales

In [None]:
# Feature scaling
import pickle
from sklearn.preprocessing import MinMaxScaler

# NOTE(review): pickle.load can execute arbitrary code contained in the file;
# only load scaler files that come from a trusted source.
# Presumably this is the MinMaxScaler fitted on the original data - confirm.
with open('/kaggle/input/transformations/scaler_minmax.pkl', 'rb') as f:
    scaler = pickle.load(f)

# Columns handed to the scaler (same columns, same order, in and out).
scaled_columns = ['track_popularity','loudness','tempo','duration_ms','year']

# Start from a copy of `song` so the raw values stay available for comparison.
# `song_scaled` was previously written to before it had been defined, which
# raised NameError on a fresh kernel.
song_scaled = song.copy()

# Transform the new data with the loaded scaler.
song_scaled[scaled_columns] = scaler.transform(song[scaled_columns])
song_scaled.describe()

In [None]:
# Custom transformers that can be looked up by name.
def cube(x):
    """Return ``x`` raised to the third power."""
    return x ** 3

def custom(x):
    """Return ``x`` cubed; registered below under the name '<lambda>'."""
    return x**3

# Transformer function -> name it is registered under.
transformer_name_map = dict([
    (np.log1p, 'log1p'),
    (np.sqrt, 'sqrt'),
    (np.square, 'square'),
    (np.exp, 'exp'),
    (cube, 'cube'),
    (custom, '<lambda>'),
])

# Reverse lookup: name -> transformer function.
function_map = {}
for transformer_fn, registered_name in transformer_name_map.items():
    function_map[registered_name] = transformer_fn

In [None]:
# Load the skew-correction table here so this cell can display it on a fresh
# kernel; it was previously referenced before any cell had defined it.
skew_df = pd.read_csv('/kaggle/input/transformations/skew_correct.csv')
skew_df

In [None]:
# Skewness correction: load the table whose 'feature' and 'transformer'
# columns are read row by row when the transformations are applied.
skew_df = pd.read_csv('/kaggle/input/transformations/skew_correct.csv')

def apply_transformations(df, skew_df, transformers=None):
    """Apply the per-feature transformers listed in ``skew_df`` to ``df``.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame whose columns are transformed. It is modified in place and
        also returned.
    skew_df : pandas.DataFrame
        Table with a 'feature' column (column name in ``df``) and a
        'transformer' column (name of the transformer to apply).
    transformers : dict, optional
        Mapping from transformer name to callable. Defaults to the
        notebook-level ``function_map``.

    Returns
    -------
    pandas.DataFrame
        The same ``df`` object, with the listed columns transformed.

    Notes
    -----
    Rows are skipped when the transformer is missing (NaN or the text
    'None') or when the feature is not a column of ``df``. An unknown
    transformer name is reported with a printed message and skipped.
    """
    if transformers is None:
        transformers = function_map

    for _, row in skew_df.iterrows():
        feature = row['feature']
        transformer_name = row['transformer']

        # read_csv turns empty cells (and, with pandas' default NA markers,
        # the literal text "None") into NaN, so "no transformer" must be
        # detected with isna as well as by comparing against the string.
        if pd.isna(transformer_name) or transformer_name == 'None':
            continue
        if feature not in df.columns:
            continue

        transformer = transformers.get(transformer_name)
        if transformer is None:
            print(f"Transformer '{transformer_name}' not found for feature '{feature}'.")
            continue

        if transformer is np.sqrt:
            # The square root is applied to the value shifted by 1.
            df[feature] = transformer(df[feature] + 1)
        else:
            df[feature] = transformer(df[feature])
    return df

# Apply the skew corrections to the scaled external song.
# apply_transformations writes into the frame it receives and the result is
# bound to the same name, so re-running this cell applies the transformers again.
song_scaled = apply_transformations(song_scaled, skew_df)


In [None]:
# Output formatted for model
song_scaled

### Comparación de features escalados (verificación)

In [None]:
song.describe()

In [None]:
song_scaled.describe()

In [None]:
df.describe()

## Prediction

Genera embeddings con los features del input externo

In [None]:
# Feature row(s) of the external song as float32, columns taken in `features` order.
# NOTE(review): `features` must still be the list of model feature columns at
# this point - confirm that no earlier cell has rebound the name.
X_song = song_scaled[features].values.astype('float32')

In [None]:
# For new songs that are external to the dataset
new_song_embedding = embedding_model.predict(X_song)

Compara esos embeddings contra las canciones del dataset para buscar similitudes

In [None]:
num_pred = 5  # Number of recommendations; change freely to get more predictions
similarities = cosine_similarity(new_song_embedding, song_embeddings)[0]

# Rank every dataset song from most to least similar. The query is an external
# song, so the top match must NOT be discarded as "itself" (the previous slice
# always threw the best match away). Only rows carrying the query's own
# track_id are excluded, in case the song also exists in the dataset.
query_track_id = song_scaled['track_id'].iloc[0]
ranked_indices = np.argsort(similarities)[::-1]
is_other_track = (df['track_id'].to_numpy() != query_track_id)[ranked_indices]
similar_indices = ranked_indices[is_other_track][:num_pred]
similar_songs = df.iloc[similar_indices]

# The loop variable is not called `song`, so the `song` DataFrame is not clobbered.
for rank, row_label in enumerate(similar_songs.index, start=1):
    print(f"{rank}º) \"{similar_songs.loc[row_label, 'track_name']}\" by {similar_songs.loc[row_label, 'track_artist']} on album \"{similar_songs.loc[row_label, 'track_album_name']}\"\n")

In [None]:
similar_songs

## Análisis de resultados

In [None]:
# 0. Query song (user input): the single row of song_scaled, restricted to the
#    model features. Selecting the features directly replaces the hard-coded
#    list of text columns to drop, so the float cast stays valid if the payload
#    gains other columns.
song = song_scaled.iloc[0][features].astype(float)

# 0. Mean of every feature over the whole dataset
df_mean = df[features].mean()

# 1. Mean of every feature over the recommended songs
similar_mean = similar_songs[features].mean()

# 2. Put both side by side for easy comparison
comparison_df = pd.DataFrame({'Original song': song, 'Similar songs': similar_mean})

# 3. Per-feature distance between the query song and the mean of the similar songs.
#    The column keeps its "(RMSE)" label, but with a single value per feature
#    sqrt(x**2) is simply the absolute difference, so .abs() is used directly
#    (it also avoids the underflow of squaring very small differences).
comparison_df['Similarity (RMSE)'] = (comparison_df['Original song'] - comparison_df['Similar songs']).abs()
comparison_df['Other songs'] = df_mean

# 4. Per-feature distance between the mean of the similar songs and the dataset mean
comparison_df['Difference (RMSE)'] = (comparison_df['Similar songs'] - comparison_df['Other songs']).abs()

# 5. Share of the total distance explained by "similar vs. dataset":
#    values near 1 indicate an excellent prediction, values near 0 a poor one
#    (the similar songs are no closer to the query than the dataset mean).
#    A feature with both distances equal to 0 yields NaN (0/0).
comparison_df['Eficacia (%)'] = comparison_df['Difference (RMSE)'] / (comparison_df['Similarity (RMSE)'] + comparison_df['Difference (RMSE)'])

In [None]:
comparison_df

**NOTAS:**
- Similarity/Difference: representan la proximidad de los datos a través del RMSE (0 es "similar" | 1 es "diferente")
- Eficacia: 1 indica una predicción excelente. 0 indica una predicción pobre.

In [None]:
comparison_df.index

In [None]:
# Colour scale for Eficacia: blue near 0, red near 1.
norm = plt.Normalize(0, 1)
sm = plt.cm.ScalarMappable(norm=norm, cmap="coolwarm")
sm.set_array([])  # empty array to avoid warnings

# One bar per feature, coloured by its Eficacia value.
fig, ax = plt.subplots(figsize=(15, 6))
eficacia_values = comparison_df['Eficacia (%)']
bar_colors = sm.to_rgba(eficacia_values)
bars = ax.bar(comparison_df.index, eficacia_values, color=bar_colors)

# Customise the x tick labels.
tick_positions = range(len(comparison_df.index))
ax.set_xticks(tick_positions)
ax.set_xticklabels(comparison_df.index, rotation=45, ha='right', fontsize=10)

# Title and axis labels.
ax.set_title("Normalized Eficacia for Each Feature")
ax.set_xlabel("Features")
ax.set_ylabel("Eficacia (Normalized between 0 and 1)")

# Add a colour bar.
cbar = fig.colorbar(sm, ax=ax, orientation='vertical', pad=0.02)
cbar.set_label('Eficacia scale (0: Blue, 1: Red)')

# Show the figure.
plt.tight_layout()
plt.show()

In [None]:
threshold = 0.6

# Features whose Eficacia exceeds the threshold, best first.
above_threshold = comparison_df['Eficacia (%)'].gt(threshold)
indices = comparison_df.index[above_threshold]
top_features = comparison_df.loc[indices].sort_values(by="Eficacia (%)", ascending=False)

print("Eficacia de los features representativos:")
rank = 0
for feature, row in top_features['Eficacia (%)'].items():
    rank += 1
    print(f"{rank}º) {feature}: {row*100:.1f}%")
    
#print(top_features['Eficacia'])