# Building a music-artist recommendation system with PySpark ALS (Yahoo dataset)

### Information about the dataset

- Number of ratings: more than 150,000,000; only the first 100,000 are used in this project
- Total number of music artists: 97,956
- Dataset: https://webscope.sandbox.yahoo.com/catalog.php?datatype=r&did=1 <br>
Yahoo granted permission to use this dataset for non-commercial purposes.

**Initializing Spark and importing libraries**

In [1]:
import findspark
findspark.init()  # locate the local Spark installation before importing pyspark

import pyspark

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("rec").getOrCreate()

In [3]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

**Importing the dataset**

In [4]:
# The file has no header row; the columns are user ID, artist ID and score
data = spark.read.csv("data/part1.txt", inferSchema=True, header=False).limit(100000)
data.show(5)

+---+-------+---+
|_c0|    _c1|_c2|
+---+-------+---+
|  1|1000125| 90|
|  1|1006373|100|
|  1|1006978| 90|
|  1|1007035|100|
|  1|1007098|100|
+---+-------+---+
only showing top 5 rows



**Data preprocessing**

In [5]:
# Explicit imports avoid shadowing Python's built-in min, max, sum, etc.
from pyspark.sql.functions import col, regexp_replace

df = data.select(col("_c0").alias("UserID"), col("_c1").alias("ArtistID"), col("_c2").alias("score"))
df.printSchema()

root
 |-- UserID: integer (nullable = true)
 |-- ArtistID: integer (nullable = true)
 |-- score: integer (nullable = true)



In [6]:
df.describe().show()

+-------+------------------+-----------------+-----------------+
|summary|            UserID|         ArtistID|            score|
+-------+------------------+-----------------+-----------------+
|  count|            100000|           100000|           100000|
|   mean|          807.9369|    1033841.04656|         56.18287|
| stddev|501.30436056253603|32723.43135117443|40.57942182261647|
|    min|                 1|            24538|                0|
|    max|              1675|          1101671|              255|
+-------+------------------+-----------------+-----------------+



**Applying a min-max scaler to the scores**

In [7]:
from pyspark.ml.feature import VectorAssembler

# MinMaxScaler operates on vector columns, so wrap each score in a one-element vector
assembler = VectorAssembler(inputCols=["score"],
                            outputCol="score_f")

df_t = assembler.transform(df)

In [8]:
df_t.show(5)

+------+--------+-----+-------+
|UserID|ArtistID|score|score_f|
+------+--------+-----+-------+
|     1| 1000125|   90| [90.0]|
|     1| 1006373|  100|[100.0]|
|     1| 1006978|   90| [90.0]|
|     1| 1007035|  100|[100.0]|
|     1| 1007098|  100|[100.0]|
+------+--------+-----+-------+
only showing top 5 rows



In [9]:
from pyspark.ml.feature import MinMaxScaler

# Rescale the one-element score vector to the default target range [0, 1]
scaler = MinMaxScaler(inputCol="score_f", outputCol="scaled_score")

# Compute the column minimum and maximum and return a MinMaxScalerModel
scalerModel = scaler.fit(df_t)

# Rescale each feature to the target range [min, max]
scaled_df = scalerModel.transform(df_t)

In [10]:
scaled_df.show(5)

+------+--------+-----+-------+--------------------+
|UserID|ArtistID|score|score_f|        scaled_score|
+------+--------+-----+-------+--------------------+
|     1| 1000125|   90| [90.0]|[0.35294117647058...|
|     1| 1006373|  100|[100.0]|[0.39215686274509...|
|     1| 1006978|   90| [90.0]|[0.35294117647058...|
|     1| 1007035|  100|[100.0]|[0.39215686274509...|
|     1| 1007098|  100|[100.0]|[0.39215686274509...|
+------+--------+-----+-------+--------------------+
only showing top 5 rows



In [11]:
print("Features scaled to range: [%f, %f]" % (scaler.getMin(), scaler.getMax()))
scaled_df.select("score_f", "scaled_score").show(5)

Features scaled to range: [0.000000, 1.000000]
+-------+--------------------+
|score_f|        scaled_score|
+-------+--------------------+
| [90.0]|[0.35294117647058...|
|[100.0]|[0.39215686274509...|
| [90.0]|[0.35294117647058...|
|[100.0]|[0.39215686274509...|
|[100.0]|[0.39215686274509...|
+-------+--------------------+
only showing top 5 rows
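Spark's `MinMaxScaler` documentation gives the rescaling rule as (x - E_min) / (E_max - E_min) * (max - min) + min, where E_min and E_max are the column minimum and maximum computed by `fit()`, and a constant column (E_max == E_min) is mapped to 0.5 * (max + min). Because the scores run from 0 to 255 (see the `describe()` output), the default target range [0, 1] reduces to dividing by 255. The plain-Python sketch below only reproduces this arithmetic; it is not Spark's implementation.

```python
def min_max_scale(x, e_min, e_max, new_min=0.0, new_max=1.0):
    """Rescale x from [e_min, e_max] to [new_min, new_max], following MinMaxScaler's rule."""
    if e_max == e_min:
        # A constant column is mapped to the midpoint of the target range
        return 0.5 * (new_max + new_min)
    return (x - e_min) / (e_max - e_min) * (new_max - new_min) + new_min


# Observed score range from df.describe(): min 0, max 255
for score in (90, 100):
    print(score, min_max_scale(score, 0, 255))
```

The printed values agree with the first digits of the `scaled_score` column shown above (0.3529... and 0.3921...).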



**Converting the scaled vector back to a double**

In [12]:
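from pyspark.sql.functions import regexp_replace
from pyspark.sql.types import DoubleType, StringType

# Cast the one-element vector column to a string such as "[0.35294117647058...]"
new_df = scaled_df.withColumn("label", scaled_df.scaled_score.cast(StringType()))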

In [13]:
# Remove the closing bracket; the leading "[" is skipped by substr() in the next cell
new_df1 = new_df.withColumn('label_l', regexp_replace('label', ']', ''))

In [14]:
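# substr() is 1-based: start after "[" and keep 10 characters, then cast to double.
# The 10-character window truncates each value to 8 decimal places (e.g. 0.35294117).
nn = new_df1.withColumn("scores_d", new_df1.label_l.substr(2, 10).cast(DoubleType()))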
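To see what the string-based conversion does to a single value, the sketch below mimics the three steps in plain Python: Spark's 1-based `substr(2, 10)` corresponds to the Python slice `[1:11]`. The input string format is an assumption based on the `[0.35294117647058...` rendering shown above. The example also makes the side effect of the 10-character window visible: only 8 decimal places survive, which is why `scores_d` later shows values like 0.35294117. On Spark 3.0+, `pyspark.ml.functions.vector_to_array("scaled_score")[0]` extracts the value without going through strings.

```python
def vector_string_to_double(label: str) -> float:
    """Mimic regexp_replace(']') + substr(2, 10) + cast(DoubleType()) on one value."""
    label_l = label.replace("]", "")  # regexp_replace('label', ']', '')
    window = label_l[1:11]            # substr(2, 10): skip "[" and keep 10 characters
    return float(window)              # cast(DoubleType())


full = 90 / 255
as_string = "[" + repr(full) + "]"    # assumed rendering of a one-element vector
print(as_string, "->", vector_string_to_double(as_string))
```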

In [15]:
nn.printSchema()

root
 |-- UserID: integer (nullable = true)
 |-- ArtistID: integer (nullable = true)
 |-- score: integer (nullable = true)
 |-- score_f: vector (nullable = true)
 |-- scaled_score: vector (nullable = true)
 |-- label: string (nullable = true)
 |-- label_l: string (nullable = true)
 |-- scores_d: double (nullable = true)



In [16]:
df_r = nn.select(["UserID","ArtistID","scores_d"])


**Train-test split**

In [18]:
train, test = df_r.randomSplit([0.8, 0.2], seed=42)  # fix the seed for a reproducible split

**Initializing the ALS model**

In [19]:
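# coldStartStrategy='drop' removes rows whose user or artist was not seen during training,
# so they produce no NaN predictions (which would make the RMSE evaluation return NaN)
als = ALS(userCol='UserID', itemCol='ArtistID', ratingCol='scores_d', coldStartStrategy='drop')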
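ALS factorizes the user-by-artist score matrix into a user factor matrix and an artist factor matrix, each with `rank` latent columns; a predicted score is the dot product of the user's and the artist's factor vectors. With the default `nonnegative=False` the factors may be negative, which is why a prediction such as -0.156 can appear in the test output even though every scaled score lies in [0, 1]. The factor values below are hypothetical and only illustrate the prediction step.

```python
from typing import Sequence


def als_predict(user_factors: Sequence[float], item_factors: Sequence[float]) -> float:
    """Predicted score = dot product of a user and an item latent-factor vector."""
    if len(user_factors) != len(item_factors):
        raise ValueError("factor vectors must have the same rank")
    return sum(u * v for u, v in zip(user_factors, item_factors))


# Hypothetical rank-3 factors, purely for illustration
user = [0.4, -0.2, 0.1]
artist = [0.5, 0.6, -0.3]
print(als_predict(user, artist))
```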

**Using ParamGridBuilder to find the best combination of parameters**

In [20]:
from pyspark.ml.tuning import ParamGridBuilder

paramGrid = ParamGridBuilder()\
    .addGrid(als.regParam, [0.1, 0.01]) \
    .addGrid(als.maxIter, [5, 10])\
    .addGrid(als.rank, [10, 20])\
    .build()


# Other ALS parameters that could be added to the grid
# (alpha only has an effect when implicitPrefs=True):
# .addGrid(als.alpha, [0.1, 0.5, 1])
# .addGrid(als.nonnegative, [False, True])
# .addGrid(als.implicitPrefs, [False, True])
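`ParamGridBuilder.build()` returns every combination of the values added to the grid, so the three parameters with two values each give 2 × 2 × 2 = 8 parameter maps. The plain-Python sketch below enumerates the same Cartesian product with `itertools.product`; the dictionary keys are just labels for the ALS parameters used above.

```python
from itertools import product

# Same values as the ParamGridBuilder cell above
grid = {
    "regParam": [0.1, 0.01],
    "maxIter": [5, 10],
    "rank": [10, 20],
}

# One dict per combination, mirroring the list of param maps build() returns
param_maps = [dict(zip(grid, values)) for values in product(*grid.values())]

print(len(param_maps))
for pm in param_maps:
    print(pm)
```

The first line printed is 8; adding any of the commented-out parameters would multiply this count again.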

**Cross-validation**

In [21]:
from pyspark.ml.tuning import CrossValidator

crossval = CrossValidator(estimator=als,
                          estimatorParamMaps=paramGrid,
                          evaluator=RegressionEvaluator(metricName='rmse', labelCol='scores_d', 
                                                        predictionCol='prediction'),
                          numFolds=2)  # use 3+ folds in practice
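`CrossValidator` trains every parameter map once per fold, evaluating each on the held-out fold, and then refits the best parameter map on the whole training set to produce `bestModel`. The sketch below counts those fits for the grid above and shows a simplified random k-fold assignment; the fold logic is an illustration under that simplification, and Spark's own fold assignment differs in detail.

```python
import random


def cross_validation_fits(num_param_maps: int, num_folds: int) -> int:
    """Count the model fits CrossValidator performs, including the final refit."""
    # Every parameter map is trained once per fold; afterwards the best map is
    # refit on the whole training set to produce cvModel.bestModel.
    return num_param_maps * num_folds + 1


def k_fold_indices(n_rows: int, num_folds: int, seed: int = 42):
    """Simplified k-fold split: randomly assign each row to one validation fold."""
    rng = random.Random(seed)
    fold_of = [rng.randrange(num_folds) for _ in range(n_rows)]
    for k in range(num_folds):
        validation = [i for i, f in enumerate(fold_of) if f == k]
        training = [i for i, f in enumerate(fold_of) if f != k]
        yield training, validation


print(cross_validation_fits(num_param_maps=8, num_folds=2))  # 2 * 2 * 2 grid, 2 folds
for training, validation in k_fold_indices(n_rows=10, num_folds=2):
    print(len(training), len(validation))
```

For this notebook that is 17 ALS fits; moving to 3 folds raises it to 25, which is the cost behind the "use 3+ folds in practice" advice.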

**Fitting the cross-validator on the training data**

In [22]:
cvModel = crossval.fit(train)

**Extract the best model**

In [23]:
best_model = cvModel.bestModel
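`CrossValidatorModel` keeps the fold-averaged metric of each parameter map in `cvModel.avgMetrics`. Because RMSE is a "smaller is better" metric, the parameter map with the lowest average becomes `bestModel`. The sketch below shows only that selection rule; the RMSE values in it are hypothetical and are not results from this notebook.

```python
def select_best(param_maps, avg_metrics, larger_is_better=False):
    """Return (best_params, best_metric) using the same min/max rule as model selection."""
    if len(param_maps) != len(avg_metrics):
        raise ValueError("one metric per parameter map is required")
    pick = max if larger_is_better else min
    best_index = pick(range(len(avg_metrics)), key=avg_metrics.__getitem__)
    return param_maps[best_index], avg_metrics[best_index]


# Hypothetical values, NOT results from this notebook
maps = [{"regParam": 0.1, "rank": 10}, {"regParam": 0.01, "rank": 10}]
rmses = [0.12, 0.15]
print(select_best(maps, rmses))
```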

**Generate predictions on test data**

In [24]:
predictions = best_model.transform(test)

In [25]:
predictions.show()

+------+--------+----------+------------+
|UserID|ArtistID|  scores_d|  prediction|
+------+--------+----------+------------+
|    91| 1015250|       0.0| -0.15642607|
|  1114| 1015250|0.35294117|  0.33530283|
|  1206| 1015250|0.39215686|  0.35214406|
|   868| 1015250| 0.2745098|  0.06059181|
|   542| 1015250|0.31764705|  0.33039916|
|    51| 1015250|0.39215686|  0.08423141|
|  1280| 1017864|0.10980392|  0.11224718|
|   232| 1017864|       0.0| 0.027070649|
|   140| 1017864|       0.0|  0.10479112|
|   205| 1017864| 0.2745098|  0.13553531|
|  1487| 1017864|       0.0| 0.010085652|
|  1248| 1017864|0.19607843|  0.21532978|
|    49| 1017864|0.35294117|  0.23889358|
|    51| 1017864|       0.0|0.0077098743|
|   957| 1017864|0.31372549|   0.3157921|
|   143| 1017864|       0.0|0.0031054735|
|  1668| 1017864|       0.0|  0.03946184|
|   608| 1017864| 0.2745098|   0.2274194|
|   541| 1017864|       0.0| 0.088495895|
|   566| 1017864|0.35294117|   0.3175629|
+------+--------+----------+------------+
only showing top 20 rows

**Use RMSE to evaluate the model on test data**

In [26]:
evaluator = RegressionEvaluator(metricName='rmse', labelCol='scores_d', predictionCol='prediction')

In [27]:
rmse = evaluator.evaluate(predictions)

In [28]:
print(rmse)

0.11281615227521201


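An RMSE of about 0.113 on the scaled [0, 1] range means predictions are typically off by roughly 11% of the full score range, i.e. about 29 points on the original 0 to 255 scale.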
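RMSE is the square root of the mean squared difference between the true score and the prediction. The sketch below computes it in plain Python; the usage line reuses the first three rows of the predictions table above, so its result is only a partial, illustrative figure and not the test-set RMSE.

```python
import math


def rmse(labels, predictions):
    """Root mean squared error, the quantity reported for metricName='rmse'."""
    if len(labels) != len(predictions) or not labels:
        raise ValueError("need two non-empty sequences of equal length")
    squared = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared) / len(squared))


# First three rows of the predictions table above (scores_d, prediction)
labels = [0.0, 0.35294117, 0.39215686]
preds = [-0.15642607, 0.33530283, 0.35214406]
print(rmse(labels, preds))
```

Multiplying any RMSE on the scaled data by 255 converts it back to points on the original score scale.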

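**Predictions for a single user** <br>
Here user ID 100 is selected. The model scores only the artists this user rated in the test split, so the output ranks already-known artists rather than suggesting new ones.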

In [29]:
single_user = test.filter(test['UserID']==100).select(["ArtistID","UserID"])

In [31]:
recommendations = best_model.transform(single_user)

In [32]:
recommendations.orderBy('prediction', ascending=False).show()

+--------+------+----------+
|ArtistID|UserID|prediction|
+--------+------+----------+
| 1000276|   100|0.37870556|
| 1045024|   100|0.32465968|
| 1022226|   100| 0.2962859|
| 1019547|   100|0.28094342|
| 1046331|   100| 0.1974639|
| 1046217|   100| 0.1912872|
| 1024038|   100|0.14846405|
+--------+------+----------+
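Recommending new artists means scoring every artist with the user's factor vector, dropping the ones the user has already rated, and keeping the top N. In Spark, `ALSModel.recommendForUserSubset(dataset, numItems)` returns the top `numItems` artists per user; it ranks all artists, so filtering out already-rated ones is a separate step. The plain-Python sketch below shows the idea with hypothetical rank-2 factors and placeholder artist IDs "A" to "D".

```python
import heapq


def recommend_top_n(user_vec, item_factors, already_rated, n):
    """Rank unseen items by dot-product score and return the n best (item, score) pairs."""
    scores = (
        (item, sum(u * v for u, v in zip(user_vec, vec)))
        for item, vec in item_factors.items()
        if item not in already_rated
    )
    return heapq.nlargest(n, scores, key=lambda pair: pair[1])


# Hypothetical factors and artist IDs, purely for illustration
user_100 = [0.6, 0.2]
artists = {
    "A": [0.9, 0.5],
    "B": [0.3, 0.9],
    "C": [0.7, -0.2],
    "D": [0.1, 0.1],
}
print(recommend_top_n(user_100, artists, already_rated={"A"}, n=2))
```

Artist "A" would score highest, but it is excluded because the user already rated it, so "C" and "B" are returned instead.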

