# üìä Demo Completo: Structured Streaming com Parquet

Notebook completo para demonstrar **Structured Streaming** processando arquivos Parquet arquivo por arquivo.

## üéØ Objetivos
- Processar arquivos um por um com `maxFilesPerTrigger=1`
- Gerar dados sint√©ticos de e-commerce (8 arquivos Parquet)
- Demonstrar processamento vis√≠vel batch por batch
- Criar tabelas Delta Lake automaticamente
- Agrega√ß√µes em tempo real por categoria e regi√£o
- Visualizar m√©tricas completas de processamento

## ‚úÖ Compat√≠vel com Databricks Free Edition
- Sem depend√™ncias externas (Kafka, etc)
- Usa dados sint√©ticos gerados pelo pr√≥prio notebook
- Funciona em Serverless Compute

## 1Ô∏è‚É£ Setup e Configura√ß√£o

In [None]:
import os
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import datetime, timedelta
import random

# Detectar ambiente
IS_DATABRICKS = 'DATABRICKS_RUNTIME_VERSION' in os.environ

# Configurar paths
if IS_DATABRICKS:
    BASE_PATH = '/FileStore/streaming_demo'
else:
    BASE_PATH = '/tmp/streaming_demo'

INPUT_PATH = f'{BASE_PATH}/input'
OUTPUT_PATH = f'{BASE_PATH}/output'
CHECKPOINT_PATH = f'{BASE_PATH}/checkpoint'

print(f'üîß Ambiente: {"Databricks" if IS_DATABRICKS else "Local"}')
print(f'üìÇ Input: {INPUT_PATH}')
print(f'üíæ Output: {OUTPUT_PATH}')
print(f'üîñ Checkpoint: {CHECKPOINT_PATH}')

## 2Ô∏è‚É£ Gerar Dados Sint√©ticos de E-commerce

In [None]:
# Schema dos dados
schema = StructType([
    StructField('transaction_id', IntegerType(), False),
    StructField('customer_id', IntegerType(), False),
    StructField('product', StringType(), False),
    StructField('category', StringType(), False),
    StructField('quantity', IntegerType(), False),
    StructField('price', DoubleType(), False),
    StructField('total', DoubleType(), False),
    StructField('timestamp', TimestampType(), False),
    StructField('region', StringType(), False),
    StructField('payment_method', StringType(), False)
])

def generate_transactions(n=500, batch_id=0):
    """Gera transa√ß√µes sint√©ticas de e-commerce"""
    categories = ['Electronics', 'Books', 'Clothing', 'Food', 'Sports', 'Home']
    products = {
        'Electronics': ['Laptop', 'Phone', 'Tablet', 'Headphones'],
        'Books': ['Fiction', 'Tech', 'Business', 'Science'],
        'Clothing': ['Shirt', 'Pants', 'Shoes', 'Jacket'],
        'Food': ['Snacks', 'Drinks', 'Frozen', 'Fresh'],
        'Sports': ['Equipment', 'Apparel', 'Accessories'],
        'Home': ['Furniture', 'Decor', 'Kitchen', 'Garden']
    }
    regions = ['North', 'South', 'East', 'West', 'Central']
    payments = ['Credit Card', 'Debit Card', 'Cash', 'Digital Wallet']
    
    data = []
    base_id = batch_id * n
    base_time = datetime.now() - timedelta(hours=24)
    
    for i in range(n):
        category = random.choice(categories)
        product = random.choice(products[category])
        qty = random.randint(1, 5)
        price = round(random.uniform(10, 500), 2)
        
        data.append((
            base_id + i,
            random.randint(1000, 9999),
            product,
            category,
            qty,
            price,
            round(qty * price, 2),
            base_time + timedelta(minutes=random.randint(0, 1440)),
            random.choice(regions),
            random.choice(payments)
        ))
    
    return data

print('‚úÖ Fun√ß√£o de gera√ß√£o definida!')

In [None]:
print('üé≤ Gerando 8 arquivos Parquet para demonstra√ß√£o...\n')

# Limpar pasta de input
if IS_DATABRICKS:
    try:
        dbutils.fs.rm(INPUT_PATH, True)
    except:
        pass
    dbutils.fs.mkdirs(INPUT_PATH)

# Gerar 8 arquivos
for i in range(8):
    data = generate_transactions(n=300, batch_id=i)
    df = spark.createDataFrame(data, schema)
    
    file_path = f'{INPUT_PATH}/transactions_batch_{i:03d}.parquet'
    df.coalesce(1).write.mode('overwrite').parquet(file_path)
    
    print(f'  ‚úÖ Arquivo {i+1}/8: transactions_batch_{i:03d}.parquet ({len(data)} transa√ß√µes)')

print(f'\n‚úÖ Total: {8 * 300} transa√ß√µes em 8 arquivos')
print(f'üìÇ Salvos em: {INPUT_PATH}')

## 3Ô∏è‚É£ Configurar Structured Streaming com maxFilesPerTrigger=1

In [None]:
# Criar streaming DataFrame com maxFilesPerTrigger=1
streaming_df = spark.readStream \
    .schema(schema) \
    .option('maxFilesPerTrigger', 1) \
    .parquet(INPUT_PATH)

# Adicionar colunas de controle
enriched_df = streaming_df \
    .withColumn('processing_time', current_timestamp()) \
    .withColumn('file_name', input_file_name()) \
    .withColumn('date', to_date(col('timestamp'))) \
    .withColumn('hour', hour(col('timestamp'))) \
    .withColumn('revenue', round(col('total'), 2))

print('‚úÖ Streaming DataFrame configurado!')
print(f'\nüìä Schema:')
enriched_df.printSchema()
print(f'\nüîÑ √â streaming? {streaming_df.isStreaming}')
print(f'‚öôÔ∏è  maxFilesPerTrigger: 1 (processa 1 arquivo por vez)')
print(f'üìù Colunas adicionadas: processing_time, file_name, date, hour, revenue')

## 4Ô∏è‚É£ Criar Agrega√ß√µes por Categoria e Regi√£o

In [None]:
# Agrega√ß√£o por categoria e regi√£o
aggregated_df = enriched_df \
    .groupBy('category', 'region', 'date') \
    .agg(
        count('*').alias('total_transactions'),
        sum('quantity').alias('total_items'),
        sum('revenue').alias('total_revenue'),
        avg('revenue').alias('avg_transaction_value'),
        countDistinct('customer_id').alias('unique_customers'),
        min('timestamp').alias('first_transaction'),
        max('timestamp').alias('last_transaction')
    ) \
    .withColumn('avg_transaction_value', round(col('avg_transaction_value'), 2)) \
    .withColumn('total_revenue', round(col('total_revenue'), 2))

print('‚úÖ Pipeline de agrega√ß√£o criado!')
print('üìä M√©tricas calculadas:')
print('   ‚Ä¢ Total de transa√ß√µes')
print('   ‚Ä¢ Total de itens vendidos')
print('   ‚Ä¢ Receita total')
print('   ‚Ä¢ Valor m√©dio por transa√ß√£o')
print('   ‚Ä¢ Clientes √∫nicos')
print('   ‚Ä¢ Primeira e √∫ltima transa√ß√£o')

## 5Ô∏è‚É£ Definir Fun√ß√£o de Processamento Batch (foreachBatch)

In [None]:
# Estat√≠sticas globais
batch_stats = {'count': 0, 'total_records': 0, 'total_revenue': 0}

def process_batch(batch_df, batch_id):
    """Processa cada batch mostrando m√©tricas detalhadas"""
    batch_stats['count'] += 1
    
    print('\n' + '='*100)
    print(f'üì¶ BATCH #{batch_stats["count"]} | Batch ID: {batch_id}')
    print('='*100)
    
    num_records = batch_df.count()
    
    if num_records == 0:
        print('‚ö†Ô∏è  Batch vazio - aguardando novos arquivos...')
        return
    
    batch_stats['total_records'] += num_records
    
    # Informa√ß√µes do arquivo processado
    files = batch_df.select('file_name').distinct().collect()
    if files:
        print(f'\nüìÑ Arquivo(s) processado(s):')
        for f in files:
            fname = f.file_name.split('/')[-1] if f.file_name else 'unknown'
            print(f'   ‚Ä¢ {fname}')
    
    # Estat√≠sticas do batch
    stats = batch_df.agg(
        count('*').alias('transactions'),
        sum('revenue').alias('revenue'),
        countDistinct('customer_id').alias('customers'),
        avg('revenue').alias('avg_ticket')
    ).collect()[0]
    
    batch_stats['total_revenue'] += stats.revenue if stats.revenue else 0
    
    print(f'\nüìä ESTAT√çSTICAS DO BATCH:')
    print(f'   üí≥ Transa√ß√µes: {stats.transactions:,}')
    print(f'   üí∞ Receita: ${stats.revenue:,.2f}')
    print(f'   üë• Clientes: {stats.customers:,}')
    print(f'   üéØ Ticket M√©dio: ${stats.avg_ticket:,.2f}')
    
    # Top categorias
    print(f'\nüèÜ TOP 5 CATEGORIAS (Receita):')
    top_cat = batch_df.groupBy('category') \
        .agg(sum('revenue').alias('revenue')) \
        .orderBy(col('revenue').desc()) \
        .limit(5)
    top_cat.show(truncate=False)
    
    # Distribui√ß√£o regional
    print(f'üåé DISTRIBUI√á√ÉO POR REGI√ÉO:')
    regions = batch_df.groupBy('region') \
        .agg(
            count('*').alias('transactions'),
            sum('revenue').alias('revenue')
        ) \
        .orderBy(col('revenue').desc())
    regions.show(truncate=False)
    
    # Totais acumulados
    print(f'\nüìà TOTAIS ACUMULADOS:')
    print(f'   üì¶ Batches processados: {batch_stats["count"]}')
    print(f'   üìä Transa√ß√µes totais: {batch_stats["total_records"]:,}')
    print(f'   üíµ Receita total: ${batch_stats["total_revenue"]:,.2f}')
    
    print('\n' + '-'*100)
    print(f'‚úÖ Batch #{batch_stats["count"]} conclu√≠do!')
    print('-'*100 + '\n')

print('‚úÖ Fun√ß√£o process_batch() definida!')

## 6Ô∏è‚É£ EXECUTAR Streaming com foreachBatch

**Execute esta c√©lula e observe o processamento arquivo por arquivo!**

In [None]:
# Resetar estat√≠sticas
batch_stats = {'count': 0, 'total_records': 0, 'total_revenue': 0}

print('üöÄ INICIANDO STRUCTURED STREAMING')
print('='*100)
print(f'üìÇ Source: {INPUT_PATH}')
print(f'‚öôÔ∏è  maxFilesPerTrigger: 1')
print(f'‚è±Ô∏è  Trigger: a cada 5 segundos')
print(f'üîñ Checkpoint: {CHECKPOINT_PATH}')
print('='*100)

# Iniciar stream com foreachBatch
query = enriched_df.writeStream \
    .foreachBatch(process_batch) \
    .trigger(processingTime='5 seconds') \
    .option('checkpointLocation', CHECKPOINT_PATH) \
    .start()

print(f'\n‚úÖ Stream ativo!')
print(f'üÜî Query ID: {query.id}')
print(f'üìä Status: {query.status}')
print(f'\nüí° Observe o processamento arquivo por arquivo abaixo...')
print(f'‚è∏Ô∏è  Execute a pr√≥xima c√©lula para parar ap√≥s ~60 segundos')

## 7Ô∏è‚É£ Parar o Streaming

**Aguarde ~60 segundos para processar todos os arquivos, ent√£o execute esta c√©lula**

In [None]:
import time

print('‚è≥ Aguardando 60 segundos para processar todos os arquivos...')
time.sleep(60)

query.stop()

print('\n' + '='*100)
print('üõë STREAMING FINALIZADO')
print('='*100)
print(f'\nüìä RESUMO FINAL:')
print(f'   üì¶ Total de batches: {batch_stats["count"]}')
print(f'   üí≥ Total de transa√ß√µes: {batch_stats["total_records"]:,}')
print(f'   üí∞ Receita total: ${batch_stats["total_revenue"]:,.2f}')
print(f'   ‚úÖ Checkpoint salvo em: {CHECKPOINT_PATH}')
print('='*100)

## 8Ô∏è‚É£ OP√á√ÉO 2: Gravar em Tabela Delta Lake

**Alternativa: processar e gravar diretamente em Delta Lake**

In [None]:
print('üöÄ Iniciando streaming com sa√≠da para Delta Lake...')
print(f'üíæ Tabela: {OUTPUT_PATH}')
print(f'üîñ Checkpoint: {CHECKPOINT_PATH}_delta\n')

# Streaming para Delta com agrega√ß√µes
query_delta = aggregated_df.writeStream \
    .format('delta') \
    .outputMode('complete') \
    .option('checkpointLocation', f'{CHECKPOINT_PATH}_delta') \
    .trigger(processingTime='5 seconds') \
    .start(OUTPUT_PATH)

print(f'‚úÖ Stream Delta ativo! Query ID: {query_delta.id}')
print(f'üí° Execute as pr√≥ximas c√©lulas ap√≥s ~60 segundos')

In [None]:
import time
time.sleep(60)
query_delta.stop()
print('‚úÖ Stream Delta finalizado!')

## 9Ô∏è‚É£ Consultar Tabela Delta

In [None]:
# Ler tabela Delta
delta_df = spark.read.format('delta').load(OUTPUT_PATH)

print(f'üìä Tabela Delta carregada!')
print(f'üìà Total de registros: {delta_df.count():,}\n')

print('üèÜ TOP 10 - Categoria + Regi√£o (por Receita):')
delta_df.orderBy(col('total_revenue').desc()).limit(10).show(truncate=False)

# Totais gerais
totals = delta_df.agg(
    sum('total_transactions').alias('transactions'),
    sum('total_revenue').alias('revenue'),
    sum('unique_customers').alias('customers')
).collect()[0]

print(f'\nüí∞ TOTAIS GERAIS:')
print(f'   üí≥ Transa√ß√µes: {totals.transactions:,.0f}')
print(f'   üíµ Receita: ${totals.revenue:,.2f}')
print(f'   üë• Clientes: {totals.customers:,.0f}')

## üîü Criar Tabela Gerenciada no Catalog (Databricks)

In [None]:
if IS_DATABRICKS:
    # Criar database
    spark.sql('CREATE DATABASE IF NOT EXISTS streaming_demo')
    
    # Criar tabela gerenciada
    delta_df.write \
        .format('delta') \
        .mode('overwrite') \
        .saveAsTable('streaming_demo.sales_summary')
    
    print('‚úÖ Tabela criada: streaming_demo.sales_summary')
    print('\nüìä Amostra da tabela:')
    
    spark.sql('SELECT * FROM streaming_demo.sales_summary LIMIT 10').show(truncate=False)
    
    print('\nüí° Agora voc√™ pode consultar com SQL:')
    print('   SELECT category, SUM(total_revenue) FROM streaming_demo.sales_summary GROUP BY category')
else:
    print('‚ö†Ô∏è Tabelas gerenciadas dispon√≠veis apenas no Databricks')

## 1Ô∏è‚É£1Ô∏è‚É£ Limpeza de Recursos

In [None]:
print('üßπ Limpando recursos...\n')

# Parar todos os streams ativos
for s in spark.streams.active:
    s.stop()
    print(f'‚úì Stream {s.id} parado')

# Remover arquivos (Databricks)
if IS_DATABRICKS:
    paths = [INPUT_PATH, OUTPUT_PATH, CHECKPOINT_PATH, f'{CHECKPOINT_PATH}_delta']
    for path in paths:
        try:
            dbutils.fs.rm(path, True)
            print(f'‚úì Removido: {path}')
        except:
            pass

print('\n‚úÖ Limpeza conclu√≠da!')

---

## üìö Resumo do Notebook

### O que foi demonstrado:
1. ‚úÖ **Structured Streaming** com `maxFilesPerTrigger=1`
2. ‚úÖ Processamento arquivo por arquivo vis√≠vel em tempo real
3. ‚úÖ Agrega√ß√µes din√¢micas por categoria e regi√£o
4. ‚úÖ Grava√ß√£o em **Delta Lake**
5. ‚úÖ Cria√ß√£o de **tabelas gerenciadas** no Catalog
6. ‚úÖ M√©tricas detalhadas: transa√ß√µes, receita, clientes, ticket m√©dio
7. ‚úÖ Monitoramento de streams ativos

### Conceitos-chave para ensinar:
- **maxFilesPerTrigger**: controla quantos arquivos processar por trigger
- **foreachBatch**: permite l√≥gica customizada para cada micro-batch
- **checkpointLocation**: garante exactly-once processing e recupera√ß√£o de falhas
- **Delta Lake**: formato ACID transacional para data lakes
- **outputMode**: 
  - `append` - adiciona novos registros
  - `complete` - reescreve resultado completo (usado com agrega√ß√µes)
  - `update` - atualiza apenas registros modificados

### Configura√ß√µes importantes:
- **trigger**: define quando processar (processingTime, once, continuous)
- **Schema inference**: sempre defina schema expl√≠cito em produ√ß√£o
- **Watermarks**: para processar dados com atraso temporal
- **Particionamento**: organize dados por data, regi√£o, categoria

### Para produ√ß√£o:
- Ajustar `maxFilesPerTrigger` conforme volume de dados
- Configurar triggers apropriados (n√£o usar 5 segundos em prod)
- Implementar watermarks para dados atrasados
- Adicionar tratamento de erros robusto
- Configurar alertas e monitoramento (Spark UI, Ganglia, Grafana)
- Usar particionamento eficiente no Delta Lake
- Testar recupera√ß√£o de falhas (checkpoint)

### Pr√≥ximos passos para seus alunos:
1. Modificar agrega√ß√µes (adicionar mais m√©tricas)
2. Experimentar diferentes outputModes
3. Adicionar watermarks para janelas temporais
4. Testar com volumes maiores de dados
5. Integrar com fontes reais (S3, Azure Blob, GCS)

**üéì Perfeito para ensinar os fundamentos de Structured Streaming!**