In [1]:
# import libraries
import os

from pyspark.shell import sqlContext
from pyspark.sql.types import *
from pyspark.sql import functions as F

from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.0.0
      /_/

Using Python version 3.8.2 (default, Mar 26 2020 15:53:00)
SparkSession available as 'spark'.


In [2]:
# list directories
files_path = '../MDM_Project/Dataset/'  # File path here ../MDM_Project/Dataset/

triplets_file = files_path + 'train_triplets.txt'
songs2tracks_file = files_path + 'song_to_tracks.txt'
metadata_file = files_path + 'track_metadata.csv'

In [3]:
# Handle Windows.
if os.path.sep != '/':
    triplets_file = triplets_file.replace('/', os.path.sep)
    songs2tracks_file = songs2tracks_file.replace('/', os.path.sep)
    metadata_file = metadata_file.replace('/', os.path.sep)

In [4]:
# Creating schema so the cluster only runs through the data once
triplets_schema = StructType(
    [StructField('userId', StringType()),
     StructField('songId', StringType()),
     StructField('Plays', IntegerType())]
)
songs2tracks_schema = StructType(
    [StructField('songId', StringType()),
     StructField('trackId', StringType())]
)
metadata_schema = StructType(
    [StructField('trackId', StringType()),
     StructField('title', StringType()),
     StructField('songId', StringType()),
     StructField('release', StringType()),
     StructField('artist_id', StringType()),
     StructField('artist_mbid', StringType()),
     StructField('artist_name', StringType()),
     StructField('duration', DoubleType()),
     StructField('artist_familiarity', DoubleType()),
     StructField('artist_hotttness', DoubleType()),
     StructField('year', IntegerType()),
     StructField('track_7digitalid', IntegerType()),
     StructField('shs_perf', DoubleType()),
     StructField('shs_work', DoubleType())]
)

In [5]:
# load the data into DataFrames
plays_df = sqlContext.read.format('com.databricks.spark.csv') \
    .options(delimiter='\t', header=True, inferSchema=False) \
    .schema(triplets_schema) \
    .load(triplets_file)

songs2tracks_df = sqlContext.read.format('com.databricks.spark.csv') \
    .options(delimiter=',', header=True, inferSchema=False) \
    .schema(songs2tracks_schema) \
    .load(songs2tracks_file)

metadata_df = sqlContext.read.format('com.databricks.spark.csv') \
    .options(delimiter=',', header=True, inferSchema=False) \
    .schema(metadata_schema) \
    .load(metadata_file)

In [6]:
# change ids from strings to integers
userId_change = plays_df.select('userId').distinct().select('userId',F.monotonically_increasing_id().alias('new_userId'))
user_als_id_LUT = sqlContext.createDataFrame(userId_change.rdd.map(lambda x: x[0]).zipWithIndex(), StructType([StructField("userId", StringType(), True),StructField("user_als_id", IntegerType(), True)]))

songId_change = plays_df.select('songId').distinct().select('songId', F.monotonically_increasing_id().alias('new_songId'))
song_als_id_LUT = sqlContext.createDataFrame(songId_change.rdd.map(lambda x: x[0]).zipWithIndex(), StructType([StructField("songId", StringType(), True),StructField("song_als_id", IntegerType(), True)]))

In [7]:
# RUN THE BLOCK TO CHECK IF THE  NEW USER_ID, SONG_ID GENERATED PROPERLY
user_als_id_LUT.show(5)
song_als_id_LUT.show(5)

+--------------------+-----------+
|              userId|user_als_id|
+--------------------+-----------+
|b11e6a84c54160e61...|          0|
|2c218a60b3d777e9e...|          1|
|672a1973a091e8ed8...|          2|
|e51bbbd28659be401...|          3|
|cc9fc2eccf0d6fe78...|          4|
+--------------------+-----------+
only showing top 5 rows

+------------------+-----------+
|            songId|song_als_id|
+------------------+-----------+
|SOAQZXK12A6701D993|          0|
|SODASIJ12A6D4F5D89|          1|
|SOPQGWI12A8C135DDB|          2|
|SOHNFBA12AB018CD1D|          3|
|SOMMUEG12AF729B982|          4|
+------------------+-----------+
only showing top 5 rows



In [8]:
# RUN THE BLOCK TO Get total unique users and songs
unique_users = user_als_id_LUT.count()
unique_songs = song_als_id_LUT.count()
print('Number of unique users: {0}'.format(unique_users))
print('Number of unique songs: {0}'.format(unique_songs))

Number of unique users: 88750
Number of unique songs: 248144


In [9]:
# Joining the new ID's to the Plays_df
plays_df_2 = plays_df.join(user_als_id_LUT,'userId').join(song_als_id_LUT,'songId')

# remove half users to make more manageable
plays_df_2 = plays_df_2.filter(plays_df_2.user_als_id < unique_users / 2)

In [10]:
# Summary of each DataFrame
plays_df_2.cache()
plays_df_2.show(5)

songs2tracks_df.cache()
songs2tracks_df.show(5)

metadata_df.cache()
metadata_df.show(5)

+------------------+--------------------+-----+-----------+-----------+
|            songId|              userId|Plays|user_als_id|song_als_id|
+------------------+--------------------+-----+-----------+-----------+
|SOAADAS12A58A784EC|72b850a0b614bf2ca...|    1|       5675|        483|
|SOAAZPG12A6D4F8D8B|06e2112e9fa9a5305...|    2|       1649|        116|
|SOAAZPG12A6D4F8D8B|36d2c3cf1eacc5161...|    2|       1608|        116|
|SOAAZPG12A6D4F8D8B|27c21f013bf67bc60...|    1|       6329|        116|
|SOAAZPG12A6D4F8D8B|8c87aae74e9644017...|    2|       6628|        116|
+------------------+--------------------+-----+-----------+-----------+
only showing top 5 rows

+------------------+--------------------+
|            songId|             trackId|
+------------------+--------------------+
|TRMMMYQ128F932D901|  SOQMMHC12AB0180CB8|
|TRMMMKD128F425225D|  SOVFVAK12A8C1350D9|
|TRMMMRX128F93187D9|  SOGTUKN12AB017F4F1|
|TRMMMCH128F425532C|  SOBNYVR12A8C13558C|
|TRMMMWA128F426B589|  SOHSBXH12A8

In [12]:
#Total Listens(plays) of Each SongID
Total_listens = plays_df_2.groupBy('songId') \
                                              .agg(F.count(plays_df_2.Plays).alias('User_Count'),
                                                            F.sum(plays_df_2.Plays).alias('Total_Plays')) \
                                                       .orderBy('Total_Plays', ascending = False)

print('Total Listens of Each SONG_ID:')
Total_listens.show(6, truncate=False)

Total Listens of Each SONG_ID:
+------------------+----------+-----------+
|songId            |User_Count|Total_Plays|
+------------------+----------+-----------+
|SOBONKR12A58A7A7E0|3682      |31851      |
|SOAUWYT12A81C206F1|4026      |28582      |
|SOSXLTC12AF72A7F54|3585      |23166      |
|SOEGIYH12A6D4FC0E3|3087      |18587      |
|SOFRQTD12A81C233C0|4827      |18295      |
|SOAXGDH12A8C13F8A1|4032      |15470      |
+------------------+----------+-----------+
only showing top 6 rows



In [14]:
# Joining with metadata to get artist and song title for the Total_Listens
Song_names = Total_listens.join(metadata_df, 'songId' ) \
                                                      .filter('User_Count >= 200') \
                                                      .select('artist_name', 'title', 'songId', 'User_Count','Total_Plays') \
                                                      .orderBy('Total_Plays', ascending = False)

print('Complete Details of Songs Listened')
Song_names.show(10, truncate = False)

Complete Details of Songs Listened
+----------------------------------------------------------------------+-------------------------------------------------------------------+------------------+----------+-----------+
|artist_name                                                           |title                                                              |songId            |User_Count|Total_Plays|
+----------------------------------------------------------------------+-------------------------------------------------------------------+------------------+----------+-----------+
|Dwight Yoakam                                                         |You're The One                                                     |SOBONKR12A58A7A7E0|3682      |31851      |
|Björk                                                                 |Undo                                                               |SOAUWYT12A81C206F1|4026      |28582      |
|Kings Of Leon                                    

SPLITING THE DATASET INTO TRAIN, TEST & VALIDATION SETS

In [15]:
# We'll hold out 60% for training, 20% of our data for validation, and leave 20% for testing
seed = 180229192
(split_1, split_2, split_3) = plays_df_2.randomSplit([0.6, 0.2, 0.2], seed = seed)

# Let's cache these datasets for performance
train_set = split_1.cache()
validation_set = split_2.cache()
test_set = split_3.cache()

print('Training: {0}, validation: {1}, test: {2}\n'.format(
  train_set.count(), validation_set.count(), test_set.count())
)
train_set.show(5)
validation_set.show(5)
test_set.show(5)

Training: 1265744, validation: 422123, test: 423137

+------------------+--------------------+-----+-----------+-----------+
|            songId|              userId|Plays|user_als_id|song_als_id|
+------------------+--------------------+-----+-----------+-----------+
|SOAADAS12A58A784EC|72b850a0b614bf2ca...|    1|       5675|        483|
|SOAAZPG12A6D4F8D8B|06e2112e9fa9a5305...|    2|       1649|        116|
|SOAAZPG12A6D4F8D8B|1c42d0656661832a3...|    1|      15145|        116|
|SOAAZPG12A6D4F8D8B|2720bf07577332aac...|    1|      12927|        116|
|SOAAZPG12A6D4F8D8B|27c21f013bf67bc60...|    1|       6329|        116|
+------------------+--------------------+-----+-----------+-----------+
only showing top 5 rows

+------------------+--------------------+-----+-----------+-----------+
|            songId|              userId|Plays|user_als_id|song_als_id|
+------------------+--------------------+-----+-----------+-----------+
|SOAAZPG12A6D4F8D8B|0ba364be007519415...|    1|      39285

In [16]:
# Number of plays needs to be double type
validation_set = validation_set.withColumn("Plays", validation_set["Plays"].cast(DoubleType()))
validation_set.show(5)

+------------------+--------------------+-----+-----------+-----------+
|            songId|              userId|Plays|user_als_id|song_als_id|
+------------------+--------------------+-----+-----------+-----------+
|SOAAZPG12A6D4F8D8B|0ba364be007519415...|  1.0|      39285|        116|
|SOAAZPG12A6D4F8D8B|399d1e9bc1ae37682...|  2.0|      32465|        116|
|SOAAZPG12A6D4F8D8B|436122fe92bb4955f...|  1.0|      17579|        116|
|SOAAZPG12A6D4F8D8B|bb84b605789d89899...|  1.0|      41660|        116|
|SOAEBYL12AC468B119|2780f0ed8d98849f8...|  1.0|      25809|        237|
+------------------+--------------------+-----+-----------+-----------+
only showing top 5 rows



## MODEL GENERATION (Alternating Least Squares)

In [17]:
# initialising our First ALS learner
als_01 = ALS()
# Setting the parameters for the method
als_01.setMaxIter(5)\
   .setSeed(seed)\
   .setItemCol("song_als_id")\
   .setRatingCol("Plays")\
   .setUserCol("user_als_id")

ALS_9cc061c3aed0

In [18]:
# computing an evaluation metric for our test dataset
# We Create an RMSE evaluator using the label and predicted columns

reg_eval = RegressionEvaluator(predictionCol="prediction", labelCol="Plays", metricName="rmse")

tolerance = 0.03
ranks = [4, 8, 12, 16]
regParams = [0.15, 0.2, 0.25]
errors = [[0]*len(ranks)]*len(regParams)
models = [[0]*len(ranks)]*len(regParams)
err = 0
min_error = float('inf')
best_rank = -1

i = 0
for regParam in regParams:
  j = 0
  for rank in ranks:
    # Set the rank here:
    als_01.setParams(rank = rank, regParam = regParam)
    # Create the model with these parameters.
    model = als_01.fit(train_set)
    # Run the model to create a prediction. Predict against the validation_df.
    predictions = model.transform(validation_set)

    # Remove NaN values from prediction (due to SPARK-14489)
    predicted_plays = predictions.filter(predictions.prediction != float('nan'))
    predicted_plays = predicted_plays.withColumn("prediction", F.abs(F.round(predicted_plays["prediction"],0)))

    # Run the previously created RMSE evaluator, reg_eval, on the predicted_plays DataFrame
    error = reg_eval.evaluate(predicted_plays)
    errors[i][j] = error
    models[i][j] = model
    print ('For rank :',rank, ' regularization parameter:', regParam,' the RMSE is', error)
    if error < min_error:
      min_error = error
      best_params = [i,j]
    j += 1
  i += 1

als_01.setRegParam(regParams[best_params[0]])
als_01.setRank(ranks[best_params[1]])
print ('The best model was trained with regularization parameter %s' % regParams[best_params[0]])
print ('The best model was trained with rank %s' % ranks[best_params[1]])
my_model = models[best_params[0]][best_params[1]]

For rank : 4  regularization parameter: 0.15  the RMSE is 7.264644516563655
For rank : 8  regularization parameter: 0.15  the RMSE is 6.640101260086675
For rank : 12  regularization parameter: 0.15  the RMSE is 6.467822764651799
For rank : 16  regularization parameter: 0.15  the RMSE is 6.264071090807914
For rank : 4  regularization parameter: 0.2  the RMSE is 7.018550737612064
For rank : 8  regularization parameter: 0.2  the RMSE is 6.457395112480655
For rank : 12  regularization parameter: 0.2  the RMSE is 6.28958308758756
For rank : 16  regularization parameter: 0.2  the RMSE is 6.206920879675748
For rank : 4  regularization parameter: 0.25  the RMSE is 6.882385757354028
For rank : 8  regularization parameter: 0.25  the RMSE is 6.350196847342608
For rank : 12  regularization parameter: 0.25  the RMSE is 6.203698598333162
For rank : 16  regularization parameter: 0.25  the RMSE is 6.1651038673686305
The best model was trained with regularization parameter 0.25
The best model was train

In [19]:
#predicted plays
predicted_plays.show(10)

+------------------+--------------------+-----+-----------+-----------+----------+
|            songId|              userId|Plays|user_als_id|song_als_id|prediction|
+------------------+--------------------+-----+-----------+-----------+----------+
|SOYIYOG12A6D4F98CD|82ce98a5021005df7...|  1.0|       2748|        148|       2.0|
|SOYIYOG12A6D4F98CD|4f67cb97dee7ef34e...|  1.0|      10049|        148|       5.0|
|SOYIYOG12A6D4F98CD|cbe161a3d8767529b...|  6.0|      18605|        148|       1.0|
|SOYIYOG12A6D4F98CD|96b86ce393f1b6545...|  5.0|       4877|        148|       2.0|
|SOYIYOG12A6D4F98CD|56c0159379617eb13...|  3.0|      33898|        148|       2.0|
|SOYIYOG12A6D4F98CD|fe0add7149c2ca4cb...|  4.0|      41079|        148|       2.0|
|SOYIYOG12A6D4F98CD|b1d7e37caa99f45b1...|  1.0|      26292|        148|       1.0|
|SOYIYOG12A6D4F98CD|93b5fe4ed0c637099...|  3.0|      40953|        148|       4.0|
|SOYIYOG12A6D4F98CD|67fd48ba403033681...|  5.0|      19667|        148|       2.0|
|SOY

In [20]:
test_set = test_set.withColumn("Plays", test_set["Plays"].cast(DoubleType()))
predict_df = my_model.transform(test_set)

# Remove NaN values from prediction (due to SPARK-14489)
Test_predictions = predict_df.filter(predict_df.prediction != float('nan'))

# Round floats to whole numbers
Test_predictions = Test_predictions.withColumn("prediction", F.abs(F.round(Test_predictions["prediction"],0)))
# Run the previously created RMSE evaluator, reg_eval, on the predicted_test_df DataFrame
Test_RMSE = reg_eval.evaluate(Test_predictions)
print('The model had a RMSE on the test set of {0}'.format(Test_RMSE))

The model had a RMSE on the test set of 7.5163240006236585


In [21]:
# Comparing the Model
avg_plays = train_set.groupBy().avg('Plays').select(F.round('avg(Plays)'))
avg_plays.show(3)
train_avg_plays = avg_plays.collect()[0][0]
print('The average number of plays in the dataset is {0}'.format(train_avg_plays))

+--------------------+
|round(avg(Plays), 0)|
+--------------------+
|                 3.0|
+--------------------+

The average number of plays in the dataset is 3.0


In [22]:
# Add a column with the average rating
test_avg = test_set.withColumn('prediction', F.lit(train_avg_plays))

# Run the previously created RMSE evaluator, reg_eval, on the test_for_avg_df DataFrame
test_avg_RMSE = reg_eval.evaluate(test_avg)
print("The RMSE on the average set is {0}".format(test_avg_RMSE))

The RMSE on the average set is 7.350144436048447


In [23]:
UserID = 13
songs_listened = plays_df_2.filter(plays_df_2.user_als_id == UserID) \
    .join(metadata_df, 'songId') \
    .select('song_als_id', 'artist_name', 'title') \
 \
# Generating List of Listened Songs
listened_songs_list = []
for song in songs_listened.collect():
    listened_songs_list.append(song['song_als_id'])

print('Songs user has listened to:')
songs_listened.select('artist_name', 'title').show()

# generate dataframe of unlistened songs
songs_unlistened = plays_df_2.filter( ~ plays_df_2['song_als_id'].isin(listened_songs_list)) \
    .select('song_als_id').withColumn('user_als_id', F.lit(UserID)).distinct()

# feed unlistened songs into model
predicted_listens = my_model.transform(songs_unlistened)

# remove NaNs
predicted_listens = predicted_listens.filter(predicted_listens['prediction'] != float('nan'))

# print output
print('Predicted Songs:')
predicted_listens.join(plays_df_2, 'song_als_id') \
    .join(metadata_df, 'songId') \
    .select('artist_name', 'title', 'prediction') \
    .distinct() \
    .orderBy('prediction', ascending=False) \
    .show(10)

Songs user has listened to:
+--------------------+--------------------+
|         artist_name|               title|
+--------------------+--------------------+
|       Justin Bieber|  Common Denominator|
|Sean Kingston and...|        Eenie Meenie|
|       Justin Bieber|              Bigger|
|       Justin Bieber|        Runaway Love|
|         John Legend|P.D.A. (We Just D...|
|              Staind|Reply (Album Vers...|
|         Los Pericos|            La Hiena|
|      Colbie Caillat|              Bubbly|
|         Lionel Rogg|Die Kunst der Fug...|
|       Justin Bieber|            One Time|
|       Justin Bieber| Stuck In The Moment|
|       Dwight Yoakam|      You're The One|
|           Morcheeba|             The Sea|
|       Justin Bieber|             Love Me|
|          Clara Hill|Clara meets Slope...|
|       Justin Bieber|       Down To Earth|
|      The Black Keys|All Hands Against...|
|           Sam Cooke|    Ain't Misbehavin|
|       Justin Bieber|             U Smile|
|   

## MAKING PREDICTIONS BASED ON  'SONGS LISTENED TO' AT LEAST TWICE

In [24]:
plays_df_2more_plays = plays_df.join(user_als_id_LUT, 'userId') \
                                       .join(song_als_id_LUT, 'songId') \
                                       .filter(plays_df.Plays >= 2)\
                                       .distinct()

total_entries_2more = plays_df_2more_plays.count()
print('Total enties with two or more plays: {0}'.format(total_entries_2more))

plays_df_2more_plays = plays_df_2more_plays.filter(plays_df_2more_plays.user_als_id < (unique_users)*0.8) \
                                                   .select('user_als_id', 'song_als_id', 'Plays')
plays_df_2more_plays.cache()

Total enties with two or more plays: 1728069


DataFrame[user_als_id: int, song_als_id: int, Plays: int]

In [25]:
# We'll hold out 60% for training, 20% of our data for validation, and leave 20% for testing
seed = 1800083193
(split_01, split_02, split_03) = plays_df_2more_plays.randomSplit([0.6, 0.2, 0.2], seed = seed)

# Let's cache these datasets for performance
trainset_2more = split_01.cache()
validationset_2more = split_02.cache()
testset_2more = split_03.cache()

print('Training: {0}, validation: {1}, test: {2}\n'.format(
  trainset_2more.count(), validationset_2more.count(), testset_2more.count())
)
validationset_2more = validationset_2more.withColumn("Plays", validationset_2more["Plays"].cast(DoubleType()))
test_2more = testset_2more.withColumn("Plays", testset_2more["Plays"].cast(DoubleType()))

trainset_2more.show(3)
validationset_2more.show(3)
testset_2more.show(3)

Training: 828129, validation: 275653, test: 276379

+-----------+-----------+-----+
|user_als_id|song_als_id|Plays|
+-----------+-----------+-----+
|          6|          8|    2|
|          7|         45|    3|
|         12|        275|    5|
+-----------+-----------+-----+
only showing top 3 rows

+-----------+-----------+-----+
|user_als_id|song_als_id|Plays|
+-----------+-----------+-----+
|        110|         11|  2.0|
|        125|        273| 10.0|
|        151|          8|  5.0|
+-----------+-----------+-----+
only showing top 3 rows

+-----------+-----------+-----+
|user_als_id|song_als_id|Plays|
+-----------+-----------+-----+
|          6|         15|    3|
|          7|         15|    6|
|         12|        115|    2|
+-----------+-----------+-----+
only showing top 3 rows



In [26]:
# Let's initialize our ALS learner
als_2more = ALS()

# Now set the parameters for the method
als_2more.setMaxIter(2)\
   .setSeed(seed)\
   .setItemCol("song_als_id")\
   .setRatingCol("Plays")\
   .setUserCol("user_als_id")

ALS_65afbd8a8382

In [27]:
# Now let's compute an evaluation metric for our test dataset
# We Create an RMSE evaluator using the label and predicted columns
reg_eval = RegressionEvaluator(predictionCol="prediction", labelCol="Plays", metricName="rmse")

tolerance = 0.03
ranks = [4, 8, 12, 16]
regParams = [0.1, 0.15, 0.2, 0.25]
errors = [[0]*len(ranks)]*len(regParams)
models = [[0]*len(ranks)]*len(regParams)
err = 0
min_error = float('inf')
best_rank = -1
i = 0
for regParam in regParams:
  j = 0
  for rank in ranks:
    # Set the rank here:
    als_2more.setParams(rank = rank, regParam = regParam)
    # Create the model with these parameters.
    model = als_2more.fit(trainset_2more)
    # Run the model to create a prediction. Predict against the validation_df.
    predict_df = model.transform(validationset_2more)

    # Remove NaN values from prediction (due to SPARK-14489)
    predicted_plays_df = predict_df.filter(predict_df.prediction != float('nan'))
    predicted_plays_df = predicted_plays_df.withColumn("prediction", F.abs(F.round(predicted_plays_df["prediction"],0)))
    # Run the previously created RMSE evaluator, reg_eval, on the predicted_ratings_df DataFrame
    error = reg_eval.evaluate(predicted_plays_df)
    errors[i][j] = error
    models[i][j] = model
    print ('For rank %s, regularization parameter %s the RMSE is %s' % (rank, regParam, error))
    if error < min_error:
      min_error = error
      best_params = [i,j]
    j += 1
  i += 1

als_2more.setRegParam(regParams[best_params[0]])
als_2more.setRank(ranks[best_params[1]])
print ('The best model was trained with regularization parameter %s' % regParams[best_params[0]])
print ('The best model was trained with rank %s' % ranks[best_params[1]])
my_model_2more = models[best_params[0]][best_params[1]]

For rank 4, regularization parameter 0.1 the RMSE is 12.902847366010763
For rank 8, regularization parameter 0.1 the RMSE is 12.418148607763834
For rank 12, regularization parameter 0.1 the RMSE is 11.95593034209839
For rank 16, regularization parameter 0.1 the RMSE is 11.231182460051976
For rank 4, regularization parameter 0.15 the RMSE is 11.811508335215171
For rank 8, regularization parameter 0.15 the RMSE is 11.22740415240161
For rank 12, regularization parameter 0.15 the RMSE is 10.731303477347986
For rank 16, regularization parameter 0.15 the RMSE is 10.363964007086643
For rank 4, regularization parameter 0.2 the RMSE is 11.176796354061143
For rank 8, regularization parameter 0.2 the RMSE is 10.599910145664028
For rank 12, regularization parameter 0.2 the RMSE is 10.157671030809718
For rank 16, regularization parameter 0.2 the RMSE is 9.913135472681212
For rank 4, regularization parameter 0.25 the RMSE is 10.766626184988802
For rank 8, regularization parameter 0.25 the RMSE is 10

In [28]:
#Testing the Model on the Test_2more Dataset
predict_2more = my_model_2more.transform(test_2more)

# Remove NaN values from prediction 
predicted_test_2more = predict_2more.filter(predict_2more.prediction != float('nan'))

In [29]:
# Round floats to whole numbers
predicted_test_2more = predicted_test_2more.withColumn("prediction", F.abs(F.round(predicted_test_2more["prediction"],0)))
# Run the previously created RMSE evaluator, reg_eval, on the predicted_test_df DataFrame
test2more_RMSE = reg_eval.evaluate(predicted_test_2more)

print('The model had a RMSE on the test set of {0}'.format(test2more_RMSE))

The model had a RMSE on the test set of 9.66994003417862


In [30]:

##We again compare to selecting the average number of plays from the training dataset
avg_plays_2more = trainset_2more.groupBy().avg('Plays').select(F.round('avg(Plays)'))

avg_plays_2more.show(3)
# Extract the average rating value. (This is row 0, column 0.)
train_avg_plays2more = avg_plays_2more.collect()[0][0]

print('The average number of plays in the dataset is {0}'.format(train_avg_plays2more))

# Add a column with the average rating
test_for_avg_2more = test_2more.withColumn('prediction', F.lit(train_avg_plays2more))

# Run the previously created RMSE evaluator, reg_eval, on the test_for_avg_df DataFrame
test_avg_RMSE_2more = reg_eval.evaluate(test_for_avg_2more)

print("The RMSE on the average set is {0}".format(test_avg_RMSE_2more))

+--------------------+
|round(avg(Plays), 0)|
+--------------------+
|                 6.0|
+--------------------+

The average number of plays in the dataset is 6.0
The RMSE on the average set is 8.643135682564385


In [31]:
#PREDICTION FOR THE USER - 02
UserID = 13
songs_listened = plays_df_2.filter(plays_df_2.user_als_id == UserID) \
    .join(metadata_df, 'songId') \
    .select('song_als_id', 'artist_name', 'title') \
 \
# Generating List of Listened Songs
listened_songs_list = []
for song in songs_listened.collect():
    listened_songs_list.append(song['song_als_id'])

print('Songs user has listened to:')
songs_listened.select('artist_name', 'title').show()

# generate dataframe of unlistened songs
songs_unlistened = plays_df_2.filter( ~ plays_df_2['song_als_id'].isin(listened_songs_list)) \
    .select('song_als_id').withColumn('user_als_id', F.lit(UserID)).distinct()

# feed unlistened songs into model
predicted_listens = my_model_2more.transform(songs_unlistened)

# remove NaNs
predicted_listens = predicted_listens.filter(predicted_listens['prediction'] != float('nan'))

# print output
print('Predicted Songs:')
predicted_listens.join(plays_df_2, 'song_als_id') \
    .join(metadata_df, 'songId') \
    .select('artist_name', 'title', 'prediction') \
    .distinct() \
    .orderBy('prediction', ascending=False) \
    .show(10)


Songs user has listened to:
+--------------------+--------------------+
|         artist_name|               title|
+--------------------+--------------------+
|       Justin Bieber|  Common Denominator|
|Sean Kingston and...|        Eenie Meenie|
|       Justin Bieber|              Bigger|
|       Justin Bieber|        Runaway Love|
|         John Legend|P.D.A. (We Just D...|
|              Staind|Reply (Album Vers...|
|         Los Pericos|            La Hiena|
|      Colbie Caillat|              Bubbly|
|         Lionel Rogg|Die Kunst der Fug...|
|       Justin Bieber|            One Time|
|       Justin Bieber| Stuck In The Moment|
|       Dwight Yoakam|      You're The One|
|           Morcheeba|             The Sea|
|       Justin Bieber|             Love Me|
|          Clara Hill|Clara meets Slope...|
|       Justin Bieber|       Down To Earth|
|      The Black Keys|All Hands Against...|
|           Sam Cooke|    Ain't Misbehavin|
|       Justin Bieber|             U Smile|
|   