# Dicas PySpark

In [None]:
#Quando você tem consultas frequentes em grandes tabelas que aplicam filtros (WHERE, JOIN, etc.) em colunas específicas, como date, customer_id, ou region.
OPTIMIZE my_table
ZORDER BY (customer_id, date);

#Isso otimiza a tabela para consultas baseadas em customer_id e date.
#Benefícios:
#Reduz o IO (Input/Output) ao acessar apenas os arquivos relevantes.
#Melhora o desempenho de consultas em tabelas muito grandes com muitos arquivos particionados.

## Usando as propriedades do Delta Lake TIME TRAVEL: "VERSION AS OF", "TIMESTAMP AS OF", "DESCRIBE HISTORY", "RESTORE TABLE"

In [None]:
#Cenário:
#Você gerencia uma tabela de pacientes chamada pacientes_delta que contém dados sensíveis sobre histórico médico. 
# Por engano, um analista deletou ou alterou registros importantes, e agora você precisa restaurar os dados para o estado em que estavam antes do incidente.

CREATE TABLE pacientes_delta (
    id INT,
    nome STRING,
    diagnostico STRING
) USING DELTA;

INSERT INTO pacientes_delta VALUES
    (1, 'Maria', 'Hipertensão'),
    (2, 'João', 'Diabetes'),
    (3, 'Ana', 'Asma');
---
UPDATE pacientes_delta
SET diagnostico = 'Nenhum'
WHERE id = 2;
---
SELECT * FROM pacientes_delta VERSION AS OF 0;
SELECT * FROM pacientes_delta TIMESTAMP AS OF '2025-01-15T10:00:00.000+0000';
RESTORE TABLE pacientes_delta TO VERSION AS OF 0;
--
#Você pode usar o comando para identificar as versões da tabela, para então identificar qual estada deseja fazer o restore. 
DESCRIBE HISTORY nome_da_tabela;

## Usando UDFs:

In [None]:
#Calcular o IMC (Índice de Massa Corporal) de pacientes a partir de suas alturas e pesos. O cálculo não está disponível nativamente no Spark.
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

# Função Python para calcular o IMC
def calcular_imc(peso, altura):
    if altura > 0:  # Evita divisão por zero
        return peso / (altura ** 2)
    return None

# Registrar a função como UDF
calcular_imc_udf = udf(calcular_imc, DoubleType())

# Criar DataFrame com dados de exemplo
dados = [(1, 70.0, 1.75), (2, 80.0, 1.80), (3, 90.0, 0)]  # ID, Peso, Altura
colunas = ["id", "peso", "altura"]
df = spark.createDataFrame(dados, colunas)

# Aplicar a UDF no DataFrame
df_com_imc = df.withColumn("imc", calcular_imc_udf(df.peso, df.altura))

# Mostrar o resultado
df_com_imc.show()

In [None]:
#Escrever uma UDF para converter strings em letras maiúsculas, simulando uma necessidade customizada.
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Função Python para converter texto para maiúsculas
def para_maiusculas(texto):
    if texto:
        return texto.upper()
    return None

# Registrar como UDF
para_maiusculas_udf = udf(para_maiusculas, StringType())

# Criar DataFrame com dados de exemplo
dados = [("João",), ("Maria",), ("Carlos",)]
colunas = ["nome"]
df = spark.createDataFrame(dados, colunas)

# Aplicar a UDF
df_maiusculo = df.withColumn("nome_maiusculo", para_maiusculas_udf(df.nome))

# Mostrar o resultado
df_maiusculo.show()

In [None]:
#Criar uma UDF que classifique a idade em faixas etárias com base em múltiplas condições.
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Função para categorizar faixa etária
def faixa_etaria(idade):
    if idade < 18:
        return "Menor de idade"
    elif 18 <= idade < 60:
        return "Adulto"
    else:
        return "Idoso"

# Registrar como UDF
faixa_etaria_udf = udf(faixa_etaria, StringType())

# Criar DataFrame com dados de exemplo
dados = [(15,), (25,), (65,)]
colunas = ["idade"]
df = spark.createDataFrame(dados, colunas)

# Aplicar a UDF
df_faixa = df.withColumn("faixa_etaria", faixa_etaria_udf(df.idade))

# Mostrar o resultado
df_faixa.show()

In [None]:
#Converter strings para letras maiúsculas pode ser feito com uma função nativa do Spark:
from pyspark.sql.functions import upper

# Criar DataFrame com dados de exemplo
dados = [("João",), ("Maria",), ("Carlos",)]
colunas = ["nome"]
df = spark.createDataFrame(dados, colunas)

# Usar função nativa em vez de UDF
df_maiusculo = df.withColumn("nome_maiusculo", upper(df.nome))

# Mostrar o resultado
df_maiusculo.show()


## Lidando com dados PII (Personally Identifiable Information)

In [None]:
#Aplicar máscara em colunas sensíveis como CPF, números de cartão de crédito ou nomes.
from pyspark.sql.functions import lit, concat, substring

# Criar DataFrame com PII
dados = [("João Silva", "123.456.789-00", "5555-4444-3333-2222"),
         ("Maria Santos", "987.654.321-11", "6666-5555-4444-3333")]
colunas = ["nome", "cpf", "cartao_credito"]
df = spark.createDataFrame(dados, colunas)

# Máscara de dados
df_masked = df.withColumn("cpf", concat(lit("***.***."), substring(df["cpf"], 9, 3), lit("-**"))) \
              .withColumn("cartao_credito", concat(lit("****-****-****-"), substring(df["cartao_credito"], 16, 4)))

df_masked.show()


In [None]:
#Substituir números de CPF por tokens:
from pyspark.sql.functions import sha2, concat

# Tokenizar CPF com hash SHA-256
df_tokenized = df.withColumn("cpf_tokenizado", sha2(concat(df["cpf"]), 256))

df_tokenized.show()


In [None]:
#Substituir nomes reais por nomes fictícios:
from faker import Faker
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

fake = Faker()

# Função para gerar nomes fictícios
def pseudonimizar_nome(nome):
    return fake.name()

# Registrar UDF
pseudonimizar_udf = udf(pseudonimizar_nome, StringType())

# Aplicar pseudonimização
df_pseudo = df.withColumn("nome_pseudonimizado", pseudonimizar_udf(df.nome))

df_pseudo.show()


### Resumo das Boas Práticas para Dados PII no Databricks
| **Estratégia**          | **Benefício**                                                                 |
|--------------------------|------------------------------------------------------------------------------|
| **Criptografia**         | Protege dados em trânsito e em repouso.                                      |
| **Máscara de Dados**     | Evita exposição direta de informações sensíveis.                             |
| **Controle de Acesso (RBAC)** | Restringe permissões com base em funções e grupos.                         |
| **Tokenização**          | Substitui valores reais por tokens não reversíveis.                          |
| **Pseudonimização**      | Preserva o formato dos dados, mas substitui valores reais.                   |
| **Monitoramento**        | Rastreia acessos e alterações nos dados para auditoria e compliance.         |
| **Delta Sharing**        | Permite compartilhamento seguro com parceiros externos.                      |

## Verificação de qualidade dos dados

In [None]:
# Check: Contagem Total de Linhas
# Objetivo: Validar se o número de linhas processadas corresponde ao valor esperado.
# Uso: Detectar perda ou duplicação de dados durante ingestões, transformações ou cargas.
# DataFrame de exemplo
dados = [(1, "João", 25), (2, "Maria", 30), (3, "Carlos", 40)]
colunas = ["id", "nome", "idade"]
df = spark.createDataFrame(dados, colunas)

# Contagem total de linhas
contagem_atual = df.count()

# Valor esperado
valor_esperado = 3

# Validação
if contagem_atual == valor_esperado:
    print(f"✅ Contagem de linhas correta: {contagem_atual}")
else:
    print(f"⚠️ Contagem de linhas incorreta! Atual: {contagem_atual}, Esperado: {valor_esperado}")


In [None]:
# Check: Contagem de NULL
# Objetivo: Identificar problemas de JOIN, ETL ou dados ausentes em colunas críticas.
# Uso: Detectar erros em processos de união de tabelas ou ingestões incompletas.
from pyspark.sql.functions import col, when, count

# DataFrame de exemplo com valores NULL
dados = [(1, "João", 25), (2, None, 30), (3, "Carlos", None)]
colunas = ["id", "nome", "idade"]
df = spark.createDataFrame(dados, colunas)

# Contagem de valores NULL por coluna
df.select([
    count(when(col(c).isNull(), c)).alias(f"{c}_nulls")
    for c in df.columns
]).show()


In [None]:
# Check: Contagem de Chaves Exclusivas
# Objetivo: Garantir que não há duplicidade em chaves primárias.
# Uso: Evitar erros em tabelas relacionais e garantir integridade referencial.
# Se tiver retorno, tem algo errado na chave PK
from pyspark.sql.functions import count, col

# DataFrame de exemplo com duplicados
dados = [(1, "João", 25), (2, "Maria", 30), (2, "Maria", 30), (3, "Carlos", 40)]
colunas = ["id", "nome", "idade"]
df = spark.createDataFrame(dados, colunas)

# Contar duplicados na chave primária (id)
df.groupBy("id").count().filter(col("count") > 1).show()

# Se o DataFrame retornar resultados, significa que há duplicados:
duplicados = df.groupBy("id").count().filter(col("count") > 1).count()
if duplicados > 0:
    print(f"⚠️ Foram encontrados {duplicados} IDs duplicados!")
else:
    print("✅ Nenhum ID duplicado encontrado.")


In [None]:
# Adicionando Checks Automatizados ao Pipeline
def verificar_qualidade(df, valor_esperado):
    # 1. Check: Contagem Total de Linhas
    contagem_atual = df.count()
    if contagem_atual != valor_esperado:
        print(f"⚠️ Contagem de linhas incorreta! Atual: {contagem_atual}, Esperado: {valor_esperado}")

    # 2. Check: Contagem de NULL
    null_counts = df.select([
        count(when(col(c).isNull(), c)).alias(f"{c}_nulls")
        for c in df.columns
    ])
    null_counts.show()

    # 3. Check: Chaves Exclusivas
    duplicados = df.groupBy("id").count().filter(col("count") > 1).count()
    if duplicados > 0:
        print(f"⚠️ Foram encontrados {duplicados} IDs duplicados!")
    else:
        print("✅ Nenhum ID duplicado encontrado.")

# Exemplo de aplicação
verificar_qualidade(df, valor_esperado=4)


## Como Minimizar o Impacto dos Shuffles

In [None]:
# Como Identificar Shuffles no Databricks
# Spark UI
# 
# Acesse a aba Stages e procure por tarefas que envolvam Shuffle Read e Shuffle Write.
# Verifique o tamanho dos dados movidos e o tempo gasto.
# Execution Plan
# 
# Analise o plano de execução lógico ou físico para identificar operadores que causam shuffle.

df.explain(True)  # Exibe o plano de execução detalhado


In [None]:
#Escolha colunas adequadas para particionar os dados antes de executar operações.
df = df.repartition("id")  # Particiona os dados por 'id'

#Use coalesce para reduzir partições ao invés de repartition, quando possível, já que coalesce evita shuffle.
df = df.coalesce(5)  # Reduz as partições para 5

#Salve os dados em tabelas Delta com particionamento antes de realizar operações custosas:
df.write.format("delta").partitionBy("categoria").save("/path/delta_table")

#Use Z-Ordering no Delta Lake para otimizar leituras e minimizar a movimentação de dados:
OPTIMIZE delta_table
ZORDER BY (coluna_chave);

#Se as partições estiverem desbalanceadas (skew), trate as chaves problemáticas separadamente:
skewed_keys = df.filter(df["chave"] == "chave_problemática")
balanced_df = df.filter(df["chave"] != "chave_problemática")
result = balanced_df.union(skewed_keys)


### **Por que os Spills Ocorrem?**
Os spills geralmente acontecem devido a:
1. **Falta de Memória Suficiente:**
   - A memória disponível para uma tarefa não é suficiente para armazenar os dados necessários.
   - Por exemplo, ao realizar operações como **sort**, **join** ou **aggregation**, que exigem reorganização ou combinação de grandes volumes de dados.

2. **Dados Muito Grandes em Partições:**
   - Quando uma partição contém um volume de dados desproporcionalmente grande, ela pode exceder a capacidade de memória disponível.

3. **Configuração Inadequada:**
   - Parâmetros de configuração do Spark relacionados à memória e ao tamanho das partições não estão ajustados adequadamente.

---

### **Tipos de Spill**
1. **Shuffle Spill:**
   - Ocorre durante operações de shuffle (ex.: `JOIN`, `GROUP BY`, `ORDER BY`) quando os dados intermediários gerados pelas tarefas não cabem na memória e precisam ser gravados no disco.

2. **Sort Spill:**
   - Ocorre durante operações de ordenação (`ORDER BY` ou `SORT`), quando o Spark tenta ordenar dados que não cabem em memória.

3. **Aggregation Spill:**
   - Acontece em agregações (`SUM`, `COUNT`, etc.), onde o Spark precisa manter estados intermediários em memória para computar os resultados, mas os estados são muito grandes.

---

### **Impactos dos Spills**
Os spills podem causar:
1. **Aumento da Latência:**
   - A gravação e leitura de dados do disco é muito mais lenta do que manter os dados em memória.
   
2. **Sobrecarga no Disco:**
   - Um uso excessivo de spill pode sobrecarregar o sistema de disco do cluster, causando gargalos em tarefas paralelas.

3. **Redução do Desempenho Geral:**
   - Quando spills ocorrem com frequência, o tempo de execução dos jobs aumenta significativamente.

---

### **Como Identificar Spills no Databricks**
1. **Spark UI:**
   - Acesse a aba **SQL** ou **Stages** no Spark UI para identificar tarefas com:
     - **Shuffle Spill** (dados gravados no disco durante o shuffle).
     - **Sort Spill** (dados gravados no disco durante a ordenação).
   - As colunas relevantes incluem:
     - **Shuffle spill (disk):** Quantidade de dados escritos no disco.
     - **Memory spill:** Quantidade de dados transferidos para o disco.

2. **Execution Plan:**
   - Use `explain()` para analisar o plano de execução de uma query e identificar operações que possam causar spill (como shuffles ou sorts):
   ```python
   df.groupBy("coluna").count().explain(True)
   ```

---

### **Como Minimizar Spills**
1. **Aumente a Memória do Executor:**
   - Ajuste a memória disponível para os executores no Spark:
     ```bash
     spark.executor.memory=8g
     spark.executor.memoryOverhead=2g
     ```

2. **Reduza o Tamanho das Partições:**
   - Use particionamento inteligente para evitar partições muito grandes que possam causar spills:
     ```python
     df = df.repartition(10)  # Divida os dados em 10 partições
     ```

3. **Ajuste Parâmetros de Configuração:**
   - **spark.sql.shuffle.partitions:** Ajuste o número de partições para operações de shuffle.
     ```python
     spark.conf.set("spark.sql.shuffle.partitions", "200")
     ```
   - **spark.sql.autoBroadcastJoinThreshold:** Configure o tamanho limite para joins em broadcast (evitando shuffles grandes).
     ```python
     spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10MB")
     ```

4. **Use Tabelas Delta Otimizadas:**
   - Compacte e otimize os dados para evitar pequenos arquivos que aumentam o risco de spills:
     ```sql
     OPTIMIZE tabela_delta
     ZORDER BY (coluna_chave);
     ```

5. **Monitoramento Regular:**
   - Acompanhe regularmente os jobs no **Spark UI** e ajuste configurações conforme necessário para evitar spills recorrentes.

---

### **Exemplo de Código com Ajustes**
```python
# Configurações para minimizar spills
spark.conf.set("spark.sql.shuffle.partitions", "200")  # Ajuste o número de partições
spark.conf.set("spark.executor.memory", "8g")  # Aumente a memória do executor
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "20MB")  # Limite para joins em broadcast

# DataFrame de exemplo
dados = [(1, "João", 1000), (2, "Maria", 1500), (3, "Carlos", 2000)]
colunas = ["id", "nome", "salario"]
df = spark.createDataFrame(dados, colunas)

# Reparticionar para evitar grandes partições
df = df.repartition("id")

# Operação que pode causar spill (Join)
outro_df = spark.createDataFrame([(1, "SP"), (2, "RJ"), (3, "MG")], ["id", "estado"])
df_join = df.join(outro_df, "id")

# Exibir o resultado
df_join.show()
```

---

- **Spill** é a gravação temporária de dados no disco quando a memória é insuficiente.
- Pode ocorrer em operações como **joins**, **aggregations**, ou **sorts**.
- **Impacto:** Aumenta o tempo de execução e sobrecarrega os recursos de disco.
- **Como evitar:** Ajuste a configuração de memória, particionamento e otimize os dados.