# Particionamento e `repartition()` vs. `coalesce()` 

🔧 O que é particionamento no Spark?
O particionamento define como os dados de um DataFrame são distribuídos fisicamente entre os nós e núcleos do cluster. Cada partição representa uma fatia de dados que pode ser processada paralelamente.

✔️ Quanto melhor os dados estiverem particionados, maior o paralelismo, menor o custo de shuffle e melhor o uso de CPU e memória.

⚙️ Métodos de controle de particionamento
## `repartition(numPartitions)`
✅ O que faz:
- Cria novas partições redistribuindo completamente os dados via shuffle total.
- Garante que as partições fiquem mais uniformes.

🧠 Quando usar:
- Quando o DataFrame está desequilibrado (skewed).
- Antes de um join pesado, para garantir distribuição adequada.
- Para aumentar o paralelismo de uma operação como write().

## coalesce(numPartitions)
✅ O que faz:
- Reduz o número de partições sem embaralhar os dados (shuffle evitado).
- Simplesmente funde partições já existentes.

🧠 Quando usar:
- Antes de um salvamento para disco único (por exemplo, .toPandas(), .write.csv()).
- Ao final do pipeline para evitar muitos arquivos pequenos no output.

| Aspecto                  | `repartition()`                       | `coalesce()`                          |
| ------------------------ | ------------------------------------- | ------------------------------------- |
| Tipo de operação         | Com shuffle                           | Sem shuffle                           |
| Pode aumentar partições? | ✅ Sim                                 | ❌ Não (só reduz)                      |
| Custo computacional      | Alto (embaralha dados)                | Baixo (mantém dados locais)           |
| Ideal para               | Equalizar carga antes de joins/writes | Otimizar escrita com menos arquivos   |
| Exemplo comum            | `df.repartition(200)`                 | `df.coalesce(1)` para salvar em 1 CSV |

In [0]:
from pyspark.sql.functions import col, expr, sequence
from pyspark.sql.types import StringType, DateType
import random
from datetime import datetime, timedelta

# Gerar uma lista de CPFs aleatórios
def gerar_cpf():
    return ''.join([str(random.randint(0, 9)) for _ in range(11)])

cpfs = [gerar_cpf() for _ in range(10000)]

# Gerar uma lista de datas de referência
data_inicial = datetime(2025, 1, 1).date()
datas_ref = [data_inicial + timedelta(days=i) for i in range(10000)]

# Criar DataFrame
df = spark.createDataFrame(zip(cpfs, datas_ref), schema=["CPF", "DT_REF"])

# Reparticionar o DataFrame
df_reparticionado = df.repartition(10)

display(df_reparticionado)

# Broadcast joins e quando utilizá-los

📘 O que é um Broadcast Join?
É uma técnica usada quando uma das tabelas do join é significativamente menor que a outra.

O Spark envia essa tabela pequena para todos os executores (nós do cluster), permitindo que o join ocorra localmente em cada particionamento da tabela maior.

⚙️ Como funciona internamente
Spark estima o tamanho das tabelas com base no metastore e estatísticas de cardinalidade.

Se detectar que uma das tabelas está abaixo de um limite configurável (`spark.sql.autoBroadcastJoinThreshold`, padrão 10 MB), ele automaticamente aplica o broadcast join.

> PySpark Broadcast Join is an important part of the SQL execution engine, With broadcast join, PySpark broadcast the smaller DataFrame to all executors and the executor keeps this DataFrame in memory and the larger DataFrame is split and distributed across all executors so that PySpark can perform a join without shuffling any data from the larger DataFrame as the data required for join colocated on every executor.

Ou você pode forçar com `broadcast()` manualmente.

| Vantagem                 | Explicação                                                       |
| ------------------------ | ---------------------------------------------------------------- |
| 🚀 Performance           | Evita o **shuffle** da tabela maior → menos I/O e tempo de rede. |
| 💾 Eficiência de memória | Útil quando a tabela pequena **cabe na memória dos executores**. |
| 🔁 Join Local            | Operações ocorrem localmente por partição, o que reduz latência. |


In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql.functions import broadcast

# ================================
# 1. Criar a tabela de contratos
# ================================
schema_contratos = StructType([
    StructField("contrato_id", IntegerType(), True),
    StructField("cliente_id", IntegerType(), True),
    StructField("score_credito", IntegerType(), True)
])

dados_contratos = [
    (101, 1, 745),
    (102, 2, 620),
    (103, 3, 590),
    (104, 4, 710),
    (105, 5, 800),
    (106, 6, 670),
]

df_contratos = spark.createDataFrame(dados_contratos, schema=schema_contratos)

# ==========================================
# 2. Criar a tabela de faixas de score (pequena)
# ==========================================

schema_faixas = StructType([
    StructField("faixa_min", IntegerType(), True),
    StructField("faixa_max", IntegerType(), True),
    StructField("segmento", StringType(), True)
])

dados_faixas = [
    (300, 599, "Alto Risco"),
    (600, 699, "Médio Risco"),
    (700, 850, "Baixo Risco")
]

df_faixas_score = spark.createDataFrame(dados_faixas, schema=schema_faixas)

# ==========================================
# 3. Realizar o broadcast join
# ==========================================

df_join = df_contratos.join(
    broadcast(df_faixas_score),
    (df_contratos["score_credito"] >= df_faixas_score["faixa_min"]) &
    (df_contratos["score_credito"] <= df_faixas_score["faixa_max"]),
    how="left"
)

# ==========================================
# 4. Exibir resultado
# ==========================================
display(df_join.select("contrato_id", "cliente_id", "score_credito", "segmento"))

# Caching com `cache()` e `persist()`

No Apache Spark, os métodos `cache()` e `persist()` são utilizados para armazenar em memória (ou em disco) os dados intermediários de um DataFrame ou RDD, com o objetivo de acelerar reprocessamentos subsequentes. Isso é especialmente útil em pipelines de transformação que reutilizam os mesmos dados várias vezes (ex.: treinamentos iterativos de modelos de machine learning ou ETL intensivo). A escolha entre eles depende de necessidades específicas de performance e uso de memória.

##🔹 `cache()`: armazenamento padrão em memória
- **Definição:** `cache()` é um atalho para `persist(StorageLevel.MEMORY_AND_DISK)` em PySpark, embora nos bastidores use por padrão `MEMORY_AND_DISK` ou `MEMORY_ONLY`, dependendo da versão e linguagem.

- **Comportamento:** Tenta armazenar os dados em memória; se não couberem completamente, Spark recomputa as partições não armazenadas sempre que forem requisitadas.

- **Uso típico:** Quando os dados cabem razoavelmente na memória e o custo de recomputação não é crítico para as partições faltantes.

Exemplo:

```
df.cache()
df.count()  # materializa o cache
```

Benefício real:

```
1ª chamada: df.count() -> 5.11 segundos
2ª chamada: df.count() -> 0.44 segundos
```

##🔹 `persist(storageLevel):` controle granular de armazenamento
- **Definição:** Permite escolher o nível de persistência desejado entre várias opções oferecidas pela enumeração StorageLevel, como:

- `MEMORY_ONLY`
- `MEMORY_AND_DISK`
- `DISK_ONLY`
- `MEMORY_AND_DISK_SER` (versão serializada)
- `OFF_HEAP`

- **Comportamento:** Armazena os dados no(s) nível(is) definidos. Se a memória não for suficiente e o nível permitir, armazena o excedente no disco.

Exemplo:

```
from pyspark.storagelevel import StorageLevel
df.persist(StorageLevel.DISK_ONLY)
df.count()  # materializa a persistência
```

> Ganho similar ao cache(), com maior flexibilidade e controle sobre o comportamento em ambientes com pressão de memória

##🔹 Quando utilizar?
Use `cache()`:

- Para operações leves e uso repetido dos dados durante uma única sessão;
- Quando os dados cabem confortavelmente em memória.

Use `persist()`:

- Quando os dados não cabem totalmente em memória;
- Quando se deseja armazenamento serializado (reduz uso de heap) ou com fallback para disco;
- Quando é necessário replicar os dados entre executores para tolerância a falhas (`MEMORY_ONLY_2`, etc.).

##🔹 Boas práticas
- **Materialização:** o cache só é efetivado após a execução de uma ação que varre completamente o DataFrame, como `count()` ou `collect()`. Ações parciais como `take(1)` cacheiam apenas uma partiçãoLearning Spark 2nd.

- **Unpersist:** sempre remova dados da memória após o uso com `df.unpersist()` para liberar recursos.