# Recommender Systems with PySpark

###### Dataset: https://grouplens.org/datasets/movielens/
Recommender system code based on: https://github.com/Apress/machine-learning-with-pyspark/tree/master/chapter_8_Recommender_System

### Each small piece of code comes with a brief explanation, to make it easier to understand what is being executed.

#### Import the PySpark libraries.

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import rand, col, lit
from pyspark.ml.feature import StringIndexer, IndexToString
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

#### Create a Spark session. This line is essential because the session is the entry point to the Spark API.

In [2]:
spark = SparkSession.builder.appName('recommender_system').getOrCreate()

#### Load the movie data and join the two files (ratings and movies). When Spark reads each file, it loads the data directly into a DataFrame.

In [3]:
df_ratings = spark.read.csv('ratings.csv',inferSchema=True, header=True)

In [4]:
df_movies = spark.read.csv('movies.csv',inferSchema=True, header=True)

In [5]:
df_ratings_movies = df_ratings.join(df_movies, on="movieId", how="inner"
    ).select("userId", "movieId", "rating", "title", "genres")

#### Print the DataFrame schema, just out of curiosity.

In [6]:
df_ratings_movies.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)



#### Show a few randomly selected records.

In [7]:
df_ratings_movies.orderBy(rand()).show(5,False)

+------+-------+------+-------------------------------------+-----------------------+
|userId|movieId|rating|title                                |genres                 |
+------+-------+------+-------------------------------------+-----------------------+
|63298 |2022   |3.0   |Last Temptation of Christ, The (1988)|Drama                  |
|119748|707    |2.0   |Mulholland Falls (1996)              |Crime|Drama|Thriller   |
|29656 |6      |2.5   |Heat (1995)                          |Action|Crime|Thriller  |
|89061 |34405  |5.0   |Serenity (2005)                      |Action|Adventure|Sci-Fi|
|6039  |1333   |4.0   |Birds, The (1963)                    |Horror|Thriller        |
+------+-------+------+-------------------------------------+-----------------------+
only showing top 5 rows



#### Show the top 5 users: those who rated the most movies.

In [8]:
df_ratings_movies.groupBy('userId').count().orderBy('count',ascending=False).show(5,False)

+------+-----+
|userId|count|
+------+-----+
|118205|9254 |
|8405  |7515 |
|82418 |5646 |
|121535|5520 |
|125794|5491 |
+------+-----+
only showing top 5 rows



#### Show the top 5 movies: those that received the most ratings.

In [9]:
df_ratings_movies.groupBy('title').count().orderBy('count',ascending=False).show(5,False)

+--------------------------------+-----+
|title                           |count|
+--------------------------------+-----+
|Pulp Fiction (1994)             |67310|
|Forrest Gump (1994)             |66172|
|Shawshank Redemption, The (1994)|63366|
|Silence of the Lambs, The (1991)|63299|
|Jurassic Park (1993)            |59715|
+--------------------------------+-----+
only showing top 5 rows



#### Transform the title feature. StringIndexer (https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.StringIndexer.html) maps the string column to a column of indices. The new column holds a numeric value for each movie. By default the indices are assigned in descending order of frequency, so the most-rated title gets index 0.0.

In [10]:
stringIndexer = StringIndexer(inputCol="title",outputCol="title_indexed")

In [11]:
model = stringIndexer.fit(df_ratings_movies)

In [12]:
titles_indexed = model.transform(df_ratings_movies)

In [13]:
titles_indexed.show(5)

+------+-------+------+--------------------+--------------------+-------------+
|userId|movieId|rating|               title|              genres|title_indexed|
+------+-------+------+--------------------+--------------------+-------------+
|     1|      2|   3.5|      Jumanji (1995)|Adventure|Childre...|        125.0|
|     1|     29|   3.5|City of Lost Chil...|Adventure|Drama|F...|        564.0|
|     1|     32|   3.5|Twelve Monkeys (a...|Mystery|Sci-Fi|Th...|         19.0|
|     1|     47|   3.5|Seven (a.k.a. Se7...|    Mystery|Thriller|         23.0|
|     1|     50|   3.5|Usual Suspects, T...|Crime|Mystery|Thr...|         14.0|
+------+-------+------+--------------------+--------------------+-------------+
only showing top 5 rows



In [17]:
titles_indexed.groupBy('title_indexed').count().orderBy('count',ascending=False).show(5,False)

+-------------+-----+
|title_indexed|count|
+-------------+-----+
|0.0          |67310|
|1.0          |66172|
|2.0          |63366|
|3.0          |63299|
|4.0          |59715|
+-------------+-----+
only showing top 5 rows
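The counts per index match the counts of the five most-rated titles shown earlier, which is what a frequency-ordered mapping would produce. Below is a minimal pure-Python sketch of that kind of assignment. It is not Spark's implementation: the helper name `frequency_index` and the tiny sample list are hypothetical, and breaking ties alphabetically is an assumption of this sketch.

```python
from collections import Counter


def frequency_index(values):
    """Map each distinct label to a float index, most frequent label first."""
    counts = Counter(values)
    # Sort by descending count; ties are broken alphabetically (assumption).
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


# Tiny hypothetical sample, not the real MovieLens counts.
titles = (["Pulp Fiction (1994)"] * 3
          + ["Forrest Gump (1994)"] * 2
          + ["Heat (1995)"])
mapping = frequency_index(titles)
print(mapping)
```

Because the index depends only on how often each title appears, two different movies that share the exact same title string would end up with the same index.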



#### Split the data into training (75%) and test (25%) sets.

In [18]:
train,test = titles_indexed.randomSplit([0.75,0.25])

In [19]:
train.count()

15000312

In [20]:
test.count()

4999951
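The two counts add up to the full dataset but are only approximately 75% and 25%, because a random split assigns each row independently rather than cutting at an exact position. The following is a simplified pure-Python sketch of that idea, not Spark's code; the function name `random_split` and the 1000-row input are hypothetical.

```python
import random


def random_split(rows, weights, seed=None):
    """Assign each row independently to one split, proportionally to the weights."""
    rng = random.Random(seed)
    total = sum(weights)
    # Cumulative upper bounds of the normalized weights, e.g. [0.75, 1.0].
    bounds = []
    running = 0.0
    for weight in weights:
        running += weight / total
        bounds.append(running)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for i, upper in enumerate(bounds):
            if draw < upper:
                splits[i].append(row)
                break
        else:
            # Guard against floating-point rounding in the last bound.
            splits[-1].append(row)
    return splits


train_rows, test_rows = random_split(range(1000), [0.75, 0.25], seed=42)
print(len(train_rows), len(test_rows))
```

In PySpark, `randomSplit` also accepts a `seed` argument; passing one, for example `titles_indexed.randomSplit([0.75, 0.25], seed=42)`, should make the split, and therefore the RMSE reported later, easier to reproduce.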

#### Configure the ALS estimator and fit the model on the training set from the previous step. https://spark.apache.org/docs/latest/mllib-collaborative-filtering.html

In [21]:
rec = ALS(maxIter=10, regParam=0.01, userCol='userId',
          itemCol='title_indexed', ratingCol='rating',
          nonnegative=True, coldStartStrategy="drop")

In [22]:
rec_model = rec.fit(train)

#### Evaluate the model's performance. Predictions are generated with the transform method, and RegressionEvaluator (https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.evaluation.RegressionEvaluator.html) computes the RMSE on the test data. The RMSE (root mean squared error) measures the difference between the ratings predicted by the model and the actual ratings.

In [23]:
predicted_ratings = rec_model.transform(test)

In [24]:
predicted_ratings.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)
 |-- title_indexed: double (nullable = false)
 |-- prediction: float (nullable = false)



In [25]:
predicted_ratings.orderBy(rand()).show(5)

+------+-------+------+--------------------+--------------------+-------------+----------+
|userId|movieId|rating|               title|              genres|title_indexed|prediction|
+------+-------+------+--------------------+--------------------+-------------+----------+
| 44105|   1580|   3.0|Men in Black (a.k...|Action|Comedy|Sci-Fi|         36.0| 2.5390618|
| 60119|    156|   3.0|Blue in the Face ...|        Comedy|Drama|       2298.0| 3.8011723|
| 88316|    110|   5.0|   Braveheart (1995)|    Action|Drama|War|          6.0|  4.159054|
| 35710|   2863|   5.0|Hard Day's Night,...|Adventure|Comedy|...|       1168.0|  4.147358|
| 30350|   3984|   3.0|Diamonds Are Fore...|Action|Adventure|...|       1145.0| 3.5844688|
+------+-------+------+--------------------+--------------------+-------------+----------+
only showing top 5 rows



In [26]:
evaluator = RegressionEvaluator(metricName='rmse',predictionCol='prediction',labelCol='rating')

In [27]:
rmse = evaluator.evaluate(predicted_ratings)

In [28]:
print(rmse)

0.8105306808793409
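To make the metric concrete, here is a minimal pure-Python sketch of the RMSE computation: the square root of the mean of the squared differences between rating and prediction. The helper name `rmse` is hypothetical, and the inputs are only the five sampled rows printed earlier, so the result will differ from the value computed over the whole test set.

```python
import math


def rmse(labels, predictions):
    """Root mean squared error between two equal-length sequences."""
    if len(labels) != len(predictions) or not labels:
        raise ValueError("inputs must be non-empty and of equal length")
    squared_errors = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


# Ratings and predictions from the five sampled rows shown earlier.
ratings = [3.0, 3.0, 5.0, 5.0, 3.0]
predictions = [2.5390618, 3.8011723, 4.159054, 4.147358, 3.5844688]
sample_rmse = rmse(ratings, predictions)
print(sample_rmse)
```

An RMSE is expressed in the same unit as the ratings, so a value around 0.81 means predictions are typically off by a little under one star.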


#### Recommend movies. Find the movies the user has already rated, then left-join the full list of distinct movies against that user's movies and keep only the rows with no match. Those unseen movies are the candidates for recommendation.

In [29]:
distinct_movies = titles_indexed.select('title_indexed').distinct()

In [30]:
distinct_movies.count()

26729

In [31]:
dm = distinct_movies.alias('dm')

In [33]:
user_id = 150

In [35]:
watched_movies = titles_indexed.filter(titles_indexed['userId'] == user_id).select('title_indexed').distinct()

In [36]:
watched_movies.count()

26

In [37]:
wm = watched_movies.alias('wm')

In [38]:
total_movies = dm.join(wm, dm.title_indexed == wm.title_indexed,how ='left')

In [39]:
total_movies.show(5,False)

+-------------+-------------+
|title_indexed|title_indexed|
+-------------+-------------+
|299.0        |null         |
|305.0        |null         |
|496.0        |null         |
|558.0        |null         |
|596.0        |null         |
+-------------+-------------+
only showing top 5 rows



In [40]:
remaining_movies = total_movies.where(col("wm.title_indexed").isNull()).select(dm.title_indexed).distinct()

In [41]:
remaining_movies.count()

26703
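The left join followed by the null filter keeps exactly the movies that have no match on the user's side, which is why 26729 distinct movies minus 26 rated ones leaves 26703. A minimal pure-Python sketch of the same logic follows; the helper names `left_join` and `unmatched` and the six sample indices are hypothetical.

```python
def left_join(left_keys, right_keys):
    """Pair every left key with the matching right key, or None if absent."""
    right = set(right_keys)
    return [(key, key if key in right else None) for key in left_keys]


def unmatched(left_keys, right_keys):
    """Keep only the left keys whose right-hand side is None (no match)."""
    return [left for left, right in left_join(left_keys, right_keys)
            if right is None]


# Hypothetical indices: six distinct movies, one of them already rated.
all_movies = [14.0, 299.0, 305.0, 496.0, 558.0, 596.0]
watched = [14.0]
remaining = unmatched(all_movies, watched)
print(remaining)  # [299.0, 305.0, 496.0, 558.0, 596.0]
```

PySpark also provides a `left_anti` join type that returns only the unmatched left rows, so the join and the filter could probably be collapsed into a single `dm.join(wm, on="title_indexed", how="left_anti")`.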

In [42]:
remaining_movies = remaining_movies.withColumn("userId",lit(int(user_id)))

In [43]:
remaining_movies.show(5,False)

+-------------+------+
|title_indexed|userId|
+-------------+------+
|299.0        |150   |
|305.0        |150   |
|496.0        |150   |
|558.0        |150   |
|596.0        |150   |
+-------------+------+
only showing top 5 rows



In [44]:
recommendations = rec_model.transform(remaining_movies).orderBy('prediction',ascending=False)

In [45]:
recommendations.show(5,False)

+-------------+------+----------+
|title_indexed|userId|prediction|
+-------------+------+----------+
|18657.0      |150   |10.150861 |
|20392.0      |150   |9.207703  |
|17201.0      |150   |9.052946  |
|20208.0      |150   |8.101024  |
|22070.0      |150   |7.9838123 |
+-------------+------+----------+
only showing top 5 rows
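The top predictions here go well above 5, the highest rating in the data, because an ALS prediction is a product of learned factors and is not bounded to the rating scale. If the scores are meant to be displayed as ratings, one option is to clamp them. The sketch below assumes the MovieLens scale runs from 0.5 to 5.0; the constants and the helper `clamp_rating` are hypothetical and not part of the original notebook.

```python
MIN_RATING = 0.5  # assumed lower bound of the MovieLens rating scale
MAX_RATING = 5.0  # assumed upper bound of the MovieLens rating scale


def clamp_rating(prediction, low=MIN_RATING, high=MAX_RATING):
    """Limit a raw prediction to the valid rating range."""
    return max(low, min(high, prediction))


# Raw predictions from the table above.
raw = [10.150861, 9.207703, 9.052946, 8.101024, 7.9838123]
clamped = [clamp_rating(p) for p in raw]
print(clamped)  # [5.0, 5.0, 5.0, 5.0, 5.0]
```

Clamping is only useful for presentation: it collapses every out-of-range score to the same value, so the ranking itself should still be based on the raw predictions.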



In [46]:
movie_title = IndexToString(inputCol="title_indexed",outputCol="title",labels=model.labels)

In [47]:
final_recommendations = movie_title.transform(recommendations)

In [48]:
final_recommendations.show(5,False)

+-------------+------+----------+---------------------------------------+
|title_indexed|userId|prediction|title                                  |
+-------------+------+----------+---------------------------------------+
|18657.0      |150   |10.150861 |Fitzgerald Family Christmas, The (2012)|
|20392.0      |150   |9.207703  |Send a Bullet (Manda Bala) (2007)      |
|17201.0      |150   |9.052946  |Loose Change: Second Edition (2006)    |
|20208.0      |150   |8.101024  |Nine Deaths of the Ninja (1985)        |
|22070.0      |150   |7.9838123 |Out in the Dark (2013)                 |
+-------------+------+----------+---------------------------------------+
only showing top 5 rows
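`IndexToString` reverses the earlier indexing step: it uses the fitted model's `labels` list to turn each index back into its title. Conceptually this is a list lookup, as in the pure-Python sketch below. The helper `index_to_string` is hypothetical, and the five labels are inferred from the matching counts in the top-5 tables earlier in the notebook rather than read from `model.labels`.

```python
def index_to_string(index, labels):
    """Return the label stored at the given (float) index."""
    return labels[int(index)]


# Labels in index order, inferred from the earlier top-5 tables.
labels = [
    "Pulp Fiction (1994)",
    "Forrest Gump (1994)",
    "Shawshank Redemption, The (1994)",
    "Silence of the Lambs, The (1991)",
    "Jurassic Park (1993)",
]
print(index_to_string(0.0, labels))  # Pulp Fiction (1994)
```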

