In [105]:
# !pip install numpy
# !pip install pandas
# !pip install scikit-learn
# !pip install tensorflow
# !pip install supabase
# !pip install python-dotenv
# !pip install matplotlib
# !pip install tqdm

In [106]:
import time, numpy as np
import pandas as pd
import os
import json
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.metrics import precision_recall_curve, auc
import tensorflow as tf
from supabase import create_client
from dotenv import load_dotenv
import matplotlib.pyplot as plt
import tqdm
from sklearn.preprocessing import MinMaxScaler

# Data Preprocessing

In [107]:
# Load variables from a local .env file (if one exists) into the process environment,
# so the SUPABASE_* values read from os.environ in the next cell can be found.
# NOTE(review): the recorded output of this cell is False, which presumably means
# no .env file was found/loaded in that run — confirm the credentials were supplied
# some other way.
load_dotenv()

False

In [108]:
# Supabase credentials are read from the environment; a missing variable raises KeyError.
url = os.environ['SUPABASE_URL']
key = os.environ['SUPABASE_KEY']
supabase = create_client(url, key)

# Query the user_id / song_id / like_count columns of the liked_songs table.
data = (
    supabase
    .table("liked_songs")
    .select("user_id, song_id, like_count")
    .execute()
)

# The response is serialised to JSON text and parsed back into a plain dict.
d = json.loads(data.json())

# Keep the extracted rows as a list of dictionaries (empty list when 'data' is absent).
liked_songs = d.get('data', [])

In [109]:
# Song catalogue, loaded from CSV.
# NOTE(review): this path is rooted at /data while the likes dataset in the next
# cell is read from /content/data — confirm both locations are intended.
songs_data = pd.read_csv("/data/notesync-dataset-1k.csv")

# Likes fetched from Supabase, as a DataFrame.
liked_songs_data = pd.DataFrame(liked_songs)

# Use the user_id found on the last fetched row as the user to recommend for.
# NOTE(review): presumably the last row belongs to the most recent user — the
# query applies no explicit ordering, so verify.
saved_user_id = recent_user_id = liked_songs_data['user_id'].iloc[-1]

In [110]:
# Likes dataset loaded from CSV; later cells read its `song_id` and `user_id` columns.
test_likes = pd.read_csv("/content/data/mixed_likes_dataset.csv")

In [111]:
# Mapping song_ids to integer IDs.
# The catalogue defines the code space: every frame is encoded with the same
# categorical dtype, so a given song_id gets the same integer in all three frames
# (independent `cat.codes` per frame would assign unrelated integers).
# song_ids that are NaN or missing from the catalogue are encoded as -1.
song_id_dtype = songs_data['song_id'].astype('category').dtype
for frame in (songs_data, liked_songs_data, test_likes):
    frame['mapped_song_id'] = frame['song_id'].astype(song_id_dtype).cat.codes

# Keep the raw user identifiers in `user_uuid` the first time this cell runs, so that
# re-running the cell maps from the originals again instead of from already-mapped
# integers (which would silently rebuild user_id_map with integer keys).
for frame in (liked_songs_data, test_likes):
    if 'user_uuid' not in frame.columns:
        frame['user_uuid'] = frame['user_id']

# Mapping UUIDs to integer IDs, numbered in order of first appearance in liked_songs_data
unique_user_ids = liked_songs_data['user_uuid'].unique()
# unique_user_ids = test_likes['user_uuid'].unique()
user_id_map = {user_id: i for i, user_id in enumerate(unique_user_ids)}

# Converting UUIDs to integer IDs.
# Users of test_likes that are not keys of user_id_map end up as NaN.
liked_songs_data['user_id'] = liked_songs_data['user_uuid'].map(user_id_map)
test_likes['user_id'] = test_likes['user_uuid'].map(user_id_map)

# Content-Based Filtering

In [112]:
def calculate_user_cosine_similarity(user_id, liked_songs, all_songs):
    """Score every song in ``all_songs`` by its mean cosine similarity to one user's liked songs.

    Parameters
    ----------
    user_id
        Value compared against ``liked_songs['user_id']``.
    liked_songs : pandas.DataFrame
        Must contain ``user_id`` and ``song_id`` columns.
    all_songs : pandas.DataFrame
        Must contain ``song_id`` and the four columns listed in ``features`` below.

    Returns
    -------
    numpy.ndarray
        One score per row of ``all_songs``, in row order.

    Raises
    ------
    ValueError
        If the user has no liked songs, or none of them can be joined to ``all_songs``.
    """
    # Filter liked songs for the specified user
    user_liked_songs = liked_songs[liked_songs['user_id'] == user_id]

    if user_liked_songs.empty:
        raise ValueError(f"No liked songs found for the user: {user_id}")

    # Features extracted from dataset
    features = ['speechiness', 'acousticness', 'instrumentalness', 'tempo']

    # Join on song_id using only the key column from the likes side, so unrelated
    # columns shared by both frames cannot get suffixed and break the feature lookup.
    user_liked_songs_features = (
        user_liked_songs[['song_id']]
        .merge(all_songs, on='song_id', how='inner')[features]
    )

    # Fail with a clear message instead of letting the scaler choke on an empty frame.
    if user_liked_songs_features.empty:
        raise ValueError(
            f"None of the liked songs for user {user_id} were found in the songs dataset"
        )

    # Normalizing the scale to be between 0 and 1; this mainly matters for tempo,
    # whose range can get close to 200 and would otherwise dominate the similarity.
    # The scaler is fitted on the whole catalogue and reused for the liked songs.
    scaler = MinMaxScaler()
    all_songs_scaled = scaler.fit_transform(all_songs[features])
    user_liked_songs_scaled = scaler.transform(user_liked_songs_features)

    # Rows: liked songs, columns: catalogue songs.
    similarity_scores = cosine_similarity(user_liked_songs_scaled, all_songs_scaled)

    # Average over the liked songs to get one score per catalogue song.
    mean_similarity_scores = np.mean(similarity_scores, axis=0)

    return mean_similarity_scores

# Collaborative Filtering

In [113]:
class MatrixFactorization():
    """Factorise a user x song matrix into latent user (U) and song (S) factors.

    Only cells equal to 1 are used as training examples; each one triggers an
    update of the matching user row of U and song row of S.

    Parameters
    ----------
    ratings : 2-D array
        User x song matrix; ``ratings.shape`` is read as (num_users, num_songs).
    dimensions : int
        Number of latent factors per user / song.
    alpha : float
        Step size applied to every update.
    beta : float
        Weight of the shrinkage term subtracted in every update.
    iterations : int
        Number of full passes made by ``train_model``.
    """

    def __init__(self, ratings, dimensions, alpha, beta, iterations):
        self.ratings = ratings
        self.num_users, self.num_songs = ratings.shape
        self.dimensions = dimensions
        self.alpha = alpha
        self.beta = beta
        self.iterations = iterations

    def train_model(self):
        """Randomly initialise U and S, then run ``iterations`` passes, printing each pass's MSE."""
        # Initialize latent factors U (user) and S (song)
        init_stddev = 0.1 / self.dimensions
        self.U = tf.Variable(tf.random.normal(shape=(self.num_users, self.dimensions), stddev=init_stddev))
        self.S = tf.Variable(tf.random.normal(shape=(self.num_songs, self.dimensions), stddev=init_stddev))

        # Training loop
        for _ in tqdm.tqdm(range(self.iterations), desc="Training"):
            mse = self.stochastic_gd()
            print(" MSE: ", mse)

    def stochastic_gd(self):
        """Run one pass over all cells equal to 1 and return the mean squared error.

        Returns 0.0 when the matrix contains no such cell (nothing to learn from),
        rather than dividing by zero.
        """
        total_error = 0.0
        num_ratings = 0

        # Visit only the observed cells. argwhere lists them in row-major order,
        # i.e. the same order a nested user/song loop would reach them, and
        # tolist() yields plain Python ints for indexing.
        observed_cells = np.argwhere(np.asarray(self.ratings) == 1).tolist()

        for i, j in observed_cells:
            # Extract the prediction scores
            prediction = tf.reduce_sum(self.U[i, :] * self.S[j, :])

            # Compute the error values
            err = (self.ratings[i, j] - prediction)
            total_error += err**2
            num_ratings += 1

            # Updating the user latent factors (self.U is rebound to the scatter result)
            update_U = tf.expand_dims(self.alpha * (err * self.S[j, :] - self.beta * self.U[i, :]), axis=0)
            indices_U = tf.expand_dims([i], axis=0)
            self.U = tf.tensor_scatter_nd_add(self.U, indices_U, update_U)

            # Updating the song latent factors; note this reads the user row
            # that was just updated above, not the pre-update one.
            update_S = tf.expand_dims(self.alpha * (err * self.U[i, :] - self.beta * self.S[j, :]), axis=0)
            indices_S = tf.expand_dims([j], axis=0)
            self.S = tf.tensor_scatter_nd_add(self.S, indices_S, update_S)

        if num_ratings == 0:
            return 0.0

        # Calculating the MSE for this pass
        mse = total_error / num_ratings
        return mse

    # Generate predicted ratings
    def predict(self):
        """Return U @ S^T rescaled with a single global min/max.

        When every prediction is identical (max == min) the result is all zeros
        instead of NaN.
        """
        predictions = tf.matmul(self.U, tf.transpose(self.S))

        # Min-max normalization over the whole matrix
        min_scores = tf.reduce_min(predictions)
        max_scores = tf.reduce_max(predictions)
        predictions_normalized = tf.math.divide_no_nan(predictions - min_scores, max_scores - min_scores)

        return predictions_normalized

In [None]:
# Extracting the IDs
user_ids = liked_songs_data['user_id'].unique()
song_ids = songs_data['song_id']

# Row index per user and column index per catalogue song
user_to_index = {user_id: index for index, user_id in enumerate(user_ids)}
song_to_index = {song_id: index for index, song_id in enumerate(song_ids)}

# Initializing the user-song matrix
ratings_matrix = np.zeros((len(user_ids), len(song_ids)))

# Filling the matrix with 1s where a user has liked a song.
# Likes whose song_id is not in the catalogue are skipped. Iterating the two
# columns directly avoids building a Series per row as iterrows() does.
liked_cells = [
    (user_to_index[user_id], song_to_index[song_id])
    for user_id, song_id in zip(liked_songs_data['user_id'], liked_songs_data['song_id'])
    if song_id in song_to_index
]
if liked_cells:
    user_rows, song_cols = zip(*liked_cells)
    ratings_matrix[list(user_rows), list(song_cols)] = 1

# Initialize and train matrix factorization model
matrix_fz = MatrixFactorization(ratings_matrix, dimensions=15, alpha=0.05, beta=0.05, iterations=200)
matrix_fz.train_model()

# Generate predicted ratings
predicted_ratings = matrix_fz.predict()

# Hybrid Recommendations

In [115]:
def recommend_songs(user_id, n=20, alpha=0.5, exclude_liked=False):
    """Return the top-``n`` songs for a user, ranked by a blend of both recommenders.

    Reads the notebook-level ``user_id_map``, ``liked_songs_data``, ``songs_data``
    and the trained ``matrix_fz``. ``songs_data`` itself is left unmodified; the
    scores are attached to a copy.

    Parameters
    ----------
    user_id
        Original (unmapped) user identifier; must be a key of ``user_id_map``.
    n : int
        Number of rows to return.
    alpha : float
        Weight of the content-based score; the collaborative score gets ``1 - alpha``.
    exclude_liked : bool
        When True, songs the user has already liked are removed before ranking.
        Defaults to False, which ranks every catalogue song.

    Returns
    -------
    pandas.DataFrame
        Columns ``hybrid_score``, ``song_id``, ``artist``, ``track_name``,
        sorted by ``hybrid_score`` descending.

    Raises
    ------
    KeyError
        If ``user_id`` is not in ``user_id_map``.
    ValueError
        Propagated from ``calculate_user_cosine_similarity``.
    """
    get_user_id = user_id_map[user_id]

    # Content-based filtering scores
    content_based_scores = calculate_user_cosine_similarity(get_user_id, liked_songs_data, songs_data)
    print("\n Content Based Scores: ", content_based_scores)

    # Collaborative filtering scores
    collaborative_scores = matrix_fz.predict()[get_user_id].numpy()
    print("\n Collaborative Scores: ", collaborative_scores)

    # Ensuring the sizes of both scores are the same
    assert len(content_based_scores) == len(collaborative_scores) == len(songs_data), "Scores and songs dataset must match in length."

    # Calculating hybrid scores
    hybrid_scores = alpha * content_based_scores + (1 - alpha) * collaborative_scores
    print("\n Hybrid Scores: ", hybrid_scores)

    # Attach the scores to a copy so the shared songs_data frame is not mutated
    scored_songs = songs_data.assign(hybrid_score=hybrid_scores)

    # Filtering out the songs that don't exist on the application
    filtered_songs_data = scored_songs.dropna(subset=['song_id'])

    # Optionally drop what the user already liked so only new songs are suggested
    if exclude_liked:
        liked_ids = liked_songs_data.loc[liked_songs_data['user_id'] == get_user_id, 'song_id']
        filtered_songs_data = filtered_songs_data[~filtered_songs_data['song_id'].isin(liked_ids)]

    # Sorting the scores in descending order and taking the top n rows
    recommended_songs_df = filtered_songs_data.sort_values(by='hybrid_score', ascending=False).head(n)
    recommended_songs_df = recommended_songs_df[['hybrid_score', 'song_id', 'artist', 'track_name']]

    return recommended_songs_df

In [None]:
# Produce hybrid recommendations for the user selected earlier (recent_user_id),
# using recommend_songs' default n and alpha, and print the resulting table.
recommended_songs_hybrid = recommend_songs(recent_user_id)
print("\nHybrid recommended songs for user", recent_user_id, ":")
print(recommended_songs_hybrid)

# Push Recommended Songs to Database

In [117]:
# song_ids = recommended_songs_hybrid['song_id'].values
# artists = recommended_songs_hybrid['artist'].values

# for song_id in song_ids:
#     supabase.table("recommended_songs").upsert({'user_id': recent_user_id, 'song_id': song_id}).execute()