# 📥 Pipeline de Ingestão CDC - Upcell

Este notebook implementa o pipeline de ingestão de dados CDC (Change Data Capture) do S3 para o Bronze no Databricks.

## 🎯 Objetivo
- **Full-load**: Carga inicial completa das tabelas
- **CDC**: Ingestão incremental com operações Insert, Update e Delete
- **Delta Lake**: Merge atômico na camada Bronze

## 📋 Requisitos
- Tabelas no S3: `s3://meudatalake-raw/upcell/`
- Catálogo: `bronze.upcell`
- Coluna de controle: `DtAtualizacao` (presente em todos os arquivos)

In [0]:
import delta

## 1️⃣ Importações e Setup

In [0]:
spark.catalog.clearCache()

In [0]:
# Teste: Visualizar arquivos CDC disponíveis
df_test = spark.read.format("parquet").load(f"/Volumes/raw/upcell/cdc/transacao_produto/")
print(f"Total de registros CDC: {df_test.count()}")
print(f"Colunas: {df_test.columns}")
display(df_test.limit(5))

idTransacaoProduto,IdTransacao,IdProduto,QtdeProduto,vlProduto,op,DtAtualizacao
0030d8a6-af29-47a9-96ab-2ef1b5d5668c,deafb051-ff5b-4d63-ae30-490da938d2de,5,1,1,I,2025-10-03T16:37:34.807051
003b43ff-9ade-41c2-a1bc-fb54b21e09ea,e125d9c5-614c-4688-b47d-0abc5fc3c363,5,1,1,I,2025-10-03T16:37:34.807051
0045ff13-09c2-41c2-8ddc-cb5181ace922,67e9fdbe-1935-40af-aad6-c26f3d6e015d,5,1,1,I,2025-10-03T16:37:34.807051
004b2ff9-7980-4cb3-be8c-ffbef2837106,bd4791ea-fc30-47d6-9b65-ea9d2f9c7a1f,5,1,1,I,2025-10-03T16:37:34.807051
00a10130-aaab-41a0-8f37-45d917ae5def,e7855cb3-1efa-49a3-b1bd-cebdcb219813,5,1,1,I,2025-10-03T16:37:34.807051
00b93f8e-2a33-4a8c-bec4-a0882faffa09,24634e16-95cf-4cb0-ab0e-add1fadded12,5,1,1,I,2025-10-03T16:37:34.807051
0100f58e-c5ac-4ba1-a8ee-1871542e9014,238dd9f2-97d3-46b9-b71f-e04dec2e5548,11,1,50,I,2025-10-03T16:37:34.807051
0122a02d-f7ca-4470-96cd-d47b7588bf80,aabd7edd-a066-48d3-88d3-69b9dfb0d7c8,11,1,50,I,2025-10-03T16:37:34.807051
014b384b-39ca-40b8-88d1-dea2ae188a8f,f1a7cec7-b143-465b-b6e7-5e07f36fab6c,11,1,50,I,2025-10-03T16:37:34.807051
016d8225-bd37-4490-9696-b1d7ec533c99,c38f4092-15d6-4614-817d-50eb5c10145c,5,1,1,I,2025-10-03T16:37:34.807051


In [0]:
def table_exists(catalog, database, table):
    count = (spark.sql(f"SHOW TABLES IN `{catalog}`.`{database}`")
               .filter(f"database = '{database}' AND tableName = '{table}'")
               .count())
    return count == 1

In [0]:
catalog = "bronze"
schema = "upcell"
tablename = "transacao_produto"
id_field = "idTransacaoProduto"
timefield = "DtAtualizacao"

## 2️⃣ Configuração da Tabela

Defina a tabela que será processada e os campos de controle.

In [0]:
# Carga inicial: Cria tabela Delta a partir do full-load
if not table_exists(catalog, schema, tablename):
    print(f"⚠️  Tabela {catalog}.{schema}.{tablename} NÃO existe. Criando a partir do full-load...")

    # Lê full-load (já tem DtAtualizacao!)
    df_full = spark.read.format("parquet").load(f"/Volumes/raw/upcell/full-load/{tablename}")
    
    print(f"📊 Total de registros no full-load: {df_full.count():,}")

    # Cria tabela Delta
    (df_full.coalesce(1)
            .write
            .format("delta")
            .mode("overwrite")
            .saveAsTable(f"{catalog}.{schema}.{tablename}"))
    
    print(f"✅ Tabela {catalog}.{schema}.{tablename} criada com sucesso!")
    
else:
    print(f"✅ Tabela {catalog}.{schema}.{tablename} já existe. Pular para o CDC merge.")

tabela nao existe, criando....


## 3️⃣ Full-Load (Carga Inicial)

Se a tabela não existe, cria a partir dos dados de full-load.

In [0]:
%sql
select * from bronze.upcell.transacoes

IdTransacao,IdCliente,DtCriacao,QtdePontos,DescSistemaOrigem,DtAtualizacao
0000520a-a4e5-4977-b360-17be62fa0f2b,24782f0b-4683-4f35-976a-ea21d6714ba6,2025-09-17 12:28:41.864,1,twitch,2025-10-03T17:05:11.008701
000060e8-aa76-4286-a8d7-f30e6fa47edd,252a0923-3f79-45bb-b664-3040235c6c58,2024-07-23 12:49:49.874,1,twitch,2025-10-03T17:05:11.008701
000095de-3daa-4cfb-a0ae-e7b2c8bc3c9b,30f45a6d-ada5-4a17-8338-710e414eb6c6,2025-09-17 13:38:57.479,1,twitch,2025-10-03T17:05:11.008701
0000c010-a592-46f7-8b0b-6f841bee64ba,65662aff-44d6-4f06-b9d9-07445c6e5943,2025-10-03 12:23:39.779,1,twitch,2025-10-03T17:05:11.008701
0000dfbb-e14e-4ea7-a57f-60236869fffe,5f8fcbe0-6014-43f8-8b83-38cf2f4887b3,2024-02-20 13:21:45.613,1,twitch,2025-10-03T17:05:11.008701
0000e94f-fb76-42b7-82e3-5059b6e40703,3884c92f-a635-4b67-9e5a-97730829cdca,2024-02-05 13:27:18.987,1,twitch,2025-10-03T17:05:11.008701
00017768-81eb-45ae-ad56-f189dd657808,3b878853-34a5-44be-9a8e-05642b853dc6,2025-09-30 12:02:16.448,1,twitch,2025-10-03T17:05:11.008701
00019b46-6b25-47c3-aa8f-f550d7dbcb00,e2624c1c-c8af-4622-94e0-f3470037292f,2024-06-20 19:22:45.600,1,twitch,2025-10-03T17:05:11.008701
0001d503-58d0-4c3e-b0f4-eda1d5f97d06,9aba2685-7c0a-4d1f-8e22-0c47ca0af580,2024-03-18 13:04:21.814,1,twitch,2025-10-03T17:05:11.008701
0001fa55-581a-44e6-8e40-1bb44f82de8e,05de679b-9989-4416-9aa4-20c149999c8d,2024-07-11 13:46:05.759,1,twitch,2025-10-03T17:05:11.008701


In [0]:
# Processa CDC: Filtra apenas arquivos novos e deduplica
print("📥 Carregando dados CDC...")

# 1. Busca a última atualização já processada na tabela Bronze
last_processed = spark.sql(f"""
    SELECT COALESCE(MAX(DtAtualizacao), '1900-01-01') as last_dt
    FROM {catalog}.{schema}.{tablename}
""").collect()[0]['last_dt']

print(f"? Última atualização processada: {last_processed}")

# 2. Lê TODOS os arquivos CDC (por enquanto)
(spark.read
    .format("parquet")
    .load(f"/Volumes/raw/upcell/cdc/{tablename}")
    .createOrReplaceTempView(f"view_{tablename}"))

# 3. Filtra apenas registros NOVOS (DtAtualizacao > última processada)
query_filter = f"""
    SELECT *  
    FROM view_{tablename}
    WHERE DtAtualizacao > '{last_processed}'
"""

df_cdc_new = spark.sql(query_filter)
total_new_records = df_cdc_new.count()

print(f"📊 Total de registros CDC NOVOS: {total_new_records:,}")

if total_new_records == 0:
    print("⚠️  Nenhum registro CDC novo encontrado. Pulando merge.")
    df_cdc_unique = df_cdc_new  # DataFrame vazio
else:
    # 4. Deduplica: Pega apenas o último registro de cada chave (nos dados NOVOS)
    query_dedupe = f"""
        SELECT *  
        FROM view_{tablename}
        WHERE DtAtualizacao > '{last_processed}'
        QUALIFY ROW_NUMBER() OVER(PARTITION BY {id_field} ORDER BY {timefield} DESC) = 1
    """
    
    df_cdc_unique = spark.sql(query_dedupe)
    
    print(f"📊 Total de registros CDC únicos (após deduplicação): {df_cdc_unique.count():,}")
    print(f"📋 Operações no CDC:")
    df_cdc_unique.groupBy("op").count().display()
    
    print("\n🔍 Sample de registros CDC:")
    df_cdc_unique.display()

idTransacaoProduto,IdTransacao,IdProduto,QtdeProduto,vlProduto,op,DtAtualizacao
0030d8a6-af29-47a9-96ab-2ef1b5d5668c,deafb051-ff5b-4d63-ae30-490da938d2de,5,1,1,I,2025-10-03T16:37:34.807051
003b43ff-9ade-41c2-a1bc-fb54b21e09ea,e125d9c5-614c-4688-b47d-0abc5fc3c363,5,1,1,I,2025-10-03T16:37:34.807051
0045ff13-09c2-41c2-8ddc-cb5181ace922,67e9fdbe-1935-40af-aad6-c26f3d6e015d,5,1,1,I,2025-10-03T16:37:34.807051
004b2ff9-7980-4cb3-be8c-ffbef2837106,bd4791ea-fc30-47d6-9b65-ea9d2f9c7a1f,5,1,1,I,2025-10-03T16:37:34.807051
00a10130-aaab-41a0-8f37-45d917ae5def,e7855cb3-1efa-49a3-b1bd-cebdcb219813,5,1,1,I,2025-10-03T16:37:34.807051
00b93f8e-2a33-4a8c-bec4-a0882faffa09,24634e16-95cf-4cb0-ab0e-add1fadded12,5,1,1,I,2025-10-03T16:37:34.807051
0100f58e-c5ac-4ba1-a8ee-1871542e9014,238dd9f2-97d3-46b9-b71f-e04dec2e5548,11,1,50,I,2025-10-03T16:37:34.807051
0122a02d-f7ca-4470-96cd-d47b7588bf80,aabd7edd-a066-48d3-88d3-69b9dfb0d7c8,11,1,50,I,2025-10-03T16:37:34.807051
014b384b-39ca-40b8-88d1-dea2ae188a8f,f1a7cec7-b143-465b-b6e7-5e07f36fab6c,11,1,50,I,2025-10-03T16:37:34.807051
016d8225-bd37-4490-9696-b1d7ec533c99,c38f4092-15d6-4614-817d-50eb5c10145c,5,1,1,I,2025-10-03T16:37:34.807051


## 4️⃣ Processamento CDC

Carrega arquivos CDC, deduplica e prepara para o merge.

In [0]:
bronze = delta.DeltaTable.forName(spark, f"{catalog}.{schema}.{tablename}")
bronze


<delta.tables.DeltaTable at 0x7f489bbd7e30>

In [None]:
# 📊 ANTES DO MERGE: Captura estatísticas atuais
print("=" * 60)
print("📊 ESTATÍSTICAS ANTES DO MERGE")
print("=" * 60)

# Contagem total antes
count_before = spark.sql(f"SELECT COUNT(*) as total FROM {catalog}.{schema}.{tablename}").collect()[0]['total']
print(f"\n✅ Total de registros ANTES: {count_before:,}")

# Detalhes da tabela antes
details_before = spark.sql(f"DESCRIBE DETAIL {catalog}.{schema}.{tablename}").select("numFiles", "sizeInBytes").collect()[0]
print(f"📁 Arquivos: {details_before['numFiles']}")
print(f"💾 Tamanho: {details_before['sizeInBytes']:,} bytes ({details_before['sizeInBytes'] / (1024*1024):.2f} MB)")

# Última atualização antes
last_update_before = spark.sql(f"""
    SELECT MAX(DtAtualizacao) as ultima_atualizacao 
    FROM {catalog}.{schema}.{tablename}
""").collect()[0]['ultima_atualizacao']
print(f"🕐 Última atualização: {last_update_before}")

print("\n" + "=" * 60)

In [0]:
# Merge CDC na tabela Delta Bronze
print("🔄 Executando merge CDC na tabela Bronze...")

# Verifica se há dados novos para processar
if df_cdc_unique.count() == 0:
    print("⏭️  Nenhum dado CDC novo. Merge não executado.")
    
    # Define variáveis para comparação (sem mudanças)
    count_after = count_before
    details_after = details_before
    last_update_after = last_update_before
    
else:
    bronze = delta.DeltaTable.forName(spark, f"{catalog}.{schema}.{tablename}")
    
    (bronze.alias("b") 
      .merge(df_cdc_unique.alias("d"), f"b.{id_field} = d.{id_field}") 
      .whenMatchedDelete(condition = "d.op = 'D'")           # Delete se op = 'D'
      .whenMatchedUpdateAll(condition = "d.op = 'U'")        # Update se op = 'U'
      .whenNotMatchedInsertAll(condition = "d.op = 'I'")     # Insert se op = 'I'
      .execute()
    )
    
    print("✅ Merge CDC executado com sucesso!")
    
    # 📊 DEPOIS DO MERGE: Captura estatísticas atualizadas
    print("\n" + "=" * 60)
    print("📊 ESTATÍSTICAS DEPOIS DO MERGE")
    print("=" * 60)
    
    # Contagem total depois
    count_after = spark.sql(f"SELECT COUNT(*) as total FROM {catalog}.{schema}.{tablename}").collect()[0]['total']
    print(f"\n✅ Total de registros DEPOIS: {count_after:,}")
    
    # Detalhes da tabela depois
    details_after = spark.sql(f"DESCRIBE DETAIL {catalog}.{schema}.{tablename}").select("numFiles", "sizeInBytes").collect()[0]
    print(f"📁 Arquivos: {details_after['numFiles']}")
    print(f"💾 Tamanho: {details_after['sizeInBytes']:,} bytes ({details_after['sizeInBytes'] / (1024*1024):.2f} MB)")
    
    # Última atualização depois
    last_update_after = spark.sql(f"""
        SELECT MAX(DtAtualizacao) as ultima_atualizacao 
        FROM {catalog}.{schema}.{tablename}
    """).collect()[0]['ultima_atualizacao']
    print(f"🕐 Última atualização: {last_update_after}")

# 🔄 COMPARAÇÃO: Calcula diferenças
print("\n" + "=" * 60)
print("🔄 COMPARAÇÃO: ANTES vs DEPOIS")
print("=" * 60)

diff_records = count_after - count_before
diff_size = details_after['sizeInBytes'] - details_before['sizeInBytes']
diff_files = details_after['numFiles'] - details_before['numFiles']

print(f"\n📊 Diferença de registros: {diff_records:+,} ({'+' if diff_records >= 0 else ''}{(diff_records/count_before*100 if count_before > 0 else 0):.2f}%)")
print(f"💾 Diferença de tamanho: {diff_size:+,} bytes ({diff_size / (1024*1024):+.2f} MB)")
print(f"📁 Diferença de arquivos: {diff_files:+}")

print("\n" + "=" * 60)

## 5️⃣ Merge CDC na Tabela Delta

Aplica as operações de Insert, Update e Delete na camada Bronze.

In [0]:
%sql
select IdTransacao
 from bronze.upcell.transacoes

IdTransacao,IdCliente,DtCriacao,QtdePontos,DescSistemaOrigem,DtAtualizacao
0000520a-a4e5-4977-b360-17be62fa0f2b,24782f0b-4683-4f35-976a-ea21d6714ba6,2025-09-17 12:28:41.864,1,twitch,2025-10-03T17:05:11.008701
000060e8-aa76-4286-a8d7-f30e6fa47edd,252a0923-3f79-45bb-b664-3040235c6c58,2024-07-23 12:49:49.874,1,twitch,2025-10-03T17:05:11.008701
000095de-3daa-4cfb-a0ae-e7b2c8bc3c9b,30f45a6d-ada5-4a17-8338-710e414eb6c6,2025-09-17 13:38:57.479,1,twitch,2025-10-03T17:05:11.008701
0000c010-a592-46f7-8b0b-6f841bee64ba,65662aff-44d6-4f06-b9d9-07445c6e5943,2025-10-03 12:23:39.779,1,twitch,2025-10-03T17:05:11.008701
0000dfbb-e14e-4ea7-a57f-60236869fffe,5f8fcbe0-6014-43f8-8b83-38cf2f4887b3,2024-02-20 13:21:45.613,1,twitch,2025-10-03T17:05:11.008701
0000e94f-fb76-42b7-82e3-5059b6e40703,3884c92f-a635-4b67-9e5a-97730829cdca,2024-02-05 13:27:18.987,1,twitch,2025-10-03T17:05:11.008701
00017768-81eb-45ae-ad56-f189dd657808,3b878853-34a5-44be-9a8e-05642b853dc6,2025-09-30 12:02:16.448,1,twitch,2025-10-03T17:05:11.008701
00019b46-6b25-47c3-aa8f-f550d7dbcb00,e2624c1c-c8af-4622-94e0-f3470037292f,2024-06-20 19:22:45.600,1,twitch,2025-10-03T17:05:11.008701
0001d503-58d0-4c3e-b0f4-eda1d5f97d06,9aba2685-7c0a-4d1f-8e22-0c47ca0af580,2024-03-18 13:04:21.814,1,twitch,2025-10-03T17:05:11.008701
0001fa55-581a-44e6-8e40-1bb44f82de8e,05de679b-9989-4416-9aa4-20c149999c8d,2024-07-11 13:46:05.759,1,twitch,2025-10-03T17:05:11.008701


In [None]:
# Validação 1: Contagem total de registros
total = spark.sql(f"SELECT COUNT(*) as total FROM {catalog}.{schema}.{tablename}").collect()[0]['total']
print(f"📊 Total de registros na tabela Bronze: {total:,}")

# Validação 2: Verificar se DtAtualizacao está presente
sample = spark.sql(f"SELECT * FROM {catalog}.{schema}.{tablename} LIMIT 5")
print(f"\n✅ Colunas da tabela: {sample.columns}")
sample.display()

In [None]:
# Validação 3: Verificar histórico de versões Delta
print("📜 Histórico de versões da tabela Delta:\n")
spark.sql(f"DESCRIBE HISTORY {catalog}.{schema}.{tablename}").select(
    "version", 
    "timestamp", 
    "operation", 
    "operationMetrics"
).display()

In [None]:
-- Validação 4: Análise de dados por DtAtualizacao
SELECT 
    DATE(DtAtualizacao) as data_atualizacao,
    COUNT(*) as total_registros,
    MIN(DtAtualizacao) as primeira_atualizacao,
    MAX(DtAtualizacao) as ultima_atualizacao
FROM bronze.upcell.transacao_produto
GROUP BY DATE(DtAtualizacao)
ORDER BY data_atualizacao DESC

### 🎯 Próximos Passos:

1. **Executar para outras tabelas**: Altere a variável `tablename` para processar:
   - `clientes`
   - `produtos`
   - `transacoes`

2. **Agendar execução**: Configure um Job no Databricks para rodar periodicamente

3. **Otimizar tabela**: Execute `OPTIMIZE` e `VACUUM` periodicamente:
   ```sql
   OPTIMIZE bronze.upcell.transacao_produto;
   VACUUM bronze.upcell.transacao_produto RETAIN 168 HOURS;
   ```

4. **Monitorar**: Use `DESCRIBE HISTORY` para acompanhar versões

## 6️⃣ Validação Final

Consulta a tabela Bronze para validar os dados.

---

## 📚 Referência: Operações CDC

| Operação | Código | Descrição | Ação no Merge |
|----------|--------|-----------|---------------|
| **Insert** | `I` | Novo registro | `whenNotMatchedInsertAll()` |
| **Update** | `U` | Registro atualizado | `whenMatchedUpdateAll()` |
| **Delete** | `D` | Registro deletado | `whenMatchedDelete()` |

### 🔍 Como Funciona o QUALIFY:

```sql
QUALIFY ROW_NUMBER() OVER(PARTITION BY {id_field} ORDER BY {timefield} DESC) = 1
```

- **PARTITION BY**: Agrupa por chave primária
- **ORDER BY DESC**: Ordena pelo timestamp (mais recente primeiro)
- **ROW_NUMBER() = 1**: Pega apenas o registro mais recente de cada chave

### ✅ Checklist de Validação:

- [ ] Full-load executado com sucesso
- [ ] CDC processado sem erros
- [ ] Merge aplicado corretamente
- [ ] Contagem de registros confere
- [ ] Coluna `DtAtualizacao` presente em todos os registros