In [51]:
# Instala o PySpark (caso esteja rodando no Google Colab ou ambiente que exija instalação manual)
!pip install pyspark



In [52]:
# Importa as bibliotecas necessárias
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [53]:
# Reuse the currently active Spark session if there is one; otherwise start a new one.
spark = (
    SparkSession
    .builder
    .getOrCreate()
)

In [54]:
# Load the prepared videos dataset (Parquet data stored on Google Drive).
df_video = spark.read.parquet(
    '/content/drive/MyDrive/aulas/videos-preparados-parquet'
)

# Load the processed video-comments dataset (Parquet data stored on Google Drive).
df_comments = spark.read.parquet(
    '/content/drive/MyDrive/aulas/videos-comments-tratados-parquet'
)

In [55]:
# Optional sanity check: print the first 20 rows of each DataFrame, with long
# cell values truncated (these are the defaults of show()).
df_video.show(n=20, truncate=True)
df_comments.show(n=20, truncate=True)

+--------------------+-----------+------------+----------------+------+--------+---------+-----------+----+-----+-------------+--------------------+--------------------+--------------------+
|               Title|   Video ID|Published At|         Keyword| Likes|Comments|    Views|Interaction|Year|Month|Keyword Index|        Features PCA|            Features|     Features Normal|
+--------------------+-----------+------------+----------------+------+--------+---------+-----------+----+-----+-------------+--------------------+--------------------+--------------------+
|ASMR MUKBANG DOUB...|--ZI0dSbbNU|  2020-04-18|         mukbang|378858|   18860| 17975269|   18372987|2020|    4|         30.0|[-1.7977902050831...|[378858.0,1.79752...|[0.02303716158264...|
|Deadly car bomb d...|--hxd1CrOqg|  2022-08-22|            news|  6379|    4853|   808787|     820019|2022|    8|         37.0|[-808813.5260760847]|[6379.0,808787.0,...|[3.87946679100418...|
|How Biden&#39;s s...|--ixiTypG8g|  2022-08-2

In [56]:
# Row count of each input (videos first, then comments), one number per line.
# Each count() runs a Spark job.
print(df_video.count(), df_comments.count(), sep='\n')

1869
18409


In [57]:
# 'Video ID' contains a space. Rename it to 'Video_ID' in both DataFrames so the
# join key can be referenced in Spark SQL without backtick quoting.
df_video = df_video.withColumnRenamed('Video ID', 'Video_ID')
df_comments = df_comments.withColumnRenamed('Video ID', 'Video_ID')

# Expose the renamed (otherwise unmodified) DataFrames as temporary SQL views,
# replacing any existing view with the same name.
df_comments.createOrReplaceTempView('comments_temp')
df_video.createOrReplaceTempView('video_temp')

# Baseline: inner join of the two original views on Video_ID. Only the video
# columns (v.*) are selected, so each video row appears once per matching comment.
join_video_comments = spark.sql(sqlQuery="""
      SELECT v.*
      FROM video_temp v
      JOIN comments_temp c
      ON c.Video_ID = v.Video_ID
""")

In [58]:
# Repartition: hash-partition both DataFrames into 5 partitions keyed on
# Video_ID (this involves a shuffle), to compare the resulting join plan.
df_comments_r = df_comments.repartition(5, 'Video_ID')
df_video_r = df_video.repartition(5, 'Video_ID')

In [19]:
# Register the repartitioned DataFrames as SQL views with a "_r" suffix.
df_comments_r.createOrReplaceTempView('comments_temp_r')
df_video_r.createOrReplaceTempView('video_temp_r')

In [34]:
# Same join as the baseline, but reading the views built from the DataFrames
# repartitioned by Video_ID.
join_video_comments_r = spark.sql(sqlQuery="""
      SELECT v.*
      FROM video_temp_r v
      JOIN comments_temp_r c
      ON c.Video_ID = v.Video_ID
""")

In [26]:
# Coalesce: collapse each DataFrame into a single partition. Unlike repartition,
# coalesce merges existing partitions without a full shuffle. With only one
# partition, the work up to the next shuffle loses its parallelism.
# (Nothing is written to disk in this cell.)
df_comments_c = df_comments.coalesce(1)
df_video_c = df_video.coalesce(1)

In [27]:
# Register the single-partition DataFrames as SQL views with a "_c" suffix.
df_comments_c.createOrReplaceTempView('comments_temp_c')
df_video_c.createOrReplaceTempView('video_temp_c')

In [33]:
# Same join as the baseline, but reading the views built from the coalesced
# (single-partition) DataFrames.
join_video_comments_c = spark.sql(sqlQuery="""
      SELECT v.*
      FROM video_temp_c v
      JOIN comments_temp_c c
      ON c.Video_ID = v.Video_ID
""")

In [59]:
# EXPLAIN: print the physical plan of each of the three joins, to compare the
# join strategy Spark chooses (broadcast hash join, sort-merge join, shuffles, ...).
join_video_comments.explain(mode="simple")    # baseline join
join_video_comments_r.explain(mode="simple")  # repartitioned inputs
join_video_comments_c.explain(mode="simple")  # coalesced inputs

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [Title#1191, Video_ID#1392, Published At#1193, Keyword#1194, Likes#1195, Comments#1196, Views#1197, Interaction#1198, Year#1199, Month#1200, Keyword Index#1201, Features PCA#1202, Features#1203, Features Normal#1204]
   +- BroadcastHashJoin [Video_ID#1392], [Video_ID#1380], Inner, BuildLeft, false
      :- BroadcastExchange HashedRelationBroadcastMode(List(input[1, string, true]),false), [plan_id=1822]
      :  +- Project [Title#1191, Video ID#1192 AS Video_ID#1392, Published At#1193, Keyword#1194, Likes#1195, Comments#1196, Views#1197, Interaction#1198, Year#1199, Month#1200, Keyword Index#1201, Features PCA#1202, Features#1203, Features Normal#1204]
      :     +- Filter isnotnull(Video ID#1192)
      :        +- FileScan parquet [Title#1191,Video ID#1192,Published At#1193,Keyword#1194,Likes#1195,Comments#1196,Views#1197,Interaction#1198,Year#1199,Month#1200,Keyword Index#1201,Features PCA#1202,Features#1203,Features 

In [60]:
# Optimization pass: reload both datasets from scratch and redo the earlier
# preparation steps with better practices. Start by renaming 'Video ID'
# (which contains a space) to the lowercase identifier 'video_id'.
df_video = spark.read.parquet(
    '/content/drive/MyDrive/aulas/videos-preparados-parquet'
).withColumnRenamed("Video ID", "video_id")
df_comments = spark.read.parquet(
    '/content/drive/MyDrive/aulas/videos-comments-tratados-parquet'
).withColumnRenamed("Video ID", "video_id")

In [61]:
# Register the reloaded DataFrames as SQL views. "video_temp" replaces the view
# of the same name registered earlier; the comments go to a new "comment_temp" view.
df_comments.createOrReplaceTempView("comment_temp")
df_video.createOrReplaceTempView("video_temp")

In [62]:
# Optimized JOIN:
# - Inner-join each video to its comments on video_id. The inner join by itself
#   already keeps only videos that have at least one comment.
# - Select only the columns needed downstream instead of v.*.
# The previous extra join against `SELECT DISTINCT video_id FROM comment_temp`
# was removed. Every video that matches a comment matches exactly one DISTINCT
# id row, so that join never changed the result. It only added a DISTINCT
# aggregation and a second join to the query.
# `v.video_id AS Video_ID` uses the renamed column name consistently while
# keeping the output column name ("Video_ID") that the saved result had before.
join_video_comments = spark.sql("""
      SELECT v.Title, v.video_id AS Video_ID, v.Year, v.Interaction, c.Sentiment
      FROM video_temp v
      JOIN comment_temp c
      ON c.video_id = v.video_id
""")

# Preview the final join result (row order is not guaranteed without ORDER BY)
join_video_comments.show()

# Inspect the execution plan of the optimized join
join_video_comments.explain()

+--------------------+-----------+----+-----------+---------+
|               Title|   Video_ID|Year|Interaction|Sentiment|
+--------------------+-----------+----+-----------+---------+
|How To Fix a Wate...|115amzVdV44|2020|   53053975|      2.0|
|How To Fix a Wate...|115amzVdV44|2020|   53053975|      2.0|
|How To Fix a Wate...|115amzVdV44|2020|   53053975|      2.0|
|How To Fix a Wate...|115amzVdV44|2020|   53053975|      2.0|
|How To Fix a Wate...|115amzVdV44|2020|   53053975|      1.0|
|How To Fix a Wate...|115amzVdV44|2020|   53053975|      2.0|
|How To Fix a Wate...|115amzVdV44|2020|   53053975|      2.0|
|How To Fix a Wate...|115amzVdV44|2020|   53053975|      0.0|
|How To Fix a Wate...|115amzVdV44|2020|   53053975|      0.0|
|How To Fix a Wate...|115amzVdV44|2020|   53053975|      1.0|
|How to Start a Bl...|m7Jw3a7CpNA|2021|     458123|      1.0|
|How to Start a Bl...|m7Jw3a7CpNA|2021|     458123|      2.0|
|How to Start a Bl...|m7Jw3a7CpNA|2021|     458123|      2.0|
|How to 

In [48]:
# Save the optimized join result as Parquet on Google Drive.
# mode('overwrite') lets the notebook be re-run end to end. The default save
# mode ('errorifexists') raises an error if the target path already exists.
join_video_comments.write.mode('overwrite').parquet('/content/drive/MyDrive/aulas/join-video-comments-otimizado')

In [50]:
# Shut down the Spark session at the end of the notebook.
spark.stop()