In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.storagelevel import StorageLevel

In [2]:
# Reuse the active Spark session if one already exists; otherwise create one with default settings.
spark = (
    SparkSession.builder
    .getOrCreate()
)

In [3]:
# Reading the files.
# Parquet stores its schema inside the files, so the CSV-only "header" option is
# not needed here (it has no effect on the parquet reader) and has been dropped.
df_video = spark.read.parquet("videos-preparados-parquet")
df_comments = spark.read.parquet("videos-comments-tratados-parquet")

In [4]:
# Sanity-check both dataframes: print the row count (useful later when comparing
# against the join result) and display the first 5 rows of each, videos first.
for frame in (df_video, df_comments):
    print(frame.count())
    frame.show(5)

1881
+--------------------+-----------+------------+-------+-----+--------+-------+-----------+----+-----+-------------+--------------------+--------------------+--------------------+
|               Title|   Video ID|Published At|Keyword|Likes|Comments|  Views|Interaction|Year|Month|Keyword Index|            Features|     Features Normal|        Features PCA|
+--------------------+-----------+------------+-------+-----+--------+-------+-----------+----+-----+-------------+--------------------+--------------------+--------------------+
|Apple Pay Is Kill...|wAZZ-UWGVHI|  2022-08-23|   tech| 3407|     672| 135612|     139691|2022|    8|         24.0|[3407.0,135612.0,...|[2.07229197864298...|[-0.5379150448038...|
|The most EXPENSIV...|b3x28s61q3c|  2022-08-24|   tech|76779|    4306|1758063|    1839148|2022|    8|         24.0|[76779.0,1758063....|[0.00466873762089...|[-0.5379209409038...|
|My New House Gami...|4mgePWWCAmA|  2022-08-23|   tech|63825|    3338|1564007|    1631170|2022|    8

In [5]:
# Register each dataframe as a temporary view so it can be queried through spark.sql.
for view_name, frame in (("videos", df_video), ("comments", df_comments)):
    frame.createOrReplaceTempView(view_name)

In [6]:
# Baseline join: every comment row paired with its video through `Video ID`.
# SELECT * keeps the columns of both views, so `Video ID` (and any other column
# the two views share) appears twice in the result.
baseline_join_query = """
    SELECT *
    FROM comments
    INNER JOIN videos
    ON comments.`Video ID` = videos.`Video ID`
"""
join_video_comments = spark.sql(baseline_join_query)

In [7]:
# Performing the same previous steps using repartition: both inputs are
# redistributed into exactly 2 partitions before being joined.
# (The CSV-only "header" option has no effect on parquet reads and was dropped.)
df_video_repart = spark.read.parquet("videos-preparados-parquet").repartition(2)
df_comments_repart = spark.read.parquet("videos-comments-tratados-parquet").repartition(2)

# Confirm the partition counts actually in effect.
print(f"Numero de partições df_video_repart: {df_video_repart.rdd.getNumPartitions()}")
print(f"Numero de partições df_comments_repart: {df_comments_repart.rdd.getNumPartitions()}")

df_video_repart.createOrReplaceTempView('videos_repart')
df_comments_repart.createOrReplaceTempView('comments_repart')

# DataFrame-API alternative to the SQL join below; joining on the column name
# keeps a single `Video ID` column in the result.
join_video_comments_repart_alt = df_comments_repart.join(df_video_repart, 'Video ID')

# SQL join over the repartitioned views.
join_video_comments_repart = spark.sql("""
    SELECT *
    FROM comments_repart
    INNER JOIN videos_repart
    ON comments_repart.`Video ID` = videos_repart.`Video ID`
""")

Numero de partições df_video_repart: 2
Numero de partições df_comments_repart: 2


In [8]:
# Performing the previous steps using coalesce: both inputs are collapsed into a
# single partition before being joined.
# (The CSV-only "header" option has no effect on parquet reads and was dropped.)
df_video_coalesce = spark.read.parquet("videos-preparados-parquet").coalesce(1)
df_comments_coalesce = spark.read.parquet("videos-comments-tratados-parquet").coalesce(1)

# Confirm the partition counts actually in effect.
print(f"Numero de partições df_video_coalesce: {df_video_coalesce.rdd.getNumPartitions()}")
print(f"Numero de partições df_comments_coalesce: {df_comments_coalesce.rdd.getNumPartitions()}")

# Register the *coalesced* dataframes. Registering the repartitioned ones here
# would make the "coalesce" join silently identical to the repartition join.
df_video_coalesce.createOrReplaceTempView('videos_coalesce')
df_comments_coalesce.createOrReplaceTempView('comments_coalesce')

# SQL join over the coalesced views.
join_video_comments_coalesce = spark.sql("""
    SELECT *
    FROM comments_coalesce
    INNER JOIN videos_coalesce
    ON comments_coalesce.`Video ID` = videos_coalesce.`Video ID`
""")

Numero de partições df_video_coalesce: 1
Numero de partições df_comments_coalesce: 1


In [9]:
# Use explain to compare the physical plans of the three join variants.
# Each plan is printed under a label so the outputs can be told apart.
for plan_label, joined in (
    ("join_video_comments_repart", join_video_comments_repart),
    ("join_video_comments_coalesce", join_video_comments_coalesce),
    ("join_video_comments", join_video_comments),
):
    print(f"--- {plan_label} ---")
    joined.explain()

# How to read the plans:
# - repartition(n) shows up as `Exchange RoundRobinPartitioning(n), REPARTITION_BY_NUM`,
#   i.e. an extra shuffle/redistribution of the data before the join.
# - coalesce(n) is expected to appear as a `Coalesce` node without an Exchange, since it
#   merges existing partitions instead of shuffling. NOTE(review): if the coalesce plan
#   also shows REPARTITION_BY_NUM, the `*_coalesce` views were registered from the
#   repartitioned dataframes rather than the coalesced ones - verify the view registration.
# - The join on the original dataframes has neither extra step, so for this dataset it
#   is the best option in terms of optimization, even if only by a small margin.

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- BroadcastHashJoin [Video ID#156], [Video ID#143], Inner, BuildRight, false
   :- Exchange RoundRobinPartitioning(2), REPARTITION_BY_NUM, [plan_id=222]
   :  +- Filter isnotnull(Video ID#156)
   :     +- FileScan parquet [Video ID#156,Title#157,Published At#158,Keyword#159,Likes#160L,Comments#161L,Views#162L,Interaction#163L,Year#164,Comment#165,Likes Comment#166,Sentiment#167] Batched: true, DataFilters: [isnotnull(Video ID#156)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/content/videos-comments-tratados-parquet], PartitionFilters: [], PushedFilters: [IsNotNull(`Video ID`)], ReadSchema: struct<Video ID:string,Title:string,Published At:date,Keyword:string,Likes:bigint,Comments:bigint...
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[1, string, false]),false), [plan_id=225]
      +- Exchange RoundRobinPartitioning(2), REPARTITION_BY_NUM, [plan_id=223]
         +- Filter isnotnull(Video ID#143)
  

In [10]:
# Count of the current join, for comparison with the filtered result below.
# NOTE: this cell rebinds `join_video_comments`, so re-running it without
# re-running the earlier join cell makes this line print the filtered count.
print(join_video_comments.count())

# Re-doing the previous steps with a focus on performance.

# Dynamic limits via approxQuantile (relative error 0.01). Both probabilities are
# requested in a single call so the column is summarised once instead of twice;
# the results come back in the same order as the requested probabilities.
#   q1_interaction  - first quartile, lower bound that drops low-interaction rows
#   p99_interaction - 99th percentile, upper bound that drops high outliers
q1_interaction, p99_interaction = df_comments.stat.approxQuantile(
    "Interaction", [0.25, 0.99], 0.01
)

# Select only the relevant columns: Interaction instead of the three engagement
# columns, Year and Month instead of `Published At`, and none of the Features
# columns, for better performance. Rows are then kept only when Interaction lies
# strictly between the two limits computed above.
join_video_comments = spark.sql("""
    SELECT c.`Video ID`, c.`Title`, c.`Keyword`, v.`Keyword Index`, c.`Interaction`, v.`Year`, v.`Month`, c.`Comment`, c.`Likes Comment`, c.`Sentiment`
    FROM comments c
    INNER JOIN videos v
    ON c.`Video ID` = v.`Video ID`
""").filter((col("Interaction") > q1_interaction) & (col("Interaction") < p99_interaction))

join_video_comments.show()
print(join_video_comments.count())

19117
+-----------+--------------------+-------+-------------+-----------+----+-----+--------------------+-------------+---------+
|   Video ID|               Title|Keyword|Keyword Index|Interaction|Year|Month|             Comment|Likes Comment|Sentiment|
+-----------+--------------------+-------+-------------+-----------+----+-----+--------------------+-------------+---------+
|wAZZ-UWGVHI|Apple Pay Is Kill...|   tech|         24.0|     139691|2022|    8|Let's not forget ...|           95|        1|
|wAZZ-UWGVHI|Apple Pay Is Kill...|   tech|         24.0|     139691|2022|    8|Here in NZ 50% of...|           19|        0|
|wAZZ-UWGVHI|Apple Pay Is Kill...|   tech|         24.0|     139691|2022|    8|I will forever ac...|          161|        2|
|wAZZ-UWGVHI|Apple Pay Is Kill...|   tech|         24.0|     139691|2022|    8|Whenever I go to ...|            8|        0|
|wAZZ-UWGVHI|Apple Pay Is Kill...|   tech|         24.0|     139691|2022|    8|Apple Pay is so c...|           34|     

In [11]:
# Save the optimized join in parquet format as 'join-videos-comments-otimizado',
# replacing any output already present at that path.
output_path = "join-videos-comments-otimizado"
parquet_writer = join_video_comments.write.mode("overwrite")
parquet_writer.parquet(output_path)