<a href="https://colab.research.google.com/github/rgprado/computacao_nuvem/blob/main/ComputacaoEmNuvem_RodrigoPrado.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Sobre os dados

O arquivo CSV 'movies' contém o identificador e o nome do filme. O arquivo 'customers_rating' contém informações sobre ID do cliente, nota dada ao filme, qual a data da avaliação e o ID do filme.

**Descrição das colunas:**

Base 1:  
ID do filme  
título e ano de lançamento

Base 2:  
Cust_Id: ID do customer que fez a avaliação  
Rating: avaliação (nota)  
Date: data da avaliação  
Movie_Id: ID do filme  


**Amostra:**  
**movies:**  
1;(Dinosaur Planet, 2003)  
2;(Isle of Man TT 2004 Review, 2004)  
3;(Character, 1997)


**customers_rating:**  
1488844;3.0;2005-09-06;1  
822109;5.0;2005-05-13;1  
885013;4.0;2005-10-19;1

**Nome do arquivo CSV:**  
data/movies.csv  
data/customers_rating.csv

## Sobre as questões
Base1 - Movies: https://drive.google.com/file/d/1gLsCjaMrL91ECdThq58cZAzB9tPxG18g/view?usp=sharing

Base2 - Ratings: https://drive.google.com/file/d/1C_T1w8fc7Oa8MeTo4LMTEcv90IfEOS-6/view?usp=sharing

## Instalação das Libs e imports

In [1]:
!wget -q https://downloads.apache.org/spark/spark-3.0.1/spark-3.0.1-bin-hadoop2.7.tgz
!tar xf spark-3.0.1-bin-hadoop2.7.tgz
!pip install -q findspark
!pip install pyspark

tar: spark-3.0.1-bin-hadoop2.7.tgz: Cannot open: No such file or directory
tar: Error is not recoverable: exiting now
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [2]:
import os

os.environ['PYSPARK_SUBMIT_ARGS'] = '\
    --driver-memory 4G \
    --executor-memory 4G \
    pyspark-shell'

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from datetime import datetime

In [3]:
spark = SparkSession.builder\
    .master("local[*]")\
    .getOrCreate()
spark

## Leitura dos arquivos csv

In [4]:
movies_df = spark.read.csv(f'data/movies.csv', header=False, inferSchema=True, sep=';').toDF('Movie_Id', 'Title')
movies_df.show(5)

+--------+--------------------+
|Movie_Id|               Title|
+--------+--------------------+
|       1|(Dinosaur Planet,...|
|       2|(Isle of Man TT 2...|
|       3|   (Character, 1997)|
|       4|(Paula Abdul's Ge...|
|       5|(The Rise and Fal...|
+--------+--------------------+
only showing top 5 rows



In [5]:
movies_df.printSchema()

root
 |-- Movie_Id: integer (nullable = true)
 |-- Title: string (nullable = true)



In [6]:
ratings_df = spark.read.csv(f'data/customers_rating.csv', header=True, inferSchema=True, sep=';')
ratings_df = ratings_df.na.drop("any")
ratings_df.show(5)

+-------+------+----------+--------+
|Cust_Id|Rating|      Date|Movie_Id|
+-------+------+----------+--------+
|1488844|   3.0|2005-09-06|       1|
| 822109|   5.0|2005-05-13|       1|
| 885013|   4.0|2005-10-19|       1|
|  30878|   4.0|2005-12-26|       1|
| 823519|   3.0|2004-05-03|       1|
+-------+------+----------+--------+
only showing top 5 rows



In [7]:
ratings_df.printSchema()

root
 |-- Cust_Id: integer (nullable = true)
 |-- Rating: double (nullable = true)
 |-- Date: date (nullable = true)
 |-- Movie_Id: integer (nullable = true)



In [8]:
ratings_df.count()

11639306

# Exercícios

## 1.1. Quantos filmes estão disponíveis no dataset?


In [9]:
# Fiz um count de valores distintos no dataset para saber a quantidade de filmes
print("Total de filmes:", movies_df.distinct().count())

Total de filmes: 4499


## 1.2. Qual é o nome dos 5 filmes com melhor média de avaliação?


In [10]:
# No dataset ratings eu consigo saber quais os 5 mais bem avaliados, agroupando pelo o 'Movie_ID' e usando a média das notas
# mas é preciso fazer um merge no dataset 'movies' para saber o nome dos filmes
%%time
df = ratings_df.groupby('Movie_Id')\
    .mean()\
    .select(['Movie_Id','avg(Rating)' ])\
    .withColumnRenamed('avg(Rating)', 'Média')\
    .orderBy(F.desc('Média'))
df.show(5)

+--------+-----------------+
|Movie_Id|            Média|
+--------+-----------------+
|    2102|4.581295988606693|
|      13|            4.552|
|    1476|4.469693127060614|
|    2019|           4.4675|
|    1418|4.464824120603015|
+--------+-----------------+
only showing top 5 rows

CPU times: user 177 ms, sys: 27.1 ms, total: 204 ms
Wall time: 28 s


In [11]:
%%time
print('Filmes com melhores avaliações')
df.toPandas()\
    .merge(
        movies_df.select('Movie_Id', 'title').toPandas(), 
        on='Movie_Id', 
        how='inner'
    ).head(5)


Filmes com melhores avaliações
CPU times: user 740 ms, sys: 93.5 ms, total: 834 ms
Wall time: 29.1 s


Unnamed: 0,Movie_Id,Média,title
0,2102,4.581296,"(The Simpsons: Season 6, 1994)"
1,13,4.552,(Lord of the Rings: The Return of the King: Ex...
2,1476,4.469693,"(Six Feet Under: Season 4, 2004)"
3,2019,4.4675,"(Samurai Champloo, 2004)"
4,1418,4.464824,(Inu-Yasha: The Movie 3: Swords of an Honorabl...


## 1.3. Quais os 9 anos com menos lançamentos de filmes?


In [12]:
# Realizaei um split na coluna de titulo para separar nome do filme e ano, foi necessario quebrar porque alguns filmes tem anos no nome
# ex. 2090;(Cleopatra 2525: The Complete Series, 2000)
# ex. 580;(THX 1138: Special Edition, 1971)
%%time
split_cols = F.split(movies_df["title"], ',')

movies_df = movies_df.withColumn('Filme', split_cols.getItem(0)) \
    .withColumn('Ano', split_cols.getItem(1))

movies_df.show()

+--------+--------------------+--------------------+------+
|Movie_Id|               Title|               Filme|   Ano|
+--------+--------------------+--------------------+------+
|       1|(Dinosaur Planet,...|    (Dinosaur Planet| 2003)|
|       2|(Isle of Man TT 2...|(Isle of Man TT 2...| 2004)|
|       3|   (Character, 1997)|          (Character| 1997)|
|       4|(Paula Abdul's Ge...|(Paula Abdul's Ge...| 1994)|
|       5|(The Rise and Fal...|(The Rise and Fal...| 2004)|
|       6|        (Sick, 1997)|               (Sick| 1997)|
|       7|       (8 Man, 1992)|              (8 Man| 1992)|
|       8|(What the #$*! Do...|(What the #$*! Do...| 2004)|
|       9|(Class of Nuke 'E...|(Class of Nuke 'E...| 1991)|
|      10|     (Fighter, 2001)|            (Fighter| 2001)|
|      11|(Full Frame: Docu...|(Full Frame: Docu...| 1999)|
|      12|(My Favorite Brun...|(My Favorite Brun...| 1947)|
|      13|(Lord of the Ring...|(Lord of the Ring...| 2003)|
|      14|(Nature: Antarcti...| (Nature:

In [13]:
# Criei uma regex simples para retornar o ano como inteiro, ex. '1982)' --> 1982, baseado na tabela da celula acima
%%time
regex = "\d{4}"
movies_df = movies_df.withColumn("date_col", F.regexp_extract(movies_df["Ano"], regex, 0)) 
movies_df = movies_df.withColumn("date_col", F.to_date(movies_df["date_col"])) 
movies_df = movies_df.withColumn("Lancamento", F.year(movies_df["date_col"]))
movies_df = movies_df.drop('date_col', 'Filme', 'Ano')
movies_df.show(5)

movies_df.printSchema()

+--------+--------------------+----------+
|Movie_Id|               Title|Lancamento|
+--------+--------------------+----------+
|       1|(Dinosaur Planet,...|      2003|
|       2|(Isle of Man TT 2...|      2004|
|       3|   (Character, 1997)|      1997|
|       4|(Paula Abdul's Ge...|      1994|
|       5|(The Rise and Fal...|      2004|
+--------+--------------------+----------+
only showing top 5 rows

root
 |-- Movie_Id: integer (nullable = true)
 |-- Title: string (nullable = true)
 |-- Lancamento: integer (nullable = true)

CPU times: user 10.4 ms, sys: 1.11 ms, total: 11.5 ms
Wall time: 322 ms


In [14]:
# Entao é só agrupar pelo ano e fazendo um count de movies pelo ano, dei um orderBy ASC no count
%%time
print('Anos com menos lancamentos:')
movies_df\
  .groupby('Lancamento')\
  .count()\
  .orderBy(F.asc('count'))\
  .show(9)

Anos com menos lancamentos:
+----------+-----+
|Lancamento|count|
+----------+-----+
|      1926|    1|
|      1922|    1|
|      1917|    1|
|      1915|    1|
|      1924|    2|
|      1918|    2|
|      1916|    2|
|      1931|    2|
|      1929|    2|
+----------+-----+
only showing top 9 rows

CPU times: user 14.1 ms, sys: 2.46 ms, total: 16.6 ms
Wall time: 830 ms


## 1.4. Quantos filmes que possuem avaliação maior ou igual a 4.7, considerando apenas os filmes avaliados na última data de avaliação do dataset?
  

In [15]:
# Para essa questao, (1) foi necessario primeiro achar qual a ultima avaliacao de cada filme
%%time
datas_recentes = ratings_df.groupBy("Movie_Id").agg(F.max("Date").alias("ultima_data"))
datas_recentes = datas_recentes.withColumnRenamed("Movie_Id", "Mov_Id") # essa linha foi para renomear a coluna para nao gerar duplucidade a hora do join

# (2) fiz o join com o dataset ratings para saber qual foi a avaliacao para a ultima data
ultimas_avaliacoes = ratings_df.join(datas_recentes, (ratings_df["Movie_Id"] == datas_recentes["Mov_Id"]) & (ratings_df["Date"] == datas_recentes["ultima_data"]))

# (3) achar quais filmes tiveram avaliacao maiore que 4.7
melhores_filmes = ultimas_avaliacoes.filter(ultimas_avaliacoes["Rating"] >= 4.7)

# (4) fazer a contagem da quantidade fe filmes
count_filmes = melhores_filmes.count()

print("Número de filmes com avaliação maior ou igual a 4.7: ", count_filmes)


Número de filmes com avaliação maior ou igual a 4.7:  1196
CPU times: user 289 ms, sys: 33.8 ms, total: 323 ms
Wall time: 46.8 s


## 1.5. Dos filmes encontrados na questão anterior, quais são os 10 filmes com as piores notas e quais as notas?


In [16]:
# Quando o filtro da questao 4 foi aplicado, so teve resultados com nota 5, entao, todos os 178 resultados esta com nota 5
%%time
asc_melhores = melhores_filmes\
  .select('Movie_Id', 'Rating')\
  .orderBy(F.asc('Rating'))

print('Os 10 filmes com as piores notas e quais as notas:')
asc_melhores.toPandas()\
    .merge(
        movies_df.select('Movie_Id', 'title').toPandas(), 
        on='Movie_Id', 
        how='inner'
    ).head(10)

Os 10 filmes com as piores notas e quais as notas:
CPU times: user 378 ms, sys: 44.1 ms, total: 423 ms
Wall time: 49.4 s


Unnamed: 0,Movie_Id,Rating,title
0,56,5.0,"(Carandiru, 2004)"
1,206,5.0,(Unconstitutional: The War on Our Civil Libert...
2,555,5.0,"(Cadfael: A Morbid Taste for Bones, 1996)"
3,498,5.0,"(Glory: Bonus Material, 1989)"
4,909,5.0,"(Too Young to Die, 1990)"
5,380,5.0,"(Live Wire, 1992)"
6,432,5.0,"(Les Miserables in Concert, 1996)"
7,432,5.0,"(Les Miserables in Concert, 1996)"
8,433,5.0,"(Untamed Heart, 1993)"
9,118,5.0,"(Rambo: First Blood Part II, 1985)"


## 1.6. Quais os id's dos 5 customers que mais avaliaram filmes e quantas avaliações cada um fez?

In [17]:
# essa questao da para ser resolvida somente com o dataset de ratings, fazendo um groupBy pelo Cust_Id 
# e fazendo o count da quantidade de avaliacoes desse usuario. Renomiei as colunas para melhor visualizacao da tabela final
%%time
print('Os 5 customers que mais avaliaram filmes e quantas avaliações cada um fez')
ratings_df\
  .withColumnRenamed('Cust_Id', 'Customer_IDs')\
  .groupby('Customer_IDs')\
  .count()\
  .withColumnRenamed('count', 'Avaliacoes')\
  .orderBy(F.desc('Avaliacoes'))\
  .show(5)

Os 5 customers que mais avaliaram filmes e quantas avaliações cada um fez
+------------+----------+
|Customer_IDs|Avaliacoes|
+------------+----------+
|      305344|      2190|
|      387418|      2172|
|     2439493|      2053|
|     1664010|      1975|
|     2118461|      1853|
+------------+----------+
only showing top 5 rows

CPU times: user 225 ms, sys: 25.4 ms, total: 251 ms
Wall time: 36.9 s
