# 🏠 Hometown - Analytics Case
## Pipeline Principal - Dados de Aerogeradores SIGEL/ANEEL

Este notebook implementa um pipeline completo e idempotente para:
- **Extração** de dados da API SIGEL/ANEEL
- **Transformação** de JSON para Parquet otimizado
- **Consolidação** em CSV final para Tableau

### Características do Pipeline:
- ✅ **Idempotente**: Não reprocessa dados desnecessariamente
- ✅ **Baseado em dados**: Usa `DATA_ATUALIZACAO` da própria API
- ✅ **Paralelo**: ThreadPool para performance
- ✅ **Robusto**: Retry, validações e logs detalhados
- ✅ **Limpo**: Auto-limpeza de dados antigos

---
## 📦 1. Setup e Configuração Inicial

Configuração do ambiente, imports e validação da conectividade com a API.

In [1]:
# Imports básicos e configuração de paths
import sys
import json
from pathlib import Path
import pandas as pd

# Configurar path do projeto
project_root = Path.cwd().parent
src_path = project_root / "src"
sys.path.insert(0, str(src_path))

print("🔧 Configuração do ambiente...")
print(f"📁 Projeto root: {project_root}")
print(f"📁 Src path: {src_path}")
print("✅ Paths configurados!")

🔧 Configuração do ambiente...
📁 Projeto root: /home/victor-jose/Documents/projetos/hometown
📁 Src path: /home/victor-jose/Documents/projetos/hometown/src
✅ Paths configurados!


In [2]:
# Imports dos módulos do projeto
from extraction.extractors import SigelExtractor
from extraction.validators import validate_api_connection, validate_extraction_results
from transformation.processors import DataProcessor
from consolidation.consolidators import DataConsolidator
from config.settings import SIGEL_CONFIG
from utils.logger import setup_logger
from utils.exceptions import APIConnectionError, ValidationError

# Setup do logger principal
logger = setup_logger(__name__, "pipeline.log")

print("📚 Módulos importados com sucesso!")
print(f"🌐 API URL: {SIGEL_CONFIG['url']}")
print(f"📄 Tamanho da página: {SIGEL_CONFIG['page_size']} registros")
print("✅ Setup concluído!")

📚 Módulos importados com sucesso!
🌐 API URL: https://sigel.aneel.gov.br/arcgis/rest/services/PORTAL/WFS/MapServer/0/query
📄 Tamanho da página: 1000 registros
✅ Setup concluído!


### 🔍 Validação da Conectividade

Testa a conexão com a API SIGEL/ANEEL antes de iniciar o pipeline.

In [3]:
# Testar conectividade com a API
try:
    print("🌐 Testando conectividade com a API SIGEL/ANEEL...")
    
    api_url = SIGEL_CONFIG["url"]
    connection_ok = validate_api_connection(api_url)
    
    if connection_ok:
        print("✅ API conectada com sucesso!")
        logger.info("Validação de conectividade concluída com sucesso")
    else:
        print("❌ Falha na conexão com API")
        
except (APIConnectionError, ValidationError) as e:
    print(f"❌ Erro de validação: {e}")
    logger.error(f"Erro de validação: {e}")
except Exception as e:
    print(f"❌ Erro inesperado: {e}")
    logger.error(f"Erro inesperado durante validação: {e}")

🌐 Testando conectividade com a API SIGEL/ANEEL...
2025-06-01 17:41:43 - extraction.validators - INFO - API conectada com sucesso. Total de registros: 23522
✅ API conectada com sucesso!
2025-06-01 17:41:43 - __main__ - INFO - Validação de conectividade concluída com sucesso


---
## 🧹 2. Comandos de Limpeza e Manutenção

Funções utilitárias para gerenciar dados e fazer limpeza quando necessário.

In [4]:
def show_current_data_status():
    """Exibe status atual dos dados no projeto"""
    print("📊 STATUS ATUAL DOS DADOS:")
    print("="*40)
    
    # Contar arquivos em cada etapa
    raw_files = list(Path("data/raw").glob("aerogeradores_raw_*.json")) if Path("data/raw").exists() else []
    processed_files = list(Path("data/processed").glob("aerogeradores_processed_*.parquet")) if Path("data/processed").exists() else []
    output_files = list(Path("data/output").glob("aerogeradores_consolidado_*.csv")) if Path("data/output").exists() else []
    
    print(f"🗂️ Arquivos JSON (raw): {len(raw_files)}")
    print(f"📦 Arquivos Parquet (processed): {len(processed_files)}")
    print(f"📄 Arquivos CSV (output): {len(output_files)}")
    
    # Calcular tamanhos
    total_size = 0
    for files, label in [(raw_files, "JSONs"), (processed_files, "Parquets"), (output_files, "CSVs")]:
        if files:
            size = sum(f.stat().st_size for f in files) / 1024 / 1024
            total_size += size
            print(f"💾 Tamanho {label}: {size:.2f} MB")
    
    print(f"💾 Tamanho total: {total_size:.2f} MB")
    
    return {
        'raw_count': len(raw_files),
        'processed_count': len(processed_files),
        'output_count': len(output_files),
        'total_size_mb': total_size
    }

def cleanup_all_data():
    """Remove todos os dados para recomeçar do zero"""
    print("🧹 INICIANDO LIMPEZA COMPLETA...")
    
    # Instanciar classes para usar métodos de limpeza
    extractor = SigelExtractor()
    processor = DataProcessor()
    consolidator = DataConsolidator()
    
    # Executar limpeza de cada etapa
    raw_removed = extractor.cleanup_all_raw_data()
    processed_removed = processor.cleanup_all_processed_data()
    output_removed = consolidator.cleanup_all_output_data()
    
    total_removed = raw_removed + processed_removed + output_removed
    
    print(f"📊 RESUMO DA LIMPEZA:")
    print(f"  🗂️ JSONs removidos: {raw_removed}")
    print(f"  📦 Parquets removidos: {processed_removed}")
    print(f"  📄 CSVs removidos: {output_removed}")
    print(f"  🗑️ Total de arquivos: {total_removed}")
    print(f"✅ Limpeza completa concluída!")
    
    return total_removed

# Mostrar status inicial
status = show_current_data_status()

print("\n" + "="*50)
print("🔧 COMANDOS DISPONÍVEIS:")
print("• show_current_data_status() - Mostra status dos dados")
print("• cleanup_all_data() - Remove todos os dados")
print("="*50)

📊 STATUS ATUAL DOS DADOS:
🗂️ Arquivos JSON (raw): 0
📦 Arquivos Parquet (processed): 0
📄 Arquivos CSV (output): 0
💾 Tamanho total: 0.00 MB

🔧 COMANDOS DISPONÍVEIS:
• show_current_data_status() - Mostra status dos dados
• cleanup_all_data() - Remove todos os dados


---
## 🚀 3. Pipeline Principal - Extração de Dados

Extração idempotente de dados da API SIGEL/ANEEL. O sistema verifica automaticamente se os dados mudaram na API antes de fazer nova extração.

In [5]:
# Instanciar extrator e verificar freshness dos dados
print("🔍 VERIFICANDO FRESHNESS DOS DADOS...")
print("="*50)

extractor = SigelExtractor()
freshness = extractor.check_data_freshness()

print(f"📊 STATUS DOS DADOS:")
print(f"  🌐 API última atualização: {freshness['api_latest_update']}")
print(f"  💾 Nossa última extração: {freshness['our_last_extraction']}")
print(f"  🔄 Precisa atualizar: {freshness['needs_refresh']}")

if freshness['last_extraction_time']:
    print(f"  📅 Última extração em: {freshness['last_extraction_time']}")
    print(f"  📊 Registros da última extração: {freshness['last_total_records']:,}")
else:
    print(f"  📅 Primeira execução - nenhuma extração anterior")

print("\n" + "="*50)

🔍 VERIFICANDO FRESHNESS DOS DADOS...
2025-06-01 17:41:53 - extraction.extractors - INFO - Verificando freshness dos dados...
2025-06-01 17:41:54 - extraction.extractors - INFO - API última atualização: 1673359590000
2025-06-01 17:41:54 - extraction.extractors - INFO - Nossa última extração: 0
2025-06-01 17:41:54 - extraction.extractors - INFO - Precisa atualizar: True
📊 STATUS DOS DADOS:
  🌐 API última atualização: 1673359590000
  💾 Nossa última extração: 0
  🔄 Precisa atualizar: True
  📅 Primeira execução - nenhuma extração anterior



In [6]:
# Executar extração (idempotente - só extrai se necessário)
print("🚀 EXECUTANDO EXTRAÇÃO DE DADOS...")
print("(O sistema decide automaticamente se precisa extrair baseado na DATA_ATUALIZACAO da API)\n")

try:
    # Extração idempotente (force_refresh=False por padrão)
    saved_files = extractor.extract_all_data()
    
    print(f"\n📊 RESULTADO DA EXTRAÇÃO:")
    print(f"✅ Arquivos JSON salvos: {len(saved_files)}")
    
    if saved_files:
        # Mostrar alguns arquivos como exemplo
        print(f"\n📋 Arquivos salvos (primeiros 3):")
        for i, file_path in enumerate(saved_files[:3]):
            filename = Path(file_path).name
            size = Path(file_path).stat().st_size / 1024  # KB
            print(f"  {i+1}. {filename} ({size:.1f} KB)")
        
        if len(saved_files) > 3:
            print(f"  ... e mais {len(saved_files) - 3} arquivos")
        
        # Calcular tamanho total
        total_size = sum(Path(f).stat().st_size for f in saved_files) / 1024 / 1024
        print(f"\n💾 Tamanho total: {total_size:.2f} MB")
        print(f"📁 Localização: data/raw/")
    
    logger.info(f"Extração concluída: {len(saved_files)} arquivos")
    
except Exception as e:
    print(f"❌ Erro durante extração: {e}")
    logger.error(f"Erro durante extração: {e}")
    raise

🚀 EXECUTANDO EXTRAÇÃO DE DADOS...
(O sistema decide automaticamente se precisa extrair baseado na DATA_ATUALIZACAO da API)

2025-06-01 17:41:56 - extraction.extractors - INFO - Verificando freshness dos dados...
2025-06-01 17:41:56 - extraction.extractors - INFO - API última atualização: 1673359590000
2025-06-01 17:41:56 - extraction.extractors - INFO - Nossa última extração: 0
2025-06-01 17:41:56 - extraction.extractors - INFO - Precisa atualizar: True
2025-06-01 17:41:56 - extraction.extractors - INFO - 🔄 Dados da API foram atualizados, executando nova extração...
2025-06-01 17:41:57 - extraction.extractors - INFO - 📊 Total de registros: 23522
2025-06-01 17:41:57 - extraction.extractors - INFO - 📄 Total de páginas: 24
2025-06-01 17:41:57 - extraction.extractors - INFO - 📅 Data de atualização da API: 1673359590000
2025-06-01 17:41:57 - extraction.extractors - INFO - Extraindo página 1/24
2025-06-01 17:41:57 - extraction.extractors - INFO - Extraindo página 2/24
2025-06-01 17:41:57 - e

In [13]:
# Validação dos dados extraídos
if 'saved_files' in locals() and saved_files:
    print("🔍 VALIDANDO DADOS EXTRAÍDOS...")
    
    try:
        # Validação usando função do módulo validators
        validation_ok = validate_extraction_results(
            saved_files=saved_files,
            expected_records=0  # A função vai descobrir automaticamente
        )
        
        if validation_ok:
            print("✅ Validação dos dados extraídos concluída!")
        else:
            print("❌ Falha na validação dos dados extraídos")
            
        # Explorar estrutura de um arquivo como exemplo
        print("\n🔍 EXPLORANDO ESTRUTURA DOS DADOS:")
        first_file = saved_files[0]
        print(f"📁 Arquivo exemplo: {Path(first_file).name}")
        
        with open(first_file, 'r', encoding='utf-8') as f:
            sample_data = json.load(f)
        
        print(f"📋 Chaves principais: {list(sample_data.keys())}")
        
        if 'features' in sample_data:
            features = sample_data['features']
            print(f"📊 Features neste arquivo: {len(features)}")
            
            if features:
                first_feature = features[0]
                attrs = first_feature.get('attributes', {})
                print(f"🏷️ Campos disponíveis: {list(attrs.keys())[:8]}...")  # Primeiros 8
                
                # Mostrar campo de data de atualização
                if 'DATA_ATUALIZACAO' in attrs:
                    data_atualizacao = attrs['DATA_ATUALIZACAO']
                    print(f"📅 DATA_ATUALIZACAO exemplo: {data_atualizacao}")
        
        logger.info("Validação dos dados extraídos concluída com sucesso")
        
    except Exception as e:
        print(f"❌ Erro durante validação: {e}")
        logger.error(f"Erro durante validação: {e}")
else:
    print("⚠️ Nenhum arquivo para validar. A extração pode ter sido pulada (idempotência).")

🔍 VALIDANDO DADOS EXTRAÍDOS...
2025-06-01 17:43:23 - extraction.validators - INFO - Validação da extração concluída. 24 arquivos salvos
✅ Validação dos dados extraídos concluída!

🔍 EXPLORANDO ESTRUTURA DOS DADOS:
📁 Arquivo exemplo: aerogeradores_raw_20250601_174156_page_0004.json
📋 Chaves principais: ['displayFieldName', 'fieldAliases', 'geometryType', 'spatialReference', 'fields', 'features', 'exceededTransferLimit']
📊 Features neste arquivo: 1000
🏷️ Campos disponíveis: ['POT_MW', 'ALT_TOTAL', 'ALT_TORRE', 'DIAM_ROTOR', 'DATA_ATUALIZACAO', 'EOL_VERSAO_ID', 'NOME_EOL', 'DEN_AEG']...
📅 DATA_ATUALIZACAO exemplo: 1666625847000
2025-06-01 17:43:23 - __main__ - INFO - Validação dos dados extraídos concluída com sucesso


---
## 🔄 4. Pipeline de Transformação - JSON → Parquet

Transforma os arquivos JSON em Parquet otimizado com dados geográficos processados (lat/long extraídos).

In [14]:
# Instanciar processador e verificar necessidade de transformação
print("🔄 VERIFICANDO NECESSIDADE DE TRANSFORMAÇÃO...")
print("="*50)

processor = DataProcessor()
transform_check = processor.check_transformation_needed()

print(f"📊 STATUS DA TRANSFORMAÇÃO:")
print(f"  🔄 Precisa transformar: {transform_check['needs_transformation']}")


if 'json_count' in transform_check:
    print(f"  🗂️ Arquivos JSON encontrados: {transform_check['json_count']}")
if 'parquet_count' in transform_check:
    print(f"  📦 Arquivos Parquet existentes: {transform_check['parquet_count']}")

print("\n" + "="*50)

🔄 VERIFICANDO NECESSIDADE DE TRANSFORMAÇÃO...
2025-06-01 17:43:27 - transformation.processors - INFO - Descobertos 24 arquivos JSON para processar
📊 STATUS DA TRANSFORMAÇÃO:
  🔄 Precisa transformar: False
  🗂️ Arquivos JSON encontrados: 24
  📦 Arquivos Parquet existentes: 24



In [15]:
# Executar transformação (idempotente - só processa se necessário)
print("🔄 EXECUTANDO TRANSFORMAÇÃO JSON → PARQUET...")
print("(Sistema converte JSON para Parquet com coordenadas lat/long extraídas)\n")

try:
    # Transformação idempotente (force_refresh=False por padrão)
    processed_files = processor.process_all_files()
    
    print(f"\n📊 RESULTADO DA TRANSFORMAÇÃO:")
    print(f"✅ Arquivos Parquet criados: {len(processed_files)}")
    
    if processed_files:
        # Calcular estatísticas dos parquets
        total_size = 0
        total_records = 0
        
        print(f"\n📋 Arquivos Parquet (primeiros 3):")
        for i, file_path in enumerate(processed_files[:3]):
            filename = Path(file_path).name
            size = Path(file_path).stat().st_size / 1024  # KB
            total_size += Path(file_path).stat().st_size
            
            # Contar registros no parquet
            try:
                df = pd.read_parquet(file_path)
                records = len(df)
                total_records += records
                print(f"  {i+1}. {filename} ({size:.1f} KB, {records:,} registros)")
            except:
                print(f"  {i+1}. {filename} ({size:.1f} KB)")
        
        if len(processed_files) > 3:
            print(f"  ... e mais {len(processed_files) - 3} arquivos")
            
            # Contar registros restantes
            for file_path in processed_files[3:]:
                total_size += Path(file_path).stat().st_size
                try:
                    df = pd.read_parquet(file_path)
                    total_records += len(df)
                except:
                    pass
        
        print(f"\n💾 Tamanho total: {total_size / 1024 / 1024:.2f} MB")
        print(f"📊 Total de registros: {total_records:,}")
        print(f"📁 Localização: data/processed/")
        print(f"🗜️ Compressão: {((total_size / 1024 / 1024) / 17) * 100:.1f}% do tamanho original JSON")
    
    logger.info(f"Transformação concluída: {len(processed_files)} parquets")
    
except Exception as e:
    print(f"❌ Erro durante transformação: {e}")
    logger.error(f"Erro durante transformação: {e}")
    raise

🔄 EXECUTANDO TRANSFORMAÇÃO JSON → PARQUET...
(Sistema converte JSON para Parquet com coordenadas lat/long extraídas)

2025-06-01 17:43:30 - transformation.processors - INFO - Descobertos 24 arquivos JSON para processar
❌ Erro durante transformação: 'reason'
2025-06-01 17:43:30 - __main__ - ERROR - Erro durante transformação: 'reason'


KeyError: 'reason'

---
## 📊 5. Pipeline de Consolidação - Parquet → CSV Final

Consolida todos os Parquets em um único CSV otimizado para o Tableau, com validações inteligentes de conteúdo.

In [None]:
# Instanciar consolidador e verificar necessidade de consolidação
print("📊 VERIFICANDO NECESSIDADE DE CONSOLIDAÇÃO...")
print("="*50)

consolidator = DataConsolidator()
consolidation_check = consolidator.check_consolidation_needed()

print(f"📊 STATUS DA CONSOLIDAÇÃO:")
print(f"  🔄 Precisa consolidar: {consolidation_check['needs_consolidation']}")
print(f"  📝 Motivo: {consolidation_check['reason']}")

if 'csv_records' in consolidation_check:
    print(f"  📄 Registros no CSV atual: {consolidation_check['csv_records']:,}")
if 'expected_records' in consolidation_check:
    print(f"  📦 Registros esperados: {consolidation_check['expected_records']:,}")
if 'parquet_count' in consolidation_check:
    print(f"  📦 Arquivos Parquet encontrados: {consolidation_check['parquet_count']}")

print("\n" + "="*50)

In [None]:
# Executar consolidação (idempotente - só consolida se necessário)
print("📊 EXECUTANDO CONSOLIDAÇÃO PARQUET → CSV...")
print("(Sistema cria CSV final otimizado para Tableau com dados limpos e validados)\n")

try:
    # Consolidação idempotente (force_refresh=False por padrão)
    output_path = consolidator.consolidate_all()
    
    print(f"\n📊 RESULTADO DA CONSOLIDAÇÃO:")
    
    if output_path and Path(output_path).exists():
        filename = Path(output_path).name
        file_size = Path(output_path).stat().st_size / 1024 / 1024  # MB
        
        print(f"✅ CSV final criado: {filename}")
        print(f"💾 Tamanho do arquivo: {file_size:.2f} MB")
        print(f"📁 Localização completa: {output_path}")
        
        # Analisar conteúdo do CSV final
        print(f"\n🔍 ANÁLISE DO CSV FINAL:")
        df_final = pd.read_csv(output_path)
        summary = consolidator.get_data_summary(df_final)
        
        print(f"📊 Total de registros: {summary['total_records']:,}")
        print(f"📋 Total de colunas: {summary['total_columns']}")
        print(f"💾 Uso de memória: {summary['memory_usage_mb']:.2f} MB")
        
        # Coordenadas geográficas
        if 'coordinate_stats' in summary:
            coords = summary['coordinate_stats']
            print(f"\n🗺️ COORDENADAS GEOGRÁFICAS:")
            print(f"  📍 Latitude: {coords['lat_min']:.6f} → {coords['lat_max']:.6f}")
            print(f"  📍 Longitude: {coords['lon_min']:.6f} → {coords['lon_max']:.6f}")
            print(f"  🇧🇷 Cobertura: Todo o território brasileiro")
        
        # Estrutura das colunas
        print(f"\n📂 ESTRUTURA DOS DADOS (primeiras 10 colunas):")
        for i, col in enumerate(summary['columns'][:10], 1):
            print(f"  {i:2d}. {col}")
        if len(summary['columns']) > 10:
            print(f"  ... e mais {len(summary['columns']) - 10} colunas")
        
        print(f"\n🎯 CSV PRONTO PARA TABLEAU!")
        print(f"   • Coordenadas no início para mapas automáticos")
        print(f"   • Dados limpos e validados")
        print(f"   • Formato otimizado para análise")
        
    else:
        print(f"⚠️ Nenhum CSV foi criado (possivelmente devido à idempotência)")
    
    logger.info(f"Consolidação concluída: {filename if output_path else 'pulada'}")
    
except Exception as e:
    print(f"❌ Erro durante consolidação: {e}")
    logger.error(f"Erro durante consolidação: {e}")
    raise

---
## 🧪 6. Testes de Idempotência

Validação do comportamento idempotente do pipeline - execuções subsequentes devem pular etapas desnecessárias.

In [None]:
import time

# Teste completo de idempotência do pipeline
print("🧪 TESTE DE IDEMPOTÊNCIA COMPLETO")
print("="*60)
print("Executando pipeline novamente - deve pular todas as etapas se dados não mudaram\n")

# Reinstanciar todas as classes para teste limpo
test_extractor = SigelExtractor()
test_processor = DataProcessor()
test_consolidator = DataConsolidator()

print("🔍 1. TESTE DE EXTRAÇÃO:")
start_time = time.time()
test_files = test_extractor.extract_all_data()  # Deve pular se dados iguais
extraction_time = time.time() - start_time
print(f"   ⏱️ Tempo: {extraction_time:.2f}s")
print(f"   📁 Arquivos: {len(test_files)}")

print("\n🔄 2. TESTE DE TRANSFORMAÇÃO:")
start_time = time.time()
test_parquets = test_processor.process_all_files()  # Deve pular se parquets atualizados
transform_time = time.time() - start_time
print(f"   ⏱️ Tempo: {transform_time:.2f}s")
print(f"   📦 Parquets: {len(test_parquets)}")

print("\n📊 3. TESTE DE CONSOLIDAÇÃO:")
start_time = time.time()
test_csv = test_consolidator.consolidate_all()  # Deve pular se CSV atualizado
consolidation_time = time.time() - start_time
print(f"   ⏱️ Tempo: {consolidation_time:.2f}s")
print(f"   📄 CSV: {Path(test_csv).name if test_csv else 'Nenhum'}")

total_time = extraction_time + transform_time + consolidation_time
print(f"\n⏱️ TEMPO TOTAL: {total_time:.2f}s")
print(f"✅ IDEMPOTÊNCIA {'FUNCIONANDO' if total_time < 5 else 'PODE TER PROBLEMAS'}!")
if total_time < 5:
    print("   Pipeline executou rapidamente = dados não foram reprocessados desnecessariamente")
else:
    print("   Pipeline demorou = pode estar reprocessando dados desnecessariamente")

In [None]:
# Teste de force refresh - deve reprocessar tudo
print("\n🔄 TESTE DE FORCE REFRESH")
print("="*40)
print("⚠️ ATENÇÃO: Isso vai reprocessar todos os dados (pode demorar alguns minutos)")
print("Descomente as linhas abaixo se quiser testar force refresh:\n")

print("# Force refresh - descomente para executar:")
print("# force_files = test_extractor.extract_all_data(force_refresh=True)")
print("# force_parquets = test_processor.process_all_files(force_refresh=True)")
print("# force_csv = test_consolidator.consolidate_all(force_refresh=True)")
print("# print(f'Force refresh: {len(force_files)} JSONs, {len(force_parquets)} Parquets, 1 CSV')")

print("\n💡 O force refresh deve:")
print("   • Limpar dados antigos automaticamente")
print("   • Reprocessar todos os dados do zero")
print("   • Gerar novos timestamps em todos os arquivos")

---
## 🛠️ 7. Comandos de Manutenção Avançada

Ferramentas para limpeza, debug e manutenção do pipeline.

In [None]:
# Comandos de limpeza e manutenção
print("🛠️ COMANDOS DE MANUTENÇÃO DISPONÍVEIS")
print("="*50)

print("📊 STATUS E INFORMAÇÕES:")
print("• show_current_data_status() - Mostra status completo dos dados")
print("• extractor.check_data_freshness() - Verifica se API mudou")
print("• processor.check_transformation_needed() - Verifica necessidade de transformação")
print("• consolidator.check_consolidation_needed() - Verifica necessidade de consolidação")

print("\n🧹 LIMPEZA:")
print("• cleanup_all_data() - Remove TODOS os dados (reset completo)")
print("• extractor.cleanup_all_raw_data() - Remove apenas JSONs")
print("• processor.cleanup_all_processed_data() - Remove apenas Parquets")
print("• consolidator.cleanup_all_output_data() - Remove apenas CSVs")

print("\n🔄 FORCE REFRESH:")
print("• extractor.extract_all_data(force_refresh=True) - Força nova extração")
print("• processor.process_all_files(force_refresh=True) - Força nova transformação")
print("• consolidator.consolidate_all(force_refresh=True) - Força nova consolidação")

print("\n" + "="*50)
print("💡 DICAS:")
print("• Use cleanup_all_data() se quiser recomeçar do zero")
print("• Use force_refresh=True se suspeitar de dados corrompidos")
print("• O pipeline normal (sem parâmetros) é idempotente e eficiente")
print("="*50)

---
## 📈 8. Resumo Final e Próximos Passos

Status final do pipeline e orientações para usar os dados no Tableau.

In [None]:
# Resumo final completo do pipeline
import time

status = show_current_data_status()

print("📈 RESUMO FINAL DO PIPELINE")
print("="*60)

# Status atual dos dados
final_status = show_current_data_status()

print(f"\n🎯 PIPELINE STATUS:")
if final_status['raw_count'] > 0:
    print(f"✅ Extração: {final_status['raw_count']} arquivos JSON")
else:
    print(f"❌ Extração: Nenhum arquivo encontrado")

if final_status['processed_count'] > 0:
    print(f"✅ Transformação: {final_status['processed_count']} arquivos Parquet")
else:
    print(f"❌ Transformação: Nenhum arquivo encontrado")

if final_status['output_count'] > 0:
    print(f"✅ Consolidação: {final_status['output_count']} arquivo(s) CSV")
    
    # Encontrar CSV mais recente
    csv_files = list(Path("data/output").glob("aerogeradores_consolidado_*.csv"))
    if csv_files:
        latest_csv = max(csv_files, key=lambda f: f.stat().st_mtime)
        csv_size = latest_csv.stat().st_size / 1024 / 1024
        
        # Contar registros
        try:
            df = pd.read_csv(latest_csv)
            record_count = len(df)
            print(f"   📄 Arquivo: {latest_csv.name}")
            print(f"   📊 Registros: {record_count:,} aerogeradores")
            print(f"   💾 Tamanho: {csv_size:.2f} MB")
            print(f"   📁 Local: {latest_csv}")
        except:
            print(f"   📄 Arquivo: {latest_csv.name} ({csv_size:.2f} MB)")
else:
    print(f"❌ Consolidação: Nenhum arquivo encontrado")

print(f"\n💾 Espaço total usado: {final_status['total_size_mb']:.2f} MB")

# Verificar se pipeline está completo
pipeline_complete = (final_status['raw_count'] > 0 and 
                    final_status['processed_count'] > 0 and 
                    final_status['output_count'] > 0)

if pipeline_complete:
    print(f"\n🎉 PIPELINE COMPLETO E FUNCIONAL!")
    print(f"\n🔄 CARACTERÍSTICAS IMPLEMENTADAS:")
    print(f"   ✅ Idempotente - não reprocessa dados desnecessariamente")
    print(f"   ✅ Baseado em DATA_ATUALIZACAO da API")
    print(f"   ✅ Processamento paralelo com ThreadPool")
    print(f"   ✅ Retry automático com backoff exponencial")
    print(f"   ✅ Logs detalhados de todas as operações")
    print(f"   ✅ Validações geográficas automáticas")
    print(f"   ✅ Auto-limpeza de dados antigos")
    print(f"   ✅ Otimizado para Tableau (coordenadas primeiro)")
else:
    print(f"\n⚠️ PIPELINE INCOMPLETO")
    print(f"Execute as células anteriores para completar o pipeline.")

print(f"\n" + "="*60)

In [None]:
# Instruções para uso no Tableau
print("📊 PRÓXIMOS PASSOS - TABLEAU DASHBOARD")
print("="*50)

if final_status['output_count'] > 0:
    csv_files = list(Path("data/output").glob("aerogeradores_consolidado_*.csv"))
    if csv_files:
        latest_csv = max(csv_files, key=lambda f: f.stat().st_mtime)
        
        print(f"📁 ARQUIVO PARA TABLEAU:")
        print(f"   {latest_csv}")
        
        print(f"\n🔧 COMO USAR NO TABLEAU:")
        print(f"   1. Abra o Tableau Public/Desktop")
        print(f"   2. Conecte ao arquivo CSV acima")
        print(f"   3. Latitude/Longitude já estão nas primeiras colunas")
        print(f"   4. Tableau deve detectar automaticamente os campos geográficos")
        
        print(f"\n📈 VISUALIZAÇÕES SUGERIDAS:")
        print(f"   • Mapa de pontos dos aerogeradores (latitude/longitude)")
        print(f"   • Scatter plot: POT_MW vs ALT_TOTAL")
        print(f"   • Box plot: distribuição de potências por região")
        print(f"   • Série temporal: usando DATA_ATUALIZACAO")
        print(f"   • Filtros: NOME_EOL, UF, faixas de potência")
        
        print(f"\n📊 CAMPOS PRINCIPAIS:")
        print(f"   • latitude/longitude - Para mapas")
        print(f"   • POT_MW - Potência em MW")
        print(f"   • ALT_TOTAL/ALT_TORRE - Alturas")
        print(f"   • DIAM_ROTOR - Diâmetro do rotor")
        print(f"   • NOME_EOL - Nome do parque eólico")
        print(f"   • DATA_ATUALIZACAO - Data da última atualização")
        
        print(f"\n🎯 OBJETIVO DO CASE:")
        print(f"   • Criar 1 dashboard no Tableau Public")
        print(f"   • Máximo 5 slides no Google Slides com insights")
        print(f"   • Evidenciar insights com prints do dashboard")
else:
    print(f"❌ Nenhum CSV disponível para Tableau")
    print(f"Execute o pipeline completo primeiro.")

print(f"\n" + "="*50)
print(f"🎉 PIPELINE HOMETOWN CONCLUÍDO!")
print(f"💡 Use este notebook como referência para futuras execuções.")
print(f"🔄 O sistema é idempotente - pode executar quantas vezes quiser.")
print(f"="*50)