**Configuração e Inicialização**

# PREPARANDO O AMBIENTE E DADOS

In [1]:
!pip install pyspark # Instala a biblioteca PySpark no ambiente de execução

from pyspark.sql import SparkSession # Importa a classe SparkSession, o ponto de entrada principal para a funcionalidade do Spark

from pyspark.sql.functions import col, lit # col (para referenciar colunas) e lit (para valores literais)



In [2]:
# Cria ou obtém a instância da SparkSession
spark = SparkSession.builder \
.appName("otimizacao") \
.getOrCreate()

print("SparkSession iniciada!") # Imprime uma confirmação da inicialização
print(spark) # Exibe a instância da SparkSession (apresentando seu endereço de memória/ID)

SparkSession iniciada!
<pyspark.sql.session.SparkSession object at 0x7db992bd1550>


**Leitura e Preparação dos Dados**

# LEITURA DOS DATAFRAMES

In [3]:
df_video = spark.read.parquet("/content/videos-preparados.snappy (1).parquet") # Lê o DataFrame df_video a partir de um arquivo Parquet (formato otimizado)
print("'df_video' lido. Registros:", df_video.count()) # Exibe uma confirmação e a contagem total de registros do DF de vídeos

df_comments = spark.read.parquet("/content/videos-comments-tratados.snappy (1).parquet") # Lê o DataFrame df_comments a partir do arquivo Parquet de comentários
print("df_comments lido. Registros:", df_comments.count()) # Exibe uma confirmação e a contagem total de registros do DF de comentários

'df_video' lido. Registros: 1869
df_comments lido. Registros: 18409


In [4]:
df_video = df_video.withColumnRenamed("Video ID",  "video_id_key") # Renomeia a coluna chave em df_video para um padrão consistente (video_id_key)
df_comments = df_comments.withColumnRenamed("Video ID", "video_id_key") # Renomeia a coluna chave em df_comments para a mesma chave

# CRIAÇÃO DE TABELAS TEMPORÁRIAS (SQL)

In [5]:
df_video.createOrReplaceTempView("tabela_videos") # Cria uma View temporária que permite que df_video seja consultado via SQL
df_comments.createOrReplaceTempView("tabela_comments") # Cria uma View temporária para df_comments

print("Tabelas temporárias 'tabela_videos' e 'tabela_comments' criadas") # Confirmação da criação das Views

Tabelas temporárias 'tabela_videos' e 'tabela_comments' criadas


**Join Padrão (Método 1 - SQL)**

# JOIN USANDO SPARK SQL (MÉTODO 1 - SQL)

In [6]:
# Executa o join entre tabela_videos e tabela_comments na chave video_id_key, selecionando todas as colunas de vídeos e as colunas de comentário/sentimento
join_video_comments_sql = spark.sql("""
  SELECT
      v.*,
      c.comment,
      c.Sentiment
  FROM
      tabela_videos v
  INNER JOIN
        tabela_comments c ON v.video_id_key = c.video_id_key
""")

print("Join SQL concluído. Exibindo as 5 linhas:") # Mensagem de Conclusão
join_video_comments_sql.show(5) # Exibe as primeiras 5 linhas do resultado do join SQL

Join SQL concluído. Exibindo as 5 linhas:
+--------------------+------------+------------+-------+-----+--------+------+-----------+----+-----+-------------+--------------------+--------------------+--------------------+--------------------+---------+
|               Title|video_id_key|Published At|Keyword|Likes|Comments| Views|Interaction|Year|Month|Keyword Index|        Features PCA|     Features Normal|            Features|             comment|Sentiment|
+--------------------+------------+------------+-------+-----+--------+------+-----------+----+-----+-------------+--------------------+--------------------+--------------------+--------------------+---------+
|Apple Pay Is Kill...| wAZZ-UWGVHI|  2022-08-23|   tech| 3407|     672|135612|     139691|2022|    8|         23.0|[0.5461641657286636]|[2.07229197864298...|[3407.0,135612.0,...|Let's not forget ...|        1|
|Apple Pay Is Kill...| wAZZ-UWGVHI|  2022-08-23|   tech| 3407|     672|135612|     139691|2022|    8|         23.0|[0.

**Otimização por Particionamento (Método 2 - Repartition/Coalesce)**

# JOIN USANDO MANIPULAÇÃO DE PARTIÇÕES (MÉTODO 2 - PARTIÇÕES)

In [7]:
# 1 Reorganizar Partições (Aumentar ou Redistribuir)

df_video_rep = df_video.repartition(50, "video_id_key") # Redistribui df_video em 50 partições, usando video_id_key como chave de particionamento (particionamento hash). Isso garante que o mesmo ID de vídeo caia na mesma partição
df_comments_rep = df_comments.repartition(50, "video_id_key") # Faz o mesmo com df_comments

print("DataFrames reparticionados em 50 partições pela chave 'video_id_key'.") # Confirmação

DataFrames reparticionados em 50 partições pela chave 'video_id_key'.


In [8]:
# 2 Join usando a API de DataFrame (DataFrames reparticionados)

join_video_comments_rep = df_video_rep.join( # Realiza o join entre os DataFrames reparticionados
    df_comments_rep.select("video_id_key", "Comment", "Sentiment"), # Já faz uma seleção (select) antes do join, aplicando o Projection Pushdown para reduzir o volume de dados
    on="video_id_key", # Define a chave do join
    how="inner" # Define o tipo de join
)

In [10]:
# 3 Coalesce (Diminuir Partições)

join_video_comments_coal = join_video_comments_rep.coalesce(8) # Reduz o número de partições de 50 para 8 sem realizar um full shuffle. Isso é ideal para preparar os dados para escrita, evitando muitos arquivos pequenos

print(f"Join concluído. Partições finais: {join_video_comments_coal.rdd.getNumPartitions()}") # Exibe o número final de partições
join_video_comments_coal.show(5) # Exibe as 5 primeiras linhas do resultado

Join concluído. Partições finais: 8
+------------+--------------------+------------+-------+-----+--------+-----+-----------+----+-----+-------------+--------------------+--------------------+--------------------+--------------------+---------+
|video_id_key|               Title|Published At|Keyword|Likes|Comments|Views|Interaction|Year|Month|Keyword Index|        Features PCA|     Features Normal|            Features|             Comment|Sentiment|
+------------+--------------------+------------+-------+-----+--------+-----+-----------+----+-----+-------------+--------------------+--------------------+--------------------+--------------------+---------+
| 88w5Q03Lcro|අද දෙරණ 6.55 ප්‍ර...|  2022-08-24|   news|  612|      18|60835|      61465|2022|    8|         37.0|[0.8936396747757095]|[3.72745006721875...|[612.0,60835.0,20...|Good luck sri lan...|        2|
| 88w5Q03Lcro|අද දෙරණ 6.55 ප්‍ර...|  2022-08-24|   news|  612|      18|60835|      61465|2022|    8|         37.0|[0.89363967477

**Comparação de Planos de Execução (EXPLAIN)**

#  COMPARAÇÃO COM EXPLAIN

In [11]:
print("\nPlano de execução (método 1: SQL simples):")

join_video_comments_sql.explain(extended=True) # Exibe o plano de execução para o join SQL. O extended=True mostra mais detalhes sobre otimizações e planos lógicos/físicos


Plano de execução (método 1: SQL simples):
== Parsed Logical Plan ==
'Project [v.*, 'c.comment, 'c.Sentiment]
+- 'Join Inner, ('v.video_id_key = 'c.video_id_key)
   :- 'SubqueryAlias v
   :  +- 'UnresolvedRelation [tabela_videos], [], false
   +- 'SubqueryAlias c
      +- 'UnresolvedRelation [tabela_comments], [], false

== Analyzed Logical Plan ==
Title: string, video_id_key: string, Published At: date, Keyword: string, Likes: int, Comments: int, Views: int, Interaction: int, Year: int, Month: int, Keyword Index: double, Features PCA: vector, Features Normal: vector, Features: vector, comment: string, Sentiment: int
Project [Title#0, video_id_key#88, Published At#2, Keyword#3, Likes#4, Comments#5, Views#6, Interaction#7, Year#8, Month#9, Keyword Index#10, Features PCA#11, Features Normal#12, Features#13, comment#56, Sentiment#57]
+- Join Inner, (video_id_key#88 = video_id_key#104)
   :- SubqueryAlias v
   :  +- SubqueryAlias tabela_videos
   :     +- View (`tabela_videos`, [Title#0,v

In [12]:
print("\nPlano de execução (método 2: repartition/coalesce):")

join_video_comments_coal.explain(extended=True) # Exibe o plano de execução para o join com manipulação de particionamento. O plano deve mostrar os passos de Repartition e Coalesce


Plano de execução (método 2: repartition/coalesce):
== Parsed Logical Plan ==
Repartition 8, false
+- Project [video_id_key#88, Title#0, Published At#2, Keyword#3, Likes#4, Comments#5, Views#6, Interaction#7, Year#8, Month#9, Keyword Index#10, Features PCA#11, Features Normal#12, Features#13, Comment#56, Sentiment#57]
   +- Join Inner, (video_id_key#88 = video_id_key#104)
      :- RepartitionByExpression [video_id_key#88], 50
      :  +- Project [Title#0, Video ID#1 AS video_id_key#88, Published At#2, Keyword#3, Likes#4, Comments#5, Views#6, Interaction#7, Year#8, Month#9, Keyword Index#10, Features PCA#11, Features Normal#12, Features#13]
      :     +- Relation [Title#0,Video ID#1,Published At#2,Keyword#3,Likes#4,Comments#5,Views#6,Interaction#7,Year#8,Month#9,Keyword Index#10,Features PCA#11,Features Normal#12,Features#13] parquet
      +- Project [video_id_key#104, Comment#56, Sentiment#57]
         +- RepartitionByExpression [video_id_key#104], 50
            +- Project [Video ID

**Join Otimizado (Método 3 - Projection and Filter Pushdown)**

# JOIN OTIMIZADO (MÉTODO 3 - SELEÇÃO E FILTRAGEM)

In [13]:
# 7.1 Seleciona apenas as colunas essenciais do DataFrame de Vídeos
# Selecionar apenas as colunas de interesse antes do join (Filter Pushdown)

# Seleciona apenas as 5 colunas estritamente necessárias do DF de vídeos antes de qualquer operação de join, reduzindo o volume de dados a serem processados
df_video_otimizado = df_video.select(
    "video_id_key",
    "Title",
    "Keyword",
    "Likes",
    "Views"
)

In [14]:
# 7.2 Seleciona apenas as colunas essenciais e filtra (se necessário) do DataFrame de Comentários
# Filtra para manter apenas comentários com Sentiment positivo (Sentiment > 0)
# Diminui o volume de dados ANTES da junção

# Seleciona as colunas necessárias do DF de comentários
df_comments_otimizado = df_comments.select(
    "video_id_key",
    "Comment",
    "Sentiment"
).filter(col("Sentiment") > 0) # Filtra os comentários para manter apenas aqueles com Sentiment positivo antes do join. Isso diminui drasticamente o número de linhas que precisam ser unidas

In [15]:
# 7.3 Realiza o Join com a API de DataFrame (Otimizado)
# Utiliza a API de DataFrame (df.join) que é a preferida e mais otimizada
# pelo Catalyst Optimizer do PySpark

# Realiza o join entre os DataFrames já reduzidos e otimizados
join_otimizado = df_video_otimizado.join(
    df_comments_otimizado,
    on="video_id_key",
    how="inner"
)

In [16]:
# 7.4 Aplica o Coalesce para reduzir o número de partições para o salvamento
# Reduz o número de partições para 4 (um número razoável para a maioria dos datasets)
# antes de salvar para evitar muitos arquivos pequenos (Small Files Problem)

join_otimizado = join_otimizado.coalesce(4) # Reduz o resultado para 4 partições, um bom número para escrita

print("join otimizado concluido com 'select' e 'filter' antes da junção.") # Confirmação
join_otimizado.show(5) # Exibe as 5 primeiras linhas do resultado otimizado

print("\nPlano de execução (método 3: otimizado com select e filter):") #
join_otimizado.explain(extended=True) # Exibe o Plano de Execução do método otimizado, que mostrará as otimizações de Filter e Project empurradas para a fonte de dados (Parquet)

join otimizado concluido com 'select' e 'filter' antes da junção.
+------------+--------------------+-------+-----+------+--------------------+---------+
|video_id_key|               Title|Keyword|Likes| Views|             Comment|Sentiment|
+------------+--------------------+-------+-----+------+--------------------+---------+
| wAZZ-UWGVHI|Apple Pay Is Kill...|   tech| 3407|135612|Let's not forget ...|        1|
| wAZZ-UWGVHI|Apple Pay Is Kill...|   tech| 3407|135612|I will forever ac...|        2|
| wAZZ-UWGVHI|Apple Pay Is Kill...|   tech| 3407|135612|Apple Pay is so c...|        2|
| wAZZ-UWGVHI|Apple Pay Is Kill...|   tech| 3407|135612|We’ve been houndi...|        1|
| wAZZ-UWGVHI|Apple Pay Is Kill...|   tech| 3407|135612|We only got Apple...|        2|
+------------+--------------------+-------+-----+------+--------------------+---------+
only showing top 5 rows


Plano de execução (método 3: otimizado com select e filter):
== Parsed Logical Plan ==
Repartition 4, false
+- Proje

**Salvamento e Finalização**

#  SALVAMENTO DO JOIN OTIMIZADO

In [17]:
join_otimizado.write.mode("overwrite").parquet("join-videos-comments-oimizado") # Salva o DataFrame final em formato Parquet no modo "overwrite" no diretório especificado. Serão criados 4 arquivos Parquet (devido ao coalesce(4) anterior)

print("dataframe 'join-videos-comments-otimizado' salvo com sucesso.") # Confirmação

dataframe 'join-videos-comments-otimizado' salvo com sucesso.


# PARANDO A SPARK SESSION

In [18]:
spark.stop() #  Termina a SparkSession e libera todos os recursos de cluster associados
print("SparkSession encerrada.") # Confirmação

SparkSession encerrada.
