# Trabalho Final

## Alunos

- Daniel Sena (dmms@cesar.school)

## Dataset

[Letterboxd (Movies Dataset)](https://www.kaggle.com/datasets/gsimonx37/letterboxd/data?select=movies.csv)

## Importando bibliotecas

In [1]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf

## Inicializando spark  

In [None]:
spark = SparkSession.builder\
    .appName("ProjetoFinal")\
    .master("local[*]")\
    .config("spark.driver.memory", "1g")\
    .config("spark.executor.memory", "1g")\
    .config("spark.sql.shuffle.partitions", "64")\
    .getOrCreate()    

25/04/06 17:28:40 WARN Utils: Your hostname, dan-pc resolves to a loopback address: 127.0.1.1; using 192.168.0.184 instead (on interface wlp0s20f3)
25/04/06 17:28:40 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/06 17:28:41 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Função para leitura dos csv

In [3]:
def read_csv(nome_arquivo):
    caminho_completo = './data/' + nome_arquivo + ".csv"
    return spark.read.csv(caminho_completo, header=True, inferSchema=True)

## Análises

### Quantidade de Filmes por País

In [4]:
from pyspark.sql.functions import count, col
df_countries = read_csv("countries")
movies_by_country = df_countries.groupBy("country")\
    .agg(count("*").alias("qtd_movies"))\
    .orderBy(col("qtd_movies").desc())
movies_by_country.show()

+------------------+----------+
|           country|qtd_movies|
+------------------+----------+
|               USA|    174489|
|            France|     45725|
|                UK|     42914|
|             Japan|     41362|
|           Germany|     41325|
|            Canada|     23054|
|             India|     22393|
|             Italy|     19377|
|            Brazil|     17212|
|             Spain|     16860|
|            Mexico|     13261|
|             China|     13051|
|Russian Federation|     11323|
|       South Korea|     10975|
|              USSR|      9680|
|         Argentina|      9250|
|            Sweden|      8979|
|         Australia|      7815|
|       Philippines|      7703|
|         Hong Kong|      7027|
+------------------+----------+
only showing top 20 rows



### Gêneros por País

In [5]:
df_genre = read_csv("genres")

In [6]:
df_genre_country = df_countries.join(df_genre, "id", "inner")
df_genre_country.printSchema()
df_genre_country.show(5)

root
 |-- id: integer (nullable = true)
 |-- country: string (nullable = true)
 |-- genre: string (nullable = true)





+-------+-------+---------------+
|     id|country|          genre|
+-------+-------+---------------+
|1000018|     UK|         Action|
|1000018|     UK|      Adventure|
|1000018|     UK|Science Fiction|
|1000018|    USA|         Action|
|1000018|    USA|      Adventure|
+-------+-------+---------------+
only showing top 5 rows



                                                                                

In [7]:
df_genre_qtd_by_country = df_genre_country.groupBy("country", "genre").agg(count("*").alias("qtd_movies"))
df_genre_qtd_by_country.show(10)

[Stage 16:===>                                                    (1 + 15) / 16]

+-----------+---------+----------+
|    country|    genre|qtd_movies|
+-----------+---------+----------+
|         UK|      War|       958|
|     France|   Family|       997|
|    Ireland|    Drama|       939|
|      Italy|  History|       795|
|     Poland|  Romance|       301|
|     Brazil| Thriller|       698|
|South Korea|Animation|       573|
|    Czechia|Animation|       321|
|     Turkey|   Action|       318|
|      India|Adventure|       502|
+-----------+---------+----------+
only showing top 10 rows



                                                                                

In [8]:
from pyspark.sql import Window
from pyspark.sql.functions import row_number, desc

window_spec = Window.partitionBy("country").orderBy(desc("qtd_movies"))
ranked_genres_by_country = df_genre_qtd_by_country.withColumn("rank", row_number().over(window_spec))
most_popular_genre_by_country = ranked_genres_by_country.filter(col("rank")==1)
most_popular_genre_by_country.orderBy(col("qtd_movies").desc()).show(100)

+--------------------+-----------+----------+----+
|             country|      genre|qtd_movies|rank|
+--------------------+-----------+----------+----+
|                 USA|      Drama|     46837|   1|
|              France|      Drama|     14669|   1|
|             Germany|      Drama|     14418|   1|
|                  UK|      Drama|     12697|   1|
|               Japan|      Drama|     12409|   1|
|               India|      Drama|     11356|   1|
|               Italy|      Drama|      6554|   1|
|              Canada|      Drama|      6451|   1|
|               Spain|      Drama|      5883|   1|
|               China|      Drama|      5483|   1|
|              Brazil|Documentary|      5458|   1|
|         South Korea|      Drama|      5191|   1|
|              Mexico|      Drama|      4052|   1|
|                USSR|      Drama|      3956|   1|
|  Russian Federation|      Drama|      3917|   1|
|         Philippines|      Drama|      3558|   1|
|              Sweden|      Dra

### Top 3 Studios que mais produziram filmes por País

In [9]:
df_studios = read_csv("studios")
df_studios.show(10)

+-------+--------------------+
|     id|              studio|
+-------+--------------------+
|1000001|LuckyChap Enterta...|
|1000001|        Heyday Films|
|1000001|      NB/GG Pictures|
|1000001|              Mattel|
|1000001|Warner Bros. Pict...|
|1000002|        Barunson E&A|
|1000003|           IAC Films|
|1000003|                AGBO|
|1000003|Ley Line Entertai...|
|1000003|     Year of the Rat|
+-------+--------------------+
only showing top 10 rows



In [10]:
df_countries_studios = df_countries.join(df_studios, "id", "inner")
df_countries_studios.printSchema()
df_countries_studios.show(10)

root
 |-- id: integer (nullable = true)
 |-- country: string (nullable = true)
 |-- studio: string (nullable = true)

+-------+-------+--------------------+
|     id|country|              studio|
+-------+-------+--------------------+
|1000018|     UK|  Legendary Pictures|
|1000018|     UK|             Syncopy|
|1000018|     UK|Warner Bros. Pict...|
|1000018|    USA|  Legendary Pictures|
|1000018|    USA|             Syncopy|
|1000018|    USA|Warner Bros. Pict...|
|1000028|    USA|  Legendary Pictures|
|1000031|    USA|Hyperobject Indus...|
|1000031|    USA|Searchlight Pictures|
|1000031|    USA|Gary Sanchez Prod...|
+-------+-------+--------------------+
only showing top 10 rows



In [11]:
movie_by_country_and_studio = df_countries_studios.groupBy("country", "studio")\
    .agg(count("*").alias("qtd_movies"))
movie_by_country_and_studio.show(10)

+-----------+--------------------+----------+
|    country|              studio|qtd_movies|
+-----------+--------------------+----------+
|        USA|   American Zoetrope|        94|
|     France|                 SVT|        49|
|Switzerland|Imperative Entert...|         1|
|        USA|Pine District Pic...|         1|
|         UK|  Dune Entertainment|        22|
|      Japan|         Bitters End|        51|
|      Japan|Agency for Cultur...|         3|
|     Canada|   Anonymous Content|         7|
|        USA|Sony Pictures Ani...|        55|
|         UK|20th Century Studios|         4|
+-----------+--------------------+----------+
only showing top 10 rows



In [12]:
window_spec = Window.partitionBy("country").orderBy(desc("qtd_movies"))
top_studios_by_country = movie_by_country_and_studio.withColumn("rank", row_number().over(window_spec))
top_3_studios_by_country = top_studios_by_country.filter(col("rank") <= 3)
top_3_studios_by_country.orderBy(col("qtd_movies").desc()).show(10)

+-------+--------------------+----------+----+
|country|              studio|qtd_movies|rank|
+-------+--------------------+----------+----+
|     UK|                 BBC|      3056|   1|
|    USA|Warner Bros. Pict...|      3023|   1|
|    USA|   Columbia Pictures|      2798|   2|
|    USA| Metro-Goldwyn-Mayer|      2677|   3|
| Canada|           ONF | NFB|      2466|   1|
|Germany|                 ZDF|      2328|   1|
|  Japan|        Toei Company|      2328|   1|
|  Japan|Nikkatsu Corporation|      1864|   2|
|Germany|                 ARD|      1543|   2|
| France|                ARTE|      1470|   1|
+-------+--------------------+----------+----+
only showing top 10 rows



### Atores Mais Frequentes por Gênero de Filme

In [13]:
df_actors = read_csv("actors")
df_actors.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- role: string (nullable = true)



                                                                                

In [None]:
df_genre_selected = df_genre.select("id", "genre")
df_actors_selected = df_actors.select("id", "name")

# TODO: remover animation e documentary

In [15]:
df_actors_genre = df_actors_selected.join(df_genre_selected, "id", "inner")
df_freq_actor_genre = df_actors_genre.groupBy("genre", "name").agg(count("*").alias("qtd_movies")).filter(col("qtd_movies") >= 3)

In [22]:
window_spec = Window.partitionBy("genre").orderBy(desc("qtd_movies"))
top_3_actors_by_genre = df_freq_actor_genre.repartition(50, "genre")\
    .withColumn("rank", row_number().over(window_spec))\
    .filter(col("rank") == 1)\
    .select("genre", "name", "qtd_movies", "rank").orderBy("genre", "rank")\
    .show(10)

25/04/06 17:30:01 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/06 17:30:02 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/06 17:30:02 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/06 17:30:02 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/06 17:30:02 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/06 17:30:02 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/06 17:30:02 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/06 17:30:02 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/06 17:30:02 WARN RowBasedKeyValueBatch: Calling spill() on



25/04/06 17:30:02 WARN TaskMemoryManager: Failed to allocate a page (4194304 bytes), try again.
25/04/06 17:30:02 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/06 17:30:02 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/06 17:30:02 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/06 17:30:03 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/06 17:30:03 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/06 17:30:03 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/06 17:30:03 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/06 17:30:03 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatc



25/04/06 17:30:04 WARN TaskMemoryManager: Failed to allocate a page (16777216 bytes), try again.








25/04/06 17:30:04 WARN TaskMemoryManager: Failed to allocate a page (16777216 bytes), try again.
25/04/06 17:30:04 WARN TaskMemoryManager: Failed to allocate a page (16777216 bytes), try again.

+-----------+-----------------+----------+----+
|      genre|             name|qtd_movies|rank|
+-----------+-----------------+----------+----+
|     Action|           Nassar|       219|   1|
|  Adventure|     Frank Welker|       130|   1|
|  Animation|        Mel Blanc|      1012|   1|
|     Comedy|        Mel Blanc|       624|   1|
|      Crime|     Hideo Murota|       115|   1|
|Documentary|     Adolf Hitler|       221|   1|
|      Drama|Jagathy Sreekumar|       346|   1|
|     Family|     Frank Welker|       255|   1|
|    Fantasy|     Frank Welker|        87|   1|
|    History|     Adolf Hitler|       102|   1|
+-----------+-----------------+----------+----+
only showing top 10 rows



                                                                                

### Quantidade de Filmes Produzidos por Ano

In [17]:
df_movies = read_csv("movies")

In [18]:
from pyspark.sql.functions import year
df_movies_with_year = df_movies.withColumn("year", year(col("date")))
df_movies_with_year.show(10)

+-------+--------------------+----+--------------------+--------------------+--------------------+------+----+
|     id|                name|date|             tagline|         description|              minute|rating|year|
+-------+--------------------+----+--------------------+--------------------+--------------------+------+----+
|1000001|              Barbie|2023|She's everything....|Barbie and Ken ar...|                 114|  3.86|2023|
|1000002|            Parasite|2019|Act like you own ...|All unemployed, K...|                 133|  4.56|2019|
|1000003|Everything Everyw...|2022|The universe is s...|An aging Chinese ...|                 140|   4.3|2022|
|1000004|          Fight Club|1999|Mischief. Mayhem....|"A ticking-time-b...| until an eccentr...|   139|1999|
|1000005|          La La Land|2016|Here's to the foo...|Mia, an aspiring ...|                 129|  4.09|2016|
|1000006|         Oppenheimer|2023|The world forever...|The story of J. R...|                 181|  4.23|2023|
|

In [19]:
df_movies_by_year = df_movies_with_year.select("id", "year").dropDuplicates(["id", "year"])
df_movies_by_year.orderBy(col("year").desc()).show(10)

+-------+----+
|     id|year|
+-------+----+
|1011069|2031|
|1940010|2030|
|1449473|2030|
|1834707|2030|
|1010630|2029|
|1327612|2029|
|1504483|2029|
|1720560|2028|
|1582537|2028|
|1509533|2028|
+-------+----+
only showing top 10 rows



In [20]:
result = df_movies_by_year.groupBy("year").agg(count("*").alias("qtd_unique_movies")).orderBy(col("year").desc())
result.show(50)

+----+-----------------+
|year|qtd_unique_movies|
+----+-----------------+
|2031|                1|
|2030|                3|
|2029|                3|
|2028|                4|
|2027|               12|
|2026|               51|
|2025|              341|
|2024|            18974|
|2023|            43408|
|2022|            40304|
|2021|            39685|
|2020|            36700|
|2019|            35792|
|2018|            32378|
|2017|            30121|
|2016|            26782|
|2015|            25438|
|2014|            24173|
|2013|            22412|
|2012|            19971|
|2011|            18335|
|2010|            16747|
|2009|            16228|
|2008|            15205|
|2007|            14327|
|2006|            13870|
|2005|            12590|
|2004|            11392|
|2003|            10318|
|2002|             9226|
|2001|             8414|
|2000|             7704|
|1999|             7497|
|1998|             7085|
|1997|             6907|
|1996|             6431|
|1995|             6425|


In [21]:
#spark.stop()