In [3]:
import pandas as pd
import numpy as np
import plotly.graph_objs as go
import plotly.io as pio
import os
import random
import math
from tqdm import tqdm
from sklearn.metrics import mean_squared_error
from scipy.stats import spearmanr
from multiprocessing import Pool, cpu_count
import uuid
import logging
import json
import gradio as gr

# Set up logging: configure the root logger at INFO level and create the
# module-level logger used for warnings in get_recommendations.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


In [1]:
# Load the dataset and create the module-level state shared by every function below.
# NOTE(review): this cell uses `pd`, which is imported in the cell above; run the import cell first.

SONGS_FILE = "../data/Spotify_MPD_Feature_Engineered.csv"
S = 50  # Hyperparameter: default time constant `s` of the exponential ramp in compute_utility
totReco = 0  # Number of total recommendations so far (passed as `t` to update_features)
startConstant = 5  # for low penalty in starting phase
# Read the song catalogue
Songs = pd.read_csv(SONGS_FILE)
# Songs.shape
NFEATURE = Songs.shape[1] - 60  # Number of feature columns: 60 = 12 leading + 48 trailing columns, matching the song[12:-48] slice in get_song_features
ratedSongs = set()  # NOTE(review): not referenced by the code visible here; per-user sets live in users[user_id]["rated_songs"]
userRecommendations = {}  # user_id -> list of recommendations (created empty on registration)
# In-memory user registry: user_id -> profile dict (built in register_user_interface)
users = {}

def register_user_interface(name, age, gender, country, edu_level):
    """Register a new user and return a human-readable status message.

    A fresh UUID string is generated as the user ID and a profile is stored
    in the module-level ``users`` registry. The profile's feature vector has
    ``NFEATURE + 4`` zero entries, and an empty recommendation list is
    created for the user in ``userRecommendations``.

    Args:
        name: Display name; must not already be used by a registered profile.
        age, gender, country, edu_level: Stored on the profile unchanged.

    Returns:
        A success message containing the new user ID, or
        "User already exists. Please login." if the name is already taken.
    """
    # `users` is keyed by user ID, so a duplicate name must be searched for
    # among the stored profiles rather than among the dictionary keys.
    if any(profile.get("name") == name for profile in users.values()):
        return "User already exists. Please login."
    user_id = str(uuid.uuid4())
    users[user_id] = {
        "name": name,
        "age": age,
        "gender": gender,
        "country": country,
        "edu_level": edu_level,
        "features": np.zeros(NFEATURE + 4, dtype=np.float64),
        "rated_songs": set()
    }
    userRecommendations[user_id] = []
    return f"User {name} registered successfully. Your user ID is {user_id}"

def login_user_interface(user_id):
    """Log in a registered user, refreshing the profile from disk if possible.

    Only IDs already present in the in-memory ``users`` registry are
    accepted. For those, ``user_data_<user_id>.json`` is read when it exists:
    its "features" entry is turned into a NumPy array, its "rated_songs"
    entry into a set, and the result replaces the in-memory profile. A
    missing file leaves the in-memory profile untouched.

    Returns:
        A welcome message for known users, otherwise an "Invalid user ID"
        message.
    """
    if user_id in users:
        try:
            with open(f"user_data_{user_id}.json", "r") as handle:
                stored_profile = json.load(handle)
                stored_profile["features"] = np.array(stored_profile["features"])
                stored_profile["rated_songs"] = set(stored_profile["rated_songs"])
                users[user_id] = stored_profile
        except FileNotFoundError:
            # Nothing saved for this user yet: keep the in-memory profile.
            pass
        return f"Welcome back, {users[user_id]['name']}!"
    return "Invalid user ID. Please register first."

def compute_utility(user_features, song_features, epoch, s=S):
    """Utility of a song for a user, ramped up over time.

    The dot product of the two feature vectors is scaled by
    ``1 - exp(-epoch / s)``, so the utility is 0 at ``epoch == 0`` and
    approaches the raw dot product as ``epoch`` grows relative to ``s``.
    Both inputs are copied first and are never modified.
    """
    user_vec = user_features.copy()
    song_vec = song_features.copy()
    affinity = user_vec.dot(song_vec)
    ramp = (1.0 - 1.0 * math.exp(-1.0 * epoch / s))
    return affinity * ramp

def get_song_features(song):
    """Return the numeric feature vector of one song as a float64 array.

    For a Series, the first 12 and the last 48 entries are dropped and the
    remainder is converted to float64. For a DataFrame, the row labelled by
    its first index entry is used. Any other type raises ``TypeError``.
    """
    if isinstance(song, pd.DataFrame):
        # Reduce the frame to the row at its first index label and recurse.
        first_row = song.loc[song.index[0]]
        return get_song_features(pd.Series(first_row))
    if not isinstance(song, pd.Series):
        raise TypeError("{} should be a Series or DataFrame".format(song))
    # Exclude all columns before 'Artiste Popularity' and after 'Genre_world-music'
    feature_values = song[12:-48].values
    return feature_values.astype(np.float64)

def update_features(user_features, song_features, rating, t):
    """
    Shift the user's preference vector towards or away from a rated song.

    The rating is mapped to a weight ``(rating - 3) / 2`` (so 1 -> -1,
    3 -> 0, 5 -> +1). The song features, with NaNs replaced by 0, are scaled
    by that weight and added in place to every entry of ``user_features``
    except the last four. ``t`` is accepted but not used.

    Returns the same ``user_features`` object that was passed in.
    """
    weight = (rating - 3) / 2

    # Round-trip the preference slice through float64 before accumulating.
    user_features[:-4] = user_features[:-4].astype(np.float64)

    # Missing song features must not poison the profile: treat NaN as 0.
    delta = np.nan_to_num(song_features) * weight

    user_features[:-4] += delta
    return user_features

def rate_song_interface(user_id, song_id, rating):
    """Record a user's rating of a song and update the user's preferences.

    Args:
        user_id: Key of a registered user in ``users``.
        song_id: Label that must be present in ``Songs.index``.
        rating: Number from 1 to 5 (numeric strings are accepted). The range
            follows from ``update_features``, which maps the rating to an
            impact factor of -1..1 via ``(rating - 3) / 2``.

    Returns:
        A status message: success, or which of the three inputs was invalid.
        On any invalid input the user's profile is left unchanged.
    """
    if user_id not in users:
        return "Invalid user ID. Please register or login first."
    if song_id not in Songs.index:
        return "Invalid song ID."

    # Reject missing, non-numeric, NaN and out-of-range ratings before they
    # can distort (or crash) the preference update.
    try:
        rating_value = float(rating)
    except (TypeError, ValueError):
        rating_value = float("nan")
    if not 1 <= rating_value <= 5:
        return "Invalid rating. Please provide a number between 1 and 5."

    profile = users[user_id]
    song_features = get_song_features(Songs.loc[song_id])
    # Store the result explicitly instead of relying on in-place mutation.
    profile["features"] = update_features(profile["features"], song_features, rating_value, totReco)
    profile["rated_songs"].add(song_id)
    return "Song rated successfully. Your preferences have been updated."

def get_user_data(user_id):
    """Return the stored profile for ``user_id``, or ``None`` if it is not registered."""
    return users.get(user_id)

def _genre_flag_is_set(value):
    """Return True when a genre cell marks membership (True or a non-zero number)."""
    if isinstance(value, pd.Series):
        # Duplicate column labels yield a Series: any set flag counts.
        return bool(value.any())
    if isinstance(value, (bool, np.bool_)):
        return bool(value)
    if isinstance(value, (int, float, np.integer, np.floating)):
        return bool(not np.isnan(value) and value != 0)
    # None, strings and other objects are not treated as flags.
    return False


def get_song_genre(song):
    """List the genres a song belongs to.

    The last 48 columns of ``Songs`` are treated as genre indicator columns.
    For each of them the value is read from ``song`` (by key for a Series,
    by attribute otherwise); columns that ``song`` does not have are skipped.

    A flag counts as set when it is boolean True (Python or NumPy) or a
    non-zero, non-NaN number. An identity test against ``True`` would miss
    the NumPy booleans and 0/1 integers that pandas returns.

    Returns:
        Genre names in column order, with the first six characters (the
        "Genre_" prefix) removed.
    """
    genres = []
    for col in Songs.columns[-48:]:
        try:
            if isinstance(song, pd.Series):
                value = song[col]
            else:
                value = getattr(song, col)
        except (AttributeError, KeyError):
            continue  # Skip genre columns this song object does not carry
        if _genre_flag_is_set(value):
            genres.append(col[6:])  # Remove the "Genre_" prefix
    return genres


def get_recommendations_interface(user_id):
    """Return formatted song recommendations and a justification for a user.

    Args:
        user_id: Key of a registered user in ``users``.

    Returns:
        ``(recommendations_list, justification)``, where each list entry is
        "<Music> by <artname>"; the list is empty when nothing qualifies.
        For an unknown ``user_id`` a single error string is returned instead.
    """
    if user_id not in users:
        return "Invalid user ID. Please register or login first."
    user_features = users[user_id]["features"]
    # NOTE(review): this yields integer positions within user_features[12:-4],
    # whereas get_recommendations compares liked_genres against the genre
    # names returned by get_song_genre, so the genre filter cannot match.
    # Left unchanged because the intended source of liked genres is not
    # determinable from this file - confirm and fix upstream.
    liked_genres = {genre for genre, value in enumerate(user_features[12:-4]) if value > 0}
    recommendations, justification = get_recommendations(initialize_q_table(), user_features, user_id, liked_genres)
    # Build the list row by row: DataFrame.apply(axis=1) on an empty frame
    # returns a DataFrame, which has no .tolist().
    recommendations_list = [
        f"{song['Music']} by {song['artname']}" for _, song in recommendations.iterrows()
    ]
    return recommendations_list, justification

def epsilon_greedy_policy(q_table, epsilon):
    """
    Epsilon-greedy policy: decide whether to explore.

    Returns True (take a random action) with probability ``epsilon`` and
    False (act greedily) otherwise. ``q_table`` is accepted for interface
    compatibility and is not used.
    """
    return bool(np.random.uniform(0, 1) < epsilon)


def choose_action(q_table, user_features, epsilon, rated_songs):
    """
    Choose an action (recommend a song) based on the Q-table and an exploration strategy,
    excluding songs that have already been rated by the user.

    Args:
        q_table: Either a 1-D weight vector, which gives one shared Q-value
            for every song, or a 2-D array whose row ``i`` holds the weights
            of the song labelled ``i``.
            NOTE(review): the 2-D case assumes non-negative integer labels in
            ``Songs.index`` that double as row numbers - confirm.
        user_features: The user's feature vector (flattened before use).
        epsilon: Probability of picking a random unrated song.
        rated_songs: Iterable of song labels the user has already rated.

    Returns:
        The label (from ``Songs.index``) of the chosen song.

    Raises:
        ValueError: If every song has already been rated.
    """
    unrated_songs = Songs.index.difference(list(rated_songs))
    if len(unrated_songs) == 0:
        raise ValueError("No unrated songs left to recommend.")

    def random_unrated_song():
        return unrated_songs[np.random.randint(len(unrated_songs))]

    if epsilon_greedy_policy(q_table, epsilon):
        # Explore: uniformly random unrated song.
        return random_unrated_song()

    # Exploit: compute Q-values from the table and the user profile.
    weights = np.asarray(q_table, dtype=np.float64).squeeze()
    profile = np.asarray(user_features, dtype=np.float64).ravel()
    q_values = np.dot(weights, profile)

    if np.ndim(q_values) == 0:
        # One shared Q-value: every unrated song ties, so take the first one
        # (the same choice argmax makes on a constant array).
        return unrated_songs[0]

    # Keep only unrated songs that have a Q-value. The Q-value array is NOT
    # truncated to the number of unrated songs, since that would drop valid
    # high-numbered songs from consideration.
    has_q_value = (unrated_songs >= 0) & (unrated_songs < len(q_values))
    candidates = unrated_songs[has_q_value]
    if len(candidates) == 0:
        # Nothing learned about any unrated song: fall back to exploring.
        return random_unrated_song()

    # NaN Q-values count as 0 so they cannot win or break argmax.
    candidate_q = np.nan_to_num(q_values[candidates.to_numpy()], nan=0.0)
    return candidates[int(candidate_q.argmax())]
    

def get_recommendations(q_table, user_features, user_id, liked_genres, top_n=10):
    """
    Get recommendations based on the learned Q-table or policy

    Q-values are computed from ``q_table`` and ``user_features``:

    * If ``q_table`` holds exactly as many values as ``user_features``, the
      two are flattened and dotted into ONE Q-value. It is attributed to
      position 0, so at most ``Songs.iloc[0]`` can be recommended.
    * If ``q_table`` is 2-D with one column per user feature, each row gives
      the Q-value of the song at the same row position in ``Songs``, as in
      the greedy branch of ``choose_action``.

    Songs are visited in descending Q-value order. Only songs with a
    positive, non-NaN Q-value qualify. Songs sharing a genre with
    ``liked_genres`` are preferred; if none qualifies, the genre requirement
    is dropped.

    Args:
        q_table: Weight vector or per-song weight matrix (see above).
        user_features: The user's feature vector; NaNs are treated as 0.
        user_id: Not used by this function.
        liked_genres: Container of genre names, matched against
            ``get_song_genre`` results.
        top_n: Maximum number of songs to return (default 10).

    Returns:
        ``(recommendations, justification)``: a DataFrame with one row per
        recommended song (empty if nothing qualifies) and an explanatory
        string.
    """
    # Check if user features contain NaN values
    if np.isnan(user_features).any():
        logger.warning("User features contain NaN values. Replacing with 0.")
        user_features = np.nan_to_num(user_features)

    profile = np.ravel(user_features)
    table = np.asarray(q_table)
    if table.ndim == 2 and table.shape[1] == profile.size and table.size != profile.size:
        # One row of weights per song -> one Q-value per song.
        q_vector = np.dot(table, profile)
    else:
        # Single weight vector -> a single Q-value.
        q_vector = np.atleast_1d(np.dot(table.reshape(-1), profile))

    # Check if Q-values contain NaN values
    if np.isnan(q_vector).any():
        logger.warning("Q-values contain NaN values. Skipping songs with NaN Q-values.")

    # Positions in descending Q-value order; positions without a matching row
    # in Songs are ignored instead of raising IndexError.
    n_songs = len(Songs)
    ranked_positions = [int(pos) for pos in np.argsort(q_vector)[::-1] if pos < n_songs]

    def collect(require_liked_genre):
        """Gather up to top_n qualifying songs in ranked order."""
        picked = []
        if top_n <= 0:
            return picked
        for position in ranked_positions:
            score = q_vector[position]
            if np.isnan(score) or score <= 0:
                continue
            song = Songs.iloc[position]
            if require_liked_genre and not any(
                genre in liked_genres for genre in get_song_genre(song)
            ):
                continue
            picked.append(song)
            if len(picked) >= top_n:
                break
        return picked

    # Prefer liked genres; if that finds nothing, fall back to top Q-value songs.
    recommendations = collect(True) or collect(False)
    recommendations = pd.DataFrame(recommendations)

    # Provide justification for the recommendations
    justification = "The recommendations are based on your preferences, demographics, and the songs you have rated highly. "
    justification += "We have learned from your ratings and selected songs that align with your taste. "
    justification += "Songs that you rated lower have been filtered out to provide more relevant suggestions."

    return recommendations, justification


def initialize_q_table():
    """
    Build a fresh Q-table: a 1-D array of ``NFEATURE + 4`` zeros.
    """
    return np.zeros(NFEATURE + 4)

# Define Gradio interface functions here
def gradio_register_user(name, age, gender, country, edu_level):
    """Gradio callback: register a user via ``register_user_interface`` and return its message."""
    status_message = register_user_interface(name, age, gender, country, edu_level)
    return status_message

def gradio_login_user(user_id):
    """Gradio callback: log a user in via ``login_user_interface`` and return its message."""
    status_message = login_user_interface(user_id)
    return status_message

def gradio_rate_song(user_id, song_id, rating):
    """Gradio callback: rate a song via ``rate_song_interface`` and return its message."""
    status_message = rate_song_interface(user_id, song_id, rating)
    return status_message

def gradio_get_recommendations(user_id):
    """Gradio callback: recommendations and justification for ``user_id``.

    The Gradio interface built from this function declares two text outputs,
    so every path returns exactly two values.

    Returns:
        ``(recommendations_list, justification)``, where each list entry is
        "<Music> by <artname>" (empty list when nothing qualifies). For an
        unknown user, ``(error_message, "")``.
    """
    user_data = get_user_data(user_id)
    if user_data is None:
        return "Invalid user ID. Please register or login first.", ""
    q_table = initialize_q_table()  # Initialize Q-table
    user_features = user_data["features"]
    # NOTE(review): this yields integer positions within user_features[12:-4],
    # whereas get_recommendations compares liked_genres against genre names,
    # so the genre filter cannot match. Left unchanged because the intended
    # source of liked genres is not determinable from this file - confirm.
    liked_genres = {genre for genre, value in enumerate(user_features[12:-4]) if value > 0}
    recommendations, justification = get_recommendations(q_table, user_features, user_id, liked_genres)
    # Build the list row by row: DataFrame.apply(axis=1) on an empty frame
    # returns a DataFrame, which has no .tolist().
    recommendations_list = [
        f"{song['Music']} by {song['artname']}" for _, song in recommendations.iterrows()
    ]
    return recommendations_list, justification

# Create the Gradio app: one text input (the user ID) and two text outputs
# (the recommendation list and its justification).
iface = gr.Interface(
    fn=gradio_get_recommendations,
    inputs="text",
    outputs=["text", "text"],
    description="Get song recommendations based on your preferences"
)

# Launch the app exactly once. The guard is true when this cell runs in a
# notebook or as a script, and false when the code is imported as a module,
# so an import does not start a server.
if __name__ == "__main__":
    iface.launch()



  gr.inputs.Textbox(label="Name"),
  gr.inputs.Textbox(label="Name"),
  gr.inputs.Textbox(label="Name"),
  gr.inputs.Slider(minimum=0, maximum=100, label="Age"),
  gr.inputs.Slider(minimum=0, maximum=100, label="Age"),
  gr.inputs.Radio(choices=["M", "F", "O"], label="Gender"),
  gr.inputs.Radio(choices=["M", "F", "O"], label="Gender"),
  gr.inputs.Textbox(label="Country"),
  gr.inputs.Textbox(label="Country"),
  gr.inputs.Textbox(label="Country"),
  gr.inputs.Dropdown(choices=["Graduate", "High School", "Middle School", "Undergraduate"], label="Education Level")
  gr.inputs.Dropdown(choices=["Graduate", "High School", "Middle School", "Undergraduate"], label="Education Level")


Running on local URL:  http://127.0.0.1:7860

To create a public link, set `share=True` in `launch()`.




IMPORTANT: You are using gradio version 3.50.0, however version 4.29.0 is available, please upgrade.
--------


In [None]:
# # Define the Gradio interface
# iface = gr.Interface(
#     fn=register_user_interface,
#     inputs=[
#         gr.inputs.Textbox(label="Name"),
#         gr.inputs.Slider(minimum=18, maximum=80, label="Age"),
#         gr.inputs.Radio(choices=["M", "F", "O"], label="Gender"),
#         gr.inputs.Textbox(label="Country"),
#         gr.inputs.Dropdown(choices=["Graduate", "High School", "Middle School", "Undergraduate"], label="Education Level")
#     ],
#     outputs="text"
# )

# # Launch the Gradio app
# iface.launch()
