# Particionamento e Compressão no Spark

Este notebook cobre:
- Repartition vs Coalesce
- PartitionBy ao escrever
- Formatos de arquivo (Parquet, ORC, JSON, CSV)
- Codecs de compressão
- Estratégias de particionamento

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, year, month, rand
from datetime import datetime, timedelta
import random

spark = SparkSession.builder \
    .appName("Partitioning_Compression") \
    .config("spark.sql.shuffle.partitions", "200") \
    .config("spark.sql.adaptive.enabled", "true") \
    .getOrCreate()

## 1. Criando Dataset de Exemplo

In [None]:
def generate_sample_data(n_records=10000):
    departamentos = ["Vendas", "TI", "RH", "Financeiro", "Marketing"]
    cidades = ["São Paulo", "Rio de Janeiro", "Belo Horizonte", "Porto Alegre"]
    
    data = []
    base_date = datetime(2022, 1, 1)
    
    for i in range(n_records):
        data.append((
            i,
            f"Funcionario_{i}",
            random.choice(departamentos),
            random.choice(cidades),
            random.uniform(3000, 15000),
            (base_date + timedelta(days=random.randint(0, 730))).strftime("%Y-%m-%d")
        ))
    return data

data = generate_sample_data(10000)
df = spark.createDataFrame(
    data,
    ["id", "nome", "departamento", "cidade", "salario", "data_registro"]
)

# Convertendo e adicionando colunas de data
df = df.withColumn("data_registro", col("data_registro").cast("date")) \
       .withColumn("ano", year("data_registro")) \
       .withColumn("mes", month("data_registro"))

print(f"Total de registros: {df.count()}")
print(f"Partições no DataFrame: {df.rdd.getNumPartitions()}")
df.show(5)

## 2. Repartition vs Coalesce

### Diferenças:
- **Repartition**: Redistribui dados (shuffle completo). Pode aumentar ou diminuir partições.
- **Coalesce**: Apenas diminui partições. Sem shuffle (mais eficiente).

In [None]:
print(f"Partições originais: {df.rdd.getNumPartitions()}")

# REPARTITION - redistribui uniformemente
df_repartitioned = df.repartition(10)
print(f"Após repartition(10): {df_repartitioned.rdd.getNumPartitions()}")

# Repartition por coluna - mesma chave na mesma partição
df_by_dept = df.repartition("departamento")
print(f"Repartition por departamento: {df_by_dept.rdd.getNumPartitions()}")

# Repartition por coluna com número específico
df_by_dept_n = df.repartition(5, "departamento")
print(f"Repartition(5, 'departamento'): {df_by_dept_n.rdd.getNumPartitions()}")

In [None]:
# COALESCE - reduz sem shuffle
df_coalesced = df.coalesce(2)
print(f"Após coalesce(2): {df_coalesced.rdd.getNumPartitions()}")

# Coalesce não pode aumentar partições!
df_coalesced_up = df.coalesce(100)
print(f"Coalesce(100) - não aumenta: {df_coalesced_up.rdd.getNumPartitions()}")

## 3. PartitionBy ao Escrever

In [None]:
output_path = "./data/output"

# Particionamento simples por uma coluna
# Estrutura: /departamento=Vendas/data.parquet
df.write \
    .mode("overwrite") \
    .partitionBy("departamento") \
    .parquet(f"{output_path}/por_departamento")

print("Escrito com partição por departamento")

In [None]:
# Particionamento hierárquico (múltiplas colunas)
# Estrutura: /ano=2023/mes=01/data.parquet
df.write \
    .mode("overwrite") \
    .partitionBy("ano", "mes") \
    .parquet(f"{output_path}/por_data")

print("Escrito com partição hierárquica ano/mes")

## 4. Partition Pruning

Quando você filtra por colunas de partição, Spark lê apenas as partições necessárias.

In [None]:
# Lendo dados particionados
df_partitioned = spark.read.parquet(f"{output_path}/por_data")

# Esta query só lê partições necessárias
df_filtered = df_partitioned.filter((col("ano") == 2023) & (col("mes") == 1))

# Veja "PartitionFilters" no plano
df_filtered.explain(True)

## 5. Formatos de Arquivo

In [None]:
# Dados de exemplo menor para comparação
df_small = df.limit(1000)

# PARQUET - Colunar (RECOMENDADO para analytics)
df_small.write.mode("overwrite").parquet(f"{output_path}/formato_parquet")

# ORC - Colunar (otimizado para Hive)
df_small.write.mode("overwrite").orc(f"{output_path}/formato_orc")

# JSON - Texto legível
df_small.write.mode("overwrite").json(f"{output_path}/formato_json")

# CSV - Universal
df_small.write.mode("overwrite").option("header", "true").csv(f"{output_path}/formato_csv")

print("Arquivos escritos em diferentes formatos")

### Comparação de Formatos

| Formato | Tipo | Compressão | Use Case |
|---------|------|------------|----------|
| Parquet | Colunar | Excelente | Analytics, Data Lake |
| ORC | Colunar | Excelente | Hive, Analytics |
| Avro | Linhas | Boa | Streaming, Kafka |
| JSON | Texto | Ruim | APIs, Debug |
| CSV | Texto | Ruim | Import/Export |

## 6. Codecs de Compressão

In [None]:
# SNAPPY - Rápido, boa compressão (DEFAULT)
df_small.write.mode("overwrite") \
    .option("compression", "snappy") \
    .parquet(f"{output_path}/parquet_snappy")

# GZIP - Maior compressão, mais lento
df_small.write.mode("overwrite") \
    .option("compression", "gzip") \
    .parquet(f"{output_path}/parquet_gzip")

# ZSTD - Bom balanço velocidade/compressão
df_small.write.mode("overwrite") \
    .option("compression", "zstd") \
    .parquet(f"{output_path}/parquet_zstd")

# SEM compressão
df_small.write.mode("overwrite") \
    .option("compression", "none") \
    .parquet(f"{output_path}/parquet_none")

print("Comparação de compressão criada")

### Codecs de Compressão

| Codec | Velocidade | Compressão | Uso Recomendado |
|-------|------------|------------|------------------|
| snappy | Muito rápido | Média | Default, uso geral |
| gzip | Lento | Alta | Armazenamento longo prazo |
| lz4 | Muito rápido | Baixa | Streaming, tempo real |
| zstd | Rápido | Alta | Melhor balanço |
| none | N/A | Nenhuma | Debug, testes |

## 7. Controlando Arquivos por Partição

In [None]:
# Problema comum: Muitos arquivos pequenos
# Solução 1: Coalesce antes de escrever
df.repartition("departamento") \
    .coalesce(1) \
    .write \
    .mode("overwrite") \
    .partitionBy("departamento") \
    .parquet(f"{output_path}/single_file_per_partition")

print("Um arquivo por partição")

In [None]:
# Solução 2: maxRecordsPerFile
df.write \
    .mode("overwrite") \
    .option("maxRecordsPerFile", 100000) \
    .partitionBy("departamento") \
    .parquet(f"{output_path}/controlled_size")

print("Tamanho de arquivo controlado")

## 8. Dynamic Partition Overwrite

In [None]:
# Por padrão, overwrite substitui TODAS as partições
# Com dynamic, só substitui partições com novos dados

spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

# Agora só as partições presentes no DF serão substituídas
novos_dados = df.filter(col("departamento") == "TI")
novos_dados.write \
    .mode("overwrite") \
    .partitionBy("departamento") \
    .parquet(f"{output_path}/dynamic_overwrite")

print("Apenas partição TI foi sobrescrita")

## 9. Estratégias de Particionamento

### Regras de Ouro:
1. Partições não devem ser muito pequenas (< 128MB)
2. Partições não devem ser muito grandes (> 1GB)
3. Evite colunas com alta cardinalidade (ex: user_id)
4. Partição ideal: 128MB - 1GB

### Estratégias Comuns:
- **Por DATA**: `partitionBy("ano", "mes")` - time-series
- **Por CATEGORIA**: `partitionBy("regiao", "tipo")` - filtros frequentes
- **Híbrido**: `partitionBy("ano", "mes", "regiao")`

In [None]:
# Analisando distribuição das partições
def show_partition_sizes(df, name):
    print(f"\n{name}:")
    print(f"Número de partições: {df.rdd.getNumPartitions()}")
    partition_sizes = df.rdd.mapPartitions(lambda it: [sum(1 for _ in it)]).collect()
    for i, size in enumerate(partition_sizes[:10]):  # Mostra apenas primeiras 10
        print(f"  Partição {i}: {size} registros")

show_partition_sizes(df, "Original")
show_partition_sizes(df.repartition(4), "Repartitioned(4)")
show_partition_sizes(df.repartition("departamento"), "Por departamento")

## 10. Save Modes

In [None]:
# Modos disponíveis:
# - overwrite: substitui se existir
# - append: adiciona aos dados existentes
# - ignore: não faz nada se existir
# - error/errorifexists: erro se existir (default)

df_small.write.mode("overwrite").parquet(f"{output_path}/test_mode")
df_small.write.mode("append").parquet(f"{output_path}/test_mode")
df_small.write.mode("ignore").parquet(f"{output_path}/test_mode")

# Verificando
df_verify = spark.read.parquet(f"{output_path}/test_mode")
print(f"Total após overwrite + append: {df_verify.count()}")

## 11. Configurações Globais

In [None]:
# Configurar compressão padrão
spark.conf.set("spark.sql.parquet.compression.codec", "snappy")
spark.conf.set("spark.sql.orc.compression.codec", "snappy")

# Configurar partições de shuffle
spark.conf.set("spark.sql.shuffle.partitions", "200")

# Ver configurações
print("Parquet compression:", spark.conf.get("spark.sql.parquet.compression.codec"))
print("Shuffle partitions:", spark.conf.get("spark.sql.shuffle.partitions"))