## Sistema de Recomendación (Filtrado Colaborativo)

- Escribir una implementación del mismo para Spark
- Evaluar el modelo de filtrado colaborativo sobre el dataset de películas (Movies)

### Big Data, Máster Ciencia de Datos
#### Mayra Russo Botero 

In [1]:
SEED=666

# libraries
import numpy as np
np.random.seed(SEED)
import pandas as pd
import pyspark
from pyspark import SparkContext
from pyspark.sql import SQLContext,SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.sql.types import FloatType
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder,CrossValidator
from pyspark.sql.functions import col,min,max

In [2]:
# check if spark context is defined
sc = SparkContext()

# initialize SQLContext
sqlContext = SQLContext(sc)

# initialize Spark Session
spark = SparkSession.builder.appName('rec_app').getOrCreate()

KeyboardInterrupt: 

In [3]:
# data import
#users data
u_cols = 'user_id::sex::age::occupation::zip_code'.split('::')
df_users = sqlContext.read.csv('data/users.dat', sep=':',header=False,
                               inferSchema=True)
#ratings data
r_cols = 'userID::movieID::rating::timestamp'.split('::')
df_ratings = sqlContext.read.csv('data/ratings.dat', sep=':',header=False,
                                 inferSchema=True)
#movies data
m_cols = 'movie_id::title::genres'.split('::')
df_movies = sqlContext.read.csv('data/movies.dat', sep=':',header=False,
                               inferSchema=True)

In [4]:
#arrange dataframes 
#extract correct columns and rename them, plus add headers 
#ratings df 
dis_columns = np.array(df_ratings.columns)
dis_columns = dis_columns[range(0, len(df_ratings.columns), 2)]
df_ratings = df_ratings.select(dis_columns.tolist())

assert len(r_cols) == len(dis_columns)
for i in range(len(r_cols)):
    df_ratings = \
        df_ratings.withColumnRenamed(dis_columns[i], r_cols[i])
df_ratings.printSchema()
df_ratings.show()

root
 |-- userID: integer (nullable = true)
 |-- movieID: integer (nullable = true)
 |-- rating: integer (nullable = true)
 |-- timestamp: integer (nullable = true)

+------+-------+------+---------+
|userID|movieID|rating|timestamp|
+------+-------+------+---------+
|     1|   1193|     5|978300760|
|     1|    661|     3|978302109|
|     1|    914|     3|978301968|
|     1|   3408|     4|978300275|
|     1|   2355|     5|978824291|
|     1|   1197|     3|978302268|
|     1|   1287|     5|978302039|
|     1|   2804|     5|978300719|
|     1|    594|     4|978302268|
|     1|    919|     4|978301368|
|     1|    595|     5|978824268|
|     1|    938|     4|978301752|
|     1|   2398|     4|978302281|
|     1|   2918|     4|978302124|
|     1|   1035|     5|978301753|
|     1|   2791|     4|978302188|
|     1|   2687|     3|978824268|
|     1|   2018|     4|978301777|
|     1|   3105|     5|978301713|
|     1|   2797|     4|978302039|
+------+-------+------+---------+
only showing top 2

In [5]:
#movies df 
dis_columns1 = np.array(df_movies.columns)
dis_columns1 = dis_columns1[range(0, len(df_movies.columns), 2)]
df_movies = df_movies.select(dis_columns1.tolist())

assert len(m_cols) == len(dis_columns1)
for i in range(len(m_cols)):
    df_movies = \
        df_movies.withColumnRenamed(dis_columns1[i], m_cols[i])
df_movies.printSchema()
df_movies.show()

root
 |-- movie_id: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)

+--------+--------------------+--------------------+
|movie_id|               title|              genres|
+--------+--------------------+--------------------+
|       1|    Toy Story (1995)|Animation|Childre...|
|       2|      Jumanji (1995)|Adventure|Childre...|
|       3|Grumpier Old Men ...|      Comedy|Romance|
|       4|Waiting to Exhale...|        Comedy|Drama|
|       5|Father of the Bri...|              Comedy|
|       6|         Heat (1995)|Action|Crime|Thri...|
|       7|      Sabrina (1995)|      Comedy|Romance|
|       8| Tom and Huck (1995)|Adventure|Children's|
|       9| Sudden Death (1995)|              Action|
|      10|    GoldenEye (1995)|Action|Adventure|...|
|      11|American Presiden...|Comedy|Drama|Romance|
|      12|             Dracula|                null|
|      13|        Balto (1995)|Animation|Children's|
|      14|        Nixon (1995)

In [6]:
#users df 
dis_columns2 = np.array(df_users.columns)
dis_columns2 = dis_columns2[range(0, len(df_users.columns), 2)]
df_users = df_users.select(dis_columns2.tolist())

assert len(u_cols) == len(dis_columns2)
for i in range(len(u_cols)):
    df_users = \
        df_users.withColumnRenamed(dis_columns2[i], u_cols[i])
df_users.printSchema()
df_users.show()

root
 |-- user_id: integer (nullable = true)
 |-- sex: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- occupation: integer (nullable = true)
 |-- zip_code: string (nullable = true)

+-------+---+---+----------+--------+
|user_id|sex|age|occupation|zip_code|
+-------+---+---+----------+--------+
|      1|  F|  1|        10|   48067|
|      2|  M| 56|        16|   70072|
|      3|  M| 25|        15|   55117|
|      4|  M| 45|         7|   02460|
|      5|  M| 25|        20|   55455|
|      6|  F| 50|         9|   55117|
|      7|  M| 35|         1|   06810|
|      8|  M| 25|        12|   11413|
|      9|  M| 25|        17|   61614|
|     10|  F| 35|         1|   95370|
|     11|  F| 25|         1|   04093|
|     12|  M| 25|        12|   32793|
|     13|  M| 45|         1|   93304|
|     14|  M| 35|         0|   60126|
|     15|  M| 25|         7|   22903|
|     16|  F| 35|         0|   20670|
|     17|  M| 50|         1|   95350|
|     18|  F| 18|         3|   95825|
| 

In [7]:
#extracting distinct user ids and movies, sparsity 
#number of ratings in matrix
numerator = df_ratings.count()
#distinct users and movies 
users=df_ratings.select('userID').distinct().count()
movies=df_ratings.select('movieID').distinct().count()
#number of ratings matrix could contain if no empty cells 
denominator = users * movies 

#calculating sparsity
sparsity = 1 - (numerator*1.0 / denominator)
sparsity

#removes users with less than 20 ratings
df_ratings.groupBy("userID").count().filter(col("count") >= 20).show()

# max and min num of ratings by userId
df_ratings.groupBy("userID").count().select(min("count")).show()
df_ratings.groupBy("userID").count().select(max("count")).show()

+------+-----+
|userID|count|
+------+-----+
|   148|  624|
|   463|  123|
|   471|  105|
|   496|  119|
|   833|   21|
|  1088| 1176|
|  1238|   45|
|  1342|   92|
|  1580|   37|
|  1591|  314|
|  1645|  522|
|  1829|   30|
|  1959|   61|
|  2122|  208|
|  2142|   77|
|  2366|   41|
|  2659|  161|
|  2866|  205|
|  3175|   87|
|  3749|  118|
+------+-----+
only showing top 20 rows

+----------+
|min(count)|
+----------+
|        20|
+----------+

+----------+
|max(count)|
+----------+
|      2314|
+----------+



In [8]:
#fitting the model 
#we will use a smaller dataset 
(small_ratings,everything)= df_ratings.randomSplit([0.1,0.9],seed=SEED)
# split data
(training_data, test_data) = small_ratings.randomSplit([0.8, 0.2],seed=SEED)
# ALS model
als = ALS(userCol="userID", itemCol="movieID", ratingCol="rating", nonnegative=True,
coldStartStrategy="drop", implicitPrefs=False)


#create a ParamGridBuilder 
param_grid = ParamGridBuilder()\
            .addGrid(als.rank,[10, 50]) \
            .addGrid(als.maxIter,[5]) \
            .addGrid(als.regParam,[.05, .1, 1.5]) \
            .build()
                      
#how to evaluate performance
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
predictionCol="prediction")

#create cross validator
cv = CrossValidator(estimator = als,
                    estimatorParamMaps = param_grid,
                    evaluator = evaluator,
                    numFolds = 2)

# Run the cv on the training data
model = cv.fit(training_data)
# Extract best combination of values from cross validation
best_model = model.bestModel

# Generate test set predictions and evaluate using RMSE
predictions = best_model.transform(test_data)

rmse = evaluator.evaluate(predictions)
# Print evaluation metrics and model parameters
print ("**Best Model**")
print ("RMSE = " , rmse)
print (" Rank: ", best_model.rank)
print (" MaxIter: ", best_model._java_obj.parent().getMaxIter())
print (" RegParam: ", best_model._java_obj.parent().getRegParam())
#due to using a smaller data set, the performance of the model is worse than if training the model with a larger dataset.
#however, due to the computational power and the resources available, we will keep the smaller dataset.

**Best Model**
RMSE =  1.0454524726183856
 Rank:  50
 MaxIter:  5
 RegParam:  0.1


In [19]:
##### 
predictions.show()

+------+-------+------+---------+----------+
|userID|movieID|rating|timestamp|prediction|
+------+-------+------+---------+----------+
|  1242|    148|     3|974909976| 2.5737453|
|  2507|    148|     4|974082717| 2.7863207|
|  3425|    471|     4|967364970|  4.145903|
|  1404|    471|     3|974765002|  3.461893|
|    26|    471|     4|978140977|  3.971983|
|  3792|    471|     3|967123821| 3.7582922|
|  1125|    471|     3|974923365|  3.181326|
|  1377|    471|     1|974770883| 3.0990922|
|  5556|    471|     5|959444046| 3.2658684|
|  3031|    471|     5|970429234|  4.228343|
|    95|    471|     3|977628135| 3.8344915|
|  1980|    471|     4|974691272| 3.8825483|
|  1983|    471|     5|976898914|  4.352147|
|   934|    471|     5|991940804| 2.6414475|
|  1298|    833|     2|974835615| 1.9081663|
|  5046|    833|     2|962506488| 1.6108062|
|  4725|   1088|     4|963456486| 2.8618388|
|  3601|   1088|     4|966641981|  3.757358|
|  4823|   1088|     2|963174317| 2.7987018|
|  4521|  