**Sistema de recomendação de filmes da Netflix**


https://www.kaggle.com/code/jieyima/netflix-recommendation-collaborative-filtering/data

In [None]:
#!pip install -U scikit-learn
#!pip install pyspark
#!pip install findspark

In [2]:
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import random
import scipy.stats
from scipy.sparse import csr_matrix
from sklearn.neighbors import NearestNeighbors

In [3]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType 
from pyspark.sql.types import ArrayType, DoubleType, BooleanType
from pyspark.sql.functions import col,array_contains,explode

In [4]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

In [5]:
# Directory with the Netflix Prize per-movie rating files. The pre-processed
# copies that Spark reads below live in its "tratado/" sub-directory.
diretorio = "../dados/training_set/"

# Memory requested for both the Spark driver and the executor.
MAX_MEMORY = "15g"

spark = (
    SparkSession.builder.appName('loadNetflix')
    .config('spark.master', 'local')
    .config("spark.executor.memory", MAX_MEMORY)
    .config("spark.driver.memory", MAX_MEMORY)
    .getOrCreate()
)

# Movie catalogue: id, release year (read as a string) and title.
schemaMovies = (
    StructType()
    .add("Movie_Id", IntegerType(), True)
    .add("Year", StringType(), True)
    .add("MovieName", StringType(), True)
)

# One rating per row: movie, customer, score and rating date (read as a string).
# Parentheses replace the backslash continuations; the original chain ended
# with a dangling "\" that only worked because a blank line followed it.
schema = (
    StructType()
    .add("Movie_Id", IntegerType(), True)
    .add("Cust_Id", IntegerType(), True)
    .add("Rating", IntegerType(), True)
    .add("Date", StringType(), True)
)

# Both sources are header-less delimited text files.
df = (
    spark.read.options(delimiter=',')
    .option("header", False)
    .schema(schema)
    .csv(diretorio + "tratado/")
)
dfMovies = (
    spark.read.options(delimiter=';')
    .option("header", False)
    .schema(schemaMovies)
    .csv("../dados/movie_titles_f.txt")
)

# Attach year and title to every rating; the left join keeps ratings whose
# movie id has no match in the catalogue.
df = df.join(dfMovies, ['Movie_Id'], 'left')

In [5]:
# Number of rating rows after the join with the movie catalogue (runs a full Spark job).
df.count()

100480507

In [8]:
# Raise the Spark SQL cap on how many distinct values a pivot may produce.
spark.conf.set("spark.sql.pivotMaxValues", "50000")

In [6]:
# Select a handful of animated titles from the catalogue by their movie ids.
dfDesenhos = dfMovies.where(
    dfMovies["Movie_Id"].isin([8743, 16660, 976, 6001, 3079, 16222])
)

In [7]:
# Collect the selected titles to the driver as a pandas DataFrame for display.
dfDesenhos.toPandas()

Unnamed: 0,Movie_Id,Year,MovieName
0,976,2003,Tom and Jerry: Paws for a Holiday
1,3079,1994,The Lion King: Special Edition
2,6001,1992,Tom and Jerry: The Movie
3,8743,2002,Ice Age
4,16222,2004,The Lion King 1 1/2
5,16660,1998,The Lion King II: Simba's Pride


In [9]:
# Split the ratings into training (80%) and test (20%) portions.
# The fixed seed keeps the split, and therefore the reported RMSE,
# comparable between runs instead of changing on every execution.
(training, test) = df.randomSplit([0.8, 0.2], seed=42)

In [10]:
# Add a synthetic customer (id 9999991) who rated Ice Age, The Lion King and
# Tom and Jerry titles with 5 stars, so the model can recommend for that profile.
columns = ['Movie_Id', 'Cust_Id', 'Rating', 'Date']
vals = [
    (movie_id, 9999991, 5, '2022-10-16')
    for movie_id in (8743, 16660, 976, 6001, 3079, 16222)
]

# Reuse the column types of `training`. Letting Spark infer them from Python
# ints yields bigint columns, and the union would then widen the integer
# columns of the whole training set.
dfNewCust = spark.createDataFrame(vals, training.select(columns).schema)
dfNewCust = dfNewCust.join(dfMovies, ['Movie_Id'], 'left')

# unionByName matches columns by name rather than by position, so the result
# does not depend on the column order produced by the two joins.
# NOTE: re-running this cell appends the same six rows to `training` again.
training = training.unionByName(dfNewCust)

In [11]:
# Display the training rows that belong to customer 9999991.
training.where(training["Cust_Id"] == 9999991).toPandas()

Unnamed: 0,Movie_Id,Cust_Id,Rating,Date,Year,MovieName
0,8743,9999991,5,2022-10-16,2002,Ice Age
1,16660,9999991,5,2022-10-16,1998,The Lion King II: Simba's Pride
2,976,9999991,5,2022-10-16,2003,Tom and Jerry: Paws for a Holiday
3,6001,9999991,5,2022-10-16,1992,Tom and Jerry: The Movie
4,3079,9999991,5,2022-10-16,1994,The Lion King: Special Edition
5,16222,9999991,5,2022-10-16,2004,The Lion King 1 1/2


In [12]:
# Build the recommendation model using ALS on the training data.
# Reference: https://spark.apache.org/docs/latest/ml-collaborative-filtering.html
#
# coldStartStrategy="drop" discards predictions for users/items that were not
# seen during training, so evaluation metrics do not come out as NaN.
# The explicit seed makes the fitted factors repeatable between runs.
als = ALS(
    maxIter=10,
    regParam=0.01,
    userCol="Cust_Id",
    itemCol="Movie_Id",
    ratingCol="Rating",
    coldStartStrategy="drop",
    seed=42,
)
model = als.fit(training)

In [13]:
# Score the held-out test split and report the root-mean-square error
# between the predicted and the actual ratings.
predictions = model.transform(test)
evaluator = RegressionEvaluator(
    labelCol="Rating",
    predictionCol="prediction",
    metricName="rmse",
)
rmse = evaluator.evaluate(predictions)
print(f"Root-mean-square error = {rmse}")

Root-mean-square error = 0.8521370169496323


In [14]:
# Take the training rows of customer 9999991 and let the model score them,
# showing its predictions for ratings it was fitted on.
topredict = training.filter(training['Cust_Id'] == 9999991)

predictions = model.transform(topredict)
predictions.toPandas()

Unnamed: 0,Movie_Id,Cust_Id,Rating,Date,Year,MovieName,prediction
0,8743,9999991,5,2022-10-16,2002,Ice Age,4.701352
1,976,9999991,5,2022-10-16,2003,Tom and Jerry: Paws for a Holiday,4.851564
2,6001,9999991,5,2022-10-16,1992,Tom and Jerry: The Movie,4.900865
3,16660,9999991,5,2022-10-16,1998,The Lion King II: Simba's Pride,5.017689
4,3079,9999991,5,2022-10-16,1994,The Lion King: Special Edition,5.002244
5,16222,9999991,5,2022-10-16,2004,The Lion King 1 1/2,5.039224


In [15]:
# Request the top 20 titles for every distinct customer in `topredict`, then
# explode the recommendations array into one row per (customer, movie, rating).
predictions = (
    model.recommendForUserSubset(topredict.select('Cust_Id').distinct(), 20)
    .withColumn("rec_exp", explode("recommendations"))
    .select('Cust_Id', col("rec_exp.Movie_Id"), col("rec_exp.rating"))
)

In [16]:
# Enrich the recommendations with year and title, drop the movies that already
# appear in `topredict` (left anti join), keep positive scores only and order
# from the highest predicted rating down.
dfPredictions = (
    predictions.join(dfMovies, ['Movie_Id'], 'left')
    .join(topredict, ['Movie_Id'], 'left_anti')
    .filter(predictions.rating > 0)
    .orderBy(col('rating').desc())
)

In [17]:
# Collect the ranked recommendations to the driver as a pandas DataFrame for display.
dfPredictions.toPandas()

Unnamed: 0,Movie_Id,Cust_Id,rating,Year,MovieName
0,14625,9999991,9.0741,2003,Good Charlotte: Video Collection
1,1730,9999991,8.878574,1995,Dragon Ball: Piccolo Jr. Saga: Part 1
2,8224,9999991,8.874534,2003,Dragon Ball Z: Fusion
3,8975,9999991,8.867136,1997,Dragon Ball Z: Vol. 2: The Saiyans
4,14428,9999991,8.848495,2003,Dragon Ball Z: Babidi
5,11744,9999991,8.846524,2002,Dragon Ball: Tien Shinhan Saga
6,17309,9999991,8.824564,2003,Dragon Ball Z: Majin Buu
7,9669,9999991,8.821672,2002,Dragon Ball Z: Kid Buu Saga
8,5319,9999991,8.788269,2002,Dragon Ball: Red Ribbon Army Saga
9,15195,9999991,8.772015,1999,Dragon Ball Z: Great Saiyaman: Gohan's Secret


In [26]:
# Pre-processing: put the movie code in the first column of every rating row
# instead of leaving it only on the first line of the file.
def trataArquivoPontuacao(numero, diretorioBase=None):
    """Rewrite one raw rating file with the movie number prepended to each row.

    Reads ``mv_<numero zero-padded to 7 digits>.txt`` from the base directory,
    discards its first line, and writes every remaining line prefixed with
    ``"<numero>,"`` to a file of the same name inside the ``tratado``
    sub-directory. That sub-directory is created when it is missing.

    Args:
        numero: Movie number; selects the file and is used as the row prefix.
        diretorioBase: Directory holding the raw files. When omitted, the
            notebook-level ``diretorio`` is used, as in the original version.

    Raises:
        OSError: If the source file cannot be read or the destination cannot
            be written.
    """
    import os  # local import so this cell does not depend on the imports cell

    if diretorioBase is None:
        diretorioBase = diretorio

    nomeArquivo = "mv_" + str(numero).zfill(7) + ".txt"
    arquivoOrigem = os.path.join(diretorioBase, nomeArquivo)
    pastaDestino = os.path.join(diretorioBase, "tratado")
    arquivoDestino = os.path.join(pastaDestino, nomeArquivo)

    os.makedirs(pastaDestino, exist_ok=True)

    # `with` closes both handles even when reading or writing fails midway.
    with open(arquivoOrigem, 'r') as entrada, open(arquivoDestino, 'w') as saida:
        next(entrada, None)  # skip the first line; an empty file is tolerated
        prefixo = str(numero) + ','
        for linha in entrada:
            saida.write(prefixo + linha)

# Convert the rating files numbered 1 through `fim` (inclusive).
# NOTE(review): the cell that reads diretorio + "tratado/" into Spark depends on
# the files written here, so this has to run before it on a fresh kernel.
fim =  17770
for i in range(1, fim+1):
    trataArquivoPontuacao(i)

In [None]:
def trataArquivoFilmes(arquivoOrigem='dados/movie_titles.txt',
                       arquivoDestino='dados/movie_titles_f.txt',
                       encoding=None):
    """Convert the movie titles file from ',' separated to ';' separated fields.

    Only the first two commas of each line are replaced (the ones after the
    movie id and after the year), so commas inside the title are preserved.
    Lines with fewer than two commas, including blank lines, are copied with
    whatever commas they have replaced instead of raising ``IndexError``.

    Args:
        arquivoOrigem: Path of the comma separated input file.
        arquivoDestino: Path of the semicolon separated output file.
        encoding: Text encoding for both files; ``None`` keeps the platform
            default, as in the original version.

    Raises:
        OSError: If the input cannot be read or the output cannot be written.

    NOTE(review): the default paths are relative to the working directory,
    while the Spark load cell reads "../dados/movie_titles_f.txt". Confirm
    which directory this is meant to run from.
    """
    # `with` closes both handles even if an error occurs halfway through.
    with open(arquivoOrigem, 'r', encoding=encoding) as entrada, \
            open(arquivoDestino, 'w', encoding=encoding) as saida:
        for linha in entrada:
            # Replace only the two field separators; the title is left untouched.
            saida.write(linha.replace(',', ';', 2))

# Run the conversion with the default input and output paths.
trataArquivoFilmes()