In [1]:
# Instala PySpark
!pip install -q pyspark


In [2]:
# Importa e cria a SparkSession
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Modulo Avancado de Agregacoes") \
    .getOrCreate()


In [3]:
from google.colab import files

uploaded = files.upload()


Saving videos-comments-tratados.snappy (1).parquet to videos-comments-tratados.snappy (1).parquet
Saving videos-preparados.snappy (1).parquet to videos-preparados.snappy (1).parquet


In [5]:
df_video = spark.read.parquet("videos-preparados.snappy.parquet")
df_video.show()


+--------------------+-----------+------------+----------------+------+--------+---------+-----------+----+-----+-------------+--------------------+--------------------+--------------------+
|               Title|   Video ID|Published At|         Keyword| Likes|Comments|    Views|Interaction|Year|Month|Keyword Index|        Features PCA|     Features Normal|            Features|
+--------------------+-----------+------------+----------------+------+--------+---------+-----------+----+-----+-------------+--------------------+--------------------+--------------------+
|ASMR MUKBANG DOUB...|--ZI0dSbbNU|  2020-04-18|         mukbang|378858|   18860| 17975269|   18372987|2020|    4|         30.0|[0.6985786560867407]|[0.02303716158264...|[378858.0,1.79752...|
|Deadly car bomb d...|--hxd1CrOqg|  2022-08-22|            news|  6379|    4853|   808787|     820019|2022|    8|         37.0|[0.8936407990235931]|[3.87946679100418...|[6379.0,808787.0,...|
|How Biden&#39;s s...|--ixiTypG8g|  2022-08-2

In [9]:
df_comments = spark.read.parquet("video-comments-tratados.snappy.parquet")
df_comments.show()


+-----------+--------------------+------------+-------+-----+--------+-------+-----------+----+--------------------+---------+-------------+
|   Video ID|               Title|Published At|Keyword|Likes|Comments|  Views|Interaction|Year|             Comment|Sentiment|Likes Comment|
+-----------+--------------------+------------+-------+-----+--------+-------+-----------+----+--------------------+---------+-------------+
|wAZZ-UWGVHI|Apple Pay Is Kill...|  2022-08-23|   tech| 3407|     672| 135612|     139691|2022|Let's not forget ...|        1|           95|
|wAZZ-UWGVHI|Apple Pay Is Kill...|  2022-08-23|   tech| 3407|     672| 135612|     139691|2022|Here in NZ 50% of...|        0|           19|
|wAZZ-UWGVHI|Apple Pay Is Kill...|  2022-08-23|   tech| 3407|     672| 135612|     139691|2022|I will forever ac...|        2|          161|
|wAZZ-UWGVHI|Apple Pay Is Kill...|  2022-08-23|   tech| 3407|     672| 135612|     139691|2022|Whenever I go to ...|        0|            8|
|wAZZ-UWGVHI|

In [10]:
df_video.createOrReplaceTempView("tb_video")
df_comments.createOrReplaceTempView("tb_comments")


In [11]:
join_video_comments = spark.sql("""
    SELECT *
    FROM tb_video v
    JOIN tb_comments c
    ON v.`Video ID` = c.`Video ID`
""")
join_video_comments.show()


+--------------------+-----------+------------+-------+-----+--------+-------+-----------+----+-----+-------------+--------------------+--------------------+--------------------+-----------+--------------------+------------+-------+-----+--------+-------+-----------+----+--------------------+---------+-------------+
|               Title|   Video ID|Published At|Keyword|Likes|Comments|  Views|Interaction|Year|Month|Keyword Index|        Features PCA|     Features Normal|            Features|   Video ID|               Title|Published At|Keyword|Likes|Comments|  Views|Interaction|Year|             Comment|Sentiment|Likes Comment|
+--------------------+-----------+------------+-------+-----+--------+-------+-----------+----+-----+-------------+--------------------+--------------------+--------------------+-----------+--------------------+------------+-------+-----+--------+-------+-----------+----+--------------------+---------+-------------+
|Apple Pay Is Kill...|wAZZ-UWGVHI|  2022-08-23

In [12]:
df_video_rep = df_video.repartition(4)
df_comments_rep = df_comments.repartition(4)

df_video_rep.createOrReplaceTempView("tb_video_rep")
df_comments_rep.createOrReplaceTempView("tb_comments_rep")

join_video_comments_rep = spark.sql("""
    SELECT *
    FROM tb_video_rep v
    JOIN tb_comments_rep c
    ON v.`Video ID` = c.`Video ID`
""")
join_video_comments_rep.show()


+--------------------+-----------+------------+----------------+-------+--------+--------+-----------+----+-----+-------------+--------------------+--------------------+--------------------+-----------+--------------------+------------+----------------+-------+--------+--------+-----------+----+--------------------+---------+-------------+
|               Title|   Video ID|Published At|         Keyword|  Likes|Comments|   Views|Interaction|Year|Month|Keyword Index|        Features PCA|     Features Normal|            Features|   Video ID|               Title|Published At|         Keyword|  Likes|Comments|   Views|Interaction|Year|             Comment|Sentiment|Likes Comment|
+--------------------+-----------+------------+----------------+-------+--------+--------+-----------+----+-----+-------------+--------------------+--------------------+--------------------+-----------+--------------------+------------+----------------+-------+--------+--------+-----------+----+--------------------

In [13]:
df_video_coa = df_video.coalesce(1)
df_comments_coa = df_comments.coalesce(1)

df_video_coa.createOrReplaceTempView("tb_video_coa")
df_comments_coa.createOrReplaceTempView("tb_comments_coa")

join_video_comments_coa = spark.sql("""
    SELECT *
    FROM tb_video_coa v
    JOIN tb_comments_coa c
    ON v.`Video ID` = c.`Video ID`
""")
join_video_comments_coa.show()


+--------------------+-----------+------------+-------+-----+--------+-------+-----------+----+-----+-------------+--------------------+--------------------+--------------------+-----------+--------------------+------------+-------+-----+--------+-------+-----------+----+--------------------+---------+-------------+
|               Title|   Video ID|Published At|Keyword|Likes|Comments|  Views|Interaction|Year|Month|Keyword Index|        Features PCA|     Features Normal|            Features|   Video ID|               Title|Published At|Keyword|Likes|Comments|  Views|Interaction|Year|             Comment|Sentiment|Likes Comment|
+--------------------+-----------+------------+-------+-----+--------+-------+-----------+----+-----+-------------+--------------------+--------------------+--------------------+-----------+--------------------+------------+-------+-----+--------+-------+-----------+----+--------------------+---------+-------------+
|Apple Pay Is Kill...|wAZZ-UWGVHI|  2022-08-23

In [14]:
print("== Plano com repartition ==")
join_video_comments_rep.explain()

print("\n== Plano com coalesce ==")
join_video_comments_coa.explain()


== Plano com repartition ==
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- BroadcastHashJoin [Video ID#1], [Video ID#86], Inner, BuildLeft, false
   :- BroadcastExchange HashedRelationBroadcastMode(List(input[1, string, false]),false), [plan_id=410]
   :  +- Exchange RoundRobinPartitioning(4), REPARTITION_BY_NUM, [plan_id=407]
   :     +- Filter isnotnull(Video ID#1)
   :        +- FileScan parquet [Title#0,Video ID#1,Published At#2,Keyword#3,Likes#4,Comments#5,Views#6,Interaction#7,Year#8,Month#9,Keyword Index#10,Features PCA#11,Features Normal#12,Features#13] Batched: true, DataFilters: [isnotnull(Video ID#1)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/content/videos-preparados.snappy.parquet], PartitionFilters: [], PushedFilters: [IsNotNull(`Video ID`)], ReadSchema: struct<Title:string,Video ID:string,Published At:date,Keyword:string,Likes:int,Comments:int,Views...
   +- Exchange RoundRobinPartitioning(4), REPARTITION_BY_NUM, [plan_id=408]
      +- Filt

In [15]:
# 1. Reparticiona os dados com base na chave do join para melhorar a distribuição
df_video_opt = df_video.repartition("Video ID")
df_comments_opt = df_comments.repartition("Video ID")

In [16]:
# 2. Filtra apenas os dados relevantes para reduzir o volume antes do join
df_video_opt = df_video_opt.filter("Year >= 2020")  # exemplo de filtro
df_comments_opt = df_comments_opt.filter("Sentiment >= 0")  # outro filtro útil

In [17]:
# 3. Faz o join diretamente com os DataFrames otimizados
df_join_optimized = df_video_opt.join(df_comments_opt, on="Video ID", how="inner")

In [20]:
# 4. (Opcional) Seleciona apenas colunas de interesse para economizar memória
df_join_optimized = df_join_optimized.select(
    df_video_opt["Video ID"],
    df_video_opt["Title"],
    df_video_opt["Keyword"],
    df_video_opt["Likes"],
    df_video_opt["Comments"],
    df_comments_opt["Sentiment"],
    df_video_opt["Published At"]
)

In [21]:

# 5. Mostra o resultado final
df_join_optimized.show()

+-----------+--------------------+-------+------+--------+---------+------------+
|   Video ID|               Title|Keyword| Likes|Comments|Sentiment|Published At|
+-----------+--------------------+-------+------+--------+---------+------------+
|115amzVdV44|How To Fix a Wate...| how-to|910553|   81975|        1|  2020-08-18|
|115amzVdV44|How To Fix a Wate...| how-to|910553|   81975|        0|  2020-08-18|
|115amzVdV44|How To Fix a Wate...| how-to|910553|   81975|        0|  2020-08-18|
|115amzVdV44|How To Fix a Wate...| how-to|910553|   81975|        2|  2020-08-18|
|115amzVdV44|How To Fix a Wate...| how-to|910553|   81975|        2|  2020-08-18|
|115amzVdV44|How To Fix a Wate...| how-to|910553|   81975|        1|  2020-08-18|
|115amzVdV44|How To Fix a Wate...| how-to|910553|   81975|        2|  2020-08-18|
|115amzVdV44|How To Fix a Wate...| how-to|910553|   81975|        2|  2020-08-18|
|115amzVdV44|How To Fix a Wate...| how-to|910553|   81975|        2|  2020-08-18|
|115amzVdV44|How