# 🏠 Exploring the Data Lakehouse (Delta Table)

In this notebook we explore the data stored in the **Data Lakehouse** (Delta Tables).

The goal is to demonstrate the advantages of the Lakehouse over the Data Lake:
- ✅ ACID transactions
- ✅ Time Travel (versioning)
- ✅ Schema Enforcement
- ✅ MERGE/UPSERT support

In [None]:
import os

import duckdb
import pandas as pd
from deltalake import DeltaTable, write_deltalake

# Paths
BASE_DIR = os.path.abspath('..')
BRONZE = os.path.join(BASE_DIR, 'data', 'lakehouse', '01_bronze', 'online_retail')
SILVER = os.path.join(BASE_DIR, 'data', 'lakehouse', '02_silver')
GOLD = os.path.join(BASE_DIR, 'data', 'lakehouse', '03_gold')

## 1. Bronze Layer
Raw data in Delta Table format.

In [None]:
# List the files in the folder: note the _delta_log directory!
print('Contents of the Bronze folder (Delta Table):')
for f in os.listdir(BRONZE):
    full_path = os.path.join(BRONZE, f)
    if os.path.isdir(full_path):
        # Only _delta_log holds the transaction log; other folders would be partitions
        note = ' (transaction log)' if f == '_delta_log' else ''
        print(f'  📁 {f}/{note}')
    else:
        size_mb = os.path.getsize(full_path) / (1024 * 1024)
        print(f'  📄 {f} ({size_mb:.2f} MB)')

In [None]:
# Load the Delta Table with the deltalake library
dt_bronze = DeltaTable(BRONZE)

print(f'Current version: {dt_bronze.version()}')
print(f'Schema: {dt_bronze.schema()}')
print(f'Files: {dt_bronze.file_uris()}')

In [None]:
# Explore the data with DuckDB
con = duckdb.connect()
con.register('bronze', dt_bronze.to_pyarrow_dataset())

con.execute('SELECT * FROM bronze LIMIT 5').df()

In [None]:
# Row count
con.execute('SELECT COUNT(*) as total_rows FROM bronze').df()

## 2. Silver Layer
Cleaned data modeled as a Star Schema.

In [None]:
# List the tables in Silver
print('Delta tables in Silver:')
for table in os.listdir(SILVER):
    table_path = os.path.join(SILVER, table)
    if os.path.isdir(table_path):
        try:
            dt = DeltaTable(table_path)
            print(f'  📊 {table} (version {dt.version()})')
        except Exception:
            # DeltaTable raises when the folder has no valid _delta_log
            print(f'  📁 {table} (not a Delta Table)')

In [None]:
# Fact Sales
dt_fact = DeltaTable(os.path.join(SILVER, 'fact_sales'))
con.register('fact_sales', dt_fact.to_pyarrow_dataset())
con.execute('SELECT * FROM fact_sales LIMIT 5').df()

In [None]:
# Dim Customer
dt_cust = DeltaTable(os.path.join(SILVER, 'dim_customer'))
con.register('dim_customer', dt_cust.to_pyarrow_dataset())
con.execute('SELECT * FROM dim_customer LIMIT 5').df()

In [None]:
# Dim Product
dt_prod = DeltaTable(os.path.join(SILVER, 'dim_product'))
con.register('dim_product', dt_prod.to_pyarrow_dataset())
con.execute('SELECT * FROM dim_product LIMIT 5').df()

In [None]:
# Row count per table
for table in ['fact_sales', 'dim_customer', 'dim_product']:
    count = con.execute(f'SELECT COUNT(*) FROM {table}').fetchone()[0]
    print(f'{table}: {count:,} rows')

## 3. Gold Layer
Aggregations ready for analysis.

In [None]:
# Daily Sales
dt_daily = DeltaTable(os.path.join(GOLD, 'daily_sales'))
con.register('daily_sales', dt_daily.to_pyarrow_dataset())
con.execute('SELECT * FROM daily_sales LIMIT 10').df()

In [None]:
# Sales by Country
dt_country = DeltaTable(os.path.join(GOLD, 'sales_by_country'))
con.register('sales_by_country', dt_country.to_pyarrow_dataset())
con.execute('SELECT * FROM sales_by_country LIMIT 10').df()

## ✅ Lakehouse Advantages (Delta Table)

### 1. Time Travel
We can read earlier versions of the data.
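
Time Travel is possible because the table state at any version can be rebuilt from the transaction log. The sketch below is a simplified, standard-library-only illustration, not the `deltalake` implementation: it replays the `add` and `remove` actions of commits 0 to N to find which data files belong to version N. The `active_files` helper and the `part-*.parquet` file names are made up for the example, and checkpoints are ignored.

```python
import json
import os
import tempfile


def active_files(log_dir: str, version: int) -> set[str]:
    """Replay commits 0..version and return the data files live at that version."""
    files: set[str] = set()
    for v in range(version + 1):
        # Commit files are named with the zero-padded 20-digit version number
        commit = os.path.join(log_dir, f"{v:020d}.json")
        with open(commit, encoding="utf-8") as fh:
            for line in fh:
                if not line.strip():
                    continue
                action = json.loads(line)  # one JSON action per line
                if "add" in action:
                    files.add(action["add"]["path"])
                elif "remove" in action:
                    files.discard(action["remove"]["path"])
    return files


# Toy log with three commits (hypothetical file names)
log_dir = tempfile.mkdtemp()
commits = [
    [{"add": {"path": "part-0.parquet"}}],
    [{"add": {"path": "part-1.parquet"}}],
    [{"remove": {"path": "part-0.parquet"}}, {"add": {"path": "part-2.parquet"}}],
]
for v, actions in enumerate(commits):
    with open(os.path.join(log_dir, f"{v:020d}.json"), "w", encoding="utf-8") as fh:
        fh.write("\n".join(json.dumps(a) for a in actions))

for v in range(len(commits)):
    print(v, sorted(active_files(log_dir, v)))
```

Older data files stay on disk after a `remove` action, which is why an earlier version can still be read until those files are cleaned up.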

In [None]:
# Inspect the version history
dt_bronze = DeltaTable(BRONZE)
print(f'Current Bronze version: {dt_bronze.version()}')
print('\nOperation history:')
for action in dt_bronze.history():
    print(f"  Version {action.get('version', '?')}: {action.get('operation', '?')} at {action.get('timestamp', '?')}")

In [None]:
# Read a specific version (Time Travel!)
dt_v0 = DeltaTable(BRONZE, version=0)
print('Data at version 0:')
print(f'  Rows: {len(dt_v0.to_pandas()):,}')

In [None]:
# Read the next version for comparison (requires the table to have a version 1)
dt_v1 = DeltaTable(BRONZE, version=1)
print('Data at version 1:')
print(f'  Rows: {len(dt_v1.to_pandas()):,}')

### 2. Schema Enforcement
Delta Table rejects writes whose schema is incompatible with the table's schema.
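
The table schema is stored in the transaction log, inside a `metaData` action, as a JSON string (`schemaString`). The sketch below is a deliberately simplified, standard-library-only picture of the kind of comparison a writer can make before committing. It is not the `deltalake` validation logic, and the `schema_mismatches` helper and the two example columns are hypothetical.

```python
import json

# Hypothetical metaData action, shaped like the ones found in _delta_log
metadata_action = {
    "metaData": {
        "schemaString": json.dumps({
            "type": "struct",
            "fields": [
                {"name": "InvoiceNo", "type": "string", "nullable": True, "metadata": {}},
                {"name": "Quantity", "type": "long", "nullable": True, "metadata": {}},
            ],
        })
    }
}


def schema_mismatches(action: dict, incoming: dict[str, str]) -> list[str]:
    """Compare incoming column -> type pairs against the table schema."""
    schema = json.loads(action["metaData"]["schemaString"])
    expected = {f["name"]: f["type"] for f in schema["fields"]}
    problems = []
    for name, dtype in incoming.items():
        if name not in expected:
            problems.append(f"unexpected column: {name}")
        elif expected[name] != dtype:
            problems.append(f"type mismatch for {name}: {dtype} != {expected[name]}")
    # This toy check also flags absent columns; real writers may be more lenient
    problems += [f"missing column: {n}" for n in expected if n not in incoming]
    return problems


ok = schema_mismatches(metadata_action, {"InvoiceNo": "string", "Quantity": "long"})
bad = schema_mismatches(metadata_action, {"coluna_errada": "long", "outra_coluna": "string"})
print(ok)
print(bad)
```

An empty list means the batch could be appended; any entry in the list is a reason to abort before a new commit is written.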

In [None]:
# Try to append data with a different schema
df_wrong = pd.DataFrame({
    'coluna_errada': [1, 2, 3],
    'outra_coluna': ['a', 'b', 'c']
})

try:
    write_deltalake(BRONZE, df_wrong, mode='append')
    print('ERROR: the write should not have succeeded!')
except Exception as e:
    print('✅ Schema Enforcement worked!')
    print(f'Exception caught: {type(e).__name__}')
    print(f'Message: {e}')

### 3. Transa√ß√µes ACID
Every write operation is atomic: it is either committed in full or not at all.
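
Atomicity comes from how a commit is published: a writer prepares its data files first and only then tries to create the next numbered file in `_delta_log`. If another writer has already created that file, the commit is rejected. The sketch below mimics this "create only if absent" step on a local filesystem with Python's exclusive-create mode. It is a simplified illustration with a hypothetical `try_commit` helper, not the `deltalake` commit code, and it skips details such as making the file contents appear atomically to readers.

```python
import json
import os
import tempfile


def try_commit(log_dir: str, version: int, actions: list[dict]) -> bool:
    """Try to publish `actions` as commit `version`; return False on conflict."""
    path = os.path.join(log_dir, f"{version:020d}.json")
    try:
        # Mode "x" creates the file only if it does not exist yet
        with open(path, "x", encoding="utf-8") as fh:
            fh.write("\n".join(json.dumps(a) for a in actions))
    except FileExistsError:
        return False  # another writer already owns this version
    return True


log_dir = tempfile.mkdtemp()
first = try_commit(log_dir, 0, [{"add": {"path": "part-a.parquet"}}])
second = try_commit(log_dir, 0, [{"add": {"path": "part-b.parquet"}}])
print(first, second)
print(sorted(os.listdir(log_dir)))
```

The losing writer leaves no trace in the log, so readers never see a half-applied change. It can re-read the table and retry against the next version number.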

In [None]:
# The _delta_log folder records every transaction
log_path = os.path.join(BRONZE, '_delta_log')
print('Files in the transaction log:')
for f in sorted(os.listdir(log_path)):
    print(f'  {f}')

print('\nEach .json file represents one atomic transaction.')
print('If a write fails, the log is not updated and the data stays consistent.')

### 4. Visual Comparison: Data Lake Folder vs Lakehouse Folder

In [None]:
# Compare the two folders
lake_bronze = os.path.join(BASE_DIR, 'data', 'lake', '01_bronze')

print('=== DATA LAKE (Parquet) ===')
for f in os.listdir(lake_bronze):
    print(f'  {f}')

print()
print('=== DATA LAKEHOUSE (Delta Table) ===')
for f in os.listdir(BRONZE):
    full = os.path.join(BRONZE, f)
    marker = '📁' if os.path.isdir(full) else '📄'
    print(f'  {marker} {f}')

print()
print('The main difference is the _delta_log/ folder, which holds the transaction log.')
print('That is what gives the Lakehouse ACID, Time Travel and Schema Enforcement.')