In [None]:
!pip install pyspark
!pip install implicit

In [3]:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.feature import StringIndexer
from pyspark.ml.recommendation import ALS
from typing import Tuple, List
import numpy as np
import pandas as pd
from scipy.sparse import csr_matrix
import implicit
from pathlib import Path

# Initialize Spark session
spark = SparkSession.builder.master("local").appName("CollaborativeFilteringExample").getOrCreate()

class CollaborativeFiltering:
    def __init__(self, user_artists_file, artists_file):
        self.user_artists_file = user_artists_file
        self.artists_file = artists_file
        self.user_artists = None
        self.artist_names = None
        self.load_data()

    def load_data(self):
        # Read CSV files with Spark
        user_artists_df = spark.read.csv(self.user_artists_file, header=True, inferSchema=True)
        artists_df = spark.read.csv(self.artists_file, header=True, inferSchema=True)

        # Convert Spark DataFrames to Pandas DataFrames
        user_artists_df = user_artists_df.toPandas()
        artists_df = artists_df.toPandas()

        # Process data as before
        self.user_artists = self.create_user_artists_matrix(user_artists_df)
        self.artist_names = dict(zip(artists_df['id'], artists_df['name']))

    def create_user_artists_matrix(self, user_artists_df):
        # Process data as before
        unique_users = user_artists_df['userID'].unique()
        unique_artists = user_artists_df['artistID'].unique()

        user_artist_matrix = np.zeros((len(unique_users), len(unique_artists)))

        for _, row in user_artists_df.iterrows():
            user_idx = np.where(unique_users == row['userID'])[0][0]
            artist_idx = np.where(unique_artists == row['artistID'])[0][0]
            user_artist_matrix[user_idx, artist_idx] = row['weight']

        return csr_matrix(user_artist_matrix)

    def get_artist_name_from_id(self, artist_id):
        return self.artist_names.get(artist_id, "Unknown")

class ImplicitRecommender:
    def __init__(self, collaborative_filtering: CollaborativeFiltering, implicit_model: implicit.recommender_base.RecommenderBase):
        self.collaborative_filtering = collaborative_filtering
        self.implicit_model = implicit_model

    def fit(self, user_artists_matrix: csr_matrix) -> None:
        self.implicit_model.fit(user_artists_matrix)

    def recommend(self, user_id: int, user_artists_matrix: csr_matrix, n: int = 5) -> Tuple[List[str], List[float]]:
        artist_ids, scores = self.implicit_model.recommend(user_id, user_artists_matrix.getrow(user_id), N=n)
        artists = [self.collaborative_filtering.get_artist_name_from_id(artist_id) for artist_id in artist_ids]
        return artists, scores

# Load data
collaborative_filtering = CollaborativeFiltering(
    user_artists_file="/content/user_artists.csv",
    artists_file="/content/artists.csv"
)

# Load user artists data
user_artists_df = pd.read_csv(collaborative_filtering.user_artists_file)

user_artists_matrix = collaborative_filtering.create_user_artists_matrix(user_artists_df)

# Instantiate ALS using implicit (Tweak these parameters for different results)
implicit_model = implicit.als.AlternatingLeastSquares(factors=120, iterations=25, regularization=0.05)

# Instantiate recommender, fit, and recommend
recommender = ImplicitRecommender(collaborative_filtering, implicit_model)
recommender.fit(user_artists_matrix)

# Recommend for user 615
artists, scores = recommender.recommend(3, user_artists_matrix, n=10)

# Print results
for artist, score in zip(artists, scores):
    print(f"{artist}: {score}")

  0%|          | 0/25 [00:00<?, ?it/s]

김종국: 1.0474143028259277
Gloria: 1.0212959051132202
Go Koyashiki: 0.9259945154190063
Nevada Tan: 0.9089708924293518
Soilwork: 0.9002859592437744
Unknown: 0.8979610800743103
Mahalia Jackson: 0.8863801956176758
Cascada: 0.8723768591880798
The Sonics: 0.8578011989593506
Funker Vogt: 0.8533897399902344
