In [1]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here are several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Walk the Kaggle input directory tree and print the full path of every file found.
for folder, _, names_in_folder in os.walk('/kaggle/input'):
    for file_name in names_in_folder:
        print(os.path.join(folder, file_name))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

/kaggle/input/music-recommender/artist_data.txt
/kaggle/input/music-recommender/README.txt
/kaggle/input/music-recommender/user_artist_data.txt
/kaggle/input/music-recommender/artist_alias.txt


In [2]:
#install Apache Spark
!pip install pyspark --quiet

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m5.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import split, min, max, col, broadcast, when
from pyspark.sql.types import IntegerType, StringType
from pyspark.ml.recommendation import ALS

# Create the Spark session (local master using all available cores).
# NOTE(review): the memory sizes below are requests to the JVM; confirm they
# are appropriate for the machine this notebook runs on.
spark = (
    SparkSession.builder
    .appName("Music_Recommender_System")
    .master("local[*]")
    .config("spark.executor.memory", "70g")
    .config("spark.driver.memory", "50g")
    .config("spark.memory.offHeap.enabled", True)
    .config("spark.memory.offHeap.size", "16g")
    .getOrCreate()
)

# Only log warnings and errors from Spark.
spark.sparkContext.setLogLevel('WARN')

# Each file is read as plain text, giving a DataFrame with one string column
# named "value" (one row per line); the first 5 rows are previewed after each load.

# User / artist / play-count triples.
raw_user_artist_path = "/kaggle/input/music-recommender/user_artist_data.txt"
raw_user_artist_data = spark.read.text(raw_user_artist_path)
raw_user_artist_data.show(n=5)

# Artist ID -> artist name.
raw_artist_data = spark.read.text("/kaggle/input/music-recommender/artist_data.txt")
raw_artist_data.show(n=5)

# Artist ID -> alias artist ID.
raw_artist_alias = spark.read.text("/kaggle/input/music-recommender/artist_alias.txt")
raw_artist_alias.show(n=5)

+-------------------+
|              value|
+-------------------+
|       1000002 1 55|
| 1000002 1000006 33|
|  1000002 1000007 8|
|1000002 1000009 144|
|1000002 1000010 314|
+-------------------+
only showing top 5 rows

+--------------------+
|               value|
+--------------------+
|1134999\t06Crazy ...|
|6821360\tPang Nak...|
|10113088\tTerfel,...|
|10151459\tThe Fla...|
|6826647\tBodensta...|
+--------------------+
only showing top 5 rows

+-----------------+
|            value|
+-----------------+
| 1092764\t1000311|
| 1095122\t1000557|
| 6708070\t1007267|
|10088054\t1042317|
| 1195917\t1042317|
+-----------------+
only showing top 5 rows



In [4]:
# Preparing the Data

# Split every line on single spaces once, then project the three pieces
# (user, artist, count) as integer columns; the raw "value" column is not kept.
user_artist_fields = split(raw_user_artist_data['value'], ' ')

user_artist_df = raw_user_artist_data.select(
    user_artist_fields.getItem(0).cast(IntegerType()).alias('user'),
    user_artist_fields.getItem(1).cast(IntegerType()).alias('artist'),
    user_artist_fields.getItem(2).cast(IntegerType()).alias('count'),
)

user_artist_df.show(5)

# Range of the user and artist IDs (uses the Spark `min`/`max` imported above,
# not the Python builtins).
user_artist_df.select(
    min("user"),
    max("user"),
    min("artist"),
    max("artist"),
).show()

# Parse "<id><whitespace><name>" lines into an (id, name) DataFrame.
# The pattern is a raw string so that `\s` is passed to Spark verbatim instead
# of being an invalid Python escape sequence (which triggers a warning).
# limit=2 splits only on the first whitespace run, so names that themselves
# contain spaces stay in one piece.
artist_fields = split(col('value'), r'\s+', 2)

artist_by_id = raw_artist_data.select(
    artist_fields.getItem(0).cast(IntegerType()).alias('id'),
    artist_fields.getItem(1).cast(StringType()).alias('name'),
)
artist_by_id.show(5)


# Parse "<artist><whitespace><alias>" lines into an (artist, alias) DataFrame.
# The pattern is a raw string so that `\s` is passed to Spark verbatim instead
# of being an invalid Python escape sequence (which triggers a warning).
# 'alias' is kept as a string column here; the model-building cell casts the
# resolved artist ID to an integer after the join.
alias_fields = split(col('value'), r'\s+')

artist_alias = raw_artist_alias.select(
    alias_fields.getItem(0).cast(IntegerType()).alias('artist'),
    alias_fields.getItem(1).cast(StringType()).alias('alias'),
)
artist_alias.show(5)

# Display the catalogue rows for two artist IDs that the label below describes
# as sharing one name.
print('\n Same name, different ID')
duplicate_ids = [1092764, 1000311]
artist_by_id.where(col('id').isin(duplicate_ids)).show()

+-------+-------+-----+
|   user| artist|count|
+-------+-------+-----+
|1000002|      1|   55|
|1000002|1000006|   33|
|1000002|1000007|    8|
|1000002|1000009|  144|
|1000002|1000010|  314|
+-------+-------+-----+
only showing top 5 rows

+---------+---------+-----------+-----------+
|min(user)|max(user)|min(artist)|max(artist)|
+---------+---------+-----------+-----------+
|       90|  2443548|          1|   10794401|
+---------+---------+-----------+-----------+

+--------+--------------------+
|      id|                name|
+--------+--------------------+
| 1134999|        06Crazy Life|
| 6821360|        Pang Nakarin|
|10113088|Terfel, Bartoli- ...|
|10151459| The Flaming Sidebur|
| 6826647|   Bodenstandig 3000|
+--------+--------------------+
only showing top 5 rows

+--------+-------+
|  artist|  alias|
+--------+-------+
| 1092764|1000311|
| 1095122|1000557|
| 6708070|1007267|
|10088054|1042317|
| 1195917|1042317|
+--------+-------+
only showing top 5 rows


 Same name, differ

In [5]:
# Building a First Model
print("\n Building a First Model\n")

# Left-join the alias table (with a broadcast hint) so every row of
# user_artist_df is kept, whether or not its artist has an alias.
plays_with_alias = user_artist_df.join(broadcast(artist_alias), 'artist', how='left')

# Use the artist's alias if it exists; otherwise keep the original artist ID.
# The alias is a string column, so it is cast to an integer inside the
# expression: both branches are then integers and no implicit int/string
# coercion (followed by a cast back) is needed.
canonical_artist = (
    when(col('alias').isNull(), col('artist'))
    .otherwise(col('alias').cast(IntegerType()))
)
train_data = plays_with_alias.withColumn('artist', canonical_artist).drop('alias')

# Cache before counting: the count materialises the data that ALS reuses later.
train_data.cache()
print(train_data.count())


# Configure ALS in implicit-feedback mode, using the 'count' column as the
# preference signal, then fit it on the training data.
als = ALS(
    userCol='user',
    itemCol='artist',
    ratingCol='count',
    implicitPrefs=True,
    rank=10,
    maxIter=5,
    regParam=0.1,
    alpha=1.0,
    seed=0,
)
model = als.fit(train_data)

# Show one user's learned factor vector in full (no column truncation).
model.userFactors.show(1, truncate=False)

# Stop the Spark session now that the model has been inspected.
spark.stop()


 Building a First Model

24296858
+---+------------------------------------------------------------------------------------------------------------------------------+
|id |features                                                                                                                      |
+---+------------------------------------------------------------------------------------------------------------------------------+
|90 |[0.16020624, 0.20717518, -0.17194684, 0.06038469, 0.062727705, 0.54658705, -0.40481892, 0.43657345, -0.10396775, -0.042728312]|
+---+------------------------------------------------------------------------------------------------------------------------------+
only showing top 1 row

