# Data analysis with PySpark

In this notebook, we recreate with PySpark the analyses previously done with SQL in Hive.<br>
To do so, we will:
1. Download and extract the files used.
2. Configure and start the PySpark environment.
3. Run the analyses in PySpark, with the SQL queries serving as a reference for comparison.

### Downloading and extracting the data used previously in the Hadoop Infrastructure project

For this step, upload the `download_and_extract.sh` file found in the repository.

In [1]:
# Runs the script responsible for downloading and extracting the data used
!bash download_and_extract.sh

Iniciando o processo de download e extração em Mon Oct  7 01:58:17 PM UTC 2024
Baixando o arquivo name.basics.tsv.gz...
--2024-10-07 13:58:17--  https://datasets.imdbws.com/name.basics.tsv.gz
Resolving datasets.imdbws.com (datasets.imdbws.com)... 3.167.212.55, 3.167.212.123, 3.167.212.68, ...
Connecting to datasets.imdbws.com (datasets.imdbws.com)|3.167.212.55|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 273487929 (261M) [binary/octet-stream]
Saving to: ‘./datasets/name.basics.tsv.gz’


2024-10-07 13:58:19 (225 MB/s) - ‘./datasets/name.basics.tsv.gz’ saved [273487929/273487929]

O download do arquivo name.basics.tsv.gz foi realizado com sucesso.
Baixando o arquivo title.akas.tsv.gz...
--2024-10-07 13:58:19--  https://datasets.imdbws.com/title.akas.tsv.gz
Resolving datasets.imdbws.com (datasets.imdbws.com)... 3.167.212.55, 3.167.212.123, 3.167.212.68, ...
Connecting to datasets.imdbws.com (datasets.imdbws.com)|3.167.212.55|:443... connected.
HTTP request sen
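
Once the script finishes, it can be worth confirming that every file read later in this notebook is in place. The following is a minimal sketch, assuming the script extracts the files into `./datasets` under the names used by the `spark.read.csv` calls in this notebook. `missing_datasets` is a hypothetical helper and is not part of the repository.

```python
from pathlib import Path

# File names taken from the read calls later in this notebook.
EXPECTED_FILES = [
    "name.basics.tsv",
    "title.akas.tsv",
    "title.basics.tsv",
    "title.crew.tsv",
    "title.episode.tsv",
    "title.principals.tsv",
    "title.ratings.tsv",
]

def missing_datasets(directory="./datasets"):
    """Return the expected TSV files that are not present in `directory`."""
    base = Path(directory)
    return [name for name in EXPECTED_FILES if not (base / name).is_file()]

missing = missing_datasets()
print("All files present." if not missing else f"Missing files: {missing}")
```

If the list is not empty, rerunning the download script before starting Spark avoids a failure at read time.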

## Configuring and initializing the PySpark environment

In [2]:
# Installing the PySpark package in the environment
!pip install -q pyspark


In [3]:
# Importing and initializing PySpark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType
from pyspark.sql.functions import split, col

spark = SparkSession \
        .builder \
        .appName("IMDb") \
        .master("local[*]") \
        .getOrCreate()

### Creating the DataFrames

Let's create a DataFrame for each of the data files.
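
Every read in this section follows the same pattern: tab-separated values, an explicit schema, and comma-separated fields split into arrays. The IMDb files mark missing values with `\N`, which is what the `nullValue` option handles. The sketch below mimics that pattern without Spark on two hand-written rows laid out like `name.basics.tsv` (abridged, not read from the file). `parse_rows` is a hypothetical helper used only for illustration.

```python
import csv
import io

NULL_MARKER = "\\N"  # a backslash followed by N, as written in the IMDb files

# Two hand-written rows in the layout of name.basics.tsv (abridged sample).
SAMPLE = (
    "nconst\tprimaryName\tbirthYear\tdeathYear\tprimaryProfession\tknownForTitles\n"
    "nm0000001\tFred Astaire\t1899\t1987\tactor,miscellaneous,producer\ttt0072308,tt0050419\n"
    "nm0000003\tBrigitte Bardot\t1934\t\\N\tactress,music_department,producer\ttt0057345\n"
)

def parse_rows(text, int_cols=(), list_cols=()):
    """Parse tab-separated text, mapping the IMDb null marker to None."""
    reader = csv.DictReader(io.StringIO(text), delimiter="\t", quoting=csv.QUOTE_NONE)
    rows = []
    for raw in reader:
        row = {k: (None if v == NULL_MARKER else v) for k, v in raw.items()}
        for c in int_cols:
            if row[c] is not None:
                row[c] = int(row[c])          # like IntegerType in the schema
        for c in list_cols:
            if row[c] is not None:
                row[c] = row[c].split(",")    # like split(col, ",")
        rows.append(row)
    return rows

people = parse_rows(
    SAMPLE,
    int_cols=("birthYear", "deathYear"),
    list_cols=("primaryProfession", "knownForTitles"),
)
for p in people:
    print(p["primaryName"], p["deathYear"], p["primaryProfession"])
```

Without an explicit null marker, a string column would keep the literal `\N` text, and splitting it would produce a one-element list containing `\N` rather than a null.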

#### Person

In [4]:
person_schema = StructType([
    StructField("nconst", StringType(), True),
    StructField("primaryName", StringType(), True),
    StructField("birthYear", IntegerType(), True),
    StructField("deathYear", IntegerType(), True),
    StructField("primaryProfession", StringType(), True),
    StructField("knownForTitles", StringType(), True)
])

df_person = spark.read.csv(
    './datasets/name.basics.tsv',
    header=True,
    sep="\t",
    schema=person_schema,
    multiLine=False,
    nullValue=r"\N"  # IMDb marks missing values with \N
)

df_person = df_person.withColumn("primaryProfession", split(df_person["primaryProfession"], ","))
df_names = df_person.withColumn("knownForTitles", split(df_person["knownForTitles"], ","))

df_names.show(5, truncate=False)

+---------+---------------+---------+---------+--------------------------------------+--------------------------------------------+
|nconst   |primaryName    |birthYear|deathYear|primaryProfession                     |knownForTitles                              |
+---------+---------------+---------+---------+--------------------------------------+--------------------------------------------+
|nm0000001|Fred Astaire   |1899     |1987     |[actor, miscellaneous, producer]      |[tt0072308, tt0050419, tt0053137, tt0027125]|
|nm0000002|Lauren Bacall  |1924     |2014     |[actress, soundtrack, archive_footage]|[tt0037382, tt0075213, tt0117057, tt0038355]|
|nm0000003|Brigitte Bardot|1934     |NULL     |[actress, music_department, producer] |[tt0057345, tt0049189, tt0056404, tt0054452]|
|nm0000004|John Belushi   |1949     |1982     |[actor, writer, music_department]     |[tt0072562, tt0077975, tt0080455, tt0078723]|
|nm0000005|Ingmar Bergman |1918     |2007     |[writer, director, actor]    

#### Title translation

In [5]:
# title.akas.tsv
title_translation_schema = StructType([
    StructField("titleId", StringType(), True),
    StructField("ordering", IntegerType(), True),
    StructField("title", StringType(), True),
    StructField("region", StringType(), True),
    StructField("language", StringType(), True),
    StructField("types", StringType(), True),
    StructField("attributes", StringType(), True),
    StructField("isOriginalTitle", IntegerType(), True)
])

df_title_translation = spark.read.csv(
    './datasets/title.akas.tsv',
    header=True,
    sep="\t",
    schema=title_translation_schema,
    multiLine=False,
    nullValue=r"\N"
)

df_title_translation = df_title_translation.withColumn("types", split(df_title_translation["types"], ","))
df_title_translation = df_title_translation.withColumn("attributes", split(df_title_translation["attributes"], ","))

df_title_translation.show(5, truncate=False)

+---------+--------+-------------------------+------+--------+-------------+---------------+---------------+
|titleId  |ordering|title                    |region|language|types        |attributes     |isOriginalTitle|
+---------+--------+-------------------------+------+--------+-------------+---------------+---------------+
|tt0000001|1       |Carmencita               |NULL  |NULL    |[original]   |NULL           |1              |
|tt0000001|2       |Carmencita               |DE    |NULL    |NULL         |[literal title]|0              |
|tt0000001|3       |Carmencita               |US    |NULL    |[imdbDisplay]|NULL           |0              |
|tt0000001|4       |Carmencita - spanyol tánc|HU    |NULL    |[imdbDisplay]|NULL           |0              |
|tt0000001|5       |Καρμενσίτα               |GR    |NULL    |[imdbDisplay]|NULL           |0              |
+---------+--------+-------------------------+------+--------+-------------+---------------+---------------+
only showing top 5 

#### Title

In [6]:
title_schema = StructType([
    StructField("tconst", StringType(), True),
    StructField("titleType", StringType(), True),
    StructField("primaryTitle", StringType(), True),
    StructField("originalTitle", StringType(), True),
    StructField("isAdult", IntegerType(), True),
    StructField("startYear", IntegerType(), True),
    StructField("endYear", IntegerType(), True),
    StructField("runtimeMinutes", IntegerType(), True),
    StructField("genres", StringType(), True)
])

df_title = spark.read.csv(
    './datasets/title.basics.tsv',
    header=True,
    sep="\t",
    schema=title_schema,
    multiLine=False,
    nullValue=r"\N"  # IMDb marks missing values with \N
)

df_title = df_title.withColumn("genres", split(df_title["genres"], ","))

df_title.show(5, truncate=False)

+---------+---------+----------------------+----------------------+-------+---------+-------+--------------+----------------------------+
|tconst   |titleType|primaryTitle          |originalTitle         |isAdult|startYear|endYear|runtimeMinutes|genres                      |
+---------+---------+----------------------+----------------------+-------+---------+-------+--------------+----------------------------+
|tt0000001|short    |Carmencita            |Carmencita            |0      |1894     |NULL   |1             |[Documentary, Short]        |
|tt0000002|short    |Le clown et ses chiens|Le clown et ses chiens|0      |1892     |NULL   |5             |[Animation, Short]          |
|tt0000003|short    |Pauvre Pierrot        |Pauvre Pierrot        |0      |1892     |NULL   |5             |[Animation, Comedy, Romance]|
|tt0000004|short    |Un bon bock           |Un bon bock           |0      |1892     |NULL   |12            |[Animation, Short]          |
|tt0000005|short    |Blacksmith Sc

#### Crew

In [7]:
crew_schema = StructType([
    StructField("tconst", StringType(), True),
    StructField("directors", StringType(), True),
    StructField("writers", StringType(), True)
])

df_crew = spark.read.csv(
    './datasets/title.crew.tsv',
    header=True,
    sep="\t",
    schema=crew_schema,
    multiLine=False,
    nullValue=r"\N"
)

df_crew = df_crew.withColumn("directors", split(df_crew["directors"], ","))
df_crew = df_crew.withColumn("writers", split(df_crew["writers"], ","))

df_crew.show(5, truncate=False)

+---------+-----------+-------+
|tconst   |directors  |writers|
+---------+-----------+-------+
|tt0000001|[nm0005690]|NULL   |
|tt0000002|[nm0721526]|NULL   |
|tt0000003|[nm0721526]|NULL   |
|tt0000004|[nm0721526]|NULL   |
|tt0000005|[nm0005690]|NULL   |
+---------+-----------+-------+
only showing top 5 rows



#### Episode

In [8]:
episode_schema = StructType([
    StructField("tconst", StringType(), True),
    StructField("parentTconst", StringType(), True),
    StructField("seasonNumber", IntegerType(), True),
    StructField("episodeNumber", IntegerType(), True)
])

df_episode = spark.read.csv(
    './datasets/title.episode.tsv',
    header=True,
    sep="\t",
    schema=episode_schema,
    multiLine=False,
)

df_episode.show(5, truncate=False)

+---------+------------+------------+-------------+
|tconst   |parentTconst|seasonNumber|episodeNumber|
+---------+------------+------------+-------------+
|tt0031458|tt32857063  |NULL        |NULL         |
|tt0041951|tt0041038   |1           |9            |
|tt0042816|tt0989125   |1           |17           |
|tt0042889|tt0989125   |NULL        |NULL         |
|tt0043426|tt0040051   |3           |42           |
+---------+------------+------------+-------------+
only showing top 5 rows



#### Principal

In [9]:
principal_schema = StructType([
    StructField("tconst", StringType(), True),
    StructField("ordering", IntegerType(), True),
    StructField("nconst", StringType(), True),
    StructField("category", StringType(), True),
    StructField("job", StringType(), True),
    StructField("characters", StringType(), True)
])

df_principal = spark.read.csv(
    './datasets/title.principals.tsv',
    header=True,
    sep="\t",
    schema=principal_schema,
    multiLine=False,
    nullValue=r"\N"
)

df_principal.show(5, truncate=False)

+---------+--------+---------+---------------+-----------------------+----------+
|tconst   |ordering|nconst   |category       |job                    |characters|
+---------+--------+---------+---------------+-----------------------+----------+
|tt0000001|1       |nm1588970|self           |NULL                   |["Self"]  |
|tt0000001|2       |nm0005690|director       |NULL                   |NULL      |
|tt0000001|3       |nm0005690|producer       |producer               |NULL      |
|tt0000001|4       |nm0374658|cinematographer|director of photography|NULL      |
|tt0000002|1       |nm0721526|director       |NULL                   |NULL      |
+---------+--------+---------+---------------+-----------------------+----------+
only showing top 5 rows



#### Rating

In [10]:
rating_schema = StructType([
    StructField("tconst", StringType(), True),
    StructField("averageRating", FloatType(), True),
    StructField("numVotes", IntegerType(), True)
])

df_rating = spark.read.csv(
    './datasets/title.ratings.tsv',
    header=True,
    sep="\t",
    schema=rating_schema,
    multiLine=False
)

df_rating.show(5, truncate=False)

+---------+-------------+--------+
|tconst   |averageRating|numVotes|
+---------+-------------+--------+
|tt0000001|5.7          |2089    |
|tt0000002|5.6          |283     |
|tt0000003|6.5          |2097    |
|tt0000004|5.4          |183     |
|tt0000005|6.2          |2832    |
+---------+-------------+--------+
only showing top 5 rows



### Creating the base table for the later analyses

The base selection is the set of the 250 titles with the most votes (`numVotes`).<br>
All 5 later analyses use this set of titles, so I chose to store the data in a separate DataFrame.
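
The logic of the base table is an inner join followed by a top-N selection. One detail to keep in mind is that ordering by `numVotes` alone leaves the order of tied rows unspecified, so the 250th row could vary between runs if there is a tie at the cutoff. The sketch below shows the same logic without Spark on hypothetical miniature tables, with `tconst` added as a tie-breaker. That tie-breaker is my assumption and is not part of the original query.

```python
# Hypothetical miniature versions of the rating and title tables.
ratings = [
    {"tconst": "tt0000003", "numVotes": 500},
    {"tconst": "tt0000001", "numVotes": 900},
    {"tconst": "tt0000002", "numVotes": 500},
    {"tconst": "tt0000009", "numVotes": 700},  # no matching title: dropped by the inner join
]
titles = {
    "tt0000001": "Title A",
    "tt0000002": "Title B",
    "tt0000003": "Title C",
}

def top_titles(ratings, titles, n):
    """Inner join on tconst, then keep the n most-voted rows (ties broken by tconst)."""
    joined = [
        {**r, "primaryTitle": titles[r["tconst"]]}
        for r in ratings
        if r["tconst"] in titles
    ]
    joined.sort(key=lambda row: (-row["numVotes"], row["tconst"]))
    return joined[:n]

top2 = top_titles(ratings, titles, 2)
print([row["tconst"] for row in top2])
```

In PySpark, the equivalent would be to pass a second column to `orderBy`, for example `col("t.tconst")`, after the `numVotes` ordering.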

Reference SQL code
```sql
CREATE TABLE top_titles AS
    (
        SELECT
            t.tconst,
            t.primaryTitle,
            r.averageRating,
            r.numVotes,
            t.startYear,
            t.titleType,
            t.runtimeMinutes
        FROM rating AS r
        INNER JOIN title AS t
        USING(tconst)
        ORDER BY numVotes DESC
        LIMIT 250
    );
```

PySpark implementation

In [11]:
# Joining df_rating with df_title and selecting the relevant columns
df_top_titles = df_rating.alias("r").join(
    df_title.alias("t"),
    on="tconst",
    how="inner"
).select(
    col("t.tconst"),
    col("t.primaryTitle"),
    col("r.averageRating"),
    col("r.numVotes"),
    col("t.startYear"),
    col("t.titleType"),
    col("t.runtimeMinutes")
).orderBy(col("r.numVotes").desc())

# Keeping only the first 250 rows of the ordered result
df_top_titles = df_top_titles.limit(250)

# Showing 10 of the rows
df_top_titles.show(10, truncate=False)

+---------+------------------------+-------------+--------+---------+---------+--------------+
|tconst   |primaryTitle            |averageRating|numVotes|startYear|titleType|runtimeMinutes|
+---------+------------------------+-------------+--------+---------+---------+--------------+
|tt0111161|The Shawshank Redemption|9.3          |2947499 |1994     |movie    |142           |
|tt0468569|The Dark Knight         |9.0          |2927813 |2008     |movie    |152           |
|tt1375666|Inception               |8.8          |2599180 |2010     |movie    |148           |
|tt0137523|Fight Club              |8.8          |2379193 |1999     |movie    |139           |
|tt0944947|Game of Thrones         |9.2          |2350263 |2011     |tvSeries |60            |
|tt0109830|Forrest Gump            |8.8          |2305408 |1994     |movie    |142           |
|tt0110912|Pulp Fiction            |8.9          |2263378 |1994     |movie    |154           |
|tt0903747|Breaking Bad            |9.5          |

### Analysis 1

Distribution of the number of titles by decade and by type.
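
The decade is obtained by flooring the year to a multiple of 10, which is what `FLOOR(startYear / 10) * 10` does in the query. The following is a small Spark-free sketch of that bucketing and of the grouped count, on a hypothetical sample of `(startYear, titleType)` pairs.

```python
from collections import Counter

def decade_of(year):
    """Floor a year to the start of its decade, e.g. 1994 -> 1990."""
    return (year // 10) * 10

# Hypothetical sample of (startYear, titleType) pairs; None marks a missing year.
sample = [
    (1994, "movie"),
    (1999, "movie"),
    (2008, "movie"),
    (2011, "tvSeries"),
    (None, "movie"),
]

counts = Counter(
    (decade_of(year), title_type)
    for year, title_type in sample
    if year is not None  # mirrors WHERE startYear IS NOT NULL
)

# Sort by decade and then title type, both descending, like the ORDER BY clause.
for (decade, title_type), n in sorted(counts.items(), reverse=True):
    print(decade, title_type, n)
```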

Reference SQL code
```sql
SELECT
    CAST(FLOOR(startYear / 10) * 10 AS INT) AS decade,
    titleType,
    COUNT(*) AS count
FROM top_titles
WHERE startYear IS NOT NULL
GROUP BY FLOOR(startYear / 10) * 10, titleType
ORDER BY FLOOR(startYear / 10) * 10 DESC, titleType DESC;
```



PySpark implementation

In [12]:
from pyspark.sql import functions as F

# The query translated to PySpark
df_decades = df_top_titles.filter(F.col("startYear").isNotNull()) \
    .groupBy(
        (F.floor(F.col("startYear") / 10) * 10).cast("int").alias("decade"),
        F.col("titleType")
    ) \
    .agg(F.count("*").alias("count")) \
    .orderBy(F.col("decade").desc(), F.col("titleType").desc())

# Result
df_decades.show(truncate=False)

+------+------------+-----+
|decade|titleType   |count|
+------+------------+-----+
|2020  |movie       |5    |
|2010  |tvSeries    |10   |
|2010  |tvMiniSeries|1    |
|2010  |movie       |91   |
|2000  |tvSeries    |5    |
|2000  |movie       |73   |
|1990  |tvSeries    |1    |
|1990  |movie       |38   |
|1980  |movie       |12   |
|1970  |movie       |10   |
|1960  |movie       |3    |
|1950  |movie       |1    |
+------+------------+-----+



### Analysis 2

Average number of translations per title, counting each entry in `title_translation` as one translation.
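
The computation has two steps: count the translation rows per title, then average those counts over the top titles only. The sketch below reproduces both steps without Spark on hypothetical data. It also makes the rounding rule explicit. Spark's `round` rounds half up, while Python's built-in `round()` rounds half to even, so a plain-Python check of a value such as 2.5 would otherwise disagree.

```python
from collections import Counter
from decimal import Decimal, ROUND_HALF_UP

# Hypothetical title_translation rows, reduced to their titleId column.
translation_rows = ["tt1", "tt1", "tt1", "tt2", "tt2", "tt3"]
top_title_ids = {"tt1", "tt2"}  # hypothetical stand-in for top_titles

counts = Counter(translation_rows)  # translation rows per title
matched = [counts[t] for t in top_title_ids if t in counts]  # inner join
average = Decimal(sum(matched)) / Decimal(len(matched))

# Round half up, the rule used by Spark's round function.
rounded = average.quantize(Decimal("1"), rounding=ROUND_HALF_UP)
print(average, rounded)
```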

Reference SQL code
```sql
WITH translation_counts AS (
    SELECT
        titleId AS tconst,
        COUNT(*) AS translation_count
    FROM
        title_translation
    GROUP BY
        titleId
)

SELECT
    ROUND(AVG(tc.translation_count)) AS avg_translations
FROM
    top_titles tt
INNER JOIN
    translation_counts tc ON tt.tconst = tc.tconst;
```



PySpark implementation

In [13]:
from pyspark.sql import functions as F

# Creating the DataFrame with the translation count per title
df_translation_counts = df_title_translation.groupBy("titleId") \
    .agg(F.count("*").alias("translation_count")) \
    .withColumnRenamed("titleId", "tconst")

# Joining with df_top_titles
df_avg_translations = df_top_titles.alias("tt") \
    .join(df_translation_counts.alias("tc"), on="tconst", how="inner") \
    .agg(F.round(F.avg("tc.translation_count")).alias("avg_translations"))

# Result
df_avg_translations.show(truncate=False)

+----------------+
|avg_translations|
+----------------+
|70.0            |
+----------------+



### Analysis 3

Top 10 actors and actresses with the most appearances.
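
An appearance here is one row of the principal table with category `actor` or `actress` for a title in the base set. The following is a minimal Spark-free sketch of the filter, the two joins, and the count. All ids and names are hypothetical.

```python
from collections import Counter

# Hypothetical principal rows: (tconst, nconst, category).
principals = [
    ("tt1", "nm1", "actor"),
    ("tt1", "nm2", "actress"),
    ("tt1", "nm3", "director"),  # filtered out by category
    ("tt2", "nm1", "actor"),
    ("tt9", "nm2", "actress"),   # tt9 is not a top title: dropped by the join
]
top_title_ids = {"tt1", "tt2"}
names = {"nm1": "Person One", "nm2": "Person Two", "nm3": "Person Three"}

appearances = Counter(
    nconst
    for tconst, nconst, category in principals
    if category in ("actor", "actress")
    and tconst in top_title_ids
    and nconst in names
)

# most_common(10) orders by count, descending, and keeps at most 10 entries.
top = [(names[nconst], n) for nconst, n in appearances.most_common(10)]
print(top)
```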

Reference SQL code
```sql
WITH actor_appearances AS (
    SELECT
        pr.nconst AS actor_id,
        p.primaryName AS actor_name,
        COUNT(*) AS appearance_count
    FROM
        principal pr
    JOIN
        top_titles tt ON pr.tconst = tt.tconst
    JOIN
        person p ON pr.nconst = p.nconst
    WHERE
        pr.category in ('actor', 'actress')
    GROUP BY
        pr.nconst, p.primaryName
)

SELECT
    actor_name,
    appearance_count
FROM
    actor_appearances
ORDER BY
    appearance_count DESC
LIMIT 10;
```



PySpark implementation

In [14]:
from pyspark.sql import functions as F

# Filtering actors and actresses in df_principal
df_actor_appearances = df_principal.filter(F.col("category").isin("actor", "actress")) \
    .join(df_top_titles, on="tconst", how="inner") \
    .join(df_person, on="nconst", how="inner") \
    .groupBy("nconst", "primaryName") \
    .agg(F.count("*").alias("appearance_count")) \
    .orderBy(F.col("appearance_count").desc())

# Selecting the top 10 actors/actresses by number of appearances
df_top_actors = df_actor_appearances.select("primaryName", "appearance_count") \
    .limit(10)

# Result
df_top_actors.show(truncate=False)

+------------------+----------------+
|primaryName       |appearance_count|
+------------------+----------------+
|Robert Downey Jr. |17              |
|Scarlett Johansson|16              |
|Chris Evans       |15              |
|Samuel L. Jackson |14              |
|Mark Ruffalo      |13              |
|Don Cheadle       |10              |
|Brad Pitt         |10              |
|Natalie Portman   |10              |
|Leonardo DiCaprio |9               |
|Jeremy Renner     |9               |
+------------------+----------------+



### Analysis 4

Top 5 directors with the most appearances, and whether they are alive.
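
The key step in this analysis is the explode: each crew row holds a list of directors, and it is expanded into one row per director before counting. Rows whose list is null produce no output. The status is then derived from `deathYear`. The following is a Spark-free sketch on hypothetical data.

```python
from collections import Counter

# Hypothetical crew rows: tconst -> list of director ids (None when unknown).
crew = {"tt1": ["nm1", "nm2"], "tt2": ["nm1"], "tt3": None, "tt9": ["nm2"]}
top_title_ids = {"tt1", "tt2", "tt3"}
# Hypothetical person rows: nconst -> (primaryName, deathYear).
people = {"nm1": ("Director One", None), "nm2": ("Director Two", 2001)}

# "Explode": one (tconst, nconst) pair per director; null lists yield nothing.
pairs = [
    (tconst, nconst)
    for tconst, directors in crew.items()
    if tconst in top_title_ids and directors
    for nconst in directors
]

counts = Counter(nconst for _, nconst in pairs)

# Join with people, derive the status, then keep the 5 most frequent directors.
report = [
    (people[nconst][0], n, "Deceased" if people[nconst][1] is not None else "Alive")
    for nconst, n in counts.most_common()
    if nconst in people
][:5]
print(report)
```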

Reference SQL code
```sql
WITH director_appearances AS (
    SELECT
        c.tconst,
        director AS nconst
    FROM
        crew c
    LATERAL VIEW explode(c.directors) d AS director
    WHERE
        c.tconst IN (SELECT tconst FROM top_titles)
),
director_count AS (
    SELECT
        da.nconst,
        COUNT(*) AS appearance_count
    FROM
        director_appearances da
    GROUP BY
        da.nconst
)

SELECT
    dc.nconst,
    p.primaryName AS director_name,
    dc.appearance_count,
    CASE
        WHEN p.deathYear IS NOT NULL THEN 'Deceased'
        ELSE 'Alive'
    END AS status
FROM
    director_count dc
JOIN
    person p ON dc.nconst = p.nconst
ORDER BY
    dc.appearance_count DESC
LIMIT 5;
```



PySpark implementation

In [15]:
from pyspark.sql import functions as F

# Keeping only the crew rows whose title is in df_top_titles (left semi join),
# then producing one row per director
df_director_appearances = df_crew.join(df_top_titles.select("tconst"), on="tconst", how="left_semi") \
    .select("tconst", F.explode("directors").alias("nconst"))

# Counting the directors' appearances
df_director_count = df_director_appearances.groupBy("nconst") \
    .agg(F.count("*").alias("appearance_count"))

# Joining with df_person to get the directors' name and status
df_director_status = df_director_count.join(df_person, on="nconst", how="inner") \
    .select(
        "nconst",
        "primaryName",
        "appearance_count",
        F.when(F.col("deathYear").isNotNull(), "Deceased").otherwise("Alive").alias("status")
    )

# Ordering by number of appearances and limiting to 5 directors
df_top_directors = df_director_status.orderBy(F.col("appearance_count").desc()).limit(5)

# Result
df_top_directors.show(truncate=False)

+---------+-----------------+----------------+------+
|nconst   |primaryName      |appearance_count|status|
+---------+-----------------+----------------+------+
|nm0634240|Christopher Nolan|9               |Alive |
|nm0000233|Quentin Tarantino|9               |Alive |
|nm0000229|Steven Spielberg |7               |Alive |
|nm0000631|Ridley Scott     |5               |Alive |
|nm0000399|David Fincher    |5               |Alive |
+---------+-----------------+----------------+------+



### Analysis 5

Series ordered by total runtime in minutes, with the total number of seasons, the total number of episodes, and the nominal runtime per episode.
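
The total runtime is an estimate: the number of episodes multiplied by the series' nominal runtime, not a sum of actual episode lengths. One subtlety of combining a `LEFT JOIN` with `COUNT(*)` is that a series with no episode rows still yields one joined row, so it would be counted as having one episode. The Spark-free sketch below, on hypothetical rows, contrasts `COUNT(*)` with counting a column from the episode side. Using the episode-side count for the total is my suggestion and is not what the reference query does.

```python
# Hypothetical rows after the LEFT JOIN: (series, seasonNumber, episode tconst).
# "Series B" has no episode rows, so the join yields one row filled with None.
joined = [
    ("Series A", 1, "tt101"),
    ("Series A", 1, "tt102"),
    ("Series A", 2, "tt103"),
    ("Series A", None, "tt104"),  # episode with an unknown season
    ("Series B", None, None),
]
runtime = {"Series A": 45, "Series B": 30}  # nominal minutes per episode

def summarize(series):
    """Aggregate the joined rows of one series."""
    rows = [r for r in joined if r[0] == series]
    count_star = len(rows)                                     # COUNT(*)
    count_episodes = sum(1 for r in rows if r[2] is not None)  # COUNT(e.tconst)
    seasons = len({r[1] for r in rows if r[1] is not None})    # COUNT(DISTINCT seasonNumber)
    return {
        "seasons": seasons,
        "count_star": count_star,
        "episodes": count_episodes,
        "total_runtime": count_episodes * runtime[series],
    }

for name in runtime:
    print(name, summarize(name))
```

For series with many episode rows the two counts agree, so the difference only matters for titles that have no rows in the episode table.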

Reference SQL code
```sql
WITH series_info AS (
    SELECT
        t.tconst,
        t.primaryTitle,
        COUNT(DISTINCT e.seasonNumber) AS num_seasons,
        COUNT(*) AS num_episodes,
        MAX(t.runtimeMinutes) AS episode_duration
    FROM
        top_titles t
    LEFT JOIN
        episode e ON t.tconst = e.parentTconst
    WHERE
        t.titleType = 'tvSeries'
    GROUP BY
        t.tconst, t.primaryTitle
)

SELECT
    primaryTitle,
    num_seasons AS total_seasons,
    num_episodes AS total_episodes,
    episode_duration,
    num_episodes * episode_duration AS total_runtime
FROM
    series_info
ORDER BY
    num_episodes * episode_duration DESC;
```



PySpark implementation

In [17]:
from pyspark.sql import functions as F

# Keeping only TV series from df_top_titles
df_series_info = df_top_titles.filter(F.col("titleType") == "tvSeries") \
    .join(df_episode, df_top_titles["tconst"] == df_episode["parentTconst"], how="left") \
    .groupBy(df_top_titles["tconst"], df_top_titles["primaryTitle"]) \
    .agg(
        F.countDistinct("seasonNumber").alias("num_seasons"),
        F.count("*").alias("num_episodes"),
        F.max(df_top_titles["runtimeMinutes"]).alias("episode_duration")
    )

# Computing the total runtime
df_series_info = df_series_info.withColumn(
    "total_runtime", F.col("num_episodes") * F.col("episode_duration")
)

# Selecting the needed columns and ordering by total runtime
df_series_info_ordered = df_series_info.select(
    "primaryTitle",
    F.col("num_seasons").alias("total_seasons"),
    F.col("num_episodes").alias("total_episodes"),
    "episode_duration",
    "total_runtime"
).orderBy(F.col("total_runtime").desc())

# Result
df_series_info_ordered.show(truncate=False)

+---------------------+-------------+--------------+----------------+-------------+
|primaryTitle         |total_seasons|total_episodes|episode_duration|total_runtime|
+---------------------+-------------+--------------+----------------+-------------+
|The Walking Dead     |11           |177           |45              |7965         |
|The Big Bang Theory  |12           |280           |22              |6160         |
|Dexter               |8            |96            |60              |5760         |
|Friends              |10           |235           |22              |5170         |
|How I Met Your Mother|9            |208           |23              |4784         |
|Game of Thrones      |8            |74            |60              |4440         |
|The Office           |9            |188           |22              |4136         |
|Better Call Saul     |6            |63            |45              |2835         |
|Breaking Bad         |5            |62            |45              |2790   