## Table of contents
1. [Load the data](#1.-Load-the-data) <br>
    1.1 Load song table<br>
    1.2 Load user table <br>
    1.3 Load user play frequency table<br>
2. [Spark performance basics](#2.-Spark-performance-basics)<br>
3. [Explore the data with Spark APIs](#3.-Explore-the-data-with-Spark-APIs)<br>
    3.1 [Clean song table](#3.1-Clean-song-table) <br>
    3.2 [Clean play table and user table](#3.2-Clean-play-table-and-user-table) <br>
4. [Visualize the data](#4.-Visualize-the-data)<br>
5. [Build the recommender system](#5.-Build-the-recommender-system)<br>
    5.1 Setup training and test set<br>
    5.2 Train collaborative filtering<br>
    5.3 Tune parameters <br>
    5.4 Evaluate recommendation results<br>
6. [Hybrid recommender system](#6.-Hybrid-recommender-system)<br>
    6.1 Setup vectors<br>
    6.2 Compute similarity matrix<br>
    6.3 Recommend for Selected user <br>
    
7. Summary and next steps

In [1]:
import pyspark.sql.functions as F
from pyspark.sql.functions import col, count, struct, row_number, when, isnan, log,lit
from pyspark.sql.functions import round as cround
from pyspark.sql.window import Window

from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator

In [2]:
# The code was removed by DSX for sharing.

## 1. Load the data

Run this section only the first time. <br>
The cleaned data is saved and can be reloaded in later sessions. <br>

Jump to other sections: 
- [Build the recommender system](#5.-Build-the-recommender-system)<br>
- [Hybrid recommender system](#6.-Hybrid-recommender-system)<br>

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
user_table_raw = spark.read\
  .format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')\
  .option('header', 'true')\
  .load(bmos.url('musicrecommendation', 'valid_user_highfreq.csv'))

user_table_raw.take(5)

song_table= spark.read\
  .format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')\
  .option('header', 'true')\
  .load(bmos.url('musicrecommendation', 'song_table.csv'))
print(song_table.take(5))

song_freq = spark.read\
  .format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')\
  .option('header', 'true')\
  .load(bmos.url('musicrecommendation', 'user_song_freq.csv'))
print(song_freq.take(5))

download_table = spark.read\
  .format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')\
  .option('header', 'true')\
  .load(bmos.url('musicrecommendation', 'clean_download.csv'))
print (download_table.take(5))

In [7]:
user_db = spark.read\
  .format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')\
  .option('header', 'true')\
  .load(bmos.url('musicrecommendation', '3_1uid.csv'))
print (user_db.select('uid').distinct().count())

264715


## 2. Spark performance basics

In [8]:
print (sc.defaultParallelism)
print (sc.getConf().toDebugString())
print ("Number of partitions for the song_freq DataFrame: " + str(song_freq.rdd.getNumPartitions()))

4
hive.metastore.warehouse.dir=file:/gpfs/global_fs01/sym_shared/YPProdSpark/user/sf05-764985d1937dab-a222de131660/notebook/work/spark-warehouse
spark.app.id=app-20171021174259-0155-332c25ad-3f8b-4fff-be46-e709b7342dff
spark.app.name=PySparkShell
spark.deploy.resourceScheduler.factory=org.apache.spark.deploy.master.EGOResourceSchedulerFactory
spark.driver.host=10.143.133.71
spark.driver.maxResultSize=1210M
spark.driver.memory=1512M
spark.driver.port=35869
spark.eventLog.dir=/gpfs/fs01/user/sf05-764985d1937dab-a222de131660/events
spark.eventLog.enabled=true
spark.executor.extraJavaOptions=-Djava.security.egd=file:/dev/./urandom
spark.executor.id=driver
spark.executor.memory=6G
spark.extraListeners=com.ibm.spaas.listeners.DB2DialectRegistrar
spark.history.fs.logDirectory=/gpfs/fs01/user/sf05-764985d1937dab-a222de131660/events
spark.logConf=true
spark.master=spark://yp-spark-dal09-env5-0022:7089
spark.port.maxRetries=512
spark.r.command=/usr/local/src/bluemix_jupyter_bundle.v65/R/bin/Rscr

## 3. Explore the data with Spark APIs

In [36]:
user_table_raw.show(truncate=False)
print ("Number of users: ", user_table_raw.count())
print ("Number of different users: " + str(user_table_raw.select('uid').distinct().count()))

+---+----------+---------+
|_c0|Unnamed: 0|uid      |
+---+----------+---------+
|1  |1         |154563989|
|2  |2         |154806874|
|3  |3         |154777984|
|4  |4         |154801899|
|5  |5         |154522980|
|6  |6         |154466362|
|7  |7         |154467953|
|8  |8         |158752252|
|9  |9         |154559964|
|10 |10        |154542883|
|11 |11        |154828695|
|12 |12        |154723056|
|13 |13        |154751052|
|14 |14        |154630129|
|15 |15        |154684841|
|16 |16        |154799108|
|17 |17        |154786598|
|18 |18        |154561771|
|19 |19        |154508382|
|20 |20        |154710857|
+---+----------+---------+
only showing top 20 rows

Number of users:  264714
Number of different users: 264714


In [21]:
song_table.show(truncate=False)
print ("Number of songs: ", song_table.count())
print ("Number of different songs: " + str(song_table.select('song_id').distinct().count()))

+--------+---------+-------------------------------+------+-----------+
|song_id |song_type|song_name                      |singer|song_length|
+--------+---------+-------------------------------+------+-----------+
|602239  |null     |薛凯琪                            |0     |null       |
|160911  |null     |蔡依林&周杰伦                        |0     |null       |
|1033156 |null     |汪苏泷                            |0     |null       |
|294622  |null     |DJ舞曲                           |0     |null       |
|517174  |null     |梦鸽                             |0     |null       |
|6606144 |null     |杨小曼&冷漠                         |0     |null       |
|6432663 |null     |小乔                             |0     |null       |
|6587633 |null     |韩宇                             |0     |null       |
|6587662 |null     |韩宇                             |0     |null       |
|158182  |null     |张学友                            |0     |null       |
|1037626 |null     |张学友                            |0     |null 

In [5]:
song_table.printSchema()

root
 |-- song_id: string (nullable = true)
 |-- song_type: string (nullable = true)
 |-- song_name: string (nullable = true)
 |-- singer: string (nullable = true)
 |-- song_length: string (nullable = true)



### 3.1 Clean song table 
##### Cleaning steps for song_table:
- remove rows with an invalid song_id
- keep a single entry for each song_id (the most common value of each field)
- drop the song_length column because it 1) has a large variance and 2) is not very relevant

In [9]:
song_table.createOrReplaceTempView('song_table')
song_table_valid = spark.sql("select *  from song_table where song_id > 0 and song_id is not null")

print ("Number of songs: ", song_table_valid.count())
print ("Number of different songs: " + str(song_table_valid.select('song_id').distinct().count()))

Number of songs:  3230980
Number of different songs: 1559987


In [10]:
# get most common non-zero song_type
type_counts = song_table_valid.groupBy(['song_id', 'song_type'])\
    .count().alias('cnt')\
    .where(col('song_type') != '0')

max_type = (type_counts
    .groupBy('song_id')
    .agg(F.max(struct(col('count'), col('song_type'))).alias('max'))
    .select(col('song_id'), col('max.song_type')))

# get most common not null song_name 
name_counts = song_table_valid.groupBy(['song_id', 'song_name'])\
    .count().alias('cnt')\
    .where(col('song_name').isNotNull())
    
max_name = (name_counts.groupBy('song_id')
            .agg(F.max(struct(col('count'), col('song_name'))).alias('max'))
            .select(col('song_id'),col('max.song_name')))


# get most common not null singer 
singer_counts = song_table_valid.groupBy(['song_id', 'singer']).count().alias('cnt').where(col('singer').isNotNull())
w = Window().partitionBy('song_id').orderBy(col('count').desc())
max_singer = (singer_counts
              .withColumn('rn', row_number().over(w))
              .where(col('rn')==1)
              .select('song_id', 'singer'))

Note: a pandas-based approach was also tried, but it took a long time; either the `Window` or the `struct` approach works in Spark.
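The "most common value per song_id" logic used above can be illustrated without Spark. The sketch below is a plain-Python analogy, not the notebook's implementation: `most_common_per_key` and the toy rows are hypothetical. It relies on tuple comparison, which orders by the first element and then the second, in the same spirit as taking `F.max` over a `struct(count, value)`.

```python
from collections import Counter, defaultdict

def most_common_per_key(rows):
    """rows: iterable of (key, value) pairs; value may be None."""
    counts = defaultdict(Counter)
    for key, value in rows:
        if value is not None:            # mirrors the isNotNull() filter
            counts[key][value] += 1
    # max over (count, value) tuples: the count is compared first,
    # and the value itself breaks ties
    return {key: max((n, v) for v, n in c.items())[1]
            for key, c in counts.items()}

# toy data (hypothetical): key 3 only has a null value
rows = [(1, "a"), (1, "b"), (1, "b"), (2, None), (2, "x"), (3, None)]
result = most_common_per_key(rows)
print(result)
```

Keys whose values are all null do not appear in the result, which is one reason the per-field tables can contain fewer distinct song_ids than the full song table.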

In [11]:
print (max_type.select('song_id').distinct().count())
print (max_name.select('song_id').distinct().count())
print (max_singer.select('song_id').distinct().count())

231216
1559532
1547516


In [12]:
songs = song_table_valid.select('song_id').distinct().alias('songs')
song_unique = songs\
    .join(max_type, 'song_id','left')\
    .join(max_name, 'song_id','left')\
    .join(max_singer, 'song_id','left')\
    .select('song_id', 'song_type','song_name', 'singer')

song_unique = song_unique.na.fill('0', subset=['song_type'])

In [22]:
song_unique.show()
print (song_unique.select('song_id').distinct().count())

+--------+---------+--------------------+----------------+
| song_id|song_type|           song_name|          singer|
+--------+---------+--------------------+----------------+
|  100140|        0|             天外天上天无涯|             陈洁丽|
|10015022|        0|                最后一次|             薛晓枫|
| 1003644|        0|Save The One, Sav...|  T.M.Revolution|
| 1004266|        0|        Broken heart|             黄义达|
| 1006370|        0|           二十四式太极拳音乐|             纯音乐|
| 1006422|        0|                  浣纱|              朱洁|
|10065669|        1|                花房姑娘|              崔健|
|10071852|        0|Dirt Road Anthem ...|  Country Nation|
|10087323|        0|Helden sterben ei...| Michael Wendler|
| 1009129|        1|    Dietro L'Incanto|Ludovico Einaudi|
|  100964|        1|                  问情|             黄思婷|
| 1010103|        0|Sunday Sunshine ／...|          いとうかなこ|
|10101536|        0|          打击乐曲 爵士鼓独奏| Various Artists|
|  101021|        0|           I Believe|             Er

In [13]:
song_unique.groupBy('song_type').count().sort(col('count').desc()).show()

+---------+-------+
|song_type|  count|
+---------+-------+
|        0|1328771|
|        1| 155797|
|        2|  65042|
|        3|  10053|
|       73|     22|
|       90|     18|
|       89|     12|
|       91|     12|
|       48|      7|
|       88|      6|
|       43|      6|
|       66|      6|
|       60|      6|
|       26|      6|
|       30|      6|
|       27|      5|
|       41|      5|
|       33|      5|
|       82|      5|
|       32|      4|
+---------+-------+
only showing top 20 rows



In [14]:
song_unique.coalesce(1).write\
  .format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')\
  .option('header', 'true')\
  .save(bmos.url('musicrecommendation', 'cleaned_song.csv'))

### 3.2 Clean play table and user table
- remove extremely high play frequencies (the 0.9999 quantile is at approximately 1000 plays)
- remove play history without a uid

Number of valid users:  264708 <br>
Number of invalid users:  186
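The two cleaning rules can be sketched in plain Python on a toy play table. This is an illustration under assumptions: `quantile`, `split_outliers`, and the sample rows are hypothetical, and the quantile here is an exact nearest-rank value rather than Spark's approximate one.

```python
def quantile(values, q):
    """Nearest-rank quantile of a non-empty list (exact, unlike approxQuantile)."""
    ordered = sorted(values)
    idx = min(len(ordered) - 1, int(q * len(ordered)))
    return ordered[idx]

def split_outliers(plays, cutoff):
    """plays: list of (uid, song_id, freq).

    Returns the rows below the cutoff and the set of users
    who have at least one row at or above the cutoff.
    """
    outlier_uids = {uid for uid, _, freq in plays if freq >= cutoff}
    valid_plays = [p for p in plays if p[2] < cutoff]
    return valid_plays, outlier_uids

# toy data (hypothetical): user 1 has one extreme play count
plays = [(1, 10, 3), (1, 11, 5000), (2, 10, 7), (3, 12, 1)]
valid_plays, outlier_uids = split_outliers(plays, cutoff=1000)
print(valid_plays, outlier_uids)
```

In this sketch the outlier set is kept separately so that those users can be excluded from the user table, while their non-extreme rows remain in the play table.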

In [15]:
song_freq = song_freq.withColumn('freq', song_freq['freq'].cast('integer'))
song_freq = song_freq.filter((col('uid').isNotNull()) & (col('uid') > '0') &
                             (col('song_id').isNotNull()) & (col('song_id')>'0'))

In [7]:
song_freq.sort(col('freq').desc()).show()

+---------+------+--------+-----+
|      uid|device| song_id| freq|
+---------+------+--------+-----+
|167982849|    ar| 4554016|73609|
|  1791497|    ar| 3401476|51858|
|   751824|    ar| 9950164|46970|
|  1685126|    ar|15249349|41265|
|  1685126|    ar| 9950164|39207|
| 37025504|    ar| 9950164|35198|
|  1791497|    ar|15198178|30975|
| 37025504|    ar|15249349|29830|
|  1791497|    ar| 6468891|26765|
|  1791497|    ar|  442265|23907|
|  1685126|    ar| 5237384|22949|
|  1791497|    ar| 9950164|22849|
| 22730453|    ar| 7005106|22294|
| 22730453|    ar| 5965686|22289|
|   497685|    ar| 9950164|21574|
|  1062806|    ar| 9950164|20929|
|  1791497|    ar| 5245130|19813|
| 37025504|    ar| 5237384|19799|
|  1685126|    ar| 6468891|19709|
|  1791497|    ar|  125802|19697|
+---------+------+--------+-----+
only showing top 20 rows



In [16]:
cutoff_freq = song_freq.approxQuantile('freq', [0.99], 0.005)
print (cutoff_freq)

[54.0]


In [17]:
cutoff_freq = 1000

In [18]:
extreme_freq = song_freq.filter(col('freq') >= cutoff_freq)
# get valid song played frequency
valid_freq = song_freq.filter(col('freq') < cutoff_freq)

# get valid users 
outlier_user = extreme_freq.select('uid').distinct()
outlier_user.createOrReplaceTempView('filter_view')
valid_user = user_table_raw.where('uid not in (select uid from filter_view)')

In [19]:
print ("Number of valid users: ", valid_user.select('uid').distinct().count())
print ("Number of invalid users: ", outlier_user.select('uid').distinct().count())
# Number of valid users:  264708
# Number of invalid users:  186

Number of valid users:  264708
Number of invalid users:  186


In [None]:
# too confusing to add this feature
# song_length = song_table.groupBy(['song_id', 'length']).avg(col('song_length')).alias('avg_length').where((col('song_length').isNotNull()) & (col('song_length') > '200') & (col('song_length') <'720'))

In [18]:
download_table.show(truncate=False)

+---+-----------+------+----------+------------------------------+---------+
|_c0|uid        |device|song_id   |song_name                     |paid_flag|
+---+-----------+------+----------+------------------------------+---------+
|0  |null       |ip    |6945370.0 |null                          |null     |
|1  |1685126.0  |ar    |170455.0  |顺流、逆流                         |null     |
|2  |736305.0   |ar    |23380344.0|一人我喊另类(伤感版)                   |null     |
|3  |168042561.0|ar    |6292506.0 |帝都                            |null     |
|4  |1749320.0  |ar    |21473237.0|三生三世十里桃花                      |null     |
|5  |155948236.0|ar    |93388.0   |亚拉伯跳舞女郎                       |null     |
|6  |167794453.0|ar    |497722.0  |藕断丝连                          |null     |
|7  |168505311.0|ip    |4188142.0 |null                          |null     |
|8  |168031064.0|ar    |4243838.0 |爱情码头(2651,cn 天地人音乐网)          |null     |
|9  |167626177.0|ar    |1080516.0 |无法原谅(电视剧《回家的诱惑》主题曲)           |null     |

In [11]:
song_freq.show(truncate=False)

+---------+------+--------+----+
|uid      |device|song_id |freq|
+---------+------+--------+----+
|751824   |ar    |6483029 |385 |
|168156920|ip    |6792060 |5   |
|497685   |ar    |7207401 |26  |
|1062806  |ar    |6841262 |50  |
|168195436|ar    |12808784|22  |
|1685126  |ar    |59582   |26  |
|168286187|ar    |4188404 |2   |
|37025504 |ar    |481552  |733 |
|168478031|ar    |9822502 |4   |
|168406030|ar    |909773  |7   |
|168410987|ar    |5425869 |10  |
|168511270|ar    |6817428 |68  |
|168115240|ar    |23665227|2   |
|168396372|ar    |4276822 |10  |
|168417737|ar    |5383328 |19  |
|1062806  |ar    |20870989|73  |
|168373631|ar    |7202991 |60  |
|168335848|ip    |4112638 |15  |
|37025504 |ar    |1108956 |76  |
|168453430|ar    |1705363 |1   |
+---------+------+--------+----+
only showing top 20 rows



In [143]:
download_table.select('uid').distinct().count()

242243

In [20]:
valid_user.coalesce(1).write\
  .format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')\
  .option('header', 'true')\
  .save(bmos.url('musicrecommendation', 'cleaned_user.csv'))
    
valid_freq.coalesce(1).write\
  .format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')\
  .option('header', 'true')\
  .save(bmos.url('musicrecommendation', 'cleaned_freq.csv'))

## 4. Visualize the data
### TODO
Visualize the data using Seaborn and matplotlib.

In [164]:
!pip install seaborn



In [None]:
import seaborn as sns
%matplotlib inline

In [None]:
import pandas as pd
freqPandas = valid_freq.toPandas()
dlPandas = download_table.toPandas()
sns.lmplot(x='uid', y='song_id', data = dlPandas, fit_reg=False)

Plot a matrix of the songs played by each user, with play frequency as the hue.

In [None]:
min(freqPandas['uid'])

In [None]:
sns.palplot(sns.diverging_palette(10, 133, sep=80, n=10))
# user_id: max 100047599, min 99983627
# song_id: max 2147483647, min 2
sns.lmplot(x='uid', y='song_id', data = dlPandas, fit_reg=False)

## 5. Build the recommender system
(Quoted from the IBM Bluemix Data Science Experience (DSX) documentation) <br>

"
Collaborative filtering calculates recommendations based on similarities between users and products. For example, collaborative filtering assumes that users who have similar preference on the same item will also have similar opinions on items that they haven't seen.

The alternating least squares (ALS) algorithm provides collaborative filtering between users and products to find products that the customers might like, based on their previous ratings.

In this case, the ALS algorithm will create a matrix of all users versus all songs. Most cells in the matrix will be empty. An empty cell means the user hasn't played the song yet. The ALS algorithm will fill in the probable (predicted) ratings, based on similarities between user ratings. The algorithm uses the least squares computation to minimize the estimation errors, and alternates between solving for song factors and solving for user factors.
"


Challenges in this recommender system: <br>
1. The play history is small relative to the catalog, as shown by the sparsity of the utility matrix
2. Song features are limited, and song names and singers are in mixed languages

Solution: <br>
1. A hybrid recommender system
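To make the alternating idea in the quoted ALS description concrete, here is a minimal pure-Python sketch with a single latent factor. It is a simplification under stated assumptions, not Spark's implementation: `als_rank1`, `sse`, and the toy ratings are hypothetical, the rank is fixed at 1, and the regularization is a plain constant, whereas Spark's ALS differs in details such as higher ranks and blocking.

```python
def als_rank1(ratings, n_users, n_items, reg=0.1, iters=20):
    """ratings: dict {(user, item): rating} holding observed cells only."""
    u = [1.0] * n_users   # one latent factor per user
    v = [1.0] * n_items   # one latent factor per item
    for _ in range(iters):
        # fix the item factors and solve the regularized least squares per user
        for i in range(n_users):
            obs = [(j, r) for (ui, j), r in ratings.items() if ui == i]
            num = sum(r * v[j] for j, r in obs)
            den = reg + sum(v[j] ** 2 for j, _ in obs)
            u[i] = num / den
        # fix the user factors and solve per item
        for j in range(n_items):
            obs = [(i, r) for (i, ij), r in ratings.items() if ij == j]
            num = sum(r * u[i] for i, r in obs)
            den = reg + sum(u[i] ** 2 for i, _ in obs)
            v[j] = num / den
    return u, v

def sse(ratings, u, v):
    """Sum of squared errors over the observed cells."""
    return sum((r - u[i] * v[j]) ** 2 for (i, j), r in ratings.items())

# toy utility matrix (hypothetical): 3 users x 2 songs, 2 cells missing
ratings = {(0, 0): 4.0, (0, 1): 2.0, (1, 0): 8.0, (2, 1): 1.0}
u, v = als_rank1(ratings, n_users=3, n_items=2)
print("fit error:", sse(ratings, u, v))
print("predicted score for user 1, song 1:", u[1] * v[1])
```

The missing cell for user 1 and song 1 is filled in from the pattern shared with user 0, which is the sense in which empty cells receive predicted ratings.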

In [3]:
# restarting kernel
reload = True

In [4]:
# load the saved files when starting from the middle of the notebook
if reload: 
    song_unique = spark.read\
      .format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')\
      .option('header', 'true')\
      .load(bmos.url('musicrecommendation', 'cleaned_song.csv'))
    song_unique.take(5)
    
    valid_freq = spark.read\
      .format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')\
      .option('header', 'true')\
      .load(bmos.url('musicrecommendation', 'cleaned_freq.csv'))
    valid_freq.take(5)
    
    valid_user = spark.read\
      .format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')\
      .option('header', 'true')\
      .load(bmos.url('musicrecommendation', 'cleaned_user.csv'))
    valid_user.take(5)
    
    download_table = spark.read\
      .format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')\
      .option('header', 'true')\
      .load(bmos.url('musicrecommendation', 'clean_download.csv'))

In [5]:
song_unique.printSchema()
song_unique.count()

root
 |-- song_id: string (nullable = true)
 |-- song_type: string (nullable = true)
 |-- song_name: string (nullable = true)
 |-- singer: string (nullable = true)



1559987

In [6]:
valid_freq = valid_freq.withColumn('uid', valid_freq['uid'].cast('integer'))
valid_freq = valid_freq.withColumn('song_id', valid_freq['song_id'].cast('integer'))
valid_freq = valid_freq.withColumn('freq', valid_freq['freq'].cast('double'))

valid_user = valid_user.withColumn('uid', valid_user['uid'].cast('integer'))

song_unique = song_unique.withColumn('song_type', song_unique['song_type'].cast('integer'))
song_unique = song_unique.withColumn('song_id', song_unique['song_id'].cast('integer'))

valid_download = download_table.withColumn('uid', download_table['uid'].cast('integer'))
valid_download = valid_download.withColumn('song_id', valid_download['song_id'].cast('integer'))

# valid_freq = valid_freq.withColumn('label',log(10.0, valid_freq.freq))

In [10]:
song_unique.createOrReplaceTempView('song_unique')
valid_freq.createOrReplaceTempView('valid_freq')
song_freq = spark.sql("""
    select s.song_type, s.song_id, s.song_name, s.singer, COALESCE(f.cnt,0) as freq
    from song_unique as s
    left join
        (select song_id, count(*) as cnt from valid_freq group by song_id) as f
        on f.song_id = s.song_id
""")

In [39]:
# take the log of the frequency and scale it to the range -1 to 10, representing the degree of preference

valid_freq2 = valid_freq.withColumn('label',cround(log(10.0, valid_freq.freq)/3.0*10,0))
valid_freq2 = valid_freq2.replace(0, -1, subset=['label'])
# should be None 
valid_freq2 = valid_freq2.na.fill(0, subset=['label'])

# print(valid_freq2.groupBy().max('label').show())

In [40]:
# combine with download table

valid_download = valid_download.withColumn('download', lit(1))
valid_download = valid_download.drop('device')
valid_download = valid_download.drop('song_name')

valid_score = valid_freq2.join(valid_download, ['uid','song_id'], 'left_outer')
valid_score = valid_score.na.fill(0, subset=['download'])

valid_score = valid_score.withColumn('label', when(valid_score.download==1, lit(10)).otherwise(valid_score.label))

valid_score = valid_score.replace(-0.5, -1, subset=['label'])
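The scoring rule built in the two cells above can be restated as a small plain-Python function. This is a sketch for intuition only: `play_label` is a hypothetical helper, it assumes play counts between 1 and 999 (the range left after the cutoff), and Python's `round` resolves exact .5 ties differently from Spark's `round`, which does not matter for the sample values shown.

```python
import math

def play_label(freq, downloaded=False):
    """Map a play count (1 <= freq < 1000) to a preference score."""
    if downloaded:
        return 10.0                       # a download overrides the play-based score
    label = float(round(math.log10(freq) / 3.0 * 10))
    return -1.0 if label == 0 else label  # a score of 0 is replaced by -1

labels = [play_label(f) for f in (1, 10, 100, 999)]
print(labels)
print(play_label(1, downloaded=True))
```

Because of the logarithm, going from 10 to 100 plays raises the score by about as much as going from 1 to 10, so very heavy listening does not dominate the scale.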

Jump to other sections: 
- [Hybrid recommender system](#6.-Hybrid-recommender-system)<br>

In [7]:
song_unique.printSchema()
valid_user.printSchema()
valid_freq.printSchema()
download_table.printSchema()

root
 |-- song_id: integer (nullable = true)
 |-- song_type: integer (nullable = true)
 |-- song_name: string (nullable = true)
 |-- singer: string (nullable = true)

root
 |-- _c0: string (nullable = true)
 |-- Unnamed: 0: string (nullable = true)
 |-- uid: integer (nullable = true)

root
 |-- uid: integer (nullable = true)
 |-- device: string (nullable = true)
 |-- song_id: integer (nullable = true)
 |-- freq: double (nullable = true)

root
 |-- _c0: string (nullable = true)
 |-- uid: string (nullable = true)
 |-- device: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- song_name: string (nullable = true)
 |-- paid_flag: string (nullable = true)



In [7]:
valid_freq.printSchema()
valid_user.printSchema()
song_unique.printSchema()
valid_download.printSchema()

root
 |-- uid: integer (nullable = true)
 |-- device: string (nullable = true)
 |-- song_id: integer (nullable = true)
 |-- freq: double (nullable = true)
 |-- label: double (nullable = true)

root
 |-- _c0: string (nullable = true)
 |-- Unnamed: 0: string (nullable = true)
 |-- uid: integer (nullable = true)

root
 |-- song_id: integer (nullable = true)
 |-- song_type: integer (nullable = true)
 |-- song_name: string (nullable = true)
 |-- singer: string (nullable = true)

root
 |-- _c0: string (nullable = true)
 |-- uid: integer (nullable = true)
 |-- device: string (nullable = true)
 |-- song_id: integer (nullable = true)
 |-- song_name: string (nullable = true)
 |-- paid_flag: string (nullable = true)



In [18]:
##### checking
# check sparsity 
counts_freq = valid_freq.count()
print ('Number of song frequency entries: ', counts_freq)
# songs played/( songs x users)
fraction = (counts_freq)*1.0/1559987/264708
print ('Fraction of user-song pairs played: ', fraction)

Number of song frequency entries:  32801295
Fraction of user-song pairs played:  7.943336195316835e-05


In [28]:
##### checking

moreThanOnce = valid_freq.groupBy('song_id').count().alias('cnt').where(col('count') > 1).select('song_id').count()
print ('Number of songs played more than once: ', moreThanOnce)
# significantly fewer songs; only recommend songs played more than once
# so that the utility matrix is less sparse
# the number of song frequency entries drops by 686993 (songs played only once) to 32114302, about 2%

Number of songs played more than once:  872994


In [42]:
print ('Percentage of filled matrix: ', round((32801295-686993)*100.0/(872994*264708),4))
# vs. ~0.0079% filled before filtering

Percentage of filled matrix:  0.0139
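The sparsity check above can be wrapped in one small helper so that the full and filtered matrices are compared side by side. The function name `matrix_density` is hypothetical; the counts are the ones printed by the cells in this section.

```python
def matrix_density(n_entries, n_users, n_items):
    """Fraction of user-item cells that hold an observed value."""
    return n_entries / (n_users * n_items)

# all songs vs. only songs with more than one entry
full = matrix_density(32801295, 264708, 1559987)
filtered = matrix_density(32801295 - 686993, 264708, 872994)
print(f"{full:.4%} -> {filtered:.4%}")
```

Dropping the songs with a single entry removes roughly 2% of the rows but almost half of the columns, which is why the density nearly doubles.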


In [26]:
song_freq.printSchema()
song_freq.count()

root
 |-- song_type: integer (nullable = true)
 |-- song_id: integer (nullable = true)
 |-- song_name: string (nullable = true)
 |-- singer: string (nullable = true)
 |-- freq: long (nullable = false)



1559987

In [17]:
##### checking

print (song_freq.select('song_id').count())
print (song_freq.select('song_id').where(col('freq')>1).count())


1559987
872994


### 5.1 Setup training and test set

Split the data into an 80% training set and a 20% test set.
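A weighted random split can be sketched in plain Python to show two properties worth keeping in mind: the weights are normalized, so `[80.0, 20.0]` behaves like `[0.8, 0.2]`, and rows are assigned at random, so the split sizes are only approximately 80/20 and change between runs unless a seed is fixed. The helper `random_split` is hypothetical and only mimics the general idea, not Spark's internals.

```python
import random

def random_split(rows, weights, seed=None):
    total = sum(weights)
    # cumulative normalized boundaries, e.g. [80, 20] -> [0.8, 1.0]
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        x = rng.random()
        for k, b in enumerate(bounds):
            # the last bucket catches any floating-point remainder
            if x < b or k == len(bounds) - 1:
                splits[k].append(row)
                break
    return splits

train, test = random_split(range(10000), [80.0, 20.0], seed=42)
print(len(train), len(test))
```

Spark's `randomSplit` also accepts a `seed` argument; passing one makes the training and test sets, and therefore the reported error, repeatable across runs.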

In [29]:
(trainingFreq, testFreq) = valid_score.randomSplit([80.0, 20.0])

trainingFreq.printSchema()
# utility_matrix_small.select([count(when(isnan(c), c)).alias(c) for c in utility_matrix_small.columns]).show()
testFreq.printSchema()

root
 |-- uid: integer (nullable = true)
 |-- song_id: integer (nullable = true)
 |-- device: string (nullable = true)
 |-- freq: double (nullable = true)
 |-- label: double (nullable = false)
 |-- _c0: string (nullable = true)
 |-- paid_flag: string (nullable = true)
 |-- download: integer (nullable = true)

root
 |-- uid: integer (nullable = true)
 |-- song_id: integer (nullable = true)
 |-- device: string (nullable = true)
 |-- freq: double (nullable = true)
 |-- label: double (nullable = false)
 |-- _c0: string (nullable = true)
 |-- paid_flag: string (nullable = true)
 |-- download: integer (nullable = true)



### 5.2 Setup collaborative filtering model

According to the DSX documentation again:
"
A NaN result is due to [SPARK-14489](https://issues.apache.org/jira/browse/SPARK-14489) and because the model can't predict values for users for which there's no data."
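How the NaN predictions are handled changes the reported error noticeably. The sketch below compares the two policies used in this notebook, filling with 0 and dropping the rows, on toy data; `rmse` and the sample pairs are hypothetical and stand in for `RegressionEvaluator`.

```python
import math

def rmse(pairs, nan_policy="drop"):
    """pairs: list of (label, prediction); prediction may be NaN for unseen users or songs."""
    cleaned = []
    for label, pred in pairs:
        if math.isnan(pred):
            if nan_policy == "drop":
                continue             # ignore rows the model cannot score
            pred = 0.0               # "fill" policy: treat an unknown score as 0
        cleaned.append((label, pred))
    return math.sqrt(sum((l - p) ** 2 for l, p in cleaned) / len(cleaned))

# toy data (hypothetical): one exact prediction, one cold-start row
pairs = [(3.0, 3.0), (4.0, float("nan"))]
print(rmse(pairs, "drop"), rmse(pairs, "fill"))
```

Filling with 0 penalizes every cold-start row by its full label, so the two numbers are not directly comparable. If the Spark version in use is 2.2 or later, `ALS(coldStartStrategy="drop")` is another way to exclude such rows.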

In [None]:
model = ALS(userCol="uid", itemCol="song_id", ratingCol="label").fit(trainingFreq)

In [10]:
predictions = model.transform(testFreq)

evaluator = RegressionEvaluator(metricName="rmse", labelCol="label", predictionCol="prediction")
print ("The root mean squared error for our model is: " + str(evaluator.evaluate(predictions.na.fill(0))))

The root mean squared error for our model is: 2.917168466920176


### 5.3 Tune parameters

Use cross-validation and grid search to tune the hyperparameters.

ALS constructor signature (defaults shown):
```python
    class pyspark.ml.recommendation.ALS(
        rank=10,
        maxIter=10,
        regParam=0.1,
        numUserBlocks=10,
        numItemBlocks=10,
        implicitPrefs=False,
        alpha=1.0,
        userCol="user",
        itemCol="item",
        seed=None,
        ratingCol="rating",
        nonnegative=False,
        checkpointInterval=10,
        intermediateStorageLevel="MEMORY_AND_DISK",
        finalStorageLevel="MEMORY_AND_DISK"
    )
```

The ALS hyperparameters are:
- `rank` = the number of latent factors in the model
- `maxIter` = the maximum number of iterations 
- `regParam` = the regularization parameter
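Before launching a grid search it helps to count how many models will be trained, since every parameter combination is fitted once per fold. The sketch below does that arithmetic for a small grid over `rank` and `regParam` with 5 folds; the variable names are hypothetical and the values match the grid used in this section.

```python
from itertools import product

ranks = [1, 2, 3]
reg_params = [0.05, 0.1]
num_folds = 5

# the grid is the Cartesian product of the candidate values
grid = [{"rank": r, "regParam": reg} for r, reg in product(ranks, reg_params)]
cv_fits = len(grid) * num_folds   # one fit per combination per fold
total_fits = cv_fits + 1          # plus the final refit of the best combination

print(len(grid), cv_fits, total_fits)
```

With tens of millions of rows per fit, even this small grid means dozens of full ALS runs, so shrinking the grid, the number of folds, or the training sample is usually the first lever to pull.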

In [None]:
# takes way too long, killed at second run

(trainingScore, validationScore) = valid_score.randomSplit([90.0, 10.0])

als = ALS(userCol="uid", itemCol="song_id", ratingCol="label")
evaluator = RegressionEvaluator(metricName="rmse", labelCol="label", predictionCol="prediction")

# paramGrid = ParamGridBuilder().addGrid(als.rank, [1, 3, 5, 7]).addGrid(als.regParam, [0.05, 0.1, 0.5]).build()
paramGrid = ParamGridBuilder().addGrid(als.rank, [1,2,3]).addGrid(als.regParam, [0.05, 0.1]).build()

crossval = CrossValidator(estimator=als, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)
cvModel = crossval.fit(trainingScore)
predictions = cvModel.transform(validationScore)

print ("The root mean squared error for our model is: " + str(evaluator.evaluate(predictions.na.drop())))

In [136]:
print ('Best rank is: ', cvModel.bestModel.rank)
print ('Best regularizer is: ', cvModel.bestModel.params)
# evaluate with 0 for null prediction
# print ("The root mean squared error for our model is: " + str(evaluator.evaluate(predictions.na.fill(0))))
# The root mean squared error for our model is: 3.1921429126212857

Best rank is:  1
Best regularizer is:  []


In [None]:
valid_score.printSchema()

In [None]:
predictions.coalesce(1).write\
  .format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')\
  .option('header', 'true')\
  .save(bmos.url('musicrecommendation', 'predictions_CF_1015.csv'))

### 5.4 Evaluate recommendation results

- The RMSE is fairly low, so the recommender appears to perform reasonably well
- However, the songs recommended to individual users look odd
- Therefore, an alternative is needed for inactive users, giving a more explainable model

The detailed evaluation follows:

In [27]:
utility_matrix_small = valid_user.crossJoin(song_freq.select('song_id')).select('uid','song_id')
utility_matrix_small = utility_matrix_small.join(valid_score, ['uid', 'song_id'], 'left_outer').select('uid', 'song_id', 'label')

utility_matrix = utility_matrix_small.na.fill(0, subset=['label'])

# Replace predicted NaN values with the average frequency and evaluate the model
# avgScore = utility_matrix.select('label').groupBy().avg().first()[0]
# print ("The average score in the dataset is: " + str(avgScore))

In [11]:
predictions.createOrReplaceTempView('pred_subset')
spark.sql('select uid, song_id, label, download, prediction from pred_subset order by uid limit 100').show()

+-----+--------+-----+--------+-----------+
|  uid| song_id|label|download| prediction|
+-----+--------+-----+--------+-----------+
|12333|21596231|  1.0|       0|  2.4781432|
|12333|   55219|  3.0|       0|  1.4091263|
|12333|  708667|  4.0|       0|  3.9749548|
|12333| 2725093|  3.0|       0|  4.3128133|
|12333| 5114569|  6.0|       0|   4.895308|
|36816| 6906526| -1.0|       0|   4.813818|
|60183| 3627946| -1.0|       0|-0.13519855|
|60183|  116329| -1.0|       0| -0.2599472|
|60183|23610522| -1.0|       0| -0.7154152|
|60183|20866010|  2.0|       0| 0.11216657|
|60183| 1242385|  1.0|       0|-0.47428238|
|60183|  134967|  1.0|       0| 0.13887726|
|60183|21234365| -1.0|       0|-0.37378824|
|60183| 6227103| -1.0|       0| 0.34997967|
|60183|  710442| -1.0|       0|-0.09907829|
|60183| 6128890| -1.0|       0|-0.11929984|
|60183|  223973|  1.0|       0| 0.30912238|
|60183|  981246| -1.0|       0| -0.6802237|
|60183|23641903| -1.0|       0| 0.11194147|
|60183|  706554| -1.0|       0| 

In [44]:
def recommendCF(model, user, nbRecommendations):
    # Create a Spark DataFrame with the specified user and all the songs listed in the song table
    dataSet = song_unique.select("song_id").distinct().withColumn("uid", lit(user))

    # Create a Spark DataFrame with the songs that have already been scored for this user
    songsAlreadyRated = valid_score.filter(valid_score.uid == user).select("song_id", "uid")

    # Apply the recommender system to the data set without the already scored songs to predict ratings
    predictions = model.transform(dataSet.subtract(songsAlreadyRated)).dropna().orderBy("prediction", ascending=False).limit(nbRecommendations).select("song_id", "prediction")

    # Join with the song table to get the song names and singers
    recommendations = predictions.join(song_unique, predictions.song_id == song_unique.song_id).select(predictions.song_id, song_unique.song_name, song_unique.singer, predictions.prediction).orderBy("prediction", ascending=False)

    recommendations.show(truncate=False)
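The steps inside `recommendCF` (score every candidate, skip what the user already has, drop unscorable items, keep the top N) can be checked on toy data without a cluster. The sketch below is a plain-Python analogy; `recommend_top_n` and the toy scores are hypothetical, and a dictionary lookup stands in for the trained model.

```python
import math

def recommend_top_n(predict, all_items, already_rated, n):
    # score only the items the user has not interacted with yet
    candidates = [item for item in all_items if item not in already_rated]
    scored = [(item, predict(item)) for item in candidates]
    # drop items the model cannot score (NaN predictions)
    scored = [(item, s) for item, s in scored if not math.isnan(s)]
    # highest predicted score first
    return sorted(scored, key=lambda t: t[1], reverse=True)[:n]

# toy predicted scores per song_id (hypothetical); song 3 cannot be scored
toy_scores = {1: 0.5, 2: 3.2, 3: float("nan"), 4: 2.1, 5: 4.0}
top = recommend_top_n(toy_scores.get, all_items=toy_scores.keys(),
                      already_rated={5}, n=2)
print(top)
```

Song 5 has the highest score but is excluded because the user already has it, which mirrors the `subtract` step in the function above.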

In [None]:
print ("Recommendations for user 169262317:")
# recommendCF(model, 169262317, 10)
# print "Recommendations for user 471:"
# recommendCF(cvModel, 471, 10)
# print "Recommendations for user 496:"
# recommendCF(cvModel, 496, 10)

print ("Recommendations for user 12333:")
recommendCF(model, 12333, 10)

In [40]:
# total user:264708 
# select 12333
valid_score.createOrReplaceTempView('select_valid')
selected = spark.sql("""
    select distinct uid, count(1) as cnt
    from select_valid
    where uid = 12333
    group by uid
    order by 2 asc 
""")
# inactive_user.count()
# threshold at 1: 68268 -> around a quarter 
# threshold at 5: 154568 -> more than half 
# threshold at 2: 98849 -> around one third

98849
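The threshold counts noted above can be reproduced on toy data with a simple per-user tally. This sketch assumes that "threshold at k" means users with at most k scored songs, which is an interpretation of the comments rather than something the notebook states; `inactive_users` and the sample rows are hypothetical.

```python
from collections import Counter

def inactive_users(play_rows, threshold):
    """play_rows: iterable of (uid, song_id); returns uids with at most `threshold` rows."""
    counts = Counter(uid for uid, _ in play_rows)
    return {uid for uid, n in counts.items() if n <= threshold}

# toy data (hypothetical): user 1 has 3 songs, user 2 has 1, user 3 has 2
rows = [(1, 10), (1, 11), (1, 12), (2, 10), (3, 10), (3, 11)]
print(inactive_users(rows, 1), inactive_users(rows, 2))
```

Raising the threshold moves more users into the inactive group, so the choice trades off how many users fall back to the alternative recommender against how much history collaborative filtering has to work with.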

In [None]:
selected.show()

print ("Recommendations for user 12333:")
recommendCF(model, 12333, 10)

In [37]:
inactive_user.show()

+---------+---+
|      uid|cnt|
+---------+---+
|168984483|  1|
|168275548|  1|
|168572441|  1|
|168532375|  1|
|168510789|  1|
|168713236|  1|
|168686527|  1|
|168683346|  1|
|167801928|  1|
|168173268|  1|
|168889990|  1|
|168779601|  1|
|169042674|  1|
|168266657|  1|
|167640755|  1|
|168029402|  1|
|168767795|  1|
|167579971|  1|
|168902557|  1|
|168975045|  1|
+---------+---+
only showing top 20 rows



In [39]:
print ("Recommendations for user 167579971:")
recommendCF(model, 167579971, 10)

print ("Recommendations for user 168975045:")
recommendCF(model, 168975045, 10)

# 169042674 0
# 167579971 10
# 169042674 0
# 167640755 10


# valid_score.select().where(col('uid')==169042674).show()
# problem is it's not in the user list

Recommendations for user 167579971:
+--------+-----------------+----------------+----------+
|song_id |song_name        |singer          |prediction|
+--------+-----------------+----------------+----------+
|11109921|克卜勒 (Cover)      |郑小宇             |3.387147  |
|5009810 |茂名这场雨            |秦齐              |3.3330908 |
|22399046|鼓与花              |萧忆情Alex         |3.2971313 |
|7190353 |念念不忘的姑娘(1分02秒铃声版)|阿权              |3.035895  |
|413840  |Ich verzeih' Dir |Veronika Fischer|2.9503236 |
|6110029 |明天(37秒铃声版)       |萧亚轩             |2.6847723 |
|22827582|Come Together    |Michael Jackson |2.626977  |
|2697950 |在雨中漫步            |付娜              |2.5599701 |
|366974  |雪染的风采            |陈诺              |2.257006  |
|6922682 |请你像我这样做-跟我来-(红果果)|儿童故事            |2.2517805 |
+--------+-----------------+----------------+----------+

Recommendations for user 168975045:
+--------+-------------------------+----------------+----------+
|song_id |song_name                |singer          |prediction|

In [None]:
dataSet = song_unique.select("song_id").distinct().withColumn("uid", lit(user))

# Create a Spark DataFrame with the songs that have already been rated by this user
songsAlreadyRated = valid_score.filter(valid_score.uid == user).select("song_id", "uid")

# Apply the recommender system to the data set without the already rated songs to predict ratings
predictions = model.transform(dataSet.subtract(songsAlreadyRated)).dropna().orderBy("prediction", ascending=False).limit(nbRecommendations).select("song_id", "prediction")

# Join with the song table to get the song names and singers
recommendations = predictions.join(song_unique, predictions.song_id == song_unique.song_id).select(predictions.song_id, song_unique.song_name, song_unique.singer, predictions.prediction)

recommendations.show(truncate=False)

## 6. Hybrid recommender system

Supplement the collaborative filtering with an item-item recommender <br>
for users with only one (or two) songs in their play history:<br>
- recommend songs based on cosine similarity to the song(s) played by the user 
- (not implemented) for users with 0 songs played: recommend the top k songs  

- tried to implement this on all 800k songs; it failed after countless trials
- eventually turned to a subsample 
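
The item-item idea can be sketched in plain Python, without Spark. This is only an illustration under stated assumptions: the song ids and 3-dimensional vectors are made up, and `cosine_similarity` and `most_similar` are hypothetical helper names, not functions from this notebook.

```python
import math

def cosine_similarity(u, v):
    """Cosine of the angle between two equal-length feature vectors."""
    dot = sum(a * b for a, b in zip(u, v))
    norm_u = math.sqrt(sum(a * a for a in u))
    norm_v = math.sqrt(sum(b * b for b in v))
    if norm_u == 0 or norm_v == 0:
        return 0.0  # an all-zero vector is treated as similar to nothing
    return dot / (norm_u * norm_v)

def most_similar(played_id, song_vectors, k):
    """Rank every other song by similarity to the one the user played."""
    target = song_vectors[played_id]
    scored = [(cosine_similarity(target, vec), sid)
              for sid, vec in song_vectors.items() if sid != played_id]
    scored.sort(reverse=True)
    return [sid for _, sid in scored[:k]]

# Hypothetical feature vectors keyed by made-up song ids
toy_vectors = {
    "a": [1.0, 0.0, 0.0],
    "b": [0.9, 0.1, 0.0],
    "c": [0.0, 1.0, 0.0],
}
top = most_similar("a", toy_vectors, 2)
```

Here song `b` points in almost the same direction as `a`, so it ranks ahead of the orthogonal song `c`.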

In [11]:
from pyspark.mllib.linalg import Vectors
from pyspark.ml.feature import VectorAssembler, Word2Vec
from pyspark.sql.functions import split, monotonically_increasing_id
from pyspark.sql.types import ArrayType, StringType, DoubleType, StructType, StructField

# Package for distributed linear algebra; columnSimilarities uses DIMSUM
# (Dimension Independent Matrix Square using MapReduce)
from pyspark.mllib.linalg.distributed import (
    IndexedRowMatrix, IndexedRow, RowMatrix, BlockMatrix, CoordinateMatrix)


### 6.1 Setup feature vector

Get feature vectors for the top 500 songs and save them for future use; it's okay to run 

In [12]:
song_freq = song_freq.where(col('freq')>1).orderBy('freq', ascending=False)

top_songs = song_freq.orderBy('freq', ascending =False).limit(500)
top_songs = top_songs.na.fill('0', subset=['singer','song_name'])

top_songs = top_songs.withColumn('id', monotonically_increasing_id()+1)

In [14]:
if not reload:
    top_songs.coalesce(1).write\
      .format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')\
      .option('header', 'true')\
      .save(bmos.url('musicrecommendation', 'top_500.csv'))

In [15]:
# flatten each row's Word2Vec vectors into plain columns: (song_id, song_type, name_vec..., singer_vec...)
from pyspark.ml.feature import VectorAssembler

def extract(row):
    return (row.song_id, row.song_type, ) + tuple(row.name_vec.toArray().tolist()) + tuple(row.singer_vec.toArray().tolist())

In [16]:
song_freq.count()

872994

In [17]:
reload

True

In [18]:
# get a dataframe of vector
if not reload:
    
    song_str2arr = top_songs.withColumn('singer_arr', split(col("singer"), " ").cast(ArrayType(StringType())).alias("singer_arr"))
    w2v= Word2Vec(vectorSize=3, minCount=0, inputCol="singer_arr", outputCol="singer_vec")
    top_song_vec = w2v.fit(song_str2arr).transform(song_str2arr)

    top_song_vec = top_song_vec.withColumn('name_arr', split(col("song_name"), " ").cast(ArrayType(StringType())).alias("name_arr"))
    w2v = Word2Vec(vectorSize=5, minCount=0, inputCol="name_arr", outputCol="name_vec")
    top_song_vec = w2v.fit(top_song_vec).transform(top_song_vec)
    
    top_vec_temp = top_song_vec.select('song_type','name_vec','singer_vec', 'song_id')
    # save as dataframe 
    top_vec_df = top_vec_temp.rdd.map(extract).toDF()
    top_vec_df = top_vec_df.withColumn("id", monotonically_increasing_id())
    top_vec_df = top_vec_df.withColumn('_1', top_vec_df['_1'].cast('double'))
    
    top_songs.coalesce(1).write\
      .format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')\
      .option('header', 'true')\
      .save(bmos.url('musicrecommendation', 'top_500.csv'))

    top_vec_df.coalesce(1).write\
      .format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')\
      .option('header', 'true')\
      .save(bmos.url('musicrecommendation', 'top_500_vec.csv'))
    

else:
    top_vec_df = spark.read\
      .format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')\
      .option('header', 'true')\
      .option("inferschema", "true")\
      .load(bmos.url('musicrecommendation', 'top_500_vec.csv'))    
    
    sim_df_ori = spark.read\
      .format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')\
      .option('header', 'true')\
      .option("inferschema", "true")\
      .load(bmos.url('musicrecommendation', 'top_500_simi.csv'))    
    
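    # sim_df is not defined yet in this branch, so build it from sim_df_ori;
    # shift the 0-based row index to match the 1-based ids in top_songs
    sim_df = sim_df_ori.withColumn('id', sim_df_ori.id+1).orderBy('id')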
    sim_df.select('id').show()
    

In [229]:
sim_df = spark.read\
  .format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')\
  .option('header', 'true')\
  .option("inferschema", "true")\
  .load(bmos.url('musicrecommendation', 'top_500_simi.csv')) 

Jump to other sections: 
- [Recommend for selected users](#6.3-Recommend-for-selected-users)

In [25]:
top_vec_df.printSchema()

root
 |-- _1: double (nullable = true)
 |-- _2: integer (nullable = true)
 |-- _3: double (nullable = true)
 |-- _4: double (nullable = true)
 |-- _5: double (nullable = true)
 |-- _6: double (nullable = true)
 |-- _7: double (nullable = true)
 |-- _8: double (nullable = true)
 |-- _9: double (nullable = true)
 |-- _10: double (nullable = true)
 |-- id: integer (nullable = true)



In [19]:
top_songs.show()
1    0.000000
2    0.000000
3    0.000000
4    0.000000
5    0.000000
6    0.991990
7    0.996428
8    0.990109
9    0.991140
10   0.983688
11   0.989649
12   0.996449
13   0.992721
14   0.961659
15   0.985563
16   0.981806
17   0.990653
18   0.956802
19   0.984632
20   0.956802

+---------+--------+--------------------+-----------+------+
|song_type| song_id|           song_name|     singer|  freq|
+---------+--------+--------------------+-----------+------+
|        2|15249349|凉凉-(电视剧《三生三世十里桃花》...|    张碧晨&杨宗纬|109798|
|        2| 9950164|               刚好遇见你|        李玉刚|102218|
|        2| 5237384|                逆流成河|        金南玲| 71834|
|        2| 6468891|                  演员|        薛之谦| 68042|
|        2|15807836|三生三世-(电视剧《三生三世十里桃...|         张杰| 58433|
|        2| 5114569|          没有你陪伴真的好孤单|         梦然| 53102|
|        2| 3287564|                 小时候|        苏打绿| 52403|
|        2|16400733|思慕-(电视剧《三生三世十里桃花》...|        郁可唯| 48343|
|        2| 6657692|             走着走着就散了|        庄心妍| 47438|
|        2| 3620537|              你还要我怎样|        薛之谦| 43961|
|        2| 7149583|                告白气球|        周杰伦| 41887|
|        2| 6749207|               Faded|Alan Walker| 38523|
|        2| 3971731|               以后的以后|        庄心妍| 38138|
|        1|23498554|    

In [39]:
top_songs.filter(col('id') == 315).show()

+---------+-------+---------+------+----+---+
|song_type|song_id|song_name|singer|freq| id|
+---------+-------+---------+------+----+---+
|        2| 203139|      蓝莲花|    许巍|7257|315|
+---------+-------+---------+------+----+---+



In [27]:
top_vec_df.printSchema()

root
 |-- _1: double (nullable = true)
 |-- _2: double (nullable = true)
 |-- _3: double (nullable = true)
 |-- _4: double (nullable = true)
 |-- _5: double (nullable = true)
 |-- _6: double (nullable = true)
 |-- _7: double (nullable = true)
 |-- _8: double (nullable = true)
 |-- _9: double (nullable = true)
 |-- id: long (nullable = false)



### 6.2 Compute similarity matrix
#### using pyspark's distributed matrix

Goal: fetch a single row of similarities when needed. <br>
Problems: 

p1: `columnSimilarities` compares columns, so the song-by-feature matrix has to be transposed first; using toBlockMatrix().transpose() results in an absurdly large matrix.<br>
Solved: transpose manually at the DataFrame level.<br>

p2: the distributed container does not allow random access to the computed similarity matrix. <br>
Trial and error:
- convert to an RDD? Takes a long time and, very likely, there is not enough memory for a local matrix
- do matrix multiplication on a BlockMatrix? Takes a long time
- broadcast to the workers? Couldn't do pattern matching from csr_matrix 
- reduced the song set to 872994 songs (those with `freq` > 1)
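
`RowMatrix.columnSimilarities` returns only the upper-triangular entries (i < j) as a `CoordinateMatrix`, so one song's similarities are split between its row and its column. A minimal plain-Python sketch of rebuilding a full row from such entries follows. The `(i, j, value)` tuples are made-up stand-ins for matrix entries, `similarity_row` is a hypothetical helper, and setting the diagonal to 1.0 is a choice made here, not something the notebook does.

```python
def similarity_row(entries, index, n):
    """Rebuild the full similarity row for `index` from upper-triangular
    (i, j, value) entries, where every pair is stored once with i < j."""
    row = [0.0] * n
    row[index] = 1.0  # assumption: a song is fully similar to itself
    for i, j, value in entries:
        if i == index:
            row[j] = value  # pair stored in this song's row
        elif j == index:
            row[i] = value  # pair stored in this song's column
    return row

# Made-up similarities between three songs
toy_entries = [(0, 1, 0.9), (0, 2, 0.2), (1, 2, 0.5)]
row_1 = similarity_row(toy_entries, 1, 3)
```

For 500 songs the entries fit comfortably in driver memory, which is why collecting them once and indexing locally is a workable way around the missing random access.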

In [20]:
# manually transpose the dataframe (takes about 20s)
top_vec_rdd = top_vec_df.rdd
data = []
for i in range(9):
    data.append(top_vec_rdd.map(lambda row: row[i]).collect())
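
The loop above runs one `collect()` per column. Assuming the rows fit on the driver (they should for 500 songs), a single collect followed by `zip(*rows)` would produce the same column lists. A sketch with made-up rows standing in for the collected data; `rows` and `n_features` are illustrative names:

```python
# Stand-in for the result of one collect(): one tuple per song
rows = [
    (1.0, 2.0, 3.0),
    (4.0, 5.0, 6.0),
]
n_features = 2  # the notebook keeps the first 9 columns

# zip(*rows) regroups the values by column, i.e. transposes the table
data = [list(column) for column in zip(*rows)][:n_features]
```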

In [21]:
# manually construct the IndexedRows (one per feature, i.e. the transposed matrix)
# indexedT = IndexedRow(0, Vectors.dense(data[0]))

toIndexedRows = [IndexedRow(i, data[i]) for i in range(9)]
indexedRows = sc.parallelize(toIndexedRows)
mat = IndexedRowMatrix(indexedRows)

rowMat = mat.toRowMatrix()

# usually takes long, but with luck finishes in about 40s
simThres = rowMat.columnSimilarities(0.05)

In [45]:
top_song_vec.printSchema()

root
 |-- song_type: integer (nullable = true)
 |-- song_id: integer (nullable = true)
 |-- song_name: string (nullable = false)
 |-- singer: string (nullable = false)
 |-- freq: long (nullable = false)
 |-- id: long (nullable = false)
 |-- singer_arr: array (nullable = false)
 |    |-- element: string (containsNull = true)
 |-- singer_vec: vector (nullable = true)
 |-- name_arr: array (nullable = false)
 |    |-- element: string (containsNull = true)
 |-- name_vec: vector (nullable = true)



In [None]:
# convert to matrix 
# this shows RDD is empty error on the second run 

# mat = IndexedRowMatrix(data.map(lambda row:IndexedRow(monotonically_increasing_id(), Vectors(list(row)))
# song_mat_temp = IndexedRowMatrix(song_vec_mod.rdd.map(lambda row: IndexedRow(row['id'], Vectors.dense(row[:9]))))

# assumption: simRdd is never defined in this notebook; given that extractRows
# reads row.index and row.vector, it is most likely the IndexedRow RDD below
simRdd = simThres.toIndexedRowMatrix().rows

# flatten each IndexedRow into (index, value_1, ..., value_500)
def extractRows(row):
    return (row.index, ) + tuple(row.vector.toArray().tolist())

# save as dataframe 
columnName = ['id'] + [str(i+1) for i in range(500)]
sim_df = simRdd.map(extractRows).toDF(columnName)

In [47]:
print(simThres.numCols(), simThres.numRows())

500 500


### 6.3 Recommend for selected users 

In [242]:
# filter users with a single record in valid_score (cnt = 1), i.e. only one song played

# createOrReplaceTempView returns None, so its result is not assigned
valid_score.createOrReplaceTempView('select_score')
inactive_user = spark.sql("""
    select distinct uid, count(1) as cnt
    from select_score
    group by uid
    having cnt = 1
    order by 2 asc 
""")

# keep inactive_user as a DataFrame; only register the view
inactive_user.createOrReplaceTempView('inactive_user')

select_pos = spark.sql("""
    select uid, song_id, freq, label 
    from select_score 
    where uid in (
    select uid from inactive_user)
    and label > -1
""")

select_neg = spark.sql("""
    select uid, song_id, freq, label 
    from select_score 
    where uid in (
    select uid from inactive_user)
    and label = -1
""")

In [134]:
select_pos.join(song_freq, 'song_id').orderBy(song_freq.freq, ascending = False).show()

+--------+---------+----+-----+---------+--------------------+-------+------+
| song_id|      uid|freq|label|song_type|           song_name| singer|  freq|
+--------+---------+----+-----+---------+--------------------+-------+------+
|15249349|168965732| 2.0|  1.0|        2|凉凉-(电视剧《三生三世十里桃花》...|张碧晨&杨宗纬|109798|
|15249349|168848103| 3.0|  2.0|        2|凉凉-(电视剧《三生三世十里桃花》...|张碧晨&杨宗纬|109798|
|15249349|168597540| 2.0|  1.0|        2|凉凉-(电视剧《三生三世十里桃花》...|张碧晨&杨宗纬|109798|
|15249349|168028168| 3.0|  2.0|        2|凉凉-(电视剧《三生三世十里桃花》...|张碧晨&杨宗纬|109798|
|15249349|168826351|10.0| 10.0|        2|凉凉-(电视剧《三生三世十里桃花》...|张碧晨&杨宗纬|109798|
|15249349|168258482| 4.0|  2.0|        2|凉凉-(电视剧《三生三世十里桃花》...|张碧晨&杨宗纬|109798|
|15249349|168916514| 2.0|  1.0|        2|凉凉-(电视剧《三生三世十里桃花》...|张碧晨&杨宗纬|109798|
|15249349|167764401| 3.0|  2.0|        2|凉凉-(电视剧《三生三世十里桃花》...|张碧晨&杨宗纬|109798|
|15249349|167639000| 5.0|  2.0|        2|凉凉-(电视剧《三生三世十里桃花》...|张碧晨&杨宗纬|109798|
|15249349|168149928| 0.0|  0.0|        2|凉凉-(电视剧《三生三世十里桃花》...|张碧

In [57]:
select_pos.printSchema()

root
 |-- uid: integer (nullable = true)
 |-- song_id: integer (nullable = true)
 |-- freq: double (nullable = true)
 |-- label: double (nullable = false)



In [55]:
select_pos.count()
select_neg.count()

49455

In [None]:
def recommendCF(model, user, nbRecommendations):
    # Create a Spark DataFrame with the specified user and all the songs listed in the song table
    dataSet = song_unique.select("song_id").distinct().withColumn("uid", lit(user))

    # Create a Spark DataFrame with the songs that have already been rated by this user
    songsAlreadyRated = valid_score.filter(valid_score.uid == user).select("song_id", "uid")

    # Apply the recommender system to the data set without the already rated songs to predict ratings
    predictions = model.transform(dataSet.subtract(songsAlreadyRated)).dropna().orderBy("prediction", ascending=False).limit(nbRecommendations).select("song_id", "prediction")

    # Join with the song table to get the song names and singers
    recommendations = predictions.join(song_unique, predictions.song_id == song_unique.song_id).select(predictions.song_id, song_unique.song_name, song_unique.singer, predictions.prediction).orderBy("prediction", ascending=False)

    recommendations.show(truncate=False)

In [189]:
import pandas as pd
sim_pd = sim_df.toPandas()

In [308]:
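# requires sim_df: the similarity matrix for all top-500 songs 

def getSongSimilarity(song_index):
    """
    param: song_index, the id of the song in top_songs
    return: Spark DataFrame (id, max) with the similarity of this song to all other songs 
    """
    # row `song_index` of the matrix, transposed into a single column labelled 0
    df1 = sim_df.filter(col('id')==song_index).toPandas().transpose().drop(['id'])
    df1.insert(0, 'id', range(1, 501))
    # column `song_index` of the matrix
    df2 = sim_df.select(str(song_index), 'id').toPandas()
    # columnSimilarities stores each pair once (upper triangle), so combine
    # the row and the column and keep the larger value
    dfComb = df1.join(df2.set_index('id'), on='id')
    dfComb['max'] = dfComb[[0, str(song_index)]].max(axis=1)
    dfComb = dfComb.drop([0, str(song_index)], axis = 1)

    song_df = spark.createDataFrame(dfComb)
    return song_df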

In [305]:
def recommendSim(user, song, nbRecommendations):
    print ('User liked this song: ')
#     song_freq.filter(col('song_id')==song).show()

    # get the row index for that song; collect() gives an empty list when
    # the song is missing (a DataFrame itself is always truthy)
    find_song = top_songs.filter(col('song_id')==song).select('id').collect()

    # if in the top 500 songs 
    if find_song:
        song_index = find_song[0].id
        predictions = getSongSimilarity(song_index)

        recommendations = predictions.join(top_songs, top_songs.id == predictions.id)\
            .select(top_songs.song_id, top_songs.song_name, top_songs.singer, predictions['max'])\
            .orderBy("max", ascending=False).limit(nbRecommendations)

        recommendations.show(truncate=False)

    # future work: fit into the matrix, add to dataframe, get a new similarity matrix, get last row
    # for now fall back to the most played songs
    else:
        top_songs.select('song_id', 'singer', 'song_type','freq').limit(nbRecommendations).show(truncate=False)
        

In [120]:
def recommendSong(user, nbRecommendations):
    # head(1) returns an empty list when the user has no rows here;
    # a DataFrame itself is always truthy, so it cannot be tested directly
    pos_user = select_pos.filter(col('uid')==user).select('song_id').head(1)
    neg_user = select_neg.filter(col('uid')==user).head(1)

    # user has shown more than one preference: collaborative filtering
    if (not pos_user) and (not neg_user):
        return recommendCF(model, user, nbRecommendations)

    # user has shown only one (positive) preference: item similarity
    elif pos_user: 
        song = pos_user[0].song_id
        return recommendSim(user, song, nbRecommendations)

    # user never liked any song from the platform: recommend the top songs
    else:
        top_songs.select('song_id', 'singer', 'song_type','freq').limit(nbRecommendations).show(truncate=False)
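
The branching in `recommendSong` can be sketched in plain Python in terms of row counts, which avoids relying on the truthiness of a Spark DataFrame (a DataFrame object is truthy however many rows it holds). `choose_strategy` and the strategy labels are hypothetical names used only for this illustration.

```python
def choose_strategy(n_positive, n_negative):
    """Pick a recommender from the number of rows a user has among the
    single-record users: positive (label > -1) or negative (label = -1)."""
    if n_positive == 0 and n_negative == 0:
        # not a single-record user: enough history for collaborative filtering
        return "collaborative_filtering"
    if n_positive > 0:
        # exactly one liked song: recommend its nearest neighbours
        return "item_similarity"
    # only a disliked song on record: fall back to the most played songs
    return "top_songs"

strategies = [choose_strategy(0, 0), choose_strategy(1, 0), choose_strategy(0, 1)]
```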

In [311]:
# test on a single user
user = 12333
favorite = valid_score.filter(col('uid') == 12333).join(top_songs, 'song_id').orderBy('label', ascending=False)
favorite.show(truncate=False)

# get the favorite song that is in the top 500 list; recommendSim expects a song_id
# future work: generate a new similarity matrix with the song not in top 500 list
song = favorite.select('song_id').limit(1).collect()[0].song_id

recommendSim(user, song, 10)


+--------+------------------+---------+-----------------+------------------+
|song_id |song_name         |song_type|singer           |max               |
+--------+------------------+---------+-----------------+------------------+
|157767  |等你等到我心痛           |2        |张学友              |0.9985164466289682|
|6477086 |小水果               |2        |筷子兄弟             |0.9982372430988816|
|7196022 |时光笔墨-(电视剧《青云志》片尾曲)|2        |张碧晨              |0.997938413881416 |
|23497506|画心(Live)          |2        |张靓颖              |0.9978128617846123|
|7153193 |别把疼你的人弄丢了         |2        |雨宗林              |0.9977944865013622|
|7186112 |将军令               |2        |吴克群&拖鞋&SING组合&叶晓粤|0.9976869503427881|
|5746692 |Try               |2        |Colbie Caillat   |0.9976558123419578|
|157908  |吻别                |2        |张学友              |0.9975941893360246|
|9919225 |我要你-(电影《驴得水》主题曲)  |2        |任素汐              |0.9974928271853185|
|500897  |过火                |2        |张信哲              |0.9974926040740845|

## Future work:
- from the recommended songs, generate 100 candidate songs per user and keep the song_id 
- try classification methods to fine-tune the candidates down to the top 20 
- take more features into consideration: song_name, singer, song_type
- predict the class: a 0 - 5 score (training with normalized frequency 1-5; a download automatically scores 5)
- evaluate both models' top 20 recommendations and RMS error 
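
As a rough sketch of the last item, the two measures could be computed as below. This is plain Python on made-up values. `rmse` and `precision_at_k` are hypothetical helpers, and precision at k is one possible way to score a top-20 list, not a metric this notebook already uses.

```python
import math

def rmse(predicted, actual):
    """Root mean squared error between predicted and observed scores."""
    squared = [(p - a) ** 2 for p, a in zip(predicted, actual)]
    return math.sqrt(sum(squared) / len(squared))

def precision_at_k(recommended, relevant, k):
    """Share of the first k recommended songs that the user actually liked."""
    hits = sum(1 for song in recommended[:k] if song in relevant)
    return hits / k

# Made-up scores and song ids for illustration
error = rmse([0.0, 0.0], [3.0, 4.0])
precision = precision_at_k(["a", "b", "c", "d"], {"a", "c"}, 2)
```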

For collaborative filtering:<br>
- visualize the frequency distribution and try normalization 

Learning about distributed linear algebra <br>
Challenge: 
- the matrix is stored in a distributed container; is moving everything into memory impossible?