<p style="text-align: center;">
  <font face="Segoe Script" size="6" color='#FF5733'>
    <strong>Pyspark para Ciência de Dados</strong>
  </font>
</p>

<p style="text-align: center;">
<font face="Segoe Print" size="7" color='#900C3F'><strong>Projeto: Recomendações de Filmes - Spark ML</strong></font>
</p>

<p style="text-align: right;">
  <font face="Segoe Script" size="4" color='#444444'>
    Roberto Soares 
  </font>
</p>
<p style="text-align: right;"><font face="Segoe Script" size="2" color='#444444'>
<a href="https://www.linkedin.com/in/roberto-soares-full-stack-data-scientist/">in/roberto-soares-full-stack-data-scientist</a> 

<p style="text-align: right;"><font face="Segoe Script" size="2" color='#444444'>    
<a href="https://github.com/roberto-ssoares/roberto-ssoares.github.io">roberto-ssoares-portifolio</a> 

>

## <font face="Segoe Print" size="4" color='#334CFF'>Instalando e Carregando Pacotes</font>

<td>
<font face="Segoe Print" size="2" color='#66666'>
    
- Para atualizar um pacote, execute o comando abaixo no terminal ou prompt de comando:
- pip install -U nome_pacote

- Para instalar a versão exata de um pacote, execute o comando abaixo no terminal ou prompt de comando:
- !pip install nome_pacote==versão_desejada

- Depois de instalar ou atualizar o pacote, reinicie o jupyter notebook.

- Instala o pacote watermark.
- Esse pacote é usado para gravar as versões de outros pacotes usados neste jupyter notebook.
</font>
</td>

In [1]:
#!pip install -q -U watermark

In [2]:
!pip install -q -U pyspark

In [3]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import matplotlib.pyplot as plt
import seaborn as sns
import sklearn
import random
import os

from pyspark.sql import SparkSession 
from pyspark.ml  import Pipeline     
from pyspark.sql import SQLContext  
from pyspark.sql.functions import mean,col,split, col, regexp_extract, when, lit
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import QuantileDiscretizer

In [4]:
# Versões dos pacotes usados neste jupyter notebook
%reload_ext watermark
%watermark -a "Roberto Soares - in/roberto-soares-full-stack-data-scientist" 

Author: Roberto Soares - in/roberto-soares-full-stack-data-scientist



## <font face="Segoe Print" size="4" color='#334CFF'>Em primeiro lugar, precisamos criar um contêiner Spark chamando SparkSession.</font>

<font face="Segoe Print" size="3" color='#66666'>

- Esta etapa é necessária antes de fazer qualquer coisa.

In [5]:
# Criar uma SparkSession
spark = SparkSession.builder.appName('recomendacao_filme').getOrCreate()

## <font face="Segoe Print" size="4" color='#334CFF'>O método Pyspark.toPandas() nos permite retornar SparkDataFrame como exibição da tabela Pandas.</font>

<font face="Segoe Print" size="3" color='#66666'>


In [6]:
# carrega o dataset e cria dataframe spark
df = spark.read.csv('data/movie_ratings_df.csv', inferSchema=True, header=True)

## <font face="Segoe Print" size="4" color='#334CFF'>Usando limit(), ou select() ou show() para visualizar os dados.</font>

<font face="Segoe Print" size="3" color='#66666'>
    
- Usando limit()
- Usando o método toPandas() para retornar Pyspark DataFrame como tabela Pandas.

In [7]:
df.limit(5).toPandas()

Unnamed: 0,userId,title,rating
0,196,Kolya (1996),3
1,63,Kolya (1996),3
2,226,Kolya (1996),5
3,154,Kolya (1996),3
4,306,Kolya (1996),5


## <font face="Segoe Print" size="4" color='#334CFF'>Nossa tarefa:</font>

<font face="Segoe Print" size="3" color='#66666'>

- Dado um usuário, prevemos e retornamos uma lista de recomendações de filmes para aquele usuário assistir.

Usamos: printSchema() para uma visão geral rápida do tipo de dados de recursos.

In [8]:
df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- rating: integer (nullable = true)



## <font face="Segoe Print" size="4" color='#334CFF'>Como podemos ver, a coluna do título é armazenada como tipo string.</font>

<font face="Segoe Print" size="3" color='#66666'>

- Para trabalhar com a biblioteca pyspark Mlib, precisamos converter o tipo de string em valores numéricos.



In [9]:
from pyspark.ml.feature import StringIndexer, IndexToString
stringIndexer = StringIndexer(inputCol='title', outputCol='title_new')

<font face="Segoe Print" size="3" color='#66666'>
    
- Aplicando objeto stringindexer na coluna de título do filme dataframe.

In [10]:
model = stringIndexer.fit(df)

<font face="Segoe Print" size="3" color='#66666'>

- Criando novo dataframe com valores transformados.

In [11]:
indexed = model.transform(df)

<font face="Segoe Print" size="3" color='#66666'>

- Validar os valores numéricos do título

In [12]:
indexed.limit(5).toPandas()

Unnamed: 0,userId,title,rating,title_new
0,196,Kolya (1996),3,287.0
1,63,Kolya (1996),3,287.0
2,226,Kolya (1996),5,287.0
3,154,Kolya (1996),3,287.0
4,306,Kolya (1996),5,287.0


## <font face="Segoe Print" size="4" color='#334CFF'>Usamos o algoritmo Alternating Least Squares (ALS) na biblioteca Pyspark Ml para recomendação.</font>

<font face="Segoe Print" size="3" color='#66666'>
    
Para ler mais, você pode visitar https://spark.apache.org/docs/2.2.0/ml-collaborative-filtering.htmlark em um ambiente Python.

<font face="Segoe Print" size="2" color='#7D83B2'>
    
Introdução ao ALS:

- O ALS é um algoritmo de filtragem colaborativa amplamente usado em sistemas de recomendação.
- Ele aborda o problema de recomendação tratando a matriz de utilidade (usuários x itens) como uma matriz esparsa e tenta preencher os valores desconhecidos.

Funcionamento do ALS:

- O algoritmo tenta minimizar a diferença entre os valores reais e os valores previstos, usando mínimos quadrados alternados.
- Alterna entre otimizar as representações latentes dos usuários e dos itens até convergir para uma solução.

Pontos Positivos do ALS:

- Escalabilidade: Pode lidar eficientemente com grandes conjuntos de dados distribuídos.
- Boa performance: O ALS é eficaz em sistemas de recomendação, mesmo com conjuntos de dados muito grandes.
- Pode lidar com dados esparsos: Funciona bem mesmo quando a matriz de utilidade é esparsa, ou seja, muitos valores são desconhecidos.

Pontos Negativos do ALS:

- Sensibilidade aos hiperparâmetros: Os resultados do ALS podem variar significativamente com diferentes configurações de hiperparâmetros, como o número de fatores latentes.
- Necessidade de ajuste fino: Em algumas situações, pode ser necessário ajustar cuidadosamente os parâmetros para obter os melhores resultados.

Por que usar Pyspark para rodar algoritmos:

- Processamento distribuído: Pyspark permite distribuir o processamento de dados em um cluster, o que é essencial para lidar com grandes volumes de dados.
- Integração com Spark: Pyspark é uma biblioteca Python para Spark, que é amplamente adotado em ambientes de Big Data devido à sua velocidade e capacidade de escala.
- Suporte a algoritmos ML: Pyspark ML fornece uma ampla gama de algoritmos de aprendizado de máquina e ferramentas para trabalhar com dados distribuídos de forma eficiente.
- Facilidade de uso: Pyspark oferece uma interface Python familiar para trabalhar com Spark, facilitando o desenvolvimento e a depuração de código.

Conclusão:

- O ALS é um algoritmo eficaz para recomendação, especialmente em grandes conjuntos de dados esparsos.
- Ao usar Pyspark ML para implementar o ALS, você pode aproveitar a escalabilidade e o desempenho do Spark em um ambiente Python.

- https://github.com/apache/spark/tree/master/examples/src/main/python

<font face="Segoe Print" size="3" color='#66666'>

- Dividir os dados em conjunto de dados de treinamento e teste.

In [13]:
train, test = indexed.randomSplit([0.75,0.25])

<font face="Segoe Print" size="3" color='#66666'>

- Treinando o modelo de recomendação usando dados de treinamento.melhores resultados possíveis.

In [14]:
from pyspark.ml.recommendation import ALS

rec=ALS( maxIter=10
        ,regParam=0.01
        ,userCol='userId'
        ,itemCol='title_new'
        ,ratingCol='rating'
        ,nonnegative=True
        ,coldStartStrategy="drop")

<td>
<font face="Segoe Print" size="2" color='#7D83B2'>
    
Descrição e explicação de cada parâmetro do algoritmo ALS em Pyspark:

- **maxIter:**
    - Descrição: Número máximo de iterações (passos) que o algoritmo ALS executará.
    - Explicação: Quanto maior o número de iterações, mais oportunidades o algoritmo tem para convergir para uma solução.
- **regParam:**
    - Descrição: Parâmetro de regularização que controla a penalidade adicionada aos termos do modelo para evitar overfitting.
    - Explicação: Um valor maior de regParam aumentará a penalidade, o que pode ajudar a evitar overfitting, mas muito alto pode prejudicar a capacidade do modelo de ajustar os dados de treinamento.
- **userCol:**
    - Descrição: Nome da coluna no DataFrame de entrada que contém IDs de usuário.
    - Explicação: Esta é a coluna que contém a identificação única de cada usuário para quem o sistema de recomendação fornecerá recomendações.
- **itemCol:**
    - Descrição: Nome da coluna no DataFrame de entrada que contém IDs de itens (ou seja, produtos, filmes, etc.).
    - Explicação: Esta é a coluna que contém a identificação única de cada item que pode ser recomendado aos usuários.
- **ratingCol:**
    - Descrição: Nome da coluna no DataFrame de entrada que contém as classificações dadas pelos usuários aos itens.
    - Explicação: Esta é a coluna que contém as avaliações que os usuários deram aos itens, as quais o algoritmo tentará prever.
- **nonnegative:**
    - Descrição: Booleano que indica se os fatores latentes gerados pelo ALS devem ser restritos a valores não negativos.
    - Explicação: Definir isso como True garante que os fatores latentes não sejam negativos, o que pode ser útil em certos cenários de aplicação.
- **coldStartStrategy:**
    - Descrição: Estratégia usada para lidar com novos usuários ou itens durante o processo de recomendação.
    - Explicação: "drop" significa que o algoritmo ALS irá descartar quaisquer linhas no conjunto de dados de entrada que contenham novos usuários ou itens que não estavam presentes durante o treinamento.
        - Existem outras estratégias como "nan" ou "drop" que podem ser úteis dependendo do contexto.

Esses parâmetros são ajustados de acordo com a natureza dos dados e os requisitos do sistema de recomendação para obter os melhores resultados possíveis.

- https://spark.apache.org/docs/2.2.0/api/python/pyspark.ml.html#pyspark.ml.recommendation.ALS

</font>
</td>

<font face="Segoe Print" size="3" color='#66666'>

- Ajuste o modelo nos dados de treinamento.

In [15]:
rec_model=rec.fit(train)

<font face="Segoe Print" size="3" color='#66666'>

- Fazendo previsões no conjunto de teste.

In [16]:
predicted_ratings=rec_model.transform(test)

predicted_ratings.limit(5).toPandas()

Unnamed: 0,userId,title,rating,title_new,prediction
0,833,Heavenly Creatures (1994),4,463.0,3.409583
1,137,Kull the Conqueror (1997),5,471.0,4.708867
2,451,Kull the Conqueror (1997),1,471.0,3.222012
3,451,Kull the Conqueror (1997),2,471.0,3.222012
4,296,Much Ado About Nothing (1993),5,148.0,4.607786


## <font face="Segoe Print" size="4" color='#334CFF'>Avalie o treinamento.</font>

<font face="Segoe Print" size="3" color='#66666'>


<font face="Segoe Print" size="3" color='#66666'>
    
- Importando Regression Evaluator para medir RMSE

In [17]:
from pyspark.ml.evaluation import RegressionEvaluator

<font face="Segoe Print" size="3" color='#66666'>

- Criando um objeto avaliador Regressor para medir a acurácia

In [18]:
evaluator=RegressionEvaluator(metricName='rmse',predictionCol='prediction',labelCol='rating')

<font face="Segoe Print" size="3" color='#66666'>

- Aplicando o RE no dataframe de previsões para calcular RMSE

In [19]:
rmse=evaluator.evaluate(predicted_ratings)

<font face="Segoe Print" size="3" color='#66666'>

- Imprimir o erro RMSE

In [20]:
print(rmse)

1.0230161577440855


## <font face="Segoe Print" size="4" color='#334CFF'>Após o treinamento, agora é a hora de recomendar os melhores filmes que o usuário possa gostar.</font>

<font face="Segoe Print" size="3" color='#66666'>


<font face="Segoe Print" size="3" color='#66666'>

- Primeiro precisamos criar um um conjunto de dados de todos os filmes distintos.

In [21]:
unique_movies=indexed.select('title_new').distinct()

<font face="Segoe Print" size="3" color='#66666'>

- Criar uma função para recomendar os melhores filmes para qualquer usuário específico.

In [22]:
def top_movies(user_id,n):
    """
    Esta função retorna os principais filmes que o usuário ainda não viu, mas que pode gostar.
    
    """
    # Atribuindo o nome alternativo 'a' aos filmes do dataframe.
    a = unique_movies.alias('a')
    
    # Criando outro dataframe que contém filmes já assistidos pelo usuário.
    watched_movies=indexed.filter(indexed['userId'] == user_id).select('title_new')
    
    # Atribuindo o nome alternativo 'b' aos filmes assistidos do dataframe.
    b=watched_movies.alias('b')
    
    # Juntando as duas tabelas na junção pela esquerda
    total_movies = a.join(b, a.title_new == b.title_new,how='left')
    
    # Selecionando filmes que o usuário ativo ainda não avaliou ou assistiu.
    remaining_movies=total_movies.where(col("b.title_new").isNull()).select(a.title_new).distinct()
    
    # Adicionando nova coluna de user_id do usuário ativo aos filmes restantes do dataframe.
    remaining_movies=remaining_movies.withColumn("userId",lit(int(user_id)))
    
    # Fazendo recomentações usando o modelo de recomendações ALS e selecionando apenas aos principais filmes.
    recommendations=rec_model.transform(remaining_movies).orderBy('prediction',ascending=False).limit(n)
    
    # Adicionando colunas de títulos de filmes nas recomendações.
    movie_title = IndexToString(inputCol="title_new", outputCol="title",labels=model.labels)
    final_recommendations=movie_title.transform(recommendations)
    
    # Retornar as recomendações ao usuário.
    return final_recommendations.show(n,False)

<font face="Segoe Print" size="3" color='#66666'>

- Filmes assistindos por usuário de Id=60

In [23]:
test.filter(test.userId == 60).show(5)

+------+--------------------+------+---------+
|userId|               title|rating|title_new|
+------+--------------------+------+---------+
|    60|      Alien 3 (1992)|     4|    334.0|
|    60|         Babe (1995)|     3|     96.0|
|    60|Belle de jour (1967)|     5|    732.0|
|    60|   Birds, The (1963)|     4|    180.0|
|    60| Blade Runner (1982)|     5|     52.0|
+------+--------------------+------+---------+
only showing top 5 rows



<font face="Segoe Print" size="3" color='#66666'>

- Testando - Recomende 5 filmes para usuário de id=60.

In [24]:
top_movies(60,5)

+---------+------+----------+------------------------------------------------------------------+
|title_new|userId|prediction|title                                                             |
+---------+------+----------+------------------------------------------------------------------+
|1517.0   |60    |6.2218757 |Slingshot, The (1993)                                             |
|1288.0   |60    |5.866636  |Whole Wide World, The (1996)                                      |
|1207.0   |60    |5.700029  |Aparajito (1956)                                                  |
|1103.0   |60    |5.6819525 |Stalker (1979)                                                    |
|892.0    |60    |5.5139546 |Double vie de V�ronique, La (Double Life of Veronique, The) (1991)|
+---------+------+----------+------------------------------------------------------------------+



<font face="Segoe Print" size="3" color='#66666'>

- Filmes assistindos por usuário de Id=9999

In [25]:
test.filter(test.userId == 9999).show()

+------+-----+------+---------+
|userId|title|rating|title_new|
+------+-----+------+---------+
+------+-----+------+---------+



<font face="Segoe Print" size="3" color='#66666'>
 
- Testando - Recomende 5 filmes para usuário de id=9999.

In [26]:
top_movies(9999,5)

+---------+------+----------+---------------------------------------+
|title_new|userId|prediction|title                                  |
+---------+------+----------+---------------------------------------+
|1057.0   |9999  |5.471543  |Safe (1995)                            |
|1277.0   |9999  |5.3113337 |Mina Tannenbaum (1994)                 |
|1198.0   |9999  |5.2031913 |Pather Panchali (1955)                 |
|691.0    |9999  |5.1957664 |Some Folks Call It a Sling Blade (1993)|
|107.0    |9999  |4.92759   |Godfather: Part II, The (1974)         |
+---------+------+----------+---------------------------------------+



In [27]:
indexed.select('title').distinct().show(truncate=False)

+--------------------------------------+
|title                                 |
+--------------------------------------+
|Annie Hall (1977)                     |
|Heavenly Creatures (1994)             |
|Psycho (1960)                         |
|Snow White and the Seven Dwarfs (1937)|
|Night of the Living Dead (1968)       |
|When We Were Kings (1996)             |
|If Lucy Fell (1996)                   |
|Fair Game (1995)                      |
|Three Wishes (1995)                   |
|Cosi (1996)                           |
|Paris, France (1993)                  |
|Spanking the Monkey (1994)            |
|I'll Do Anything (1994)               |
|Mondo (1996)                          |
|Evil Dead II (1987)                   |
|Threesome (1994)                      |
|Last Action Hero (1993)               |
|Reality Bites (1994)                  |
|Colonel Chabert, Le (1994)            |
|Blue Chips (1994)                     |
+--------------------------------------+
only showing top

<font face="Segoe Print" size="3" color='#66666'>    

- https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark/options.html

In [28]:
# Salve o DataFrame em um arquivo CSV
indexed.limit(3).toPandas()

Unnamed: 0,userId,title,rating,title_new
0,196,Kolya (1996),3,287.0
1,63,Kolya (1996),3,287.0
2,226,Kolya (1996),5,287.0


<font face="Segoe Print" size="3" color='#66666'>

- Convertendo o DataFrame Spark para um DataFrame Pandas

In [29]:
pandas_df = indexed.select('title').distinct().toPandas()

<font face="Segoe Print" size="3" color='#66666'>

- Salvando os valores distintos da coluna 'title' em um arquivo CSV usando o Pandas

In [30]:
pandas_df.to_csv("data/lista_de_filmes.csv", index=False, )

In [31]:
%reload_ext watermark
%watermark -a "Roberto Soares - in/roberto-soares-full-stack-data-scientist"

Author: Data Science Academy



In [32]:
%watermark -v -m

Python implementation: CPython
Python version       : 3.11.5
IPython version      : 8.20.0

Compiler    : MSC v.1916 64 bit (AMD64)
OS          : Windows
Release     : 10
Machine     : AMD64
Processor   : Intel64 Family 6 Model 158 Stepping 9, GenuineIntel
CPU cores   : 4
Architecture: 64bit



In [33]:
%watermark --iversions

matplotlib: 3.8.0
seaborn   : 0.12.2
sklearn   : 1.2.2
pandas    : 2.1.4
numpy     : 1.26.3



<font face="Segoe Print" size="5" color='#66666'>

Fim 