In [65]:
pip install scikit-learn

Collecting scikit-learn
  Downloading scikit_learn-1.0.1-cp38-cp38-macosx_10_13_x86_64.whl (7.9 MB)
[K     |████████████████████████████████| 7.9 MB 4.4 MB/s eta 0:00:01
Collecting threadpoolctl>=2.0.0
  Downloading threadpoolctl-3.0.0-py3-none-any.whl (14 kB)
Installing collected packages: threadpoolctl, scikit-learn
Successfully installed scikit-learn-1.0.1 threadpoolctl-3.0.0
Note: you may need to restart the kernel to use updated packages.


In [89]:
from surprise import Dataset
from surprise import Reader
import pandas as pd
import numpy as np
import random
import nltk
import re
from unidecode import unidecode
from surprise.model_selection import KFold
from surprise import SVD


In [None]:
pd.options.display.max_rows = 5000

# Spark initialization

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import count, avg

from pyspark.serializers import MarshalSerializer
from pyspark.context import SparkContext
sc = SparkContext("local", "serialization app", serializer = MarshalSerializer())
spark = SparkSession(sc)
spark.sparkContext is sc

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/12/20 00:01:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
21/12/20 00:01:11 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


True

# Initial Data Processing using Pandas

In [4]:
p = 1
df = pd.read_csv('./data/spotify_dataset.csv', on_bad_lines='skip')
df.dropna(inplace=True)

In [5]:
df.columns = df.columns.str.replace('"', '')
df.columns = df.columns.str.replace('name', '')
df.columns = df.columns.str.replace(' ', '')

In [6]:
def clean_artist(text):
    text = (str(text)).lower()
    FEAT_PAT = re.compile(r"[\s\S]+[\s]+(feat\.|ft\.|featuring|ft|feat)[\s]+[\s\S]+")
    AMP_PAT = re.compile(r"[\s\S]*(&|and|\+)[\s\S]*")
    #check if we have featured artist
    if FEAT_PAT.match(text):
        text = re.split(r"(feat\.|ft\.|featuring|ft|feat)", text)[0]
    
    #Remove & from all artists
    if AMP_PAT.match(text):
        text = re.split(r"&", text)[0]
    tok = nltk.RegexpTokenizer(r"\w+")
    word = " ".join(tok.tokenize(text))
    text = unidecode(word)
    w="".join(text.split(" "))
    text=w
    return text.strip()

In [7]:
df['clean_artist'] = df['artist'].apply(clean_artist)

**Save preprocessed dataframe to be processed in spark**

In [8]:
df.to_csv('./data/cleaned_df.csv', index=False)

In [9]:
clean_df = pd.read_csv('./data/cleaned_df.csv')

In [10]:
artist= pd.unique(clean_df['clean_artist'].values.ravel())
artist = pd.Series(np.arange(len(artist)), artist)
clean_df["artist_id"] = clean_df[['clean_artist']].applymap(artist.get)
clean_df = clean_df[['user_id','artist_id', 'clean_artist']]

In [11]:
clean_df = clean_df.astype(str)

In [12]:
clean_df.to_csv('./data/hugo_df.csv', index=False) #Save cleaned dataframe to hugo_df.csv

# Working with Spark

In [13]:
spark_df = spark.read.csv("./data/cleaned_df.csv", header=True) #Read df processed in pandas as spark_df

In [14]:
spark_df_agg = spark_df.groupBy('user_id', 'clean_artist').agg(count('*')) #Get frequencies

In [15]:
spark_df_agg=spark_df_agg.withColumnRenamed("count(1)","freq") #Rename aggregate column to freq

In [16]:
spark_clean_df = spark.read.csv('./data/hugo_df.csv', header=True) #Contains user_id, artist_id, clean_artist

In [17]:
spark_clean_df.show(2)

+--------------------+---------+-------------+
|             user_id|artist_id| clean_artist|
+--------------------+---------+-------------+
|9cc0cfd4d7d788510...|        0|elviscostello|
|9cc0cfd4d7d788510...|        0|elviscostello|
+--------------------+---------+-------------+
only showing top 2 rows



# Perform inner Join to get user_id, artist_id, clean_artist, and frequencies

In [18]:
combined_df = spark_df_agg.join(spark_clean_df, 
                                (spark_df_agg.clean_artist==spark_clean_df.clean_artist) & \
                                 (spark_df_agg.user_id==spark_clean_df.user_id),
                                'inner').select(spark_df_agg.user_id, spark_df_agg.clean_artist, spark_df_agg.freq, spark_clean_df.artist_id)

In [19]:
combined_df.count()

                                                                                

12855173

In [None]:
combined_df.show(2)

# 1).given user_id, return all of its songs

In [53]:
def get_all_artists(user_id):
#     res = list(combined_df.filter(combined_df['user_id'] == user_id).select('clean_artist').toPandas()['clean_artist'])
#     mvv_list = list(
#         tableA.select('clean_artist').toPandas()['clean_artist']
#     )
    
    combined_df.createOrReplaceTempView("hugo_table") #Create view to run sql
    res = spark.sql(f"SELECT clean_artist from hugo_table where user_id='{user_id}';").collect()
     
    return res

In [31]:
# res_list = [i.clean_artist for i in get_all_songs('00055176fea33f6e027cd3302289378b')]
res_list = get_all_artists('00055176fea33f6e027cd3302289378b')



                                                                                

In [49]:
my_artists = [list(i.asDict().values()) for i in res_list]

In [51]:
my_artists = [item for sublist in my_artists for item in sublist]

In [52]:
my_artists

['auryn',
 'bmike',
 'thecatempire',
 'thevamps',
 'ollymurs',
 'littlemix',
 'austinmahone',
 'blink182',
 'beamiller',
 'simpleplan',
 'christinaperri',
 'onerepublic',
 '5secondsofsummer',
 'jakemiller',
 'meghantrainor',
 'jannikbrunke',
 'thefray',
 'beckyg',
 'cimorelli',
 'abigailbreslin',
 'meghantonjes',
 'markronson',
 'alltimelow',
 'againstthecurrent',
 'highschoolmusicalcast',
 'greenday',
 'onedirection',
 'avrillavigne',
 'falloutboy',
 'jack',
 'taylorswift',
 'gerardway',
 'shawnmendes',
 'jamesarthur',
 'thewanted',
 'nickjonas',
 'panicatthedisco',
 'maroon5',
 'brunomars',
 'the1975',
 'edsheeran',
 'imaginedragons',
 'natalieimbruglia',
 'charlixcx',
 'demilovato']

# 2).given artist_id, return the corresponding unique artist

In [61]:
def get_unique_artist(artist_id):
    combined_df.createOrReplaceTempView("hugo_table") #Create view to run sql
    res = spark.sql(f"SELECT clean_artist from hugo_table where artist_id='{artist_id}';").collect()
    return list(res[0].asDict().values())[0]


In [62]:
x=get_unique_artist('0')

                                                                                

In [63]:
x

'elviscostello'

# Get all artists for user 00055176fea33f6e027cd3302289378b

In [None]:
sc.stop() #Run last

# Scale 

In [66]:
from sklearn.preprocessing import MinMaxScaler
scaler = MinMaxScaler(feature_range=(1,5))

In [75]:
combined_pandas_df = combined_df.write.csv?

In [None]:
combined_pandas_df = combined_df.write.csv

In [77]:
combined_df.coalesce(1).write.csv('./data/combined_pandas_df.csv',header=True)

                                                                                

In [78]:
combined_pandas_df = pd.read_csv('./data/combined_pandas_df.csv')

In [79]:
combined_pandas_df[["freq"]] = scaler.fit_transform(combined_pandas_df[["freq"]])
combined_pandas_df.head()

Unnamed: 0,user_id,clean_artist,freq,artist_id
0,00055176fea33f6e027cd3302289378b,5secondsofsummer,1.010762,707
1,00055176fea33f6e027cd3302289378b,5secondsofsummer,1.010762,707
2,00055176fea33f6e027cd3302289378b,5secondsofsummer,1.010762,707
3,00055176fea33f6e027cd3302289378b,5secondsofsummer,1.010762,707
4,00055176fea33f6e027cd3302289378b,5secondsofsummer,1.010762,707


In [81]:
reader = Reader(rating_scale=(1, 5))

In [83]:
data = Dataset.load_from_df(combined_pandas_df[['user_id', 'artist_id', 'freq']], reader)

In [84]:
from surprise.model_selection import cross_validate
from surprise import NormalPredictor

# Training with surprise

In [93]:
def get_testset_accuracy(testset):
    total_matches = 0
    for item in testset:
        uid = item[0]
        preds = []
        for id in combined_pandas_df.artist_id.values:
            preds.append(svd.predict(uid=uid, iid=id))
        iid=[]
        for pred in preds:
            iid.append(pred.iid)
        iid=list(dict.fromkeys(iid))
        predicted_artists = set()
        for i in iid[:30]: #Compare top 30 recommendations
#             artist = artist_df.loc[artist_df.artist_id == i].artist.values[0]
            artist = get_unique_artist(artist_id=i)
            predicted_artists.add(artist)
#         known_artists = list(set(df.loc[df["user_id"] == uid].artist.values))
        known_artist = get_all_artists(user_id=uid)
        #print(len(known_artists))
        total_matches += len(predicted_artists.intersection(known_artists))
    print(total_matches / (30 * len(testset)))      

In [94]:
# define a cross-validation iterator
kf = KFold(n_splits=2)
svd = SVD(n_epochs=1, verbose=True)
for trainset, testset in kf.split(data):
    # train and test algorithm.
    svd.fit(trainset)
    get_testset_accuracy(testset[:2])
    predictions = svd.test(testset)
    # Compute and print Root Mean Squared Error
    accuracy.rmse(predictions, verbose=True)

Processing epoch 0


                                                                                

KeyboardInterrupt: 

In [None]:
def get_top_songs(artist, num_songs = 5):
    return list(combined_pandas_df[combined_pandas_df['cleaned_artist'] == artist]['track'].value_counts()[0:num_songs].index)


In [None]:
def get_recommendation():
    res = {}
    for i in iid[:5]:
        artist = combined_pandas_df.loc[combined_pandas_df.artist_id == i].cleaned_artist.values[0]
        res[artist] = get_top_songs(artist)
    return res