In [118]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [119]:
!pip install --upgrade pyspark



In [120]:
import os
import pyspark.sql.functions as F
import pyspark.sql.types as T

from pyspark.sql import SparkSession

os.environ['PYSPARK_SUBMIT_ARGS'] = '\
    --driver-memory 2G \
    --executor-memory 2G \
    pyspark-shell'

In [133]:
spark = SparkSession.builder\
    .master("local[*]")\
    .getOrCreate()
data_movie = spark.read.csv('drive/MyDrive/computacaoNuvem/movies.csv', sep=';', header=None, inferSchema=True)
data_movie = data_movie.withColumnRenamed('_c0', 'id')
data_movie = data_movie.withColumnRenamed('_c1', 'descricao')
data_movie.printSchema()


spark = SparkSession.builder\
    .master("local[*]")\
    .getOrCreate()
data_customers_rating = spark.read.csv('drive/MyDrive/computacaoNuvem/customers_rating.csv', sep=';', header=True, inferSchema=True)
data_customers_rating.printSchema()

root
 |-- id: integer (nullable = true)
 |-- descricao: string (nullable = true)

root
 |-- Cust_Id: integer (nullable = true)
 |-- Rating: double (nullable = true)
 |-- Date: date (nullable = true)
 |-- Movie_Id: integer (nullable = true)



In [134]:
data_movie.createOrReplaceTempView("tab_movies") # cria uma view = Tabela temporaria

data_customers_rating.createOrReplaceTempView("tab_customers_rating") # cria uma view = Tabela temporaria

In [135]:
data_movie.show(20)

+---+--------------------+
| id|           descricao|
+---+--------------------+
|  1|(Dinosaur Planet,...|
|  2|(Isle of Man TT 2...|
|  3|   (Character, 1997)|
|  4|(Paula Abdul's Ge...|
|  5|(The Rise and Fal...|
|  6|        (Sick, 1997)|
|  7|       (8 Man, 1992)|
|  8|(What the #$*! Do...|
|  9|(Class of Nuke 'E...|
| 10|     (Fighter, 2001)|
| 11|(Full Frame: Docu...|
| 12|(My Favorite Brun...|
| 13|(Lord of the Ring...|
| 14|(Nature: Antarcti...|
| 15|(Neil Diamond: Gr...|
| 16|   (Screamers, 1996)|
| 17|   (7 Seconds, 2005)|
| 18|(Immortal Beloved...|
| 19|(By Dawn's Early ...|
| 20|(Seeta Aur Geeta,...|
+---+--------------------+
only showing top 20 rows



In [136]:
data_customers_rating.show(20)

+-------+------+----------+--------+
|Cust_Id|Rating|      Date|Movie_Id|
+-------+------+----------+--------+
|1488844|   3.0|2005-09-06|       1|
| 822109|   5.0|2005-05-13|       1|
| 885013|   4.0|2005-10-19|       1|
|  30878|   4.0|2005-12-26|       1|
| 823519|   3.0|2004-05-03|       1|
| 893988|   3.0|2005-11-17|       1|
| 124105|   4.0|2004-08-05|       1|
|1248029|   3.0|2004-04-22|       1|
|1842128|   4.0|2004-05-09|       1|
|2238063|   3.0|2005-05-11|       1|
|1503895|   4.0|2005-05-19|       1|
|2207774|   5.0|2005-06-06|       1|
|2590061|   3.0|2004-08-12|       1|
|   2442|   3.0|2004-04-14|       1|
| 543865|   4.0|2004-05-28|       1|
|1209119|   4.0|2004-03-23|       1|
| 804919|   4.0|2004-06-10|       1|
|1086807|   3.0|2004-12-28|       1|
|1711859|   4.0|2005-05-08|       1|
| 372233|   5.0|2005-11-23|       1|
+-------+------+----------+--------+
only showing top 20 rows



## Explorando os datasets

# 1.1Quantos filmes estão disponíveis no dataset?

In [140]:
data_movie.count()

4499

# 1.2 Qual é o nome dos 5 filmes com melhor média de avaliação?


In [144]:
from pyspark.sql.functions import col, avg
from pyspark.sql.window import Window
from pyspark.sql.functions import rank

# Realize uma junção entre os DataFrames
joined_data = data_movie.join(data_customers_rating, data_movie["id"] == data_customers_rating["Movie_Id"], "inner")

# Calcule a média de classificação para cada filme
avg_ratings = joined_data.groupBy("id", "descricao").agg(avg("Rating").alias("AvgRating"))

# Crie uma janela para classificar os filmes com base na média de avaliação
windowSpec = Window.orderBy(col("AvgRating").desc())

# Adicione uma coluna de classificação com base na média de avaliação
ranked_data = avg_ratings.withColumn("Rank", rank().over(windowSpec))

# Selecione os 5 melhores filmes
top_5_movies = ranked_data.filter(col("Rank") <= 5)

# Exiba os nomes dos 5 melhores filmes
top_5_movies.select("descricao", "AvgRating").show()


+--------------------+------------------+
|           descricao|         AvgRating|
+--------------------+------------------+
|(Lost: Season 1, ...|4.6709891019450955|
|(Ghost in the She...| 4.586363636363636|
|(The Simpsons: Se...| 4.581295988606693|
|   (Inu-Yasha, 2000)| 4.554434413170473|
|(Lord of the Ring...|             4.552|
+--------------------+------------------+



# 1.3 Quais os 9 anos com menos lançamentos de filmes?

In [147]:
from pyspark.sql.functions import year


# Extrai o ano da coluna "Date" no DataFrame data_customers_rating
data_customers_rating = data_customers_rating.withColumn("Ano", year(data_customers_rating["Date"]))

# Conte quantos filmes foram lançados em cada ano
contagem_por_ano = data_customers_rating.groupBy("Ano").count()

# Classifique os anos em ordem crescente de lançamentos
anos_menos_lancamentos = contagem_por_ano.orderBy("count").limit(9)

# Exiba os 9 anos com menos lançamentos de filmes
anos_menos_lancamentos.show()



+----+--------+
| Ano|   count|
+----+--------+
|1999|     426|
|2000|  193255|
|2001|  370691|
|2002|  959548|
|2003| 2397989|
|2004| 7569628|
|2005|12562227|
+----+--------+



# 1.4 Quantos filmes que possuem avaliação maior ou igual a 4.7, considerando apenas os filmes avaliados na última data de avaliação do dataset?

In [148]:
from pyspark.sql.functions import max

# Encontre a última data de avaliação no DataFrame data_customers_rating
ultima_data_avaliacao = data_customers_rating.select(max("Date")).collect()[0][0]

# Filtrar os filmes com avaliação maior ou igual a 4.7 na última data de avaliação
filmes_filtrados = data_customers_rating.filter((data_customers_rating["Date"] == ultima_data_avaliacao) & (data_customers_rating["Rating"] >= 4.7))

# Conte o número de filmes que atendem aos critérios
numero_de_filmes = filmes_filtrados.select("Movie_Id").distinct().count()

# Exiba o resultado
print("Número de filmes com avaliação maior ou igual a 4.7 na última data de avaliação:", numero_de_filmes)


Número de filmes com avaliação maior ou igual a 4.7 na última data de avaliação: 780


# 1.5 Dos filmes encontrados na questão anterior, quais são os 10 filmes com as piores notas e quais as notas?

In [149]:
from pyspark.sql.functions import max, col

# Encontre a última data de avaliação no DataFrame data_customers_rating
ultima_data_avaliacao = data_customers_rating.select(max("Date")).collect()[0][0]

# Filtrar os filmes com avaliação maior ou igual a 4.7 na última data de avaliação
filmes_filtrados = data_customers_rating.filter((data_customers_rating["Date"] == ultima_data_avaliacao) & (data_customers_rating["Rating"] >= 4.7))

# Ordene os filmes por nota em ordem crescente
filmes_piores_notas = filmes_filtrados.orderBy("Rating").limit(10)

# Exiba os 10 filmes com as piores notas e as notas
filmes_piores_notas.select("Movie_Id", "Rating").show()


+--------+------+
|Movie_Id|Rating|
+--------+------+
|    1026|   5.0|
|    3756|   5.0|
|    2862|   5.0|
|    3769|   5.0|
|    1939|   5.0|
|    3769|   5.0|
|    2862|   5.0|
|    3777|   5.0|
|       8|   5.0|
|    3782|   5.0|
+--------+------+



# 1.6 Quais os id's dos 5 customers que mais avaliaram filmes e quantas avaliações cada um fez?

In [150]:
from pyspark.sql.functions import count

# Agrupe as avaliações por Customer ID e conte quantas avaliações cada cliente fez
contagem_avaliacoes = data_customers_rating.groupBy("Cust_Id").agg(count("*").alias("TotalAvaliacoes"))

# Ordene os clientes por número de avaliações em ordem decrescente
clientes_mais_avaliaram = contagem_avaliacoes.orderBy("TotalAvaliacoes", ascending=False).limit(5)

# Exiba os IDs dos 5 clientes que mais avaliaram e a quantidade de avaliações que cada um fez
clientes_mais_avaliaram.show()


+-------+---------------+
|Cust_Id|TotalAvaliacoes|
+-------+---------------+
| 305344|           4467|
| 387418|           4422|
|2439493|           4195|
|1664010|           4019|
|2118461|           3769|
+-------+---------------+

