# Music Recommendation Engine

In [1]:
# Starter code
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql import functions as f
from pyspark.sql import types as t
from pyspark.sql.types import StringType
from pyspark.sql.functions import isnan, count, when, col, desc, udf, col, sort_array, asc, avg

# Start (or attach to) the Spark session shared by every cell below
spark = SparkSession.builder.appName("Sparkify").getOrCreate()

# Location of the full Sparkify event log. For quick experiments, point this at
# "s3n://udacity-dsnd/sparkify/mini_sparkify_event_data.json" instead.
event_data = "s3n://udacity-dsnd/sparkify/sparkify_event_data.json"

# Load the raw events, then derive `timestamp` and `date` from `ts`
# (`ts` is divided by 1000 before being cast to a timestamp)
df = spark.read.json(event_data)
df = (
    df
    .withColumn("timestamp", f.to_timestamp((col("ts") / 1000).cast(t.TimestampType())))
    .withColumn("date", f.to_date((col("ts") / 1000).cast(t.TimestampType())))
)

# Expose the events to Spark SQL under the name `event_data`
df.createOrReplaceTempView("event_data")


VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
4,application_1582438049660_0005,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Data Preprocessing

##### 1. Capture Song - User Value

In [2]:
# Build the explicit-feedback table: one row per (userId, song) whose `value`
# is 1 for a thumbs up and 0 for a thumbs down.
#
# How the query works:
#  - Only NextSong / Thumbs Up / Thumbs Down / Add to Playlist events are kept.
#  - Each Thumbs Up / Thumbs Down event is attributed to the song and artist of
#    the row immediately preceding it for the same user (ordered by timestamp).
#  - Feedback whose preceding row has no song or no artist is dropped.
#  - The song key is the string "<artist>-<song>".
#  - min() resolves conflicting feedback: if a user gave the same song both a
#    thumbs up and a thumbs down, the resulting value is 0.

df_rating = spark.sql('''
            with get_song_data as (
            select timestamp,artist, song, userId,page,
                lag( song,1) over (partition by userId order by timestamp) as lag_song,
                lag( artist,1) over (partition by userId order by timestamp) as lag_artist
            from event_data
            where page in ('NextSong','Thumbs Up','Thumbs Down','Add to Playlist')
            order by timestamp
            )
            select 
                    userId,
                    concat(lag_artist,'-',lag_song) as song,
        
                    min(
                    case when page = 'Thumbs Up' then 1.000 
                         when page = 'Thumbs Down' then 0
                    end )
                    as value
            from get_song_data
            where page in ('Thumbs Up','Thumbs Down')
                  and lag_song is not NULL
                  and lag_artist is not NULL
            group by 1,2
            order by 1,2
            ''')

# Preview the first rows of the rating table
df_rating.show()
# NOTE(review): the "dataset" view is not referenced by any cell visible in this notebook
df_rating.createOrReplaceTempView("dataset")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+--------------------+-----+
| userId|                song|value|
+-------+--------------------+-----+
|1000025|    17 Hippies-Adieu|1.000|
|1000025|A Day To Remember...|1.000|
|1000025|A Rocket To The M...|0.000|
|1000025|Aaron Shust-Give ...|1.000|
|1000025|Afroman-Crazy Rap...|1.000|
|1000025|Air France-Windmi...|1.000|
|1000025|Alan Silvestri-Ou...|0.000|
|1000025|Alicia Keys-Un-th...|1.000|
|1000025|Alliance Ethnik-S...|1.000|
|1000025|Andres Calamaro-L...|1.000|
|1000025|  Andrew Bird-Effigy|1.000|
|1000025|B.o.B-Nothin' On ...|1.000|
|1000025|Barry Tuckwell/Ac...|1.000|
|1000025|Bellatrax feat. T...|1.000|
|1000025|     BeyoncÃÂ©-Halo|1.000|
|1000025|Big Drill Car-The...|1.000|
|1000025|Bitter Sweet-Suga...|1.000|
|1000025|  Bloc Party-Banquet|0.000|
|1000025|    Bo Diddley-Pills|1.000|
|1000025|Boys Noize-Shine ...|1.000|
+-------+--------------------+-----+
only showing top 20 rows

##### 2. Create Index for song

In [3]:
from pyspark.ml.feature import RegexTokenizer, CountVectorizer, \
    IDF, StringIndexer

# ALS needs numeric item ids, so map every distinct "artist-song" string to a
# numeric `songId` (StringIndexer emits the index as a double column).
#
# Drop any `songId` left over from a previous execution first: without this,
# re-running the cell fails because the output column already exists.
# `drop` is a no-op when the column is absent, so the first run is unaffected.
df_rating = df_rating.drop("songId")

indexer = StringIndexer(inputCol="song", outputCol="songId")
df_rating = indexer.fit(df_rating).transform(df_rating)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

##### 3. Cast userId as Integer 

In [4]:
from pyspark.sql.types import DoubleType,IntegerType
# Cast userId to an integer column (it is used as the ALS user column later on)
df_rating = df_rating.withColumn("userId", col("userId").cast(IntegerType()))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [7]:
# Check the column types after indexing and casting: userId, song, value and songId
df_rating.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- userId: integer (nullable = true)
 |-- song: string (nullable = true)
 |-- value: decimal(13,3) (nullable = true)
 |-- songId: double (nullable = false)

## Build Training, Tuning, and Evaluating Function

In [12]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
import pyspark.sql.functions as func

def _binary_metrics(scored):
    """
    Compute accuracy, recall and precision of a scored DataFrame in one Spark job.

    Parameters
    ----------
    scored: spark DF with columns 'value' (true 0/1 label),
        'prediction_bol' (predicted 0/1 label) and 'correct' (boolean)

    Return
    ------
    (accuracy, recall, precision); a metric is None when no row contributes to it
    """
    hit = col("correct").cast(IntegerType())
    row = scored.agg(
        avg(hit).alias("accuracy"),
        # when() without otherwise() yields NULL for non-matching rows and avg()
        # skips NULLs, so each of these averages only the matching rows
        avg(when(col("value") == 1, hit)).alias("recall"),
        avg(when(col("prediction_bol") == 1, hit)).alias("precision"),
    ).collect()[0]
    return row["accuracy"], row["recall"], row["precision"]


def tune_ALS(train_data, validation_data, maxIter, regParams, ranks, seed=None):
    """
    Grid search over ALS hyper-parameters, selecting the model with the
    highest accuracy on the validation data.

    For every (rank, regParam) pair an ALS model is fitted on ``train_data``.
    Its continuous prediction is turned into a 0/1 label (1 when the
    prediction is greater than 0.5), and accuracy, recall and precision are
    printed for both the training and the validation data.

    Parameters
    ----------
    train_data: spark DF with columns ['userId', 'songId', 'value']

    validation_data: spark DF with columns ['userId', 'songId', 'value']

    maxIter: int, max number of learning iterations

    regParams: list of float, one dimension of hyper-param tuning grid

    ranks: list of int, one dimension of hyper-param tuning grid

    seed: optional int, random seed passed to ALS so that runs are repeatable;
        when None (default) ALS keeps its own default seed handling

    Return
    ------
    (predictions_validation, best_model)
        The validation DataFrame scored by the best model (extra columns
        'prediction', 'prediction_bol' and 'correct') and that fitted ALS
        model. Both are None when the grid is empty or when no validation
        accuracy could be computed (e.g. empty validation data).
    """
    # initial
    max_accuracy = None
    best_rank = -1
    best_regularization = 0
    best_model = None
    best_predictions = None

    # A Python UDF is kept on purpose: in Python `NaN > 0.5` is False, so a NaN
    # prediction is labelled 0.
    udf_convert_bol = udf(lambda x: 1 if x>0.5 else 0, IntegerType())

    def add_labels(predictions):
        """Attach the thresholded label and whether it matches the true value."""
        labelled = predictions.withColumn('prediction_bol', udf_convert_bol(col('prediction')))
        return labelled.withColumn('correct', col('prediction_bol') == col('value'))

    for rank in ranks:
        for reg in regParams:
            # get ALS model
            als = ALS(userCol="userId", itemCol="songId", ratingCol="value").setMaxIter(maxIter).setRank(rank).setRegParam(reg)
            if seed is not None:
                als = als.setSeed(seed)
            # train ALS model
            model = als.fit(train_data)

            # score both datasets and derive the 0/1 label and correctness flag
            predictions_train = add_labels(model.transform(train_data))
            predictions_validation = add_labels(model.transform(validation_data))

            # one aggregation per dataset instead of one Spark job per metric
            accuracy_train, recall_train, precission_train = _binary_metrics(predictions_train)
            accuracy_validation, recall_validation, precission_validation = _binary_metrics(predictions_validation)

            print('{} latent factors and regularization = {}: \n'

                  'train accuracy is {}\n'
                  'train recall is {}\n'
                  'train precission is {}\n'


                  'validation accuracy is {}\n'
                  'validation recall is {}\n'
                  'validation precission is {}\n'.format(rank \
                                                          , reg\

                                                          , accuracy_train\
                                                          , recall_train\
                                                          , precission_train\

                                                          , accuracy_validation\
                                                          , recall_validation\
                                                          , precission_validation))

            # Keep the model AND its own validation predictions together, so the
            # returned predictions always belong to the returned model.
            if accuracy_validation is not None and (
                    max_accuracy is None or accuracy_validation > max_accuracy):
                max_accuracy = accuracy_validation
                best_rank = rank
                best_regularization = reg
                best_model = model
                best_predictions = predictions_validation

    print('\nThe best model has {} latent factors and '
          'regularization = {}'.format(best_rank, best_regularization))
    return best_predictions, best_model

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Model Training

##### Split the Dataset and Filter the Test Data only for Song and User available in Train

In [8]:
# Split the ratings 70/30 into train and test using a fixed seed
train, test = df_rating.randomSplit([0.7,0.3],seed=42)

# Register both splits as temp views so they can be queried with Spark SQL
# (the "train" view is also read by later cells)
train.createOrReplaceTempView("train")
test.createOrReplaceTempView("test")

# Keep only the test rows whose userId AND songId both occur in train.
# NOTE(review): presumably because ALS cannot produce a usable prediction for
# a user or song it never saw during fitting - confirm.
clean_test = spark.sql('''
    select *
    from test a 
    where a.userId in (select b.userId from train b)
          and a.songId in (select c.songId from train c)

''')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

##### Try first model with following hyperparameter

We will build the first model with the following hyperparameters:
 - maxIter = 10
 - regParams = 0.05
 - ranks = 20

In [10]:
# First model: 10 iterations, regParam 0.05, rank 20
prediction, model = tune_ALS(
    train_data=train,
    validation_data=clean_test,
    maxIter=10,
    regParams=[0.05],
    ranks=[20],
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

20 latent factors and regularization = 0.05: 
train accuracy is 0.8805544240091231
train recall is 0.9504484726043391
train precission is 0.9086967811450426
validation accuracy is 0.8860862574506779
validation recall is 0.962892507322604
validation precission is 0.9047913602206707


The best model has 20 latent factors and regularization = 0.05

In [11]:
# Number of scored validation rows returned by tune_ALS
prediction.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

384804

##### Let's take a look at the confusion matrix

In [12]:
# Confusion-matrix cells for the first model.
# `correct` says whether the thresholded prediction equals `value`, so:
#   correct & value=1 -> TP      correct & value=0 -> TN
#   wrong   & value=0 -> FP      wrong   & value=1 -> FN
TP = prediction.filter((col("correct") == 1) & (col("value") == 1)).count()
TN = prediction.filter((col("correct") == 1) & (col("value") == 0)).count()
FP = prediction.filter((col("correct") == 0) & (col("value") == 0)).count()
FN = prediction.filter((col("correct") == 0) & (col("value") == 1)).count()


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [13]:
# One count per line, in the order TP, TN, FP, FN
print(TP, TN, FP, FN, sep="\n")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

305589
35167
32133
11688

###### Try a second model with different hyperparameters
We will build the second model with the following hyperparameters:
 - maxIter = 10
 - regParams = 0.05
 - ranks = 30

In [9]:
# Second model: same settings as the first one except for a larger rank (30)
prediction_2, model_2 = tune_ALS(
    train_data=train,
    validation_data=clean_test,
    maxIter=10,
    regParams=[0.05],
    ranks=[30],
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

30 latent factors and regularization = 0.05: 
train accuracy is 0.8862011184885742
train recall is 0.9508521465463636
train precission is 0.9148280123483324
validation accuracy is 0.8912287355217119
validation recall is 0.9636562767510952
validation precission is 0.9105597247441092


The best model has 30 latent factors and regularization = 0.05

##### Let's take a look at the confusion matrix

In [10]:
# Confusion-matrix cells for the second model.
# `correct` says whether the thresholded prediction equals `value`, so:
#   correct & value=1 -> TP      correct & value=0 -> TN
#   wrong   & value=0 -> FP      wrong   & value=1 -> FN
TP_2 = prediction_2.filter((col("correct") == 1) & (col("value") == 1)).count()
TN_2 = prediction_2.filter((col("correct") == 1) & (col("value") == 0)).count()
FP_2 = prediction_2.filter((col("correct") == 0) & (col("value") == 0)).count()
FN_2 = prediction_2.filter((col("correct") == 0) & (col("value") == 1)).count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [11]:
# One count per line, in the order TP, TN, FP, FN
print(TP_2, TN_2, FP_2, FN_2, sep="\n")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

305818
37194
29954
11488

## Let's give a recommendation for user 1000025

##### Model Rebuild
We will rebuild the first model with the following hyperparameters:
 - maxIter = 10
 - regParams = 0.05
 - ranks = 20

In [13]:
# Refit the rank-20 model; it is the one used for the recommendations below
prediction, model = tune_ALS(
    train_data=train,
    validation_data=clean_test,
    maxIter=10,
    regParams=[0.05],
    ranks=[20],
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

20 latent factors and regularization = 0.05: 
train accuracy is 0.8807497810904003
train recall is 0.950859461434618
train precission is 0.908615839435724
validation accuracy is 0.8857612150540708
validation recall is 0.9630784233539904
validation precission is 0.9042101962825807


The best model has 20 latent factors and regularization = 0.05

In [14]:
# Preview the training rows (userId, song, value, songId)
train.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+--------------------+-----+--------+
| userId|                song|value|  songId|
+-------+--------------------+-----+--------+
|1000025|    17 Hippies-Adieu|1.000| 67783.0|
|1000025|Afroman-Crazy Rap...|1.000| 11885.0|
|1000025|Air France-Windmi...|1.000|  3448.0|
|1000025|Alan Silvestri-Ou...|0.000|113992.0|
|1000025|Alicia Keys-Un-th...|1.000|  1324.0|
|1000025|Alliance Ethnik-S...|1.000|    14.0|
|1000025|Andres Calamaro-L...|1.000|  9941.0|
|1000025|  Andrew Bird-Effigy|1.000|  9460.0|
|1000025|B.o.B-Nothin' On ...|1.000|    19.0|
|1000025|Barry Tuckwell/Ac...|1.000|     4.0|
|1000025|     BeyoncÃÂ©-Halo|1.000|    61.0|
|1000025|Big Drill Car-The...|1.000|  3537.0|
|1000025|  Bloc Party-Banquet|0.000|  1260.0|
|1000025|    Bo Diddley-Pills|1.000|  3066.0|
|1000025|Boys Noize-Shine ...|1.000|  1347.0|
|1000025|Buscemi-Couleurs ...|1.000| 10603.0|
|1000025|CafÃÂ© Tacvba-El...|1.000|  7340.0|
|1000025|Chris Cornell-Bla...|1.000| 40469.0|
|1000025|Chris Isaak-Have ...|1.00

##### Create Dataframe that contains all songs with target userId for predicting the value

In [34]:
# User to build recommendations for. Kept as a named constant so the target
# user can be changed in one place instead of editing a query string.
TARGET_USER_ID = 1000025

# Candidate set: every distinct (songId, song) pair present in the train data,
# each paired with the target user so that the model can score it.
to_predict = (
    spark.table("train")
    .select("songId", "song")
    .distinct()
    .withColumn("userId", f.lit(TARGET_USER_ID))
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

###### Predict the value of each song, filter out the songs already rated in the train data, and sort by predicted value

In [44]:
# Score every candidate song for the target user and expose the result to SQL
song_predict = model.transform(to_predict)
song_predict.createOrReplaceTempView("rec")

# Top-10 recommendations:
#  - the left join plus `b.userId is null` keeps only songs that have no
#    (songId, userId) row in train, i.e. songs this user has not rated there
#  - `prediction <10` discards out-of-range scores before sorting
#    (NOTE(review): presumably this also guards against NaN predictions - confirm)
#  - results are ordered by predicted value, highest first
spark.sql('''
    select a.*
    from rec a 
    left join train b on a.songId=b.songId and a.userId = b.userId
    where b.userId is null
    and prediction <10 --to prevent unlimited value
    order by prediction desc
    limit 10

''').show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+--------------------+-------+----------+
| songId|                song| userId|prediction|
+-------+--------------------+-------+----------+
|15792.0|Charly GarcÃÂ­a-...|1000025| 1.1071172|
|17187.0|Neutral Milk Hote...|1000025| 1.1034241|
|35172.0|Farmakon-Loosely ...|1000025| 1.1016318|
| 1896.0|Streetlight Manif...|1000025| 1.0840064|
|23337.0|Van Halen-Can't G...|1000025| 1.0835639|
|24889.0|Rikarena-Ella Es ...|1000025|  1.082952|
|12560.0|Sneaker Pimps-Six...|1000025| 1.0769445|
|10872.0|blessthefall-God ...|1000025| 1.0760309|
|15883.0|G. Love-Ain't Tha...|1000025|  1.075892|
| 5652.0|Interpol-Pioneer ...|1000025| 1.0757957|
+-------+--------------------+-------+----------+