## 4. Ingestão CDC via Streaming

### Modos de Execução

**Modo Atual: `availableNow=True`**
- Processa todos os dados disponíveis e para automaticamente
- Ideal para execução via Jobs/Pipelines
- Sem custo de cluster ocioso

**Modo Contínuo (comentado abaixo):**
- Mantém o streaming ativo indefinidamente
- Processa novos arquivos conforme chegam
- Requer cluster sempre ligado

# Pipeline de Ingestão CDC - Upcell

Este notebook implementa o pipeline de ingestão de dados CDC (Change Data Capture) do S3 para o Bronze no Databricks.

## Objetivo
- Full-load: Carga inicial completa das tabelas
- CDC: Ingestão incremental com operações Insert, Update e Delete
- Delta Lake: Merge atômico na camada Bronze

## Requisitos
- Tabelas no S3: `s3://meudatalake-raw/upcell/`
- Catálogo: `bronze.upcell`
- Coluna de controle: `DtAtualizacao` (presente em todos os arquivos)

## 1. Importações e Setup

In [0]:
import delta
def table_exists(catalog, database, table):
    count = (spark.sql(f"SHOW TABLES IN `{catalog}`.`{database}`")
               .filter(f"database = '{database}' AND tableName = '{table}'")
               .count())
    return count == 1

In [0]:
catalog = "bronze"
schema = "upcell"

tablename = "clientes"
id_field = "idcliente"
timefield = "DtAtualizacao"

In [0]:
# Captura o schema dos arquivos CDC para usar no streaming
df_sample = spark.read.format("parquet").load(f"/Volumes/raw/upcell/cdc/{tablename}/")
df_schema = df_sample.schema
print(f"Schema capturado: {len(df_schema.fields)} colunas")

In [0]:
if not table_exists(catalog, schema, tablename):
    print("tabela nao existe")
    df_full = spark.read.format("parquet").load(f"/Volumes/raw/upcell/full-load/{tablename}/")
    
    (df_full.coalesce(1)
        .write
        .format("delta")
        .mode("overwrite")
        .saveAsTable(f"{catalog}.{schema}.{tablename}"))
else:
    print("tabela ja existe")


## 2. Configuração e Full-Load

Define variáveis de configuração e cria a tabela inicial se não existir.

In [0]:
# Função de upsert para processar cada batch do streaming
def upsert(df, batchId):
    print(f"Processando batch {batchId}...")
    
    # Conta registros no batch
    batch_count = df.count()
    print(f"  Registros no batch: {batch_count}")
    
    if batch_count == 0:
        print("  Batch vazio, pulando...")
        return
    
    # Cria view temporária
    df.createOrReplaceTempView(f"view_{tablename}")
    
    # Deduplica: pega apenas o último registro de cada chave
    query = f"""
    SELECT *  
    FROM view_{tablename}
    QUALIFY ROW_NUMBER() OVER (PARTITION BY {id_field} ORDER BY {timefield} DESC) = 1    
    """
    df_cdc = spark.sql(query)
    dedup_count = df_cdc.count()
    print(f"  Após deduplicação: {dedup_count} registros")
    
    # Conta operações por tipo
    ops = df_cdc.groupBy("op").count().collect()
    for row in ops:
        print(f"    - {row['op']}: {row['count']}")
    
    # Aplica merge na tabela Delta
    bronze_table = delta.DeltaTable.forName(spark, f"{catalog}.{schema}.{tablename}")
    
    (bronze_table.alias("b") 
      .merge(df_cdc.alias("d"), f"b.{id_field} = d.{id_field}") 
      .whenMatchedDelete(condition = "d.op = 'D'")
      .whenMatchedUpdateAll(condition = "d.op = 'U'")
      .whenNotMatchedInsertAll(condition = "d.op = 'I'")
      .execute()
    )
    
    print(f"  Batch {batchId} processado com sucesso!\n")

# Configura streaming com Cloud Files (Auto Loader)
df_stream = (spark.readStream
                .format("cloudFiles")
                .option("cloudFiles.format", "parquet")
                .schema(df_schema)
                .load(f"/Volumes/raw/upcell/cdc/{tablename}/"))

# Configura writeStream com trigger availableNow (processa e para)
stream = (df_stream.writeStream
                  .trigger(availableNow=True)
                  .option("checkpointLocation", f"/Volumes/raw/upcell/cdc/{tablename}/_checkpoints/")
                  .foreachBatch(upsert)
)

# Para modo contínuo, substitua por:
# .trigger(processingTime="10 seconds")  # Processa a cada 10 segundos
# E adicione .awaitTermination() após .start()

In [0]:
# Inicia o streaming
query = stream.start()

# Aguarda a finalização (com availableNow=True, processa e para automaticamente)
query.awaitTermination()

# Exibe diagnóstico
print("\n" + "="*60)
print("DIAGNÓSTICO DO STREAMING")
print("="*60)
print(f"Status: {query.status}")
print(f"\nÚltimo progresso:")
if query.lastProgress:
    print(f"  - Batch ID: {query.lastProgress.get('batchId', 'N/A')}")
    print(f"  - Linhas lidas: {query.lastProgress.get('numInputRows', 0)}")
    print(f"  - Duração: {query.lastProgress.get('durationMs', {}).get('triggerExecution', 0)} ms")
else:
    print("  Nenhum batch processado (sem dados novos)")

if query.exception():
    print(f"\nErro: {query.exception()}")
else:
    print("\nNenhum erro detectado")
print("="*60)