# Download data and inspect data

In [1]:
!mkdir data
!wget "http://www.iro.umontreal.ca/~lisa/datasets/profiledata_06-May-2005.tar.gz" -P data

--2018-05-08 01:16:49--  http://www.iro.umontreal.ca/~lisa/datasets/profiledata_06-May-2005.tar.gz
Resolving www.iro.umontreal.ca (www.iro.umontreal.ca)... 132.204.26.36
Connecting to www.iro.umontreal.ca (www.iro.umontreal.ca)|132.204.26.36|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 135880312 (130M) [application/x-gzip]
Saving to: ‘data/profiledata_06-May-2005.tar.gz’


2018-05-08 01:16:50 (88.5 MB/s) - ‘data/profiledata_06-May-2005.tar.gz’ saved [135880312/135880312]



In [2]:
!ls

csds-material  HW6.ipynb	      hw6.py	  spark-2.3.0-bin-hadoop2.7
data	       hw6-local-Copy1.ipynb  miniconda3


In [3]:
!tree data -hs

data
└── [130M]  profiledata_06-May-2005.tar.gz

0 directories, 1 file


In [4]:
!tar -xf "data/profiledata_06-May-2005.tar.gz" -C data

In [5]:
!tree data -hs

data
├── [4.0K]  profiledata_06-May-2005
│   ├── [2.8M]  artist_alias.txt
│   ├── [ 53M]  artist_data.txt
│   ├── [1.2K]  README.txt
│   └── [407M]  user_artist_data.txt
└── [130M]  profiledata_06-May-2005.tar.gz

1 directory, 5 files


In [6]:
!cat data/profiledata_06-May-2005/README.txt

Music Listening Dataset
Audioscrobbler.com
6 May 2005
--------------------------------

This data set contains profiles for around 150,000 real people
The dataset lists the artists each person listens to, and a counter
indicating how many times each user played each artist

The dataset is continually growing; at the time of writing (6 May 2005) 
Audioscrobbler is receiving around 2 million song submissions per day

We may produce additional/extended data dumps if anyone is interested 
in experimenting with the data. 

Please let us know if you do anything useful with this data, we're always
up for new ways to visualize it or analyse/cluster it etc :)


License
-------

This data is made available under the following Creative Commons license:
http://creativecommons.org/licenses/by-nc-sa/1.0/


Files
-----

user_artist_data.txt
    3 columns: userid artistid playcount

artist_data.txt
    2 columns: artistid artist_name

artist_alias.txt
    2 columns:

In [9]:
!echo "check user_artist_data.txt"
!head -n 3 data/profiledata_06-May-2005/user_artist_data.txt
!echo ""
!echo "line count:"
!wc -l data/profiledata_06-May-2005/user_artist_data.txt

check user_artist_data.txt
1000002 1 55
1000002 1000006 33
1000002 1000007 8

line count:
24296858 data/profiledata_06-May-2005/user_artist_data.txt


In [10]:
!echo "check artist_data.txt"
!head -n 3 data/profiledata_06-May-2005/artist_data.txt
!echo ""
!echo "line count:"
!wc -l data/profiledata_06-May-2005/artist_data.txt

check artist_data.txt
1134999	06Crazy Life
6821360	Pang Nakarin
10113088	Terfel, Bartoli- Mozart: Don

line count:
1848579 data/profiledata_06-May-2005/artist_data.txt


In [11]:
!echo "check artist_alias.txt"
!head -n 3 data/profiledata_06-May-2005/artist_alias.txt
!echo ""
!echo "line count:"
!wc -l data/profiledata_06-May-2005/artist_alias.txt

check artist_alias.txt
1092764	1000311
1095122	1000557
6708070	1007267

line count:
193027 data/profiledata_06-May-2005/artist_alias.txt


# Build Spark Data Frames

In [12]:
# make sure pyspark tells workers to use python3 not 2 if both are installed
import os
os.environ['PYSPARK_PYTHON'] = '/home/augustinus/miniconda3/bin/python'

In [13]:
from pyspark.sql import SparkSession

In [14]:
spark = SparkSession \
    .builder \
    .appName("Recommender") \
    .config('spark.executor.memory','8G') \
    .config('spark.driver.memory','16G')\
    .config('spark.driver.maxResultSize','16G')\
    .getOrCreate()

Create user_artist DataFrame

In [15]:
df_user_artist = spark.read.csv("data/profiledata_06-May-2005/user_artist_data.txt",
                                sep=' ',
                                schema='userid INT, artistid INT, playcount INT')

In [16]:
df_user_artist.printSchema()

root
 |-- userid: integer (nullable = true)
 |-- artistid: integer (nullable = true)
 |-- playcount: integer (nullable = true)



In [17]:
df_user_artist.show(5)

+-------+--------+---------+
| userid|artistid|playcount|
+-------+--------+---------+
|1000002|       1|       55|
|1000002| 1000006|       33|
|1000002| 1000007|        8|
|1000002| 1000009|      144|
|1000002| 1000010|      314|
+-------+--------+---------+
only showing top 5 rows



Create artist DataFrame

In [19]:
df_artist = spark.read.csv("data/profiledata_06-May-2005/artist_data.txt",
                            sep='\t',
                            schema='artistid INT, artist_name STRING')
df_artist.printSchema()
df_artist.show(5)

root
 |-- artistid: integer (nullable = true)
 |-- artist_name: string (nullable = true)

+--------+--------------------+
|artistid|         artist_name|
+--------+--------------------+
| 1134999|        06Crazy Life|
| 6821360|        Pang Nakarin|
|10113088|Terfel, Bartoli- ...|
|10151459| The Flaming Sidebur|
| 6826647|   Bodenstandig 3000|
+--------+--------------------+
only showing top 5 rows



Create artist_alias DataFrame

In [22]:
df_artist_alias = spark.read.csv("data/profiledata_06-May-2005/artist_alias.txt",
                                sep='\t',
                                schema='badid INT, goodid INT')
df_artist_alias.printSchema()
df_artist_alias.show(5)

root
 |-- badid: integer (nullable = true)
 |-- goodid: integer (nullable = true)

+--------+-------+
|   badid| goodid|
+--------+-------+
| 1092764|1000311|
| 1095122|1000557|
| 6708070|1007267|
|10088054|1042317|
| 1195917|1042317|
+--------+-------+
only showing top 5 rows



In [23]:
# Register SQL Tables
df_user_artist.createOrReplaceTempView('user_artist')
df_artist.createOrReplaceTempView('artist')
df_artist_alias.createOrReplaceTempView('artist_alias')
spark.sql("show tables").show()

+--------+------------+-----------+
|database|   tableName|isTemporary|
+--------+------------+-----------+
|        |      artist|       true|
|        |artist_alias|       true|
|        | user_artist|       true|
+--------+------------+-----------+



# Preprocessing

* First, the aliases data set should be applied to convert all artist IDs to a canonical ID, if a different canonical ID exists. 
* Second, the data should be converted into Rating objects, which is the implementation’s abstraction for user-product-value data. Despite the name, Rating is suitable for use with implicit data

In [91]:
training_data=\
spark.sql("""
select userid, artistid, sum(playcount) as count
from user_artist
group by userid,artistid
""")


[Row(userid=1000002, artistid=1004315, count=1),
 Row(userid=1000002, artistid=1004395, count=1),
 Row(userid=1000002, artistid=1035248, count=17),
 Row(userid=1000002, artistid=1198, count=51),
 Row(userid=1000002, artistid=344, count=18),
 Row(userid=1000019, artistid=1000951, count=4),
 Row(userid=1000019, artistid=1004255, count=1),
 Row(userid=1000019, artistid=1004398, count=2),
 Row(userid=1000019, artistid=1063841, count=6),
 Row(userid=1000020, artistid=6663544, count=4),
 Row(userid=1000022, artistid=1000313, count=44),
 Row(userid=1000022, artistid=1000993, count=8),
 Row(userid=1000022, artistid=1001408, count=2),
 Row(userid=1000022, artistid=1001483, count=17),
 Row(userid=1000022, artistid=1002395, count=3),
 Row(userid=1000022, artistid=1004409, count=3),
 Row(userid=1000022, artistid=1066890, count=13),
 Row(userid=1000022, artistid=1238891, count=2),
 Row(userid=1000023, artistid=1160349, count=19),
 Row(userid=1000023, artistid=2007, count=14),
 Row(userid=1000023, a

# Build ALS Recommender

In [36]:
spark.sql("""
select *
from user_artist
limit 1
""").show()

+-------+--------+---------+
| userid|artistid|playcount|
+-------+--------+---------+
|1000002|       1|       55|
+-------+--------+---------+



In [80]:
from pyspark.ml.recommendation import ALS

In [81]:
als = ALS(rank=50,regParam=1,alpha=40,
          userCol='userid',itemCol='artistid',ratingCol='playcount',
          seed=0,numUserBlocks=12,numItemBlocks=12)

als_recommender = als.fit(df_user_artist)
print('Model fitted')

Model fitted


In [79]:
als_recommender.save('model.bin')

ALS_46b2aa6ec589ecb23976

# Recommend top 10 Artists for user 2093760

In [86]:
user_subsets = \
    spark.sql("""
        select distinct userid
        from user_artist
        where userid in (2093760)
        """)
user_subsets.show()

+-------+
| userid|
+-------+
|2093760|
+-------+



In [87]:
recommendations = als_recommender.recommendForUserSubset(user_subsets, 10)
recommendations.printSchema()

root
 |-- userid: integer (nullable = false)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- artistid: integer (nullable = true)
 |    |    |-- rating: float (nullable = true)



In [88]:
recommendations.createOrReplaceTempView('recommendations')

top10 = spark.sql("""
select explode(recommendations.artistid) as artistid
from recommendations 
where userid = 2093760
""")
top10.createOrReplaceTempView('top10_recommended_artists_for_2093760')

top10_recommended_artist_names = \
spark.sql("""
select artist_name
from artist join top10_recommended_artists_for_2093760 
on artist.artistid = top10_recommended_artists_for_2093760.artistid
""")

In [89]:
top10_recommended_artist_names.show()

+--------------------+
|         artist_name|
+--------------------+
|        David Nadien|
|Firm Johnson & Th...|
|          dj antoine|
|          Prankradio|
|lil wayne ft. man...|
|            dj_beast|
|American School O...|
|Colton Jazz Ensemble|
|           DJ Kubesz|
|Wayne Hussey (Mis...|
+--------------------+

