## **==================OPÇÕES DE OTIMIZAÇÃO==================**

## **EXPLAIN**

  

##### A função EXPLAIN() é uma ferramenta essencial para inspecionar este plano e pode ajudar a identificar gargalos e oportunidades de otimização.

##### Ao usar EXPLAIN(), você pode especificar diferentes níveis de detalhe:

1. **`simple`**: Exibe apenas o plano físico.
2. **`extended`**: Exibe os planos lógico, otimizado e físico.
3. **`codegen`**: Exibe o código gerado para a execução da consulta.
4. **`formatted`**: Exibe uma versão detalhada e bem formatada dos planos.

In [None]:
#EXEMPLO DE DF
df = spark.createDataFrame([(1, "A"), (2, "B"), (3, "C")], ["id", "value"])

#TRANSFORMAÇÃO
df_filtered = df.filter(df.id > 1)

#EXPLAIN()
df_filtered.explain()

In [None]:
#EXEMPLO DE DF
df1 = spark.createDataFrame([(1, "A"), (2, "B")], ["id", "value"])
df2 = spark.createDataFrame([(1, "X"), (2, "Y")], ["id", "label"])

#JOIN
df_joined = df1.join(df2, "id")
df_joined.explain("formatted")

#verifique se o Spark está usando um broadcast join para conjuntos de dados pequenos 
#ou um sort-merge join para conjuntos maiores. 
#Se necessário, force o uso de broadcast joins:

from pyspark.sql.functions import broadcast

df_joined = df1.join(broadcast(df2), "id")
df_joined.explain("formatted")

#### Analisando o Plano de Execução

**Logical Plan:**        
 _Análise de Alto Nível:_ Mostra as operações na forma de uma árvore. Útil para entender a sequência de transformações aplicadas.
 _Otimizações Lógicas:_ Inclui pushdown de filtros, reordenação de joins, etc.

**Optimized Logical Plan:**        
_Verificação de Otimizações:_ Confirma se otimizações esperadas, como a projeção de colunas, foram aplicadas.
_Eliminação de Redundâncias:_ Verifica se operações desnecessárias foram removidas.

**Physical Plan:**       
 _Execução Real:_ Mostra como as operações lógicas serão executadas no cluster.
_Estratégias de Join:_ Identifica o tipo de join (e.g., broadcast, sort-merge).
_Exchange Operations:_ Mostra onde os dados serão reparticionados ou redistribuídos.

\=======================================================================================

## **TÉCNICA OPTIMIZATIONS CATALYTS**

##### O Catalyst é o otimizador de consultas do Spark, responsável por transformar e otimizar o plano de execução das consultas Spark. Ele aplica diversas técnicas para melhorar o desempenho.

### **Predicate Pushdown (Empurramento de Filtros)**
##### O Predicate Pushdown refere-se à técnica onde o Catalyst empurra filtros para o nível mais baixo possível do plano de execução. Isso significa que o Spark tenta aplicar filtros o mais cedo possível no processo de leitura dos dados, antes de realizar operações adicionais como joins e agregações. Isso reduz o volume de dados que precisam ser processados, melhorando significativamente o desempenho das consultas.

##### Neste exemplo, o filtro df["coluna"] > 10 é aplicado antes do groupBy, reduzindo o número de linhas que participam na agregação.

In [None]:
df.filter(df["coluna"] > 10).groupBy("outra_coluna").count()

### **Column Pruning (Remoção de Colunas)**
##### A técnica de Column Pruning envolve a remoção de colunas não utilizadas antes da execução da consulta. O Catalyst analisa o plano de execução da consulta e identifica quais colunas são necessárias para satisfazer a operação solicitada. Colunas que não são necessárias para o resultado final da consulta são removidas antes da execução, reduzindo a sobrecarga de leitura e processamento de dados desnecessários.

##### Neste caso, apenas a coluna "coluna_utilizada" é mantida após a filtragem, eliminando a necessidade de processar outras colunas do DataFrame.

In [None]:
df.select("coluna_utilizada").filter(df["outra_coluna"] > 20)

#### Funcionamento Interno do Catalyst

O Catalyst opera em três fases principais durante a otimização de consultas:

_Análise (Analysis):_ Durante esta fase, o Catalyst analisa a estrutura da consulta e constrói uma árvore lógica representando a operação solicitada.

_Otimização Lógica (Logical Optimization):_ Nesta fase, o Catalyst aplica transformações na árvore lógica para simplificar e reorganizar as operações, aplicando técnicas como predicate pushdown e column pruning.

_Otimização Física (Physical Optimization):_ Na última fase, o Catalyst converte a árvore lógica otimizada em um plano de execução físico, considerando detalhes como particionamento de dados, métodos de join e estratégias de execução para cada operação.

\=======================================================================================

## **REDUCE**

##### A técnica de usar REDUCE() para unir DataFrames no PySpark é muito útil, mas há situações em que pode não ser a melhor abordagem.

In [None]:
#EXEMPLO DE DF
df1 = spark.createDataFrame([(1, "A"), (2, "B")], ["id", "value"])
df2 = spark.createDataFrame([(3, "C"), (4, "D")], ["id", "value"])
df3 = spark.createDataFrame([(5, "E"), (6, "F")], ["id", "value"])

#LISTA DE DF
dataframes = [df1, df2, df3]

#USO DO REDUCE
df_final = reduce(DataFrame.unionAll, dataframes)

##### Evite usar REDUCE para unir DataFrames nos seguintes casos:

- Quando os DataFrames têm esquemas diferentes.
- Quando o número de DataFrames é muito grande.
- Quando há conflitos de particionamento.
- Quando a união depende de condições específicas.
- Quando são necessárias operações complexas de dados antes ou depois da união.

\=======================================================================================

## **BROADCAST**

##### O BROADCAST é uma técnica usada para otimizar operações de join ao distribuir <u>uma pequena tabela</u> (ou DataFrame) para todos os nós do cluster.

#### Vantagens do broadcast:

_Redução de Movimentação de Dados:_ Como a tabela pequena é enviada para todos os nós, não há necessidade de redistribuir a tabela grande, reduzindo a movimentação de dados pela rede.

_Desempenho Melhorado:_ A realização de joins localmente em cada nó pode acelerar significativamente a operação, especialmente em casos de joins complexos ou grandes volumes de dados.

_Uso Eficiente de Recursos:_ A técnica de broadcast aproveita melhor os recursos do cluster, evitando operações de shuffle custosas.

In [None]:
#BIBLIOTECA
from pyspark.sql.functions import broadcast

# JOIN COM BROADCAST
df_joined_broadcast = df1.join(broadcast(df2), "id")

#PARA VER A DISTRIBUIÇÃO DE TABELAS
df_joined_broadcast.explain("formatted")

#### Quando Usar broadcast:

_Tabela Pequena:_ Use broadcast quando uma das tabelas é relativamente pequena (geralmente cabe na memória de cada nó).

_Join Assimétrico:_ Ideal para joins entre uma tabela grande e uma tabela pequena.

_Desempenho de Join:_ Quando você notar que o desempenho de join é um gargalo devido à movimentação de dados.

\=======================================================================================

## **SORT-MERGE**

##### É eficiente para grandes volumes de dados. Spark classifica ambos os DataFrames e os une de maneira ordenada. Esse tipo de join é o padrão no Spark quando ambas as tabelas são grandes.

##### Funcionamento do Sort-Merge Join:

**Sorting (Ordenação):**
Ambos os DataFrames são ordenados pelas colunas de join. Esta etapa garante que os dados relacionados estejam próximos uns dos outros.

**Merging (Mesclagem):**
Após a ordenação, o Spark realiza uma varredura linear para unir os registros correspondentes das duas tabelas ordenadas.

#### Vantagens do Sort-Merge Join

_Eficiência para Grandes Dados:_ Sort-merge join é muito eficiente para unir grandes conjuntos de dados porque minimiza a quantidade de dados que precisam ser redistribuídos entre os nós.

_Escalabilidade:_ O sort-merge join pode escalar facilmente com o aumento do tamanho dos dados, pois é projetado para lidar com grandes volumes de dados distribuídos.

_Redução de Movimentação de Dados:_ Ao ordenar os dados antes do join, o sort-merge join garante que os dados relacionados estejam nas mesmas partições, reduzindo significativamente a movimentação de dados pela rede.

_Adequado para Dados Particionados:_ É especialmente útil quando os dados já estão particionados ou podem ser facilmente particionados nas colunas de join.

_Determinismo:_ O processo de sort-merge join é determinístico, o que significa que os resultados são consistentes e previsíveis.

In [None]:
# EXEMPLO DF GRANDE
df1 = spark.range(0, 1e8).toDF("id").withColumn("value", expr("id % 1000"))
df2 = spark.range(0, 1e8).toDF("id").withColumn("label", expr("id % 500"))

# CONFIGURAÇÃO SORT-MERGE PARA TABELAS GRANDES COM JOIN
spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")

# JOIN
df_joined = df1.join(df2, "id")

#PARA VER A DISTRIBUIÇÃO DE TABELAS
df_joined.explain("formatted")

#### Quando Usar Sort-Merge Join

_Grandes Conjuntos de Dados:_ Quando ambas as tabelas a serem unidas são grandes e não cabem na memória de cada nó.

_Operações de Join Pesadas:_ Quando a operação de join é complexa e envolve muitas linhas em cada tabela.

_Dados Particionados ou Ordenados:_ Quando os dados já estão particionados ou podem ser ordenados facilmente nas colunas de join.

_Quando Broadcast Não é Viável:_ Quando as tabelas são grandes demais para serem transmitidas (broadcast) eficientemente para todos os nós do cluster.

\=======================================================================================

## **SKEW JOIN**

##### Um cenário de dados desbalanceados (ou "skewed data") ocorre quando a distribuição dos dados em uma coluna específica não é uniforme, ou seja, alguns valores aparecem com muito mais frequência do que outros. Isso pode levar a um desequilíbrio significativo na carga de trabalho durante operações de join, causando problemas de desempenho.

###### _Categorias de Produtos:_ Em um catálogo de produtos, algumas categorias (como eletrônicos) podem ter muito mais produtos do que outras (como livros).

In [None]:
from pyspark.sql import SparkSession

#EXEMPLO: 100 MB
spark = SparkSession.builder \
    .appName("ExemploBroadcastJoin") \
    .config("spark.sql.autoBroadcastJoinThreshold", 100 * 1024 * 1024) \ 
    .getOrCreate()

In [None]:
#VALOR ATUAL
print("Valor atual de spark.sql.autoBroadcastJoinThreshold:",
 spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))

#### Quando Usar Skew Join

Usar técnicas de skew join é recomendado quando você detecta que os dados estão desbalanceados e isso está afetando o desempenho das suas operações de join. As técnicas de skew join ajudam a redistribuir a carga de trabalho de forma mais equilibrada.

O Spark tem uma configuração específica _(spark.sql.autoBroadcastJoinThreshold)_ para ajudar a gerenciar dados desbalanceados automaticamente, mas isso depende do tamanho da tabela.

#### **Considerações**

_Tamanho Adequado:_ Escolha um valor apropriado para spark.sql.autoBroadcastJoinThreshold com base na memória disponível nos nós do cluster e no tamanho típico das suas tabelas.

_Monitoramento:_ Monitore o desempenho das suas consultas para ajustar o valor conforme necessário. Um valor muito baixo pode causar muitos broadcast joins, consumindo muita memória, enquanto um valor muito alto pode resultar em shuffles desnecessários.

\=======================================================================================

## **PARTITIONBY**

In [None]:
#FORMATO DELTA E PARTIÇÕES
spark.read.json('gravação')\
    .write.format('delta')\
    .partitionBy('COLUNA_ESCOLHIDA') #VERIFIQUE QUE A COLUNA SERÁ MESMO UTILIZADA EM FILTRO
    .mode('overwrite')\
    .saveAsTable('SALVANDO NA TABELA')

\=======================================================================================

## **OPTMIZE e ZORDER BY**

#### São técnicas avançadas de otimização que podem ser aplicadas para melhorar o desempenho de consultas, especialmente em cenários onde a performance é crucial.
##### COLUNAS PARTICIONADAS NÃO PODEM SER COLOCADAS EM INDICE, DARÁ ERRO.

###### OPTIMIZE refere-se a uma operação específica no contexto do Delta Lake, uma extensão do Apache Spark que oferece funcionalidades adicionais para gerenciar e otimizar data lakes. 

###### ZORDER BY escolha colunas que são frequentemente usadas em consultas como critérios de filtro para obter os melhores resultados de desempenho. Como se fossem indices.

In [None]:
from pyspark.sql.functions import col

df.write\
    .format("delta")\
    .mode("overwrite")\
    .option("overwriteSchema", "true")\
    .option("optimize", "true")\
    .option("zorderBy", "col_name")\
    .save("/caminho/para/salvar/tabela")

\=======================================================================================